Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Omitted Blocks #98

Merged
merged 4 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/coinbase/rosetta-cli
go 1.13

require (
github.com/coinbase/rosetta-sdk-go v0.3.4-0.20200807162047-31075a509b1f
github.com/coinbase/rosetta-sdk-go v0.3.4
github.com/dgraph-io/badger/v2 v2.0.3
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/ethereum/go-ethereum v1.9.18
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U=
github.com/coinbase/rosetta-sdk-go v0.3.4-0.20200807162047-31075a509b1f h1:U69ZwbTR10diY1MDi9LP/RxetVJzjd4bvIDUEs8XYvk=
github.com/coinbase/rosetta-sdk-go v0.3.4-0.20200807162047-31075a509b1f/go.mod h1:Q6dAY0kdG2X3jNaIYnkxnZOb8XEZQar9Q1RcnBgm/wQ=
github.com/coinbase/rosetta-sdk-go v0.3.4 h1:jWKgajozco/T0FNnZb2TqBsmsUoF6ZuCLnUJkEE+vNg=
github.com/coinbase/rosetta-sdk-go v0.3.4/go.mod h1:Q6dAY0kdG2X3jNaIYnkxnZOb8XEZQar9Q1RcnBgm/wQ=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/reconciler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (h *ReconcilerHelper) BlockExists(
ctx context.Context,
block *types.BlockIdentifier,
) (bool, error) {
_, err := h.blockStorage.GetBlock(ctx, block)
_, err := h.blockStorage.GetBlock(ctx, types.ConstructPartialBlockIdentifier(block))
if err == nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: feels like it's more idiomatic to evaluate if err != nil first

        _, err := h.blockStorage.GetBlock(ctx, types.ConstructPartialBlockIdentifier(block))
	if err != nil {
		if errors.Is(err, storage.ErrBlockNotFound) {
			return false, nil
		}

		return false, err
	}

	return true, nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I agree it is more idiomatic to do if err != nil first, I think it is more preferable to avoid nesting conditionals.

return true, nil
}
Expand Down
161 changes: 101 additions & 60 deletions pkg/storage/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ const (
// blockNamespace is prepended to any stored block.
blockNamespace = "block"

// blockHashNamespace is prepended to any stored block hash.
// We cannot just use the stored block key to lookup whether
// a hash has been used before because it is concatenated
// with the index of the stored block.
blockHashNamespace = "block-hash"
// blockIndexNamespace is prepended to any stored block index.
blockIndexNamespace = "block-index"

// transactionHashNamespace is prepended to any stored
// transaction hash.
Expand All @@ -55,9 +52,9 @@ var (
// found in BlockStorage.
ErrBlockNotFound = errors.New("block not found")

// ErrDuplicateBlockHash is returned when a block hash
// ErrDuplicateKey is returned when a key
// cannot be stored because it is a duplicate.
ErrDuplicateBlockHash = errors.New("duplicate block hash")
ErrDuplicateKey = errors.New("duplicate key")

// ErrDuplicateTransactionHash is returned when a transaction
// hash cannot be stored because it is a duplicate.
Expand All @@ -68,14 +65,12 @@ func getHeadBlockKey() []byte {
return []byte(headBlockKey)
}

func getBlockKey(blockIdentifier *types.BlockIdentifier) []byte {
return []byte(
fmt.Sprintf("%s/%s/%d", blockNamespace, blockIdentifier.Hash, blockIdentifier.Index),
)
func getBlockHashKey(hash string) []byte {
return []byte(fmt.Sprintf("%s/%s", blockNamespace, hash))
}

func getBlockHashKey(blockIdentifier *types.BlockIdentifier) []byte {
return []byte(fmt.Sprintf("%s/%s", blockHashNamespace, blockIdentifier.Hash))
func getBlockIndexKey(index int64) []byte {
return []byte(fmt.Sprintf("%s/%d", blockIndexNamespace, index))
}

func getTransactionHashKey(transactionIdentifier *types.TransactionIdentifier) []byte {
Expand Down Expand Up @@ -174,18 +169,42 @@ func (b *BlockStorage) StoreHeadBlockIdentifier(
// GetBlock returns a block, if it exists.
func (b *BlockStorage) GetBlock(
ctx context.Context,
blockIdentifier *types.BlockIdentifier,
blockIdentifier *types.PartialBlockIdentifier,
) (*types.Block, error) {
transaction := b.db.NewDatabaseTransaction(ctx, false)
defer transaction.Discard(ctx)

exists, block, err := transaction.Get(ctx, getBlockKey(blockIdentifier))
var exists bool
var block []byte
var err error
switch {
case blockIdentifier == nil || (blockIdentifier.Hash == nil && blockIdentifier.Index == nil):
// Get current block when no blockIdentifier is provided
var head *types.BlockIdentifier
head, err = b.GetHeadBlockIdentifierTransactional(ctx, transaction)
if err != nil {
return nil, fmt.Errorf("%w: cannot get head block identifier", err)
}

exists, block, err = transaction.Get(ctx, getBlockHashKey(head.Hash))
case blockIdentifier.Hash != nil:
// Get block by hash if provided
exists, block, err = transaction.Get(ctx, getBlockHashKey(*blockIdentifier.Hash))
default:
// Get block by index if hash not provided
var blockKey []byte
exists, blockKey, err = transaction.Get(ctx, getBlockIndexKey(*blockIdentifier.Index))
if exists {
exists, block, err = transaction.Get(ctx, blockKey)
}
}

if err != nil {
return nil, err
return nil, fmt.Errorf("%w: unable to get block", err)
}

if !exists {
return nil, fmt.Errorf("%w %+v", ErrBlockNotFound, blockIdentifier)
return nil, fmt.Errorf("%w: %+v", ErrBlockNotFound, blockIdentifier)
}

var rosettaBlock types.Block
Expand All @@ -197,33 +216,48 @@ func (b *BlockStorage) GetBlock(
return &rosettaBlock, nil
}

// AddBlock stores a block or returns an error.
func (b *BlockStorage) AddBlock(
func (b *BlockStorage) storeBlock(
ctx context.Context,
transaction DatabaseTransaction,
block *types.Block,
) error {
transaction := b.db.NewDatabaseTransaction(ctx, true)
defer transaction.Discard(ctx)

buf, err := encode(block)
if err != nil {
return err
return fmt.Errorf("%w: unable to encode block", err)
}

// Store block
err = transaction.Set(ctx, getBlockKey(block.BlockIdentifier), buf)
if err != nil {
return err
if err := b.storeUniqueKey(ctx, transaction, getBlockHashKey(block.BlockIdentifier.Hash), buf); err != nil {
return fmt.Errorf("%w: unable to store block", err)
}

if err = b.StoreHeadBlockIdentifier(ctx, transaction, block.BlockIdentifier); err != nil {
return err
if err := b.storeUniqueKey(
ctx,
transaction,
getBlockIndexKey(block.BlockIdentifier.Index),
getBlockHashKey(block.BlockIdentifier.Hash),
); err != nil {
return fmt.Errorf("%w: unable to store block index", err)
}

if err := b.StoreHeadBlockIdentifier(ctx, transaction, block.BlockIdentifier); err != nil {
return fmt.Errorf("%w: unable to update head block identifier", err)
}

// Store block hash
err = b.storeBlockHash(ctx, transaction, block.BlockIdentifier)
return nil
}

// AddBlock stores a block or returns an error.
func (b *BlockStorage) AddBlock(
ctx context.Context,
block *types.Block,
) error {
transaction := b.db.NewDatabaseTransaction(ctx, true)
defer transaction.Discard(ctx)

// Store block
err := b.storeBlock(ctx, transaction, block)
if err != nil {
return fmt.Errorf("%w: unable to store block hash", err)
return fmt.Errorf("%w: unable to store block", err)
}

// Store all transaction hashes
Expand All @@ -242,6 +276,27 @@ func (b *BlockStorage) AddBlock(
return b.callWorkersAndCommit(ctx, block, transaction, true)
}

func (b *BlockStorage) deleteBlock(
ctx context.Context,
transaction DatabaseTransaction,
block *types.Block,
) error {
blockIdentifier := block.BlockIdentifier
if err := transaction.Delete(ctx, getBlockHashKey(blockIdentifier.Hash)); err != nil {
return fmt.Errorf("%w: unable to delete block", err)
}

if err := transaction.Delete(ctx, getBlockIndexKey(blockIdentifier.Index)); err != nil {
return fmt.Errorf("%w: unable to delete block index", err)
}

if err := b.StoreHeadBlockIdentifier(ctx, transaction, block.ParentBlockIdentifier); err != nil {
return fmt.Errorf("%w: unable to update head block identifier", err)
}

return nil
}

// RemoveBlock removes a block or returns an error.
// RemoveBlock also removes the block hash and all
// its transaction hashes to not break duplicate
Expand All @@ -250,7 +305,7 @@ func (b *BlockStorage) RemoveBlock(
ctx context.Context,
blockIdentifier *types.BlockIdentifier,
) error {
block, err := b.GetBlock(ctx, blockIdentifier)
block, err := b.GetBlock(ctx, types.ConstructPartialBlockIdentifier(blockIdentifier))
if err != nil {
return err
}
Expand All @@ -266,19 +321,9 @@ func (b *BlockStorage) RemoveBlock(
}
}

// Remove block hash
err = transaction.Delete(ctx, getBlockHashKey(blockIdentifier))
if err != nil {
return err
}

// Remove block
if err := transaction.Delete(ctx, getBlockKey(blockIdentifier)); err != nil {
return err
}

if err = b.StoreHeadBlockIdentifier(ctx, transaction, block.ParentBlockIdentifier); err != nil {
return err
// Delete block
if err := b.deleteBlock(ctx, transaction, block); err != nil {
return fmt.Errorf("%w: unable to delete block", err)
}

return b.callWorkersAndCommit(ctx, block, transaction, false)
Expand Down Expand Up @@ -348,7 +393,7 @@ func (b *BlockStorage) SetNewStartIndex(
currBlock := head
for currBlock.Index >= startIndex {
log.Printf("Removing block %+v\n", currBlock)
block, err := b.GetBlock(ctx, currBlock)
block, err := b.GetBlock(ctx, types.ConstructPartialBlockIdentifier(currBlock))
if err != nil {
return err
}
Expand All @@ -373,7 +418,7 @@ func (b *BlockStorage) CreateBlockCache(ctx context.Context) []*types.BlockIdent
}

for len(cache) < syncer.PastBlockSize {
block, err := b.GetBlock(ctx, head)
block, err := b.GetBlock(ctx, types.ConstructPartialBlockIdentifier(head))
if err != nil {
return cache
}
Expand All @@ -392,22 +437,22 @@ func (b *BlockStorage) CreateBlockCache(ctx context.Context) []*types.BlockIdent
return cache
}

func (b *BlockStorage) storeBlockHash(
func (b *BlockStorage) storeUniqueKey(
ctx context.Context,
transaction DatabaseTransaction,
block *types.BlockIdentifier,
key []byte,
value []byte,
) error {
hashKey := getBlockHashKey(block)
exists, _, err := transaction.Get(ctx, hashKey)
exists, _, err := transaction.Get(ctx, key)
if err != nil {
return err
}

if exists {
return fmt.Errorf("%w: duplicate block hash %s found", ErrDuplicateBlockHash, block.Hash)
return fmt.Errorf("%w: duplicate key %s found", ErrDuplicateKey, string(key))
}

return transaction.Set(ctx, hashKey, []byte(""))
return transaction.Set(ctx, key, value)
}

func (b *BlockStorage) storeTransactionHash(
Expand Down Expand Up @@ -518,7 +563,7 @@ func (b *BlockStorage) FindTransaction(
}
}

blockExists, block, err := txn.Get(ctx, getBlockKey(newestBlock))
blockExists, block, err := txn.Get(ctx, getBlockHashKey(newestBlock.Hash))
if err != nil {
return nil, nil, fmt.Errorf("%w: unable to query database for block", err)
}
Expand Down Expand Up @@ -554,15 +599,11 @@ func (b *BlockStorage) AtTip(
ctx context.Context,
tipDelay int64,
) (bool, error) {
head, err := b.GetHeadBlockIdentifier(ctx)
block, err := b.GetBlock(ctx, nil)
if errors.Is(err, ErrHeadBlockNotFound) {
return false, nil
}
if err != nil {
return false, fmt.Errorf("%w: unable to get head block identifir", err)
}

block, err := b.GetBlock(ctx, head)
if err != nil {
return false, fmt.Errorf("%w: unable to get head block", err)
}
Expand Down
Loading