Skip to content
Snippets Groups Projects
Commit 2ce33396 authored by ensiouel's avatar ensiouel
Browse files

add zap integration

parent 930bb7a5
No related branches found
No related tags found
No related merge requests found
package zap
import (
"context"
"sync"
"time"
"git.perx.ru/perxis/perxis-go/pkg/errors"
"git.perx.ru/perxis/perxis-go/pkg/log"
)
var SyncQueueOverflow = errors.New("sync queue overflow")
// BufferedWriteSyncer буферизирует записи в памяти и отправляет их в log.Service.
// Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал,
// записи отправляются в очередь для синхронизации с log.Service. Таким образом, выполнение потока не блокируется при отправке записей.
// В случае переполнения очереди для синхронизации будет возвращена ошибка SyncQueueOverflow.
type BufferedWriteSyncer struct {
// FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию.
//
// Значение по умолчанию для этого параметра равно 5 секунд.
FlushInterval time.Duration
// MaxBufferSize устанавливает максимальное количество записей, которые могут быть буферизованы.
// Когда количество буферизованных записей превысит этот порог, они будут отправлены на синхронизацию в log.Service.
//
// Значение по умолчанию для этого параметра равно 1000.
MaxBufferSize int
// MaxSyncQueueSize устанавливает максимальный размер очереди записей на синхронизацию с log.Service.
//
// Значение по умолчанию для этого параметра равно 16.
MaxSyncQueueSize int
// Service сервис для хранения записей
Service log.Service
wg sync.WaitGroup
mu sync.RWMutex
buffer []*log.Entry
syncQueue chan []*log.Entry
flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен
stopped bool // stopped указывает, был ли выполнен Stop
}
func (ws *BufferedWriteSyncer) Start() {
ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{})
go ws.syncLoop()
go ws.flushLoop()
}
func (ws *BufferedWriteSyncer) Stop() error {
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.stopped {
return nil
}
ws.stopped = true
ws.wg.Add(2)
close(ws.flushStop) // завершаем flushLoop
err := ws.flush() // очищаем оставшиеся записи
close(ws.syncQueue) // завершаем syncLoop
ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop
return err
}
// Write отправляет запись в буфер.
// Когда количество буферизованных записей превышает максимальный размер буфера или когда пройдет заданный интервал,
// буферизированные записи будут отправлены на синхронизацию.
func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
ws.mu.Lock()
defer ws.mu.Unlock()
// Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его.
if len(ws.buffer)+1 > ws.MaxBufferSize {
err := ws.flush()
if err != nil {
return err
}
}
ws.buffer = append(ws.buffer, entry)
return nil
}
// Flush освобождает буфер и отправляет копию записей на синхронизацию.
func (ws *BufferedWriteSyncer) Flush() error {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.flush()
}
// flush освобождает буфер и отправляет копию записей на синхронизацию. Не является безопасным для конкурентного вызова.
func (ws *BufferedWriteSyncer) flush() error {
if len(ws.buffer) == 0 {
return nil
}
if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize {
return SyncQueueOverflow
}
// Создаем копию буфера для отправки в очередь на синхронизацию.
entries := make([]*log.Entry, len(ws.buffer))
copy(entries, ws.buffer)
// Очищаем буфер с переиспользованием ёмкости.
ws.buffer = (ws.buffer)[:0]
ws.syncQueue <- entries
return nil
}
// flushLoop периодически отправляет буферизированные записи на синхронизацию.
func (ws *BufferedWriteSyncer) flushLoop() {
ticker := time.NewTicker(ws.FlushInterval)
defer func() {
ticker.Stop()
ws.wg.Done()
}()
for {
select {
case <-ticker.C:
_ = ws.Flush()
case <-ws.flushStop:
return
}
}
}
// syncLoop синхронизирует полученные записи с log.Service.
func (ws *BufferedWriteSyncer) syncLoop() {
defer ws.wg.Done()
for entries := range ws.syncQueue {
_ = ws.Service.Log(context.Background(), entries)
}
}
package zap
import (
"time"
)
const (
defaultMaxBufferSize = 1000
defaultMaxSyncQueueSize = 16
defaultFlushInterval = 5 * time.Second
)
type config struct {
maxBufferSize int
maxSyncQueueSize int
flushInterval time.Duration
}
func newConfig(options []Option) *config {
cfg := new(config)
for _, option := range options {
option(cfg)
}
if cfg.maxBufferSize == 0 {
cfg.maxBufferSize = defaultMaxBufferSize
}
if cfg.maxSyncQueueSize == 0 {
cfg.maxSyncQueueSize = defaultMaxSyncQueueSize
}
if cfg.flushInterval == 0 {
cfg.flushInterval = defaultFlushInterval
}
return cfg
}
type Option func(config *config)
// WithFlushInterval устанавливает значение для BufferedWriteSyncer.FlushInterval
func WithFlushInterval(interval time.Duration) Option {
return func(config *config) {
config.flushInterval = interval
}
}
// WithMaxBufferSize устанавливает значение для BufferedWriteSyncer.MaxBufferSize
func WithMaxBufferSize(size int) Option {
return func(config *config) {
config.maxBufferSize = size
}
}
// WithMaxSyncQueueSize устанавливает значение для BufferedWriteSyncer.MaxSyncQueueSize
func WithMaxSyncQueueSize(size int) Option {
return func(config *config) {
config.maxSyncQueueSize = size
}
}
package zap
import (
"git.perx.ru/perxis/perxis-go/pkg/id"
"git.perx.ru/perxis/perxis-go/pkg/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
type Core struct {
zapcore.LevelEnabler
fields []zap.Field
writeSyncer *BufferedWriteSyncer
}
// LoggerWithLogService добавляет функционал отправки записей в log.Service к переданному логгеру.
// Вторым параметром возвращается Stop, при вызове которого прекращается синхронизация записей по интервалу.
// Записи уровня Debug игнорируются и не отправляются.
func LoggerWithLogService(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) {
cfg := newConfig(options)
core := &Core{
LevelEnabler: zapcore.InfoLevel,
writeSyncer: &BufferedWriteSyncer{
FlushInterval: cfg.flushInterval,
MaxBufferSize: cfg.maxBufferSize,
MaxSyncQueueSize: cfg.maxSyncQueueSize,
Service: service,
},
}
core.writeSyncer.Start()
return zap.New(zapcore.NewTee(logger.Core(), core)), core.writeSyncer.Stop
}
func (core *Core) With(fields []zapcore.Field) zapcore.Core {
return &Core{
LevelEnabler: core.LevelEnabler,
fields: append(core.fields, fields...),
writeSyncer: core.writeSyncer,
}
}
func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if core.Enabled(entry.Level) {
return checkedEntry.AddCore(entry, core)
}
return checkedEntry
}
func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error {
return core.writeSyncer.Write(core.getEntry(entry, fields))
}
func (core *Core) Sync() error {
return core.writeSyncer.Flush()
}
func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry {
if len(core.fields) > 0 {
fields = append(fields, core.fields...)
}
enc := zapcore.NewMapObjectEncoder()
for _, field := range fields {
field.AddTo(enc)
}
ent := &log.Entry{
ID: id.GenerateNewID(),
Timestamp: entry.Time,
LogLevel: log.Level(entry.Level),
Message: entry.Message,
}
ent.Category, _ = enc.Fields["category"].(string)
ent.Component, _ = enc.Fields["component"].(string)
ent.Event, _ = enc.Fields["event"].(string)
ent.Object, _ = enc.Fields["object"].(string)
ent.Caller, _ = enc.Fields["caller"].(string)
ent.Attr = enc.Fields["attr"]
if tags, ok := enc.Fields["tags"].([]any); ok {
for _, item := range tags {
if tag, ok := item.(string); ok {
ent.Tags = append(ent.Tags, tag)
}
}
}
return ent
}
package zap
import (
"context"
"reflect"
"slices"
"sync"
"testing"
"time"
"git.perx.ru/perxis/perxis-go/pkg/log"
logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestCore_Write(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Twice()
logger, stop := LoggerWithLogService(zap.NewNop(), service)
logger.Debug("debug сообщение") // будет проигнорировано
logger = logger.With(
Component("Items.Service"),
)
logger.Info("создан элемент коллекции",
zap.String("key", "val"), // будет проигнорировано
Category("create"),
Event("Items.Create"),
Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"),
CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")),
Attr(nil),
Tags("tag1", "tag2", "tag3"),
)
logger.Warn("изменен элемент коллекции",
Category("update"),
Event("Items.Update"),
Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2"),
CallerFromContext(ContextWithCaller(context.Background(), "/users/UEhW")),
)
err := stop()
require.NoError(t, err)
wantEntries := []*log.Entry{
{
LogLevel: log.Level(zap.InfoLevel),
Message: "создан элемент коллекции",
Category: "create",
Component: "Items.Service",
Event: "Items.Create",
Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl",
Caller: "/users/PHVz",
Attr: nil,
Tags: []string{"tag1", "tag2", "tag3"},
},
{
LogLevel: log.Level(zap.WarnLevel),
Message: "изменен элемент коллекции",
Category: "update",
Component: "Items.Service",
Event: "Items.Update",
Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2",
Caller: "/users/UEhW",
Attr: nil,
Tags: nil,
},
}
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 2 && slices.EqualFunc(wantEntries, entries, func(entryA *log.Entry, entryB *log.Entry) bool {
entryA.ID = entryB.ID // игнорируем ID, потому что он генерируется случайно
entryA.Timestamp = entryB.Timestamp // игнорируем Timestamp
return reflect.DeepEqual(entryA, entryB)
})
}))
}
func TestCore_ConcurrentWrite(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
logger, stop := LoggerWithLogService(zap.NewNop(), service)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
logger.Warn("msg")
}(&wg)
}
wg.Wait()
err := stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 100
}))
}
func TestCore_ConcurrentWrite_WithMaxBufferSize(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
logger, stop := LoggerWithLogService(zap.NewNop(), service, WithMaxBufferSize(10))
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
logger.Info("msg")
}(&wg)
}
wg.Wait()
err := stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
func TestCore_Write_WithMaxBufferSize(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
logger, stop := LoggerWithLogService(zap.NewNop(), service, WithMaxBufferSize(10))
for i := 0; i < 100; i++ {
logger.Info("msg")
}
err := stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
func TestCore_Write_WithFlushInterval(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
// в данном случае stop нам не нужен
logger, _ := LoggerWithLogService(zap.NewNop(), service, WithFlushInterval(1*time.Second))
for j := 0; j < 10; j++ {
logger.Info("msg")
}
time.Sleep(3 * time.Second) // ждем, пока сработает интервал
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
func TestCore_Write_Sync(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
logger, stop := LoggerWithLogService(zap.NewNop(), service)
for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ {
logger.Info("msg")
}
err := logger.Sync()
assert.NoError(t, err)
}
err := stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment