Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eds/store): cache proofs after first eds recompute on sampling #2429

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ package core
import (
"context"
"errors"
"fmt"

"github.com/filecoin-project/dagstore"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/celestiaorg/celestia-app/pkg/square"
"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
Expand All @@ -17,12 +23,31 @@ import (
// extendBlock extends the given block data, returning the resulting
// ExtendedDataSquare (EDS). If there are no transactions in the block,
// nil is returned in place of the eds.
func extendBlock(data types.Data, appVersion uint64) (*rsmt2d.ExtendedDataSquare, error) {
func extendBlock(data types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) {
if app.IsEmptyBlock(data, appVersion) {
return nil, nil
}

return app.ExtendBlock(data, appVersion)
// Construct the data square from the block's transactions
dataSquare, err := square.Construct(data.Txs.ToSliceOfBytes(), appVersion, appconsts.SquareSizeUpperBound(appVersion))
if err != nil {
return nil, err
}
return extendShares(shares.ToBytes(dataSquare), options...)
}

func extendShares(s [][]byte, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) {
// Check that the length of the square is a power of 2.
if !shares.IsPowerOfTwo(len(s)) {
return nil, fmt.Errorf("number of shares is not a power of 2: got %d", len(s))
}
// here we construct a tree
// Note: uses the nmt wrapper to construct the tree.
squareSize := square.Size(len(s))
return rsmt2d.ComputeExtendedDataSquare(s,
appconsts.DefaultCodec(),
wrapper.NewConstructor(uint64(squareSize),
options...))
}

// storeEDS will only store extended block if it is not empty and doesn't already exist.
Expand Down
12 changes: 10 additions & 2 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"golang.org/x/sync/errgroup"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
)

const concurrencyLimit = 4
Expand Down Expand Up @@ -105,7 +107,8 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
}

// extend block data
eds, err := extendBlock(block.Data, block.Header.Version.App)
adder := ipld.NewProofsAdder(int(block.Data.SquareSize))
eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
}
Expand All @@ -119,6 +122,8 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return nil, fmt.Errorf("incorrect hash in header at height %d: expected %x, got %x",
&block.Height, hash, eh.Hash())
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for height %d: %w", &block.Height, err)
Expand All @@ -142,7 +147,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
log.Debugw("fetched signed block from core", "height", b.Header.Height)

// extend block data
eds, err := extendBlock(b.Data, b.Header.Version.App)
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}
Expand All @@ -151,6 +157,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
if err != nil {
return nil, fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for block height %d: %w", b.Header.Height, err)
Expand Down
6 changes: 5 additions & 1 deletion core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"go.opentelemetry.io/otel/attribute"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

Expand Down Expand Up @@ -150,7 +152,8 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
attribute.Int64("height", b.Header.Height),
)
// extend block data
eds, err := extendBlock(b.Data, b.Header.Version.App)
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return fmt.Errorf("extending block data: %w", err)
}
Expand All @@ -161,6 +164,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
}

// attempt to store block data if not empty
ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, b.Header.DataHash.Bytes(), eds, cl.store)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
Expand Down
83 changes: 83 additions & 0 deletions nodebuilder/store_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
package nodebuilder

import (
"context"
"strconv"
"testing"

"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/wrapper"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestRepo(t *testing.T) {
Expand Down Expand Up @@ -50,3 +63,73 @@ func TestRepo(t *testing.T) {
})
}
}

func BenchmarkStore(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

tmpDir := b.TempDir()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(tmpDir, ds)
require.NoError(b, err)
err = edsStore.Start(ctx)
require.NoError(b, err)

// BenchmarkStore/bench_read_128-10 14 78970661 ns/op (~70ms)
b.Run("bench put 128", func(b *testing.B) {
b.ResetTimer()
dir := b.TempDir()

err := Init(*DefaultConfig(node.Full), dir, node.Full)
require.NoError(b, err)

store, err := OpenStore(dir, nil)
require.NoError(b, err)
ds, err := store.Datastore()
require.NoError(b, err)
edsStore, err := eds.NewStore(dir, ds)
require.NoError(b, err)
err = edsStore.Start(ctx)
require.NoError(b, err)

size := 128
b.Run("enabled eds proof caching", func(b *testing.B) {
b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
adder := ipld.NewProofsAdder(size * 2)
shares := sharetest.RandShares(b, size*size)
eds, err := rsmt2d.ComputeExtendedDataSquare(
shares,
share.DefaultRSMT2DCodec(),
wrapper.NewConstructor(uint64(size),
nmt.NodeVisitor(adder.VisitFn())),
)
require.NoError(b, err)
dah, err := da.NewDataAvailabilityHeader(eds)
require.NoError(b, err)
ctx := ipld.CtxWithProofsAdder(ctx, adder)

b.StartTimer()
err = edsStore.Put(ctx, dah.Hash(), eds)
b.StopTimer()
require.NoError(b, err)
}
})

b.Run("disabled eds proof caching", func(b *testing.B) {
b.ResetTimer()
b.StopTimer()
for i := 0; i < b.N; i++ {
eds := edstest.RandEDS(b, size)
dah, err := da.NewDataAvailabilityHeader(eds)
require.NoError(b, err)

b.StartTimer()
err = edsStore.Put(ctx, dah.Hash(), eds)
b.StopTimer()
require.NoError(b, err)
}
})
})
}
Loading
Loading