Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

materialize: don't use locks in the extended logger #2043

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 54 additions & 73 deletions materialize-boilerplate/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package boilerplate
import (
"strings"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -120,11 +121,9 @@ type extendedLogger struct {
be *BindingEvents
log func(log.Fields, string)

mu sync.Mutex
round int
readLoads int
sentLoaded int
readStores int
readLoads atomic.Int32
sentLoaded atomic.Int32
readStores atomic.Int32
readingLoadsStart time.Time
sendingLoadedsStart time.Time
readingStoresStart time.Time
Expand All @@ -144,11 +143,7 @@ func newExtendedLogger(ll loggerAtLevel, be *BindingEvents) *extendedLogger {
// stop it. The asyncLog logging callback is run periodically and while holding
// the mutex for extendedLogger. The doneLog callback does not acquire the lock.
func (l *extendedLogger) logAsync(asyncLog func()) (stopLogger func(doneLog func())) {
return repeatAsync(func() {
l.mu.Lock()
asyncLog()
l.mu.Unlock()
}, loggingFrequency)
return repeatAsync(asyncLog, loggingFrequency)
}

func (l *extendedLogger) handler() func(transactionsEvent) {
Expand All @@ -158,35 +153,23 @@ func (l *extendedLogger) handler() func(transactionsEvent) {
var loadPhaseStarted bool
var ackDelayActive bool
var recovery = true
var round int

return func(event transactionsEvent) {
l.mu.Lock()
defer l.mu.Unlock()

// Increment counters.
switch event {
case readLoad:
l.readLoads++
case sentLoaded:
l.sentLoaded++
case readStore:
l.readStores++
}

// Increment the "round" counter and establish logging for the case of
// "waiting for documents".
switch event {
case sentStartedCommit:
loadPhaseStarted = false
l.round++
round++
l.be.round++
case sentAcknowledged:
if !loadPhaseStarted {
stopWaitingForDocsLogger = l.logAsync(l.waitingForDocsLogFn())
stopWaitingForDocsLogger = l.logAsync(l.waitingForDocsLogFn(round))
}
case readLoad, readFlush:
if !l.waitingForDocsStart.IsZero() {
stopWaitingForDocsLogger(l.finishedWaitingForDocsLogFn())
stopWaitingForDocsLogger(l.finishedWaitingForDocsLogFn(round))
}
l.waitingForDocsStart = time.Time{}
loadPhaseStarted = true
Expand All @@ -195,118 +178,116 @@ func (l *extendedLogger) handler() func(transactionsEvent) {
// Start and stop other loggers, resetting counters as needed.
switch event {
case readLoad:
if l.readLoads == 1 {
stopLoadLogger = l.logAsync(l.readingLoadsLogFn())
if n := l.readLoads.Add(1); n == 1 {
stopLoadLogger = l.logAsync(l.readingLoadsLogFn(round))
}
case readFlush:
if l.readLoads != 0 {
stopLoadLogger(l.finishedReadingLoadsLogFn())
l.readLoads = 0
if total := l.readLoads.Swap(0); total != 0 {
stopLoadLogger(l.finishedReadingLoadsLogFn(round, total))
}
case sentLoaded:
if l.sentLoaded == 1 {
stopLoadLogger = l.logAsync(l.processingLoadedsLogFn())
if n := l.sentLoaded.Add(1); n == 1 {
stopLoadLogger = l.logAsync(l.processingLoadedsLogFn(round))
}
case sentFlushed:
if l.sentLoaded != 0 {
stopLoadLogger(l.finishedProcessingLoadedsLogFn())
l.sentLoaded = 0
if total := l.sentLoaded.Swap(0); total != 0 {
stopLoadLogger(l.finishedProcessingLoadedsLogFn(round, total))
}
case readStore:
if l.readStores == 1 {
stopStoreLogger = l.logAsync(l.readingStoresLogFn())
if n := l.readStores.Add(1); n == 1 {
stopStoreLogger = l.logAsync(l.readingStoresLogFn(round))
}
case readStartCommit:
if l.readStores != 0 {
stopStoreLogger(l.finishedReadingStoresLogFn())
l.readStores = 0
if total := l.readStores.Swap(0); total != 0 {
stopStoreLogger(l.finishedReadingStoresLogFn(round, total))
}
case sentStartedCommit:
// NB: The "round" is incremented in on sentStartedCommit prior to
// the handling here and below by stopStoreLogger. This means that
// the round for the commit is actually one less than currently
// recorded.
stopStoreLogger = l.logAsync(l.runningCommitLogFn(l.round - 1))
stopStoreLogger = l.logAsync(l.runningCommitLogFn(round - 1))
case readAcknowledge:
if recovery {
stopStoreLogger = l.logAsync(l.runningRecoveryCommitLogFn())
}
case startedAckDelay:
// NB: Ack delay is never used for the recovery commit.
stopStoreLogger(l.finishedCommitLogFn(l.round - 1))
stopStoreLogger = l.logAsync(l.waitingForAckDelayLogFn(l.round - 1))
stopStoreLogger(l.finishedCommitLogFn(round - 1))
stopStoreLogger = l.logAsync(l.waitingForAckDelayLogFn(round - 1))
ackDelayActive = true
case sentAcknowledged:
if recovery {
stopStoreLogger(l.finishedRecoveryCommitLogFn())
recovery = false
} else if ackDelayActive {
stopStoreLogger(l.finishedAckDelayLogFn(l.round - 1))
stopStoreLogger(l.finishedAckDelayLogFn(round - 1))
ackDelayActive = false
} else {
stopStoreLogger(l.finishedCommitLogFn(l.round - 1))
stopStoreLogger(l.finishedCommitLogFn(round - 1))
}
}
}
}

func (l *extendedLogger) readingLoadsLogFn() func() {
func (l *extendedLogger) readingLoadsLogFn(round int) func() {
l.readingLoadsStart = time.Now()
l.log(log.Fields{"round": l.round}, "started reading load requests")
lastLoadCount := l.readLoads
l.log(log.Fields{"round": round}, "started reading load requests")
var lastLoadCount int32

return func() {
if l.readLoads == lastLoadCount {
l.log(log.Fields{"round": l.round, "count": l.readLoads}, "waiting for more load requests")
thisLoadCount := l.readLoads.Load()
if thisLoadCount == lastLoadCount {
l.log(log.Fields{"round": round, "count": thisLoadCount}, "waiting for more load requests")
} else {
l.log(log.Fields{"round": l.round, "count": l.readLoads}, "reading load requests")
l.log(log.Fields{"round": round, "count": thisLoadCount}, "reading load requests")
}
lastLoadCount = l.readLoads
lastLoadCount = thisLoadCount
}
}

func (l *extendedLogger) finishedReadingLoadsLogFn() func() {
func (l *extendedLogger) finishedReadingLoadsLogFn(round int, total int32) func() {
return func() {
l.log(log.Fields{
"round": l.round,
"round": round,
"took": time.Since(l.readingLoadsStart).String(),
"count": l.readLoads,
"count": total,
}, "finished reading load requests")
}
}

func (l *extendedLogger) processingLoadedsLogFn() func() {
func (l *extendedLogger) processingLoadedsLogFn(round int) func() {
l.sendingLoadedsStart = time.Now()
l.log(log.Fields{"round": l.round}, "started processing loaded documents")
l.log(log.Fields{"round": round}, "started processing loaded documents")
return func() {
l.log(log.Fields{"round": l.round, "count": l.sentLoaded}, "processing loaded documents")
l.log(log.Fields{"round": round, "count": l.sentLoaded.Load()}, "processing loaded documents")
}
}

func (l *extendedLogger) finishedProcessingLoadedsLogFn() func() {
func (l *extendedLogger) finishedProcessingLoadedsLogFn(round int, total int32) func() {
return func() {
l.log(log.Fields{
"round": l.round,
"round": round,
"took": time.Since(l.sendingLoadedsStart).String(),
"count": l.sentLoaded,
"count": total,
}, "finished processing loaded documents")
}
}

func (l *extendedLogger) readingStoresLogFn() func() {
func (l *extendedLogger) readingStoresLogFn(round int) func() {
l.readingStoresStart = time.Now()
l.log(log.Fields{"round": l.round}, "started reading store requests")
l.log(log.Fields{"round": round}, "started reading store requests")
return func() {
l.log(log.Fields{"round": l.round, "count": l.readStores}, "reading store requests")
l.log(log.Fields{"round": round, "count": l.readStores.Load()}, "reading store requests")
}
}

func (l *extendedLogger) finishedReadingStoresLogFn() func() {
func (l *extendedLogger) finishedReadingStoresLogFn(round int, total int32) func() {
return func() {
l.log(log.Fields{
"round": l.round,
"round": round,
"took": time.Since(l.readingStoresStart).String(),
"count": l.readStores,
"count": total,
}, "finished reading store requests")
}
}
Expand Down Expand Up @@ -355,17 +336,17 @@ func (l *extendedLogger) finishedRecoveryCommitLogFn() func() {
}
}

func (l *extendedLogger) waitingForDocsLogFn() func() {
func (l *extendedLogger) waitingForDocsLogFn(round int) func() {
l.waitingForDocsStart = time.Now()
l.log(log.Fields{"round": l.round}, "started waiting for documents")
l.log(log.Fields{"round": round}, "started waiting for documents")
return func() {
l.log(log.Fields{"round": l.round}, "waiting for documents")
l.log(log.Fields{"round": round}, "waiting for documents")
}
}

func (l *extendedLogger) finishedWaitingForDocsLogFn() func() {
func (l *extendedLogger) finishedWaitingForDocsLogFn(round int) func() {
return func() {
l.log(log.Fields{"round": l.round, "took": time.Since(l.waitingForDocsStart).String()}, "finished waiting for documents")
l.log(log.Fields{"round": round, "took": time.Since(l.waitingForDocsStart).String()}, "finished waiting for documents")
}
}

Expand Down
Loading