Commit 5706d30: add light pruning
walldiss committed Oct 31, 2024 (parent: 0c2113b)
Showing 7 changed files with 170 additions and 207 deletions.
1 change: 0 additions & 1 deletion nodebuilder/pruner/module.go
@@ -45,7 +45,6 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
         return fx.Module("prune",
             baseComponents,
             prunerService,
-            fx.Provide(light.NewPruner),
         )
     }
     // We do not trigger DetectPreviousRun for Light nodes, to allow them to disable pruning at will.
10 changes: 6 additions & 4 deletions nodebuilder/share/module.go
@@ -11,6 +11,7 @@ import (
 
     "github.com/celestiaorg/celestia-node/nodebuilder/node"
     modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
+    "github.com/celestiaorg/celestia-node/pruner"
     lightprune "github.com/celestiaorg/celestia-node/pruner/light"
     "github.com/celestiaorg/celestia-node/share"
     "github.com/celestiaorg/celestia-node/share/availability/full"
@@ -222,20 +223,21 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
     case node.Light:
         return fx.Options(
             fx.Provide(fx.Annotate(
-                func(getter shwap.Getter, ds datastore.Batching) *light.ShareAvailability {
+                func(getter shwap.Getter, ds datastore.Batching, bs blockstore.Blockstore) *light.ShareAvailability {
                     return light.NewShareAvailability(
                         getter,
                         ds,
+                        bs,
                         light.WithSampleAmount(cfg.LightAvailability.SampleAmount),
                     )
                 },
+                fx.As(fx.Self()),
+                fx.As(new(share.Availability)),
+                fx.As(new(pruner.Pruner)), // TODO(@walldiss): remove conversion after Availability and Pruner interfaces are merged
                 fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error {
                     return la.Close(ctx)
                 }),
             )),
-            fx.Provide(func(avail *light.ShareAvailability) share.Availability {
-                return avail
-            }),
         )
     case node.Bridge, node.Full:
         return fx.Options(
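
The fx.Annotate block above registers the single *light.ShareAvailability constructor under several types at once, which is what made the separate share.Availability provider below it redundant. A minimal, self-contained sketch of this fx pattern follows; the Sampler and Pruner interfaces and the service type are illustrative stand-ins, not celestia-node code:

package main

import (
    "context"
    "fmt"

    "go.uber.org/fx"
)

type Sampler interface{ Sample() string }

type Pruner interface{ Prune() string }

// service satisfies both interfaces, as *light.ShareAvailability now does.
type service struct{}

func (*service) Sample() string { return "sampled" }
func (*service) Prune() string  { return "pruned" }

func main() {
    app := fx.New(
        fx.Provide(fx.Annotate(
            func() *service { return &service{} },
            fx.As(fx.Self()),    // keep the concrete *service resolvable
            fx.As(new(Sampler)), // ...and expose it as Sampler
            fx.As(new(Pruner)),  // ...and as Pruner, all from one constructor
        )),
        fx.Invoke(func(s Sampler, p Pruner) {
            fmt.Println(s.Sample(), p.Prune())
        }),
    )
    if err := app.Start(context.Background()); err != nil {
        panic(err)
    }
    _ = app.Stop(context.Background())
}

Without fx.As(fx.Self()), fx would stop providing the concrete type once interface annotations are present, which is why the diff keeps it alongside the two interface registrations.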
46 changes: 0 additions & 46 deletions pruner/light/pruner.go

This file was deleted.

59 changes: 59 additions & 0 deletions share/availability/light/availability.go
@@ -8,6 +8,7 @@ import (
     "sync"
     "time"
 
+    "github.com/ipfs/boxo/blockstore"
     "github.com/ipfs/go-datastore"
     "github.com/ipfs/go-datastore/autobatch"
     "github.com/ipfs/go-datastore/namespace"
@@ -16,7 +17,9 @@
     "github.com/celestiaorg/celestia-node/header"
     "github.com/celestiaorg/celestia-node/libs/utils"
     "github.com/celestiaorg/celestia-node/share"
+    "github.com/celestiaorg/celestia-node/share/ipld"
     "github.com/celestiaorg/celestia-node/share/shwap"
+    "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
 )

 var (
@@ -31,6 +34,7 @@
 // on the network doing sampling over the same Root to collectively verify its availability.
 type ShareAvailability struct {
     getter shwap.Getter
+    bs     blockstore.Blockstore
     params Parameters
 
     activeHeights *utils.Sessions
@@ -42,6 +46,7 @@
 func NewShareAvailability(
     getter shwap.Getter,
     ds datastore.Batching,
+    bs blockstore.Blockstore,
     opts ...Option,
 ) *ShareAvailability {
     params := *DefaultParameters()
@@ -54,6 +59,7 @@
 
     return &ShareAvailability{
         getter:        getter,
+        bs:            bs,
         params:        params,
         activeHeights: utils.NewSessions(),
         ds:            autoDS,
@@ -169,6 +175,59 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header
     return nil
 }

+// Prune deletes samples and all sampling data corresponding to the provided header from the store.
+// The operation removes all data that SharesAvailable might have created.
+func (la *ShareAvailability) Prune(ctx context.Context, h *header.ExtendedHeader) error {
+    dah := h.DAH
+    if share.DataHash(dah.Hash()).IsEmptyEDS() {
+        return nil
+    }
+
+    // Prevent multiple sampling sessions for the same header height
+    release, err := la.activeHeights.StartSession(ctx, h.Height())
+    if err != nil {
+        return err
+    }
+    defer release()
+
+    key := datastoreKeyForRoot(dah)
+    la.dsLk.RLock()
+    data, err := la.ds.Get(ctx, key)
+    la.dsLk.RUnlock()
+    if errors.Is(err, datastore.ErrNotFound) {
+        // nothing to prune
+        return nil
+    }
+    if err != nil {
+        return fmt.Errorf("failed to get sampling result: %w", err)
+    }
+
+    var result SamplingResult
+    err = json.Unmarshal(data, &result)
+    if err != nil {
+        return fmt.Errorf("failed to unmarshal sampling result: %w", err)
+    }
+
+    // delete stored samples
+    for _, sample := range result.Available {
+        blk, err := bitswap.NewEmptySampleBlock(h.Height(), sample.Row, sample.Col, len(h.DAH.RowRoots))
+        if err != nil {
+            return fmt.Errorf("failed to marshal sample ID: %w", err)
+        }
+        err = la.bs.DeleteBlock(ctx, blk.CID())
+        if err != nil && !errors.Is(err, ipld.ErrNodeNotFound) {
+            return fmt.Errorf("failed to delete sample: %w", err)
+        }
+    }
+
+    // delete the sampling result
+    err = la.ds.Delete(ctx, datastoreKeyForRoot(dah))
+    if err != nil {
+        return fmt.Errorf("failed to delete sampling result: %w", err)
+    }
+    return nil
+}
 
 func datastoreKeyForRoot(root *share.AxisRoots) datastore.Key {
     return datastore.NewKey(root.String())
 }
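
Combined with the fx.As(new(pruner.Pruner)) annotation in nodebuilder/share/module.go above, the new Prune method lets the node's pruner service reclaim a light node's sampling data height by height. A hedged sketch of how a caller might drive it; the Pruner interface shape mirrors the method added in this diff, while pruneExpired and the expired slice are hypothetical:

package sketch

import (
    "context"
    "fmt"

    "github.com/celestiaorg/celestia-node/header"
)

// Pruner matches the Prune method that ShareAvailability now implements.
type Pruner interface {
    Prune(ctx context.Context, h *header.ExtendedHeader) error
}

// pruneExpired walks headers that fell out of the retention window and
// deletes their samples and stored sampling results.
func pruneExpired(ctx context.Context, p Pruner, expired []*header.ExtendedHeader) error {
    for _, h := range expired {
        if err := p.Prune(ctx, h); err != nil {
            return fmt.Errorf("pruning height %d: %w", h.Height(), err)
        }
    }
    return nil
}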
110 changes: 105 additions & 5 deletions share/availability/light/availability_test.go
@@ -6,9 +6,14 @@ import (
     "encoding/json"
     "sync"
     "testing"
+    "time"
 
     "github.com/golang/mock/gomock"
+    "github.com/ipfs/boxo/blockstore"
+    "github.com/ipfs/boxo/exchange"
+    "github.com/ipfs/boxo/exchange/offline"
     "github.com/ipfs/go-datastore"
+    ds_sync "github.com/ipfs/go-datastore/sync"
     "github.com/stretchr/testify/require"
 
     libshare "github.com/celestiaorg/go-square/v2/share"
@@ -20,7 +25,9 @@
     "github.com/celestiaorg/celestia-node/share/eds/edstest"
     "github.com/celestiaorg/celestia-node/share/shwap"
     "github.com/celestiaorg/celestia-node/share/shwap/getters/mock"
+    "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
     "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex"
+    "github.com/celestiaorg/celestia-node/store"
 )
 
 func TestSharesAvailableSuccess(t *testing.T) {
@@ -47,7 +54,7 @@ func TestSharesAvailableSuccess(t *testing.T) {
         AnyTimes()
 
     ds := datastore.NewMapDatastore()
-    avail := NewShareAvailability(getter, ds)
+    avail := NewShareAvailability(getter, ds, nil)
 
     // Ensure the datastore doesn't have the sampling result yet
     has, err := avail.ds.Has(ctx, datastoreKeyForRoot(roots))
@@ -81,7 +88,7 @@ func TestSharesAvailableSkipSampled(t *testing.T) {
         AnyTimes()
 
     ds := datastore.NewMapDatastore()
-    avail := NewShareAvailability(getter, ds)
+    avail := NewShareAvailability(getter, ds, nil)
 
     // generate random header
     roots := edstest.RandomAxisRoots(t, 16)
@@ -112,7 +119,7 @@ func TestSharesAvailableEmptyEDS(t *testing.T) {
 
     getter := mock.NewMockGetter(gomock.NewController(t))
     ds := datastore.NewMapDatastore()
-    avail := NewShareAvailability(getter, ds)
+    avail := NewShareAvailability(getter, ds, nil)
 
     // request for empty eds
     eh := headertest.RandExtendedHeaderWithRoot(t, share.EmptyEDSRoots())
@@ -126,7 +133,7 @@ func TestSharesAvailableFailed(t *testing.T) {
 
     getter := mock.NewMockGetter(gomock.NewController(t))
     ds := datastore.NewMapDatastore()
-    avail := NewShareAvailability(getter, ds)
+    avail := NewShareAvailability(getter, ds, nil)
 
     // Create new eds, that is not available by getter
     eds := edstest.RandEDS(t, 16)
@@ -184,7 +191,7 @@ func TestParallelAvailability(t *testing.T) {
     ds := datastore.NewMapDatastore()
     // Simulate a getter that returns shares successfully
     successfulGetter := newOnceGetter()
-    avail := NewShareAvailability(successfulGetter, ds)
+    avail := NewShareAvailability(successfulGetter, ds, nil)
 
     // create new eds, that is not available by getter
     eds := edstest.RandEDS(t, 16)
@@ -267,3 +274,96 @@ func (g onceGetter) GetSharesByNamespace(
 ) (shwap.NamespaceData, error) {
     panic("not implemented")
 }

+func TestPrune(t *testing.T) {
+    const size = 8
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
+    t.Cleanup(cancel)
+
+    dir := t.TempDir()
+    store, err := store.NewStore(store.DefaultParameters(), dir)
+    require.NoError(t, err)
+    // wrap Stop in a closure so it runs at test teardown rather than
+    // being evaluated immediately when the defer statement executes
+    defer func() {
+        require.NoError(t, store.Stop(ctx))
+    }()
+    eds, h := randEdsAndHeader(t, size)
+    err = store.PutODSQ4(ctx, h.DAH, h.Height(), eds)
+    require.NoError(t, err)
+
+    // Create a new bitswap getter
+    ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+    bs := blockstore.NewBlockstore(ds)
+    ex := NewFakeExchange(store)
+    getter := bitswap.NewGetter(ex, bs, 0)
+    getter.Start()
+    defer getter.Stop()
+
+    // Create a new ShareAvailability instance and sample the shares
+    sampleAmount := uint(20)
+    avail := NewShareAvailability(getter, ds, bs, WithSampleAmount(sampleAmount))
+    err = avail.SharesAvailable(ctx, h)
+    require.NoError(t, err)
+    // close ShareAvailability to force flush of batched writes
+    require.NoError(t, avail.Close(ctx))
+
+    preDeleteCount := countKeys(ctx, t, bs)
+    require.EqualValues(t, sampleAmount, preDeleteCount)
+
+    // prune the samples
+    err = avail.Prune(ctx, h)
+    require.NoError(t, err)
+
+    // Check if samples are deleted
+    postDeleteCount := countKeys(ctx, t, bs)
+    require.Zero(t, postDeleteCount)
+
+    // Check if sampling result is deleted
+    exist, err := avail.ds.Has(ctx, datastoreKeyForRoot(h.DAH))
+    require.NoError(t, err)
+    require.False(t, exist)
+}

+var _ exchange.SessionExchange = (*fakeSessionExchange)(nil)
+
+func NewFakeExchange(store *store.Store) *fakeSessionExchange {
+    bs := &bitswap.Blockstore{Getter: store}
+    return &fakeSessionExchange{
+        Interface: offline.Exchange(bs),
+        session:   offline.Exchange(bs),
+    }
+}
+
+type fakeSessionExchange struct {
+    exchange.Interface
+    session exchange.Fetcher
+}
+
+func (fe *fakeSessionExchange) NewSession(ctx context.Context) exchange.Fetcher {
+    if ctx == nil {
+        panic("nil context")
+    }
+    return fe.session
+}
+
+func randEdsAndHeader(t *testing.T, size int) (*rsmt2d.ExtendedDataSquare, *header.ExtendedHeader) {
+    height := uint64(42)
+    eds := edstest.RandEDS(t, size)
+    roots, err := share.NewAxisRoots(eds)
+    require.NoError(t, err)
+
+    h := &header.ExtendedHeader{
+        RawHeader: header.RawHeader{
+            Height: int64(height),
+        },
+        DAH: roots,
+    }
+    return eds, h
+}
+
+func countKeys(ctx context.Context, t *testing.T, bs blockstore.Blockstore) int {
+    keys, err := bs.AllKeysChan(ctx)
+    require.NoError(t, err)
+    var count int
+    for range keys {
+        count++
+    }
+    return count
+}
29 changes: 0 additions & 29 deletions share/ipld/delete.go

This file was deleted.
