Skip to content
Snippets Groups Projects
Commit 597105d1 authored by Semyon Krestyaninov's avatar Semyon Krestyaninov
Browse files

refactor

parent f5b586d4
No related branches found
No related tags found
No related merge requests found
...@@ -9,12 +9,6 @@ import ( ...@@ -9,12 +9,6 @@ import (
"git.perx.ru/perxis/perxis-go/pkg/log" "git.perx.ru/perxis/perxis-go/pkg/log"
) )
const (
defaultMaxBufferSize = 1000
defaultMaxSyncQueueSize = 16
defaultFlushInterval = 5 * time.Second
)
var SyncQueueOverflow = errors.New("sync queue overflow") var SyncQueueOverflow = errors.New("sync queue overflow")
// BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service. // BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service.
...@@ -50,19 +44,17 @@ type BufferedWriteSyncer struct { ...@@ -50,19 +44,17 @@ type BufferedWriteSyncer struct {
stopped bool // stopped указывает, был ли выполнен Stop stopped bool // stopped указывает, был ли выполнен Stop
} }
func (ws *BufferedWriteSyncer) Start() { func NewBufferedWriteSyncer(service log.Service, options ...Option) *BufferedWriteSyncer {
if ws.MaxBufferSize == 0 { cfg := newConfig(options)
ws.MaxBufferSize = defaultMaxBufferSize return &BufferedWriteSyncer{
FlushInterval: cfg.flushInterval,
MaxBufferSize: cfg.maxBufferSize,
MaxSyncQueueSize: cfg.maxSyncQueueSize,
Service: service,
} }
if ws.MaxSyncQueueSize == 0 {
ws.MaxSyncQueueSize = defaultMaxSyncQueueSize
}
if ws.FlushInterval == 0 {
ws.FlushInterval = defaultFlushInterval
} }
func (ws *BufferedWriteSyncer) start() {
ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize) ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize) ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{}) ws.flushStop = make(chan struct{})
...@@ -74,22 +66,24 @@ func (ws *BufferedWriteSyncer) Start() { ...@@ -74,22 +66,24 @@ func (ws *BufferedWriteSyncer) Start() {
ws.started = true ws.started = true
} }
func (ws *BufferedWriteSyncer) Stop() { func (ws *BufferedWriteSyncer) Stop() error {
ws.mu.Lock() ws.mu.Lock()
defer ws.mu.Unlock() defer ws.mu.Unlock()
if !ws.started || ws.stopped { if !ws.started || ws.stopped {
return return nil
} }
ws.stopped = true ws.stopped = true
close(ws.flushStop) // завершаем flushLoop close(ws.flushStop) // завершаем flushLoop
_ = ws.flush() // очищаем оставшиеся записи err := ws.flush() // очищаем оставшиеся записи
close(ws.syncQueue) // завершаем syncLoop close(ws.syncQueue) // завершаем syncLoop
ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop
return err
} }
// Write отправляет запись в буфер. // Write отправляет запись в буфер.
...@@ -99,7 +93,7 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { ...@@ -99,7 +93,7 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
defer ws.mu.Unlock() defer ws.mu.Unlock()
if !ws.started { if !ws.started {
panic("BufferedWriteSyncer must be started") ws.start()
} }
// Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его. // Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его.
......
...@@ -18,12 +18,11 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { ...@@ -18,12 +18,11 @@ func TestBufferedWriteSyncer_Write(t *testing.T) {
Return(nil). Return(nil).
Run(func(args mock.Arguments) { Run(func(args mock.Arguments) {
entries := args.Get(1).([]*log.Entry) entries := args.Get(1).([]*log.Entry)
assert.Equal(t, 2, len(entries)) require.Equal(t, 2, len(entries))
}). }).
Once() Once()
ws := &BufferedWriteSyncer{Service: service} ws := NewBufferedWriteSyncer(service)
ws.Start()
err := ws.Write(&log.Entry{Message: "first log message"}) err := ws.Write(&log.Entry{Message: "first log message"})
require.NoError(t, err) require.NoError(t, err)
...@@ -31,7 +30,8 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { ...@@ -31,7 +30,8 @@ func TestBufferedWriteSyncer_Write(t *testing.T) {
err = ws.Write(&log.Entry{Message: "second log message"}) err = ws.Write(&log.Entry{Message: "second log message"})
require.NoError(t, err) require.NoError(t, err)
ws.Stop() err = ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
...@@ -42,12 +42,11 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { ...@@ -42,12 +42,11 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) {
Return(nil). Return(nil).
Run(func(args mock.Arguments) { Run(func(args mock.Arguments) {
entries := args.Get(1).([]*log.Entry) entries := args.Get(1).([]*log.Entry)
assert.Equal(t, 100, len(entries)) require.Equal(t, 100, len(entries))
}). }).
Once() Once()
ws := &BufferedWriteSyncer{Service: service} ws := NewBufferedWriteSyncer(service)
ws.Start()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
...@@ -62,7 +61,8 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { ...@@ -62,7 +61,8 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) {
wg.Wait() wg.Wait()
ws.Stop() err := ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
...@@ -73,12 +73,11 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { ...@@ -73,12 +73,11 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) {
Return(nil). Return(nil).
Run(func(args mock.Arguments) { Run(func(args mock.Arguments) {
entries := args.Get(1).([]*log.Entry) entries := args.Get(1).([]*log.Entry)
assert.Equal(t, 10, len(entries)) require.Equal(t, 10, len(entries))
}). }).
Times(10) Times(10)
ws := &BufferedWriteSyncer{Service: service} ws := NewBufferedWriteSyncer(service)
ws.Start()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
...@@ -86,10 +85,11 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { ...@@ -86,10 +85,11 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
err := ws.Sync() err := ws.Sync()
assert.NoError(t, err) require.NoError(t, err)
} }
ws.Stop() err := ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
...@@ -104,15 +104,15 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) { ...@@ -104,15 +104,15 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) {
}). }).
Times(10) Times(10)
ws := &BufferedWriteSyncer{Service: service, MaxBufferSize: 10} ws := NewBufferedWriteSyncer(service, WithMaxBufferSize(10))
ws.Start()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
err := ws.Write(&log.Entry{Message: "log message"}) err := ws.Write(&log.Entry{Message: "log message"})
require.NoError(t, err) require.NoError(t, err)
} }
ws.Stop() err := ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
...@@ -127,8 +127,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { ...@@ -127,8 +127,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) {
}). }).
Once() Once()
ws := &BufferedWriteSyncer{Service: service, FlushInterval: time.Second} ws := NewBufferedWriteSyncer(service, WithFlushInterval(time.Second))
ws.Start()
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
err := ws.Write(&log.Entry{Message: "log message"}) err := ws.Write(&log.Entry{Message: "log message"})
...@@ -137,12 +136,8 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { ...@@ -137,12 +136,8 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) {
time.Sleep(3 * time.Second) // ждем, пока сработает интервал time.Sleep(3 * time.Second) // ждем, пока сработает интервал
service.AssertExpectations(t) err := ws.Stop()
} require.NoError(t, err)
func TestBufferedWriteSyncer_NotStartedWrite(t *testing.T) { service.AssertExpectations(t)
ws := &BufferedWriteSyncer{}
assert.Panics(t, func() {
_ = ws.Write(&log.Entry{Message: "log message"})
})
} }
...@@ -4,10 +4,16 @@ import ( ...@@ -4,10 +4,16 @@ import (
"time" "time"
) )
const (
defaultMaxBufferSize = 1000
defaultMaxSyncQueueSize = 16
defaultFlushInterval = 5 * time.Second
)
type config struct { type config struct {
flushInterval time.Duration
maxBufferSize int maxBufferSize int
maxSyncQueueSize int maxSyncQueueSize int
flushInterval time.Duration
} }
func newConfig(options []Option) *config { func newConfig(options []Option) *config {
...@@ -16,6 +22,18 @@ func newConfig(options []Option) *config { ...@@ -16,6 +22,18 @@ func newConfig(options []Option) *config {
option(cfg) option(cfg)
} }
if cfg.flushInterval == 0 {
cfg.flushInterval = defaultFlushInterval
}
if cfg.maxBufferSize == 0 {
cfg.maxBufferSize = defaultMaxBufferSize
}
if cfg.maxSyncQueueSize == 0 {
cfg.maxSyncQueueSize = defaultMaxSyncQueueSize
}
return cfg return cfg
} }
......
...@@ -17,14 +17,21 @@ type WriteSyncer interface { ...@@ -17,14 +17,21 @@ type WriteSyncer interface {
type Core struct { type Core struct {
zapcore.LevelEnabler zapcore.LevelEnabler
WriteSyncer WriteSyncer writeSyncer WriteSyncer
fields []zap.Field fields []zap.Field
} }
func NewCore(writeSyncer WriteSyncer) *Core {
return &Core{
LevelEnabler: zapcore.InfoLevel,
writeSyncer: writeSyncer,
}
}
func (core *Core) With(fields []zapcore.Field) zapcore.Core { func (core *Core) With(fields []zapcore.Field) zapcore.Core {
return &Core{ return &Core{
LevelEnabler: core.LevelEnabler, LevelEnabler: core.LevelEnabler,
WriteSyncer: core.WriteSyncer, writeSyncer: core.writeSyncer,
fields: append(core.fields, fields...), fields: append(core.fields, fields...),
} }
} }
...@@ -37,11 +44,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) ...@@ -37,11 +44,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry)
} }
func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error {
return core.WriteSyncer.Write(core.getEntry(entry, fields)) return core.writeSyncer.Write(core.getEntry(entry, fields))
} }
func (core *Core) Sync() error { func (core *Core) Sync() error {
return core.WriteSyncer.Sync() return core.writeSyncer.Sync()
} }
func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry {
......
...@@ -4,13 +4,13 @@ import ( ...@@ -4,13 +4,13 @@ import (
"testing" "testing"
"git.perx.ru/perxis/perxis-go/pkg/log" "git.perx.ru/perxis/perxis-go/pkg/log"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
func TestCore_getEntry(t *testing.T) { func TestCore_getEntry(t *testing.T) {
core := &Core{} core := NewCore(nil)
tests := []struct { tests := []struct {
name string name string
...@@ -57,7 +57,7 @@ func TestCore_getEntry(t *testing.T) { ...@@ -57,7 +57,7 @@ func TestCore_getEntry(t *testing.T) {
got := core.getEntry(tc.input.entry, tc.input.fields) got := core.getEntry(tc.input.entry, tc.input.fields)
got.ID = tc.want.ID // игнорируем ID got.ID = tc.want.ID // игнорируем ID
got.Timestamp = tc.want.Timestamp // игнорируем Timestamp got.Timestamp = tc.want.Timestamp // игнорируем Timestamp
assert.Equal(t, tc.want, got) require.Equal(t, tc.want, got)
}) })
} }
} }
package zap
import (
"git.perx.ru/perxis/perxis-go/pkg/content"
"git.perx.ru/perxis/perxis-go/pkg/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// New создает логгер, который отправляет записи в log.Service,
// вторым параметром возвращается интерфейс для запуска и остановки логгера
func New(service log.Service, options ...Option) (*zap.Logger, content.Runnable) {
cfg := newConfig(options)
ws := &BufferedWriteSyncer{
FlushInterval: cfg.flushInterval,
MaxBufferSize: cfg.maxBufferSize,
MaxSyncQueueSize: cfg.maxSyncQueueSize,
Service: service,
}
return zap.New(&Core{LevelEnabler: zapcore.InfoLevel, WriteSyncer: ws}), ws
}
// WithLogger добавляет к переданном логгеру логгер, который отправляет записи в log.Service.
// вторым параметром возвращается интерфейс для запуска и остановки логгера
func WithLogger(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, content.Runnable) {
logLogger, runnable := New(service, options...)
return zap.New(zapcore.NewTee(logger.Core(), logLogger.Core())), runnable
}
...@@ -7,8 +7,9 @@ import ( ...@@ -7,8 +7,9 @@ import (
"git.perx.ru/perxis/perxis-go/pkg/log" "git.perx.ru/perxis/perxis-go/pkg/log"
logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks" logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
...@@ -41,9 +42,9 @@ func TestExample(t *testing.T) { ...@@ -41,9 +42,9 @@ func TestExample(t *testing.T) {
Return(nil). Return(nil).
Run(func(args mock.Arguments) { Run(func(args mock.Arguments) {
entries := args.Get(1).([]*log.Entry) entries := args.Get(1).([]*log.Entry)
assert.True(t, slices.EqualFunc(wantEntries, entries, func(wantEntry, gotEntry *log.Entry) bool { require.True(t, slices.EqualFunc(wantEntries, entries, func(wantEntry, gotEntry *log.Entry) bool {
assert.NotEmpty(t, gotEntry.ID) require.NotEmpty(t, gotEntry.ID)
assert.NotEmpty(t, gotEntry.Timestamp) require.NotEmpty(t, gotEntry.Timestamp)
gotEntry.ID = wantEntry.ID // игнорируем ID gotEntry.ID = wantEntry.ID // игнорируем ID
gotEntry.Timestamp = wantEntry.Timestamp // игнорируем Timestamp gotEntry.Timestamp = wantEntry.Timestamp // игнорируем Timestamp
return reflect.DeepEqual(wantEntry, gotEntry) return reflect.DeepEqual(wantEntry, gotEntry)
...@@ -51,8 +52,8 @@ func TestExample(t *testing.T) { ...@@ -51,8 +52,8 @@ func TestExample(t *testing.T) {
}). }).
Once() Once()
logger, runnable := New(service) ws := NewBufferedWriteSyncer(service)
runnable.Start() logger := zap.New(NewCore(ws))
logger.Info("создан элемент коллекции", logger.Info("создан элемент коллекции",
Category("create"), Category("create"),
...@@ -71,7 +72,8 @@ func TestExample(t *testing.T) { ...@@ -71,7 +72,8 @@ func TestExample(t *testing.T) {
Attr(map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}), Attr(map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}),
) )
runnable.Stop() err := ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment