Skip to content

Commit

Permalink
Add ability to read or extract payloads from state
Browse files Browse the repository at this point in the history
Added two flags to execution state extraction program:

--extract-payloads-by-address produces a file of payloads for
specified accounts or for all accounts instead of checkpoint files

--use-payload-as-input uses payload file as input instead of
checkpoint files

The two new flags don't affect migration and other existing
functionality of the state extraction program.  These two options
only affect the input and output of the state extraction program.

In other words, this can be used to extract migrated payloads
or extract as-is payloads for specified accounts.
  • Loading branch information
fxamacker committed Feb 13, 2024
1 parent 51202b7 commit 85527b8
Show file tree
Hide file tree
Showing 4 changed files with 797 additions and 50 deletions.
116 changes: 97 additions & 19 deletions cmd/util/cmd/execution-state-extract/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package extract
import (
"encoding/hex"
"path"
"strings"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

runtimeCommon "github.com/onflow/cadence/runtime/common"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -26,6 +29,8 @@ var (
flagNoReport bool
flagValidateMigration bool
flagLogVerboseValidationError bool
flagInputPayload bool
flagOutputPayloadByAddresses string
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -68,6 +73,19 @@ func init() {
Cmd.Flags().BoolVar(&flagLogVerboseValidationError, "log-verbose-validation-error", false,
"log entire Cadence values on validation error (atree migration)")

Cmd.Flags().StringVar(
&flagOutputPayloadByAddresses,
"extract-payloads-by-address",
"",
"extract payloads of specified addresses (comma separated list of hex-encoded addresses or \"all\"", // empty string ignores this flag
)

Cmd.Flags().BoolVar(
&flagInputPayload,
"use-payload-as-input",
false,
"use payload file instead of checkpoint file as input",
)
}

func run(*cobra.Command, []string) {
Expand Down Expand Up @@ -112,20 +130,65 @@ func run(*cobra.Command, []string) {
log.Info().Msgf("extracting state by state commitment: %x", stateCommitment)
}

if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 {
log.Fatal().Msg("no --block-hash or --state-commitment was specified")
if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 && !flagInputPayload {
log.Fatal().Msg("no --block-hash or --state-commitment or --use-payload-as-input was specified")
}

log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v",
flagExecutionStateDir,
path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint),
6,
)
exportPayloads := len(flagOutputPayloadByAddresses) > 0

var exportedAddresses []runtimeCommon.Address

if exportPayloads {

addresses := strings.Split(flagOutputPayloadByAddresses, ",")

if len(addresses) == 1 && strings.TrimSpace(addresses[0]) == "all" {
// Extract payloads of the entire state.
log.Info().Msgf("Extracting state from %s, exporting all payloads to %s",
flagExecutionStateDir,
path.Join(flagOutputDir, FilenamePayloads),
)
} else {
// Extract payloads of specified accounts
for _, hexAddr := range addresses {
b, err := hex.DecodeString(strings.TrimSpace(hexAddr))
if err != nil {
log.Fatal().Err(err).Msgf("cannot hex decode address %s for payload export", strings.TrimSpace(hexAddr))
}

addr, err := runtimeCommon.BytesToAddress(b)
if err != nil {
log.Fatal().Err(err).Msgf("cannot decode address %x for payload export", b)
}

exportedAddresses = append(exportedAddresses, addr)
}

log.Info().Msgf("Extracting state from %s, exporting payloads by addresses %v to %s",
flagExecutionStateDir,
flagOutputPayloadByAddresses,
path.Join(flagOutputDir, FilenamePayloads),
)
}

log.Info().Msgf("Block state commitment: %s from %v, output dir: %s",
hex.EncodeToString(stateCommitment[:]),
flagExecutionStateDir,
flagOutputDir)
} else {
log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v",
flagExecutionStateDir,
path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint),
6,
)
}

if flagInputPayload {
log.Info().Msgf("Payload input from %v, output dir: %s",
flagExecutionStateDir,
flagOutputDir)
} else {
log.Info().Msgf("Block state commitment: %s from %v, output dir: %s",
hex.EncodeToString(stateCommitment[:]),
flagExecutionStateDir,
flagOutputDir)
}

// err := ensureCheckpointFileExist(flagExecutionStateDir)
// if err != nil {
Expand All @@ -148,14 +211,29 @@ func run(*cobra.Command, []string) {
log.Warn().Msgf("atree migration has verbose validation error logging enabled which may increase size of log")
}

