From 456d169582dc02aafb7d5df891da5973d54a20b7 Mon Sep 17 00:00:00 2001
From: Ryan
Date: Tue, 5 Sep 2023 11:43:52 +0200
Subject: [PATCH] feat(eds/store): remove corrupted blocks from store (#2625)

Related to #2335

If an OpShardFail is found or corruption is detected from
GetSharesByNamespace, the shard is removed
---
 nodebuilder/share/constructors.go | 18 +++++++
 nodebuilder/share/module.go       | 79 ++++++++++++++++++++-----------
 share/eds/accessor_cache.go       |  8 ++++
 share/eds/metrics.go              | 20 ++++++++
 share/eds/store.go                | 50 ++++++++++++++-----
 share/eds/store_test.go           | 38 +++++++++++++++
 share/getters/getter_test.go      | 29 ++++++++++++
 share/getters/shrex_test.go       |  3 +-
 share/getters/store.go            | 12 +++++
 share/p2p/peers/manager.go        | 28 ++++++++---
 share/p2p/peers/manager_test.go   |  5 +-
 share/p2p/peers/options.go        | 17 +++++++
 share/share.go                    | 14 ++++++
 13 files changed, 269 insertions(+), 52 deletions(-)

diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go
index a1b7e39713..b962038d17 100644
--- a/nodebuilder/share/constructors.go
+++ b/nodebuilder/share/constructors.go
@@ -87,6 +87,24 @@ func lightGetter(
 	return getters.NewCascadeGetter(cascade)
 }
 
+// ShrexGetter is added to bridge nodes for the case that a shard is removed
+// after detected shard corruption. This ensures the block is fetched and stored
+// by shrex the next time the data is retrieved (meaning shard recovery is
+// manual after corruption is detected).
+func bridgeGetter(
+	store *eds.Store,
+	storeGetter *getters.StoreGetter,
+	shrexGetter *getters.ShrexGetter,
+	cfg Config,
+) share.Getter {
+	var cascade []share.Getter
+	cascade = append(cascade, storeGetter)
+	if cfg.UseShareExchange {
+		cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store))
+	}
+	return getters.NewCascadeGetter(cascade)
+}
+
 func fullGetter(
 	store *eds.Store,
 	storeGetter *getters.StoreGetter,
diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go
index b924cf8167..f7a84bf526 100644
--- a/nodebuilder/share/module.go
+++ b/nodebuilder/share/module.go
@@ -5,9 +5,12 @@ import (
 
 	"github.com/ipfs/go-datastore"
 	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/p2p/net/conngater"
 	"go.uber.org/fx"
 
-	"github.com/celestiaorg/celestia-node/libs/fxutil"
+	libhead "github.com/celestiaorg/go-header"
+
+	"github.com/celestiaorg/celestia-node/header"
 	"github.com/celestiaorg/celestia-node/nodebuilder/node"
 	modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
 	"github.com/celestiaorg/celestia-node/share"
@@ -48,6 +51,33 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
 		),
 	)
 
+	shrexGetterComponents := fx.Options(
+		fx.Provide(func() peers.Parameters {
+			return cfg.PeerManagerParams
+		}),
+		fx.Provide(
+			func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) {
+				cfg.ShrExNDParams.WithNetworkID(network.String())
+				return shrexnd.NewClient(cfg.ShrExNDParams, host)
+			},
+		),
+		fx.Provide(
+			func(host host.Host, network modp2p.Network) (*shrexeds.Client, error) {
+				cfg.ShrExEDSParams.WithNetworkID(network.String())
+				return shrexeds.NewClient(cfg.ShrExEDSParams, host)
+			},
+		),
+		fx.Provide(fx.Annotate(
+			getters.NewShrexGetter,
+			fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error {
+				return getter.Start(ctx)
+			}),
+			fx.OnStop(func(ctx context.Context, getter *getters.ShrexGetter) error {
+				return getter.Stop(ctx)
+			}),
+		)),
+	)
+
 	bridgeAndFullComponents := fx.Options(
 		fx.Provide(getters.NewStoreGetter),
 		fx.Invoke(func(edsSrv *shrexeds.Server, ndSrc *shrexnd.Server) {}),
@@ -112,32 +142,25 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
 		}),
 	)
 
-	shrexGetterComponents := fx.Options(
-		fx.Provide(func() peers.Parameters {
-			return cfg.PeerManagerParams
-		}),
-		fx.Provide(peers.NewManager),
-		fx.Provide(
-			func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) {
-				cfg.ShrExNDParams.WithNetworkID(network.String())
-				return shrexnd.NewClient(cfg.ShrExNDParams, host)
-			},
-		),
+	peerManagerWithShrexPools := fx.Options(
 		fx.Provide(
-			func(host host.Host, network modp2p.Network) (*shrexeds.Client, error) {
-				cfg.ShrExEDSParams.WithNetworkID(network.String())
-				return shrexeds.NewClient(cfg.ShrExEDSParams, host)
+			func(
+				params peers.Parameters,
+				discovery *disc.Discovery,
+				host host.Host,
+				connGater *conngater.BasicConnectionGater,
+				shrexSub *shrexsub.PubSub,
+				headerSub libhead.Subscriber[*header.ExtendedHeader],
+			) (*peers.Manager, error) {
+				return peers.NewManager(
+					params,
+					discovery,
+					host,
+					connGater,
+					peers.WithShrexSubPools(shrexSub, headerSub),
+				)
 			},
 		),
-		fx.Provide(fx.Annotate(
-			getters.NewShrexGetter,
-			fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error {
-				return getter.Start(ctx)
-			}),
-			fx.OnStop(func(ctx context.Context, getter *getters.ShrexGetter) error {
-				return getter.Stop(ctx)
-			}),
-		)),
 	)
 
 	switch tp {
@@ -145,10 +168,10 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
 		return fx.Module(
 			"share",
 			baseComponents,
+			fx.Provide(peers.NewManager),
 			bridgeAndFullComponents,
-			fxutil.ProvideAs(func(getter *getters.StoreGetter) share.Getter {
-				return getter
-			}),
+			shrexGetterComponents,
+			fx.Provide(bridgeGetter),
 			fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error {
 				lc.Append(fx.Hook{
 					OnStart: sub.Start,
@@ -160,6 +183,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
 	case node.Full:
 		return fx.Module(
 			"share",
+			peerManagerWithShrexPools,
 			baseComponents,
 			bridgeAndFullComponents,
 			shrexGetterComponents,
@@ -175,6 +199,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
 					light.WithSampleAmount(cfg.LightAvailability.SampleAmount),
 				}
 			}),
+			peerManagerWithShrexPools,
 			shrexGetterComponents,
 			fx.Invoke(ensureEmptyEDSInBS),
 			fx.Provide(getters.NewIPLDGetter),
diff --git a/share/eds/accessor_cache.go b/share/eds/accessor_cache.go
index cd0f0537fa..9f70178be6 100644
--- a/share/eds/accessor_cache.go
+++ b/share/eds/accessor_cache.go
@@ -69,6 +69,14 @@ func (bc *blockstoreCache) evictFn() func(_ interface{}, val interface{}) {
 	}
 }
 
+func (bc *blockstoreCache) Remove(key shard.Key) bool {
+	lk := &bc.stripedLocks[shardKeyToStriped(key)]
+	lk.Lock()
+	defer lk.Unlock()
+
+	return bc.cache.Remove(key)
+}
+
 // Get retrieves the blockstore for a given shard key from the cache. If the blockstore is not in
 // the cache, it returns an errCacheMiss
 func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlockstore, error) {
diff --git a/share/eds/metrics.go b/share/eds/metrics.go
index 1f430bf688..9d4b2a53ef 100644
--- a/share/eds/metrics.go
+++ b/share/eds/metrics.go
@@ -47,6 +47,8 @@ type metrics struct {
 	listTime        metric.Float64Histogram
 	getAccessorTime metric.Float64Histogram
 
+	shardFailureCount metric.Int64Counter
+
 	longOpTime metric.Float64Histogram
 	gcTime     metric.Float64Histogram
 }
@@ -106,6 +108,12 @@ func (s *Store) WithMetrics() error {
 		return err
 	}
 
+	shardFailureCount, err := meter.Int64Counter("eds_store_shard_failure_counter",
+		metric.WithDescription("eds store OpShardFail counter"))
+	if err != nil {
+		return err
+	}
+
 	longOpTime, err := meter.Float64Histogram("eds_store_long_operation_time_histogram",
 		metric.WithDescription("eds store long operation time histogram(s)"))
 	if err != nil {
@@ -153,6 +161,7 @@ func (s *Store) WithMetrics() error {
 		hasTime:           hasTime,
 		listTime:          listTime,
 		getAccessorTime:   getAccessorTime,
+		shardFailureCount: shardFailureCount,
 		longOpTime:        longOpTime,
 		gcTime:            gcTime,
 	}
@@ -170,6 +179,17 @@ func (m *metrics) observeGCtime(ctx context.Context, dur time.Duration, failed b
 		attribute.Bool(failedKey, failed)))
 }
 
+func (m *metrics) observeShardFailure(ctx context.Context, shardKey string) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.shardFailureCount.Add(ctx, 1, metric.WithAttributes(attribute.String("shard_key", shardKey)))
+}
+
 func (m *metrics) observePut(ctx context.Context, dur time.Duration, result putResult, size uint) {
 	if m == nil {
 		return
diff --git a/share/eds/store.go b/share/eds/store.go
index 24a96c9fe4..fa9a7f7c7e 100644
--- a/share/eds/store.go
+++ b/share/eds/store.go
@@ -4,7 +4,6 @@ import (
 	"bufio"
 	"bytes"
 	"context"
-	"encoding/hex"
 	"errors"
 	"fmt"
 	"io"
@@ -64,6 +63,8 @@ type Store struct {
 
 	// lastGCResult is only stored on the store for testing purposes.
 	lastGCResult atomic.Pointer[dagstore.GCResult]
 
+	shardFailures chan dagstore.ShardResult
+
 	metrics *metrics
 }
@@ -92,6 +93,8 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
 	if err != nil {
 		return nil, fmt.Errorf("failed to create index: %w", err)
 	}
+
+	failureChan := make(chan dagstore.ShardResult)
 	dagStore, err := dagstore.NewDAGStore(
 		dagstore.Config{
 			TransientsDir: basepath + transientsPath,
@@ -99,6 +102,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
 			Datastore:     ds,
 			MountRegistry: r,
 			TopLevelIndex: invertedIdx,
+			FailureCh:     failureChan,
 		},
 	)
 	if err != nil {
@@ -111,13 +115,14 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
 	}
 
 	store := &Store{
-		basepath:    basepath,
-		dgstr:       dagStore,
-		carIdx:      fsRepo,
-		invertedIdx: invertedIdx,
-		gcInterval:  defaultGCInterval,
-		mounts:      r,
-		cache:       cache,
+		basepath:      basepath,
+		dgstr:         dagStore,
+		carIdx:        fsRepo,
+		invertedIdx:   invertedIdx,
+		gcInterval:    defaultGCInterval,
+		mounts:        r,
+		shardFailures: failureChan,
+		cache:         cache,
 	}
 	store.bs = newBlockstore(store, cache, ds)
 	return store, nil
@@ -139,6 +144,8 @@ func (s *Store) Start(ctx context.Context) error {
 	if s.gcInterval != 0 {
 		go s.gc(runCtx)
 	}
+
+	go s.watchForFailures(runCtx)
 	return nil
 }
 
@@ -172,6 +179,23 @@ func (s *Store) gc(ctx context.Context) {
 	}
 }
 
+func (s *Store) watchForFailures(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case res := <-s.shardFailures:
+			log.Errorw("removing shard after failure", "key", res.Key, "err", res.Error)
+			s.metrics.observeShardFailure(ctx, res.Key.String())
+			k := share.MustDataHashFromString(res.Key.String())
+			err := s.Remove(ctx, k)
+			if err != nil {
+				log.Errorw("failed to remove shard after failure", "key", res.Key, "err", err)
+			}
+		}
+	}
+}
+
 // Put stores the given data square with DataRoot's hash as a key.
 //
 // The square is verified on the Exchange level, and Put only stores the square, trusting it.
@@ -437,6 +461,11 @@ func (s *Store) Remove(ctx context.Context, root share.DataHash) error {
 
 func (s *Store) remove(ctx context.Context, root share.DataHash) (err error) {
 	key := root.String()
+
+	// Remove from accessor cache, so that existing readers are closed and
+	// DestroyShard can be executed.
+	s.cache.Remove(shard.KeyFromString(key))
+
 	ch := make(chan dagstore.ShardResult, 1)
 	err = s.dgstr.DestroyShard(ctx, shard.KeyFromString(key), ch, dagstore.DestroyOpts{})
 	if err != nil {
@@ -535,10 +564,7 @@ func (s *Store) list() ([]share.DataHash, error) {
 	shards := s.dgstr.AllShardsInfo()
 	hashes := make([]share.DataHash, 0, len(shards))
 	for shrd := range shards {
-		hash, err := hex.DecodeString(shrd.String())
-		if err != nil {
-			return nil, err
-		}
+		hash := share.MustDataHashFromString(shrd.String())
 		hashes = append(hashes, hash)
 	}
 	return hashes, nil
diff --git a/share/eds/store_test.go b/share/eds/store_test.go
index 4b263e7062..4f1d7f4c8b 100644
--- a/share/eds/store_test.go
+++ b/share/eds/store_test.go
@@ -122,6 +122,44 @@ func TestEDSStore(t *testing.T) {
 		assert.ErrorContains(t, err, "no such file or directory")
 	})
 
+	t.Run("Remove after OpShardFail", func(t *testing.T) {
+		eds, dah := randomEDS(t)
+
+		err = edsStore.Put(ctx, dah.Hash(), eds)
+		require.NoError(t, err)
+
+		// assert that shard now exists
+		ok, err := edsStore.Has(ctx, dah.Hash())
+		assert.NoError(t, err)
+		assert.True(t, ok)
+
+		// assert that file now exists
+		path := edsStore.basepath + blocksPath + dah.String()
+		_, err = os.Stat(path)
+		assert.NoError(t, err)
+
+		err = os.Remove(path)
+		assert.NoError(t, err)
+
+		_, err = edsStore.GetCAR(ctx, dah.Hash())
+		assert.Error(t, err)
+
+		ticker := time.NewTicker(time.Millisecond * 100)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				has, err := edsStore.Has(ctx, dah.Hash())
+				if err == nil && !has {
+					// shard no longer exists after OpShardFail was detected from GetCAR call
+					return
+				}
+			case <-ctx.Done():
+				t.Fatal("timeout waiting for shard to be removed")
+			}
+		}
+	})
+
 	t.Run("Has", func(t *testing.T) {
 		eds, dah := randomEDS(t)
 
diff --git a/share/getters/getter_test.go b/share/getters/getter_test.go
index 02e075459b..d19d0cf174 100644
--- a/share/getters/getter_test.go
+++ b/share/getters/getter_test.go
@@ -2,6 +2,7 @@ package getters
 
 import (
 	"context"
+	"os"
 	"testing"
 	"time"
 
@@ -153,6 +154,34 @@ func TestStoreGetter(t *testing.T) {
 		_, err = sg.GetSharesByNamespace(ctx, &root, namespace)
 		require.ErrorIs(t, err, share.ErrNotFound)
 	})
+
+	t.Run("GetSharesFromNamespace removes corrupted shard", func(t *testing.T) {
+		randEds, namespace, dah := randomEDSWithDoubledNamespace(t, 4)
+		err = edsStore.Put(ctx, dah.Hash(), randEds)
+		require.NoError(t, err)
+
+		// available
+		shares, err := sg.GetSharesByNamespace(ctx, &dah, namespace)
+		require.NoError(t, err)
+		require.NoError(t, shares.Verify(&dah, namespace))
+		assert.Len(t, shares.Flatten(), 2)
+
+		// 'corrupt' existing CAR by overwriting with a random EDS
+		f, err := os.OpenFile(tmpDir+"/blocks/"+dah.String(), os.O_WRONLY, 0644)
+		require.NoError(t, err)
+		edsToOverwriteWith, dah := randomEDS(t)
+		err = eds.WriteEDS(ctx, edsToOverwriteWith, f)
+		require.NoError(t, err)
+
+		shares, err = sg.GetSharesByNamespace(ctx, &dah, namespace)
+		require.ErrorIs(t, err, share.ErrNotFound)
+		require.Nil(t, shares)
+
+		// corruption detected, shard is removed
+		has, err := edsStore.Has(ctx, dah.Hash())
+		require.False(t, has)
+		require.NoError(t, err)
+	})
 }
 
 func TestIPLDGetter(t *testing.T) {
diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go
index 0ca807d0d4..236fae36e1 100644
--- a/share/getters/shrex_test.go
+++ b/share/getters/shrex_test.go
@@ -227,11 +227,10 @@ func testManager(
 	}
 	manager, err := peers.NewManager(
 		peers.DefaultParameters(),
-		headerSub,
-		shrexSub,
 		disc,
 		host,
 		connGater,
+		peers.WithShrexSubPools(shrexSub, headerSub),
 	)
 	return manager, err
 }
diff --git a/share/getters/store.go b/share/getters/store.go
index 989649f795..5eca956faa 100644
--- a/share/getters/store.go
+++ b/share/getters/store.go
@@ -121,6 +121,18 @@ func (sg *StoreGetter) GetSharesByNamespace(
 	// wrap the read-only CAR blockstore in a getter
 	blockGetter := eds.NewBlockGetter(bs)
 	shares, err = collectSharesByNamespace(ctx, blockGetter, root, namespace)
+	if errors.Is(err, ipld.ErrNodeNotFound) {
+		// IPLD node not found after the index pointed to this shard and the CAR blockstore has been
+		// opened successfully is a strong indicator of corruption. We remove the block on bridges
+		// and fulls and return share.ErrNotFound to ensure the data is retrieved by the next
+		// getter. Note that this recovery is manual and will only be restored by an RPC call to
+		// fetch the same datahash that was removed.
+		err = sg.store.Remove(ctx, root.Hash())
+		if err != nil {
+			log.Errorf("getter/store: failed to remove CAR after detected corruption: %w", err)
+		}
+		err = share.ErrNotFound
+	}
 	if err != nil {
 		return nil, fmt.Errorf("getter/store: failed to retrieve shares by namespace: %w", err)
 	}
diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go
index 87f9361ee2..caef242eec 100644
--- a/share/p2p/peers/manager.go
+++ b/share/p2p/peers/manager.go
@@ -99,11 +99,10 @@ type syncPool struct {
 
 func NewManager(
 	params Parameters,
-	headerSub libhead.Subscriber[*header.ExtendedHeader],
-	shrexSub *shrexsub.PubSub,
 	discovery *discovery.Discovery,
 	host host.Host,
 	connGater *conngater.BasicConnectionGater,
+	options ...Option,
 ) (*Manager, error) {
 	if err := params.Validate(); err != nil {
 		return nil, err
@@ -111,8 +110,6 @@ func NewManager(
 
 	s := &Manager{
 		params:                params,
-		headerSub:             headerSub,
-		shrexSub:              shrexSub,
 		connGater:             connGater,
 		disc:                  discovery,
 		host:                  host,
@@ -122,6 +119,13 @@ func NewManager(
 		disconnectedPeersDone: make(chan struct{}),
 	}
 
+	for _, opt := range options {
+		err := opt(s)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	s.fullNodes = newPool(s.params.PeerCooldown)
 
 	discovery.WithOnPeersUpdate(
@@ -147,12 +151,17 @@ func (m *Manager) Start(startCtx context.Context) error {
 	ctx, cancel := context.WithCancel(context.Background())
 	m.cancel = cancel
 
+	// pools will only be populated with senders of shrexsub notifications if the WithShrexSubPools
+	// option is used.
+	if m.shrexSub == nil && m.headerSub == nil {
+		return nil
+	}
+
 	validatorFn := m.metrics.validationObserver(m.Validate)
 	err := m.shrexSub.AddValidator(validatorFn)
 	if err != nil {
 		return fmt.Errorf("registering validator: %w", err)
 	}
-
 	err = m.shrexSub.Start(startCtx)
 	if err != nil {
 		return fmt.Errorf("starting shrexsub: %w", err)
@@ -168,16 +177,21 @@ func (m *Manager) Start(startCtx context.Context) error {
 		return fmt.Errorf("subscribing to libp2p events: %w", err)
 	}
 
-	go m.subscribeDisconnectedPeers(ctx, sub)
 	go m.subscribeHeader(ctx, headerSub)
+	go m.subscribeDisconnectedPeers(ctx, sub)
 	go m.GC(ctx)
-
 	return nil
 }
 
 func (m *Manager) Stop(ctx context.Context) error {
 	m.cancel()
 
+	// we do not need to wait for headersub and disconnected peers to finish
+	// here, since they were never started
+	if m.headerSub == nil && m.shrexSub == nil {
+		return nil
+	}
+
 	select {
 	case <-m.headerSubDone:
 	case <-ctx.Done():
diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go
index ad04d2c7bd..c60a737baa 100644
--- a/share/p2p/peers/manager_test.go
+++ b/share/p2p/peers/manager_test.go
@@ -419,8 +419,6 @@ func TestIntegration(t *testing.T) {
 		require.NoError(t, err)
 		fnPeerManager, err := NewManager(
 			DefaultParameters(),
-			nil,
-			nil,
 			fnDisc,
 			nil,
 			connGater,
@@ -469,11 +467,10 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten
 	}
 	manager, err := NewManager(
 		DefaultParameters(),
-		headerSub,
-		shrexSub,
 		disc,
 		host,
 		connGater,
+		WithShrexSubPools(shrexSub, headerSub),
 	)
 	if err != nil {
 		return nil, err
diff --git a/share/p2p/peers/options.go b/share/p2p/peers/options.go
index cfda906071..97ec30df4a 100644
--- a/share/p2p/peers/options.go
+++ b/share/p2p/peers/options.go
@@ -3,6 +3,11 @@ package peers
 import (
 	"fmt"
 	"time"
+
+	libhead "github.com/celestiaorg/go-header"
+
+	"github.com/celestiaorg/celestia-node/header"
+	"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
 )
 
 type Parameters struct {
@@ -21,6 +26,8 @@ type Parameters struct {
 	EnableBlackListing bool
 }
 
+type Option func(*Manager) error
+
 // Validate validates the values in Parameters
 func (p *Parameters) Validate() error {
 	if p.PoolValidationTimeout <= 0 {
@@ -56,6 +63,16 @@ func DefaultParameters() Parameters {
 	}
 }
 
+// WithShrexSubPools passes a shrexsub and headersub instance to be used to populate and validate
+// pools from shrexsub notifications.
+func WithShrexSubPools(shrexSub *shrexsub.PubSub, headerSub libhead.Subscriber[*header.ExtendedHeader]) Option {
+	return func(m *Manager) error {
+		m.shrexSub = shrexSub
+		m.headerSub = headerSub
+		return nil
+	}
+}
+
 // WithMetrics turns on metric collection in peer manager.
 func (m *Manager) WithMetrics() error {
 	metrics, err := initMetrics(m)
diff --git a/share/share.go b/share/share.go
index 02ccd73909..4079028d82 100644
--- a/share/share.go
+++ b/share/share.go
@@ -2,6 +2,7 @@ package share
 
 import (
 	"bytes"
+	"encoding/hex"
 	"fmt"
 
 	"github.com/celestiaorg/celestia-app/pkg/appconsts"
@@ -57,3 +58,16 @@ func (dh DataHash) String() string {
 func (dh DataHash) IsEmptyRoot() bool {
 	return bytes.Equal(EmptyRoot().Hash(), dh)
 }
+
+// MustDataHashFromString converts a hex string to a valid datahash.
+func MustDataHashFromString(datahash string) DataHash {
+	dh, err := hex.DecodeString(datahash)
+	if err != nil {
+		panic(fmt.Sprintf("datahash conversion: passed string was not valid hex: %s", datahash))
+	}
+	err = DataHash(dh).Validate()
+	if err != nil {
+		panic(fmt.Sprintf("datahash validation: passed hex string failed: %s", err))
+	}
+	return dh
+}
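
Usage sketch: the snippet below mirrors the call sites updated in shrex_test.go
and manager_test.go above, showing how peers.NewManager is constructed after
this change, with headerSub and shrexSub supplied through the WithShrexSubPools
option instead of as positional arguments. The wrapper name
newManagerWithShrexPools and the discovery import path/alias are assumptions
for illustration only and are not part of this patch.

package example

import (
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/p2p/net/conngater"

	libhead "github.com/celestiaorg/go-header"

	"github.com/celestiaorg/celestia-node/header"
	disc "github.com/celestiaorg/celestia-node/share/availability/discovery" // assumed import path
	"github.com/celestiaorg/celestia-node/share/p2p/peers"
	"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

// newManagerWithShrexPools builds a peer manager the way full and light nodes
// do after this change: the shrexsub and headersub dependencies travel through
// the functional option rather than the constructor signature.
func newManagerWithShrexPools(
	d *disc.Discovery,
	h host.Host,
	gater *conngater.BasicConnectionGater,
	shrexSub *shrexsub.PubSub,
	headerSub libhead.Subscriber[*header.ExtendedHeader],
) (*peers.Manager, error) {
	return peers.NewManager(
		peers.DefaultParameters(),
		d,
		h,
		gater,
		peers.WithShrexSubPools(shrexSub, headerSub),
	)
}

Bridge nodes omit the option entirely (see the node.Bridge case in module.go),
which is why Start and Stop now short-circuit when both headerSub and shrexSub
are nil.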