Select Git revision
buffered_write_syncer.go 5.76 KiB
package zap
import (
"context"
"sync"
"time"
"git.perx.ru/perxis/perxis-go/pkg/errors"
"git.perx.ru/perxis/perxis-go/pkg/log"
)
const (
defaultMaxBufferSize = 1000
defaultMaxSyncQueueSize = 16
defaultFlushInterval = 5 * time.Second
)
var SyncQueueOverflow = errors.New("sync queue overflow")
// BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service.
// Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал,
// записи отправляются в очередь для синхронизации с log.Service.
type BufferedWriteSyncer struct {
// FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию.
//
// Значение по умолчанию для этого параметра равно 5 секунд.
FlushInterval time.Duration
// MaxBufferSize устанавливает максимальное количество записей, которые могут быть буферизованы.
// Когда количество буферизованных записей превысит этот порог, они будут отправлены на синхронизацию в log.Service.
//
// Значение по умолчанию для этого параметра равно 1000.
MaxBufferSize int
// MaxSyncQueueSize устанавливает максимальный размер очереди записей на синхронизацию с log.Service.
//
// Значение по умолчанию для этого параметра равно 16.
MaxSyncQueueSize int
// Service сервис для хранения записей
Service log.Service
wg sync.WaitGroup
mu sync.RWMutex
buffer []*log.Entry
syncQueue chan []*log.Entry
flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен
started bool // started указывает, был ли выполнен Start
stopped bool // stopped указывает, был ли выполнен Stop
}
func (ws *BufferedWriteSyncer) start() {
if ws.Service == nil {
panic("service is required")
}
if ws.FlushInterval == 0 {
ws.FlushInterval = defaultFlushInterval
}
if ws.MaxBufferSize == 0 {
ws.MaxBufferSize = defaultMaxBufferSize
}
if ws.MaxSyncQueueSize == 0 {
ws.MaxSyncQueueSize = defaultMaxSyncQueueSize
}
ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{})
ws.wg.Add(2)
go ws.syncLoop()
go ws.flushLoop()
ws.started = true
}
func (ws *BufferedWriteSyncer) Stop() error {
ws.mu.Lock()
defer ws.mu.Unlock()
if !ws.started || ws.stopped {
return nil
}
ws.stopped = true
close(ws.flushStop) // завершаем flushLoop
err := ws.flush() // очищаем оставшиеся записи
close(ws.syncQueue) // завершаем syncLoop
ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop
return err
}
// Write отправляет запись в буфер.
// Когда количество буферизованных записей превышает максимальный размер буфера, буферизированные записи будут отправлены на синхронизацию.
func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
ws.mu.Lock()
defer ws.mu.Unlock()
if !ws.started {
ws.start()
}
// Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его.
if len(ws.buffer)+1 > ws.MaxBufferSize {
err := ws.flush()
if err != nil {
return err
}
}
ws.buffer = append(ws.buffer, entry)
return nil
}
// Sync освобождает буфер и отправляет буферизированные записи на синхронизацию.
func (ws *BufferedWriteSyncer) Sync() error {
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.started {
return ws.flush()
}
return nil
}
// flush освобождает буфер и отправляет буферизированные записи на синхронизацию.
// Если очередь на синхронизацию переполнена, будет возвращена ошибка SyncQueueOverflow
//
// ВНИМАНИЕ: Не является безопасным для конкурентного вызова.
func (ws *BufferedWriteSyncer) flush() error {
if len(ws.buffer) == 0 {
return nil
}
// Проверяем, не достигли ли мы предела размера очереди. Если это так, возвращаем ошибку.
if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize {
return SyncQueueOverflow
}
ws.syncQueue <- ws.buffer
ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
return nil
}
// flushLoop периодически отправляет буферизированные записи на синхронизацию.
func (ws *BufferedWriteSyncer) flushLoop() {
ticker := time.NewTicker(ws.FlushInterval)
defer func() {
ticker.Stop()
ws.wg.Done()
}()
for {
select {
case <-ticker.C:
_ = ws.Sync()
case <-ws.flushStop:
return
}
}
}
// syncLoop синхронизирует записи с log.Service.
func (ws *BufferedWriteSyncer) syncLoop() {
defer ws.wg.Done()
for entries := range ws.syncQueue {
_ = ws.Service.Log(context.Background(), entries)
}
}