Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reconciler] Lookup by Index + Orphan Race Condition Fix #177

Merged
merged 8 commits into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,6 @@ var versionCmd = &cobra.Command{
Use: "version",
Short: "Print rosetta-cli version",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("v0.5.18")
fmt.Println("v0.5.19")
},
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/coinbase/rosetta-cli
go 1.13

require (
github.com/coinbase/rosetta-sdk-go v0.5.8
github.com/coinbase/rosetta-sdk-go v0.5.9-0.20201029210921-d7499a34c1f6
github.com/fatih/color v1.9.0
github.com/olekukonko/tablewriter v0.0.2-0.20190409134802-7e037d187b0c
github.com/spf13/cobra v1.1.1
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U=
github.com/coinbase/rosetta-sdk-go v0.5.7 h1:BaR/+O3GzrsyunVNkVQHtjDCcId8G1Fh/RqEbeyExnk=
github.com/coinbase/rosetta-sdk-go v0.5.7/go.mod h1:l5aNeyeZKBkmWbVdkdLpWdToQ6hTwI7cZ1OU9cMbljY=
github.com/coinbase/rosetta-sdk-go v0.5.8-0.20201027222031-dd9e29377d5f h1:aWkN9dKMkMMpZKX5QycpePxH176Fj2fNNC7jESfLZw0=
github.com/coinbase/rosetta-sdk-go v0.5.8-0.20201027222031-dd9e29377d5f/go.mod h1:l5aNeyeZKBkmWbVdkdLpWdToQ6hTwI7cZ1OU9cMbljY=
github.com/coinbase/rosetta-sdk-go v0.5.8 h1:Sf7iQPjexIsa7zQfx0PdRIlfsjrDPfBuzyxbJogZDqw=
github.com/coinbase/rosetta-sdk-go v0.5.8/go.mod h1:xd4wYUhV3LkY78SPH8BUhc88rXfn2jYgN9BfiSjbcvM=
github.com/coinbase/rosetta-sdk-go v0.5.9-0.20201029210921-d7499a34c1f6 h1:5zDhG7QoXRf2GtNeDqfnUdAe5gm1fdKy0h5Txa7NHec=
github.com/coinbase/rosetta-sdk-go v0.5.9-0.20201029210921-d7499a34c1f6/go.mod h1:xd4wYUhV3LkY78SPH8BUhc88rXfn2jYgN9BfiSjbcvM=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
Expand Down Expand Up @@ -375,8 +371,6 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.8 h1:R2L6zPq1pWFumpeIxAJoeiov5GxyEZUq9NyS8eus/6s=
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.8/go.mod h1:HVxBVPUK/+fZMonk4bi1islLa8V3cfnBug0+4dykPzo=
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.9 h1:iBRIniTnWOo0kqkg3k3XR8Vn6OCkVlIuZNo0UoBrKx4=
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.9/go.mod h1:HVxBVPUK/+fZMonk4bi1islLa8V3cfnBug0+4dykPzo=
github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vbd1qPqc=
Expand Down
6 changes: 3 additions & 3 deletions pkg/processor/balance_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ type BalanceStorageHandler struct {
reconciler *reconciler.Reconciler

reconcile bool
interestingAccount *reconciler.AccountCurrency
interestingAccount *types.AccountCurrency
}

// NewBalanceStorageHandler returns a new *BalanceStorageHandler.
func NewBalanceStorageHandler(
logger *logger.Logger,
reconciler *reconciler.Reconciler,
reconcile bool,
interestingAccount *reconciler.AccountCurrency,
interestingAccount *types.AccountCurrency,
) *BalanceStorageHandler {
return &BalanceStorageHandler{
logger: logger,
Expand Down Expand Up @@ -73,7 +73,7 @@ func (h *BalanceStorageHandler) BlockAdded(
if h.interestingAccount != nil {
var interestingChange *parser.BalanceChange
for _, change := range changes {
if types.Hash(&reconciler.AccountCurrency{
if types.Hash(&types.AccountCurrency{
Account: change.Account,
Currency: change.Currency,
}) == types.Hash(h.interestingAccount) {
Expand Down
18 changes: 12 additions & 6 deletions pkg/processor/balance_storage_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/coinbase/rosetta-sdk-go/asserter"
"github.com/coinbase/rosetta-sdk-go/fetcher"
"github.com/coinbase/rosetta-sdk-go/parser"
"github.com/coinbase/rosetta-sdk-go/reconciler"
"github.com/coinbase/rosetta-sdk-go/storage"
"github.com/coinbase/rosetta-sdk-go/syncer"
"github.com/coinbase/rosetta-sdk-go/types"
"github.com/coinbase/rosetta-sdk-go/utils"
)
Expand Down Expand Up @@ -51,7 +51,7 @@ func NewBalanceStorageHelper(
network *types.NetworkIdentifier,
fetcher *fetcher.Fetcher,
lookupBalanceByBlock bool,
exemptAccounts []*reconciler.AccountCurrency,
exemptAccounts []*types.AccountCurrency,
interestingOnly bool,
balanceExemptions []*types.BalanceExemption,
initialFetchDisabled bool,
Expand Down Expand Up @@ -84,7 +84,7 @@ func (h *BalanceStorageHelper) AccountBalance(
ctx context.Context,
account *types.AccountIdentifier,
currency *types.Currency,
block *types.BlockIdentifier,
lookupBlock *types.BlockIdentifier,
) (*types.Amount, error) {
if !h.lookupBalanceByBlock || h.initialFetchDisabled {
return &types.Amount{
Expand All @@ -96,18 +96,24 @@ func (h *BalanceStorageHelper) AccountBalance(
// In the case that we are syncing from arbitrary height,
// we may need to recover the balance of an account to
// perform validations.
amount, _, _, err := utils.CurrencyBalance(
amount, block, _, err := utils.CurrencyBalance(
ctx,
h.network,
h.fetcher,
account,
currency,
block,
lookupBlock.Index,
)
if err != nil {
return nil, fmt.Errorf("%w: unable to get currency balance", err)
}

// If the returned balance block does not match the intended
// block a re-org could've occurred.
if types.Hash(lookupBlock) != types.Hash(block) {
return nil, syncer.ErrOrphanHead
}

return &types.Amount{
Value: amount.Value,
Currency: currency,
Expand All @@ -134,7 +140,7 @@ func (h *BalanceStorageHelper) ExemptFunc() parser.ExemptOperation {
}
}

thisAcct := types.Hash(&reconciler.AccountCurrency{
thisAcct := types.Hash(&types.AccountCurrency{
Account: op.Account,
Currency: op.Amount.Currency,
})
Expand Down
9 changes: 4 additions & 5 deletions pkg/processor/balance_storage_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ package processor
import (
"testing"

"github.com/coinbase/rosetta-sdk-go/reconciler"
"github.com/coinbase/rosetta-sdk-go/types"
"github.com/stretchr/testify/assert"
)

var (
opAmountCurrency = &reconciler.AccountCurrency{
opAmountCurrency = &types.AccountCurrency{
Account: &types.AccountIdentifier{
Address: "hello",
},
Expand All @@ -36,12 +35,12 @@ var (

func TestExemptFuncExemptAccounts(t *testing.T) {
var tests = map[string]struct {
exemptAccounts []*reconciler.AccountCurrency
exemptAccounts []*types.AccountCurrency
exempt bool
}{
"no exempt accounts": {},
"account not exempt": {
exemptAccounts: []*reconciler.AccountCurrency{
exemptAccounts: []*types.AccountCurrency{
{
Account: &types.AccountIdentifier{
Address: "addr1",
Expand All @@ -57,7 +56,7 @@ func TestExemptFuncExemptAccounts(t *testing.T) {
},
},
"account is exempt": {
exemptAccounts: []*reconciler.AccountCurrency{
exemptAccounts: []*types.AccountCurrency{
{
Account: &types.AccountIdentifier{
Address: "addr1",
Expand Down
4 changes: 2 additions & 2 deletions pkg/processor/reconciler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type ReconcilerHandler struct {
balanceStorage *storage.BalanceStorage
haltOnReconciliationError bool

InactiveFailure *reconciler.AccountCurrency
InactiveFailure *types.AccountCurrency
InactiveFailureBlock *types.BlockIdentifier

ActiveFailureBlock *types.BlockIdentifier
Expand Down Expand Up @@ -88,7 +88,7 @@ func (h *ReconcilerHandler) ReconciliationFailed(
if reconciliationType == reconciler.InactiveReconciliation {
// Populate inactive failure information so we can try to find block with
// missing ops.
h.InactiveFailure = &reconciler.AccountCurrency{
h.InactiveFailure = &types.AccountCurrency{
Account: account,
Currency: currency,
}
Expand Down
25 changes: 19 additions & 6 deletions pkg/processor/reconciler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ReconcilerHelper struct {
network *types.NetworkIdentifier
fetcher *fetcher.Fetcher

database storage.Database
blockStorage *storage.BlockStorage
balanceStorage *storage.BalanceStorage
}
Expand All @@ -40,63 +41,75 @@ type ReconcilerHelper struct {
func NewReconcilerHelper(
network *types.NetworkIdentifier,
fetcher *fetcher.Fetcher,
database storage.Database,
blockStorage *storage.BlockStorage,
balanceStorage *storage.BalanceStorage,
) *ReconcilerHelper {
return &ReconcilerHelper{
network: network,
fetcher: fetcher,
database: database,
blockStorage: blockStorage,
balanceStorage: balanceStorage,
}
}

// DatabaseTransaction returns a new read-only storage.DatabaseTransaction.
func (h *ReconcilerHelper) DatabaseTransaction(
ctx context.Context,
) storage.DatabaseTransaction {
return h.database.NewDatabaseTransaction(ctx, false)
}

// CanonicalBlock returns a boolean indicating if a block
// is in the canonical chain. This is necessary to reconcile across
// reorgs. If the block returned on an account balance fetch
// does not exist, reconciliation will be skipped.
func (h *ReconcilerHelper) CanonicalBlock(
ctx context.Context,
dbTx storage.DatabaseTransaction,
block *types.BlockIdentifier,
) (bool, error) {
return h.blockStorage.CanonicalBlock(ctx, block)
return h.blockStorage.CanonicalBlockTransactional(ctx, block, dbTx)
}

// CurrentBlock returns the last processed block and is used
// to determine which block to check account balances at during
// inactive reconciliation.
func (h *ReconcilerHelper) CurrentBlock(
ctx context.Context,
dbTx storage.DatabaseTransaction,
) (*types.BlockIdentifier, error) {
return h.blockStorage.GetHeadBlockIdentifier(ctx)
return h.blockStorage.GetHeadBlockIdentifierTransactional(ctx, dbTx)
}

// ComputedBalance returns the balance of an account in block storage.
// It is necessary to perform this check outside of the Reconciler
// package to allow for separation from a default storage backend.
func (h *ReconcilerHelper) ComputedBalance(
ctx context.Context,
dbTx storage.DatabaseTransaction,
account *types.AccountIdentifier,
currency *types.Currency,
headBlock *types.BlockIdentifier,
index int64,
) (*types.Amount, error) {
return h.balanceStorage.GetOrSetBalance(ctx, account, currency, headBlock)
return h.balanceStorage.GetBalanceTransactional(ctx, dbTx, account, currency, index)
}

// LiveBalance returns the live balance of an account.
func (h *ReconcilerHelper) LiveBalance(
ctx context.Context,
account *types.AccountIdentifier,
currency *types.Currency,
headBlock *types.BlockIdentifier,
index int64,
) (*types.Amount, *types.BlockIdentifier, error) {
amt, block, _, err := utils.CurrencyBalance(
ctx,
h.network,
h.fetcher,
account,
currency,
headBlock,
index,
)
if err != nil {
return nil, nil, err
Expand Down
22 changes: 19 additions & 3 deletions pkg/results/data_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func ComputeCheckDataProgress(
fetcher *fetcher.Fetcher,
network *types.NetworkIdentifier,
counters *storage.CounterStorage,
blockStorage *storage.BlockStorage,
reconciler *reconciler.Reconciler,
) *CheckDataProgress {
networkStatus, fetchErr := fetcher.NetworkStatusRetry(ctx, network, nil)
Expand All @@ -289,6 +290,17 @@ func ComputeCheckDataProgress(
}
tipIndex := networkStatus.CurrentBlockIdentifier.Index

// Get current tip in the case that re-orgs occurred
// or a custom start index was provied.
headBlock, err := blockStorage.GetHeadBlockIdentifier(ctx)
if errors.Is(err, storage.ErrHeadBlockNotFound) {
return nil
}
if err != nil {
fmt.Printf("%s: cannot get head block", err.Error())
return nil
}

blocks, err := counters.Get(ctx, storage.BlockCounter)
if err != nil {
fmt.Printf("%s: cannot get block counter", err.Error())
Expand All @@ -305,6 +317,8 @@ func ComputeCheckDataProgress(
return nil
}

// adjustedBlocks is used to calculate the sync rate (regardless
// of which block we started syncing at)
adjustedBlocks := blocks.Int64() - orphans.Int64()
if tipIndex-adjustedBlocks <= 0 { // return if no blocks to sync
return nil
Expand All @@ -322,15 +336,15 @@ func ComputeCheckDataProgress(

blocksPerSecond := new(big.Float).Quo(new(big.Float).SetInt64(adjustedBlocks), new(big.Float).SetInt(elapsedTime))
blocksPerSecondFloat, _ := blocksPerSecond.Float64()
blocksSynced := new(big.Float).Quo(new(big.Float).SetInt64(adjustedBlocks), new(big.Float).SetInt64(tipIndex))
blocksSynced := new(big.Float).Quo(new(big.Float).SetInt64(headBlock.Index), new(big.Float).SetInt64(tipIndex))
blocksSyncedFloat, _ := blocksSynced.Float64()

return &CheckDataProgress{
Blocks: adjustedBlocks,
Blocks: headBlock.Index,
Tip: tipIndex,
Completed: blocksSyncedFloat * utils.OneHundred,
Rate: blocksPerSecondFloat,
TimeRemaining: utils.TimeToTip(blocksPerSecondFloat, adjustedBlocks, tipIndex).String(),
TimeRemaining: utils.TimeToTip(blocksPerSecondFloat, headBlock.Index, tipIndex).String(),
ReconcilerQueueSize: reconciler.QueueSize(),
ReconcilerLastIndex: reconciler.LastIndexReconciled(),
}
Expand All @@ -347,6 +361,7 @@ type CheckDataStatus struct {
// *CheckDataStatus.
func ComputeCheckDataStatus(
ctx context.Context,
blocks *storage.BlockStorage,
counters *storage.CounterStorage,
balances *storage.BalanceStorage,
fetcher *fetcher.Fetcher,
Expand All @@ -364,6 +379,7 @@ func ComputeCheckDataStatus(
fetcher,
network,
counters,
blocks,
reconciler,
),
}
Expand Down
Loading