Skip to content

Commit

Permalink
feat(share/eds/store): expose eds store params (#2724)
Browse files Browse the repository at this point in the history
Adds exported params to allow control of eds store configuration
  • Loading branch information
walldiss authored Sep 21, 2023
1 parent 953a349 commit 15b8bcc
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 46 deletions.
3 changes: 2 additions & 1 deletion core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
func createStore(t *testing.T) *eds.Store {
t.Helper()

store, err := eds.NewStore(t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
storeCfg := eds.DefaultParameters()
store, err := eds.NewStore(storeCfg, t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)
return store
}
Expand Down
3 changes: 2 additions & 1 deletion libs/edssser/edssser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type EDSsser struct {
}

func NewEDSsser(path string, datastore datastore.Batching, cfg Config) (*EDSsser, error) {
edsstore, err := eds.NewStore(path, datastore)
storeCfg := eds.DefaultParameters()
edsstore, err := eds.NewStore(storeCfg, path, datastore)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions nodebuilder/share/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
Expand All @@ -13,6 +14,9 @@ import (

// TODO: some params are pointers and other are not, Let's fix this.
type Config struct {
// EDSStoreParams sets eds store configuration parameters
EDSStoreParams *eds.Parameters

UseShareExchange bool
// ShrExEDSParams sets shrexeds client and server configuration parameters
ShrExEDSParams *shrexeds.Parameters
Expand All @@ -27,6 +31,7 @@ type Config struct {

func DefaultConfig(tp node.Type) Config {
cfg := Config{
EDSStoreParams: eds.DefaultParameters(),
Discovery: discovery.DefaultParameters(),
ShrExEDSParams: shrexeds.DefaultParameters(),
ShrExNDParams: shrexnd.DefaultParameters(),
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
)),
fx.Provide(fx.Annotate(
func(path node.StorePath, ds datastore.Batching) (*eds.Store, error) {
return eds.NewStore(string(path), ds)
return eds.NewStore(cfg.EDSStoreParams, string(path), ds)
},
fx.OnStart(func(ctx context.Context, store *eds.Store) error {
err := store.Start(ctx)
Expand Down
3 changes: 1 addition & 2 deletions nodebuilder/share/share_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ func Test_EmptyCARExists(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

tmpDir := t.TempDir()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(tmpDir, ds)
edsStore, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), ds)
require.NoError(t, err)
err = edsStore.Start(ctx)
require.NoError(t, err)
Expand Down
10 changes: 5 additions & 5 deletions nodebuilder/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func BenchmarkStore(b *testing.B) {
err := Init(*DefaultConfig(node.Full), dir, node.Full)
require.NoError(b, err)

store := newStore(ctx, b, dir)
store := newStore(ctx, b, eds.DefaultParameters(), dir)
size := 128
b.Run("enabled eds proof caching", func(b *testing.B) {
b.StopTimer()
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestStoreRestart(t *testing.T) {
err := Init(*DefaultConfig(node.Full), dir, node.Full)
require.NoError(t, err)

store := newStore(ctx, t, dir)
store := newStore(ctx, t, eds.DefaultParameters(), dir)

hashes := make([][]byte, blocks)
for i := range hashes {
Expand All @@ -145,7 +145,7 @@ func TestStoreRestart(t *testing.T) {

// restart store
store.stop(ctx, t)
store = newStore(ctx, t, dir)
store = newStore(ctx, t, eds.DefaultParameters(), dir)

for _, h := range hashes {
edsReader, err := store.edsStore.GetCAR(ctx, h)
Expand All @@ -163,12 +163,12 @@ type store struct {
edsStore *eds.Store
}

func newStore(ctx context.Context, t require.TestingT, dir string) store {
func newStore(ctx context.Context, t require.TestingT, params *eds.Parameters, dir string) store {
s, err := OpenStore(dir, nil)
require.NoError(t, err)
ds, err := s.Datastore()
require.NoError(t, err)
edsStore, err := eds.NewStore(dir, ds)
edsStore, err := eds.NewStore(params, dir, ds)
require.NoError(t, err)
err = edsStore.Start(ctx)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/tests/fraud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func TestFraudProofHandling(t *testing.T) {
set, val := sw.Validators(t)
fMaker := headerfraud.NewFraudMaker(t, 10, []types.PrivValidator{val}, set)

tmpDir := t.TempDir()
storeCfg := eds.DefaultParameters()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(tmpDir, ds)
edsStore, err := eds.NewStore(storeCfg, t.TempDir(), ds)
require.NoError(t, err)
require.NoError(t, edsStore.Start(ctx))
t.Cleanup(func() {
Expand Down
3 changes: 2 additions & 1 deletion nodebuilder/tests/swamp/swamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ func (s *Swamp) setupGenesis() {
// ensure core has surpassed genesis block
s.WaitTillHeight(ctx, 2)

store, err := eds.NewStore(s.t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
ds := ds_sync.MutexWrap(ds.NewMapDatastore())
store, err := eds.NewStore(eds.DefaultParameters(), s.t.TempDir(), ds)
require.NoError(s.t, err)

ex := core.NewExchange(
Expand Down
30 changes: 13 additions & 17 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ const (
blocksPath = "/blocks/"
indexPath = "/index/"
transientsPath = "/transients/"

// GC performs DAG store garbage collection by reclaiming transient files of
// shards that are currently available but inactive, or errored.
// We don't use transient files right now, so GC is turned off by default.
defaultGCInterval = 0

defaultRecentBlocksCacheSize = 10
defaultBlockstoreCacheSize = 128
)

var ErrNotFound = errors.New("eds not found in store")
Expand Down Expand Up @@ -75,8 +67,12 @@ type Store struct {
}

// NewStore creates a new EDS Store under the given basepath and datastore.
func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
err := setupPath(basepath)
func NewStore(params *Parameters, basePath string, ds datastore.Batching) (*Store, error) {
if err := params.Validate(); err != nil {
return nil, err
}

err := setupPath(basePath)
if err != nil {
return nil, fmt.Errorf("failed to setup eds.Store directories: %w", err)
}
Expand All @@ -90,20 +86,20 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
return nil, fmt.Errorf("failed to register FS mount on the registry: %w", err)
}

fsRepo, err := index.NewFSRepo(basepath + indexPath)
fsRepo, err := index.NewFSRepo(basePath + indexPath)
if err != nil {
return nil, fmt.Errorf("failed to create index repository: %w", err)
}

invertedIdx, err := newSimpleInvertedIndex(basepath)
invertedIdx, err := newSimpleInvertedIndex(basePath)
if err != nil {
return nil, fmt.Errorf("failed to create index: %w", err)
}

failureChan := make(chan dagstore.ShardResult)
dagStore, err := dagstore.NewDAGStore(
dagstore.Config{
TransientsDir: basepath + transientsPath,
TransientsDir: basePath + transientsPath,
IndexRepo: fsRepo,
Datastore: ds,
MountRegistry: r,
Expand All @@ -115,22 +111,22 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
return nil, fmt.Errorf("failed to create DAGStore: %w", err)
}

recentBlocksCache, err := cache.NewAccessorCache("recent", defaultRecentBlocksCacheSize)
recentBlocksCache, err := cache.NewAccessorCache("recent", params.RecentBlocksCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create recent blocks cache: %w", err)
}

blockstoreCache, err := cache.NewAccessorCache("blockstore", defaultBlockstoreCacheSize)
blockstoreCache, err := cache.NewAccessorCache("blockstore", params.BlockstoreCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create blockstore cache: %w", err)
}

store := &Store{
basepath: basepath,
basepath: basePath,
dgstr: dagStore,
carIdx: fsRepo,
invertedIdx: invertedIdx,
gcInterval: defaultGCInterval,
gcInterval: params.GCInterval,
mounts: r,
shardFailures: failureChan,
cache: cache.NewDoubleCache(recentBlocksCache, blockstoreCache),
Expand Down
43 changes: 43 additions & 0 deletions share/eds/store_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package eds

import (
"fmt"
"time"
)

type Parameters struct {
// GC performs DAG store garbage collection by reclaiming transient files of
// shards that are currently available but inactive, or errored.
// We don't use transient files right now, so GC is turned off by default.
GCInterval time.Duration

// RecentBlocksCacheSize is the size of the cache for recent blocks.
RecentBlocksCacheSize int

// BlockstoreCacheSize is the size of the cache for blockstore requested accessors.
BlockstoreCacheSize int
}

// DefaultParameters returns the default configuration values for the EDS store parameters.
func DefaultParameters() *Parameters {
return &Parameters{
GCInterval: 0,
RecentBlocksCacheSize: 10,
BlockstoreCacheSize: 128,
}
}

func (p *Parameters) Validate() error {
if p.GCInterval < 0 {
return fmt.Errorf("eds: GC interval cannot be negative")
}

if p.RecentBlocksCacheSize < 1 {
return fmt.Errorf("eds: recent blocks cache size must be positive")
}

if p.BlockstoreCacheSize < 1 {
return fmt.Errorf("eds: blockstore cache size must be positive")
}
return nil
}
6 changes: 2 additions & 4 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,8 @@ func BenchmarkStore(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

tmpDir := b.TempDir()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := NewStore(tmpDir, ds)
edsStore, err := NewStore(DefaultParameters(), b.TempDir(), ds)
require.NoError(b, err)
err = edsStore.Start(ctx)
require.NoError(b, err)
Expand Down Expand Up @@ -469,9 +468,8 @@ func BenchmarkStore(b *testing.B) {
func newStore(t *testing.T) (*Store, error) {
t.Helper()

tmpDir := t.TempDir()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
return NewStore(tmpDir, ds)
return NewStore(DefaultParameters(), t.TempDir(), ds)
}

func randomEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, *share.Root) {
Expand Down
11 changes: 6 additions & 5 deletions share/getters/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func TestTeeGetter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

tmpDir := t.TempDir()
storeCfg := eds.DefaultParameters()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(tmpDir, ds)
edsStore, err := eds.NewStore(storeCfg, t.TempDir(), ds)
require.NoError(t, err)

err = edsStore.Start(ctx)
Expand Down Expand Up @@ -82,8 +82,9 @@ func TestStoreGetter(t *testing.T) {
t.Cleanup(cancel)

tmpDir := t.TempDir()
storeCfg := eds.DefaultParameters()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(tmpDir, ds)
edsStore, err := eds.NewStore(storeCfg, tmpDir, ds)
require.NoError(t, err)

err = edsStore.Start(ctx)
Expand Down Expand Up @@ -185,9 +186,9 @@ func TestIPLDGetter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

tmpDir := t.TempDir()
storeCfg := eds.DefaultParameters()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(tmpDir, ds)
edsStore, err := eds.NewStore(storeCfg, t.TempDir(), ds)
require.NoError(t, err)

err = edsStore.Start(ctx)
Expand Down
3 changes: 1 addition & 2 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,8 @@ func TestShrexGetter(t *testing.T) {
func newStore(t *testing.T) (*eds.Store, error) {
t.Helper()

tmpDir := t.TempDir()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
return eds.NewStore(tmpDir, ds)
return eds.NewStore(eds.DefaultParameters(), t.TempDir(), ds)
}

func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, *share.Root, share.Namespace) {
Expand Down
2 changes: 1 addition & 1 deletion share/p2p/peers/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (p *Parameters) Validate() error {
return nil
}

// DefaultParameters returns the default configuration values for the daser parameters
// DefaultParameters returns the default configuration values for the peer manager parameters
func DefaultParameters() Parameters {
return Parameters{
// PoolValidationTimeout's default value is based on the default daser sampling timeout of 1 minute.
Expand Down
4 changes: 2 additions & 2 deletions share/p2p/shrexeds/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func TestExchange_RequestEDS(t *testing.T) {
func newStore(t *testing.T) *eds.Store {
t.Helper()

tmpDir := t.TempDir()
storeCfg := eds.DefaultParameters()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
store, err := eds.NewStore(tmpDir, ds)
store, err := eds.NewStore(storeCfg, t.TempDir(), ds)
require.NoError(t, err)
return store
}
Expand Down
4 changes: 2 additions & 2 deletions share/p2p/shrexnd/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ func (m notFoundGetter) GetSharesByNamespace(
func newStore(t *testing.T) *eds.Store {
t.Helper()

tmpDir := t.TempDir()
storeCfg := eds.DefaultParameters()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
store, err := eds.NewStore(tmpDir, ds)
store, err := eds.NewStore(storeCfg, t.TempDir(), ds)
require.NoError(t, err)
return store
}
Expand Down

0 comments on commit 15b8bcc

Please sign in to comment.