Skip to content

Commit

Permalink
CV Optimistic Export and Import
Browse files Browse the repository at this point in the history
  • Loading branch information
chillyvee committed Feb 4, 2024
1 parent 11ba496 commit ec6c43d
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 30 deletions.
65 changes: 58 additions & 7 deletions export.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package iavl

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -31,13 +32,24 @@ type ExportNode struct {
// depth-first post-order (LRN), this order must be preserved when importing in order to recreate
// the same tree structure.
type Exporter struct {
tree *ImmutableTree
ch chan *ExportNode
cancel context.CancelFunc
tree *ImmutableTree
ch chan *ExportNode
cancel context.CancelFunc
optimistic bool // export raw key value pairs for optimistic import
}

// NewExporter creates a new Exporter. Callers must call Close() when done.
func newExporter(tree *ImmutableTree) (*Exporter, error) {
return newExporterWithOptions(tree, false)
}

// NewOptimisticExporter creates a new Exporter with raw Key Values. Callers must call Close() when done.
func newOptimisticExporter(tree *ImmutableTree) (*Exporter, error) {
return newExporterWithOptions(tree, true)
}

// NewExporterWithOptions creates a new Exporter and configures optimistic mode
func newExporterWithOptions(tree *ImmutableTree, optimistic bool) (*Exporter, error) {
if tree == nil {
return nil, fmt.Errorf("tree is nil: %w", ErrNotInitalizedTree)
}
Expand All @@ -48,13 +60,18 @@ func newExporter(tree *ImmutableTree) (*Exporter, error) {

ctx, cancel := context.WithCancel(context.Background())
exporter := &Exporter{
tree: tree,
ch: make(chan *ExportNode, exportBufferSize),
cancel: cancel,
tree: tree,
ch: make(chan *ExportNode, exportBufferSize),
cancel: cancel,
optimistic: optimistic,
}

tree.ndb.incrVersionReaders(tree.version)
go exporter.export(ctx)
if exporter.optimistic {
go exporter.optimisticExport(ctx)
} else {
go exporter.export(ctx)
}

return exporter, nil
}
Expand All @@ -79,6 +96,40 @@ func (e *Exporter) export(ctx context.Context) {
close(e.ch)
}

// optimisticExport exports raw key, value nodes
// Cosmos-SDK should set different snapshot format so nodes can select between either "untrusted statesync" or "trusted-peer optimistic" import
func (e *Exporter) optimisticExport(ctx context.Context) {
e.tree.root.traverse(e.tree, true, func(node *Node) bool {
// TODO: How to get the original db value bytes directly without writeBytes()?
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufPool.Put(buf)

if err := node.writeBytes(buf); err != nil {
fmt.Printf("WARN: failed writeBytes")
}

bytesCopy := make([]byte, buf.Len())
copy(bytesCopy, buf.Bytes())

// Use Export Node Format.
exportNode := &ExportNode{
Key: node.GetKey(), // TODO: How to get prefixed key so that import does not need to prefix?
Value: bytesCopy,
Version: 0, // Version not used
Height: 0, // Height not used
}

select {
case e.ch <- exportNode:
return false
case <-ctx.Done():
return true
}
})
close(e.ch)
}

// Next fetches the next exported node, or returns ExportDone when done.
func (e *Exporter) Next() (*ExportNode, error) {
if exportNode, ok := <-e.ch; ok {
Expand Down
62 changes: 59 additions & 3 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ func setupExportTreeBasic(t require.TestingT) *ImmutableTree {
require.NoError(t, err)
_, err = tree.Set([]byte("z"), []byte{255})
require.NoError(t, err)
_, err = tree.Set([]byte("a"), []byte{1})
_, err = tree.Set([]byte("c"), []byte{0x13})
require.NoError(t, err)
_, err = tree.Set([]byte("b"), []byte{2})
_, err = tree.Set([]byte("a"), []byte{0x11})
require.NoError(t, err)
_, err = tree.Set([]byte("c"), []byte{3})
_, err = tree.Set([]byte("b"), []byte{0x12})
require.NoError(t, err)
_, _, err = tree.SaveVersion()
require.NoError(t, err)
Expand Down Expand Up @@ -298,6 +298,62 @@ func TestExporter_Import(t *testing.T) {
}
}

func TestOptimisticExporter_Import(t *testing.T) {
testcases := map[string]*ImmutableTree{
"empty tree": NewImmutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger()),
"basic tree": setupExportTreeBasic(t),
}
if !testing.Short() {
testcases["sized tree"] = setupExportTreeSized(t, 4096)
testcases["random tree"] = setupExportTreeRandom(t)
}

for desc, tree := range testcases {
tree := tree
t.Run(desc, func(t *testing.T) {
t.Parallel()

exporter, err := tree.OptimisticExport()
require.NoError(t, err)
defer exporter.Close()

newTree := NewMutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger())
importer, err := newTree.OptimisticImport(tree.Version())
require.NoError(t, err)
defer importer.Close()

for {
item, err := exporter.Next()
if err == ErrorExportDone {
err = importer.Commit()
require.NoError(t, err)
break
}
require.NoError(t, err)
err = importer.Add(item)
require.NoError(t, err)
}

treeHash := tree.Hash()
newTreeHash := newTree.Hash()

require.Equal(t, treeHash, newTreeHash, "Tree hash mismatch")
require.Equal(t, tree.Size(), newTree.Size(), "Tree size mismatch")
require.Equal(t, tree.Version(), newTree.Version(), "Tree version mismatch")

tree.Iterate(func(key, value []byte) bool { //nolint:errcheck
index, _, err := tree.GetWithIndex(key)
require.NoError(t, err)
newIndex, newValue, err := newTree.GetWithIndex(key)
require.NoError(t, err)
require.Equal(t, index, newIndex, "Index mismatch for key %v", key)
require.Equal(t, value, newValue, "Value mismatch for key %v", key)
return false
})
})
}
}

func TestExporter_Close(t *testing.T) {
tree := setupExportTreeSized(t, 4096)
exporter, err := tree.Export()
Expand Down
6 changes: 6 additions & 0 deletions immutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ func (t *ImmutableTree) Export() (*Exporter, error) {
return newExporter(t)
}

// OptimisiticExport returns an iterator that exports tree nodes as ExportNodes. These nodes can be
// imported with MutableTree.Import() to recreate an identical tree.
func (t *ImmutableTree) OptimisticExport() (*Exporter, error) {
return newOptimisticExporter(t)
}

// GetWithIndex returns the index and value of the specified key if it exists, or nil and the next index
// otherwise. The returned value must not be modified, since it may point to data stored within
// IAVL.
Expand Down
114 changes: 94 additions & 20 deletions import.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,27 @@ type Importer struct {

// inflightCommit tracks a batch commit, if any.
inflightCommit <-chan error

// Optimistic raw key value import
optimistic bool
}

// newImporter creates a new Importer for an empty MutableTree.
//
// version should correspond to the version that was initially exported. It must be greater than
// or equal to the highest ExportNode version number given.
func newImporter(tree *MutableTree, version int64) (*Importer, error) {
return newImporterWithOptions(tree, version, false)
}

// newOptimisticImporter creates a new Importer for an empty MutableTree.
//
// expects optimistic raw key values for import
func newOptimisticImporter(tree *MutableTree, version int64) (*Importer, error) {
return newImporterWithOptions(tree, version, true)
}

func newImporterWithOptions(tree *MutableTree, version int64, optimistic bool) (*Importer, error) {
if version < 0 {
return nil, errors.New("imported version cannot be negative")
}
Expand All @@ -49,11 +63,12 @@ func newImporter(tree *MutableTree, version int64) (*Importer, error) {
}

return &Importer{
tree: tree,
version: version,
batch: tree.ndb.db.NewBatch(),
stack: make([]*Node, 0, 8),
nonces: make([]uint32, version+1),
tree: tree,
version: version,
batch: tree.ndb.db.NewBatch(),
stack: make([]*Node, 0, 8),
nonces: make([]uint32, version+1),
optimistic: optimistic,
}, nil
}

Expand Down Expand Up @@ -117,10 +132,65 @@ func (i *Importer) Close() {
i.tree = nil
}

// sendBatchIfFull can be called during imports after each key add
// automatically batch.Write() when pending writes > maxBatchSize
func (i *Importer) sendBatchIfFull() error {
if i.batchSize >= maxBatchSize {
// Wait for previous batch.
var err error
if i.inflightCommit != nil {
err = <-i.inflightCommit
i.inflightCommit = nil
}
if err != nil {
return err
}
result := make(chan error)
i.inflightCommit = result
go func(batch db.Batch) {
defer batch.Close()
result <- batch.Write()
}(i.batch)
i.batch = i.tree.ndb.db.NewBatch()
i.batchSize = 0
}

return nil
}

// OptimisticAdd adds a TRUSTED leveldb key value pair WITHOUT verification
func (i *Importer) OptimisticAdd(exportNode *ExportNode) error {
if i.tree == nil {
return ErrNoImport
}
if exportNode == nil {
return errors.New("node cannot be nil")
}
if exportNode.Key == nil {
return errors.New("node.Key cannot be nil")
}
if exportNode.Value == nil {
return errors.New("node.Value cannot be nil")
}

if err := i.batch.Set(i.tree.ndb.nodeKey(exportNode.Key), exportNode.Value); err != nil {
return err
}
i.batchSize++

i.sendBatchIfFull()

Check failure on line 181 in import.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `i.sendBatchIfFull` is not checked (errcheck)

return nil
}

// Add adds an ExportNode to the import. ExportNodes must be added in the order returned by
// Exporter, i.e. depth-first post-order (LRN). Nodes are periodically flushed to the database,
// but the imported version is not visible until Commit() is called.
func (i *Importer) Add(exportNode *ExportNode) error {
// Keep the same Add(node) API but run faster optimistic import when configured
if i.optimistic {
return i.OptimisticAdd(exportNode)
}
if i.tree == nil {
return ErrNoImport
}
Expand Down Expand Up @@ -193,24 +263,28 @@ func (i *Importer) Commit() error {
return ErrNoImport
}

switch len(i.stack) {
case 0:
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil {
return err
}
case 1:
i.stack[0].nodeKey.nonce = 1
if err := i.writeNode(i.stack[0]); err != nil {
return err
}
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
if i.optimistic {

Check warning on line 266 in import.go

View workflow job for this annotation

GitHub Actions / golangci-lint

empty-block: this block is empty, you can remove it (revive)
// All keys should be already imported
} else {
switch len(i.stack) {
case 0:
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil {
return err
}
case 1:
i.stack[0].nodeKey.nonce = 1
if err := i.writeNode(i.stack[0]); err != nil {
return err
}
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
return err
}
}
default:
return fmt.Errorf("invalid node structure, found stack size %v when committing",
len(i.stack))
}
default:
return fmt.Errorf("invalid node structure, found stack size %v when committing",
len(i.stack))
}

err := i.batch.WriteSync()
Expand Down
37 changes: 37 additions & 0 deletions import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,43 @@ func TestImporter_Commit_Empty(t *testing.T) {
assert.EqualValues(t, 3, tree.Version())
}

func TestImporter_OptimisticAdd(t *testing.T) {
k := []byte("rawStoreKey")
v := []byte("rawStoreValue")

testcases := map[string]struct {
node *ExportNode
valid bool
}{
"nil node": {nil, false},
"trusted_valid": {&ExportNode{Key: k, Value: v, Version: 1, Height: 0}, true},
"no key": {&ExportNode{Key: nil, Value: v, Version: 1, Height: 0}, false},
"no value": {&ExportNode{Key: k, Value: nil, Version: 1, Height: 0}, false},
// Only Key and Value used for Optimistic Add
// Version and Height is ignored
// further cases will be handled by Node.validate()
}
for desc, tc := range testcases {
tc := tc // appease scopelint
t.Run(desc, func(t *testing.T) {
tree := NewMutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger())
importer, err := tree.Import(1)
require.NoError(t, err)
defer importer.Close()

err = importer.OptimisticAdd(tc.node)
if tc.valid {
require.NoError(t, err)
} else {
if err == nil {
err = importer.Commit()
}
require.Error(t, err)
}
})
}
}

func BenchmarkImport(b *testing.B) {
benchmarkImport(b, 4096)
}
Expand Down
Loading

0 comments on commit ec6c43d

Please sign in to comment.