Skip to content

Commit

Permalink
materialize: don't use locks in the extended logger
Browse files Browse the repository at this point in the history
The use of a mutex in both the extended logger event handler and the async
logger functions was simple, but it allowed a potential deadlock: the event
handler could stop the logger and wait for it to exit while holding the lock,
while the logger's timer ticked over at nearly the same instant, so that the
logger also needed the lock to finish its last log before exiting.

This updates the extended logger to not use locks at all, since it wasn't far
from being able to do that in the first place. The "count" values that are read
concurrently by the async loggers have been changed to use atomics, and for good
measure the "round" counter is provided as an explicit argument to them. It
doesn't strictly need to be, but it's easier to understand this way.
  • Loading branch information
williamhbaker committed Oct 11, 2024
1 parent 44de46e commit 611dc61
Showing 1 changed file with 54 additions and 73 deletions.
127 changes: 54 additions & 73 deletions materialize-boilerplate/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package boilerplate
import (
"strings"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -120,11 +121,9 @@ type extendedLogger struct {
be *BindingEvents
log func(log.Fields, string)

mu sync.Mutex
round int
readLoads int
sentLoaded int
readStores int
readLoads atomic.Int32
sentLoaded atomic.Int32
readStores atomic.Int32
readingLoadsStart time.Time
sendingLoadedsStart time.Time
readingStoresStart time.Time
Expand All @@ -144,11 +143,7 @@ func newExtendedLogger(ll loggerAtLevel, be *BindingEvents) *extendedLogger {
// stop it. The asyncLog logging callback is run periodically without holding
// any lock, so values it shares with the event handler must be read atomically.
// The doneLog callback likewise does not acquire a lock.
func (l *extendedLogger) logAsync(asyncLog func()) (stopLogger func(doneLog func())) {
return repeatAsync(func() {
l.mu.Lock()
asyncLog()
l.mu.Unlock()
}, loggingFrequency)
return repeatAsync(asyncLog, loggingFrequency)
}

func (l *extendedLogger) handler() func(transactionsEvent) {
Expand All @@ -158,35 +153,23 @@ func (l *extendedLogger) handler() func(transactionsEvent) {
var loadPhaseStarted bool
var ackDelayActive bool
var recovery = true
var round int

return func(event transactionsEvent) {
l.mu.Lock()
defer l.mu.Unlock()

// Increment counters.
switch event {
case readLoad:
l.readLoads++
case sentLoaded:
l.sentLoaded++
case readStore:
l.readStores++
}

// Increment the "round" counter and establish logging for the case of
// "waiting for documents".
switch event {
case sentStartedCommit:
loadPhaseStarted = false
l.round++
round++
l.be.round++
case sentAcknowledged:
if !loadPhaseStarted {
stopWaitingForDocsLogger = l.logAsync(l.waitingForDocsLogFn())
stopWaitingForDocsLogger = l.logAsync(l.waitingForDocsLogFn(round))
}
case readLoad, readFlush:
if !l.waitingForDocsStart.IsZero() {
stopWaitingForDocsLogger(l.finishedWaitingForDocsLogFn())
stopWaitingForDocsLogger(l.finishedWaitingForDocsLogFn(round))
}
l.waitingForDocsStart = time.Time{}
loadPhaseStarted = true
Expand All @@ -195,118 +178,116 @@ func (l *extendedLogger) handler() func(transactionsEvent) {
// Start and stop other loggers, resetting counters as needed.
switch event {
case readLoad:
if l.readLoads == 1 {
stopLoadLogger = l.logAsync(l.readingLoadsLogFn())
if n := l.readLoads.Add(1); n == 1 {
stopLoadLogger = l.logAsync(l.readingLoadsLogFn(round))
}
case readFlush:
if l.readLoads != 0 {
stopLoadLogger(l.finishedReadingLoadsLogFn())
l.readLoads = 0
if total := l.readLoads.Swap(0); total != 0 {
stopLoadLogger(l.finishedReadingLoadsLogFn(round, total))
}
case sentLoaded:
if l.sentLoaded == 1 {
stopLoadLogger = l.logAsync(l.processingLoadedsLogFn())
if n := l.sentLoaded.Add(1); n == 1 {
stopLoadLogger = l.logAsync(l.processingLoadedsLogFn(round))
}
case sentFlushed:
if l.sentLoaded != 0 {
stopLoadLogger(l.finishedProcessingLoadedsLogFn())
l.sentLoaded = 0
if total := l.sentLoaded.Swap(0); total != 0 {
stopLoadLogger(l.finishedProcessingLoadedsLogFn(round, total))
}
case readStore:
if l.readStores == 1 {
stopStoreLogger = l.logAsync(l.readingStoresLogFn())
if n := l.readStores.Add(1); n == 1 {
stopStoreLogger = l.logAsync(l.readingStoresLogFn(round))
}
case readStartCommit:
if l.readStores != 0 {
stopStoreLogger(l.finishedReadingStoresLogFn())
l.readStores = 0
if total := l.readStores.Swap(0); total != 0 {
stopStoreLogger(l.finishedReadingStoresLogFn(round, total))
}
case sentStartedCommit:
// NB: The "round" is incremented on sentStartedCommit prior to
// the handling here and below by stopStoreLogger. This means that
// the round for the commit is actually one less than currently
// recorded.
stopStoreLogger = l.logAsync(l.runningCommitLogFn(l.round - 1))
stopStoreLogger = l.logAsync(l.runningCommitLogFn(round - 1))
case readAcknowledge:
if recovery {
stopStoreLogger = l.logAsync(l.runningRecoveryCommitLogFn())
}
case startedAckDelay:
// NB: Ack delay is never used for the recovery commit.
stopStoreLogger(l.finishedCommitLogFn(l.round - 1))
stopStoreLogger = l.logAsync(l.waitingForAckDelayLogFn(l.round - 1))
stopStoreLogger(l.finishedCommitLogFn(round - 1))
stopStoreLogger = l.logAsync(l.waitingForAckDelayLogFn(round - 1))
ackDelayActive = true
case sentAcknowledged:
if recovery {
stopStoreLogger(l.finishedRecoveryCommitLogFn())
recovery = false
} else if ackDelayActive {
stopStoreLogger(l.finishedAckDelayLogFn(l.round - 1))
stopStoreLogger(l.finishedAckDelayLogFn(round - 1))
ackDelayActive = false
} else {
stopStoreLogger(l.finishedCommitLogFn(l.round - 1))
stopStoreLogger(l.finishedCommitLogFn(round - 1))
}
}
}
}

func (l *extendedLogger) readingLoadsLogFn() func() {
func (l *extendedLogger) readingLoadsLogFn(round int) func() {
l.readingLoadsStart = time.Now()
l.log(log.Fields{"round": l.round}, "started reading load requests")
lastLoadCount := l.readLoads
l.log(log.Fields{"round": round}, "started reading load requests")
var lastLoadCount int32

return func() {
if l.readLoads == lastLoadCount {
l.log(log.Fields{"round": l.round, "count": l.readLoads}, "waiting for more load requests")
thisLoadCount := l.readLoads.Load()
if thisLoadCount == lastLoadCount {
l.log(log.Fields{"round": round, "count": thisLoadCount}, "waiting for more load requests")
} else {
l.log(log.Fields{"round": l.round, "count": l.readLoads}, "reading load requests")
l.log(log.Fields{"round": round, "count": thisLoadCount}, "reading load requests")
}
lastLoadCount = l.readLoads
lastLoadCount = thisLoadCount
}
}

func (l *extendedLogger) finishedReadingLoadsLogFn() func() {
func (l *extendedLogger) finishedReadingLoadsLogFn(round int, total int32) func() {
return func() {
l.log(log.Fields{
"round": l.round,
"round": round,
"took": time.Since(l.readingLoadsStart).String(),
"count": l.readLoads,
"count": total,
}, "finished reading load requests")
}
}

func (l *extendedLogger) processingLoadedsLogFn() func() {
func (l *extendedLogger) processingLoadedsLogFn(round int) func() {
l.sendingLoadedsStart = time.Now()
l.log(log.Fields{"round": l.round}, "started processing loaded documents")
l.log(log.Fields{"round": round}, "started processing loaded documents")
return func() {
l.log(log.Fields{"round": l.round, "count": l.sentLoaded}, "processing loaded documents")
l.log(log.Fields{"round": round, "count": l.sentLoaded.Load()}, "processing loaded documents")
}
}

func (l *extendedLogger) finishedProcessingLoadedsLogFn() func() {
func (l *extendedLogger) finishedProcessingLoadedsLogFn(round int, total int32) func() {
return func() {
l.log(log.Fields{
"round": l.round,
"round": round,
"took": time.Since(l.sendingLoadedsStart).String(),
"count": l.sentLoaded,
"count": total,
}, "finished processing loaded documents")
}
}

func (l *extendedLogger) readingStoresLogFn() func() {
func (l *extendedLogger) readingStoresLogFn(round int) func() {
l.readingStoresStart = time.Now()
l.log(log.Fields{"round": l.round}, "started reading store requests")
l.log(log.Fields{"round": round}, "started reading store requests")
return func() {
l.log(log.Fields{"round": l.round, "count": l.readStores}, "reading store requests")
l.log(log.Fields{"round": round, "count": l.readStores.Load()}, "reading store requests")
}
}

func (l *extendedLogger) finishedReadingStoresLogFn() func() {
func (l *extendedLogger) finishedReadingStoresLogFn(round int, total int32) func() {
return func() {
l.log(log.Fields{
"round": l.round,
"round": round,
"took": time.Since(l.readingStoresStart).String(),
"count": l.readStores,
"count": total,
}, "finished reading store requests")
}
}
Expand Down Expand Up @@ -355,17 +336,17 @@ func (l *extendedLogger) finishedRecoveryCommitLogFn() func() {
}
}

func (l *extendedLogger) waitingForDocsLogFn() func() {
func (l *extendedLogger) waitingForDocsLogFn(round int) func() {
l.waitingForDocsStart = time.Now()
l.log(log.Fields{"round": l.round}, "started waiting for documents")
l.log(log.Fields{"round": round}, "started waiting for documents")
return func() {
l.log(log.Fields{"round": l.round}, "waiting for documents")
l.log(log.Fields{"round": round}, "waiting for documents")
}
}

func (l *extendedLogger) finishedWaitingForDocsLogFn() func() {
func (l *extendedLogger) finishedWaitingForDocsLogFn(round int) func() {
return func() {
l.log(log.Fields{"round": l.round, "took": time.Since(l.waitingForDocsStart).String()}, "finished waiting for documents")
l.log(log.Fields{"round": round, "took": time.Since(l.waitingForDocsStart).String()}, "finished waiting for documents")
}
}

Expand Down

0 comments on commit 611dc61

Please sign in to comment.