diff --git a/pkg/log/zap/buffered_write_syncer.go b/pkg/log/zap/buffered_write_syncer.go new file mode 100644 index 0000000000000000000000000000000000000000..613161fb3620cc567cd9e043450ff056bf9f7f78 --- /dev/null +++ b/pkg/log/zap/buffered_write_syncer.go @@ -0,0 +1,153 @@ +package zap + +import ( + "context" + "sync" + "time" + + "git.perx.ru/perxis/perxis-go/pkg/errors" + "git.perx.ru/perxis/perxis-go/pkg/log" +) + +var SyncQueueOverflow = errors.New("sync queue overflow") + +// BufferedWriteSyncer буферизирует записи в памяти и отправляет их в log.Service. +// Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал, +// записи отправляются в очередь для синхронизации с log.Service. Таким образом, выполнение потока не блокируется при отправке записей. +// В случае переполнения очереди для синхронизации будет возвращена ошибка SyncQueueOverflow. +type BufferedWriteSyncer struct { + // FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию. + // + // Значение по умолчанию для этого параметра равно 5 секунд. + FlushInterval time.Duration + + // MaxBufferSize устанавливает максимальное количество записей, которые могут быть буферизованы. + // Когда количество буферизованных записей превысит этот порог, они будут отправлены на синхронизацию в log.Service. + // + // Значение по умолчанию для этого параметра равно 1000. + MaxBufferSize int + + // MaxSyncQueueSize устанавливает максимальный размер очереди записей на синхронизацию с log.Service. + // + // Значение по умолчанию для этого параметра равно 16. + MaxSyncQueueSize int + + // Service сервис для хранения записей + Service log.Service + + wg sync.WaitGroup + mu sync.RWMutex + buffer []*log.Entry + syncQueue chan []*log.Entry + + flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен + stopped bool // stopped указывает, был ли выполнен Stop +} + +func (ws *BufferedWriteSyncer) Start() { + ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) + ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) + ws.flushStop = make(chan struct{}) + + go ws.syncLoop() + go ws.flushLoop() +} + +func (ws *BufferedWriteSyncer) Stop() error { + ws.mu.Lock() + defer ws.mu.Unlock() + + if ws.stopped { + return nil + } + ws.stopped = true + + ws.wg.Add(2) + + close(ws.flushStop) // завершаем flushLoop + + err := ws.flush() // очищаем оставшиеся записи + + close(ws.syncQueue) // завершаем syncLoop + + ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop + + return err +} + +// Write отправляет запись в буфер. +// Когда количество буферизованных записей превышает максимальный размер буфера или когда пройдет заданный интервал, +// буферизированные записи будут отправлены на синхронизацию. +func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { + ws.mu.Lock() + defer ws.mu.Unlock() + + // Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его. + if len(ws.buffer)+1 > ws.MaxBufferSize { + err := ws.flush() + if err != nil { + return err + } + } + + ws.buffer = append(ws.buffer, entry) + + return nil +} + +// Flush освобождает буфер и отправляет копию записей на синхронизацию. +func (ws *BufferedWriteSyncer) Flush() error { + ws.mu.Lock() + defer ws.mu.Unlock() + + return ws.flush() +} + +// flush освобождает буфер и отправляет копию записей на синхронизацию. Не является безопасным для конкурентного вызова. +func (ws *BufferedWriteSyncer) flush() error { + if len(ws.buffer) == 0 { + return nil + } + + if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize { + return SyncQueueOverflow + } + + // Создаем копию буфера для отправки в очередь на синхронизацию. + entries := make([]*log.Entry, len(ws.buffer)) + copy(entries, ws.buffer) + + // Очищаем буфер с переиспользованием ёмкости. + ws.buffer = (ws.buffer)[:0] + + ws.syncQueue <- entries + + return nil +} + +// flushLoop периодически отправляет буферизированные записи на синхронизацию. +func (ws *BufferedWriteSyncer) flushLoop() { + ticker := time.NewTicker(ws.FlushInterval) + defer func() { + ticker.Stop() + ws.wg.Done() + }() + + for { + select { + case <-ticker.C: + _ = ws.Flush() + case <-ws.flushStop: + return + } + } +} + +// syncLoop синхронизирует полученные записи с log.Service. +func (ws *BufferedWriteSyncer) syncLoop() { + defer ws.wg.Done() + + for entries := range ws.syncQueue { + _ = ws.Service.Log(context.Background(), entries) + } +} diff --git a/pkg/log/zap/config.go b/pkg/log/zap/config.go new file mode 100644 index 0000000000000000000000000000000000000000..1882352536de8b43edb93808a4570805c03be494 --- /dev/null +++ b/pkg/log/zap/config.go @@ -0,0 +1,61 @@ +package zap + +import ( + "time" +) + +const ( + defaultMaxBufferSize = 1000 + defaultMaxSyncQueueSize = 16 + defaultFlushInterval = 5 * time.Second +) + +type config struct { + maxBufferSize int + maxSyncQueueSize int + flushInterval time.Duration +} + +func newConfig(options []Option) *config { + cfg := new(config) + for _, option := range options { + option(cfg) + } + + if cfg.maxBufferSize == 0 { + cfg.maxBufferSize = defaultMaxBufferSize + } + + if cfg.maxSyncQueueSize == 0 { + cfg.maxSyncQueueSize = defaultMaxSyncQueueSize + } + + if cfg.flushInterval == 0 { + cfg.flushInterval = defaultFlushInterval + } + + return cfg +} + +type Option func(config *config) + +// WithFlushInterval устанавливает значение для BufferedWriteSyncer.FlushInterval +func WithFlushInterval(interval time.Duration) Option { + return func(config *config) { + config.flushInterval = interval + } +} + +// WithMaxBufferSize устанавливает значение для BufferedWriteSyncer.MaxBufferSize +func WithMaxBufferSize(size int) Option { + return func(config *config) { + config.maxBufferSize = size + } +} + +// WithMaxSyncQueueSize устанавливает значение для BufferedWriteSyncer.MaxSyncQueueSize +func WithMaxSyncQueueSize(size int) Option { + return func(config *config) { + config.maxSyncQueueSize = size + } +} diff --git a/pkg/log/zap/core.go b/pkg/log/zap/core.go new file mode 100644 index 0000000000000000000000000000000000000000..e0e0ea83e66a4bd59b57df47de05d7e7f4230b8b --- /dev/null +++ b/pkg/log/zap/core.go @@ -0,0 +1,94 @@ +package zap + +import ( + "git.perx.ru/perxis/perxis-go/pkg/id" + "git.perx.ru/perxis/perxis-go/pkg/log" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type Core struct { + zapcore.LevelEnabler + + fields []zap.Field + writeSyncer *BufferedWriteSyncer +} + +// LoggerWithLogService добавляет функционал отправки записей в log.Service к переданному логгеру. +// Вторым параметром возвращается Stop, при вызове которого прекращается синхронизация записей по интервалу. +// Записи уровня Debug игнорируются и не отправляются. +func LoggerWithLogService(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) { + cfg := newConfig(options) + + core := &Core{ + LevelEnabler: zapcore.InfoLevel, + writeSyncer: &BufferedWriteSyncer{ + FlushInterval: cfg.flushInterval, + MaxBufferSize: cfg.maxBufferSize, + MaxSyncQueueSize: cfg.maxSyncQueueSize, + Service: service, + }, + } + + core.writeSyncer.Start() + + return zap.New(zapcore.NewTee(logger.Core(), core)), core.writeSyncer.Stop +} + +func (core *Core) With(fields []zapcore.Field) zapcore.Core { + return &Core{ + LevelEnabler: core.LevelEnabler, + fields: append(core.fields, fields...), + writeSyncer: core.writeSyncer, + } +} + +func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if core.Enabled(entry.Level) { + return checkedEntry.AddCore(entry, core) + } + return checkedEntry +} + +func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { + return core.writeSyncer.Write(core.getEntry(entry, fields)) +} + +func (core *Core) Sync() error { + return core.writeSyncer.Flush() +} + +func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { + if len(core.fields) > 0 { + fields = append(fields, core.fields...) + } + + enc := zapcore.NewMapObjectEncoder() + for _, field := range fields { + field.AddTo(enc) + } + + ent := &log.Entry{ + ID: id.GenerateNewID(), + Timestamp: entry.Time, + LogLevel: log.Level(entry.Level), + Message: entry.Message, + } + + ent.Category, _ = enc.Fields["category"].(string) + ent.Component, _ = enc.Fields["component"].(string) + ent.Event, _ = enc.Fields["event"].(string) + ent.Object, _ = enc.Fields["object"].(string) + ent.Caller, _ = enc.Fields["caller"].(string) + ent.Attr = enc.Fields["attr"] + + if tags, ok := enc.Fields["tags"].([]any); ok { + for _, item := range tags { + if tag, ok := item.(string); ok { + ent.Tags = append(ent.Tags, tag) + } + } + } + + return ent +} diff --git a/pkg/log/zap/core_test.go b/pkg/log/zap/core_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8ec9498484a08f2609440ae89c7209c8f8265b01 --- /dev/null +++ b/pkg/log/zap/core_test.go @@ -0,0 +1,195 @@ +package zap + +import ( + "context" + "reflect" + "slices" + "sync" + "testing" + "time" + + "git.perx.ru/perxis/perxis-go/pkg/log" + logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestCore_Write(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Twice() + + logger, stop := LoggerWithLogService(zap.NewNop(), service) + logger.Debug("debug сообщение") // будет проигнорировано + + logger = logger.With( + Component("Items.Service"), + ) + + logger.Info("создан элемент коллекции", + zap.String("key", "val"), // будет проигнорировано + Category("create"), + Event("Items.Create"), + Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), + CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")), + Attr(nil), + Tags("tag1", "tag2", "tag3"), + ) + logger.Warn("изменен элемент коллекции", + Category("update"), + Event("Items.Update"), + Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2"), + CallerFromContext(ContextWithCaller(context.Background(), "/users/UEhW")), + ) + + err := stop() + require.NoError(t, err) + + wantEntries := []*log.Entry{ + { + LogLevel: log.Level(zap.InfoLevel), + Message: "создан элемент коллекции", + Category: "create", + Component: "Items.Service", + Event: "Items.Create", + Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl", + Caller: "/users/PHVz", + Attr: nil, + Tags: []string{"tag1", "tag2", "tag3"}, + }, + { + LogLevel: log.Level(zap.WarnLevel), + Message: "изменен элемент коллекции", + Category: "update", + Component: "Items.Service", + Event: "Items.Update", + Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2", + Caller: "/users/UEhW", + Attr: nil, + Tags: nil, + }, + } + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 2 && slices.EqualFunc(wantEntries, entries, func(entryA *log.Entry, entryB *log.Entry) bool { + entryA.ID = entryB.ID // игнорируем ID, потому что он генерируется случайно + entryA.Timestamp = entryB.Timestamp // игнорируем Timestamp + return reflect.DeepEqual(entryA, entryB) + }) + })) +} + +func TestCore_ConcurrentWrite(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Once() + + logger, stop := LoggerWithLogService(zap.NewNop(), service) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + logger.Warn("msg") + }(&wg) + } + + wg.Wait() + + err := stop() + require.NoError(t, err) + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 100 + })) +} + +func TestCore_ConcurrentWrite_WithMaxBufferSize(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Times(10) + + logger, stop := LoggerWithLogService(zap.NewNop(), service, WithMaxBufferSize(10)) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() + logger.Info("msg") + }(&wg) + } + + wg.Wait() + + err := stop() + require.NoError(t, err) + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 10 + })) +} + +func TestCore_Write_WithMaxBufferSize(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Times(10) + + logger, stop := LoggerWithLogService(zap.NewNop(), service, WithMaxBufferSize(10)) + + for i := 0; i < 100; i++ { + logger.Info("msg") + } + + err := stop() + require.NoError(t, err) + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 10 + })) +} + +func TestCore_Write_WithFlushInterval(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Once() + + // в данном случае stop нам не нужен + logger, _ := LoggerWithLogService(zap.NewNop(), service, WithFlushInterval(1*time.Second)) + + for j := 0; j < 10; j++ { + logger.Info("msg") + } + + time.Sleep(3 * time.Second) // ждем, пока сработает интервал + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 10 + })) +} + +func TestCore_Write_Sync(t *testing.T) { + service := &logmocks.Service{} + service.On("Log", mock.Anything, mock.Anything). + Return(nil).Times(10) + + logger, stop := LoggerWithLogService(zap.NewNop(), service) + + for i := 0; i < 10; i++ { + for j := 0; j < 10; j++ { + logger.Info("msg") + } + err := logger.Sync() + assert.NoError(t, err) + } + + err := stop() + require.NoError(t, err) + + service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { + return len(entries) == 10 + })) +}