Skip to content

Commit

Permalink
Skip StorableNode/Trie when reading checkpoint
Browse files Browse the repository at this point in the history
- Merge RebuildTries() with LoadCheckpoint() to deserialize data
to nodes without creating intermediate StorableNode/StorableTrie
objects.

- Avoid creating 400+ million element slice holding all StorableNodes
read from checkpoint file

- DiskWal.Replay*() APIs are changed.  checkpointFn receives
[]*trie.MTrie instead of FlattenedForest.

- Remove files contaning StorableNode/StorableTrie/FlattenedForest etc.
  * mtrie/flattener/forest.go
  * mtrie/flattener/forest_test.go
  * mtrie/flattener/storables.go
  * mtrie/flattener/trie.go
  * mtrie/flattener/trie_test.go
  • Loading branch information
fxamacker committed Feb 2, 2022
1 parent f185fd9 commit 8b03a22
Show file tree
Hide file tree
Showing 16 changed files with 272 additions and 633 deletions.
6 changes: 3 additions & 3 deletions cmd/util/cmd/checkpoint-list-tries/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ func init() {

func run(*cobra.Command, []string) {

flattenedForest, err := wal.LoadCheckpoint(flagCheckpoint)
tries, err := wal.LoadCheckpoint(flagCheckpoint)
if err != nil {
log.Fatal().Err(err).Msg("error while loading checkpoint")
}

for _, trie := range flattenedForest.Tries {
fmt.Printf("%x\n", trie.RootHash)
for _, trie := range tries {
fmt.Printf("%x\n", trie.RootHash())
}
}
4 changes: 2 additions & 2 deletions cmd/util/cmd/read-execution-state/list-wals/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/module/metrics"
)
Expand Down Expand Up @@ -52,7 +52,7 @@ func run(*cobra.Command, []string) {
}()

err = w.ReplayLogsOnly(
func(forestSequencing *flattener.FlattenedForest) error {
func(tries []*trie.MTrie) error {
fmt.Printf("forest sequencing \n")
return nil
},
Expand Down
119 changes: 89 additions & 30 deletions ledger/complete/mtrie/flattener/encoding.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package flattener

import (
"bytes"
"fmt"
"io"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/encoding"
"github.com/onflow/flow-go/ledger/common/hash"
"github.com/onflow/flow-go/ledger/common/utils"
"github.com/onflow/flow-go/ledger/complete/mtrie/node"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
Expand Down Expand Up @@ -59,82 +61,127 @@ func EncodeNode(n *node.Node, lchildIndex uint64, rchildIndex uint64) []byte {
return buf
}

// ReadStorableNode reads a storable node from io
func ReadStorableNode(reader io.Reader) (*StorableNode, error) {
// ReadNode reconstructs a node from data read from reader.
// TODO: reuse read buffer
func ReadNode(reader io.Reader, getNode func(nodeIndex uint64) (*node.Node, error)) (*node.Node, error) {

// reading version
buf := make([]byte, 2)
read, err := io.ReadFull(reader, buf)
if err != nil {
return nil, fmt.Errorf("error reading storable node, cannot read version part: %w", err)
return nil, fmt.Errorf("failed to read serialized node, cannot read version part: %w", err)
}
if read != len(buf) {
return nil, fmt.Errorf("not enough bytes read %d expected %d", read, len(buf))
return nil, fmt.Errorf("failed to read serialized node: not enough bytes read %d expected %d", read, len(buf))
}

version, _, err := utils.ReadUint16(buf)
if err != nil {
return nil, fmt.Errorf("error reading storable node: %w", err)
return nil, fmt.Errorf("failed to read serialized node: %w", err)
}

if version > encodingDecodingVersion {
return nil, fmt.Errorf("error reading storable node: unsuported version %d > %d", version, encodingDecodingVersion)
return nil, fmt.Errorf("failed to read serialized node: unsuported version %d > %d", version, encodingDecodingVersion)
}

// reading fixed-length part
buf = make([]byte, 2+8+8+2+8)

read, err = io.ReadFull(reader, buf)
if err != nil {
return nil, fmt.Errorf("error reading storable node, cannot read fixed-length part: %w", err)
return nil, fmt.Errorf("failed to read serialized node, cannot read fixed-length part: %w", err)
}
if read != len(buf) {
return nil, fmt.Errorf("not enough bytes read %d expected %d", read, len(buf))
return nil, fmt.Errorf("failed to read serialized node: not enough bytes read %d expected %d", read, len(buf))
}

storableNode := &StorableNode{}
var height, maxDepth uint16
var lchildIndex, rchildIndex, regCount uint64
var path, hashValue, encPayload []byte

storableNode.Height, buf, err = utils.ReadUint16(buf)
height, buf, err = utils.ReadUint16(buf)
if err != nil {
return nil, fmt.Errorf("error reading storable node: %w", err)
return nil, fmt.Errorf("failed to read serialized node: %w", err)
}

storableNode.LIndex, buf, err = utils.ReadUint64(buf)
lchildIndex, buf, err = utils.ReadUint64(buf)
if err != nil {
return nil, fmt.Errorf("error reading storable node: %w", err)
return nil, fmt.Errorf("failed to read serialized node: %w", err)
}

storableNode.RIndex, buf, err = utils.ReadUint64(buf)
rchildIndex, buf, err = utils.ReadUint64(buf)
if err != nil {
return nil, fmt.Errorf("error reading storable node: %w", err)
return nil, fmt.Errorf("failed to read serialized node: %w", err)
}

storableNode.MaxDepth, buf, err = utils.ReadUint16(buf)
maxDepth, buf, err = utils.ReadUint16(buf)
if err != nil {
return nil, fmt.Errorf("error reading storable node: %w", err)
return nil, fmt.Errorf("failed to read serialized node: %w", err)
}

storableNode.RegCount, _, err = utils.ReadUint64(buf)
regCount, _, err = utils.ReadUint64(buf)
if err != nil {
return nil, fmt.Errorf("error reading storable node: %w", err)
return nil, fmt.Errorf("failed to read serialized node: %w", err)
}

storableNode.Path, err = utils.ReadShortDataFromReader(reader)
path, err = utils.ReadShortDataFromReader(reader)
if err != nil {
return nil, fmt.Errorf("cannot read key data: %w", err)
}

storableNode.EncPayload, err = utils.ReadLongDataFromReader(reader)
encPayload, err = utils.ReadLongDataFromReader(reader)
if err != nil {
return nil, fmt.Errorf("cannot read value data: %w", err)
}

storableNode.HashValue, err = utils.ReadShortDataFromReader(reader)
hashValue, err = utils.ReadShortDataFromReader(reader)
if err != nil {
return nil, fmt.Errorf("cannot read hashValue data: %w", err)
}

return storableNode, nil
// Create (and copy) hash from raw data.
nodeHash, err := hash.ToHash(hashValue)
if err != nil {
return nil, fmt.Errorf("failed to decode hash from checkpoint: %w", err)
}

if len(path) > 0 {
// Create (and copy) path from raw data.
path, err := ledger.ToPath(path)
if err != nil {
return nil, fmt.Errorf("failed to decode path from checkpoint: %w", err)
}

// Decode payload (payload data isn't copied).
payload, err := encoding.DecodePayload(encPayload)
if err != nil {
return nil, fmt.Errorf("failed to decode payload from checkpoint: %w", err)
}

// make a copy of payload
var pl *ledger.Payload
if payload != nil {
pl = payload.DeepCopy()
}

n := node.NewNode(int(height), nil, nil, path, pl, nodeHash, maxDepth, regCount)
return n, nil
}

// Get left child node by node index
lchild, err := getNode(lchildIndex)
if err != nil {
return nil, fmt.Errorf("failed to find left child node: %w", err)
}

// Get right child node by node index
rchild, err := getNode(rchildIndex)
if err != nil {
return nil, fmt.Errorf("failed to find right child node: %w", err)
}

n := node.NewNode(int(height), lchild, rchild, ledger.DummyPath, nil, nodeHash, maxDepth, regCount)
return n, nil
}

// EncodeTrie encodes trie root node
Expand Down Expand Up @@ -162,9 +209,8 @@ func EncodeTrie(rootNode *node.Node, rootIndex uint64) []byte {
return buf
}

// ReadStorableTrie reads a storable trie from io
func ReadStorableTrie(reader io.Reader) (*StorableTrie, error) {
storableTrie := &StorableTrie{}
// ReadTrie reconstructs a trie from data read from reader.
func ReadTrie(reader io.Reader, getNode func(nodeIndex uint64) (*node.Node, error)) (*trie.MTrie, error) {

// reading version
buf := make([]byte, 2)
Expand Down Expand Up @@ -199,13 +245,26 @@ func ReadStorableTrie(reader io.Reader) (*StorableTrie, error) {
if err != nil {
return nil, fmt.Errorf("cannot read root index data: %w", err)
}
storableTrie.RootIndex = rootIndex

roothash, err := utils.ReadShortDataFromReader(reader)
readRootHash, err := utils.ReadShortDataFromReader(reader)
if err != nil {
return nil, fmt.Errorf("cannot read roothash data: %w", err)
}
storableTrie.RootHash = roothash

return storableTrie, nil
rootNode, err := getNode(rootIndex)
if err != nil {
return nil, fmt.Errorf("cannot find root node: %w", err)
}

mtrie, err := trie.NewMTrie(rootNode)
if err != nil {
return nil, fmt.Errorf("restoring trie failed: %w", err)
}

rootHash := mtrie.RootHash()
if !bytes.Equal(readRootHash, rootHash[:]) {
return nil, fmt.Errorf("restoring trie failed: roothash doesn't match")
}

return mtrie, nil
}
Loading

0 comments on commit 8b03a22

Please sign in to comment.