Skip to content

Commit

Permalink
Merge pull request #1944 from onflow/fxamacker/optimize-checkpoint
Browse files Browse the repository at this point in the history
Optimize MTrie checkpoint for speed, memory, and file size
  • Loading branch information
fxamacker authored Mar 7, 2022
2 parents 4d5c22c + 2c587e6 commit 9a1c704
Show file tree
Hide file tree
Showing 28 changed files with 2,443 additions and 873 deletions.
6 changes: 3 additions & 3 deletions cmd/util/cmd/checkpoint-list-tries/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ func init() {

func run(*cobra.Command, []string) {

flattenedForest, err := wal.LoadCheckpoint(flagCheckpoint)
tries, err := wal.LoadCheckpoint(flagCheckpoint)
if err != nil {
log.Fatal().Err(err).Msg("error while loading checkpoint")
}

for _, trie := range flattenedForest.Tries {
fmt.Printf("%x\n", trie.RootHash)
for _, trie := range tries {
fmt.Printf("%x\n", trie.RootHash())
}
}
4 changes: 2 additions & 2 deletions cmd/util/cmd/read-execution-state/list-wals/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/module/metrics"
)
Expand Down Expand Up @@ -52,7 +52,7 @@ func run(*cobra.Command, []string) {
}()

err = w.ReplayLogsOnly(
func(forestSequencing *flattener.FlattenedForest) error {
func(tries []*trie.MTrie) error {
fmt.Printf("forest sequencing \n")
return nil
},
Expand Down
176 changes: 135 additions & 41 deletions ledger/common/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,25 @@ func EncodeKeyPart(kp *ledger.KeyPart) []byte {
}

func encodeKeyPart(kp *ledger.KeyPart) []byte {
buffer := make([]byte, 0)
buffer := make([]byte, 0, encodedKeyPartLength(kp))
return encodeAndAppendKeyPart(buffer, kp)
}

func encodeAndAppendKeyPart(buffer []byte, kp *ledger.KeyPart) []byte {
// encode "Type" field of the key part
buffer = utils.AppendUint16(buffer, kp.Type)

// encode "Value" field of the key part
buffer = append(buffer, kp.Value...)

return buffer
}

func encodedKeyPartLength(kp *ledger.KeyPart) int {
// Key part is encoded as: type (2 bytes) + value
return 2 + len(kp.Value)
}

// DecodeKeyPart constructs a key part from an encoded key part
func DecodeKeyPart(encodedKeyPart []byte) (*ledger.KeyPart, error) {
// currently we ignore the version but in the future we
Expand All @@ -133,22 +142,29 @@ func DecodeKeyPart(encodedKeyPart []byte) (*ledger.KeyPart, error) {
return nil, fmt.Errorf("error decoding key part: %w", err)
}

// decode the key part content
key, err := decodeKeyPart(rest)
// decode the key part content (zerocopy)
key, err := decodeKeyPart(rest, true)
if err != nil {
return nil, fmt.Errorf("error decoding key part: %w", err)
}

return key, nil
}

func decodeKeyPart(inp []byte) (*ledger.KeyPart, error) {
// decodeKeyPart decodes inp into KeyPart. If zeroCopy is true, KeyPart
// references data in inp. Otherwise, it is copied.
func decodeKeyPart(inp []byte, zeroCopy bool) (*ledger.KeyPart, error) {
// read key part type and the rest is the key item part
kpt, kpv, err := utils.ReadUint16(inp)
if err != nil {
return nil, fmt.Errorf("error decoding key part (content): %w", err)
}
return &ledger.KeyPart{Type: kpt, Value: kpv}, nil
if zeroCopy {
return &ledger.KeyPart{Type: kpt, Value: kpv}, nil
}
v := make([]byte, len(kpv))
copy(v, kpv)
return &ledger.KeyPart{Type: kpt, Value: v}, nil
}

// EncodeKey encodes a key into a byte slice
Expand All @@ -168,21 +184,36 @@ func EncodeKey(k *ledger.Key) []byte {

// encodeKey encodes a key into a byte slice
func encodeKey(k *ledger.Key) []byte {
buffer := make([]byte, 0)
buffer := make([]byte, 0, encodedKeyLength(k))
return encodeAndAppendKey(buffer, k)
}

func encodeAndAppendKey(buffer []byte, k *ledger.Key) []byte {
// encode number of key parts
buffer = utils.AppendUint16(buffer, uint16(len(k.KeyParts)))

// iterate over key parts
for _, kp := range k.KeyParts {
// encode the key part
encKP := encodeKeyPart(&kp)
// encode the len of the encoded key part
buffer = utils.AppendUint32(buffer, uint32(len(encKP)))
// append the encoded key part
buffer = append(buffer, encKP...)
buffer = utils.AppendUint32(buffer, uint32(encodedKeyPartLength(&kp)))

// encode the key part
buffer = encodeAndAppendKeyPart(buffer, &kp)
}

return buffer
}

func encodedKeyLength(k *ledger.Key) int {
// Key is encoded as: number of key parts (2 bytes) and for each key part,
// the key part size (4 bytes) + encoded key part (n bytes).
size := 2 + 4*len(k.KeyParts)
for _, kp := range k.KeyParts {
size += encodedKeyPartLength(&kp)
}
return size
}

// DecodeKey constructs a key from an encoded key part
func DecodeKey(encodedKey []byte) (*ledger.Key, error) {
// check the enc dec version
Expand All @@ -196,21 +227,30 @@ func DecodeKey(encodedKey []byte) (*ledger.Key, error) {
return nil, fmt.Errorf("error decoding key: %w", err)
}

// decode the key content
key, err := decodeKey(rest)
// decode the key content (zerocopy)
key, err := decodeKey(rest, true)
if err != nil {
return nil, fmt.Errorf("error decoding key: %w", err)
}
return key, nil
}

func decodeKey(inp []byte) (*ledger.Key, error) {
// decodeKey decodes inp into Key. If zeroCopy is true, returned key
// references data in inp. Otherwise, it is copied.
func decodeKey(inp []byte, zeroCopy bool) (*ledger.Key, error) {
key := &ledger.Key{}

numOfParts, rest, err := utils.ReadUint16(inp)
if err != nil {
return nil, fmt.Errorf("error decoding key (content): %w", err)
}

if numOfParts == 0 {
return key, nil
}

key.KeyParts = make([]ledger.KeyPart, numOfParts)

for i := 0; i < int(numOfParts); i++ {
var kpEncSize uint32
var kpEnc []byte
Expand All @@ -227,11 +267,12 @@ func decodeKey(inp []byte) (*ledger.Key, error) {
}

// decode encoded key part
kp, err := decodeKeyPart(kpEnc)
kp, err := decodeKeyPart(kpEnc, zeroCopy)
if err != nil {
return nil, fmt.Errorf("error decoding key (content): %w", err)
}
key.KeyParts = append(key.KeyParts, *kp)

key.KeyParts[i] = *kp
}
return key, nil
}
Expand All @@ -254,6 +295,14 @@ func encodeValue(v ledger.Value) []byte {
return v
}

func encodeAndAppendValue(buffer []byte, v ledger.Value) []byte {
return append(buffer, v...)
}

func encodedValueLength(v ledger.Value) int {
return len(v)
}

// DecodeValue constructs a ledger value using an encoded byte slice
func DecodeValue(encodedValue []byte) (ledger.Value, error) {
// check enc dec version
Expand Down Expand Up @@ -327,30 +376,52 @@ func EncodePayload(p *ledger.Payload) []byte {
return buffer
}

// EncodeAndAppendPayloadWithoutPrefix encodes a ledger payload
// without prefix (version and type) and appends to buffer.
// If payload is nil, unmodified buffer is returned.
func EncodeAndAppendPayloadWithoutPrefix(buffer []byte, p *ledger.Payload) []byte {
if p == nil {
return buffer
}
return encodeAndAppendPayload(buffer, p)
}

func EncodedPayloadLengthWithoutPrefix(p *ledger.Payload) int {
return encodedPayloadLength(p)
}

func encodePayload(p *ledger.Payload) []byte {
buffer := make([]byte, 0)
buffer := make([]byte, 0, encodedPayloadLength(p))
return encodeAndAppendPayload(buffer, p)
}

// encode key
encK := encodeKey(&p.Key)
func encodeAndAppendPayload(buffer []byte, p *ledger.Payload) []byte {

// encode encoded key size
buffer = utils.AppendUint32(buffer, uint32(len(encK)))
buffer = utils.AppendUint32(buffer, uint32(encodedKeyLength(&p.Key)))

// append encoded key content
buffer = append(buffer, encK...)

// encode value
encV := encodeValue(p.Value)
// encode key
buffer = encodeAndAppendKey(buffer, &p.Key)

// encode encoded value size
buffer = utils.AppendUint64(buffer, uint64(len(encV)))
buffer = utils.AppendUint64(buffer, uint64(encodedValueLength(p.Value)))

// append encoded key content
buffer = append(buffer, encV...)
// encode value
buffer = encodeAndAppendValue(buffer, p.Value)

return buffer
}

func encodedPayloadLength(p *ledger.Payload) int {
if p == nil {
return 0
}
// Payload is encoded as:
// encode key length (4 bytes) + encoded key +
// encoded value length (8 bytes) + encode value
return 4 + encodedKeyLength(&p.Key) + 8 + encodedValueLength(p.Value)
}

// DecodePayload construct a payload from an encoded byte slice
func DecodePayload(encodedPayload []byte) (*ledger.Payload, error) {
// if empty don't decode
Expand All @@ -367,10 +438,24 @@ func DecodePayload(encodedPayload []byte) (*ledger.Payload, error) {
if err != nil {
return nil, fmt.Errorf("error decoding payload: %w", err)
}
return decodePayload(rest)
// decode payload (zerocopy)
return decodePayload(rest, true)
}

// DecodePayloadWithoutPrefix constructs a payload from encoded byte slice
// without prefix (version and type). If zeroCopy is true, returned payload
// references data in encodedPayload. Otherwise, it is copied.
func DecodePayloadWithoutPrefix(encodedPayload []byte, zeroCopy bool) (*ledger.Payload, error) {
// if empty don't decode
if len(encodedPayload) == 0 {
return nil, nil
}
return decodePayload(encodedPayload, zeroCopy)
}

func decodePayload(inp []byte) (*ledger.Payload, error) {
// decodePayload decodes inp into payload. If zeroCopy is true,
// returned payload references data in inp. Otherwise, it is copied.
func decodePayload(inp []byte, zeroCopy bool) (*ledger.Payload, error) {

// read encoded key size
encKeySize, rest, err := utils.ReadUint32(inp)
Expand All @@ -385,7 +470,7 @@ func decodePayload(inp []byte) (*ledger.Payload, error) {
}

// decode the key
key, err := decodeKey(encKey)
key, err := decodeKey(encKey, zeroCopy)
if err != nil {
return nil, fmt.Errorf("error decoding payload: %w", err)
}
Expand All @@ -402,7 +487,13 @@ func decodePayload(inp []byte) (*ledger.Payload, error) {
return nil, fmt.Errorf("error decoding payload: %w", err)
}

return &ledger.Payload{Key: *key, Value: encValue}, nil
if zeroCopy {
return &ledger.Payload{Key: *key, Value: encValue}, nil
}

v := make([]byte, len(encValue))
copy(v, encValue)
return &ledger.Payload{Key: *key, Value: v}, nil
}

// EncodeTrieUpdate encodes a trie update struct
Expand Down Expand Up @@ -475,9 +566,6 @@ func DecodeTrieUpdate(encodedTrieUpdate []byte) (*ledger.TrieUpdate, error) {

func decodeTrieUpdate(inp []byte) (*ledger.TrieUpdate, error) {

paths := make([]ledger.Path, 0)
payloads := make([]*ledger.Payload, 0)

// decode root hash
rhSize, rest, err := utils.ReadUint16(inp)
if err != nil {
Expand Down Expand Up @@ -505,6 +593,9 @@ func decodeTrieUpdate(inp []byte) (*ledger.TrieUpdate, error) {
return nil, fmt.Errorf("error decoding trie update: %w", err)
}

paths := make([]ledger.Path, numOfPaths)
payloads := make([]*ledger.Payload, numOfPaths)

var path ledger.Path
var encPath []byte
for i := 0; i < int(numOfPaths); i++ {
Expand All @@ -516,7 +607,7 @@ func decodeTrieUpdate(inp []byte) (*ledger.TrieUpdate, error) {
if err != nil {
return nil, fmt.Errorf("error decoding trie update: %w", err)
}
paths = append(paths, path)
paths[i] = path
}

var payloadSize uint32
Expand All @@ -532,11 +623,12 @@ func decodeTrieUpdate(inp []byte) (*ledger.TrieUpdate, error) {
if err != nil {
return nil, fmt.Errorf("error decoding trie update: %w", err)
}
payload, err = decodePayload(encPayload)
// Decode payload (zerocopy)
payload, err = decodePayload(encPayload, true)
if err != nil {
return nil, fmt.Errorf("error decoding trie update: %w", err)
}
payloads = append(payloads, payload)
payloads[i] = payload
}
return &ledger.TrieUpdate{RootHash: rh, Paths: paths, Payloads: payloads}, nil
}
Expand Down Expand Up @@ -660,7 +752,8 @@ func decodeTrieProof(inp []byte) (*ledger.TrieProof, error) {
if err != nil {
return nil, fmt.Errorf("error decoding proof: %w", err)
}
payload, err := decodePayload(encPayload)
// Decode payload (zerocopy)
payload, err := decodePayload(encPayload, true)
if err != nil {
return nil, fmt.Errorf("error decoding proof: %w", err)
}
Expand All @@ -671,7 +764,8 @@ func decodeTrieProof(inp []byte) (*ledger.TrieProof, error) {
if err != nil {
return nil, fmt.Errorf("error decoding proof: %w", err)
}
interims := make([]hash.Hash, 0)

interims := make([]hash.Hash, interimsLen)

var interimSize uint16
var interim hash.Hash
Expand All @@ -692,7 +786,7 @@ func decodeTrieProof(inp []byte) (*ledger.TrieProof, error) {
return nil, fmt.Errorf("error decoding proof: %w", err)
}

interims = append(interims, interim)
interims[i] = interim
}
pInst.Interims = interims

Expand Down
Loading

0 comments on commit 9a1c704

Please sign in to comment.