Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CV Optimistic Export and Import #878

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions export.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package iavl

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -31,13 +32,24 @@ type ExportNode struct {
// depth-first post-order (LRN), this order must be preserved when importing in order to recreate
// the same tree structure.
type Exporter struct {
tree *ImmutableTree
ch chan *ExportNode
cancel context.CancelFunc
tree *ImmutableTree
ch chan *ExportNode
cancel context.CancelFunc
optimistic bool // export raw key value pairs for optimistic import
}

// NewExporter creates a new Exporter. Callers must call Close() when done.
func newExporter(tree *ImmutableTree) (*Exporter, error) {
return newExporterWithOptions(tree, false)
}

// NewOptimisticExporter creates a new Exporter with raw Key Values. Callers must call Close() when done.
func newOptimisticExporter(tree *ImmutableTree) (*Exporter, error) {
return newExporterWithOptions(tree, true)
}

// NewExporterWithOptions creates a new Exporter and configures optimistic mode
func newExporterWithOptions(tree *ImmutableTree, optimistic bool) (*Exporter, error) {
if tree == nil {
return nil, fmt.Errorf("tree is nil: %w", ErrNotInitalizedTree)
}
Expand All @@ -48,20 +60,25 @@ func newExporter(tree *ImmutableTree) (*Exporter, error) {

ctx, cancel := context.WithCancel(context.Background())
exporter := &Exporter{
tree: tree,
ch: make(chan *ExportNode, exportBufferSize),
cancel: cancel,
tree: tree,
ch: make(chan *ExportNode, exportBufferSize),
cancel: cancel,
optimistic: optimistic,
}

tree.ndb.incrVersionReaders(tree.version)
go exporter.export(ctx)
if exporter.optimistic {
go exporter.optimisticExport(ctx)
} else {
go exporter.export(ctx)
}

return exporter, nil
}

// export exports nodes
func (e *Exporter) export(ctx context.Context) {
e.tree.root.traversePost(e.tree, true, func(node *Node) bool {
_ = e.tree.root.traversePost(e.tree, true, func(node *Node) bool {
exportNode := &ExportNode{
Key: node.key,
Value: node.value,
Expand All @@ -79,6 +96,40 @@ func (e *Exporter) export(ctx context.Context) {
close(e.ch)
}

// optimisticExport exports raw key, value nodes
// Cosmos-SDK should set different snapshot format so nodes can select between either "untrusted statesync" or "trusted-peer optimistic" import
func (e *Exporter) optimisticExport(ctx context.Context) {
e.tree.root.traverse(e.tree, true, func(node *Node) bool {
// TODO: How to get the original db value bytes directly without writeBytes()?
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufPool.Put(buf)

if err := node.writeBytes(buf); err != nil {
fmt.Printf("WARN: failed writeBytes")
}

bytesCopy := make([]byte, buf.Len())
copy(bytesCopy, buf.Bytes())

// Use Export Node Format.
exportNode := &ExportNode{
Key: node.GetKey(), // TODO: How to get prefixed key so that import does not need to prefix?
Value: bytesCopy,
Version: 0, // Version not used
Height: 0, // Height not used
}

select {
case e.ch <- exportNode:
return false
case <-ctx.Done():
return true
}
})
close(e.ch)
}

// Next fetches the next exported node, or returns ExportDone when done.
func (e *Exporter) Next() (*ExportNode, error) {
if exportNode, ok := <-e.ch; ok {
Expand Down
56 changes: 56 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,62 @@ func TestExporter_Import(t *testing.T) {
}
}

func TestOptimisticExporter_Import(t *testing.T) {
testcases := map[string]*ImmutableTree{
"empty tree": NewImmutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger()),
"basic tree": setupExportTreeBasic(t),
}
if !testing.Short() {
testcases["sized tree"] = setupExportTreeSized(t, 4096)
testcases["random tree"] = setupExportTreeRandom(t)
}

for desc, tree := range testcases {
tree := tree
t.Run(desc, func(t *testing.T) {
t.Parallel()

exporter, err := tree.OptimisticExport()
require.NoError(t, err)
defer exporter.Close()

newTree := NewMutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger())
importer, err := newTree.OptimisticImport(tree.Version())
require.NoError(t, err)
defer importer.Close()

for {
item, err := exporter.Next()
if err == ErrorExportDone {
err = importer.Commit()
require.NoError(t, err)
break
}
require.NoError(t, err)
err = importer.Add(item)
require.NoError(t, err)
}

treeHash := tree.Hash()
newTreeHash := newTree.Hash()

require.Equal(t, treeHash, newTreeHash, "Tree hash mismatch")
require.Equal(t, tree.Size(), newTree.Size(), "Tree size mismatch")
require.Equal(t, tree.Version(), newTree.Version(), "Tree version mismatch")

tree.Iterate(func(key, value []byte) bool { //nolint:errcheck
index, _, err := tree.GetWithIndex(key)
require.NoError(t, err)
newIndex, newValue, err := newTree.GetWithIndex(key)
require.NoError(t, err)
require.Equal(t, index, newIndex, "Index mismatch for key %v", key)
require.Equal(t, value, newValue, "Value mismatch for key %v", key)
return false
})
})
}
}

func TestExporter_Close(t *testing.T) {
tree := setupExportTreeSized(t, 4096)
exporter, err := tree.Export()
Expand Down
6 changes: 6 additions & 0 deletions immutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ func (t *ImmutableTree) Export() (*Exporter, error) {
return newExporter(t)
}

// OptimisiticExport returns an iterator that exports tree nodes as ExportNodes. These nodes can be
// imported with MutableTree.Import() to recreate an identical tree.
func (t *ImmutableTree) OptimisticExport() (*Exporter, error) {
return newOptimisticExporter(t)
}

// GetWithIndex returns the index and value of the specified key if it exists, or nil and the next index
// otherwise. The returned value must not be modified, since it may point to data stored within
// IAVL.
Expand Down
115 changes: 95 additions & 20 deletions import.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,27 @@ type Importer struct {

// inflightCommit tracks a batch commit, if any.
inflightCommit <-chan error

// Optimistic raw key value import
optimistic bool
}

// newImporter creates a new Importer for an empty MutableTree.
//
// version should correspond to the version that was initially exported. It must be greater than
// or equal to the highest ExportNode version number given.
func newImporter(tree *MutableTree, version int64) (*Importer, error) {
return newImporterWithOptions(tree, version, false)
}

// newOptimisticImporter creates a new Importer for an empty MutableTree.
//
// expects optimistic raw key values for import
func newOptimisticImporter(tree *MutableTree, version int64) (*Importer, error) {
return newImporterWithOptions(tree, version, true)
}

func newImporterWithOptions(tree *MutableTree, version int64, optimistic bool) (*Importer, error) {
if version < 0 {
return nil, errors.New("imported version cannot be negative")
}
Expand All @@ -49,11 +63,12 @@ func newImporter(tree *MutableTree, version int64) (*Importer, error) {
}

return &Importer{
tree: tree,
version: version,
batch: tree.ndb.db.NewBatch(),
stack: make([]*Node, 0, 8),
nonces: make([]uint32, version+1),
tree: tree,
version: version,
batch: tree.ndb.db.NewBatch(),
stack: make([]*Node, 0, 8),
nonces: make([]uint32, version+1),
optimistic: optimistic,
}, nil
}

Expand Down Expand Up @@ -117,10 +132,67 @@ func (i *Importer) Close() {
i.tree = nil
}

// sendBatchIfFull can be called during imports after each key add
// automatically batch.Write() when pending writes > maxBatchSize
func (i *Importer) sendBatchIfFull() error {
if i.batchSize >= maxBatchSize {
// Wait for previous batch.
var err error
if i.inflightCommit != nil {
err = <-i.inflightCommit
i.inflightCommit = nil
}
if err != nil {
return err
}
result := make(chan error)
i.inflightCommit = result
go func(batch db.Batch) {
defer batch.Close()
result <- batch.Write()
}(i.batch)
i.batch = i.tree.ndb.db.NewBatch()
i.batchSize = 0
}

return nil
}

// OptimisticAdd adds a TRUSTED leveldb key value pair WITHOUT verification
func (i *Importer) OptimisticAdd(exportNode *ExportNode) error {
if i.tree == nil {
return ErrNoImport
}
if exportNode == nil {
return errors.New("node cannot be nil")
}
if exportNode.Key == nil {
return errors.New("node.Key cannot be nil")
}
if exportNode.Value == nil {
return errors.New("node.Value cannot be nil")
}

if err := i.batch.Set(i.tree.ndb.nodeKey(exportNode.Key), exportNode.Value); err != nil {
return err
}
i.batchSize++

if err := i.sendBatchIfFull(); err != nil {
return errors.New("failed sending db write batch")
}

return nil
}

// Add adds an ExportNode to the import. ExportNodes must be added in the order returned by
// Exporter, i.e. depth-first post-order (LRN). Nodes are periodically flushed to the database,
// but the imported version is not visible until Commit() is called.
func (i *Importer) Add(exportNode *ExportNode) error {
// Keep the same Add(node) API but run faster optimistic import when configured
if i.optimistic {
return i.OptimisticAdd(exportNode)
}
if i.tree == nil {
return ErrNoImport
}
Expand Down Expand Up @@ -193,24 +265,27 @@ func (i *Importer) Commit() error {
return ErrNoImport
}

switch len(i.stack) {
case 0:
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil {
return err
}
case 1:
i.stack[0].nodeKey.nonce = 1
if err := i.writeNode(i.stack[0]); err != nil {
return err
}
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
// Optimistic: All keys should be already imported
if !i.optimistic {
switch len(i.stack) {
case 0:
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil {
return err
}
case 1:
i.stack[0].nodeKey.nonce = 1
if err := i.writeNode(i.stack[0]); err != nil {
return err
}
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
return err
}
}
default:
return fmt.Errorf("invalid node structure, found stack size %v when committing",
len(i.stack))
}
default:
return fmt.Errorf("invalid node structure, found stack size %v when committing",
len(i.stack))
}

err := i.batch.WriteSync()
Expand Down
37 changes: 37 additions & 0 deletions import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,43 @@ func TestImporter_Commit_Empty(t *testing.T) {
assert.EqualValues(t, 3, tree.Version())
}

func TestImporter_OptimisticAdd(t *testing.T) {
k := []byte("rawStoreKey")
v := []byte("rawStoreValue")

testcases := map[string]struct {
node *ExportNode
valid bool
}{
"nil node": {nil, false},
"trusted_valid": {&ExportNode{Key: k, Value: v, Version: 1, Height: 0}, true},
"no key": {&ExportNode{Key: nil, Value: v, Version: 1, Height: 0}, false},
"no value": {&ExportNode{Key: k, Value: nil, Version: 1, Height: 0}, false},
// Only Key and Value used for Optimistic Add
// Version and Height is ignored
// further cases will be handled by Node.validate()
}
for desc, tc := range testcases {
tc := tc // appease scopelint
t.Run(desc, func(t *testing.T) {
tree := NewMutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger())
importer, err := tree.Import(1)
require.NoError(t, err)
defer importer.Close()

err = importer.OptimisticAdd(tc.node)
if tc.valid {
require.NoError(t, err)
} else {
if err == nil {
err = importer.Commit()
}
require.Error(t, err)
}
})
}
}

func BenchmarkImport(b *testing.B) {
benchmarkImport(b, 4096)
}
Expand Down
6 changes: 6 additions & 0 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ func (tree *MutableTree) Import(version int64) (*Importer, error) {
return newImporter(tree, version)
}

// OptimisticImport returns an importer for tree nodes previously exported by ImmutableTree.OptimisticExport(),
// producing an identical IAVL tree. The caller must call Close() on the importer when done.
func (tree *MutableTree) OptimisticImport(version int64) (*Importer, error) {
return newOptimisticImporter(tree, version)
}

// Iterate iterates over all keys of the tree. The keys and values must not be modified,
// since they may point to data stored within IAVL. Returns true if stopped by callnack, false otherwise
func (tree *MutableTree) Iterate(fn func(key []byte, value []byte) bool) (stopped bool, err error) {
Expand Down
Loading
Loading