From e0fa431ba27a38efa43e6d1b7fc46251bf56a869 Mon Sep 17 00:00:00 2001 From: Semyon Krestyaninov <ensiouel@gmail.com> Date: Wed, 7 Feb 2024 15:45:23 +0300 Subject: [PATCH] refactor --- pkg/log/zap/buffered_write_syncer.go | 81 +++++++++++++---------- pkg/log/zap/buffered_write_syncer_test.go | 10 +-- pkg/log/zap/config.go | 18 ----- pkg/log/zap/core.go | 8 +-- pkg/log/zap/zap.go | 36 +++++----- pkg/log/zap/zap_test.go | 10 ++- 6 files changed, 80 insertions(+), 83 deletions(-) diff --git a/pkg/log/zap/buffered_write_syncer.go b/pkg/log/zap/buffered_write_syncer.go index 2de35c99..ce62ade3 100644 --- a/pkg/log/zap/buffered_write_syncer.go +++ b/pkg/log/zap/buffered_write_syncer.go @@ -9,12 +9,17 @@ import ( "git.perx.ru/perxis/perxis-go/pkg/log" ) +const ( + defaultMaxBufferSize = 1000 + defaultMaxSyncQueueSize = 16 + defaultFlushInterval = 5 * time.Second +) + var SyncQueueOverflow = errors.New("sync queue overflow") // BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service. // Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал, -// записи отправляются в очередь для синхронизации с log.Service. Таким образом, выполнение потока не блокируется при отправке записей. -// В случае переполнения очереди для синхронизации будет возвращена ошибка SyncQueueOverflow. +// записи отправляются в очередь для синхронизации с log.Service. type BufferedWriteSyncer struct { // FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию. // @@ -41,28 +46,39 @@ type BufferedWriteSyncer struct { syncQueue chan []*log.Entry flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен + started bool // started указывает, был ли выполнен start stopped bool // stopped указывает, был ли выполнен Stop } -func NewBufferedWriteSyncer(service log.Service, options ...Option) *BufferedWriteSyncer { - cfg := newConfig(options) +func (ws *BufferedWriteSyncer) start() { + if ws.MaxBufferSize == 0 { + ws.MaxBufferSize = defaultMaxBufferSize + } + + if ws.MaxSyncQueueSize == 0 { + ws.MaxSyncQueueSize = defaultMaxSyncQueueSize + } - ws := &BufferedWriteSyncer{ - FlushInterval: cfg.flushInterval, - MaxBufferSize: cfg.maxBufferSize, - MaxSyncQueueSize: cfg.maxSyncQueueSize, - Service: service, + if ws.FlushInterval == 0 { + ws.FlushInterval = defaultFlushInterval } - ws.start() - return ws + ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) + ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) + ws.flushStop = make(chan struct{}) + + ws.wg.Add(2) + go ws.syncLoop() + go ws.flushLoop() + + ws.started = true } func (ws *BufferedWriteSyncer) Stop() error { ws.mu.Lock() defer ws.mu.Unlock() - if ws.stopped { + if !ws.started || ws.stopped { return nil } ws.stopped = true @@ -79,12 +95,15 @@ func (ws *BufferedWriteSyncer) Stop() error { } // Write отправляет запись в буфер. -// Когда количество буферизованных записей превышает максимальный размер буфера или когда пройдет заданный интервал, -// буферизированные записи будут отправлены на синхронизацию. +// Когда количество буферизованных записей превышает максимальный размер буфера, буферизированные записи будут отправлены на синхронизацию. func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { ws.mu.Lock() defer ws.mu.Unlock() + if !ws.started { + ws.start() + } + // Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его. if len(ws.buffer)+1 > ws.MaxBufferSize { err := ws.flush() @@ -98,42 +117,34 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { return nil } -// Sync освобождает буфер и отправляет копию записей на синхронизацию. +// Sync освобождает буфер и отправляет буферизированные записи на синхронизацию. func (ws *BufferedWriteSyncer) Sync() error { ws.mu.Lock() defer ws.mu.Unlock() - return ws.flush() -} - -func (ws *BufferedWriteSyncer) start() { - ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) - ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) - ws.flushStop = make(chan struct{}) + if ws.started { + return ws.flush() + } - ws.wg.Add(2) - go ws.syncLoop() - go ws.flushLoop() + return nil } -// flush освобождает буфер и отправляет копию записей на синхронизацию. Не является безопасным для конкурентного вызова. +// flush освобождает буфер и отправляет буферизированные записи на синхронизацию. +// Если очередь на синхронизацию переполнена, будет возвращена ошибка SyncQueueOverflow +// +// ВНИМАНИЕ: Не является безопасным для конкурентного вызова. func (ws *BufferedWriteSyncer) flush() error { if len(ws.buffer) == 0 { return nil } + // Проверяем, не достигли ли мы предела размера очереди. Если это так, возвращаем ошибку. if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize { return SyncQueueOverflow } - // Создаем копию буфера для отправки в очередь на синхронизацию. - entries := make([]*log.Entry, len(ws.buffer)) - copy(entries, ws.buffer) - - // Очищаем буфер с переиспользованием ёмкости. - ws.buffer = (ws.buffer)[:0] - - ws.syncQueue <- entries + ws.syncQueue <- ws.buffer + ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) return nil } @@ -156,7 +167,7 @@ func (ws *BufferedWriteSyncer) flushLoop() { } } -// syncLoop синхронизирует полученные записи с log.Service. +// syncLoop синхронизирует записи с log.Service. func (ws *BufferedWriteSyncer) syncLoop() { defer ws.wg.Done() diff --git a/pkg/log/zap/buffered_write_syncer_test.go b/pkg/log/zap/buffered_write_syncer_test.go index 2608596e..79d3a22d 100644 --- a/pkg/log/zap/buffered_write_syncer_test.go +++ b/pkg/log/zap/buffered_write_syncer_test.go @@ -22,7 +22,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { }). Once() - ws := NewBufferedWriteSyncer(service) + ws := &BufferedWriteSyncer{Service: service} err := ws.Write(&log.Entry{Message: "first log message"}) require.NoError(t, err) @@ -46,7 +46,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { }). Once() - ws := NewBufferedWriteSyncer(service) + ws := &BufferedWriteSyncer{Service: service} var wg sync.WaitGroup for i := 0; i < 100; i++ { @@ -77,7 +77,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { }). Times(10) - ws := NewBufferedWriteSyncer(service) + ws := &BufferedWriteSyncer{Service: service} for i := 0; i < 10; i++ { for j := 0; j < 10; j++ { @@ -104,7 +104,7 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) { }). Times(10) - ws := NewBufferedWriteSyncer(service, WithMaxBufferSize(10)) + ws := &BufferedWriteSyncer{Service: service, MaxBufferSize: 10} for i := 0; i < 100; i++ { err := ws.Write(&log.Entry{Message: "log message"}) @@ -127,7 +127,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { }). Once() - ws := NewBufferedWriteSyncer(service, WithFlushInterval(time.Second)) + ws := &BufferedWriteSyncer{Service: service, FlushInterval: time.Second} for j := 0; j < 10; j++ { err := ws.Write(&log.Entry{Message: "log message"}) diff --git a/pkg/log/zap/config.go b/pkg/log/zap/config.go index 18823525..dc602dff 100644 --- a/pkg/log/zap/config.go +++ b/pkg/log/zap/config.go @@ -4,12 +4,6 @@ import ( "time" ) -const ( - defaultMaxBufferSize = 1000 - defaultMaxSyncQueueSize = 16 - defaultFlushInterval = 5 * time.Second -) - type config struct { maxBufferSize int maxSyncQueueSize int @@ -22,18 +16,6 @@ func newConfig(options []Option) *config { option(cfg) } - if cfg.maxBufferSize == 0 { - cfg.maxBufferSize = defaultMaxBufferSize - } - - if cfg.maxSyncQueueSize == 0 { - cfg.maxSyncQueueSize = defaultMaxSyncQueueSize - } - - if cfg.flushInterval == 0 { - cfg.flushInterval = defaultFlushInterval - } - return cfg } diff --git a/pkg/log/zap/core.go b/pkg/log/zap/core.go index 924a67be..0af6fe83 100644 --- a/pkg/log/zap/core.go +++ b/pkg/log/zap/core.go @@ -17,14 +17,14 @@ type WriteSyncer interface { type Core struct { zapcore.LevelEnabler - writeSyncer WriteSyncer + WriteSyncer WriteSyncer fields []zap.Field } func (core *Core) With(fields []zapcore.Field) zapcore.Core { return &Core{ LevelEnabler: core.LevelEnabler, - writeSyncer: core.writeSyncer, + WriteSyncer: core.WriteSyncer, fields: append(core.fields, fields...), } } @@ -37,11 +37,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) } func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { - return core.writeSyncer.Write(core.getEntry(entry, fields)) + return core.WriteSyncer.Write(core.getEntry(entry, fields)) } func (core *Core) Sync() error { - return core.writeSyncer.Sync() + return core.WriteSyncer.Sync() } func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { diff --git a/pkg/log/zap/zap.go b/pkg/log/zap/zap.go index 9d884475..d7bc4b17 100644 --- a/pkg/log/zap/zap.go +++ b/pkg/log/zap/zap.go @@ -1,23 +1,29 @@ package zap import ( + "git.perx.ru/perxis/perxis-go/pkg/log" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) -// WithLogService объединяет переданный логгер с ядром, которое кодирует и дублирует записи в WriteSyncer. -// Пример использования: -// -// func main() { -// service := ... // ваш log.Service -// ws := NewBufferedWriteSyncer(service) -// defer ws.Stop() -// -// logger := ... // ваш логгер, который нужно обернуть -// logger = WithLogService(logger, ws) -// -// // ... -// } -func WithLogService(logger *zap.Logger, writeSyncer WriteSyncer) *zap.Logger { - return zap.New(zapcore.NewTee(logger.Core(), &Core{LevelEnabler: zapcore.InfoLevel, writeSyncer: writeSyncer})) +// NewLogger создает логгер, который отправляет записи в log.Service, +// вторым параметром возвращается функция, которая должна быть вызвана для остановки логгера. +func NewLogger(service log.Service, options ...Option) (*zap.Logger, func() error) { + cfg := newConfig(options) + + ws := &BufferedWriteSyncer{ + FlushInterval: cfg.flushInterval, + MaxBufferSize: cfg.maxBufferSize, + MaxSyncQueueSize: cfg.maxSyncQueueSize, + Service: service, + } + return zap.New(&Core{ + LevelEnabler: zapcore.InfoLevel, + WriteSyncer: ws, + }), ws.Stop +} + +// MergeLoggers объединяет два логгера в один. +func MergeLoggers(logger1 *zap.Logger, logger2 *zap.Logger) *zap.Logger { + return zap.New(zapcore.NewTee(logger1.Core(), logger2.Core())) } diff --git a/pkg/log/zap/zap_test.go b/pkg/log/zap/zap_test.go index bcea51b9..21b8a5b1 100644 --- a/pkg/log/zap/zap_test.go +++ b/pkg/log/zap/zap_test.go @@ -9,7 +9,6 @@ import ( logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -23,7 +22,6 @@ func TestExample(t *testing.T) { Event: "Items.Create", Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl", Caller: "/users/PHVz", - Attr: "any", Tags: []string{"tag1", "tag2", "tag3"}, }, { @@ -34,6 +32,7 @@ func TestExample(t *testing.T) { Event: "Items.Update", Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2cw", Caller: "/users/PHVz", + Attr: map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}, }, } @@ -52,8 +51,7 @@ func TestExample(t *testing.T) { }). Once() - ws := NewBufferedWriteSyncer(service) - logger := WithLogService(zap.NewNop(), ws) + logger, stop := NewLogger(service) logger.Info("создан элемент коллекции", Category("create"), @@ -61,7 +59,6 @@ func TestExample(t *testing.T) { Event("Items.Create"), Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), Caller("/users/PHVz"), - Attr("any"), Tags("tag1", "tag2", "tag3"), ) logger.Warn("изменен элемент коллекции", @@ -70,9 +67,10 @@ func TestExample(t *testing.T) { Event("Items.Update"), Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2cw"), Caller("/users/PHVz"), + Attr(map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}), ) - err := ws.Stop() + err := stop() assert.NoError(t, err) service.AssertExpectations(t) -- GitLab