Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg] Async Reconciler Count Updates #192

Merged
merged 3 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/check_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func runCheckDataCmd(cmd *cobra.Command, args []string) error {
return dataTester.WatchEndConditions(ctx)
})

g.Go(func() error {
return dataTester.StartReconcilerCountUpdater(ctx)
})

g.Go(func() error {
return tester.LogMemoryLoop(ctx)
})
Expand Down
81 changes: 77 additions & 4 deletions pkg/processor/reconciler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"math/big"
"sync"
"time"

"github.com/coinbase/rosetta-cli/pkg/logger"
"github.com/coinbase/rosetta-cli/pkg/results"
Expand All @@ -27,8 +29,22 @@ import (
"github.com/coinbase/rosetta-sdk-go/types"
)

const (
updateFrequency = 10 * time.Second
)

var _ reconciler.Handler = (*ReconcilerHandler)(nil)

var (
countKeys = []string{
storage.FailedReconciliationCounter,
storage.SkippedReconciliationsCounter,
storage.ExemptReconciliationCounter,
storage.ActiveReconciliationCounter,
storage.InactiveReconciliationCounter,
}
)

// ReconcilerHandler implements the Reconciler.Handler interface.
type ReconcilerHandler struct {
logger *logger.Logger
Expand All @@ -40,6 +56,9 @@ type ReconcilerHandler struct {
InactiveFailureBlock *types.BlockIdentifier

ActiveFailureBlock *types.BlockIdentifier

counterLock sync.Mutex
counts map[string]int64
}

// NewReconcilerHandler creates a new ReconcilerHandler.
Expand All @@ -49,12 +68,55 @@ func NewReconcilerHandler(
balanceStorage *storage.BalanceStorage,
haltOnReconciliationError bool,
) *ReconcilerHandler {
counts := map[string]int64{}
for _, key := range countKeys {
counts[key] = 0
}

return &ReconcilerHandler{
logger: logger,
counterStorage: counterStorage,
balanceStorage: balanceStorage,
haltOnReconciliationError: haltOnReconciliationError,
counts: counts,
}
}

// Updater periodically updates storage with cached counts.
func (h *ReconcilerHandler) Updater(ctx context.Context) error {
tc := time.NewTicker(updateFrequency)
defer tc.Stop()

for {
select {
case <-tc.C:
if err := h.UpdateCounts(ctx); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// UpdateCounts forces cached counts to be written to storage.
func (h *ReconcilerHandler) UpdateCounts(ctx context.Context) error {
for _, key := range countKeys {
h.counterLock.Lock()
count := h.counts[key]
h.counts[key] = 0
h.counterLock.Unlock()

if count == 0 {
continue
}

if _, err := h.counterStorage.Update(ctx, key, big.NewInt(count)); err != nil {
return err
}
}

return nil
}

// ReconciliationFailed is called each time a reconciliation fails.
Expand All @@ -69,7 +131,9 @@ func (h *ReconcilerHandler) ReconciliationFailed(
liveBalance string,
block *types.BlockIdentifier,
) error {
_, _ = h.counterStorage.Update(ctx, storage.FailedReconciliationCounter, big.NewInt(1))
h.counterLock.Lock()
h.counts[storage.FailedReconciliationCounter]++
h.counterLock.Unlock()

err := h.logger.ReconcileFailureStream(
ctx,
Expand All @@ -85,6 +149,9 @@ func (h *ReconcilerHandler) ReconciliationFailed(
}

if h.haltOnReconciliationError {
// Update counts before exiting
_ = h.UpdateCounts(ctx)

if reconciliationType == reconciler.InactiveReconciliation {
// Populate inactive failure information so we can try to find block with
// missing ops.
Expand Down Expand Up @@ -134,7 +201,9 @@ func (h *ReconcilerHandler) ReconciliationExempt(
block *types.BlockIdentifier,
exemption *types.BalanceExemption,
) error {
_, _ = h.counterStorage.Update(ctx, storage.ExemptReconciliationCounter, big.NewInt(1))
h.counterLock.Lock()
h.counts[storage.ExemptReconciliationCounter]++
h.counterLock.Unlock()

// Although the reconciliation was exempt (non-zero difference that was ignored),
// we still mark the account as being reconciled because the balance was in the range
Expand All @@ -154,7 +223,9 @@ func (h *ReconcilerHandler) ReconciliationSkipped(
currency *types.Currency,
cause string,
) error {
_, _ = h.counterStorage.Update(ctx, storage.SkippedReconciliationsCounter, big.NewInt(1))
h.counterLock.Lock()
h.counts[storage.SkippedReconciliationsCounter]++
h.counterLock.Unlock()

return nil
}
Expand All @@ -174,7 +245,9 @@ func (h *ReconcilerHandler) ReconciliationSucceeded(
counter = storage.InactiveReconciliationCounter
}

_, _ = h.counterStorage.Update(ctx, counter, big.NewInt(1))
h.counterLock.Lock()
h.counts[counter]++
h.counterLock.Unlock()

if err := h.balanceStorage.Reconciled(ctx, account, currency, block); err != nil {
return fmt.Errorf("%w: unable to store updated reconciliation", err)
Expand Down
14 changes: 14 additions & 0 deletions pkg/tester/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,14 @@ func (t *DataTester) StartPruning(
return t.syncer.Prune(ctx, t)
}

// StartReconcilerCountUpdater attempts to periodically
// write cached reconciler count updates to storage.
func (t *DataTester) StartReconcilerCountUpdater(
ctx context.Context,
) error {
return t.reconcilerHandler.Updater(ctx)
}

// PruneableIndex is the index that is
// safe for pruning.
func (t *DataTester) PruneableIndex(
Expand Down Expand Up @@ -712,6 +720,12 @@ func (t *DataTester) WaitForEmptyQueue(
return ctx.Err()

case <-tc.C:
// We force cached counts to be written before
// determining if we should exit.
if err := t.reconcilerHandler.UpdateCounts(ctx); err != nil {
return err
}

nowComplete, err := t.CompleteReconciliations(ctx)
if err != nil {
return err
Expand Down