Skip to content
Snippets Groups Projects
Commit e0fa431b authored by Semyon Krestyaninov's avatar Semyon Krestyaninov
Browse files

refactor

parent 6676741a
No related branches found
No related tags found
No related merge requests found
...@@ -9,12 +9,17 @@ import ( ...@@ -9,12 +9,17 @@ import (
"git.perx.ru/perxis/perxis-go/pkg/log" "git.perx.ru/perxis/perxis-go/pkg/log"
) )
const (
defaultMaxBufferSize = 1000
defaultMaxSyncQueueSize = 16
defaultFlushInterval = 5 * time.Second
)
var SyncQueueOverflow = errors.New("sync queue overflow") var SyncQueueOverflow = errors.New("sync queue overflow")
// BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service. // BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service.
// Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал, // Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал,
// записи отправляются в очередь для синхронизации с log.Service. Таким образом, выполнение потока не блокируется при отправке записей. // записи отправляются в очередь для синхронизации с log.Service.
// В случае переполнения очереди для синхронизации будет возвращена ошибка SyncQueueOverflow.
type BufferedWriteSyncer struct { type BufferedWriteSyncer struct {
// FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию. // FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию.
// //
...@@ -41,28 +46,39 @@ type BufferedWriteSyncer struct { ...@@ -41,28 +46,39 @@ type BufferedWriteSyncer struct {
syncQueue chan []*log.Entry syncQueue chan []*log.Entry
flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен
started bool // started указывает, был ли выполнен start
stopped bool // stopped указывает, был ли выполнен Stop stopped bool // stopped указывает, был ли выполнен Stop
} }
func NewBufferedWriteSyncer(service log.Service, options ...Option) *BufferedWriteSyncer { func (ws *BufferedWriteSyncer) start() {
cfg := newConfig(options) if ws.MaxBufferSize == 0 {
ws.MaxBufferSize = defaultMaxBufferSize
}
if ws.MaxSyncQueueSize == 0 {
ws.MaxSyncQueueSize = defaultMaxSyncQueueSize
}
ws := &BufferedWriteSyncer{ if ws.FlushInterval == 0 {
FlushInterval: cfg.flushInterval, ws.FlushInterval = defaultFlushInterval
MaxBufferSize: cfg.maxBufferSize,
MaxSyncQueueSize: cfg.maxSyncQueueSize,
Service: service,
} }
ws.start()
return ws ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{})
ws.wg.Add(2)
go ws.syncLoop()
go ws.flushLoop()
ws.started = true
} }
func (ws *BufferedWriteSyncer) Stop() error { func (ws *BufferedWriteSyncer) Stop() error {
ws.mu.Lock() ws.mu.Lock()
defer ws.mu.Unlock() defer ws.mu.Unlock()
if ws.stopped { if !ws.started || ws.stopped {
return nil return nil
} }
ws.stopped = true ws.stopped = true
...@@ -79,12 +95,15 @@ func (ws *BufferedWriteSyncer) Stop() error { ...@@ -79,12 +95,15 @@ func (ws *BufferedWriteSyncer) Stop() error {
} }
// Write отправляет запись в буфер. // Write отправляет запись в буфер.
// Когда количество буферизованных записей превышает максимальный размер буфера или когда пройдет заданный интервал, // Когда количество буферизованных записей превышает максимальный размер буфера, буферизированные записи будут отправлены на синхронизацию.
// буферизированные записи будут отправлены на синхронизацию.
func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
ws.mu.Lock() ws.mu.Lock()
defer ws.mu.Unlock() defer ws.mu.Unlock()
if !ws.started {
ws.start()
}
// Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его. // Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его.
if len(ws.buffer)+1 > ws.MaxBufferSize { if len(ws.buffer)+1 > ws.MaxBufferSize {
err := ws.flush() err := ws.flush()
...@@ -98,42 +117,34 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { ...@@ -98,42 +117,34 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
return nil return nil
} }
// Sync освобождает буфер и отправляет копию записей на синхронизацию. // Sync освобождает буфер и отправляет буферизированные записи на синхронизацию.
func (ws *BufferedWriteSyncer) Sync() error { func (ws *BufferedWriteSyncer) Sync() error {
ws.mu.Lock() ws.mu.Lock()
defer ws.mu.Unlock() defer ws.mu.Unlock()
if ws.started {
return ws.flush() return ws.flush()
} }
func (ws *BufferedWriteSyncer) start() { return nil
ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{})
ws.wg.Add(2)
go ws.syncLoop()
go ws.flushLoop()
} }
// flush освобождает буфер и отправляет копию записей на синхронизацию. Не является безопасным для конкурентного вызова. // flush освобождает буфер и отправляет буферизированные записи на синхронизацию.
// Если очередь на синхронизацию переполнена, будет возвращена ошибка SyncQueueOverflow
//
// ВНИМАНИЕ: Не является безопасным для конкурентного вызова.
func (ws *BufferedWriteSyncer) flush() error { func (ws *BufferedWriteSyncer) flush() error {
if len(ws.buffer) == 0 { if len(ws.buffer) == 0 {
return nil return nil
} }
// Проверяем, не достигли ли мы предела размера очереди. Если это так, возвращаем ошибку.
if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize { if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize {
return SyncQueueOverflow return SyncQueueOverflow
} }
// Создаем копию буфера для отправки в очередь на синхронизацию. ws.syncQueue <- ws.buffer
entries := make([]*log.Entry, len(ws.buffer)) ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
copy(entries, ws.buffer)
// Очищаем буфер с переиспользованием ёмкости.
ws.buffer = (ws.buffer)[:0]
ws.syncQueue <- entries
return nil return nil
} }
...@@ -156,7 +167,7 @@ func (ws *BufferedWriteSyncer) flushLoop() { ...@@ -156,7 +167,7 @@ func (ws *BufferedWriteSyncer) flushLoop() {
} }
} }
// syncLoop синхронизирует полученные записи с log.Service. // syncLoop синхронизирует записи с log.Service.
func (ws *BufferedWriteSyncer) syncLoop() { func (ws *BufferedWriteSyncer) syncLoop() {
defer ws.wg.Done() defer ws.wg.Done()
......
...@@ -22,7 +22,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { ...@@ -22,7 +22,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) {
}). }).
Once() Once()
ws := NewBufferedWriteSyncer(service) ws := &BufferedWriteSyncer{Service: service}
err := ws.Write(&log.Entry{Message: "first log message"}) err := ws.Write(&log.Entry{Message: "first log message"})
require.NoError(t, err) require.NoError(t, err)
...@@ -46,7 +46,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { ...@@ -46,7 +46,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) {
}). }).
Once() Once()
ws := NewBufferedWriteSyncer(service) ws := &BufferedWriteSyncer{Service: service}
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
...@@ -77,7 +77,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { ...@@ -77,7 +77,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) {
}). }).
Times(10) Times(10)
ws := NewBufferedWriteSyncer(service) ws := &BufferedWriteSyncer{Service: service}
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
...@@ -104,7 +104,7 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) { ...@@ -104,7 +104,7 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) {
}). }).
Times(10) Times(10)
ws := NewBufferedWriteSyncer(service, WithMaxBufferSize(10)) ws := &BufferedWriteSyncer{Service: service, MaxBufferSize: 10}
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
err := ws.Write(&log.Entry{Message: "log message"}) err := ws.Write(&log.Entry{Message: "log message"})
...@@ -127,7 +127,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { ...@@ -127,7 +127,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) {
}). }).
Once() Once()
ws := NewBufferedWriteSyncer(service, WithFlushInterval(time.Second)) ws := &BufferedWriteSyncer{Service: service, FlushInterval: time.Second}
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
err := ws.Write(&log.Entry{Message: "log message"}) err := ws.Write(&log.Entry{Message: "log message"})
......
...@@ -4,12 +4,6 @@ import ( ...@@ -4,12 +4,6 @@ import (
"time" "time"
) )
const (
defaultMaxBufferSize = 1000
defaultMaxSyncQueueSize = 16
defaultFlushInterval = 5 * time.Second
)
type config struct { type config struct {
maxBufferSize int maxBufferSize int
maxSyncQueueSize int maxSyncQueueSize int
...@@ -22,18 +16,6 @@ func newConfig(options []Option) *config { ...@@ -22,18 +16,6 @@ func newConfig(options []Option) *config {
option(cfg) option(cfg)
} }
if cfg.maxBufferSize == 0 {
cfg.maxBufferSize = defaultMaxBufferSize
}
if cfg.maxSyncQueueSize == 0 {
cfg.maxSyncQueueSize = defaultMaxSyncQueueSize
}
if cfg.flushInterval == 0 {
cfg.flushInterval = defaultFlushInterval
}
return cfg return cfg
} }
......
...@@ -17,14 +17,14 @@ type WriteSyncer interface { ...@@ -17,14 +17,14 @@ type WriteSyncer interface {
type Core struct { type Core struct {
zapcore.LevelEnabler zapcore.LevelEnabler
writeSyncer WriteSyncer WriteSyncer WriteSyncer
fields []zap.Field fields []zap.Field
} }
func (core *Core) With(fields []zapcore.Field) zapcore.Core { func (core *Core) With(fields []zapcore.Field) zapcore.Core {
return &Core{ return &Core{
LevelEnabler: core.LevelEnabler, LevelEnabler: core.LevelEnabler,
writeSyncer: core.writeSyncer, WriteSyncer: core.WriteSyncer,
fields: append(core.fields, fields...), fields: append(core.fields, fields...),
} }
} }
...@@ -37,11 +37,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry) ...@@ -37,11 +37,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry)
} }
func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error { func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error {
return core.writeSyncer.Write(core.getEntry(entry, fields)) return core.WriteSyncer.Write(core.getEntry(entry, fields))
} }
func (core *Core) Sync() error { func (core *Core) Sync() error {
return core.writeSyncer.Sync() return core.WriteSyncer.Sync()
} }
func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry { func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry {
......
package zap package zap
import ( import (
"git.perx.ru/perxis/perxis-go/pkg/log"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
// WithLogService объединяет переданный логгер с ядром, которое кодирует и дублирует записи в WriteSyncer. // NewLogger создает логгер, который отправляет записи в log.Service,
// Пример использования: // вторым параметром возвращается функция, которая должна быть вызвана для остановки логгера.
// func NewLogger(service log.Service, options ...Option) (*zap.Logger, func() error) {
// func main() { cfg := newConfig(options)
// service := ... // ваш log.Service
// ws := NewBufferedWriteSyncer(service) ws := &BufferedWriteSyncer{
// defer ws.Stop() FlushInterval: cfg.flushInterval,
// MaxBufferSize: cfg.maxBufferSize,
// logger := ... // ваш логгер, который нужно обернуть MaxSyncQueueSize: cfg.maxSyncQueueSize,
// logger = WithLogService(logger, ws) Service: service,
// }
// // ... return zap.New(&Core{
// } LevelEnabler: zapcore.InfoLevel,
func WithLogService(logger *zap.Logger, writeSyncer WriteSyncer) *zap.Logger { WriteSyncer: ws,
return zap.New(zapcore.NewTee(logger.Core(), &Core{LevelEnabler: zapcore.InfoLevel, writeSyncer: writeSyncer})) }), ws.Stop
}
// MergeLoggers объединяет два логгера в один.
func MergeLoggers(logger1 *zap.Logger, logger2 *zap.Logger) *zap.Logger {
return zap.New(zapcore.NewTee(logger1.Core(), logger2.Core()))
} }
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks" logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
...@@ -23,7 +22,6 @@ func TestExample(t *testing.T) { ...@@ -23,7 +22,6 @@ func TestExample(t *testing.T) {
Event: "Items.Create", Event: "Items.Create",
Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl", Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl",
Caller: "/users/PHVz", Caller: "/users/PHVz",
Attr: "any",
Tags: []string{"tag1", "tag2", "tag3"}, Tags: []string{"tag1", "tag2", "tag3"},
}, },
{ {
...@@ -34,6 +32,7 @@ func TestExample(t *testing.T) { ...@@ -34,6 +32,7 @@ func TestExample(t *testing.T) {
Event: "Items.Update", Event: "Items.Update",
Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2cw", Object: "/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2cw",
Caller: "/users/PHVz", Caller: "/users/PHVz",
Attr: map[string]map[string]any{"title": {"old": "old title", "new": "new title"}},
}, },
} }
...@@ -52,8 +51,7 @@ func TestExample(t *testing.T) { ...@@ -52,8 +51,7 @@ func TestExample(t *testing.T) {
}). }).
Once() Once()
ws := NewBufferedWriteSyncer(service) logger, stop := NewLogger(service)
logger := WithLogService(zap.NewNop(), ws)
logger.Info("создан элемент коллекции", logger.Info("создан элемент коллекции",
Category("create"), Category("create"),
...@@ -61,7 +59,6 @@ func TestExample(t *testing.T) { ...@@ -61,7 +59,6 @@ func TestExample(t *testing.T) {
Event("Items.Create"), Event("Items.Create"),
Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"), Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"),
Caller("/users/PHVz"), Caller("/users/PHVz"),
Attr("any"),
Tags("tag1", "tag2", "tag3"), Tags("tag1", "tag2", "tag3"),
) )
logger.Warn("изменен элемент коллекции", logger.Warn("изменен элемент коллекции",
...@@ -70,9 +67,10 @@ func TestExample(t *testing.T) { ...@@ -70,9 +67,10 @@ func TestExample(t *testing.T) {
Event("Items.Update"), Event("Items.Update"),
Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2cw"), Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2cw"),
Caller("/users/PHVz"), Caller("/users/PHVz"),
Attr(map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}),
) )
err := ws.Stop() err := stop()
assert.NoError(t, err) assert.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment