Skip to content
Snippets Groups Projects
Select Git revision
  • 7e3bfbb68629a4a2aa69073a30138ead2dcb7682
  • master default protected
  • feature/PRXS-1869-SuperUser
  • feature/PRXS-3127-RevertTestify
  • fix/PRXS-3121-SaveFileInfo
  • feature/PRXS-3106-NoCache
  • feature/PRXS-3043-NewURLFormat
  • feature/2781-SpacesLoggingMiddleware
  • feature/PRXS-2974-FillImageDimensions
  • feature/PRXS-3143-3235-ReferenceOptions
  • feature/PRXS-3056-LocalesFromToMap
  • feature/PRXS-3421-ImplementNewRefAPI
  • feature/PRXS-3143-LimitReferenceFields
  • feature/PRXS-3234-FeaturePruneIdents
  • PRXS-3421-RecursiveReferences
  • feature/3109-SerializeFeature
  • release/0.33
  • feature/3109-RecoverySchema
  • feature/3109-feature
  • fix/PRXS-3369-ValidateFields
  • refactor/PRXS-3306-MovePkgGroup1
  • v0.33.1
  • v0.32.0
  • v0.31.1
  • v0.31.0
  • v0.30.0
  • v0.29.0
  • v0.28.0
  • v0.27.0-alpha.1+16
  • v0.27.0-alpha.1+15
  • v0.27.0-alpha.1+14
  • v0.27.0-alpha.1+13
  • v0.27.0-alpha.1+12
  • v0.27.0-alpha.1+11
  • v0.27.0-alpha.1+10
  • v0.27.0-alpha.1+9
  • v0.27.0-alpha.1+8
  • v0.27.0-alpha.1+7
  • v0.27.0-alpha.1+6
  • v0.27.0-alpha.1+5
  • v0.27.0-alpha.1+4
41 results

field.go

Blame
  • buffered_write_syncer.go 5.44 KiB
    package zap
    
    import (
    	"context"
    	"sync"
    	"time"
    
    	"git.perx.ru/perxis/perxis-go/pkg/errors"
    	"git.perx.ru/perxis/perxis-go/pkg/log"
    )
    
    var SyncQueueOverflow = errors.New("sync queue overflow")
    
    // BufferedWriteSyncer буферизирует записи в памяти и отправляет их в log.Service.
    // Когда количество буферизированных записей достигает некоторого предела или проходит определенный фиксированный интервал,
    // записи отправляются в очередь для синхронизации с log.Service. Таким образом, выполнение потока не блокируется при отправке записей.
    // В случае переполнения очереди для синхронизации будет возвращена ошибка SyncQueueOverflow.
    type BufferedWriteSyncer struct {
    	// FlushInterval устанавливает интервал, через который буферизированные записи будут отправлены на синхронизацию.
    	//
    	// Значение по умолчанию для этого параметра равно 5 секунд.
    	FlushInterval time.Duration
    
    	// MaxBufferSize устанавливает максимальное количество записей, которые могут быть буферизованы.
    	// Когда количество буферизованных записей превысит этот порог, они будут отправлены на синхронизацию в log.Service.
    	//
    	// Значение по умолчанию для этого параметра равно 1000.
    	MaxBufferSize int
    
    	// MaxSyncQueueSize устанавливает максимальный размер очереди записей на синхронизацию с log.Service.
    	//
    	// Значение по умолчанию для этого параметра равно 16.
    	MaxSyncQueueSize int
    
    	// Service сервис для хранения записей
    	Service log.Service
    
    	wg        sync.WaitGroup
    	mu        sync.RWMutex
    	buffer    []*log.Entry
    	syncQueue chan []*log.Entry
    
    	flushStop chan struct{} // flushStop закрывается, когда flushLoop должен быть остановлен
    	stopped   bool          // stopped указывает, был ли выполнен Stop
    }
    
    func (ws *BufferedWriteSyncer) Start() {
    	ws.buffer = make([]*log.Entry, 0, ws.MaxBufferSize)
    	ws.syncQueue = make(chan []*log.Entry, ws.MaxSyncQueueSize)
    	ws.flushStop = make(chan struct{})
    
    	ws.wg.Add(2)
    	go ws.syncLoop()
    	go ws.flushLoop()
    }
    
    func (ws *BufferedWriteSyncer) Stop() error {
    	ws.mu.Lock()
    	defer ws.mu.Unlock()
    
    	if ws.stopped {
    		return nil
    	}
    	ws.stopped = true
    
    	close(ws.flushStop) // завершаем flushLoop
    
    	err := ws.flush() // очищаем оставшиеся записи
    
    	close(ws.syncQueue) // завершаем syncLoop
    
    	ws.wg.Wait() // дожидаемся завершения flushLoop и syncLoop
    
    	return err
    }
    
    // Write отправляет запись в буфер.
    // Когда количество буферизованных записей превышает максимальный размер буфера или когда пройдет заданный интервал,
    // буферизированные записи будут отправлены на синхронизацию.
    func (ws *BufferedWriteSyncer) Write(entry *log.Entry) error {
    	ws.mu.Lock()
    	defer ws.mu.Unlock()
    
    	// Проверяем, не достигли ли мы предела размера буфера. Если это так, тогда освобождаем его.
    	if len(ws.buffer)+1 > ws.MaxBufferSize {
    		err := ws.flush()
    		if err != nil {
    			return err
    		}
    	}
    
    	ws.buffer = append(ws.buffer, entry)
    
    	return nil
    }
    
    // Sync освобождает буфер и отправляет копию записей на синхронизацию.
    func (ws *BufferedWriteSyncer) Sync() error {
    	ws.mu.Lock()
    	defer ws.mu.Unlock()
    
    	return ws.flush()
    }
    
    // flush освобождает буфер и отправляет копию записей на синхронизацию. Не является безопасным для конкурентного вызова.
    func (ws *BufferedWriteSyncer) flush() error {
    	if len(ws.buffer) == 0 {
    		return nil
    	}
    
    	if len(ws.syncQueue)+1 > ws.MaxSyncQueueSize {
    		return SyncQueueOverflow
    	}
    
    	// Создаем копию буфера для отправки в очередь на синхронизацию.
    	entries := make([]*log.Entry, len(ws.buffer))
    	copy(entries, ws.buffer)
    
    	// Очищаем буфер с переиспользованием ёмкости.
    	ws.buffer = (ws.buffer)[:0]
    
    	ws.syncQueue <- entries
    
    	return nil
    }
    
    // flushLoop периодически отправляет буферизированные записи на синхронизацию.
    func (ws *BufferedWriteSyncer) flushLoop() {
    	ticker := time.NewTicker(ws.FlushInterval)
    	defer func() {
    		ticker.Stop()
    		ws.wg.Done()
    	}()
    
    	for {
    		select {
    		case <-ticker.C:
    			_ = ws.Sync()
    		case <-ws.flushStop:
    			return
    		}
    	}
    }
    
    // syncLoop синхронизирует полученные записи с log.Service.
    func (ws *BufferedWriteSyncer) syncLoop() {
    	defer ws.wg.Done()
    
    	for entries := range ws.syncQueue {
    		_ = ws.Service.Log(context.Background(), entries)
    	}
    }