Skip to content

Commit

Permalink
Move stateful block to vm (#1744)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronbuchwald authored Nov 12, 2024
1 parent 173f152 commit ee0c4d8
Show file tree
Hide file tree
Showing 44 changed files with 1,534 additions and 1,251 deletions.
4 changes: 1 addition & 3 deletions api/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ type VM interface {
verifySig bool,
txs []*chain.Transaction,
) (errs []error)
// LastAcceptedBlock provides the most recent block that the VM has accepted.
// The value returned is guaranteed to be non-nil.
LastAcceptedBlock() *chain.StatefulBlock
LastAcceptedBlockResult() *chain.ExecutedBlock
UnitPrices(context.Context) (fees.Dimensions, error)
CurrentValidators(
context.Context,
Expand Down
4 changes: 2 additions & 2 deletions api/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (i *Indexer) initBlocks() error {
return err
}

i.blockIDToHeight.Put(blk.BlockID, blk.Block.Hght)
i.blockIDToHeight.Put(blk.Block.ID(), blk.Block.Hght)
i.blockHeightToBlock.Put(blk.Block.Hght, blk)
lastHeight = blk.Block.Hght
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (i *Indexer) storeBlock(blk *chain.ExecutedBlock) error {
return err
}

i.blockIDToHeight.Put(blk.BlockID, blk.Block.Hght)
i.blockIDToHeight.Put(blk.Block.ID(), blk.Block.Hght)
i.blockHeightToBlock.Put(blk.Block.Hght, blk)
i.lastHeight.Store(blk.Block.Hght)

Expand Down
8 changes: 4 additions & 4 deletions api/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,19 @@ func checkBlocks(
expectedLatestBlk := expectedBlocks[len(expectedBlocks)-1]
latestBlk, err := indexer.GetLatestBlock()
require.NoError(err)
require.Equal(expectedLatestBlk.BlockID, latestBlk.BlockID)
require.Equal(expectedLatestBlk.Block.ID(), latestBlk.Block.ID())

// Confirm all blocks in the window are retrievable
for i := 0; i < blockWindow; i++ {
expectedBlk := expectedBlocks[len(expectedBlocks)-1-i]
height := expectedBlk.Block.Hght
blkByHeight, err := indexer.GetBlockByHeight(height)
require.NoError(err)
require.Equal(expectedBlk.BlockID, blkByHeight.BlockID)
require.Equal(expectedBlk.Block.ID(), blkByHeight.Block.ID())

blkByID, err := indexer.GetBlock(expectedBlk.BlockID)
blkByID, err := indexer.GetBlock(expectedBlk.Block.ID())
require.NoError(err)
require.Equal(expectedBlk.BlockID, blkByID.BlockID)
require.Equal(expectedBlk.Block.ID(), blkByID.Block.ID())
}

// Confirm blocks outside the window are not retrievable
Expand Down
8 changes: 4 additions & 4 deletions api/jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ type LastAcceptedReply struct {
}

func (j *JSONRPCServer) LastAccepted(_ *http.Request, _ *struct{}, reply *LastAcceptedReply) error {
blk := j.vm.LastAcceptedBlock()
reply.Height = blk.Hght
reply.BlockID = blk.ID()
reply.Timestamp = blk.Tmstmp
blk := j.vm.LastAcceptedBlockResult()
reply.Height = blk.Block.Hght
reply.BlockID = blk.Block.ID()
reply.Timestamp = blk.Block.Tmstmp
return nil
}

Expand Down
14 changes: 0 additions & 14 deletions api/ws/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ func OptionFunc(v api.VM, config Config) (vm.Opt, error) {
)

webSocketFactory := NewWebSocketServerFactory(handler)
txRemovedSubscription := event.SubscriptionFuncFactory[vm.TxRemovedEvent]{
AcceptF: func(event vm.TxRemovedEvent) error {
return server.RemoveTx(event.TxID, event.Err)
},
}

blockSubscription := event.SubscriptionFuncFactory[*chain.ExecutedBlock]{
AcceptF: func(event *chain.ExecutedBlock) error {
Expand All @@ -80,7 +75,6 @@ func OptionFunc(v api.VM, config Config) (vm.Opt, error) {

return vm.NewOpt(
vm.WithBlockSubscriptions(blockSubscription),
vm.WithTxRemovedSubscriptions(txRemovedSubscription),
vm.WithVMAPIs(webSocketFactory),
), nil
}
Expand Down Expand Up @@ -159,14 +153,6 @@ func (w *WebSocketServer) AddTxListener(tx *chain.Transaction, c *pubsub.Connect
w.expiringTxs.Add([]*chain.Transaction{tx})
}

// If never possible for a tx to enter mempool, call this
func (w *WebSocketServer) RemoveTx(txID ids.ID, err error) error {
w.txL.Lock()
defer w.txL.Unlock()

return w.removeTx(txID, err)
}

func (w *WebSocketServer) removeTx(txID ids.ID, err error) error {
listeners, ok := w.txListeners[txID]
if !ok {
Expand Down
38 changes: 38 additions & 0 deletions chain/accepter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package chain

import (
"context"

"github.com/ava-labs/avalanchego/trace"
)

type Accepter struct {
tracer trace.Tracer
validityWindow *TimeValidityWindow
metrics *chainMetrics
}

func NewAccepter(
tracer trace.Tracer,
validityWindow *TimeValidityWindow,
metrics *chainMetrics,
) *Accepter {
return &Accepter{
tracer: tracer,
validityWindow: validityWindow,
metrics: metrics,
}
}

func (a *Accepter) AcceptBlock(ctx context.Context, blk *ExecutionBlock) error {
_, span := a.tracer.Start(ctx, "Chain.AcceptBlock")
defer span.End()

a.metrics.txsAccepted.Add(float64(len(blk.Txs)))
a.validityWindow.Accept(blk)

return nil
}
74 changes: 27 additions & 47 deletions chain/assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,70 +5,50 @@ package chain

import (
"context"
"time"

"github.com/ava-labs/hypersdk/utils"
"github.com/ava-labs/avalanchego/trace"

"github.com/ava-labs/hypersdk/state"
)

type Assembler struct {
vm VM
tracer trace.Tracer
processor *Processor
}

func NewAssembler(tracer trace.Tracer, processor *Processor) *Assembler {
return &Assembler{
tracer: tracer,
processor: processor,
}
}

func (a *Assembler) AssembleBlock(
ctx context.Context,
parent *StatefulBlock,
parentView state.View,
parent *ExecutionBlock,
timestamp int64,
blockHeight uint64,
txs []*Transaction,
) (*StatefulBlock, error) {
ctx, span := a.vm.Tracer().Start(ctx, "chain.AssembleBlock")
) (*ExecutedBlock, state.View, error) {
ctx, span := a.tracer.Start(ctx, "Chain.AssembleBlock")
defer span.End()

parentView, err := parent.View(ctx, true)
if err != nil {
return nil, err
}
parentStateRoot, err := parentView.GetMerkleRoot(ctx)
if err != nil {
return nil, err
}

blk := &StatelessBlock{
Prnt: parent.ID(),
Tmstmp: timestamp,
Hght: blockHeight,
Txs: txs,
StateRoot: parentStateRoot,
}
for _, tx := range txs {
blk.authCounts[tx.Auth.GetTypeID()]++
return nil, nil, err
}

blkBytes, err := blk.Marshal()
sb, err := NewStatelessBlock(
parent.ID(),
timestamp,
blockHeight,
txs,
parentStateRoot,
)
if err != nil {
return nil, err
return nil, nil, err
}
b := &StatefulBlock{
StatelessBlock: blk,
t: time.UnixMilli(blk.Tmstmp),
bytes: blkBytes,
accepted: false,
vm: a.vm,
id: utils.ToID(blkBytes),
}
return b, b.populateTxs(ctx) // TODO: simplify since txs are guaranteed to already be de-duplicated here
}

func (a *Assembler) ExecuteBlock(
ctx context.Context,
b *StatefulBlock,
) (*ExecutedBlock, error) {
ctx, span := a.vm.Tracer().Start(ctx, "chain.ExecuteBlock")
defer span.End()

if err := b.Verify(ctx); err != nil {
return nil, err
}

return NewExecutedBlockFromStateful(b), nil
executionBlock := NewExecutionBlock(sb)
return a.processor.Execute(ctx, parentView, executionBlock)
}
Loading

0 comments on commit ee0c4d8

Please sign in to comment.