err := extractExecutionState(
log.Logger,
flagExecutionStateDir,
stateCommitment,
flagOutputDir,
flagNWorker,
!flagNoMigration,
)
var err error
if flagInputPayload {
err = extractExecutionStateFromPayloads(
log.Logger,
flagExecutionStateDir,
flagOutputDir,
flagNWorker,
!flagNoMigration,
exportPayloads,
exportedAddresses,
)
} else {
err = extractExecutionState(
log.Logger,
flagExecutionStateDir,
stateCommitment,
flagOutputDir,
flagNWorker,
!flagNoMigration,
exportPayloads,
exportedAddresses,
)
}

if err != nil {
log.Fatal().Err(err).Msgf("error extracting the execution state: %s", err.Error())
Expand Down
199 changes: 175 additions & 24 deletions cmd/util/cmd/execution-state-extract/execution_state_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"math"
"os"
"time"

"github.com/onflow/cadence/runtime/common"
"github.com/rs/zerolog"
"go.uber.org/atomic"

Expand Down Expand Up @@ -34,6 +36,8 @@ func extractExecutionState(
outputDir string,
nWorker int, // number of concurrent worker to migation payloads
runMigrations bool,
exportPayloads bool,
exportPayloadsByAddresses []common.Address,
) error {

log.Info().Msg("init WAL")
Expand Down Expand Up @@ -84,30 +88,7 @@ func extractExecutionState(
<-compactor.Done()
}()

var migrations []ledger.Migration

if runMigrations {
rwf := reporters.NewReportFileWriterFactory(dir, log)

migrations = []ledger.Migration{
migrators.CreateAccountBasedMigration(
log,
nWorker,
[]migrators.AccountBasedMigration{
migrators.NewAtreeRegisterMigrator(
rwf,
flagValidateMigration,
flagLogVerboseValidationError,
),

&migrators.DeduplicateContractNamesMigration{},

// This will fix storage used discrepancies caused by the
// DeduplicateContractNamesMigration.
&migrators.AccountUsageMigrator{},
}),
}
}
migrations := newMigrations(log, dir, nWorker, runMigrations)

newState := ledger.State(targetHash)

Expand All @@ -134,6 +115,19 @@ func extractExecutionState(
log.Error().Err(err).Msgf("can not generate report for migrated state: %v", newMigratedState)
}

if exportPayloads {
payloads := newTrie.AllPayloads()

exportedPayloadCount, err := createPayloadFile(log, outputDir, payloads, exportPayloadsByAddresses)
if err != nil {
return fmt.Errorf("cannot generate payloads file: %w", err)
}

log.Info().Msgf("Exported %d payloads out of %d payloads", exportedPayloadCount, len(payloads))

return nil
}

migratedState, err := createCheckpoint(
newTrie,
log,
Expand Down Expand Up @@ -191,3 +185,160 @@ func writeStatusFile(fileName string, e error) error {
err := os.WriteFile(fileName, checkpointStatusJson, 0644)
return err
}

// extractExecutionStateFromPayloads reads payloads from a payload file in dir
// (instead of from checkpoint files), optionally migrates them, and then
// either exports the resulting payloads to a payload file in outputDir (when
// exportPayloads is set) or builds a trie from them and writes a root
// checkpoint to outputDir.
//
// When exportPayloadsByAddresses is empty, all payloads are exported;
// otherwise only payloads owned by the listed addresses are exported.
func extractExecutionStateFromPayloads(
	log zerolog.Logger,
	dir string,
	outputDir string,
	nWorker int, // number of concurrent workers to migrate payloads
	runMigrations bool,
	exportPayloads bool,
	exportPayloadsByAddresses []common.Address,
) error {

	payloads, err := readPayloadFile(log, dir)
	if err != nil {
		return err
	}

	// zerolog terminates each event with its own newline; a trailing "\n"
	// inside the message would produce a spurious blank line in the output.
	log.Info().Msgf("read %d payloads", len(payloads))

	migrations := newMigrations(log, dir, nWorker, runMigrations)

	payloads, err = migratePayloads(log, payloads, migrations)
	if err != nil {
		return err
	}

	if exportPayloads {
		// Export-only mode: write the payloads and skip checkpoint creation.
		exportedPayloadCount, err := createPayloadFile(log, outputDir, payloads, exportPayloadsByAddresses)
		if err != nil {
			return fmt.Errorf("cannot generate payloads file: %w", err)
		}

		log.Info().Msgf("Exported %d payloads out of %d payloads", exportedPayloadCount, len(payloads))

		return nil
	}

	newTrie, err := createTrieFromPayloads(log, payloads)
	if err != nil {
		return err
	}

	migratedState, err := createCheckpoint(
		newTrie,
		log,
		outputDir,
		bootstrap.FilenameWALRootCheckpoint,
	)
	if err != nil {
		return fmt.Errorf("cannot generate the output checkpoint: %w", err)
	}

	log.Info().Msgf(
		"New state commitment for the exported state is: %s (base64: %s)",
		migratedState.String(),
		migratedState.Base64(),
	)

	return nil
}

// migratePayloads applies each migration to the payloads in order, logging the
// duration of every step and warning whenever a step changes the payload
// count. With no migrations configured, the input slice is returned unchanged.
func migratePayloads(logger zerolog.Logger, payloads []*ledger.Payload, migrations []ledger.Migration) ([]*ledger.Payload, error) {

	if len(migrations) == 0 {
		return payloads, nil
	}

	prevCount := len(payloads)

	for step, migrate := range migrations {
		logger.Info().Msgf("migration %d/%d is underway", step, len(migrations))

		start := time.Now()
		migrated, err := migrate(payloads)
		elapsed := time.Since(start)
		if err != nil {
			return nil, fmt.Errorf("error applying migration (%d): %w", step, err)
		}
		payloads = migrated

		// A changed count is not necessarily wrong (e.g. deduplication),
		// but it is surprising enough to surface.
		if len(payloads) != prevCount {
			logger.Warn().
				Int("migration_step", step).
				Int("expected_size", prevCount).
				Int("outcome_size", len(payloads)).
				Msg("payload counts has changed during migration, make sure this is expected.")
		}
		logger.Info().Str("timeTaken", elapsed.String()).Msgf("migration %d is done", step)

		prevCount = len(payloads)
	}

	return payloads, nil
}

// createTrieFromPayloads constructs a fresh MTrie holding the given payloads.
// Paths are derived with the default path finder version, and pruning is
// skipped because the data has already been pruned by the migrations.
func createTrieFromPayloads(logger zerolog.Logger, payloads []*ledger.Payload) (*trie.MTrie, error) {
	paths, err := pathfinder.PathsFromPayloads(payloads, complete.DefaultPathFinderVersion)
	if err != nil {
		return nil, fmt.Errorf("cannot export checkpoint, can't construct paths: %w", err)
	}

	logger.Info().Msgf("constructing a new trie with migrated payloads (count: %d)...", len(payloads))

	// The trie update API takes payload values, so dereference the pointers.
	dereferenced := make([]ledger.Payload, len(payloads))
	for i := range payloads {
		dereferenced[i] = *payloads[i]
	}

	// Migrations already pruned the data, so no pruning is needed here.
	const applyPruning = false

	updatedTrie, _, err := trie.NewTrieWithUpdatedRegisters(trie.NewEmptyMTrie(), paths, dereferenced, applyPruning)
	if err != nil {
		return nil, fmt.Errorf("constructing updated trie failed: %w", err)
	}

	return updatedTrie, nil
}

// newMigrations returns the ledger migrations applied during state
// extraction, or nil when runMigrations is false.
func newMigrations(
	log zerolog.Logger,
	dir string,
	nWorker int, // number of concurrent workers to migrate payloads
	runMigrations bool,
) []ledger.Migration {
	if !runMigrations {
		return nil
	}

	rwf := reporters.NewReportFileWriterFactory(dir, log)

	return []ledger.Migration{
		migrators.CreateAccountBasedMigration(
			log,
			nWorker,
			[]migrators.AccountBasedMigration{
				migrators.NewAtreeRegisterMigrator(
					rwf,
					flagValidateMigration,
					flagLogVerboseValidationError,
				),

				&migrators.DeduplicateContractNamesMigration{},

				// This will fix storage used discrepancies caused by the
				// DeduplicateContractNamesMigration.
				&migrators.AccountUsageMigrator{},
			}),
	}
}
Loading

0 comments on commit 85527b8

Please sign in to comment.