Skip to content
Snippets Groups Projects
Commit 303819b9 authored by ensiouel's avatar ensiouel
Browse files

в core BufferedWriteSyncer заменен на интерфейс WriteSyncer, добавлены тесты

parent 76d254c6
No related branches found
No related tags found
No related merge requests found
...@@ -49,6 +49,7 @@ func (ws *BufferedWriteSyncer) Start() { ...@@ -49,6 +49,7 @@ func (ws *BufferedWriteSyncer) Start() {
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{}) ws.flushStop = make(chan struct{})
ws.wg.Add(2)
go ws.syncLoop() go ws.syncLoop()
go ws.flushLoop() go ws.flushLoop()
} }
...@@ -62,8 +63,6 @@ func (ws *BufferedWriteSyncer) Stop() error { ...@@ -62,8 +63,6 @@ func (ws *BufferedWriteSyncer) Stop() error {
} }
ws.stopped = true ws.stopped = true
ws.wg.Add(2)
close(ws.flushStop) // завершаем flushLoop close(ws.flushStop) // завершаем flushLoop
err := ws.flush() // очищаем оставшиеся записи err := ws.flush() // очищаем оставшиеся записи
...@@ -95,8 +94,8 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { ...@@ -95,8 +94,8 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
return nil return nil
} }
// Flush освобождает буфер и отправляет копию записей на синхронизацию. // Sync освобождает буфер и отправляет копию записей на синхронизацию.
func (ws *BufferedWriteSyncer) Flush() error { func (ws *BufferedWriteSyncer) Sync() error {
ws.mu.Lock() ws.mu.Lock()
defer ws.mu.Unlock() defer ws.mu.Unlock()
...@@ -136,7 +135,7 @@ func (ws *BufferedWriteSyncer) flushLoop() { ...@@ -136,7 +135,7 @@ func (ws *BufferedWriteSyncer) flushLoop() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
_ = ws.Flush() _ = ws.Sync()
case <-ws.flushStop: case <-ws.flushStop:
return return
} }
......
package zap
import (
"sync"
"testing"
"time"
"git.perx.ru/perxis/perxis-go/pkg/log"
logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestBufferedWriteSyncer_Write(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
ws := &BufferedWriteSyncer{
FlushInterval: defaultFlushInterval,
MaxBufferSize: defaultMaxBufferSize,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
err := ws.Write(&log.Entry{Message: "first log message"})
require.NoError(t, err)
err = ws.Write(&log.Entry{Message: "second log message"})
require.NoError(t, err)
err = ws.Stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 2
}))
}
func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
ws := &BufferedWriteSyncer{
FlushInterval: defaultFlushInterval,
MaxBufferSize: defaultMaxBufferSize,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
err := ws.Write(&log.Entry{Message: "log message"})
require.NoError(t, err)
}(&wg)
}
wg.Wait()
err := ws.Stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 100
}))
}
func TestBufferedWriteSyncer_Flush(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
ws := &BufferedWriteSyncer{
FlushInterval: defaultFlushInterval,
MaxBufferSize: defaultMaxBufferSize,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ {
err := ws.Write(&log.Entry{Message: "log message"})
require.NoError(t, err)
}
err := ws.Sync()
assert.NoError(t, err)
}
err := ws.Stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
ws := &BufferedWriteSyncer{
FlushInterval: defaultFlushInterval,
MaxBufferSize: 10,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
for i := 0; i < 100; i++ {
err := ws.Write(&log.Entry{Message: "log message"})
require.NoError(t, err)
}
err := ws.Stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
func TestBufferedWriteSyncer_FlushInterval(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
ws := &BufferedWriteSyncer{
FlushInterval: time.Second,
MaxBufferSize: defaultMaxBufferSize,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
for j := 0; j < 10; j++ {
err := ws.Write(&log.Entry{Message: "log message"})
require.NoError(t, err)
}
time.Sleep(3 * time.Second) // ждем, пока сработает интервал
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
...@@ -7,39 +7,42 @@ import ( ...@@ -7,39 +7,42 @@ import (
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
type Core struct { // WriteSyncer отвечает за синхронизацию записей, полученных через метод Write с сервисом логирования.
zapcore.LevelEnabler type WriteSyncer interface {
Write(entry *log.Entry) error
fields []zap.Field Sync() error
writeSyncer *BufferedWriteSyncer
} }
// LoggerWithLogService добавляет функционал отправки записей в log.Service к переданному логгеру. // WithLogService добавляет функционал отправки записей в log.Service к переданному логгеру.
// Вторым параметром возвращается Stop, при вызове которого прекращается синхронизация записей по интервалу. // Вторым параметром возвращается Stop, при вызове которого прекращается синхронизация записей по интервалу.
// Записи уровня Debug игнорируются и не отправляются. // Записи уровня Debug игнорируются и не отправляются.
func LoggerWithLogService(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) { func WithLogService(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) {
cfg := newConfig(options) cfg := newConfig(options)
core := &Core{ ws := &BufferedWriteSyncer{
LevelEnabler: zapcore.InfoLevel,
writeSyncer: &BufferedWriteSyncer{
FlushInterval: cfg.flushInterval, FlushInterval: cfg.flushInterval,
MaxBufferSize: cfg.maxBufferSize, MaxBufferSize: cfg.maxBufferSize,
MaxSyncQueueSize: cfg.maxSyncQueueSize, MaxSyncQueueSize: cfg.maxSyncQueueSize,
Service: service, Service: service,
},
} }
core.writeSyncer.Start() ws.Start()
return zap.New(zapcore.NewTee(logger.Core(), &Core{LevelEnabler: zapcore.InfoLevel, ws: ws})), ws.Stop
}
type Core struct {
zapcore.LevelEnabler
return zap.New(zapcore.NewTee(logger.Core(), core)), core.writeSyncer.Stop fields []zap.Field
ws WriteSyncer
} }
func (core *Core) With(fields []zapcore.Field) zapcore.Core { func (core *Core) With(fields []zapcore.Field) zapcore.Core {
return &Core{ return &Core{
LevelEnabler: core.LevelEnabler, LevelEnabler: core.LevelEnabler,
fields: append(core.fields, fields...), fields: append(core.fields, fields...),
writeSyncer: core.writeSyncer, ws: core.ws,
} }
} }
...@@ -51,11 +54,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) ...@@ -51,11 +54,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry)
} }
func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error {
return core.writeSyncer.Write(core.getEntry(entry, fields)) return core.ws.Write(core.getEntry(entry, fields))
} }
func (core *Core) Sync() error { func (core *Core) Sync() error {
return core.writeSyncer.Flush() return core.ws.Sync()
} }
func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry {
......
...@@ -2,194 +2,63 @@ package zap ...@@ -2,194 +2,63 @@ package zap
import ( import (
"context" "context"
"reflect"
"slices"
"sync"
"testing" "testing"
"time"
"git.perx.ru/perxis/perxis-go/pkg/log" "git.perx.ru/perxis/perxis-go/pkg/log"
logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore"
) )
func TestCore_Write(t *testing.T) { func TestCore_getEntry(t *testing.T) {
service := &logmocks.Service{} core := &Core{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
logger, stop := LoggerWithLogService(zap.NewNop(), service) tests := []struct {
logger.Debug("debug сообщение") // будет проигнорировано name string
input struct {
logger = logger.With( entry zapcore.Entry
Component("Items.Service"), fields []zapcore.Field
) }
want *log.Entry
logger.Info("создан элемент коллекции", }{
{
name: "simple",
input: struct {
entry zapcore.Entry
fields []zapcore.Field
}{
entry: zapcore.Entry{Level: zapcore.InfoLevel, Message: "создан элемент коллекции"},
fields: []zapcore.Field{
zap.String("key", "val"), // будет проигнорировано zap.String("key", "val"), // будет проигнорировано
Category("create"), Category("create"),
Component("Items.Service"),
Event("Items.Create"), Event("Items.Create"),
Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"),
CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")), CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")),
Attr(nil), Attr("any"),
Tags("tag1", "tag2", "tag3"), Tags("tag1", "tag2", "tag3"),
) },
logger.Warn("изменен элемент коллекции", },
Category("update"), want: &log.Entry{
Event("Items.Update"), LogLevel: log.Level(zapcore.InfoLevel),
Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2"),
CallerFromContext(ContextWithCaller(context.Background(), "/users/UEhW")),
)
err := stop()
require.NoError(t, err)
wantEntries := []*log.Entry{
{
LogLevel: log.Level(zap.InfoLevel),
Message: "создан элемент коллекции", Message: "создан элемент коллекции",
Category: "create", Category: "create",
Component: "Items.Service", Component: "Items.Service",
Event: "Items.Create", Event: "Items.Create",
Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl", Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl",
Caller: "/users/PHVz", Caller: "/users/PHVz",
Attr: nil, Attr: "any",
Tags: []string{"tag1", "tag2", "tag3"}, Tags: []string{"tag1", "tag2", "tag3"},
}, },
{
LogLevel: log.Level(zap.WarnLevel),
Message: "изменен элемент коллекции",
Category: "update",
Component: "Items.Service",
Event: "Items.Update",
Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2",
Caller: "/users/UEhW",
Attr: nil,
Tags: nil,
}, },
} }
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool { for _, tc := range tests {
return len(entries) == 2 && slices.EqualFunc(wantEntries, entries, func(entryA *log.Entry, entryB *log.Entry) bool { t.Run(tc.name, func(t *testing.T) {
entryA.ID = entryB.ID // игнорируем ID, потому что он генерируется случайно got := core.getEntry(tc.input.entry, tc.input.fields)
entryA.Timestamp = entryB.Timestamp // игнорируем Timestamp got.ID = tc.want.ID // игнорируем ID
return reflect.DeepEqual(entryA, entryB) got.Timestamp = tc.want.Timestamp // игнорируем Timestamp
assert.Equal(t, tc.want, got)
}) })
}))
} }
func TestCore_ConcurrentWrite(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
logger, stop := LoggerWithLogService(zap.NewNop(), service)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
logger.Warn("msg")
}(&wg)
}
wg.Wait()
err := stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 100
}))
}
func TestCore_ConcurrentWrite_WithMaxBufferSize(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
logger, stop := LoggerWithLogService(zap.NewNop(), service, WithMaxBufferSize(10))
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
logger.Info("msg")
}(&wg)
}
wg.Wait()
err := stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
func TestCore_Write_WithMaxBufferSize(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
logger, stop := LoggerWithLogService(zap.NewNop(), service, WithMaxBufferSize(10))
for i := 0; i < 100; i++ {
logger.Info("msg")
}
err := stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
func TestCore_Write_WithFlushInterval(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
// в данном случае stop нам не нужен
logger, _ := LoggerWithLogService(zap.NewNop(), service, WithFlushInterval(1*time.Second))
for j := 0; j < 10; j++ {
logger.Info("msg")
}
time.Sleep(3 * time.Second) // ждем, пока сработает интервал
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
}
func TestCore_Write_Sync(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
logger, stop := LoggerWithLogService(zap.NewNop(), service)
for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ {
logger.Info("msg")
}
err := logger.Sync()
assert.NoError(t, err)
}
err := stop()
require.NoError(t, err)
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 10
}))
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment