Skip to content
Snippets Groups Projects
Commit f5b586d4 authored by Semyon Krestyaninov's avatar Semyon Krestyaninov
Browse files

refactor

parent 568f1fb4
No related branches found
No related tags found
No related merge requests found
...@@ -46,11 +46,11 @@ type BufferedWriteSyncer struct { ...@@ -46,11 +46,11 @@ type BufferedWriteSyncer struct {
syncQueue chan []*log.Entry syncQueue chan []*log.Entry
flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен
started bool // started указывает, был ли выполнен start started bool // started указывает, был ли выполнен Start
stopped bool // stopped указывает, был ли выполнен Stop stopped bool // stopped указывает, был ли выполнен Stop
} }
func (ws *BufferedWriteSyncer) start() { func (ws *BufferedWriteSyncer) Start() {
if ws.MaxBufferSize == 0 { if ws.MaxBufferSize == 0 {
ws.MaxBufferSize = defaultMaxBufferSize ws.MaxBufferSize = defaultMaxBufferSize
} }
...@@ -74,24 +74,22 @@ func (ws *BufferedWriteSyncer) start() { ...@@ -74,24 +74,22 @@ func (ws *BufferedWriteSyncer) start() {
ws.started = true ws.started = true
} }
func (ws *BufferedWriteSyncer) Stop() error { func (ws *BufferedWriteSyncer) Stop() {
ws.mu.Lock() ws.mu.Lock()
defer ws.mu.Unlock() defer ws.mu.Unlock()
if !ws.started || ws.stopped { if !ws.started || ws.stopped {
return nil return
} }
ws.stopped = true ws.stopped = true
close(ws.flushStop) // завершаем flushLoop close(ws.flushStop) // завершаем flushLoop
err := ws.flush() // очищаем оставшиеся записи _ = ws.flush() // очищаем оставшиеся записи
close(ws.syncQueue) // завершаем syncLoop close(ws.syncQueue) // завершаем syncLoop
ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop
return err
} }
// Write отправляет запись в буфер. // Write отправляет запись в буфер.
...@@ -101,7 +99,7 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error { ...@@ -101,7 +99,7 @@ func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
defer ws.mu.Unlock() defer ws.mu.Unlock()
if !ws.started { if !ws.started {
ws.start() panic("BufferedWriteSyncer must be started")
} }
// Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его. // Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его.
......
...@@ -23,6 +23,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { ...@@ -23,6 +23,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) {
Once() Once()
ws := &BufferedWriteSyncer{Service: service} ws := &BufferedWriteSyncer{Service: service}
ws.Start()
err := ws.Write(&log.Entry{Message: "first log message"}) err := ws.Write(&log.Entry{Message: "first log message"})
require.NoError(t, err) require.NoError(t, err)
...@@ -30,8 +31,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) { ...@@ -30,8 +31,7 @@ func TestBufferedWriteSyncer_Write(t *testing.T) {
err = ws.Write(&log.Entry{Message: "second log message"}) err = ws.Write(&log.Entry{Message: "second log message"})
require.NoError(t, err) require.NoError(t, err)
err = ws.Stop() ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
...@@ -47,6 +47,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { ...@@ -47,6 +47,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) {
Once() Once()
ws := &BufferedWriteSyncer{Service: service} ws := &BufferedWriteSyncer{Service: service}
ws.Start()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
...@@ -61,8 +62,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) { ...@@ -61,8 +62,7 @@ func TestBufferedWriteSyncer_Write_Concurrent(t *testing.T) {
wg.Wait() wg.Wait()
err := ws.Stop() ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
...@@ -78,6 +78,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { ...@@ -78,6 +78,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) {
Times(10) Times(10)
ws := &BufferedWriteSyncer{Service: service} ws := &BufferedWriteSyncer{Service: service}
ws.Start()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
...@@ -88,8 +89,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) { ...@@ -88,8 +89,7 @@ func TestBufferedWriteSyncer_Flush(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
} }
err := ws.Stop() ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
...@@ -105,14 +105,14 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) { ...@@ -105,14 +105,14 @@ func TestBufferedWriteSyncer_MaxBufferSize(t *testing.T) {
Times(10) Times(10)
ws := &BufferedWriteSyncer{Service: service, MaxBufferSize: 10} ws := &BufferedWriteSyncer{Service: service, MaxBufferSize: 10}
ws.Start()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
err := ws.Write(&log.Entry{Message: "log message"}) err := ws.Write(&log.Entry{Message: "log message"})
require.NoError(t, err) require.NoError(t, err)
} }
err := ws.Stop() ws.Stop()
require.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
...@@ -128,6 +128,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { ...@@ -128,6 +128,7 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) {
Once() Once()
ws := &BufferedWriteSyncer{Service: service, FlushInterval: time.Second} ws := &BufferedWriteSyncer{Service: service, FlushInterval: time.Second}
ws.Start()
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
err := ws.Write(&log.Entry{Message: "log message"}) err := ws.Write(&log.Entry{Message: "log message"})
...@@ -138,3 +139,10 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) { ...@@ -138,3 +139,10 @@ func TestBufferedWriteSyncer_FlushInterval(t *testing.T) {
service.AssertExpectations(t) service.AssertExpectations(t)
} }
func TestBufferedWriteSyncer_NotStartedWrite(t *testing.T) {
ws := &BufferedWriteSyncer{}
assert.Panics(t, func() {
_ = ws.Write(&log.Entry{Message: "log message"})
})
}
package zap package zap
import ( import (
"git.perx.ru/perxis/perxis-go/pkg/content"
"git.perx.ru/perxis/perxis-go/pkg/log" "git.perx.ru/perxis/perxis-go/pkg/log"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
// NewLogLogger создает логгер, который отправляет записи в log.Service, // New создает логгер, который отправляет записи в log.Service,
// вторым параметром возвращается функция, которая должна быть вызвана для остановки логгера. // вторым параметром возвращается интерфейс для запуска и остановки логгера
func NewLogLogger(service log.Service, options ...Option) (*zap.Logger, func() error) { func New(service log.Service, options ...Option) (*zap.Logger, content.Runnable) {
cfg := newConfig(options) cfg := newConfig(options)
ws := &BufferedWriteSyncer{ ws := &BufferedWriteSyncer{
...@@ -17,12 +18,12 @@ func NewLogLogger(service log.Service, options ...Option) (*zap.Logger, func() e ...@@ -17,12 +18,12 @@ func NewLogLogger(service log.Service, options ...Option) (*zap.Logger, func() e
MaxSyncQueueSize: cfg.maxSyncQueueSize, MaxSyncQueueSize: cfg.maxSyncQueueSize,
Service: service, Service: service,
} }
return zap.New(&Core{LevelEnabler: zapcore.InfoLevel, WriteSyncer: ws}), ws.Stop return zap.New(&Core{LevelEnabler: zapcore.InfoLevel, WriteSyncer: ws}), ws
} }
// WithLogLogger добавляет к переданном логгеру логгер, который отправляет записи в log.Service. // WithLogger добавляет к переданном логгеру логгер, который отправляет записи в log.Service.
// вторым параметром возвращается функция, которая должна быть вызвана для остановки логгера. // вторым параметром возвращается интерфейс для запуска и остановки логгера
func WithLogLogger(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, func() error) { func WithLogger(logger *zap.Logger, service log.Service, options ...Option) (*zap.Logger, content.Runnable) {
logLogger, stop := NewLogLogger(service, options...) logLogger, runnable := New(service, options...)
return zap.New(zapcore.NewTee(logger.Core(), logLogger.Core())), stop return zap.New(zapcore.NewTee(logger.Core(), logLogger.Core())), runnable
} }
...@@ -51,7 +51,8 @@ func TestExample(t *testing.T) { ...@@ -51,7 +51,8 @@ func TestExample(t *testing.T) {
}). }).
Once() Once()
logger, stop := NewLogLogger(service) logger, runnable := New(service)
runnable.Start()
logger.Info("создан элемент коллекции", logger.Info("создан элемент коллекции",
Category("create"), Category("create"),
...@@ -70,8 +71,7 @@ func TestExample(t *testing.T) { ...@@ -70,8 +71,7 @@ func TestExample(t *testing.T) {
Attr(map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}), Attr(map[string]map[string]any{"title": {"old": "old title", "new": "new title"}}),
) )
err := stop() runnable.Stop()
assert.NoError(t, err)
service.AssertExpectations(t) service.AssertExpectations(t)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment