diff --git a/logs/zap/buffered_write_syncer.go b/logs/zap/buffered_write_syncer.go index e862a79d115ecd82bc09622276d4194d39715345..42fae2b134e958045ba2a5986499eac4834862f3 100644 --- a/logs/zap/buffered_write_syncer.go +++ b/logs/zap/buffered_write_syncer.go @@ -71,28 +71,40 @@ func (ws *BufferedWriteSyncer) start() { ws.buffer = make([]*logs.Entry, 0, ws.MaxBufferSize) ws.syncQueue = make(chan []*logs.Entry, ws.MaxSyncQueueSize) ws.flushStop = make(chan struct{}) + ws.started = true ws.wg.Add(2) go ws.syncLoop() go ws.flushLoop() - - ws.started = true } +// Stop останавливает РІСЃРµ фоновые работы Рё синхронизирует оставшиеся записи func (ws *BufferedWriteSyncer) Stop() error { - ws.mu.Lock() - defer ws.mu.Unlock() + // Создаем РЅРѕРІСѓСЋ область видимости для блокировки мьютекса только РІ указанной секции + // Рто необходимо, чтобы предотвратить возможный deadlock, который может возникнуть + // если после блокировки мьютекса РїСЂРё остановке будет вызван Sync внутри flushLoop(), + // который будет ожидать освобождения мьютекса + stopped, err := func() (bool, error) { + ws.mu.Lock() + defer ws.mu.Unlock() + + if !ws.started || ws.stopped { + return false, nil + } + ws.stopped = true - if !ws.started || ws.stopped { - return nil - } - ws.stopped = true + close(ws.flushStop) // завершаем flushLoop + + err := ws.flush() // очищаем оставшиеся записи - close(ws.flushStop) // завершаем flushLoop + close(ws.syncQueue) // завершаем syncLoop - err := ws.flush() // очищаем оставшиеся записи + return true, err + }() - close(ws.syncQueue) // завершаем syncLoop + if !stopped { + return nil + } ws.wg.Wait() // дожидаемся завершения flushLoop Рё syncLoop @@ -127,7 +139,7 @@ func (ws *BufferedWriteSyncer) Sync() error { ws.mu.Lock() defer ws.mu.Unlock() - if ws.started { + if ws.started && !ws.stopped { return ws.flush() }