Skip to content
Snippets Groups Projects
Commit 7149ed62 authored by ensiouel's avatar ensiouel
Browse files

refactor

parent 303819b9
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,7 @@ import (
var SyncQueueOverflow = errors.New("sync queue overflow")
// BufferedWriteSyncer буферизирует записи в памяти и отправляет их в log.Service.
// BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service.
// Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал,
// записи отправляются в очередь для синхронизации с log.Service. Таким образом, выполнение потока не блокируется при отправке записей.
// В случае переполнения очереди для синхронизации будет возвращена ошибка SyncQueueOverflow.
......@@ -44,14 +44,18 @@ type BufferedWriteSyncer struct {
stopped bool // stopped указывает, был ли выполнен Stop
}
func (ws *BufferedWriteSyncer) Start() {
ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{})
func NewBufferedWriteSyncer(service log.Service, options ...Option) *BufferedWriteSyncer {
cfg := newConfig(options)
ws.wg.Add(2)
go ws.syncLoop()
go ws.flushLoop()
ws := &BufferedWriteSyncer{
FlushInterval: cfg.flushInterval,
MaxBufferSize: cfg.maxBufferSize,
MaxSyncQueueSize: cfg.maxSyncQueueSize,
Service: service,
}
ws.start()
return ws
}
func (ws *BufferedWriteSyncer) Stop() error {
......@@ -102,6 +106,16 @@ func (ws *BufferedWriteSyncer) Sync() error {
return ws.flush()
}
func (ws *BufferedWriteSyncer) start() {
ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{})
ws.wg.Add(2)
go ws.syncLoop()
go ws.flushLoop()
}
// flush освобождает буфер и отправляет копию записей на синхронизацию. Не является безопасным для конкурентного вызова.
func (ws *BufferedWriteSyncer) flush() error {
if len(ws.buffer) == 0 {
......
......@@ -17,13 +17,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) {
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
ws := &BufferedWriteSyncer{
FlushInterval: defaultFlushInterval,
MaxBufferSize: defaultMaxBufferSize,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
ws := NewBufferedWriteSyncer(service)
err := ws.Write(&log.Entry{Message: "first log message"})
require.NoError(t, err)
......@@ -44,13 +38,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) {
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
ws := &BufferedWriteSyncer{
FlushInterval: defaultFlushInterval,
MaxBufferSize: defaultMaxBufferSize,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
ws := NewBufferedWriteSyncer(service)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
......@@ -78,13 +66,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) {
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
ws := &BufferedWriteSyncer{
FlushInterval: defaultFlushInterval,
MaxBufferSize: defaultMaxBufferSize,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
ws := NewBufferedWriteSyncer(service)
for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ {
......@@ -108,13 +90,7 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) {
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Times(10)
ws := &BufferedWriteSyncer{
FlushInterval: defaultFlushInterval,
MaxBufferSize: 10,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
ws := NewBufferedWriteSyncer(service, WithMaxBufferSize(10))
for i := 0; i < 100; i++ {
err := ws.Write(&log.Entry{Message: "log message"})
......@@ -134,13 +110,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) {
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
ws := &BufferedWriteSyncer{
FlushInterval: time.Second,
MaxBufferSize: defaultMaxBufferSize,
MaxSyncQueueSize: defaultMaxSyncQueueSize,
Service: service,
}
ws.Start()
ws := NewBufferedWriteSyncer(service, WithFlushInterval(time.Second))
for j := 0; j < 10; j++ {
err := ws.Write(&log.Entry{Message: "log message"})
......
......@@ -7,42 +7,25 @@ import (
"go.uber.org/zap/zapcore"
)
// WriteSyncer отвечает за синхронизацию записей, полученных через метод Write с сервисом логирования.
// WriteSyncer отвечает за хранение и синхронизацию log.Entry
type WriteSyncer interface {
Write(entry *log.Entry) error
Sync() error
}
// WithLogService добавляет функционал отправки записей в log.Service к переданному логгеру.
// Вторым параметром возвращается Stop, при вызове которого прекращается синхронизация записей по интервалу.
// Записи уровня Debug игнорируются и не отправляются.
func WithLogService(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) {
cfg := newConfig(options)
ws := &BufferedWriteSyncer{
FlushInterval: cfg.flushInterval,
MaxBufferSize: cfg.maxBufferSize,
MaxSyncQueueSize: cfg.maxSyncQueueSize,
Service: service,
}
ws.Start()
return zap.New(zapcore.NewTee(logger.Core(), &Core{LevelEnabler: zapcore.InfoLevel, ws: ws})), ws.Stop
}
// Core кодирует zapcore.Entry в log.Entry и отправляет их в WriteSyncer
type Core struct {
zapcore.LevelEnabler
writeSyncer WriteSyncer
fields []zap.Field
ws WriteSyncer
}
func (core *Core) With(fields []zapcore.Field) zapcore.Core {
return &Core{
LevelEnabler: core.LevelEnabler,
writeSyncer: core.writeSyncer,
fields: append(core.fields, fields...),
ws: core.ws,
}
}
......@@ -54,11 +37,11 @@ func (core *Core) Check(entry zapcore.Entry, checkedEntry *zapcore.CheckedEntry)
}
func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error {
return core.ws.Write(core.getEntry(entry, fields))
return core.writeSyncer.Write(core.getEntry(entry, fields))
}
func (core *Core) Sync() error {
return core.ws.Sync()
return core.writeSyncer.Sync()
}
func (core *Core) getEntry(entry zapcore.Entry, fields []zapcore.Field) *log.Entry {
......
package zap
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// WithLogService объединяет переданный логгер с ядром, которое кодирует и дублирует записи в WriteSyncer.
// Пример использования:
//
// func main() {
// service := ... // ваш log.Service
// ws := NewBufferedWriteSyncer(service)
// defer ws.Stop()
//
// logger := ... // ваш логгер, который нужно обернуть
// logger = WithLogService(logger, ws)
//
// // ...
// }
func WithLogService(logger *zap.Logger, writeSyncer WriteSyncer) *zap.Logger {
return zap.New(zapcore.NewTee(logger.Core(), &Core{LevelEnabler: zapcore.InfoLevel, writeSyncer: writeSyncer}))
}
package zap
import (
"context"
"testing"
"git.perx.ru/perxis/perxis-go/pkg/log"
logmocks "git.perx.ru/perxis/perxis-go/pkg/log/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
)
func TestExample(t *testing.T) {
service := &logmocks.Service{}
service.On("Log", mock.Anything, mock.Anything).
Return(nil).Once()
func() {
ws := NewBufferedWriteSyncer(service)
defer func() {
err := ws.Stop()
assert.NoError(t, err)
}()
logger := WithLogService(zap.NewNop(), ws)
logger.Info("создан элемент коллекции",
Category("create"),
Component("Items.Service"),
Event("Items.Create"),
Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl"),
CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")),
Attr("any"),
Tags("tag1", "tag2", "tag3"),
)
logger.Warn("изменен элемент коллекции",
Category("update"),
Component("Items.Service"),
Event("Items.Update"),
Object("/spaces/WPNN/envs/9VGP/cols/GxNv/items/W0fl/revs/cmV2cw"),
CallerFromContext(ContextWithCaller(context.Background(), "/users/PHVz")),
)
}()
service.AssertCalled(t, "Log", mock.Anything, mock.MatchedBy(func(entries []*log.Entry) bool {
return len(entries) == 2
}))
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment