Skip to content

Commit

Permalink
Merge pull request #2792 from onflow/fxamacker/reuse-mtrie-state-for-…
Browse files Browse the repository at this point in the history
…checkpointing-2

[Execution Node] Reuse ledger state in checkpoints for -152GB RAM and -24 minutes
  • Loading branch information
fxamacker authored Aug 2, 2022
2 parents 9ed7793 + c700389 commit a908e36
Show file tree
Hide file tree
Showing 34 changed files with 2,520 additions and 1,017 deletions.
27 changes: 22 additions & 5 deletions cmd/bootstrap/run/execution_state.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package run

import (
"math"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/crypto/hash"
"github.com/onflow/flow-go/engine/execution/state/bootstrap"
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
ledger "github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -35,20 +38,34 @@ func GenerateExecutionState(
chain flow.Chain,
bootstrapOptions ...fvm.BootstrapProcedureOption,
) (flow.StateCommitment, error) {
const (
capacity = 100
checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation.
checkpointsToKeep = 1
)

metricsCollector := &metrics.NoopCollector{}

diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, metricsCollector, dbDir, 100, pathfinder.PathByteSize, wal.SegmentSize)
diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, metricsCollector, dbDir, capacity, pathfinder.PathByteSize, wal.SegmentSize)
if err != nil {
return flow.DummyStateCommitment, err
}

ledgerStorage, err := ledger.NewLedger(diskWal, capacity, metricsCollector, zerolog.Nop(), ledger.DefaultPathFinderVersion)
if err != nil {
return flow.DummyStateCommitment, err
}
defer func() {
<-diskWal.Done()
}()

ledgerStorage, err := ledger.NewLedger(diskWal, 100, metricsCollector, zerolog.Nop(), ledger.DefaultPathFinderVersion)
compactor, err := complete.NewCompactor(ledgerStorage, diskWal, zerolog.Nop(), capacity, checkpointDistance, checkpointsToKeep)
if err != nil {
return flow.DummyStateCommitment, err
}
<-compactor.Ready()

defer func() {
<-ledgerStorage.Done()
<-compactor.Done()
}()

