diff --git a/go.mod b/go.mod index 14a85cd72aebd444fa384bc1788849052a937c4c..05a2257b0c5ed4c7879f05e273bff4a851581d6c 100644 --- a/go.mod +++ b/go.mod @@ -16,8 +16,8 @@ require ( github.com/stretchr/testify v1.8.0 go.mongodb.org/mongo-driver v1.11.4 go.uber.org/zap v1.19.1 - golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d - golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 + golang.org/x/crypto v0.5.0 + golang.org/x/net v0.5.0 google.golang.org/grpc v1.45.0 google.golang.org/protobuf v1.28.0 gopkg.in/yaml.v3 v3.0.1 @@ -35,6 +35,9 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/nats-io/nats.go v1.23.0 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.4.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect @@ -43,6 +46,8 @@ require ( github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect + golang.org/x/sys v0.4.0 // indirect + golang.org/x/text v0.6.0 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index 511239a6e719ed8e4be2d67f6fc16c582aad70ef..75ddf3db8ea7f160eae4c325be9a7659f7f4c553 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/nats-io/nats.go v1.23.0 h1:lR28r7IX44WjYgdiKz9GmUeW0uh/m33uD3yEjLZ2cOE= +github.com/nats-io/nats.go v1.23.0/go.mod h1:ki/Scsa23edbh8IRZbCuNXR9TDcbvfaSijKtaqQgw+Q= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -160,8 +166,11 @@ go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= +golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -178,9 +187,11 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -204,6 +215,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -212,6 +224,7 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000000000000000000000000000000000000..454f690a196b08782d8110b0c2c13b186853a776 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,55 @@ +package events + +type Subscription interface { + Unsubscribe() error +} + +type Connection interface { + Publish(subject string, msg any, opts ...PublishOption) error + Subscribe(subject string, handler any, opts ...SubscribeOption) (Subscription, error) + Close() error +} + +type PublishOptions struct { + Tags []string +} + +func NewPublishOptions(opts ...PublishOption) *PublishOptions { + o := &PublishOptions{} + for _, opt := range opts { + if opt != nil { + opt(o) + } + } + return o +} + +type PublishOption func(options *PublishOptions) + +func Tag(tag ...string) PublishOption { + return func(o *PublishOptions) { + o.Tags = tag + } +} + +type SubscribeOptions struct { + FilterTags []string +} + +func NewSubscribeOptions(opts ...SubscribeOption) *SubscribeOptions { + o := &SubscribeOptions{} + for _, opt := range opts { + if opt != nil { + opt(o) + } + } + return o +} + +type SubscribeOption func(options *SubscribeOptions) + +func FilterTag(tag ...string) SubscribeOption { + return func(o *SubscribeOptions) { + o.FilterTags = tag + } +} diff --git a/pkg/events/mocks/Connection.go b/pkg/events/mocks/Connection.go new file mode 100644 index 0000000000000000000000000000000000000000..a295924662ba10243a788c219359d60720789759 --- /dev/null +++ b/pkg/events/mocks/Connection.go @@ -0,0 +1,96 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package mocks + +import ( + events "git.perx.ru/perxis/perxis-go/pkg/events" + mock "github.com/stretchr/testify/mock" +) + +// Connection is an autogenerated mock type for the Connection type +type Connection struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Connection) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Publish provides a mock function with given fields: subject, msg, opts +func (_m *Connection) Publish(subject string, msg interface{}, opts ...events.PublishOption) error { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, subject, msg) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(string, interface{}, ...events.PublishOption) error); ok { + r0 = rf(subject, msg, opts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Subscribe provides a mock function with given fields: subject, handler, opts +func (_m *Connection) Subscribe(subject string, handler interface{}, opts ...events.SubscribeOption) (events.Subscription, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, subject, handler) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 events.Subscription + var r1 error + if rf, ok := ret.Get(0).(func(string, interface{}, ...events.SubscribeOption) (events.Subscription, error)); ok { + return rf(subject, handler, opts...) + } + if rf, ok := ret.Get(0).(func(string, interface{}, ...events.SubscribeOption) events.Subscription); ok { + r0 = rf(subject, handler, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(events.Subscription) + } + } + + if rf, ok := ret.Get(1).(func(string, interface{}, ...events.SubscribeOption) error); ok { + r1 = rf(subject, handler, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewConnection interface { + mock.TestingT + Cleanup(func()) +} + +// NewConnection creates a new instance of Connection. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewConnection(t mockConstructorTestingTNewConnection) *Connection { + mock := &Connection{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/events/mocks/MsgFilter.go b/pkg/events/mocks/MsgFilter.go new file mode 100644 index 0000000000000000000000000000000000000000..8e1340743309bfc3097e478e7aac7f1880bfb157 --- /dev/null +++ b/pkg/events/mocks/MsgFilter.go @@ -0,0 +1,44 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package mocks + +import ( + nats "github.com/nats-io/nats.go" + mock "github.com/stretchr/testify/mock" +) + +// MsgFilter is an autogenerated mock type for the MsgFilter type +type MsgFilter struct { + mock.Mock +} + +// Execute provides a mock function with given fields: _a0 +func (_m *MsgFilter) Execute(_a0 *nats.Msg) *nats.Msg { + ret := _m.Called(_a0) + + var r0 *nats.Msg + if rf, ok := ret.Get(0).(func(*nats.Msg) *nats.Msg); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*nats.Msg) + } + } + + return r0 +} + +type mockConstructorTestingTNewMsgFilter interface { + mock.TestingT + Cleanup(func()) +} + +// NewMsgFilter creates a new instance of MsgFilter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMsgFilter(t mockConstructorTestingTNewMsgFilter) *MsgFilter { + mock := &MsgFilter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/events/mocks/ProtoEncoder.go b/pkg/events/mocks/ProtoEncoder.go new file mode 100644 index 0000000000000000000000000000000000000000..f0916a3c720b41de88ba029c2d44e23bffbb42c7 --- /dev/null +++ b/pkg/events/mocks/ProtoEncoder.go @@ -0,0 +1,68 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + protoiface "google.golang.org/protobuf/runtime/protoiface" +) + +// ProtoEncoder is an autogenerated mock type for the ProtoEncoder type +type ProtoEncoder struct { + mock.Mock +} + +// FromProto provides a mock function with given fields: message +func (_m *ProtoEncoder) FromProto(message protoiface.MessageV1) error { + ret := _m.Called(message) + + var r0 error + if rf, ok := ret.Get(0).(func(protoiface.MessageV1) error); ok { + r0 = rf(message) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ToProto provides a mock function with given fields: +func (_m *ProtoEncoder) ToProto() (protoiface.MessageV1, error) { + ret := _m.Called() + + var r0 protoiface.MessageV1 + var r1 error + if rf, ok := ret.Get(0).(func() (protoiface.MessageV1, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() protoiface.MessageV1); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(protoiface.MessageV1) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewProtoEncoder interface { + mock.TestingT + Cleanup(func()) +} + +// NewProtoEncoder creates a new instance of ProtoEncoder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewProtoEncoder(t mockConstructorTestingTNewProtoEncoder) *ProtoEncoder { + mock := &ProtoEncoder{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/events/mocks/PublishOption.go b/pkg/events/mocks/PublishOption.go new file mode 100644 index 0000000000000000000000000000000000000000..f3517b7602f5b9714cc9de98ac6413bf324f91e8 --- /dev/null +++ b/pkg/events/mocks/PublishOption.go @@ -0,0 +1,33 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package mocks + +import ( + events "git.perx.ru/perxis/perxis-go/pkg/events" + mock "github.com/stretchr/testify/mock" +) + +// PublishOption is an autogenerated mock type for the PublishOption type +type PublishOption struct { + mock.Mock +} + +// Execute provides a mock function with given fields: options +func (_m *PublishOption) Execute(options *events.PublishOptions) { + _m.Called(options) +} + +type mockConstructorTestingTNewPublishOption interface { + mock.TestingT + Cleanup(func()) +} + +// NewPublishOption creates a new instance of PublishOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewPublishOption(t mockConstructorTestingTNewPublishOption) *PublishOption { + mock := &PublishOption{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/events/mocks/SubscribeOption.go b/pkg/events/mocks/SubscribeOption.go new file mode 100644 index 0000000000000000000000000000000000000000..5b2a9449f517c4d0881a53a64194139d50203961 --- /dev/null +++ b/pkg/events/mocks/SubscribeOption.go @@ -0,0 +1,33 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package mocks + +import ( + events "git.perx.ru/perxis/perxis-go/pkg/events" + mock "github.com/stretchr/testify/mock" +) + +// SubscribeOption is an autogenerated mock type for the SubscribeOption type +type SubscribeOption struct { + mock.Mock +} + +// Execute provides a mock function with given fields: options +func (_m *SubscribeOption) Execute(options *events.SubscribeOptions) { + _m.Called(options) +} + +type mockConstructorTestingTNewSubscribeOption interface { + mock.TestingT + Cleanup(func()) +} + +// NewSubscribeOption creates a new instance of SubscribeOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewSubscribeOption(t mockConstructorTestingTNewSubscribeOption) *SubscribeOption { + mock := &SubscribeOption{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/events/mocks/Subscription.go b/pkg/events/mocks/Subscription.go new file mode 100644 index 0000000000000000000000000000000000000000..b43ed0fa5e55b3d026a6d682b44facb0f60c1ed3 --- /dev/null +++ b/pkg/events/mocks/Subscription.go @@ -0,0 +1,39 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Subscription is an autogenerated mock type for the Subscription type +type Subscription struct { + mock.Mock +} + +// Unsubscribe provides a mock function with given fields: +func (_m *Subscription) Unsubscribe() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewSubscription interface { + mock.TestingT + Cleanup(func()) +} + +// NewSubscription creates a new instance of Subscription. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewSubscription(t mockConstructorTestingTNewSubscription) *Subscription { + mock := &Subscription{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/events/nats.go b/pkg/events/nats.go new file mode 100644 index 0000000000000000000000000000000000000000..4540a48216432f2332c4a9913051b3d8934127ed --- /dev/null +++ b/pkg/events/nats.go @@ -0,0 +1,197 @@ +package events + +import ( + "reflect" + + "git.perx.ru/perxis/perxis-go/pkg/errors" + "github.com/nats-io/nats.go" +) + +type natsConnetion struct { + Conn *nats.Conn + enc nats.Encoder + // добавление префикса для всех топиков + prefix string +} + +func Open(url string, prefix string) (Connection, error) { + var err error + b := new(natsConnetion) + b.Conn, err = nats.Connect(url) + if err != nil { + return nil, err + } + b.enc = &ProtobufEncoder{} + b.prefix = prefix + return b, nil +} + +func (c *natsConnetion) getSubject(subject string) string { + if c.prefix != "" { + subject = c.prefix + "." + subject + } + return subject +} + +func (c *natsConnetion) Publish(subject string, msg any, opts ...PublishOption) error { + m := &nats.Msg{Subject: c.getSubject(subject)} + switch v := msg.(type) { + case *nats.Msg: + m = v + case []byte: + m.Data = v + default: + data, err := c.enc.Encode(subject, v) + if err != nil { + return err + } + m.Data = data + } + + filters := PublishFilters(NewPublishOptions(opts...)) + if len(filters) > 0 { + for _, f := range filters { + if m = f(m); m == nil { + return nil + } + } + } + + return c.Conn.PublishMsg(m) +} + +func (c *natsConnetion) Subscribe(subject string, handler any, opts ...SubscribeOption) (Subscription, error) { + + subject = c.getSubject(subject) + return c.subscribe(subject, handler, SubscribeFilters(NewSubscribeOptions(opts...))) +} + +func (c *natsConnetion) Close() (err error) { + if err = c.Conn.Drain(); err != nil { + return err + } + c.Conn.Close() + return +} + +// Dissect the cb Handler's signature +func argInfo(cb nats.Handler) (reflect.Type, int) { + cbType := reflect.TypeOf(cb) + if cbType.Kind() != reflect.Func { + panic("handler needs to be a func") + } + numArgs := cbType.NumIn() + if numArgs == 0 { + return nil, numArgs + } + return cbType.In(numArgs - 1), numArgs +} + +var emptyMsgType = reflect.TypeOf(&nats.Msg{}) + +type MsgFilter func(*nats.Msg) *nats.Msg + +// Internal implementation that all public functions will use. +func (c *natsConnetion) subscribe(subject string, cb nats.Handler, filters []MsgFilter) (*nats.Subscription, error) { + if cb == nil { + return nil, errors.New("handler required for subscription") + } + argType, numArgs := argInfo(cb) + if argType == nil { + return nil, errors.New("handler requires at least one argument") + } + + cbValue := reflect.ValueOf(cb) + wantsRaw := (argType == emptyMsgType) + + natsCB := func(m *nats.Msg) { + if len(filters) > 0 { + for _, f := range filters { + if m = f(m); m == nil { + return + } + } + } + + var oV []reflect.Value + if wantsRaw { + oV = []reflect.Value{reflect.ValueOf(m)} + } else { + var oPtr reflect.Value + if argType.Kind() != reflect.Ptr { + oPtr = reflect.New(argType) + } else { + oPtr = reflect.New(argType.Elem()) + } + if err := c.enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil { + if errorHandler := c.Conn.ErrorHandler(); errorHandler != nil { + errorHandler(c.Conn, m.Sub, errors.Wrap(err, "Got an unmarshal error")) + } + return + } + if argType.Kind() != reflect.Ptr { + oPtr = reflect.Indirect(oPtr) + } + + switch numArgs { + case 1: + oV = []reflect.Value{oPtr} + case 2: + subV := reflect.ValueOf(m.Subject) + oV = []reflect.Value{subV, oPtr} + case 3: + subV := reflect.ValueOf(m.Subject) + replyV := reflect.ValueOf(m.Reply) + oV = []reflect.Value{subV, replyV, oPtr} + } + + } + cbValue.Call(oV) + } + + return c.Conn.Subscribe(subject, natsCB) +} + +func PublishFilters(opts *PublishOptions) []MsgFilter { + if opts == nil { + return nil + } + var filters []MsgFilter + + if len(opts.Tags) > 0 { + filters = append(filters, func(msg *nats.Msg) *nats.Msg { + if msg.Header == nil { + msg.Header = make(nats.Header) + } + for _, v := range opts.Tags { + msg.Header.Add("Tag", v) + } + return msg + }) + } + + return filters +} + +func SubscribeFilters(opts *SubscribeOptions) []MsgFilter { + if opts == nil { + return nil + } + var filters []MsgFilter + + if len(opts.FilterTags) > 0 { + filters = append(filters, func(msg *nats.Msg) *nats.Msg { + tags := msg.Header.Values("Tag") + for _, tag := range tags { + for _, v := range opts.FilterTags { + if v == tag { + return msg + } + } + } + return nil + }) + } + + return filters +} diff --git a/pkg/events/nats_integration_test.go b/pkg/events/nats_integration_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8fe38dbe1080d6a1ec7acdc9c565afd4c3852cdc --- /dev/null +++ b/pkg/events/nats_integration_test.go @@ -0,0 +1,82 @@ +//go:build integration + +package events + +import ( + "testing" + "time" + + pb "git.perx.ru/perxis/perxis-go/pkg/events/test_proto" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type Test struct { + Text string +} + +func (t *Test) ToProto() (proto.Message, error) { + return &pb.Test{Text: t.Text}, nil +} + +func (t *Test) FromProto(message proto.Message) error { + t.Text = message.(*pb.Test).Text + return nil +} + +func TestNatsBroker(t *testing.T) { + + b, err := Open("nats://localhost:4222", "") + require.NoError(t, err) + + resCh := make(chan string, 3) + _, err = b.Subscribe("a.*.c.>", func(t *Test) { resCh <- t.Text }) + require.NoError(t, err) + + require.NoError(t, b.Publish("a.b.c", &Test{Text: "1"})) + require.NoError(t, b.Publish("a.b.c.d", &Test{Text: "2"})) + require.NoError(t, b.Publish("a.b.c.d.e", &Test{Text: "3"})) + require.NoError(t, b.Publish("a.x.c", &Test{Text: "4"})) + require.NoError(t, b.Publish("a.x.c.d", &Test{Text: "5"})) + + time.Sleep(200 * time.Millisecond) + require.NoError(t, b.Close()) + close(resCh) + assert.ElementsMatch(t, []string{"2", "3", "5"}, func() []string { + var res []string + for v := range resCh { + res = append(res, v) + } + return res + }()) +} + +func TestTags(t *testing.T) { + + b, err := Open("nats://localhost:4222", "") + require.NoError(t, err) + + resCh := make(chan string, 3) + _, err = b.Subscribe("a.*.c.>", func(t *Test) { resCh <- t.Text }, FilterTag("one", "two", "three")) + require.NoError(t, err) + + require.NoError(t, b.Publish("a.b.c", &Test{Text: "1"})) + require.NoError(t, b.Publish("a.b.c.d", &Test{Text: "2"})) + require.NoError(t, b.Publish("a.b.c.d.e", &Test{Text: "3"}, Tag("one"))) + require.NoError(t, b.Publish("a.x.c", &Test{Text: "4"})) + require.NoError(t, b.Publish("a.x.c.d", &Test{Text: "5"}, Tag("two"))) + require.NoError(t, b.Publish("a.x.c.d", &Test{Text: "6"}, Tag("two", "one"))) + require.NoError(t, b.Publish("a.x.c.d", &Test{Text: "7"}, Tag("four"))) + + time.Sleep(200 * time.Millisecond) + require.NoError(t, b.Close()) + close(resCh) + assert.ElementsMatch(t, []string{"3", "5", "6"}, func() []string { + var res []string + for v := range resCh { + res = append(res, v) + } + return res + }()) +} diff --git a/pkg/events/proto_encoder.go b/pkg/events/proto_encoder.go new file mode 100644 index 0000000000000000000000000000000000000000..f18c2f576daddd4af61c9ca8138bbbc1903ad280 --- /dev/null +++ b/pkg/events/proto_encoder.go @@ -0,0 +1,63 @@ +package events + +import ( + "git.perx.ru/perxis/perxis-go/pkg/errors" + "github.com/golang/protobuf/proto" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/encoders/protobuf" +) + +type ProtoEncoder interface { + ToProto() (proto.Message, error) + FromProto(message proto.Message) error +} + +const ( + ProtobufEncoderName = "protobuf" +) + +func init() { + nats.RegisterEncoder(ProtobufEncoderName, &ProtobufEncoder{}) +} + +type ProtobufEncoder struct { + protobuf.ProtobufEncoder +} + +var ( + ErrInvalidProtoMsgEncode = errors.New("events: object passed to encode must implement ProtoEncoder") + ErrInvalidProtoMsgDecode = errors.New("events: object passed to decode must implement ProtoDecoder") +) + +func (pb *ProtobufEncoder) Encode(subject string, v interface{}) ([]byte, error) { + if v == nil { + return nil, nil + } + e, ok := v.(ProtoEncoder) + if !ok { + return nil, ErrInvalidProtoMsgEncode + } + + m, err := e.ToProto() + if err != nil { + return nil, errors.Wrap(err, "nats: encode to proto") + } + + return pb.ProtobufEncoder.Encode(subject, m) +} + +func (pb *ProtobufEncoder) Decode(subject string, data []byte, vPtr interface{}) error { + + enc, ok := vPtr.(ProtoEncoder) + if !ok { + return ErrInvalidProtoMsgDecode + } + + msg, _ := enc.ToProto() + + if err := pb.ProtobufEncoder.Decode(subject, data, msg); err != nil { + return err + } + + return enc.FromProto(msg) +} diff --git a/pkg/events/test_proto/test.pb.go b/pkg/events/test_proto/test.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..de333160b391355e2d56b0976e547d58f63a62e1 --- /dev/null +++ b/pkg/events/test_proto/test.pb.go @@ -0,0 +1,143 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.21.5 +// source: test.proto + +package test_proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Test struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Text string `protobuf:"bytes,1,opt,name=text,proto3" json:"text,omitempty"` +} + +func (x *Test) Reset() { + *x = Test{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Test) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Test) ProtoMessage() {} + +func (x *Test) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Test.ProtoReflect.Descriptor instead. +func (*Test) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{0} +} + +func (x *Test) GetText() string { + if x != nil { + return x.Text + } + return "" +} + +var File_test_proto protoreflect.FileDescriptor + +var file_test_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x74, 0x65, + 0x73, 0x74, 0x22, 0x1a, 0x0a, 0x04, 0x54, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, + 0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x42, 0x38, + 0x5a, 0x36, 0x67, 0x69, 0x74, 0x2e, 0x70, 0x65, 0x72, 0x78, 0x2e, 0x72, 0x75, 0x2f, 0x70, 0x65, + 0x72, 0x78, 0x69, 0x73, 0x2f, 0x70, 0x65, 0x72, 0x78, 0x69, 0x73, 0x2f, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x74, 0x65, + 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_test_proto_rawDescOnce sync.Once + file_test_proto_rawDescData = file_test_proto_rawDesc +) + +func file_test_proto_rawDescGZIP() []byte { + file_test_proto_rawDescOnce.Do(func() { + file_test_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_proto_rawDescData) + }) + return file_test_proto_rawDescData +} + +var file_test_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_test_proto_goTypes = []interface{}{ + (*Test)(nil), // 0: test.Test +} +var file_test_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_test_proto_init() } +func file_test_proto_init() { + if File_test_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Test); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_test_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_test_proto_goTypes, + DependencyIndexes: file_test_proto_depIdxs, + MessageInfos: file_test_proto_msgTypes, + }.Build() + File_test_proto = out.File + file_test_proto_rawDesc = nil + file_test_proto_goTypes = nil + file_test_proto_depIdxs = nil +} diff --git a/pkg/events/test_proto/test.proto b/pkg/events/test_proto/test.proto new file mode 100644 index 0000000000000000000000000000000000000000..fecbc9d39bf39c65d97dc8d21cba8933a4243450 --- /dev/null +++ b/pkg/events/test_proto/test.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +option go_package = "git.perx.ru/perxis/perxis-go/broker/test_proto;test_proto"; + +package test; + +message Test { + string text = 1; +} diff --git a/pkg/optional/optional.go b/pkg/optional/optional.go new file mode 100644 index 0000000000000000000000000000000000000000..94e89bf6a04708abf853f2e8aaf8d7dbd9e99371 --- /dev/null +++ b/pkg/optional/optional.go @@ -0,0 +1,10 @@ +package optional + +var ( + True *bool = Bool(true) + False *bool = Bool(false) +) + +func Bool(v bool) *bool { + return &v +}