diff --git a/cmd/check_data.go b/cmd/check_data.go index d6c2b7fa..2277ffdd 100644 --- a/cmd/check_data.go +++ b/cmd/check_data.go @@ -142,6 +142,10 @@ func runCheckDataCmd(cmd *cobra.Command, args []string) error { return dataTester.WatchEndConditions(ctx) }) + g.Go(func() error { + return dataTester.StartReconcilerCountUpdater(ctx) + }) + g.Go(func() error { return tester.LogMemoryLoop(ctx) }) diff --git a/pkg/processor/reconciler_handler.go b/pkg/processor/reconciler_handler.go index 2d74ba11..6ce94f44 100644 --- a/pkg/processor/reconciler_handler.go +++ b/pkg/processor/reconciler_handler.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "math/big" + "sync" + "time" "github.com/coinbase/rosetta-cli/pkg/logger" "github.com/coinbase/rosetta-cli/pkg/results" @@ -27,8 +29,22 @@ import ( "github.com/coinbase/rosetta-sdk-go/types" ) +const ( + updateFrequency = 10 * time.Second +) + var _ reconciler.Handler = (*ReconcilerHandler)(nil) +var ( + countKeys = []string{ + storage.FailedReconciliationCounter, + storage.SkippedReconciliationsCounter, + storage.ExemptReconciliationCounter, + storage.ActiveReconciliationCounter, + storage.InactiveReconciliationCounter, + } +) + // ReconcilerHandler implements the Reconciler.Handler interface. type ReconcilerHandler struct { logger *logger.Logger @@ -40,6 +56,9 @@ type ReconcilerHandler struct { InactiveFailureBlock *types.BlockIdentifier ActiveFailureBlock *types.BlockIdentifier + + counterLock sync.Mutex + counts map[string]int64 } // NewReconcilerHandler creates a new ReconcilerHandler. @@ -49,12 +68,55 @@ func NewReconcilerHandler( balanceStorage *storage.BalanceStorage, haltOnReconciliationError bool, ) *ReconcilerHandler { + counts := map[string]int64{} + for _, key := range countKeys { + counts[key] = 0 + } + return &ReconcilerHandler{ logger: logger, counterStorage: counterStorage, balanceStorage: balanceStorage, haltOnReconciliationError: haltOnReconciliationError, + counts: counts, + } +} + +// Updater periodically updates storage with cached counts. +func (h *ReconcilerHandler) Updater(ctx context.Context) error { + tc := time.NewTicker(updateFrequency) + defer tc.Stop() + + for { + select { + case <-tc.C: + if err := h.UpdateCounts(ctx); err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// UpdateCounts forces cached counts to be written to storage. +func (h *ReconcilerHandler) UpdateCounts(ctx context.Context) error { + for _, key := range countKeys { + h.counterLock.Lock() + count := h.counts[key] + h.counts[key] = 0 + h.counterLock.Unlock() + + if count == 0 { + continue + } + + if _, err := h.counterStorage.Update(ctx, key, big.NewInt(count)); err != nil { + return err + } } + + return nil } // ReconciliationFailed is called each time a reconciliation fails. @@ -69,7 +131,9 @@ func (h *ReconcilerHandler) ReconciliationFailed( liveBalance string, block *types.BlockIdentifier, ) error { - _, _ = h.counterStorage.Update(ctx, storage.FailedReconciliationCounter, big.NewInt(1)) + h.counterLock.Lock() + h.counts[storage.FailedReconciliationCounter]++ + h.counterLock.Unlock() err := h.logger.ReconcileFailureStream( ctx, @@ -85,6 +149,9 @@ func (h *ReconcilerHandler) ReconciliationFailed( } if h.haltOnReconciliationError { + // Update counts before exiting + _ = h.UpdateCounts(ctx) + if reconciliationType == reconciler.InactiveReconciliation { // Populate inactive failure information so we can try to find block with // missing ops. @@ -134,7 +201,9 @@ func (h *ReconcilerHandler) ReconciliationExempt( block *types.BlockIdentifier, exemption *types.BalanceExemption, ) error { - _, _ = h.counterStorage.Update(ctx, storage.ExemptReconciliationCounter, big.NewInt(1)) + h.counterLock.Lock() + h.counts[storage.ExemptReconciliationCounter]++ + h.counterLock.Unlock() // Although the reconciliation was exempt (non-zero difference that was ignored), // we still mark the account as being reconciled because the balance was in the range @@ -154,7 +223,9 @@ func (h *ReconcilerHandler) ReconciliationSkipped( currency *types.Currency, cause string, ) error { - _, _ = h.counterStorage.Update(ctx, storage.SkippedReconciliationsCounter, big.NewInt(1)) + h.counterLock.Lock() + h.counts[storage.SkippedReconciliationsCounter]++ + h.counterLock.Unlock() return nil } @@ -174,7 +245,9 @@ func (h *ReconcilerHandler) ReconciliationSucceeded( counter = storage.InactiveReconciliationCounter } - _, _ = h.counterStorage.Update(ctx, counter, big.NewInt(1)) + h.counterLock.Lock() + h.counts[counter]++ + h.counterLock.Unlock() if err := h.balanceStorage.Reconciled(ctx, account, currency, block); err != nil { return fmt.Errorf("%w: unable to store updated reconciliation", err) diff --git a/pkg/tester/data.go b/pkg/tester/data.go index 068b30ba..c851d8f7 100644 --- a/pkg/tester/data.go +++ b/pkg/tester/data.go @@ -372,6 +372,14 @@ func (t *DataTester) StartPruning( return t.syncer.Prune(ctx, t) } +// StartReconcilerCountUpdater attempts to periodically +// write cached reconciler count updates to storage. +func (t *DataTester) StartReconcilerCountUpdater( + ctx context.Context, +) error { + return t.reconcilerHandler.Updater(ctx) +} + // PruneableIndex is the index that is // safe for pruning. func (t *DataTester) PruneableIndex( @@ -712,6 +720,12 @@ func (t *DataTester) WaitForEmptyQueue( return ctx.Err() case <-tc.C: + // We force cached counts to be written before + // determining if we should exit. + if err := t.reconcilerHandler.UpdateCounts(ctx); err != nil { + return err + } + nowComplete, err := t.CompleteReconciliations(ctx) if err != nil { return err