diff --git a/pkg/log/zap/buffered_write_syncer.go b/pkg/log/zap/buffered_write_syncer.go index ecfb53b3a91e725e3f348d9eb32ff1ee542ec162..09e6051f012b7e6c635feb77c3f0f6dd5750faa6 100644 --- a/pkg/log/zap/buffered_write_syncer.go +++ b/pkg/log/zap/buffered_write_syncer.go @@ -9,12 +9,6 @@ import ( "git.perx.ru/perxis/perxis-go/pkg/log" ) -const ( - defaultMaxBufferSize = 1000 - defaultMaxSyncQueueSize = 16 - defaultFlushInterval = 5 * time.Second -) - var SyncQueueOverflow = errors.New("sync queue overflow") // BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service. @@ -50,19 +44,17 @@ type BufferedWriteSyncer struct { stopped bool // stopped указывает, был ли выполнен Stop } -func (ws *BufferedWriteSyncer) Start() { - if ws.MaxBufferSize == 0 { - ws.MaxBufferSize = defaultMaxBufferSize - } - - if ws.MaxSyncQueueSize == 0 { - ws.MaxSyncQueueSize = defaultMaxSyncQueueSize - } - - if ws.FlushInterval == 0 { - ws.FlushInterval = defaultFlushInterval +func NewBufferedWriteSyncer(service log.Service, options ...Option) *BufferedWriteSyncer { + cfg := newConfig(options) + return &BufferedWriteSyncer{ + FlushInterval: cfg.flushInterval, + MaxBufferSize: cfg.maxBufferSize, + MaxSyncQueueSize: cfg.maxSyncQueueSize, + Service: service, } +} +func (ws *BufferedWriteSyncer) start() { ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) ws.flushStop = make(chan struct{}) @@ -74,22 +66,24 @@ func (ws *BufferedWriteSyncer) Start() { ws.started = true } -func (ws *BufferedWriteSyncer) Stop() { +func (ws *BufferedWriteSyncer) Stop() error { ws.mu.Lock() defer ws.mu.Unlock() if !ws.started || ws.stopped { - return + return nil } ws.stopped = true close(ws.flushStop) // завершаем flushLoop - _ = ws.flush() // очищаем оставшиеся записи + err := ws.flush() // очищаем оставшиеся записи close(ws.syncQueue) // завершаем syncLoop ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop + + return err } // Write отправляет запись в буфер. @@ -99,7 +93,7 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { defer ws.mu.Unlock() if !ws.started { - panic("BufferedWriteSyncer must be started") + ws.start() } // Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его. diff --git a/pkg/log/zap/buffered_write_syncer_test.go b/pkg/log/zap/buffered_write_syncer_test.go index 2e2b64b9a857b9f50c4d623083745edb742c419a..bbb27b6cd4f46b097a666b5791764cb6306d979f 100644 --- a/pkg/log/zap/buffered_write_syncer_test.go +++ b/pkg/log/zap/buffered_write_syncer_test.go @@ -18,12 +18,11 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { Return(nil). Run(func(args mock.Arguments) { entries := args.Get(1).([]*log.Entry) - assert.Equal(t, 2, len(entries)) + require.Equal(t, 2, len(entries)) }). Once() - ws := &BufferedWriteSyncer{Service: service} - ws.Start() + ws := NewBufferedWriteSyncer(service) err := ws.Write(&log.Entry{Message: "first log message"}) require.NoError(t, err) @@ -31,7 +30,8 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { err = ws.Write(&log.Entry{Message: "second log message"}) require.NoError(t, err) - ws.Stop() + err = ws.Stop() + require.NoError(t, err) service.AssertExpectations(t) } @@ -42,12 +42,11 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { Return(nil). Run(func(args mock.Arguments) { entries := args.Get(1).([]*log.Entry) - assert.Equal(t, 100, len(entries)) + require.Equal(t, 100, len(entries)) }). Once() - ws := &BufferedWriteSyncer{Service: service} - ws.Start() + ws := NewBufferedWriteSyncer(service) var wg sync.WaitGroup for i := 0; i < 100; i++ { @@ -62,7 +61,8 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { wg.Wait() - ws.Stop() + err := ws.Stop() + require.NoError(t, err) service.AssertExpectations(t) } @@ -73,12 +73,11 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { Return(nil). Run(func(args mock.Arguments) { entries := args.Get(1).([]*log.Entry) - assert.Equal(t, 10, len(entries)) + require.Equal(t, 10, len(entries)) }). Times(10) - ws := &BufferedWriteSyncer{Service: service} - ws.Start() + ws := NewBufferedWriteSyncer(service) for i := 0; i < 10; i++ { for j := 0; j < 10; j++ { @@ -86,10 +85,11 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { require.NoError(t, err) } err := ws.Sync() - assert.NoError(t, err) + require.NoError(t, err) } - ws.Stop() + err := ws.Stop() + require.NoError(t, err) service.AssertExpectations(t) } @@ -104,15 +104,15 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) { }). Times(10) - ws := &BufferedWriteSyncer{Service: service, MaxBufferSize: 10} - ws.Start() + ws := NewBufferedWriteSyncer(service, WithMaxBufferSize(10)) for i := 0; i < 100; i++ { err := ws.Write(&log.Entry{Message: "log message"}) require.NoError(t, err) } - ws.Stop() + err := ws.Stop() + require.NoError(t, err) service.AssertExpectations(t) } @@ -127,8 +127,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { }). Once() - ws := &BufferedWriteSyncer{Service: service, FlushInterval: time.Second} - ws.Start() + ws := NewBufferedWriteSyncer(service, WithFlushInterval(time.Second)) for j := 0; j < 10; j++ { err := ws.Write(&log.Entry{Message: "log message"}) @@ -137,12 +136,8 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { time.Sleep(3 * time.Second) // ждем, пока сработает интервал - service.AssertExpectations(t) -} + err := ws.Stop() + require.NoError(t, err) -func TestBufferedWriteSyncer_NotStartedWrite(t *testing.T) { - ws := &BufferedWriteSyncer{} - assert.Panics(t, func() { - _ = ws.Write(&log.Entry{Message: "log message"}) - }) + service.AssertExpectations(t) } diff --git a/pkg/log/zap/config.go b/pkg/log/zap/config.go index dc602dff281a7c68caecc33bc1d73ba4a765aeec..d586d1207f18aa75c5c3afa10a3d63086371efe0 100644 --- a/pkg/log/zap/config.go +++ b/pkg/log/zap/config.go @@ -4,10 +4,16 @@ import ( "time" ) +const ( + defaultMaxBufferSize = 1000 + defaultMaxSyncQueueSize = 16 + defaultFlushInterval = 5 * time.Second +) + type config struct { + flushInterval time.Duration maxBufferSize int maxSyncQueueSize int - flushInterval time.Duration } func newConfig(options []Option) *config { @@ -16,6 +22,18 @@ func newConfig(options []Option) *config { option(cfg) } + if cfg.flushInterval == 0 { + cfg.flushInterval = defaultFlushInterval + } + + if cfg.maxBufferSize == 0 { + cfg.maxBufferSize = defaultMaxBufferSize + } + + if cfg.maxSyncQueueSize == 0 { + cfg.maxSyncQueueSize = defaultMaxSyncQueueSize + } + return cfg } diff --git a/pkg/log/zap/core.go b/pkg/log/zap/core.go index 0af6fe838579c9bff5de80fa6a03472b10a1a353..cf925e92540206f9a7e492609db6c970aee3704b 100644 --- a/pkg/log/zap/core.go +++ b/pkg/log/zap/core.go @@ -17,14 +17,21 @@ type WriteSyncer interface { type Core struct { zapcore.LevelEnabler - WriteSyncer WriteSyncer + writeSyncer WriteSyncer fields []zap.Field } +func NewCore(writeSyncer WriteSyncer) *Core { + return &Core{ + LevelEnabler: zapcore.InfoLevel, + writeSyncer: writeSyncer, + } +} + func (core *Core) With(fields []zapcore.Field) zapcore.Core { return &Core{ LevelEnabler: core.LevelEnabler, - WriteSyncer: core.WriteSyncer, + writeSyncer: core.writeSyncer, fields: append(core.fields, fields...), } } @@ -37,11 +44,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) } func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { - return core.WriteSyncer.Write(core.getEntry(entry, fields)) + return core.writeSyncer.Write(core.getEntry(entry, fields)) } func (core *Core) Sync() error { - return core.WriteSyncer.Sync() + return core.writeSyncer.Sync() } func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { diff --git a/pkg/log/zap/core_test.go b/pkg/log/zap/core_test.go index 180f622aaf663b2b3ac032fd0cbac0dcf4e80057..8378658e71b5217cfd5f19c8b905462d5be05c5d 100644 --- a/pkg/log/zap/core_test.go +++ b/pkg/log/zap/core_test.go @@ -4,13 +4,13 @@ import ( "testing" "git.perx.ru/perxis/perxis-go/pkg/log" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) func TestCore_getEntry(t *testing.T) { - core := &Core{} + core := NewCore(nil) tests := []struct { name string @@ -57,7 +57,7 @@ func TestCore_getEntry(t *testing.T) { got := core.getEntry(tc.input.entry, tc.input.fields) got.ID = tc.want.ID // игнорируем ID got.Timestamp = tc.want.Timestamp // игнорируем Timestamp - assert.Equal(t, tc.want, got) + require.Equal(t, tc.want, got) }) } } diff --git a/pkg/log/zap/zap.go b/pkg/log/zap/zap.go deleted file mode 100644 index acae34a6159c7f0c14ff5827d47a9d9c3c34a0a3..0000000000000000000000000000000000000000 --- a/pkg/log/zap/zap.go +++ /dev/null @@ -1,29 +0,0 @@ -package zap - -import ( - "git.perx.ru/perxis/perxis-go/pkg/content" - "git.perx.ru/perxis/perxis-go/pkg/log" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -// New создает логгер, который отправляет записи в log.Service, -// вторым параметром возвращается интерфейс для запуска и остановки логгера -func New(service log.Service, options ...Option) (*zap.Logger, content.Runnable) { - cfg := newConfig(options) - - ws := &BufferedWriteSyncer{ - FlushInterval: cfg.flushInterval, - MaxBufferSize: cfg.maxBufferSize, - MaxSyncQueueSize: cfg.maxSyncQueueSize, - Service: service, - } - return zap.New(&Core{LevelEnabler: zapcore.InfoLevel, WriteSyncer: ws}), ws -} - -// WithLogger добавляет к переданном логгеру логгер, который отправляет записи в log.Service. -// вторым параметром возвращается интерфейс для запуска и остановки логгера -func WithLogger(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, content.Runnable) { - logLogger, runnable := New(service, options...) - return zap.New(zapcore.NewTee(logger.Core(), logLogger.Core())), runnable -} diff --git a/pkg/log/zap/zap_test.go b/pkg/log/zap/zap_test.go index 31e0566b73fe976196a650a88ee0c11e0b0aba65..c8a4dff9c09fe521e0f76fe35a99d909b1f172e8 100644 --- a/pkg/log/zap/zap_test.go +++ b/pkg/log/zap/zap_test.go @@ -7,8 +7,9 @@ import ( "git.perx.ru/perxis/perxis-go/pkg/log" logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -41,9 +42,9 @@ func TestExample(t *testing.T) { Return(nil). Run(func(args mock.Arguments) { entries := args.Get(1).([]*log.Entry) - assert.True(t, slices.EqualFunc(wantEntries, entries, func(wantEntry, gotEntry *log.Entry) bool { - assert.NotEmpty(t, gotEntry.ID) - assert.NotEmpty(t, gotEntry.Timestamp) + require.True(t, slices.EqualFunc(wantEntries, entries, func(wantEntry, gotEntry *log.Entry) bool { + require.NotEmpty(t, gotEntry.ID) + require.NotEmpty(t, gotEntry.Timestamp) gotEntry.ID = wantEntry.ID // игнорируем ID gotEntry.Timestamp = wantEntry.Timestamp // игнорируем Timestamp return reflect.DeepEqual(wantEntry, gotEntry) @@ -51,8 +52,8 @@ func TestExample(t *testing.T) { }). Once() - logger, runnable := New(service) - runnable.Start() + ws := NewBufferedWriteSyncer(service) + logger := zap.New(NewCore(ws)) logger.Info("создан элемент коллекции", Category("create"), @@ -71,7 +72,8 @@ func TestExample(t *testing.T) { Attr(map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}), ) - runnable.Stop() + err := ws.Stop() + require.NoError(t, err) service.AssertExpectations(t) }