Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate snow package from snow vm refactor #1846

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 203 additions & 0 deletions chainindex/chain_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package chainindex

import (
"context"
"encoding/binary"
"errors"
"math/rand"
"time"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/hypersdk/consts"
)

const (
blockPrefix byte = 0x0 // TODO: move to flat files (https://github.com/ava-labs/hypersdk/issues/553)
blockIDHeightPrefix byte = 0x1 // ID -> Height
blockHeightIDPrefix byte = 0x2 // Height -> ID (don't always need full block from disk)
lastAcceptedByte byte = 0x3 // lastAcceptedByte -> lastAcceptedHeight
)

var lastAcceptedKey = []byte{lastAcceptedByte}

var errBlockCompactionFrequencyZero = errors.New("block compaction frequency must be non-zero")

type Config struct {
AcceptedBlockWindow uint64 `json:"acceptedBlockWindow"`
BlockCompactionFrequency uint64 `json:"blockCompactionFrequency"`
}

func NewDefaultConfig() Config {
return Config{
AcceptedBlockWindow: 50_000, // ~3.5hr with 250ms block time (100GB at 2MB)
BlockCompactionFrequency: 32, // 64 MB of deletion if 2 MB blocks
}
}

type ChainIndex[T Block] struct {
config Config
compactionOffset uint64
metrics *metrics
log logging.Logger
db database.Database
parser Parser[T]
}

type Block interface {
ID() ids.ID
Height() uint64
Bytes() []byte
}

type Parser[T Block] interface {
ParseBlock(context.Context, []byte) (T, error)
}

func New[T Block](
log logging.Logger,
registry prometheus.Registerer,
config Config,
parser Parser[T],
db database.Database,
) (*ChainIndex[T], error) {
metrics, err := newMetrics(registry)
if err != nil {
return nil, err
}
if config.BlockCompactionFrequency == 0 {
return nil, errBlockCompactionFrequencyZero
}

return &ChainIndex[T]{
config: config,
compactionOffset: rand.Uint64() % config.BlockCompactionFrequency, //nolint:gosec
metrics: metrics,
log: log,
db: db,
parser: parser,
}, nil
}

func (c *ChainIndex[T]) GetLastAcceptedHeight(_ context.Context) (uint64, error) {
lastAcceptedHeightBytes, err := c.db.Get(lastAcceptedKey)
if err != nil {
return 0, err
}
return database.ParseUInt64(lastAcceptedHeightBytes)
}

func (c *ChainIndex[T]) UpdateLastAccepted(ctx context.Context, blk T) error {
batch := c.db.NewBatch()

var (
blkID = blk.ID()
height = blk.Height()
blkBytes = blk.Bytes()
)
heightBytes := binary.BigEndian.AppendUint64(nil, height)
err := errors.Join(
batch.Put(lastAcceptedKey, heightBytes),
batch.Put(prefixBlockIDHeightKey(blkID), heightBytes),
batch.Put(prefixBlockHeightIDKey(blk.Height()), blkID[:]),
batch.Put(prefixBlockKey(height), blkBytes),
)
if err != nil {
return err
}

expiryHeight := height - c.config.AcceptedBlockWindow
if c.config.AcceptedBlockWindow == 0 || expiryHeight == 0 || expiryHeight >= height { // ensure we don't free genesis
return batch.Write()
}

if err := batch.Delete(prefixBlockKey(expiryHeight)); err != nil {
return err
}
deleteBlkID, err := c.GetBlockIDAtHeight(ctx, expiryHeight)
if err != nil {
return err
}
if err := batch.Delete(prefixBlockIDHeightKey(deleteBlkID)); err != nil {
return err
}
if err := batch.Delete(prefixBlockHeightIDKey(expiryHeight)); err != nil {
return err
}
c.metrics.deletedBlocks.Inc()

if expiryHeight%c.compactionOffset == 0 {
go func() {
start := time.Now()
if err := c.db.Compact([]byte{blockPrefix}, prefixBlockKey(expiryHeight)); err != nil {
c.log.Error("failed to compact block store", zap.Error(err))
return
}
c.log.Info("compacted disk blocks", zap.Uint64("end", expiryHeight), zap.Duration("t", time.Since(start)))
}()
}

return batch.Write()
}

func (c *ChainIndex[T]) GetBlock(ctx context.Context, blkID ids.ID) (T, error) {
var emptyT T
height, err := c.GetBlockIDHeight(ctx, blkID)
if err != nil {
return emptyT, err
}
return c.GetBlockByHeight(ctx, height)
}

func (c *ChainIndex[T]) GetBlockIDAtHeight(_ context.Context, blkHeight uint64) (ids.ID, error) {
blkIDBytes, err := c.db.Get(prefixBlockHeightIDKey(blkHeight))
if err != nil {
return ids.Empty, err
}
return ids.ID(blkIDBytes), nil
}

func (c *ChainIndex[T]) GetBlockIDHeight(_ context.Context, blkID ids.ID) (uint64, error) {
blkHeightBytes, err := c.db.Get(prefixBlockIDHeightKey(blkID))
if err != nil {
return 0, err
}
return database.ParseUInt64(blkHeightBytes)
}

func (c *ChainIndex[T]) GetBlockByHeight(ctx context.Context, blkHeight uint64) (T, error) {
var emptyT T
blkBytes, err := c.db.Get(prefixBlockKey(blkHeight))
if err != nil {
return emptyT, err
}
return c.parser.ParseBlock(ctx, blkBytes)
}

func prefixBlockKey(height uint64) []byte {
k := make([]byte, 1+consts.Uint64Len)
k[0] = blockPrefix
binary.BigEndian.PutUint64(k[1:], height)
return k
}

func prefixBlockIDHeightKey(id ids.ID) []byte {
k := make([]byte, 1+ids.IDLen)
k[0] = blockIDHeightPrefix
copy(k[1:], id[:])
return k
}

func prefixBlockHeightIDKey(height uint64) []byte {
k := make([]byte, 1+consts.Uint64Len)
k[0] = blockHeightIDPrefix
binary.BigEndian.PutUint64(k[1:], height)
return k
}
136 changes: 136 additions & 0 deletions chainindex/chain_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package chainindex

import (
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/hashing"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/ava-labs/hypersdk/consts"
)

type testBlock struct {
height uint64
}

func (t *testBlock) ID() ids.ID { return hashing.ComputeHash256Array(t.Bytes()) }
func (t *testBlock) Height() uint64 { return t.height }
func (t *testBlock) Bytes() []byte { return binary.BigEndian.AppendUint64(nil, t.height) }

type parser struct{}

func (*parser) ParseBlock(_ context.Context, b []byte) (*testBlock, error) {
if len(b) != consts.Uint64Len {
return nil, fmt.Errorf("unexpected block length: %d", len(b))
}
height := binary.BigEndian.Uint64(b)
return &testBlock{height}, nil
}

func newTestChainIndex(config Config, db database.Database) (*ChainIndex[*testBlock], error) {
return New(logging.NoLog{}, prometheus.NewRegistry(), config, &parser{}, db)
}

func confirmBlockIndexed(r *require.Assertions, ctx context.Context, chainIndex *ChainIndex[*testBlock], expectedBlk *testBlock) {
blk, err := chainIndex.GetBlockByHeight(ctx, expectedBlk.height)
r.NoError(err)
r.Equal(expectedBlk.ID(), blk.ID())

blkID, err := chainIndex.GetBlockIDAtHeight(ctx, expectedBlk.height)
r.NoError(err)
r.Equal(expectedBlk.ID(), blkID)

blkHeight, err := chainIndex.GetBlockIDHeight(ctx, expectedBlk.ID())
r.NoError(err)
r.Equal(expectedBlk.Height(), blkHeight)

blk, err = chainIndex.GetBlock(ctx, expectedBlk.ID())
r.NoError(err)
r.Equal(expectedBlk.ID(), blk.ID())
}

func confirmLastAcceptedHeight(r *require.Assertions, ctx context.Context, chainIndex *ChainIndex[*testBlock], expectedHeight uint64) {
lastAcceptedHeight, err := chainIndex.GetLastAcceptedHeight(ctx)
r.NoError(err)
r.Equal(expectedHeight, lastAcceptedHeight)
}

func confirmBlockUnindexed(r *require.Assertions, ctx context.Context, chainIndex *ChainIndex[*testBlock], blk *testBlock) {
_, err := chainIndex.GetBlockByHeight(ctx, blk.height)
r.ErrorIs(err, database.ErrNotFound)
_, err = chainIndex.GetBlockIDAtHeight(ctx, blk.height)
r.ErrorIs(err, database.ErrNotFound)
_, err = chainIndex.GetBlockIDHeight(ctx, blk.ID())
r.ErrorIs(err, database.ErrNotFound)
_, err = chainIndex.GetBlock(ctx, blk.ID())
r.ErrorIs(err, database.ErrNotFound)
}

func TestChainIndex(t *testing.T) {
r := require.New(t)
ctx := context.Background()
chainIndex, err := newTestChainIndex(NewDefaultConfig(), memdb.New())
r.NoError(err)

genesisBlk := &testBlock{0}
confirmBlockUnindexed(r, ctx, chainIndex, genesisBlk)
_, err = chainIndex.GetLastAcceptedHeight(ctx)
r.ErrorIs(err, database.ErrNotFound)

r.NoError(chainIndex.UpdateLastAccepted(ctx, genesisBlk))
confirmBlockIndexed(r, ctx, chainIndex, genesisBlk)
confirmLastAcceptedHeight(r, ctx, chainIndex, genesisBlk.Height())

blk1 := &testBlock{1}
r.NoError(chainIndex.UpdateLastAccepted(ctx, blk1))
confirmBlockIndexed(r, ctx, chainIndex, blk1)
confirmLastAcceptedHeight(r, ctx, chainIndex, blk1.Height())
}

func TestChainIndexInvalidCompactionFrequency(t *testing.T) {
_, err := newTestChainIndex(Config{BlockCompactionFrequency: 0}, memdb.New())
require.ErrorIs(t, err, errBlockCompactionFrequencyZero)
}

func TestChainIndexExpiry(t *testing.T) {
r := require.New(t)
ctx := context.Background()
chainIndex, err := newTestChainIndex(Config{AcceptedBlockWindow: 1, BlockCompactionFrequency: 64}, memdb.New())
r.NoError(err)

genesisBlk := &testBlock{0}
r.NoError(chainIndex.UpdateLastAccepted(ctx, genesisBlk))
confirmBlockIndexed(r, ctx, chainIndex, genesisBlk)
confirmLastAcceptedHeight(r, ctx, chainIndex, genesisBlk.Height())

blk1 := &testBlock{1}
r.NoError(chainIndex.UpdateLastAccepted(ctx, blk1))
confirmBlockIndexed(r, ctx, chainIndex, genesisBlk) // Confirm genesis is not un-indexed
confirmBlockIndexed(r, ctx, chainIndex, blk1)
confirmLastAcceptedHeight(r, ctx, chainIndex, blk1.Height())

blk2 := &testBlock{2}
r.NoError(chainIndex.UpdateLastAccepted(ctx, blk2))
confirmBlockIndexed(r, ctx, chainIndex, genesisBlk) // Confirm genesis is not un-indexed
confirmBlockIndexed(r, ctx, chainIndex, blk2)
confirmBlockUnindexed(r, ctx, chainIndex, blk1)
confirmLastAcceptedHeight(r, ctx, chainIndex, blk2.Height())

blk3 := &testBlock{3}
r.NoError(chainIndex.UpdateLastAccepted(ctx, blk3))
confirmBlockIndexed(r, ctx, chainIndex, genesisBlk) // Confirm genesis is not un-indexed
confirmBlockIndexed(r, ctx, chainIndex, blk3)
confirmBlockUnindexed(r, ctx, chainIndex, blk2)
confirmLastAcceptedHeight(r, ctx, chainIndex, blk3.Height())
}
26 changes: 26 additions & 0 deletions chainindex/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package chainindex

import "github.com/prometheus/client_golang/prometheus"

type metrics struct {
deletedBlocks prometheus.Counter
}

func newMetrics(registry prometheus.Registerer) (*metrics, error) {
m := &metrics{
deletedBlocks: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "chainindex",
Name: "deleted_blocks",
Help: "Number of blocks deleted from the chain",
}),
}

if err := registry.Register(m.deletedBlocks); err != nil {
return nil, err
}

return m, nil
}
Loading
Loading