diff --git a/pkg/log/zap/buffered_write_syncer.go b/pkg/log/zap/buffered_write_syncer.go index c9cd27f988ca2308f092fe54b602e4ec4217e566..2de35c996b54da17190b03e3d1526a3ffbaa0cb6 100644 --- a/pkg/log/zap/buffered_write_syncer.go +++ b/pkg/log/zap/buffered_write_syncer.go @@ -11,7 +11,7 @@ import ( var SyncQueueOverflow = errors.New("sync queue overflow") -// BufferedWriteSyncer буферизирует записи в памяти и отправляет их в log.Service. +// BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service. // Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал, // записи отправляются в очередь для синхронизации с log.Service. Таким образом, выполнение потока не блокируется при отправке записей. // В случае переполнения очереди для синхронизации будет возвращена ошибка SyncQueueOverflow. @@ -44,14 +44,18 @@ type BufferedWriteSyncer struct { stopped bool // stopped указывает, был ли выполнен Stop } -func (ws *BufferedWriteSyncer) Start() { - ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) - ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) - ws.flushStop = make(chan struct{}) +func NewBufferedWriteSyncer(service log.Service, options ...Option) *BufferedWriteSyncer { + cfg := newConfig(options) - ws.wg.Add(2) - go ws.syncLoop() - go ws.flushLoop() + ws := &BufferedWriteSyncer{ + FlushInterval: cfg.flushInterval, + MaxBufferSize: cfg.maxBufferSize, + MaxSyncQueueSize: cfg.maxSyncQueueSize, + Service: service, + } + ws.start() + + return ws } func (ws *BufferedWriteSyncer) Stop() error { @@ -102,6 +106,16 @@ func (ws *BufferedWriteSyncer) Sync() error { return ws.flush() } +func (ws *BufferedWriteSyncer) start() { + ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) + ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) + ws.flushStop = make(chan struct{}) + + ws.wg.Add(2) + go ws.syncLoop() + go ws.flushLoop() +} + // flush освобождает буфер и отправляет копию записей на синхронизацию. Не является безопасным для конкурентного вызова. func (ws *BufferedWriteSyncer) flush() error { if len(ws.buffer) == 0 { diff --git a/pkg/log/zap/buffered_write_syncer_test.go b/pkg/log/zap/buffered_write_syncer_test.go index ae426002ca53baa5e09f27691a2693ae047ce50c..f03da85e4a8256ed93717d9ec53c4afbb4514ac7 100644 --- a/pkg/log/zap/buffered_write_syncer_test.go +++ b/pkg/log/zap/buffered_write_syncer_test.go @@ -17,13 +17,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { service.On("Log", mock.Anything, mock.Anything). Return(nil).Once() - ws := &BufferedWriteSyncer{ - FlushInterval: defaultFlushInterval, - MaxBufferSize: defaultMaxBufferSize, - MaxSyncQueueSize: defaultMaxSyncQueueSize, - Service: service, - } - ws.Start() + ws := NewBufferedWriteSyncer(service) err := ws.Write(&log.Entry{Message: "first log message"}) require.NoError(t, err) @@ -44,13 +38,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { service.On("Log", mock.Anything, mock.Anything). Return(nil).Once() - ws := &BufferedWriteSyncer{ - FlushInterval: defaultFlushInterval, - MaxBufferSize: defaultMaxBufferSize, - MaxSyncQueueSize: defaultMaxSyncQueueSize, - Service: service, - } - ws.Start() + ws := NewBufferedWriteSyncer(service) var wg sync.WaitGroup for i := 0; i < 100; i++ { @@ -78,13 +66,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { service.On("Log", mock.Anything, mock.Anything). Return(nil).Times(10) - ws := &BufferedWriteSyncer{ - FlushInterval: defaultFlushInterval, - MaxBufferSize: defaultMaxBufferSize, - MaxSyncQueueSize: defaultMaxSyncQueueSize, - Service: service, - } - ws.Start() + ws := NewBufferedWriteSyncer(service) for i := 0; i < 10; i++ { for j := 0; j < 10; j++ { @@ -108,13 +90,7 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) { service.On("Log", mock.Anything, mock.Anything). Return(nil).Times(10) - ws := &BufferedWriteSyncer{ - FlushInterval: defaultFlushInterval, - MaxBufferSize: 10, - MaxSyncQueueSize: defaultMaxSyncQueueSize, - Service: service, - } - ws.Start() + ws := NewBufferedWriteSyncer(service, WithMaxBufferSize(10)) for i := 0; i < 100; i++ { err := ws.Write(&log.Entry{Message: "log message"}) @@ -134,13 +110,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { service.On("Log", mock.Anything, mock.Anything). Return(nil).Once() - ws := &BufferedWriteSyncer{ - FlushInterval: time.Second, - MaxBufferSize: defaultMaxBufferSize, - MaxSyncQueueSize: defaultMaxSyncQueueSize, - Service: service, - } - ws.Start() + ws := NewBufferedWriteSyncer(service, WithFlushInterval(time.Second)) for j := 0; j < 10; j++ { err := ws.Write(&log.Entry{Message: "log message"}) diff --git a/pkg/log/zap/core.go b/pkg/log/zap/core.go index 2680c94c046635aa344fb9d94b7fbdfa5d443f93..924a67be4787e0eb01c9cc0ab4f9106005dc1f6d 100644 --- a/pkg/log/zap/core.go +++ b/pkg/log/zap/core.go @@ -7,42 +7,25 @@ import ( "go.uber.org/zap/zapcore" ) -// WriteSyncer отвечает за синхронизацию записей, полученных через метод Write с сервисом логирования. +// WriteSyncer отвечает за хранение и синхронизацию log.Entry type WriteSyncer interface { Write(entry *log.Entry) error Sync() error } -// WithLogService добавляет функционал отправки записей в log.Service к переданному логгеру. -// Вторым параметром возвращается Stop, при вызове которого прекращается синхронизация записей по интервалу. -// Записи уровня Debug игнорируются и не отправляются. -func WithLogService(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) { - cfg := newConfig(options) - - ws := &BufferedWriteSyncer{ - FlushInterval: cfg.flushInterval, - MaxBufferSize: cfg.maxBufferSize, - MaxSyncQueueSize: cfg.maxSyncQueueSize, - Service: service, - } - - ws.Start() - - return zap.New(zapcore.NewTee(logger.Core(), &Core{LevelEnabler: zapcore.InfoLevel, ws: ws})), ws.Stop -} - +// Core кодирует zapcore.Entry в log.Entry и отправляет их в WriteSyncer type Core struct { zapcore.LevelEnabler - fields []zap.Field - ws WriteSyncer + writeSyncer WriteSyncer + fields []zap.Field } func (core *Core) With(fields []zapcore.Field) zapcore.Core { return &Core{ LevelEnabler: core.LevelEnabler, + writeSyncer: core.writeSyncer, fields: append(core.fields, fields...), - ws: core.ws, } } @@ -54,11 +37,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) } func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { - return core.ws.Write(core.getEntry(entry, fields)) + return core.writeSyncer.Write(core.getEntry(entry, fields)) } func (core *Core) Sync() error { - return core.ws.Sync() + return core.writeSyncer.Sync() } func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { diff --git a/pkg/log/zap/zap.go b/pkg/log/zap/zap.go new file mode 100644 index 0000000000000000000000000000000000000000..9d884475674545d3aa90c361edbaaa650101d88b --- /dev/null +++ b/pkg/log/zap/zap.go @@ -0,0 +1,23 @@ +package zap + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// WithLogService объединяет переданный логгер с ядром, которое кодирует и дублирует записи в WriteSyncer. +// Пример использования: +// +// func main() { +// service := ... // ваш log.Service +// ws := NewBufferedWriteSyncer(service) +// defer ws.Stop() +// +// logger := ... // ваш логгер, который нужно обернуть +// logger = WithLogService(logger, ws) +// +// // ... +// } +func WithLogService(logger *zap.Logger, writeSyncer WriteSyncer) *zap.Logger { + return zap.New(zapcore.NewTee(logger.Core(), &Core{LevelEnabler: zapcore.InfoLevel, writeSyncer: writeSyncer})) +} diff --git a/pkg/log/zap/zap_test.go b/pkg/log/zap/zap_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4b24f980cc71671f094387f9bd2c6adedc4c167a --- /dev/null +++ b/pkg/log/zap/zap_test.go @@ -0,0 +1,49 @@ +package zap + +import ( + "context" + "testing" + + "git.perx.ru/perxis/perxis-go/pkg/log" + logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap" +) + +func TestExample(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Once() + + func() { + ws := NewBufferedWriteSyncer(service) + defer func() { + err := ws.Stop() + assert.NoError(t, err) + }() + + logger := WithLogService(zap.NewNop(), ws) + + logger.Info("создан элемент коллекции", + Category("create"), + Component("Items.Service"), + Event("Items.Create"), + Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), + CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")), + Attr("any"), + Tags("tag1", "tag2", "tag3"), + ) + logger.Warn("изменен элемент коллекции", + Category("update"), + Component("Items.Service"), + Event("Items.Update"), + Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2cw"), + CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")), + ) + }() + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 2 + })) +}