From 303819b9585152cdc950003fb6b2f1f513078ad8 Mon Sep 17 00:00:00 2001 From: ensiouel <ensiouel@gmail.com> Date: Mon, 5 Feb 2024 09:49:40 +0300 Subject: [PATCH] =?UTF-8?q?=D0=B2=20core=20BufferedWriteSyncer=20=D0=B7?= =?UTF-8?q?=D0=B0=D0=BC=D0=B5=D0=BD=D0=B5=D0=BD=20=D0=BD=D0=B0=20=D0=B8?= =?UTF-8?q?=D0=BD=D1=82=D0=B5=D1=80=D1=84=D0=B5=D0=B9=D1=81=20WriteSyncer,?= =?UTF-8?q?=20=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D1=8B=20?= =?UTF-8?q?=D1=82=D0=B5=D1=81=D1=82=D1=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/log/zap/buffered_write_syncer.go | 9 +- pkg/log/zap/buffered_write_syncer_test.go | 155 +++++++++++++++ pkg/log/zap/core.go | 43 +++-- pkg/log/zap/core_test.go | 221 +++++----------------- 4 files changed, 227 insertions(+), 201 deletions(-) create mode 100644 pkg/log/zap/buffered_write_syncer_test.go diff --git a/pkg/log/zap/buffered_write_syncer.go b/pkg/log/zap/buffered_write_syncer.go index 613161fb..c9cd27f9 100644 --- a/pkg/log/zap/buffered_write_syncer.go +++ b/pkg/log/zap/buffered_write_syncer.go @@ -49,6 +49,7 @@ func (ws *BufferedWriteSyncer) Start() { ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) ws.flushStop = make(chan struct{}) + ws.wg.Add(2) go ws.syncLoop() go ws.flushLoop() } @@ -62,8 +63,6 @@ func (ws *BufferedWriteSyncer) Stop() error { } ws.stopped = true - ws.wg.Add(2) - close(ws.flushStop) // завершаем flushLoop err := ws.flush() // очищаем оставшиеся записи @@ -95,8 +94,8 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { return nil } -// Flush освобождает буфер и отправляет копию записей на синхронизацию. -func (ws *BufferedWriteSyncer) Flush() error { +// Sync освобождает буфер и отправляет копию записей на синхронизацию. +func (ws *BufferedWriteSyncer) Sync() error { ws.mu.Lock() defer ws.mu.Unlock() @@ -136,7 +135,7 @@ func (ws *BufferedWriteSyncer) flushLoop() { for { select { case <-ticker.C: - _ = ws.Flush() + _ = ws.Sync() case <-ws.flushStop: return } diff --git a/pkg/log/zap/buffered_write_syncer_test.go b/pkg/log/zap/buffered_write_syncer_test.go new file mode 100644 index 00000000..ae426002 --- /dev/null +++ b/pkg/log/zap/buffered_write_syncer_test.go @@ -0,0 +1,155 @@ +package zap + +import ( + "sync" + "testing" + "time" + + "git.perx.ru/perxis/perxis-go/pkg/log" + logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestBufferedWriteSyncer_Write(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Once() + + ws := &BufferedWriteSyncer{ + FlushInterval: defaultFlushInterval, + MaxBufferSize: defaultMaxBufferSize, + MaxSyncQueueSize: defaultMaxSyncQueueSize, + Service: service, + } + ws.Start() + + err := ws.Write(&log.Entry{Message: "first log message"}) + require.NoError(t, err) + + err = ws.Write(&log.Entry{Message: "second log message"}) + require.NoError(t, err) + + err = ws.Stop() + require.NoError(t, err) + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 2 + })) +} + +func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Once() + + ws := &BufferedWriteSyncer{ + FlushInterval: defaultFlushInterval, + MaxBufferSize: defaultMaxBufferSize, + MaxSyncQueueSize: defaultMaxSyncQueueSize, + Service: service, + } + ws.Start() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + + err := ws.Write(&log.Entry{Message: "log message"}) + require.NoError(t, err) + }(&wg) + } + + wg.Wait() + + err := ws.Stop() + require.NoError(t, err) + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 100 + })) +} + +func TestBufferedWriteSyncer_Flush(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Times(10) + + ws := &BufferedWriteSyncer{ + FlushInterval: defaultFlushInterval, + MaxBufferSize: defaultMaxBufferSize, + MaxSyncQueueSize: defaultMaxSyncQueueSize, + Service: service, + } + ws.Start() + + for i := 0; i < 10; i++ { + for j := 0; j < 10; j++ { + err := ws.Write(&log.Entry{Message: "log message"}) + require.NoError(t, err) + } + err := ws.Sync() + assert.NoError(t, err) + } + + err := ws.Stop() + require.NoError(t, err) + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 10 + })) +} + +func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Times(10) + + ws := &BufferedWriteSyncer{ + FlushInterval: defaultFlushInterval, + MaxBufferSize: 10, + MaxSyncQueueSize: defaultMaxSyncQueueSize, + Service: service, + } + ws.Start() + + for i := 0; i < 100; i++ { + err := ws.Write(&log.Entry{Message: "log message"}) + require.NoError(t, err) + } + + err := ws.Stop() + require.NoError(t, err) + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 10 + })) +} + +func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Once() + + ws := &BufferedWriteSyncer{ + FlushInterval: time.Second, + MaxBufferSize: defaultMaxBufferSize, + MaxSyncQueueSize: defaultMaxSyncQueueSize, + Service: service, + } + ws.Start() + + for j := 0; j < 10; j++ { + err := ws.Write(&log.Entry{Message: "log message"}) + require.NoError(t, err) + } + + time.Sleep(3 * time.Second) // ждем, пока сработает интервал + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 10 + })) +} diff --git a/pkg/log/zap/core.go b/pkg/log/zap/core.go index e0e0ea83..2680c94c 100644 --- a/pkg/log/zap/core.go +++ b/pkg/log/zap/core.go @@ -7,39 +7,42 @@ import ( "go.uber.org/zap/zapcore" ) -type Core struct { - zapcore.LevelEnabler - - fields []zap.Field - writeSyncer *BufferedWriteSyncer +// WriteSyncer отвечает за синхронизацию записей, полученных через метод Write с сервисом логирования. +type WriteSyncer interface { + Write(entry *log.Entry) error + Sync() error } -// LoggerWithLogService добавляет функционал отправки записей в log.Service к переданному логгеру. +// WithLogService добавляет функционал отправки записей в log.Service к переданному логгеру. // Вторым параметром возвращается Stop, при вызове которого прекращается синхронизация записей по интервалу. // Записи уровня Debug игнорируются и не отправляются. -func LoggerWithLogService(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) { +func WithLogService(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) { cfg := newConfig(options) - core := &Core{ - LevelEnabler: zapcore.InfoLevel, - writeSyncer: &BufferedWriteSyncer{ - FlushInterval: cfg.flushInterval, - MaxBufferSize: cfg.maxBufferSize, - MaxSyncQueueSize: cfg.maxSyncQueueSize, - Service: service, - }, + ws := &BufferedWriteSyncer{ + FlushInterval: cfg.flushInterval, + MaxBufferSize: cfg.maxBufferSize, + MaxSyncQueueSize: cfg.maxSyncQueueSize, + Service: service, } - core.writeSyncer.Start() + ws.Start() + + return zap.New(zapcore.NewTee(logger.Core(), &Core{LevelEnabler: zapcore.InfoLevel, ws: ws})), ws.Stop +} + +type Core struct { + zapcore.LevelEnabler - return zap.New(zapcore.NewTee(logger.Core(), core)), core.writeSyncer.Stop + fields []zap.Field + ws WriteSyncer } func (core *Core) With(fields []zapcore.Field) zapcore.Core { return &Core{ LevelEnabler: core.LevelEnabler, fields: append(core.fields, fields...), - writeSyncer: core.writeSyncer, + ws: core.ws, } } @@ -51,11 +54,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) } func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { - return core.writeSyncer.Write(core.getEntry(entry, fields)) + return core.ws.Write(core.getEntry(entry, fields)) } func (core *Core) Sync() error { - return core.writeSyncer.Flush() + return core.ws.Sync() } func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { diff --git a/pkg/log/zap/core_test.go b/pkg/log/zap/core_test.go index 845d6483..07be4e3c 100644 --- a/pkg/log/zap/core_test.go +++ b/pkg/log/zap/core_test.go @@ -2,194 +2,63 @@ package zap import ( "context" - "reflect" - "slices" - "sync" "testing" - "time" "git.perx.ru/perxis/perxis-go/pkg/log" - logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) -func TestCore_Write(t *testing.T) { - service := &logmocks.Service{} - service.On("Log", mock.Anything, mock.Anything). - Return(nil).Once() +func TestCore_getEntry(t *testing.T) { + core := &Core{} - logger, stop := LoggerWithLogService(zap.NewNop(), service) - logger.Debug("debug сообщение") // будет проигнорировано - - logger = logger.With( - Component("Items.Service"), - ) - - logger.Info("создан элемент коллекции", - zap.String("key", "val"), // будет проигнорировано - Category("create"), - Event("Items.Create"), - Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), - CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")), - Attr(nil), - Tags("tag1", "tag2", "tag3"), - ) - logger.Warn("изменен элемент коллекции", - Category("update"), - Event("Items.Update"), - Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2"), - CallerFromContext(ContextWithCaller(context.Background(), "/users/UEhW")), - ) - - err := stop() - require.NoError(t, err) - - wantEntries := []*log.Entry{ - { - LogLevel: log.Level(zap.InfoLevel), - Message: "создан элемент коллекции", - Category: "create", - Component: "Items.Service", - Event: "Items.Create", - Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl", - Caller: "/users/PHVz", - Attr: nil, - Tags: []string{"tag1", "tag2", "tag3"}, - }, + tests := []struct { + name string + input struct { + entry zapcore.Entry + fields []zapcore.Field + } + want *log.Entry + }{ { - LogLevel: log.Level(zap.WarnLevel), - Message: "изменен элемент коллекции", - Category: "update", - Component: "Items.Service", - Event: "Items.Update", - Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2", - Caller: "/users/UEhW", - Attr: nil, - Tags: nil, + name: "simple", + input: struct { + entry zapcore.Entry + fields []zapcore.Field + }{ + entry: zapcore.Entry{Level: zapcore.InfoLevel, Message: "создан элемент коллекции"}, + fields: []zapcore.Field{ + zap.String("key", "val"), // будет проигнорировано + Category("create"), + Component("Items.Service"), + Event("Items.Create"), + Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), + CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")), + Attr("any"), + Tags("tag1", "tag2", "tag3"), + }, + }, + want: &log.Entry{ + LogLevel: log.Level(zapcore.InfoLevel), + Message: "создан элемент коллекции", + Category: "create", + Component: "Items.Service", + Event: "Items.Create", + Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl", + Caller: "/users/PHVz", + Attr: "any", + Tags: []string{"tag1", "tag2", "tag3"}, + }, }, } - service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { - return len(entries) == 2 && slices.EqualFunc(wantEntries, entries, func(entryA *log.Entry, entryB *log.Entry) bool { - entryA.ID = entryB.ID // игнорируем ID, потому что он генерируется случайно - entryA.Timestamp = entryB.Timestamp // игнорируем Timestamp - return reflect.DeepEqual(entryA, entryB) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := core.getEntry(tc.input.entry, tc.input.fields) + got.ID = tc.want.ID // игнорируем ID + got.Timestamp = tc.want.Timestamp // игнорируем Timestamp + assert.Equal(t, tc.want, got) }) - })) -} - -func TestCore_ConcurrentWrite(t *testing.T) { - service := &logmocks.Service{} - service.On("Log", mock.Anything, mock.Anything). - Return(nil).Once() - - logger, stop := LoggerWithLogService(zap.NewNop(), service) - - var wg sync.WaitGroup - for i := 0; i < 100; i++ { - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - logger.Warn("msg") - }(&wg) - } - - wg.Wait() - - err := stop() - require.NoError(t, err) - - service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { - return len(entries) == 100 - })) -} - -func TestCore_ConcurrentWrite_WithMaxBufferSize(t *testing.T) { - service := &logmocks.Service{} - service.On("Log", mock.Anything, mock.Anything). - Return(nil).Times(10) - - logger, stop := LoggerWithLogService(zap.NewNop(), service, WithMaxBufferSize(10)) - - var wg sync.WaitGroup - for i := 0; i < 100; i++ { - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - logger.Info("msg") - }(&wg) - } - - wg.Wait() - - err := stop() - require.NoError(t, err) - - service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { - return len(entries) == 10 - })) -} - -func TestCore_Write_WithMaxBufferSize(t *testing.T) { - service := &logmocks.Service{} - service.On("Log", mock.Anything, mock.Anything). - Return(nil).Times(10) - - logger, stop := LoggerWithLogService(zap.NewNop(), service, WithMaxBufferSize(10)) - - for i := 0; i < 100; i++ { - logger.Info("msg") - } - - err := stop() - require.NoError(t, err) - - service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { - return len(entries) == 10 - })) -} - -func TestCore_Write_WithFlushInterval(t *testing.T) { - service := &logmocks.Service{} - service.On("Log", mock.Anything, mock.Anything). - Return(nil).Once() - - // в данном случае stop нам не нужен - logger, _ := LoggerWithLogService(zap.NewNop(), service, WithFlushInterval(1*time.Second)) - - for j := 0; j < 10; j++ { - logger.Info("msg") } - - time.Sleep(3 * time.Second) // ждем, пока сработает интервал - - service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { - return len(entries) == 10 - })) -} - -func TestCore_Write_Sync(t *testing.T) { - service := &logmocks.Service{} - service.On("Log", mock.Anything, mock.Anything). - Return(nil).Times(10) - - logger, stop := LoggerWithLogService(zap.NewNop(), service) - - for i := 0; i < 10; i++ { - for j := 0; j < 10; j++ { - logger.Info("msg") - } - err := logger.Sync() - assert.NoError(t, err) - } - - err := stop() - require.NoError(t, err) - - service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { - return len(entries) == 10 - })) } -- GitLab