Optimize MTrie checkpoint: 47x speedup (11.7 hours -> 15 mins), -431 GB alloc/op, -7.6 billion allocs/op, -6.9 GB file size #1944

Merged on Mar 7, 2022 (41 commits). Showing changes from 37 commits.

Commits
04e5c08 Add NewUniqueNodeIterator() to skip shared nodes (fxamacker, Feb 1, 2022)
dfafbd0 Optimize FlattenForest() with NewUniqueNodeIterator (fxamacker, Feb 1, 2022)
f185fd9 Skip StorableNode/StorableTrie for new checkpoint (fxamacker, Feb 2, 2022)
8b03a22 Skip StorableNode/Trie when reading checkpoint (fxamacker, Feb 2, 2022)
800cca1 Fix lint errors (fxamacker, Feb 2, 2022)
422b75b Reduce checkpoint file size by 5.8+GB (fxamacker, Feb 3, 2022)
0b73f52 Merge branch 'master' into fxamacker/optimize-checkpoint (fxamacker, Feb 3, 2022)
dd636e5 Encode and decode empty payload in checkpoint (fxamacker, Feb 4, 2022)
6c5ad37 Optimize reading checkpoint file by reusing buffer (fxamacker, Feb 4, 2022)
3aacebb Increase bufio read size to 8192 for checkpoint (fxamacker, Feb 4, 2022)
c9a8f14 Optimize creating checkpoint by reusing buffer (fxamacker, Feb 6, 2022)
681d9fe Increase bufio write size to 8192 for checkpoint (fxamacker, Feb 7, 2022)
c7c62cd Preallocate slice when decoding trie update (fxamacker, Feb 7, 2022)
1b6878e Avoid allocs when loading checkpoint (fxamacker, Feb 8, 2022)
81ddd43 Increase checkpoint bufio buffer to 32KiB (fxamacker, Feb 14, 2022)
529b923 Merge branch 'master' into fxamacker/optimize-checkpoint (fxamacker, Feb 14, 2022)
889c2f2 Rename payload encoding functions (fxamacker, Feb 15, 2022)
51c3249 Refactor payload encoding function APIs (fxamacker, Feb 15, 2022)
2d0936c Refactor payload encoding functions (fxamacker, Feb 15, 2022)
0cc1ab2 Fix EncodeAndAppendPayload[...] with nil payload (fxamacker, Feb 16, 2022)
f09677d Improve test (fxamacker, Feb 16, 2022)
2ca95f4 Cleanup checkpoint encoding and decoding (fxamacker, Feb 17, 2022)
d12c3b3 Optimize encodedKeyLength() (fxamacker, Feb 17, 2022)
802347f Add flattening encoding tests (fxamacker, Feb 18, 2022)
aa1549e Add node type check when decoding checkpoint (fxamacker, Feb 18, 2022)
f91a1be Cleanup checkpoint v3 decoding (fxamacker, Feb 18, 2022)
2b9d1ae Add checkpoint v3 decoding tests (fxamacker, Feb 18, 2022)
dcca74e Refactor mtrie checkpoint encoding to reduce size (fxamacker, Feb 23, 2022)
9d9d2bb Fix lint error (fxamacker, Feb 23, 2022)
bfa2c67 Refactor tests (fxamacker, Feb 23, 2022)
6618948 Refactor checkpointer (fxamacker, Feb 23, 2022)
0f679fd Refactor checkpointer test (fxamacker, Feb 23, 2022)
65514fa Refactor checkpoint v3 and earlier tests (fxamacker, Feb 24, 2022)
0ee9675 Remove TODO comment to log mtrie eviction cb error (fxamacker, Feb 25, 2022)
0f1ba35 Add checkpoint benchmarks (fxamacker, Mar 2, 2022)
c919100 Fix linter error (fxamacker, Mar 2, 2022)
1ebde08 Remove extra logs (fxamacker, Mar 2, 2022)
c99bc8c Remove returned error from onTreeEvicted callback (fxamacker, Mar 7, 2022)
d3a32c6 Add extra comment for NewNodeIterator (fxamacker, Mar 7, 2022)
6eb8ca9 Merge branch 'master' into fxamacker/optimize-checkpoint (fxamacker, Mar 7, 2022)
2c587e6 Merge branch 'master' into fxamacker/optimize-checkpoint (fxamacker, Mar 7, 2022)
6 changes: 3 additions & 3 deletions cmd/util/cmd/checkpoint-list-tries/cmd.go
@@ -28,12 +28,12 @@ func init() {

func run(*cobra.Command, []string) {

flattenedForest, err := wal.LoadCheckpoint(flagCheckpoint)
tries, err := wal.LoadCheckpoint(flagCheckpoint)
if err != nil {
log.Fatal().Err(err).Msg("error while loading checkpoint")
}

for _, trie := range flattenedForest.Tries {
fmt.Printf("%x\n", trie.RootHash)
for _, trie := range tries {
fmt.Printf("%x\n", trie.RootHash())
}
}
4 changes: 2 additions & 2 deletions cmd/util/cmd/read-execution-state/list-wals/cmd.go
@@ -11,7 +11,7 @@ import (
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/module/metrics"
)
@@ -52,7 +52,7 @@ func run(*cobra.Command, []string) {
}()

err = w.ReplayLogsOnly(
func(forestSequencing *flattener.FlattenedForest) error {
func(tries []*trie.MTrie) error {
fmt.Printf("forest sequencing \n")
return nil
},
176 changes: 135 additions & 41 deletions ledger/common/encoding/encoding.go
@@ -108,16 +108,25 @@ func EncodeKeyPart(kp *ledger.KeyPart) []byte {
}

func encodeKeyPart(kp *ledger.KeyPart) []byte {
buffer := make([]byte, 0)
buffer := make([]byte, 0, encodedKeyPartLength(kp))
return encodeAndAppendKeyPart(buffer, kp)
}

func encodeAndAppendKeyPart(buffer []byte, kp *ledger.KeyPart) []byte {
// encode "Type" field of the key part
buffer = utils.AppendUint16(buffer, kp.Type)

// encode "Value" field of the key part
buffer = append(buffer, kp.Value...)

return buffer
}

func encodedKeyPartLength(kp *ledger.KeyPart) int {
// Key part is encoded as: type (2 bytes) + value
return 2 + len(kp.Value)
}

// DecodeKeyPart constructs a key part from an encoded key part
func DecodeKeyPart(encodedKeyPart []byte) (*ledger.KeyPart, error) {
// currently we ignore the version but in the future we
@@ -133,22 +142,29 @@ func DecodeKeyPart(encodedKeyPart []byte) (*ledger.KeyPart, error) {
return nil, fmt.Errorf("error decoding key part: %w", err)
}

// decode the key part content
key, err := decodeKeyPart(rest)
// decode the key part content (zerocopy)
key, err := decodeKeyPart(rest, true)
if err != nil {
return nil, fmt.Errorf("error decoding key part: %w", err)
}

return key, nil
}

func decodeKeyPart(inp []byte) (*ledger.KeyPart, error) {
// decodeKeyPart decodes inp into KeyPart. If zeroCopy is true, KeyPart
// references data in inp. Otherwise, it is copied.
func decodeKeyPart(inp []byte, zeroCopy bool) (*ledger.KeyPart, error) {
// read key part type and the rest is the key item part
kpt, kpv, err := utils.ReadUint16(inp)
if err != nil {
return nil, fmt.Errorf("error decoding key part (content): %w", err)
}
return &ledger.KeyPart{Type: kpt, Value: kpv}, nil
if zeroCopy {
return &ledger.KeyPart{Type: kpt, Value: kpv}, nil
}
v := make([]byte, len(kpv))
copy(v, kpv)
return &ledger.KeyPart{Type: kpt, Value: v}, nil
}

// EncodeKey encodes a key into a byte slice
@@ -168,21 +184,36 @@ func EncodeKey(k *ledger.Key) []byte {

// encodeKey encodes a key into a byte slice
func encodeKey(k *ledger.Key) []byte {
buffer := make([]byte, 0)
buffer := make([]byte, 0, encodedKeyLength(k))
return encodeAndAppendKey(buffer, k)
}

func encodeAndAppendKey(buffer []byte, k *ledger.Key) []byte {
// encode number of key parts
buffer = utils.AppendUint16(buffer, uint16(len(k.KeyParts)))

// iterate over key parts
for _, kp := range k.KeyParts {
// encode the key part
encKP := encodeKeyPart(&kp)
// encode the len of the encoded key part
buffer = utils.AppendUint32(buffer, uint32(len(encKP)))
// append the encoded key part
buffer = append(buffer, encKP...)
buffer = utils.AppendUint32(buffer, uint32(encodedKeyPartLength(&kp)))

// encode the key part
buffer = encodeAndAppendKeyPart(buffer, &kp)
}

return buffer
}

func encodedKeyLength(k *ledger.Key) int {
// Key is encoded as: number of key parts (2 bytes) and for each key part,
// the key part size (4 bytes) + encoded key part (n bytes).
size := 2 + 4*len(k.KeyParts)
for _, kp := range k.KeyParts {
size += encodedKeyPartLength(&kp)
}
return size
}

// DecodeKey constructs a key from an encoded key part
func DecodeKey(encodedKey []byte) (*ledger.Key, error) {
// check the enc dec version
@@ -196,21 +227,30 @@ func DecodeKey(encodedKey []byte) (*ledger.Key, error) {
return nil, fmt.Errorf("error decoding key: %w", err)
}

// decode the key content
key, err := decodeKey(rest)
// decode the key content (zerocopy)
key, err := decodeKey(rest, true)
if err != nil {
return nil, fmt.Errorf("error decoding key: %w", err)
}
return key, nil
}

func decodeKey(inp []byte) (*ledger.Key, error) {
// decodeKey decodes inp into Key. If zeroCopy is true, returned key
// references data in inp. Otherwise, it is copied.
func decodeKey(inp []byte, zeroCopy bool) (*ledger.Key, error) {
key := &ledger.Key{}

numOfParts, rest, err := utils.ReadUint16(inp)
if err != nil {
return nil, fmt.Errorf("error decoding key (content): %w", err)
}

if numOfParts == 0 {
return key, nil
}

key.KeyParts = make([]ledger.KeyPart, numOfParts)

for i := 0; i < int(numOfParts); i++ {
var kpEncSize uint32
var kpEnc []byte
@@ -227,11 +267,12 @@ func decodeKey(inp []byte) (*ledger.Key, error) {
}

// decode encoded key part
kp, err := decodeKeyPart(kpEnc)
kp, err := decodeKeyPart(kpEnc, zeroCopy)
if err != nil {
return nil, fmt.Errorf("error decoding key (content): %w", err)
}
key.KeyParts = append(key.KeyParts, *kp)

key.KeyParts[i] = *kp
}
return key, nil
}
@@ -254,6 +295,14 @@ func encodeValue(v ledger.Value) []byte {
return v
}

func encodeAndAppendValue(buffer []byte, v ledger.Value) []byte {
return append(buffer, v...)
}

func encodedValueLength(v ledger.Value) int {
return len(v)
}

// DecodeValue constructs a ledger value using an encoded byte slice
func DecodeValue(encodedValue []byte) (ledger.Value, error) {
// check enc dec version
@@ -327,30 +376,52 @@ func EncodePayload(p *ledger.Payload) []byte {
return buffer
}

// EncodeAndAppendPayloadWithoutPrefix encodes a ledger payload
// without prefix (version and type) and appends to buffer.
// If payload is nil, unmodified buffer is returned.
func EncodeAndAppendPayloadWithoutPrefix(buffer []byte, p *ledger.Payload) []byte {
if p == nil {
return buffer
}
return encodeAndAppendPayload(buffer, p)
}

func EncodedPayloadLengthWithoutPrefix(p *ledger.Payload) int {
return encodedPayloadLength(p)
}

func encodePayload(p *ledger.Payload) []byte {
buffer := make([]byte, 0)
buffer := make([]byte, 0, encodedPayloadLength(p))
return encodeAndAppendPayload(buffer, p)
}

// encode key
encK := encodeKey(&p.Key)
func encodeAndAppendPayload(buffer []byte, p *ledger.Payload) []byte {

// encode encoded key size
buffer = utils.AppendUint32(buffer, uint32(len(encK)))
buffer = utils.AppendUint32(buffer, uint32(encodedKeyLength(&p.Key)))

// append encoded key content
buffer = append(buffer, encK...)

// encode value
encV := encodeValue(p.Value)
// encode key
buffer = encodeAndAppendKey(buffer, &p.Key)

// encode encoded value size
buffer = utils.AppendUint64(buffer, uint64(len(encV)))
buffer = utils.AppendUint64(buffer, uint64(encodedValueLength(p.Value)))

// append encoded key content
buffer = append(buffer, encV...)
// encode value
buffer = encodeAndAppendValue(buffer, p.Value)

return buffer
}

func encodedPayloadLength(p *ledger.Payload) int {
if p == nil {
return 0
}
// Payload is encoded as:
// encode key length (4 bytes) + encoded key +
// encoded value length (8 bytes) + encode value
return 4 + encodedKeyLength(&p.Key) + 8 + encodedValueLength(p.Value)
}

// DecodePayload construct a payload from an encoded byte slice
func DecodePayload(encodedPayload []byte) (*ledger.Payload, error) {
// if empty don't decode
@@ -367,10 +438,24 @@ func DecodePayload(encodedPayload []byte) (*ledger.Payload, error) {
if err != nil {
return nil, fmt.Errorf("error decoding payload: %w", err)
}
return decodePayload(rest)
// decode payload (zerocopy)
return decodePayload(rest, true)
}

// DecodePayloadWithoutPrefix constructs a payload from encoded byte slice
// without prefix (version and type). If zeroCopy is true, returned payload
// references data in encodedPayload. Otherwise, it is copied.
func DecodePayloadWithoutPrefix(encodedPayload []byte, zeroCopy bool) (*ledger.Payload, error) {
// if empty don't decode
if len(encodedPayload) == 0 {
return nil, nil
}
return decodePayload(encodedPayload, zeroCopy)
}

func decodePayload(inp []byte) (*ledger.Payload, error) {
// decodePayload decodes inp into payload. If zeroCopy is true,
// returned payload references data in inp. Otherwise, it is copied.
func decodePayload(inp []byte, zeroCopy bool) (*ledger.Payload, error) {

// read encoded key size
encKeySize, rest, err := utils.ReadUint32(inp)
@@ -385,7 +470,7 @@ func decodePayload(inp []byte) (*ledger.Payload, error) {
}

// decode the key
key, err := decodeKey(encKey)
key, err := decodeKey(encKey, zeroCopy)
if err != nil {
return nil, fmt.Errorf("error decoding payload: %w", err)
}
@@ -402,7 +487,13 @@ func decodePayload(inp []byte) (*ledger.Payload, error) {
return nil, fmt.Errorf("error decoding payload: %w", err)
}

return &ledger.Payload{Key: *key, Value: encValue}, nil
if zeroCopy {
return &ledger.Payload{Key: *key, Value: encValue}, nil
}

v := make([]byte, len(encValue))
copy(v, encValue)
return &ledger.Payload{Key: *key, Value: v}, nil
}

// EncodeTrieUpdate encodes a trie update struct
@@ -475,9 +566,6 @@ func DecodeTrieUpdate(encodedTrieUpdate []byte) (*ledger.TrieUpdate, error) {

func decodeTrieUpdate(inp []byte) (*ledger.TrieUpdate, error) {

paths := make([]ledger.Path, 0)
payloads := make([]*ledger.Payload, 0)

// decode root hash
rhSize, rest, err := utils.ReadUint16(inp)
if err != nil {
@@ -505,6 +593,9 @@ func decodeTrieUpdate(inp []byte) (*ledger.TrieUpdate, error) {
return nil, fmt.Errorf("error decoding trie update: %w", err)
}

paths := make([]ledger.Path, numOfPaths)
payloads := make([]*ledger.Payload, numOfPaths)
Comment on lines +596 to +597
Contributor

Wouldn't this avoid the initialization with nil?

Suggested change
paths := make([]ledger.Path, numOfPaths)
payloads := make([]*ledger.Payload, numOfPaths)
paths := make([]ledger.Path, 0, numOfPaths)
payloads := make([]*ledger.Payload, 0, numOfPaths)

Member Author

Wouldn't this avoid the initialization with nil?

I'm not sure I understand your question.

When a slice is created, every element of its backing array is initialized to the zero value, even when len=0 and cap>0.
https://go.dev/play/p/vSqQzlyE2Ij

	a := make([]*int, 5)
	fmt.Println("a", a)  // a [<nil> <nil> <nil> <nil> <nil>]

	b := make([]*int, 0, 5)
	fmt.Println("b", b[:cap(b)])  // b [<nil> <nil> <nil> <nil> <nil>]	

Member

I think there is a misconception involved here. Up to this moment, I also had this (apparently incorrect) assumption:

  • When allocating the underlying array for a slice, the compiler needs to write the default values to each slice element.
  • We were under the impression that if you initialize a slice with zero-length but non-zero capacity, the compiler would skip this initialization step.

But your example shows that this assumption is false. Thanks, I learned something 🎉 :

  • https://go.dev/blog/slices-intro explicitly states

    Arrays do not need to be initialized explicitly; the zero value of an array is a ready-to-use array whose elements are themselves zeroed:

Contributor (@tarakby, Mar 7, 2022)

Thanks @fxamacker for clarifying this mistake! I mistakenly thought that the pattern:

t := make([]int, 0, 10)
for i:=0; i<10; i++ {
  t = append(t, val)
}

is faster than:

t := make([]int, 10)
for i:=0; i<10; i++ {
  t[i] = val
}

because it avoids the initialization of t. It turns out this is not true!
I've learned something today 🙏🏼
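
The point settled in this thread can be checked with a small standalone program. This is a minimal sketch; the helper names `fillByAppend` and `fillByIndex` are illustrative and do not appear in the PR. It shows that `make` with zero length and non-zero capacity still yields a zeroed backing array, and that both fill patterns produce the same result.

```go
package main

import "fmt"

// fillByAppend allocates len 0, cap n and appends n values.
func fillByAppend(n int) []int {
	t := make([]int, 0, n)
	for i := 0; i < n; i++ {
		t = append(t, i)
	}
	return t
}

// fillByIndex allocates len n and assigns each element by index.
func fillByIndex(n int) []int {
	t := make([]int, n)
	for i := 0; i < n; i++ {
		t[i] = i
	}
	return t
}

func main() {
	// The backing array is zeroed even though the slice length is 0;
	// reslicing up to the capacity exposes the zero values.
	b := make([]*int, 0, 5)
	fmt.Println(len(b), cap(b), b[:cap(b)]) // 0 5 [<nil> <nil> <nil> <nil> <nil>]

	fmt.Println(fillByAppend(3), fillByIndex(3)) // [0 1 2] [0 1 2]
}
```

Since both forms pay for the same zeroed allocation, the choice between them comes down to readability and whether the final length is known up front.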


var path ledger.Path
var encPath []byte
for i := 0; i < int(numOfPaths); i++ {
@@ -516,7 +607,7 @@ func decodeTrieUpdate(inp []byte) (*ledger.TrieUpdate, error) {
if err != nil {
return nil, fmt.Errorf("error decoding trie update: %w", err)
}
paths = append(paths, path)
paths[i] = path
}

var payloadSize uint32
@@ -532,11 +623,12 @@ func decodeTrieUpdate(inp []byte) (*ledger.TrieUpdate, error) {
if err != nil {
return nil, fmt.Errorf("error decoding trie update: %w", err)
}
payload, err = decodePayload(encPayload)
// Decode payload (zerocopy)
payload, err = decodePayload(encPayload, true)
if err != nil {
return nil, fmt.Errorf("error decoding trie update: %w", err)
}
payloads = append(payloads, payload)
payloads[i] = payload
}
return &ledger.TrieUpdate{RootHash: rh, Paths: paths, Payloads: payloads}, nil
}
@@ -660,7 +752,8 @@ func decodeTrieProof(inp []byte) (*ledger.TrieProof, error) {
if err != nil {
return nil, fmt.Errorf("error decoding proof: %w", err)
}
payload, err := decodePayload(encPayload)
// Decode payload (zerocopy)
payload, err := decodePayload(encPayload, true)
if err != nil {
return nil, fmt.Errorf("error decoding proof: %w", err)
}
@@ -671,7 +764,8 @@ func decodeTrieProof(inp []byte) (*ledger.TrieProof, error) {
if err != nil {
return nil, fmt.Errorf("error decoding proof: %w", err)
}
interims := make([]hash.Hash, 0)

interims := make([]hash.Hash, interimsLen)
Contributor

Maybe the same here, since the initialization of arrays can be costly?

Suggested change
interims := make([]hash.Hash, interimsLen)
interims := make([]hash.Hash, 0, interimsLen)

Member Author

See above reply with code snippet.

Contributor

You're right! I made the same error here!
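
For readers who want to measure the two forms rather than reason about them, here is a rough sketch using `testing.Benchmark` from the standard library, which can run outside `go test`. The `Hash` type is a 32-byte array standing in for `hash.Hash` (an assumption about its size), and `collectByAppend`, `collectByIndex`, and `sink` are hypothetical names. No timings are shown because they depend on the machine and Go version.

```go
package main

import (
	"fmt"
	"testing"
)

// Hash is a stand-in for a fixed-size hash value (assumed to be 32 bytes).
type Hash [32]byte

// sink keeps results alive so the compiler cannot discard the work.
var sink []Hash

// collectByAppend preallocates capacity only and appends each element.
func collectByAppend(src []Hash) []Hash {
	out := make([]Hash, 0, len(src))
	for _, h := range src {
		out = append(out, h)
	}
	return out
}

// collectByIndex preallocates the full length and assigns by index.
func collectByIndex(src []Hash) []Hash {
	out := make([]Hash, len(src))
	for i, h := range src {
		out[i] = h
	}
	return out
}

func main() {
	src := make([]Hash, 256)
	for i := range src {
		src[i][0] = byte(i)
	}

	appendRes := testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			sink = collectByAppend(src)
		}
	})
	indexRes := testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			sink = collectByIndex(src)
		}
	})

	fmt.Println("append:", appendRes.String(), appendRes.MemString())
	fmt.Println("index:", indexRes.String(), indexRes.MemString())
}
```

Both variants should report one allocation of the same size per operation, which is consistent with the explanation given in the thread.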


var interimSize uint16
var interim hash.Hash
@@ -692,7 +786,7 @@ func decodeTrieProof(inp []byte) (*ledger.TrieProof, error) {
return nil, fmt.Errorf("error decoding proof: %w", err)
}

interims = append(interims, interim)
interims[i] = interim
}
pInst.Interims = interims

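
Two techniques recur throughout the encoding.go changes above: computing the exact encoded length first so the output buffer is allocated once, and a `zeroCopy` flag that chooses between referencing the input and copying it. The sketch below illustrates both with a simplified `KeyPart` type. It is a standalone approximation, not the flow-go code: the type, the function names `encodedLen` and `appendKeyPart`, and the big-endian byte order are assumptions made so the example runs with the standard library alone.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// KeyPart is a simplified stand-in for ledger.KeyPart (assumption).
type KeyPart struct {
	Type  uint16
	Value []byte
}

// encodedLen returns the exact encoded size: type (2 bytes) + value.
func encodedLen(kp KeyPart) int {
	return 2 + len(kp.Value)
}

// appendKeyPart appends the encoding of kp to buf and returns the extended
// slice. Byte order is assumed to be big-endian for this sketch.
func appendKeyPart(buf []byte, kp KeyPart) []byte {
	buf = append(buf, byte(kp.Type>>8), byte(kp.Type))
	return append(buf, kp.Value...)
}

// decodeKeyPart decodes inp. With zeroCopy the returned Value aliases inp;
// otherwise the value bytes are copied into a fresh slice.
func decodeKeyPart(inp []byte, zeroCopy bool) (KeyPart, error) {
	if len(inp) < 2 {
		return KeyPart{}, errors.New("input too short for key part type")
	}
	kp := KeyPart{Type: binary.BigEndian.Uint16(inp), Value: inp[2:]}
	if !zeroCopy {
		v := make([]byte, len(kp.Value))
		copy(v, kp.Value)
		kp.Value = v
	}
	return kp, nil
}

func main() {
	kp := KeyPart{Type: 1, Value: []byte("owner")}

	// One allocation of exactly the right size; append never has to grow it.
	buf := appendKeyPart(make([]byte, 0, encodedLen(kp)), kp)
	fmt.Println(len(buf), cap(buf)) // 7 7

	shared, _ := decodeKeyPart(buf, true)
	copied, _ := decodeKeyPart(buf, false)

	// Mutating the input after decoding changes only the zero-copy result.
	buf[2] = 'O'
	fmt.Printf("%s %s\n", shared.Value, copied.Value) // Owner owner
}
```

The last two lines show the trade-off: zero-copy avoids an allocation per value, but it is only safe when the input buffer is not reused or modified while the decoded value is still in use.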