From c908e582f05d3b927f1b76dcb0eda98237dde59b Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 11:40:28 -0600 Subject: [PATCH 1/3] Add cached count updates --- pkg/processor/reconciler_handler.go | 81 +++++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/pkg/processor/reconciler_handler.go b/pkg/processor/reconciler_handler.go index 2d74ba11..6ce94f44 100644 --- a/pkg/processor/reconciler_handler.go +++ b/pkg/processor/reconciler_handler.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "math/big" + "sync" + "time" "github.com/coinbase/rosetta-cli/pkg/logger" "github.com/coinbase/rosetta-cli/pkg/results" @@ -27,8 +29,22 @@ import ( "github.com/coinbase/rosetta-sdk-go/types" ) +const ( + updateFrequency = 10 * time.Second +) + var _ reconciler.Handler = (*ReconcilerHandler)(nil) +var ( + countKeys = []string{ + storage.FailedReconciliationCounter, + storage.SkippedReconciliationsCounter, + storage.ExemptReconciliationCounter, + storage.ActiveReconciliationCounter, + storage.InactiveReconciliationCounter, + } +) + // ReconcilerHandler implements the Reconciler.Handler interface. type ReconcilerHandler struct { logger *logger.Logger @@ -40,6 +56,9 @@ type ReconcilerHandler struct { InactiveFailureBlock *types.BlockIdentifier ActiveFailureBlock *types.BlockIdentifier + + counterLock sync.Mutex + counts map[string]int64 } // NewReconcilerHandler creates a new ReconcilerHandler. @@ -49,12 +68,55 @@ func NewReconcilerHandler( balanceStorage *storage.BalanceStorage, haltOnReconciliationError bool, ) *ReconcilerHandler { + counts := map[string]int64{} + for _, key := range countKeys { + counts[key] = 0 + } + return &ReconcilerHandler{ logger: logger, counterStorage: counterStorage, balanceStorage: balanceStorage, haltOnReconciliationError: haltOnReconciliationError, + counts: counts, + } +} + +// Updater periodically updates storage with cached counts. +func (h *ReconcilerHandler) Updater(ctx context.Context) error { + tc := time.NewTicker(updateFrequency) + defer tc.Stop() + + for { + select { + case <-tc.C: + if err := h.UpdateCounts(ctx); err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// UpdateCounts forces cached counts to be written to storage. +func (h *ReconcilerHandler) UpdateCounts(ctx context.Context) error { + for _, key := range countKeys { + h.counterLock.Lock() + count := h.counts[key] + h.counts[key] = 0 + h.counterLock.Unlock() + + if count == 0 { + continue + } + + if _, err := h.counterStorage.Update(ctx, key, big.NewInt(count)); err != nil { + return err + } } + + return nil } // ReconciliationFailed is called each time a reconciliation fails. @@ -69,7 +131,9 @@ func (h *ReconcilerHandler) ReconciliationFailed( liveBalance string, block *types.BlockIdentifier, ) error { - _, _ = h.counterStorage.Update(ctx, storage.FailedReconciliationCounter, big.NewInt(1)) + h.counterLock.Lock() + h.counts[storage.FailedReconciliationCounter]++ + h.counterLock.Unlock() err := h.logger.ReconcileFailureStream( ctx, @@ -85,6 +149,9 @@ func (h *ReconcilerHandler) ReconciliationFailed( } if h.haltOnReconciliationError { + // Update counts before exiting + _ = h.UpdateCounts(ctx) + if reconciliationType == reconciler.InactiveReconciliation { // Populate inactive failure information so we can try to find block with // missing ops. @@ -134,7 +201,9 @@ func (h *ReconcilerHandler) ReconciliationExempt( block *types.BlockIdentifier, exemption *types.BalanceExemption, ) error { - _, _ = h.counterStorage.Update(ctx, storage.ExemptReconciliationCounter, big.NewInt(1)) + h.counterLock.Lock() + h.counts[storage.ExemptReconciliationCounter]++ + h.counterLock.Unlock() // Although the reconciliation was exempt (non-zero difference that was ignored), // we still mark the account as being reconciled because the balance was in the range @@ -154,7 +223,9 @@ func (h *ReconcilerHandler) ReconciliationSkipped( currency *types.Currency, cause string, ) error { - _, _ = h.counterStorage.Update(ctx, storage.SkippedReconciliationsCounter, big.NewInt(1)) + h.counterLock.Lock() + h.counts[storage.SkippedReconciliationsCounter]++ + h.counterLock.Unlock() return nil } @@ -174,7 +245,9 @@ func (h *ReconcilerHandler) ReconciliationSucceeded( counter = storage.InactiveReconciliationCounter } - _, _ = h.counterStorage.Update(ctx, counter, big.NewInt(1)) + h.counterLock.Lock() + h.counts[counter]++ + h.counterLock.Unlock() if err := h.balanceStorage.Reconciled(ctx, account, currency, block); err != nil { return fmt.Errorf("%w: unable to store updated reconciliation", err) From 8d2b9584bc585f921f0be707d20a0c7a51e740ce Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 11:43:39 -0600 Subject: [PATCH 2/3] Start count updater in check:data --- cmd/check_data.go | 4 ++++ pkg/tester/data.go | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/cmd/check_data.go b/cmd/check_data.go index d6c2b7fa..2277ffdd 100644 --- a/cmd/check_data.go +++ b/cmd/check_data.go @@ -142,6 +142,10 @@ func runCheckDataCmd(cmd *cobra.Command, args []string) error { return dataTester.WatchEndConditions(ctx) }) + g.Go(func() error { + return dataTester.StartReconcilerCountUpdater(ctx) + }) + g.Go(func() error { return tester.LogMemoryLoop(ctx) }) diff --git a/pkg/tester/data.go b/pkg/tester/data.go index 068b30ba..8cdb4ee2 100644 --- a/pkg/tester/data.go +++ b/pkg/tester/data.go @@ -372,6 +372,14 @@ func (t *DataTester) StartPruning( return t.syncer.Prune(ctx, t) } +// StartReconcilerCountUpdater attempts to periodically +// write cached reconciler count updates to storage. +func (t *DataTester) StartReconcilerCountUpdater( + ctx context.Context, +) error { + return t.reconcilerHandler.Updater(ctx) +} + // PruneableIndex is the index that is // safe for pruning. func (t *DataTester) PruneableIndex( From 16331d917e44be8b269a8a46581904c034d93fdd Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 11:45:07 -0600 Subject: [PATCH 3/3] invoke count update before exiting --- pkg/tester/data.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/tester/data.go b/pkg/tester/data.go index 8cdb4ee2..c851d8f7 100644 --- a/pkg/tester/data.go +++ b/pkg/tester/data.go @@ -720,6 +720,12 @@ func (t *DataTester) WaitForEmptyQueue( return ctx.Err() case <-tc.C: + // We force cached counts to be written before + // determining if we should exit. + if err := t.reconcilerHandler.UpdateCounts(ctx); err != nil { + return err + } + nowComplete, err := t.CompleteReconciliations(ctx) if err != nil { return err