Skip to content
Snippets Groups Projects
Commit 4ecfada1 authored by Valera Shaitorov's avatar Valera Shaitorov :alien:
Browse files

Добавлены пакеты events, optional

parent 3c8de7ed
No related branches found
No related tags found
No related merge requests found
...@@ -16,8 +16,8 @@ require ( ...@@ -16,8 +16,8 @@ require (
github.com/stretchr/testify v1.8.0 github.com/stretchr/testify v1.8.0
go.mongodb.org/mongo-driver v1.11.4 go.mongodb.org/mongo-driver v1.11.4
go.uber.org/zap v1.19.1 go.uber.org/zap v1.19.1
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d golang.org/x/crypto v0.5.0
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 golang.org/x/net v0.5.0
google.golang.org/grpc v1.45.0 google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.28.0 google.golang.org/protobuf v1.28.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
...@@ -35,6 +35,9 @@ require ( ...@@ -35,6 +35,9 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/nats-io/nats.go v1.23.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.4.0 // indirect github.com/stretchr/objx v0.4.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect
...@@ -43,6 +46,8 @@ require ( ...@@ -43,6 +46,8 @@ require (
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect go.uber.org/multierr v1.7.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
golang.org/x/text v0.3.7 // indirect golang.org/x/text v0.3.7 // indirect
......
...@@ -108,6 +108,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G ...@@ -108,6 +108,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/nats-io/nats.go v1.23.0 h1:lR28r7IX44WjYgdiKz9GmUeW0uh/m33uD3yEjLZ2cOE=
github.com/nats-io/nats.go v1.23.0/go.mod h1:ki/Scsa23edbh8IRZbCuNXR9TDcbvfaSijKtaqQgw+Q=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
...@@ -160,8 +166,11 @@ go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= ...@@ -160,8 +166,11 @@ go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
...@@ -178,9 +187,11 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn ...@@ -178,9 +187,11 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
...@@ -204,6 +215,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc ...@@ -204,6 +215,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
...@@ -212,6 +224,7 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= ...@@ -212,6 +224,7 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
......
package events
type Subscription interface {
Unsubscribe() error
}
type Connection interface {
Publish(subject string, msg any, opts ...PublishOption) error
Subscribe(subject string, handler any, opts ...SubscribeOption) (Subscription, error)
Close() error
}
type PublishOptions struct {
Tags []string
}
func NewPublishOptions(opts ...PublishOption) *PublishOptions {
o := &PublishOptions{}
for _, opt := range opts {
if opt != nil {
opt(o)
}
}
return o
}
type PublishOption func(options *PublishOptions)
func Tag(tag ...string) PublishOption {
return func(o *PublishOptions) {
o.Tags = tag
}
}
type SubscribeOptions struct {
FilterTags []string
}
func NewSubscribeOptions(opts ...SubscribeOption) *SubscribeOptions {
o := &SubscribeOptions{}
for _, opt := range opts {
if opt != nil {
opt(o)
}
}
return o
}
type SubscribeOption func(options *SubscribeOptions)
func FilterTag(tag ...string) SubscribeOption {
return func(o *SubscribeOptions) {
o.FilterTags = tag
}
}
// Code generated by mockery v2.20.0. DO NOT EDIT.
package mocks
import (
events "git.perx.ru/perxis/perxis-go/pkg/events"
mock "github.com/stretchr/testify/mock"
)
// Connection is an autogenerated mock type for the Connection type
type Connection struct {
mock.Mock
}
// Close provides a mock function with given fields:
func (_m *Connection) Close() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Publish provides a mock function with given fields: subject, msg, opts
func (_m *Connection) Publish(subject string, msg interface{}, opts ...events.PublishOption) error {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, subject, msg)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(string, interface{}, ...events.PublishOption) error); ok {
r0 = rf(subject, msg, opts...)
} else {
r0 = ret.Error(0)
}
return r0
}
// Subscribe provides a mock function with given fields: subject, handler, opts
func (_m *Connection) Subscribe(subject string, handler interface{}, opts ...events.SubscribeOption) (events.Subscription, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, subject, handler)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 events.Subscription
var r1 error
if rf, ok := ret.Get(0).(func(string, interface{}, ...events.SubscribeOption) (events.Subscription, error)); ok {
return rf(subject, handler, opts...)
}
if rf, ok := ret.Get(0).(func(string, interface{}, ...events.SubscribeOption) events.Subscription); ok {
r0 = rf(subject, handler, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(events.Subscription)
}
}
if rf, ok := ret.Get(1).(func(string, interface{}, ...events.SubscribeOption) error); ok {
r1 = rf(subject, handler, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewConnection interface {
mock.TestingT
Cleanup(func())
}
// NewConnection creates a new instance of Connection. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConnection(t mockConstructorTestingTNewConnection) *Connection {
mock := &Connection{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.20.0. DO NOT EDIT.
package mocks
import (
nats "github.com/nats-io/nats.go"
mock "github.com/stretchr/testify/mock"
)
// MsgFilter is an autogenerated mock type for the MsgFilter type
type MsgFilter struct {
mock.Mock
}
// Execute provides a mock function with given fields: _a0
func (_m *MsgFilter) Execute(_a0 *nats.Msg) *nats.Msg {
ret := _m.Called(_a0)
var r0 *nats.Msg
if rf, ok := ret.Get(0).(func(*nats.Msg) *nats.Msg); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*nats.Msg)
}
}
return r0
}
type mockConstructorTestingTNewMsgFilter interface {
mock.TestingT
Cleanup(func())
}
// NewMsgFilter creates a new instance of MsgFilter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMsgFilter(t mockConstructorTestingTNewMsgFilter) *MsgFilter {
mock := &MsgFilter{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.20.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
protoiface "google.golang.org/protobuf/runtime/protoiface"
)
// ProtoEncoder is an autogenerated mock type for the ProtoEncoder type
type ProtoEncoder struct {
mock.Mock
}
// FromProto provides a mock function with given fields: message
func (_m *ProtoEncoder) FromProto(message protoiface.MessageV1) error {
ret := _m.Called(message)
var r0 error
if rf, ok := ret.Get(0).(func(protoiface.MessageV1) error); ok {
r0 = rf(message)
} else {
r0 = ret.Error(0)
}
return r0
}
// ToProto provides a mock function with given fields:
func (_m *ProtoEncoder) ToProto() (protoiface.MessageV1, error) {
ret := _m.Called()
var r0 protoiface.MessageV1
var r1 error
if rf, ok := ret.Get(0).(func() (protoiface.MessageV1, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() protoiface.MessageV1); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(protoiface.MessageV1)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewProtoEncoder interface {
mock.TestingT
Cleanup(func())
}
// NewProtoEncoder creates a new instance of ProtoEncoder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewProtoEncoder(t mockConstructorTestingTNewProtoEncoder) *ProtoEncoder {
mock := &ProtoEncoder{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.20.0. DO NOT EDIT.
package mocks
import (
events "git.perx.ru/perxis/perxis-go/pkg/events"
mock "github.com/stretchr/testify/mock"
)
// PublishOption is an autogenerated mock type for the PublishOption type
type PublishOption struct {
mock.Mock
}
// Execute provides a mock function with given fields: options
func (_m *PublishOption) Execute(options *events.PublishOptions) {
_m.Called(options)
}
type mockConstructorTestingTNewPublishOption interface {
mock.TestingT
Cleanup(func())
}
// NewPublishOption creates a new instance of PublishOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewPublishOption(t mockConstructorTestingTNewPublishOption) *PublishOption {
mock := &PublishOption{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.20.0. DO NOT EDIT.
package mocks
import (
events "git.perx.ru/perxis/perxis-go/pkg/events"
mock "github.com/stretchr/testify/mock"
)
// SubscribeOption is an autogenerated mock type for the SubscribeOption type
type SubscribeOption struct {
mock.Mock
}
// Execute provides a mock function with given fields: options
func (_m *SubscribeOption) Execute(options *events.SubscribeOptions) {
_m.Called(options)
}
type mockConstructorTestingTNewSubscribeOption interface {
mock.TestingT
Cleanup(func())
}
// NewSubscribeOption creates a new instance of SubscribeOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewSubscribeOption(t mockConstructorTestingTNewSubscribeOption) *SubscribeOption {
mock := &SubscribeOption{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.20.0. DO NOT EDIT.
package mocks
import mock "github.com/stretchr/testify/mock"
// Subscription is an autogenerated mock type for the Subscription type
type Subscription struct {
mock.Mock
}
// Unsubscribe provides a mock function with given fields:
func (_m *Subscription) Unsubscribe() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewSubscription interface {
mock.TestingT
Cleanup(func())
}
// NewSubscription creates a new instance of Subscription. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewSubscription(t mockConstructorTestingTNewSubscription) *Subscription {
mock := &Subscription{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
package events
import (
"reflect"
"git.perx.ru/perxis/perxis-go/pkg/errors"
"github.com/nats-io/nats.go"
)
type natsConnetion struct {
Conn *nats.Conn
enc nats.Encoder
// добавление префикса для всех топиков
prefix string
}
func Open(url string, prefix string) (Connection, error) {
var err error
b := new(natsConnetion)
b.Conn, err = nats.Connect(url)
if err != nil {
return nil, err
}
b.enc = &ProtobufEncoder{}
b.prefix = prefix
return b, nil
}
func (c *natsConnetion) getSubject(subject string) string {
if c.prefix != "" {
subject = c.prefix + "." + subject
}
return subject
}
func (c *natsConnetion) Publish(subject string, msg any, opts ...PublishOption) error {
m := &nats.Msg{Subject: c.getSubject(subject)}
switch v := msg.(type) {
case *nats.Msg:
m = v
case []byte:
m.Data = v
default:
data, err := c.enc.Encode(subject, v)
if err != nil {
return err
}
m.Data = data
}
filters := PublishFilters(NewPublishOptions(opts...))
if len(filters) > 0 {
for _, f := range filters {
if m = f(m); m == nil {
return nil
}
}
}
return c.Conn.PublishMsg(m)
}
func (c *natsConnetion) Subscribe(subject string, handler any, opts ...SubscribeOption) (Subscription, error) {
subject = c.getSubject(subject)
return c.subscribe(subject, handler, SubscribeFilters(NewSubscribeOptions(opts...)))
}
func (c *natsConnetion) Close() (err error) {
if err = c.Conn.Drain(); err != nil {
return err
}
c.Conn.Close()
return
}
// Dissect the cb Handler's signature
func argInfo(cb nats.Handler) (reflect.Type, int) {
cbType := reflect.TypeOf(cb)
if cbType.Kind() != reflect.Func {
panic("handler needs to be a func")
}
numArgs := cbType.NumIn()
if numArgs == 0 {
return nil, numArgs
}
return cbType.In(numArgs - 1), numArgs
}
var emptyMsgType = reflect.TypeOf(&nats.Msg{})
type MsgFilter func(*nats.Msg) *nats.Msg
// Internal implementation that all public functions will use.
func (c *natsConnetion) subscribe(subject string, cb nats.Handler, filters []MsgFilter) (*nats.Subscription, error) {
if cb == nil {
return nil, errors.New("handler required for subscription")
}
argType, numArgs := argInfo(cb)
if argType == nil {
return nil, errors.New("handler requires at least one argument")
}
cbValue := reflect.ValueOf(cb)
wantsRaw := (argType == emptyMsgType)
natsCB := func(m *nats.Msg) {
if len(filters) > 0 {
for _, f := range filters {
if m = f(m); m == nil {
return
}
}
}
var oV []reflect.Value
if wantsRaw {
oV = []reflect.Value{reflect.ValueOf(m)}
} else {
var oPtr reflect.Value
if argType.Kind() != reflect.Ptr {
oPtr = reflect.New(argType)
} else {
oPtr = reflect.New(argType.Elem())
}
if err := c.enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
if errorHandler := c.Conn.ErrorHandler(); errorHandler != nil {
errorHandler(c.Conn, m.Sub, errors.Wrap(err, "Got an unmarshal error"))
}
return
}
if argType.Kind() != reflect.Ptr {
oPtr = reflect.Indirect(oPtr)
}
switch numArgs {
case 1:
oV = []reflect.Value{oPtr}
case 2:
subV := reflect.ValueOf(m.Subject)
oV = []reflect.Value{subV, oPtr}
case 3:
subV := reflect.ValueOf(m.Subject)
replyV := reflect.ValueOf(m.Reply)
oV = []reflect.Value{subV, replyV, oPtr}
}
}
cbValue.Call(oV)
}
return c.Conn.Subscribe(subject, natsCB)
}
func PublishFilters(opts *PublishOptions) []MsgFilter {
if opts == nil {
return nil
}
var filters []MsgFilter
if len(opts.Tags) > 0 {
filters = append(filters, func(msg *nats.Msg) *nats.Msg {
if msg.Header == nil {
msg.Header = make(nats.Header)
}
for _, v := range opts.Tags {
msg.Header.Add("Tag", v)
}
return msg
})
}
return filters
}
func SubscribeFilters(opts *SubscribeOptions) []MsgFilter {
if opts == nil {
return nil
}
var filters []MsgFilter
if len(opts.FilterTags) > 0 {
filters = append(filters, func(msg *nats.Msg) *nats.Msg {
tags := msg.Header.Values("Tag")
for _, tag := range tags {
for _, v := range opts.FilterTags {
if v == tag {
return msg
}
}
}
return nil
})
}
return filters
}
//go:build integration
package events
import (
"testing"
"time"
pb "git.perx.ru/perxis/perxis-go/pkg/events/test_proto"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type Test struct {
Text string
}
func (t *Test) ToProto() (proto.Message, error) {
return &pb.Test{Text: t.Text}, nil
}
func (t *Test) FromProto(message proto.Message) error {
t.Text = message.(*pb.Test).Text
return nil
}
func TestNatsBroker(t *testing.T) {
b, err := Open("nats://localhost:4222", "")
require.NoError(t, err)
resCh := make(chan string, 3)
_, err = b.Subscribe("a.*.c.>", func(t *Test) { resCh <- t.Text })
require.NoError(t, err)
require.NoError(t, b.Publish("a.b.c", &Test{Text: "1"}))
require.NoError(t, b.Publish("a.b.c.d", &Test{Text: "2"}))
require.NoError(t, b.Publish("a.b.c.d.e", &Test{Text: "3"}))
require.NoError(t, b.Publish("a.x.c", &Test{Text: "4"}))
require.NoError(t, b.Publish("a.x.c.d", &Test{Text: "5"}))
time.Sleep(200 * time.Millisecond)
require.NoError(t, b.Close())
close(resCh)
assert.ElementsMatch(t, []string{"2", "3", "5"}, func() []string {
var res []string
for v := range resCh {
res = append(res, v)
}
return res
}())
}
func TestTags(t *testing.T) {
b, err := Open("nats://localhost:4222", "")
require.NoError(t, err)
resCh := make(chan string, 3)
_, err = b.Subscribe("a.*.c.>", func(t *Test) { resCh <- t.Text }, FilterTag("one", "two", "three"))
require.NoError(t, err)
require.NoError(t, b.Publish("a.b.c", &Test{Text: "1"}))
require.NoError(t, b.Publish("a.b.c.d", &Test{Text: "2"}))
require.NoError(t, b.Publish("a.b.c.d.e", &Test{Text: "3"}, Tag("one")))
require.NoError(t, b.Publish("a.x.c", &Test{Text: "4"}))
require.NoError(t, b.Publish("a.x.c.d", &Test{Text: "5"}, Tag("two")))
require.NoError(t, b.Publish("a.x.c.d", &Test{Text: "6"}, Tag("two", "one")))
require.NoError(t, b.Publish("a.x.c.d", &Test{Text: "7"}, Tag("four")))
time.Sleep(200 * time.Millisecond)
require.NoError(t, b.Close())
close(resCh)
assert.ElementsMatch(t, []string{"3", "5", "6"}, func() []string {
var res []string
for v := range resCh {
res = append(res, v)
}
return res
}())
}
package events
import (
"git.perx.ru/perxis/perxis-go/pkg/errors"
"github.com/golang/protobuf/proto"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/encoders/protobuf"
)
type ProtoEncoder interface {
ToProto() (proto.Message, error)
FromProto(message proto.Message) error
}
const (
ProtobufEncoderName = "protobuf"
)
func init() {
nats.RegisterEncoder(ProtobufEncoderName, &ProtobufEncoder{})
}
type ProtobufEncoder struct {
protobuf.ProtobufEncoder
}
var (
ErrInvalidProtoMsgEncode = errors.New("events: object passed to encode must implement ProtoEncoder")
ErrInvalidProtoMsgDecode = errors.New("events: object passed to decode must implement ProtoDecoder")
)
func (pb *ProtobufEncoder) Encode(subject string, v interface{}) ([]byte, error) {
if v == nil {
return nil, nil
}
e, ok := v.(ProtoEncoder)
if !ok {
return nil, ErrInvalidProtoMsgEncode
}
m, err := e.ToProto()
if err != nil {
return nil, errors.Wrap(err, "nats: encode to proto")
}
return pb.ProtobufEncoder.Encode(subject, m)
}
func (pb *ProtobufEncoder) Decode(subject string, data []byte, vPtr interface{}) error {
enc, ok := vPtr.(ProtoEncoder)
if !ok {
return ErrInvalidProtoMsgDecode
}
msg, _ := enc.ToProto()
if err := pb.ProtobufEncoder.Decode(subject, data, msg); err != nil {
return err
}
return enc.FromProto(msg)
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.21.5
// source: test.proto
package test_proto
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Test struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Text string `protobuf:"bytes,1,opt,name=text,proto3" json:"text,omitempty"`
}
func (x *Test) Reset() {
*x = Test{}
if protoimpl.UnsafeEnabled {
mi := &file_test_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Test) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Test) ProtoMessage() {}
func (x *Test) ProtoReflect() protoreflect.Message {
mi := &file_test_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Test.ProtoReflect.Descriptor instead.
func (*Test) Descriptor() ([]byte, []int) {
return file_test_proto_rawDescGZIP(), []int{0}
}
func (x *Test) GetText() string {
if x != nil {
return x.Text
}
return ""
}
var File_test_proto protoreflect.FileDescriptor
var file_test_proto_rawDesc = []byte{
0x0a, 0x0a, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x74, 0x65,
0x73, 0x74, 0x22, 0x1a, 0x0a, 0x04, 0x54, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65,
0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x42, 0x38,
0x5a, 0x36, 0x67, 0x69, 0x74, 0x2e, 0x70, 0x65, 0x72, 0x78, 0x2e, 0x72, 0x75, 0x2f, 0x70, 0x65,
0x72, 0x78, 0x69, 0x73, 0x2f, 0x70, 0x65, 0x72, 0x78, 0x69, 0x73, 0x2f, 0x62, 0x72, 0x6f, 0x6b,
0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x74, 0x65,
0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_test_proto_rawDescOnce sync.Once
file_test_proto_rawDescData = file_test_proto_rawDesc
)
func file_test_proto_rawDescGZIP() []byte {
file_test_proto_rawDescOnce.Do(func() {
file_test_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_proto_rawDescData)
})
return file_test_proto_rawDescData
}
var file_test_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_test_proto_goTypes = []interface{}{
(*Test)(nil), // 0: test.Test
}
var file_test_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_test_proto_init() }
func file_test_proto_init() {
if File_test_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Test); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_test_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_test_proto_goTypes,
DependencyIndexes: file_test_proto_depIdxs,
MessageInfos: file_test_proto_msgTypes,
}.Build()
File_test_proto = out.File
file_test_proto_rawDesc = nil
file_test_proto_goTypes = nil
file_test_proto_depIdxs = nil
}
syntax = "proto3";
option go_package = "git.perx.ru/perxis/perxis-go/broker/test_proto;test_proto";
package test;
message Test {
string text = 1;
}
package optional
var (
True *bool = Bool(true)
False *bool = Bool(false)
)
func Bool(v bool) *bool {
return &v
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment