Skip to content

Commit

Permalink
feat(eds/store): cache proofs after first eds recompute on sampling (#…
Browse files Browse the repository at this point in the history
…2429)

## Overview

Before this change, the EDS was recomputed twice on sampling:
1. Inside the sampling client, to verify data integrity.
2. On storing the EDS to storage, to calculate the merkle tree proof nodes.

This PR adds a `proofsAdder` cache that stores proof nodes for later
use, with the following key features:
- The cache can optionally be attached to a context and will be respected by
the ipld/shrex getters to populate it on EDS recompute.
- The cache is safe against a second recompute: it allows only a single
recompute write, and all subsequent recomputes over the same cache are
no-ops.
 
Also, this PR reworks writeEDS to use `proofsAdder` instead of
creating a new blockstore and iterating over all keys in it. This approach
speeds up writeEDS even without a pre-collected cache.
 
Since there is no longer a need for a blockstore to be created, I have
simplified writeEDS and removed a lot of redundant code, including the entire
`writeSession` struct.
 
The PR also adds a benchmark that uses an on-disk badger store (the existing
benchmark uses an in-memory datastore instead of badger).
 
This PR brings a significant performance increase. On the on-disk badger
store, the results are:
  - before this PR, store.Put operation: **~1.5s**
  - after this PR, store.Put with cached proofs: **~800ms**
  - after this PR, store.Put without cached proofs: **~900ms**
  • Loading branch information
walldiss authored Aug 7, 2023
1 parent cf1b117 commit 6546252
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 162 deletions.
29 changes: 27 additions & 2 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ package core
import (
"context"
"errors"
"fmt"

"github.com/filecoin-project/dagstore"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/celestiaorg/celestia-app/pkg/square"
"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
Expand All @@ -17,12 +23,31 @@ import (
// extendBlock extends the given block data, returning the resulting
// ExtendedDataSquare (EDS). If there are no transactions in the block,
// nil is returned in place of the eds.
func extendBlock(data types.Data, appVersion uint64) (*rsmt2d.ExtendedDataSquare, error) {
func extendBlock(data types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) {
if app.IsEmptyBlock(data, appVersion) {
return nil, nil
}

return app.ExtendBlock(data, appVersion)
// Construct the data square from the block's transactions
dataSquare, err := square.Construct(data.Txs.ToSliceOfBytes(), appVersion, appconsts.SquareSizeUpperBound(appVersion))
if err != nil {
return nil, err
}
return extendShares(shares.ToBytes(dataSquare), options...)
}

// extendShares erasure-codes the given raw shares into an
// ExtendedDataSquare, building the row/column trees through the nmt
// wrapper so any supplied options (such as a proof-collecting visitor)
// take effect. The share count must be a power of two.
func extendShares(s [][]byte, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) {
	// A valid original square always holds a power-of-two number of shares.
	if !shares.IsPowerOfTwo(len(s)) {
		return nil, fmt.Errorf("number of shares is not a power of 2: got %d", len(s))
	}

	// Derive the original square width and hand rsmt2d a namespaced-merkle-tree
	// constructor parameterized with the caller's options.
	squareSize := square.Size(len(s))
	treeConstructor := wrapper.NewConstructor(uint64(squareSize), options...)
	return rsmt2d.ComputeExtendedDataSquare(s, appconsts.DefaultCodec(), treeConstructor)
}

// storeEDS will only store extended block if it is not empty and doesn't already exist.
Expand Down
12 changes: 10 additions & 2 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"golang.org/x/sync/errgroup"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
)

const concurrencyLimit = 4
Expand Down Expand Up @@ -105,7 +107,8 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
}

// extend block data
eds, err := extendBlock(block.Data, block.Header.Version.App)
adder := ipld.NewProofsAdder(int(block.Data.SquareSize))
eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
}
Expand All @@ -119,6 +122,8 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return nil, fmt.Errorf("incorrect hash in header at height %d: expected %x, got %x",
&block.Height, hash, eh.Hash())
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for height %d: %w", &block.Height, err)
Expand All @@ -142,7 +147,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
log.Debugw("fetched signed block from core", "height", b.Header.Height)

// extend block data
eds, err := extendBlock(b.Data, b.Header.Version.App)
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}
Expand All @@ -151,6 +157,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
if err != nil {
return nil, fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for block height %d: %w", b.Header.Height, err)
Expand Down
6 changes: 5 additions & 1 deletion core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"go.opentelemetry.io/otel/attribute"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

Expand Down Expand Up @@ -150,7 +152,8 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
attribute.Int64("height", b.Header.Height),
)
// extend block data
eds, err := extendBlock(b.Data, b.Header.Version.App)
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return fmt.Errorf("extending block data: %w", err)
}
Expand All @@ -161,6 +164,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
}

// attempt to store block data if not empty
ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, b.Header.DataHash.Bytes(), eds, cl.store)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
Expand Down
83 changes: 83 additions & 0 deletions nodebuilder/store_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
package nodebuilder

import (
"context"
"strconv"
"testing"

"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestRepo(t *testing.T) {
Expand Down Expand Up @@ -50,3 +63,73 @@ func TestRepo(t *testing.T) {
})
}
}

// BenchmarkStore measures eds.Store.Put for a 128x128 square against an
// on-disk badger-backed store, with and without pre-populated proof caches.
func BenchmarkStore(b *testing.B) {
	ctx, cancel := context.WithCancel(context.Background())
	b.Cleanup(cancel)

	// NOTE(review): this in-memory store is started but never used — the
	// sub-benchmark below creates its own on-disk store and shadows both
	// `ds` and `edsStore`. Consider removing this setup after confirming the
	// datastore imports are still needed by other tests in this file.
	tmpDir := b.TempDir()
	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
	edsStore, err := eds.NewStore(tmpDir, ds)
	require.NoError(b, err)
	err = edsStore.Start(ctx)
	require.NoError(b, err)

	// Reference numbers from the PR description: Put with cached proofs
	// ~800ms, without ~900ms, vs ~1.5s before the change — TODO confirm on
	// current hardware; the old "bench_read_128 ... ~70ms" figure was stale.
	b.Run("bench put 128", func(b *testing.B) {
		b.ResetTimer()
		dir := b.TempDir()

		// Build a real full-node store so the benchmark exercises badger on
		// disk rather than an in-memory map datastore.
		err := Init(*DefaultConfig(node.Full), dir, node.Full)
		require.NoError(b, err)

		store, err := OpenStore(dir, nil)
		require.NoError(b, err)
		ds, err := store.Datastore()
		require.NoError(b, err)
		edsStore, err := eds.NewStore(dir, ds)
		require.NoError(b, err)
		err = edsStore.Start(ctx)
		require.NoError(b, err)

		size := 128
		b.Run("enabled eds proof caching", func(b *testing.B) {
			b.StopTimer()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				// Extend the square with a proof-collecting visitor so Put
				// can reuse the cached proof nodes via the context below
				// instead of recomputing the trees.
				adder := ipld.NewProofsAdder(size * 2)
				shares := sharetest.RandShares(b, size*size)
				eds, err := rsmt2d.ComputeExtendedDataSquare(
					shares,
					share.DefaultRSMT2DCodec(),
					wrapper.NewConstructor(uint64(size),
						nmt.NodeVisitor(adder.VisitFn())),
				)
				require.NoError(b, err)
				dah, err := da.NewDataAvailabilityHeader(eds)
				require.NoError(b, err)
				ctx := ipld.CtxWithProofsAdder(ctx, adder)

				// Only the Put call itself is timed; fixture generation and
				// error checks stay outside the timer.
				b.StartTimer()
				err = edsStore.Put(ctx, dah.Hash(), eds)
				b.StopTimer()
				require.NoError(b, err)
			}
		})

		b.Run("disabled eds proof caching", func(b *testing.B) {
			// NOTE(review): ResetTimer/StopTimer order differs from the
			// sibling sub-benchmark above; harmless, but worth unifying.
			b.ResetTimer()
			b.StopTimer()
			for i := 0; i < b.N; i++ {
				// No proofs adder in the context: Put must recompute the
				// merkle proofs itself.
				eds := edstest.RandEDS(b, size)
				dah, err := da.NewDataAvailabilityHeader(eds)
				require.NoError(b, err)

				b.StartTimer()
				err = edsStore.Put(ctx, dah.Hash(), eds)
				b.StopTimer()
				require.NoError(b, err)
			}
		})
	})
}
Loading

0 comments on commit 6546252

Please sign in to comment.