Skip to content

Commit

Permalink
materialize: fix delay acknowledgement based on stored history
Browse files Browse the repository at this point in the history
  • Loading branch information
williamhbaker committed Oct 8, 2024
1 parent dfd0860 commit 391080d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
4 changes: 2 additions & 2 deletions materialize-boilerplate/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (l *BindingEvents) StartedResourceCommit(path []string) {
l.activeStores[strings.Join(path, ".")] = time.Now()
l.mu.Unlock()

l.log(log.Fields{"round": l.round, "resourcePath": path}, "started commiting documents to resource")
l.log(log.Fields{"round": l.round, "resourcePath": path}, "started commiting documents for resource")
})
}

Expand All @@ -462,7 +462,7 @@ func (l *BindingEvents) FinishedResourceCommit(path []string) {
"round": l.round,
"resourcePath": path,
"took": took.String(),
}, "finished commiting documents to resource")
}, "finished commiting documents for resource")
})
}

Expand Down
33 changes: 17 additions & 16 deletions materialize-boilerplate/transactions_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package boilerplate
import (
"context"
"fmt"
"slices"
"time"

m "github.com/estuary/connectors/go/protocols/materialize"
Expand All @@ -12,20 +13,20 @@ import (
)

const (
// storeThreshold is a somewhat crude indication that a transaction was
// storeBackfillThreshold is a somewhat crude indication that a transaction was
// likely part of a backfill vs. a smaller incremental streaming
// transaction. If a materialization is configured with a commit delay, it
// should only apply that delay to transactions that occur after it has
// fully backfilled from the collection. The idea is that if a transaction
// stored a lot of documents it's probably part of a backfill.
storeThreshold = 1_000_000
storeBackfillThreshold = 1_000_000

// The number of transactions we will look back at for determining if an
// acknowledgement delay should be applied. The number of documents stored
// for all of these transactions must be below storeThreshold for the delay
// to apply. This is used to estimate if we are in a "streaming" mode or
// not, based on a consistent number of documents per transactions being
// below the threshold.
// for all of these transactions must be below storeBackfillThreshold for
// the delay to apply. This is used to estimate if we are in a "streaming"
// mode or not, based on a consistent number of documents per transactions
// being below the threshold.
storedHistorySize = 5
)

Expand Down Expand Up @@ -168,14 +169,6 @@ func (l *transactionsStream) maybeDelayAcknowledgement() error {
// lastAckTime at a zero value means this is the recovery commit, and we
// never delay on the recovery commit.
if l.ackSchedule != nil && !l.lastAckTime.IsZero() {
aboveThreshold := false
for _, v := range l.storedHistory {
if v >= storeThreshold {
aboveThreshold = true
break
}
}

nextAckAt := l.ackSchedule.Next(l.lastAckTime)
d := time.Until(nextAckAt)

Expand All @@ -185,13 +178,21 @@ func (l *transactionsStream) maybeDelayAcknowledgement() error {
"storedHistory": l.storedHistory,
})

if aboveThreshold {
// If there have been at least `storedHistorySize` transactions
// completed and none of the transactions were large enough to suggest
// we are still in the midst of a backfill, the acknowledgement delay
// may be applicable.
delayBasedOnStoredHistory := len(l.storedHistory) >= storedHistorySize && !slices.ContainsFunc(l.storedHistory, func(n int) bool {
return n >= storeBackfillThreshold
})

if !delayBasedOnStoredHistory {
ll.Info("not delaying commit acknowledgement based on stored history")
} else if d <= 0 {
ll.Info("not delaying commit acknowledgement since current time is after next scheduled acknowledgement")
} else {
l.handler(startedAckDelay)
ll.WithField("delay", d.Truncate(time.Second).String()).Info("delaying before acknowledging commit")
ll.WithField("delay", d.String()).Info("delaying before acknowledging commit")

select {
case <-l.ctx.Done():
Expand Down

0 comments on commit 391080d

Please sign in to comment.