Select Git revision
buffered_write_syncer.go
buffered_write_syncer.go 5.44 KiB
package zap
import (
"context"
"sync"
"time"
"git.perx.ru/perxis/perxis-go/pkg/errors"
"git.perx.ru/perxis/perxis-go/pkg/log"
)
var SyncQueueOverflow = errors.New("sync queue overflow")
// BufferedWriteSyncer буферизирует записи в памяти и отправляет их в log.Service.
// Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал,
// записи отправляются в очередь для синхронизации с log.Service. Таким образом, выполнение потока не блокируется при отправке записей.
// В случае переполнения очереди для синхронизации будет возвращена ошибка SyncQueueOverflow.
type BufferedWriteSyncer struct {
// FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию.
//
// Значение по умолчанию для этого параметра равно 5 секунд.
FlushInterval time.Duration
// MaxBufferSize устанавливает максимальное количество записей, которые могут быть буферизованы.
// Когда количество буферизованных записей превысит этот порог, они будут отправлены на синхронизацию в log.Service.
//
// Значение по умолчанию для этого параметра равно 1000.
MaxBufferSize int
// MaxSyncQueueSize устанавливает максимальный размер очереди записей на синхронизацию с log.Service.
//
// Значение по умолчанию для этого параметра равно 16.
MaxSyncQueueSize int
// Service сервис для хранения записей
Service log.Service
wg sync.WaitGroup
mu sync.RWMutex
buffer []*log.Entry
syncQueue chan []*log.Entry
flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен
stopped bool // stopped указывает, был ли выполнен Stop
}
func (ws *BufferedWriteSyncer) Start() {
ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
ws.flushStop = make(chan struct{})
ws.wg.Add(2)
go ws.syncLoop()
go ws.flushLoop()
}
func (ws *BufferedWriteSyncer) Stop() error {
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.stopped {
return nil
}
ws.stopped = true
close(ws.flushStop) // завершаем flushLoop
err := ws.flush() // очищаем оставшиеся записи
close(ws.syncQueue) // завершаем syncLoop
ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop
return err
}
// Write отправляет запись в буфер.
// Когда количество буферизованных записей превышает максимальный размер буфера или когда пройдет заданный интервал,
// буферизированные записи будут отправлены на синхронизацию.
func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
ws.mu.Lock()
defer ws.mu.Unlock()
// Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его.
if len(ws.buffer)+1 > ws.MaxBufferSize {
err := ws.flush()
if err != nil {
return err
}
}
ws.buffer = append(ws.buffer, entry)
return nil
}
// Sync освобождает буфер и отправляет копию записей на синхронизацию.
func (ws *BufferedWriteSyncer) Sync() error {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.flush()
}
// flush освобождает буфер и отправляет копию записей на синхронизацию. Не является безопасным для конкурентного вызова.
func (ws *BufferedWriteSyncer) flush() error {
if len(ws.buffer) == 0 {
return nil
}
if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize {
return SyncQueueOverflow
}
// Создаем копию буфера для отправки в очередь на синхронизацию.
entries := make([]*log.Entry, len(ws.buffer))
copy(entries, ws.buffer)
// Очищаем буфер с переиспользованием ёмкости.
ws.buffer = (ws.buffer)[:0]
ws.syncQueue <- entries
return nil
}
// flushLoop периодически отправляет буферизированные записи на синхронизацию.
func (ws *BufferedWriteSyncer) flushLoop() {
ticker := time.NewTicker(ws.FlushInterval)
defer func() {
ticker.Stop()
ws.wg.Done()
}()
for {
select {
case <-ticker.C:
_ = ws.Sync()
case <-ws.flushStop:
return
}
}
}
// syncLoop синхронизирует полученные записи с log.Service.
func (ws *BufferedWriteSyncer) syncLoop() {
defer ws.wg.Done()
for entries := range ws.syncQueue {
_ = ws.Service.Log(context.Background(), entries)
}
}