Skip to content
Snippets Groups Projects
Select Git revision
  • 08aaf1e54d96c1af8c1fcc8178fec83d1c1fde53
  • master default protected
  • feature/PRXS-3043-NewURLFormat
  • feature/2781-SpacesLoggingMiddleware
  • feature/PRXS-2974-FillImageDimensions
  • feature/PRXS-3143-3235-ReferenceOptions
  • feature/PRXS-3056-LocalesFromToMap
  • feature/PRXS-3421-ImplementNewRefAPI
  • feature/PRXS-3143-LimitReferenceFields
  • feature/PRXS-3234-FeaturePruneIdents
  • PRXS-3421-RecursiveReferences
  • feature/3109-SerializeFeature
  • release/0.33
  • feature/3109-RecoverySchema
  • feature/3109-feature
  • fix/PRXS-3369-ValidateFields
  • refactor/PRXS-3306-MovePkgGroup1
  • refactor/6-pkg-refactor-expr
  • fix/PRXS-3360-TemplateBuilderPatch
  • feature/3293-MongoV2
  • feature/3272-GoVersionUp
  • v0.33.1
  • v0.32.0
  • v0.31.1
  • v0.31.0
  • v0.30.0
  • v0.29.0
  • v0.28.0
  • v0.27.0-alpha.1+16
  • v0.27.0-alpha.1+15
  • v0.27.0-alpha.1+14
  • v0.27.0-alpha.1+13
  • v0.27.0-alpha.1+12
  • v0.27.0-alpha.1+11
  • v0.27.0-alpha.1+10
  • v0.27.0-alpha.1+9
  • v0.27.0-alpha.1+8
  • v0.27.0-alpha.1+7
  • v0.27.0-alpha.1+6
  • v0.27.0-alpha.1+5
  • v0.27.0-alpha.1+4
41 results

space.go

Blame
  • buffered_write_syncer.go 5.76 KiB
    package zap
    
    import (
    	"context"
    	"sync"
    	"time"
    
    	"git.perx.ru/perxis/perxis-go/pkg/errors"
    	"git.perx.ru/perxis/perxis-go/pkg/log"
    )
    
    const (
    	defaultMaxBufferSize    = 1000
    	defaultMaxSyncQueueSize = 16
    	defaultFlushInterval    = 5 * time.Second
    )
    
    var SyncQueueOverflow = errors.New("sync queue overflow")
    
    // BufferedWriteSyncer это WriteSyncer, который отправляет записи в log.Service.
    // Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал,
    // записи отправляются в очередь для синхронизации с log.Service.
    type BufferedWriteSyncer struct {
    	// FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию.
    	//
    	// Значение по умолчанию для этого параметра равно 5 секунд.
    	FlushInterval time.Duration
    
    	// MaxBufferSize устанавливает максимальное количество записей, которые могут быть буферизованы.
    	// Когда количество буферизованных записей превысит этот порог, они будут отправлены на синхронизацию в log.Service.
    	//
    	// Значение по умолчанию для этого параметра равно 1000.
    	MaxBufferSize int
    
    	// MaxSyncQueueSize устанавливает максимальный размер очереди записей на синхронизацию с log.Service.
    	//
    	// Значение по умолчанию для этого параметра равно 16.
    	MaxSyncQueueSize int
    
    	// Service сервис для хранения записей
    	Service log.Service
    
    	wg        sync.WaitGroup
    	mu        sync.RWMutex
    	buffer    []*log.Entry
    	syncQueue chan []*log.Entry
    
    	flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен
    	started   bool          // started указывает, был ли выполнен Start
    	stopped   bool          // stopped указывает, был ли выполнен Stop
    }
    
    func (ws *BufferedWriteSyncer) start() {
    	if ws.Service == nil {
    		panic("service is required")
    	}
    
    	if ws.FlushInterval == 0 {
    		ws.FlushInterval = defaultFlushInterval
    	}
    
    	if ws.MaxBufferSize == 0 {
    		ws.MaxBufferSize = defaultMaxBufferSize
    	}
    
    	if ws.MaxSyncQueueSize == 0 {
    		ws.MaxSyncQueueSize = defaultMaxSyncQueueSize
    	}
    
    	ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
    	ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
    	ws.flushStop = make(chan struct{})
    
    	ws.wg.Add(2)
    	go ws.syncLoop()
    	go ws.flushLoop()
    
    	ws.started = true
    }
    
    func (ws *BufferedWriteSyncer) Stop() error {
    	ws.mu.Lock()
    	defer ws.mu.Unlock()
    
    	if !ws.started || ws.stopped {
    		return nil
    	}
    	ws.stopped = true
    
    	close(ws.flushStop) // завершаем flushLoop
    
    	err := ws.flush() // очищаем оставшиеся записи
    
    	close(ws.syncQueue) // завершаем syncLoop
    
    	ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop
    
    	return err
    }
    
    // Write отправляет запись в буфер.
    // Когда количество буферизованных записей превышает максимальный размер буфера, буферизированные записи будут отправлены на синхронизацию.
    func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
    	ws.mu.Lock()
    	defer ws.mu.Unlock()
    
    	if !ws.started {
    		ws.start()
    	}
    
    	// Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его.
    	if len(ws.buffer)+1 > ws.MaxBufferSize {
    		err := ws.flush()
    		if err != nil {
    			return err
    		}
    	}
    
    	ws.buffer = append(ws.buffer, entry)
    
    	return nil
    }
    
    // Sync освобождает буфер и отправляет буферизированные записи на синхронизацию.
    func (ws *BufferedWriteSyncer) Sync() error {
    	ws.mu.Lock()
    	defer ws.mu.Unlock()
    
    	if ws.started {
    		return ws.flush()
    	}
    
    	return nil
    }
    
    // flush освобождает буфер и отправляет буферизированные записи на синхронизацию.
    // Если очередь на синхронизацию переполнена, будет возвращена ошибка SyncQueueOverflow
    //
    // ВНИМАНИЕ: Не является безопасным для конкурентного вызова.
    func (ws *BufferedWriteSyncer) flush() error {
    	if len(ws.buffer) == 0 {
    		return nil
    	}
    
    	// Проверяем, не достигли ли мы предела размера очереди. Если это так, возвращаем ошибку.
    	if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize {
    		return SyncQueueOverflow
    	}
    
    	ws.syncQueue <- ws.buffer
    	ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
    
    	return nil
    }
    
    // flushLoop периодически отправляет буферизированные записи на синхронизацию.
    func (ws *BufferedWriteSyncer) flushLoop() {
    	ticker := time.NewTicker(ws.FlushInterval)
    	defer func() {
    		ticker.Stop()
    		ws.wg.Done()
    	}()
    
    	for {
    		select {
    		case <-ticker.C:
    			_ = ws.Sync()
    		case <-ws.flushStop:
    			return
    		}
    	}
    }
    
    // syncLoop синхронизирует записи с log.Service.
    func (ws *BufferedWriteSyncer) syncLoop() {
    	defer ws.wg.Done()
    
    	for entries := range ws.syncQueue {
    		_ = ws.Service.Log(context.Background(), entries)
    	}
    }