return bootstrap.NewBootstrapper(
zerolog.Nop()).BootstrapLedger(
Expand Down
29 changes: 14 additions & 15 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,6 @@ func (e *ExecutionNodeBuilder) LoadComponentsAndModules() {
}
return nil
}).
Component("Write-Ahead Log", func(node *NodeConfig) (module.ReadyDoneAware, error) {
var err error
diskWAL, err = wal.NewDiskWAL(node.Logger.With().Str("subcomponent", "wal").Logger(),
node.MetricsRegisterer, collector, e.exeConf.triedir, int(e.exeConf.mTrieCacheSize), pathfinder.PathByteSize, wal.SegmentSize)
return diskWAL, err
}).
Component("execution state ledger", func(node *NodeConfig) (module.ReadyDoneAware, error) {

// check if the execution database already exists
Expand Down Expand Up @@ -382,23 +376,28 @@ func (e *ExecutionNodeBuilder) LoadComponentsAndModules() {
}
}

// DiskWal is a dependent component because we need to ensure
// that all WAL updates are completed before closing opened WAL segment.
diskWAL, err = wal.NewDiskWAL(node.Logger.With().Str("subcomponent", "wal").Logger(),
node.MetricsRegisterer, collector, e.exeConf.triedir, int(e.exeConf.mTrieCacheSize), pathfinder.PathByteSize, wal.SegmentSize)
if err != nil {
return nil, fmt.Errorf("failed to initialize wal: %w", err)
}

ledgerStorage, err = ledger.NewLedger(diskWAL, int(e.exeConf.mTrieCacheSize), collector, node.Logger.With().Str("subcomponent",
"ledger").Logger(), ledger.DefaultPathFinderVersion)
return ledgerStorage, err
}).
Component("execution state ledger WAL compactor", func(node *NodeConfig) (module.ReadyDoneAware, error) {

checkpointer, err := ledgerStorage.Checkpointer()
if err != nil {
return nil, fmt.Errorf("cannot create checkpointer: %w", err)
}
compactor := wal.NewCompactor(checkpointer,
10*time.Second,
return ledger.NewCompactor(
ledgerStorage,
diskWAL,
node.Logger.With().Str("subcomponent", "checkpointer").Logger(),
uint(e.exeConf.mTrieCacheSize),
e.exeConf.checkpointDistance,
e.exeConf.checkpointsToKeep,
node.Logger.With().Str("subcomponent", "checkpointer").Logger())

return compactor, nil
)
}).
Component("execution data service", func(node *NodeConfig) (module.ReadyDoneAware, error) {
err := os.MkdirAll(e.exeConf.executionDataDir, 0700)
Expand Down
17 changes: 14 additions & 3 deletions cmd/util/cmd/exec-data-json-export/ledger_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"encoding/hex"
"fmt"
"math"
"os"
"path/filepath"

Expand All @@ -19,18 +20,28 @@ import (

// ExportLedger exports ledger key value pairs at the given blockID
func ExportLedger(ledgerPath string, targetstate string, outputPath string) error {
const (
checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation.
checkpointsToKeep = 1
)

diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, &metrics.NoopCollector{}, ledgerPath, complete.DefaultCacheSize, pathfinder.PathByteSize, wal.SegmentSize)
if err != nil {
return fmt.Errorf("cannot create WAL: %w", err)
}
defer func() {
<-diskWal.Done()
}()
led, err := complete.NewLedger(diskWal, complete.DefaultCacheSize, &metrics.NoopCollector{}, log.Logger, 0)
if err != nil {
return fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err)
}
compactor, err := complete.NewCompactor(led, diskWal, zerolog.Nop(), complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep)
if err != nil {
return fmt.Errorf("cannot create compactor: %w", err)
}
<-compactor.Ready()
defer func() {
<-led.Done()
<-compactor.Done()
}()
stateBytes, err := hex.DecodeString(targetstate)
if err != nil {
return fmt.Errorf("failed to decode hex code of state: %w", err)
Expand Down
20 changes: 17 additions & 3 deletions cmd/util/cmd/execution-state-extract/execution_state_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package extract

import (
"fmt"
"math"

"github.com/rs/zerolog"

Expand Down Expand Up @@ -43,9 +44,6 @@ func extractExecutionState(
if err != nil {
return fmt.Errorf("cannot create disk WAL: %w", err)
}
defer func() {
<-diskWal.Done()
}()

led, err := complete.NewLedger(
diskWal,
Expand All @@ -57,6 +55,22 @@ func extractExecutionState(
return fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err)
}

const (
checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation.
checkpointsToKeep = 1
)
compactor, err := complete.NewCompactor(led, diskWal, zerolog.Nop(), complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep)
if err != nil {
return fmt.Errorf("cannot create compactor: %w", err)
}

<-compactor.Ready()

defer func() {
<-led.Done()
<-compactor.Done()
}()

var migrations []ledger.Migration
var preCheckpointReporters, postCheckpointReporters []ledger.Reporter
newState := ledger.State(targetHash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package extract

import (
"crypto/rand"
"math"
"testing"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -74,6 +75,11 @@ func TestExtractExecutionState(t *testing.T) {

withDirs(t, func(datadir, execdir, _ string) {

const (
checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation.
checkpointsToKeep = 1
)

db := common.InitStorage(datadir)
commits := badger.NewCommits(metr, db)

Expand All @@ -84,6 +90,9 @@ func TestExtractExecutionState(t *testing.T) {
require.NoError(t, err)
f, err := complete.NewLedger(diskWal, size*10, metr, zerolog.Nop(), complete.DefaultPathFinderVersion)
require.NoError(t, err)
compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep)
require.NoError(t, err)
<-compactor.Ready()

var stateCommitment = f.InitialState()

Expand Down Expand Up @@ -120,8 +129,9 @@ func TestExtractExecutionState(t *testing.T) {
blocksInOrder[i] = blockID
}

<-diskWal.Done()
<-f.Done()
<-compactor.Done()

err = db.Close()
require.NoError(t, err)

Expand Down Expand Up @@ -152,6 +162,15 @@ func TestExtractExecutionState(t *testing.T) {
storage, err := complete.NewLedger(diskWal, 1000, metr, zerolog.Nop(), complete.DefaultPathFinderVersion)
require.NoError(t, err)

const (
checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation.
checkpointsToKeep = 1
)
compactor, err := complete.NewCompactor(storage, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep)
require.NoError(t, err)

<-compactor.Ready()

data := keysValuesByCommit[string(stateCommitment[:])]

keys := make([]ledger.Key, 0, len(data))
Expand Down Expand Up @@ -181,8 +200,8 @@ func TestExtractExecutionState(t *testing.T) {
require.Error(t, err)
}

<-diskWal.Done()
<-storage.Done()
<-compactor.Done()
})
}
})
Expand Down
1 change: 0 additions & 1 deletion cmd/util/cmd/export-json-execution-state/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func ExportLedger(ledgerPath string, targetstate string, outputPath string, full
return err
},
func(rootHash ledger.RootHash) error {
forest.RemoveTrie(rootHash)
return nil
},
)
Expand Down
7 changes: 7 additions & 0 deletions engine/execution/computation/execution_verification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,13 @@ func executeBlockAndVerifyWithParameters(t *testing.T,
ledger, err := completeLedger.NewLedger(wal, 100, collector, logger, completeLedger.DefaultPathFinderVersion)
require.NoError(t, err)

compactor := fixtures.NewNoopCompactor(ledger)
<-compactor.Ready()
defer func() {
<-ledger.Done()
<-compactor.Done()
}()

bootstrapper := bootstrapexec.NewBootstrapper(logger)

initialCommit, err := bootstrapper.BootstrapLedger(
Expand Down
7 changes: 7 additions & 0 deletions engine/execution/computation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ func TestComputeBlock_Uploader(t *testing.T) {
ledger, err := complete.NewLedger(&fixtures.NoopWAL{}, 10, noopCollector, zerolog.Nop(), complete.DefaultPathFinderVersion)
require.NoError(t, err)

compactor := fixtures.NewNoopCompactor(ledger)
<-compactor.Ready()
defer func() {
<-ledger.Done()
<-compactor.Done()
}()

me := new(module.Local)
me.On("NodeID").Return(flow.ZeroID)

Expand Down
12 changes: 12 additions & 0 deletions engine/execution/state/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ func TestBootstrapLedger(t *testing.T) {
wal := &fixtures.NoopWAL{}
ls, err := completeLedger.NewLedger(wal, 100, metricsCollector, zerolog.Nop(), completeLedger.DefaultPathFinderVersion)
require.NoError(t, err)
compactor := fixtures.NewNoopCompactor(ls)
<-compactor.Ready()
defer func() {
<-ls.Done()
<-compactor.Done()
}()

stateCommitment, err := NewBootstrapper(zerolog.Nop()).BootstrapLedger(
ls,
Expand Down Expand Up @@ -59,6 +65,12 @@ func TestBootstrapLedger_ZeroTokenSupply(t *testing.T) {
wal := &fixtures.NoopWAL{}
ls, err := completeLedger.NewLedger(wal, 100, metricsCollector, zerolog.Nop(), completeLedger.DefaultPathFinderVersion)
require.NoError(t, err)
compactor := fixtures.NewNoopCompactor(ls)
<-compactor.Ready()
defer func() {
<-ls.Done()
<-compactor.Done()
}()

stateCommitment, err := NewBootstrapper(zerolog.Nop()).BootstrapLedger(
ls,
Expand Down
6 changes: 6 additions & 0 deletions engine/execution/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ func prepareTest(f func(t *testing.T, es state.ExecutionState, l *ledger.Ledger)
diskWal := &fixtures.NoopWAL{}
ls, err := ledger.NewLedger(diskWal, 100, metricsCollector, zerolog.Nop(), ledger.DefaultPathFinderVersion)
require.NoError(t, err)
compactor := fixtures.NewNoopCompactor(ls)
<-compactor.Ready()
defer func() {
<-ls.Done()
<-compactor.Done()
}()

ctrl := gomock.NewController(t)

Expand Down
7 changes: 3 additions & 4 deletions engine/testutil/mock/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/onflow/flow-go/fvm"
fvmState "github.com/onflow/flow-go/fvm/state"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/finalizer/consensus"
Expand Down Expand Up @@ -204,7 +204,7 @@ type ExecutionNode struct {
ReceiptsEngine *executionprovider.Engine
FollowerEngine *followereng.Engine
SyncEngine *synchronization.Engine
DiskWAL *wal.DiskWAL
Compactor *complete.Compactor
BadgerDB *badger.DB
VM *fvm.VirtualMachine
ExecutionState state.ExecutionState
Expand All @@ -223,7 +223,6 @@ func (en ExecutionNode) Ready() {
en.FollowerEngine,
en.RequestEngine,
en.SyncEngine,
en.DiskWAL,
)
}

Expand All @@ -236,7 +235,7 @@ func (en ExecutionNode) Done() {
en.FollowerEngine,
en.RequestEngine,
en.SyncEngine,
en.DiskWAL,
en.Compactor,
)
os.RemoveAll(en.LevelDbDir)
en.GenericNode.Done()
Expand Down
Loading

0 comments on commit a908e36

Please sign in to comment.