From eb1bac17c29d0433df388fbf9612de6a90dcecf7 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Fri, 11 Oct 2024 16:23:44 -0400 Subject: [PATCH] materialize: don't use locks in the extended logger The use of a mutex in both the extended logger event handler and the async logger functions was simple, but there was a potential deadlock if the event handler needed to stop the logger and wait for it to stop while holding the lock, and the logger timer ticked over at nearly the same instant making the logger need to get the lock also to finish its last log before exiting. This updates the extended logger to not use locks at all, since it wasn't far from being able to do that in the first place. The "count" values that are read concurrently by the async loggers have been changed to use atomics, and for good measure the "round" counter is provided as an explicit argument to them, even though it doesn't strictly need to be, it's just easier to understand this way. --- materialize-boilerplate/logging.go | 127 ++++++++++++----------------- 1 file changed, 54 insertions(+), 73 deletions(-) diff --git a/materialize-boilerplate/logging.go b/materialize-boilerplate/logging.go index 1ae69136e..28bac1a80 100644 --- a/materialize-boilerplate/logging.go +++ b/materialize-boilerplate/logging.go @@ -3,6 +3,7 @@ package boilerplate import ( "strings" "sync" + "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -120,11 +121,9 @@ type extendedLogger struct { be *BindingEvents log func(log.Fields, string) - mu sync.Mutex - round int - readLoads int - sentLoaded int - readStores int + readLoads atomic.Int32 + sentLoaded atomic.Int32 + readStores atomic.Int32 readingLoadsStart time.Time sendingLoadedsStart time.Time readingStoresStart time.Time @@ -144,11 +143,7 @@ func newExtendedLogger(ll loggerAtLevel, be *BindingEvents) *extendedLogger { // stop it. The asyncLog logging callback is run periodically and while holding // the mutex for extendedLogger. The doneLog callback does not acquire the lock. func (l *extendedLogger) logAsync(asyncLog func()) (stopLogger func(doneLog func())) { - return repeatAsync(func() { - l.mu.Lock() - asyncLog() - l.mu.Unlock() - }, loggingFrequency) + return repeatAsync(func() { asyncLog() }, loggingFrequency) } func (l *extendedLogger) handler() func(transactionsEvent) { @@ -158,35 +153,23 @@ func (l *extendedLogger) handler() func(transactionsEvent) { var loadPhaseStarted bool var ackDelayActive bool var recovery = true + var round int return func(event transactionsEvent) { - l.mu.Lock() - defer l.mu.Unlock() - - // Increment counters. - switch event { - case readLoad: - l.readLoads++ - case sentLoaded: - l.sentLoaded++ - case readStore: - l.readStores++ - } - // Increment the "round" counter and establish logging for the case of // "waiting for documents". switch event { case sentStartedCommit: loadPhaseStarted = false - l.round++ + round++ l.be.round++ case sentAcknowledged: if !loadPhaseStarted { - stopWaitingForDocsLogger = l.logAsync(l.waitingForDocsLogFn()) + stopWaitingForDocsLogger = l.logAsync(l.waitingForDocsLogFn(round)) } case readLoad, readFlush: if !l.waitingForDocsStart.IsZero() { - stopWaitingForDocsLogger(l.finishedWaitingForDocsLogFn()) + stopWaitingForDocsLogger(l.finishedWaitingForDocsLogFn(round)) } l.waitingForDocsStart = time.Time{} loadPhaseStarted = true @@ -195,118 +178,116 @@ func (l *extendedLogger) handler() func(transactionsEvent) { // Start and stop other loggers, resetting counters as needed. switch event { case readLoad: - if l.readLoads == 1 { - stopLoadLogger = l.logAsync(l.readingLoadsLogFn()) + if n := l.readLoads.Add(1); n == 1 { + stopLoadLogger = l.logAsync(l.readingLoadsLogFn(round)) } case readFlush: - if l.readLoads != 0 { - stopLoadLogger(l.finishedReadingLoadsLogFn()) - l.readLoads = 0 + if total := l.readLoads.Swap(0); total != 0 { + stopLoadLogger(l.finishedReadingLoadsLogFn(round, total)) } case sentLoaded: - if l.sentLoaded == 1 { - stopLoadLogger = l.logAsync(l.processingLoadedsLogFn()) + if n := l.sentLoaded.Add(1); n == 1 { + stopLoadLogger = l.logAsync(l.processingLoadedsLogFn(round)) } case sentFlushed: - if l.sentLoaded != 0 { - stopLoadLogger(l.finishedProcessingLoadedsLogFn()) - l.sentLoaded = 0 + if total := l.sentLoaded.Swap(0); total != 0 { + stopLoadLogger(l.finishedProcessingLoadedsLogFn(round, total)) } case readStore: - if l.readStores == 1 { - stopStoreLogger = l.logAsync(l.readingStoresLogFn()) + if n := l.readStores.Add(1); n == 1 { + stopStoreLogger = l.logAsync(l.readingStoresLogFn(round)) } case readStartCommit: - if l.readStores != 0 { - stopStoreLogger(l.finishedReadingStoresLogFn()) - l.readStores = 0 + if total := l.readStores.Swap(0); total != 0 { + stopStoreLogger(l.finishedReadingStoresLogFn(round, total)) } case sentStartedCommit: // NB: The "round" is incremented in on sentStartedCommit prior to // the handling here and below by stopStoreLogger. This means that // the round for the commit is actually one less than currently // recorded. - stopStoreLogger = l.logAsync(l.runningCommitLogFn(l.round - 1)) + stopStoreLogger = l.logAsync(l.runningCommitLogFn(round - 1)) case readAcknowledge: if recovery { stopStoreLogger = l.logAsync(l.runningRecoveryCommitLogFn()) } case startedAckDelay: // NB: Ack delay is never used for the recovery commit. - stopStoreLogger(l.finishedCommitLogFn(l.round - 1)) - stopStoreLogger = l.logAsync(l.waitingForAckDelayLogFn(l.round - 1)) + stopStoreLogger(l.finishedCommitLogFn(round - 1)) + stopStoreLogger = l.logAsync(l.waitingForAckDelayLogFn(round - 1)) ackDelayActive = true case sentAcknowledged: if recovery { stopStoreLogger(l.finishedRecoveryCommitLogFn()) recovery = false } else if ackDelayActive { - stopStoreLogger(l.finishedAckDelayLogFn(l.round - 1)) + stopStoreLogger(l.finishedAckDelayLogFn(round - 1)) ackDelayActive = false } else { - stopStoreLogger(l.finishedCommitLogFn(l.round - 1)) + stopStoreLogger(l.finishedCommitLogFn(round - 1)) } } } } -func (l *extendedLogger) readingLoadsLogFn() func() { +func (l *extendedLogger) readingLoadsLogFn(round int) func() { l.readingLoadsStart = time.Now() - l.log(log.Fields{"round": l.round}, "started reading load requests") - lastLoadCount := l.readLoads + l.log(log.Fields{"round": round}, "started reading load requests") + var lastLoadCount int32 return func() { - if l.readLoads == lastLoadCount { - l.log(log.Fields{"round": l.round, "count": l.readLoads}, "waiting for more load requests") + thisLoadCount := l.readLoads.Load() + if thisLoadCount == lastLoadCount { + l.log(log.Fields{"round": round, "count": thisLoadCount}, "waiting for more load requests") } else { - l.log(log.Fields{"round": l.round, "count": l.readLoads}, "reading load requests") + l.log(log.Fields{"round": round, "count": thisLoadCount}, "reading load requests") } - lastLoadCount = l.readLoads + lastLoadCount = thisLoadCount } } -func (l *extendedLogger) finishedReadingLoadsLogFn() func() { +func (l *extendedLogger) finishedReadingLoadsLogFn(round int, total int32) func() { return func() { l.log(log.Fields{ - "round": l.round, + "round": round, "took": time.Since(l.readingLoadsStart).String(), - "count": l.readLoads, + "count": total, }, "finished reading load requests") } } -func (l *extendedLogger) processingLoadedsLogFn() func() { +func (l *extendedLogger) processingLoadedsLogFn(round int) func() { l.sendingLoadedsStart = time.Now() - l.log(log.Fields{"round": l.round}, "started processing loaded documents") + l.log(log.Fields{"round": round}, "started processing loaded documents") return func() { - l.log(log.Fields{"round": l.round, "count": l.sentLoaded}, "processing loaded documents") + l.log(log.Fields{"round": round, "count": l.sentLoaded.Load()}, "processing loaded documents") } } -func (l *extendedLogger) finishedProcessingLoadedsLogFn() func() { +func (l *extendedLogger) finishedProcessingLoadedsLogFn(round int, total int32) func() { return func() { l.log(log.Fields{ - "round": l.round, + "round": round, "took": time.Since(l.sendingLoadedsStart).String(), - "count": l.sentLoaded, + "count": total, }, "finished processing loaded documents") } } -func (l *extendedLogger) readingStoresLogFn() func() { +func (l *extendedLogger) readingStoresLogFn(round int) func() { l.readingStoresStart = time.Now() - l.log(log.Fields{"round": l.round}, "started reading store requests") + l.log(log.Fields{"round": round}, "started reading store requests") return func() { - l.log(log.Fields{"round": l.round, "count": l.readStores}, "reading store requests") + l.log(log.Fields{"round": round, "count": l.readStores.Load()}, "reading store requests") } } -func (l *extendedLogger) finishedReadingStoresLogFn() func() { +func (l *extendedLogger) finishedReadingStoresLogFn(round int, total int32) func() { return func() { l.log(log.Fields{ - "round": l.round, + "round": round, "took": time.Since(l.readingStoresStart).String(), - "count": l.readStores, + "count": total, }, "finished reading store requests") } } @@ -355,17 +336,17 @@ func (l *extendedLogger) finishedRecoveryCommitLogFn() func() { } } -func (l *extendedLogger) waitingForDocsLogFn() func() { +func (l *extendedLogger) waitingForDocsLogFn(round int) func() { l.waitingForDocsStart = time.Now() - l.log(log.Fields{"round": l.round}, "started waiting for documents") + l.log(log.Fields{"round": round}, "started waiting for documents") return func() { - l.log(log.Fields{"round": l.round}, "waiting for documents") + l.log(log.Fields{"round": round}, "waiting for documents") } } -func (l *extendedLogger) finishedWaitingForDocsLogFn() func() { +func (l *extendedLogger) finishedWaitingForDocsLogFn(round int) func() { return func() { - l.log(log.Fields{"round": l.round, "took": time.Since(l.waitingForDocsStart).String()}, "finished waiting for documents") + l.log(log.Fields{"round": round, "took": time.Since(l.waitingForDocsStart).String()}, "finished waiting for documents") } }