From 4bbe418cc6f40f28c80043e8a90d78c0c35a8ad4 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 23 Aug 2024 18:21:03 +0200 Subject: [PATCH] add archival pruning support --- nodebuilder/pruner/module.go | 4 + pruner/archival/pruner.go | 23 +++- pruner/archival/window.go | 6 +- pruner/full/pruner.go | 2 +- store/file/ods.go | 17 ++- store/metrics.go | 31 +++++- store/store.go | 104 ++++++++++++------ store/store_test.go | 208 +++++++++++++++++++++++++++-------- 8 files changed, 299 insertions(+), 96 deletions(-) diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go index 7475195307..05885b82fc 100644 --- a/nodebuilder/pruner/module.go +++ b/nodebuilder/pruner/module.go @@ -61,6 +61,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { } return fx.Module("prune", baseComponents, + prunerService, + fxutil.ProvideAs(archival.NewPruner, new(pruner.Pruner)), fx.Invoke(func(ctx context.Context, ds datastore.Batching) error { return pruner.DetectPreviousRun(ctx, ds) }), @@ -78,6 +80,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { } return fx.Module("prune", baseComponents, + prunerService, + fxutil.ProvideAs(archival.NewPruner, new(pruner.Pruner)), fx.Invoke(func(ctx context.Context, ds datastore.Batching) error { return pruner.DetectPreviousRun(ctx, ds) }), diff --git a/pruner/archival/pruner.go b/pruner/archival/pruner.go index a1a55db0da..b9ce1a92a8 100644 --- a/pruner/archival/pruner.go +++ b/pruner/archival/pruner.go @@ -3,18 +3,33 @@ package archival import ( "context" + logging "github.com/ipfs/go-log/v2" + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/store" ) +var log = logging.Logger("pruner/archival") + // Pruner is a noop implementation of the pruner.Factory interface // that allows archival nodes to sync and retain historical data // that is out of the availability window. -type Pruner struct{} +type Pruner struct { + store *store.Store +} -func NewPruner() *Pruner { - return &Pruner{} +func NewPruner(store *store.Store) *Pruner { + return &Pruner{ + store: store, + } } +func (p *Pruner) Prune(ctx context.Context, eh *header.ExtendedHeader) error { + log.Debugf("pruning header %s", eh.DAH.Hash()) -func (p *Pruner) Prune(context.Context, *header.ExtendedHeader) error { + // Archival nodes should keep ODS data indefinitely. Reduce the file to only ODS data by removing the Q4 part. + err := p.store.RemoveQ4(ctx, eh.Height(), eh.DAH.Hash()) + if err != nil { + return err + } return nil } diff --git a/pruner/archival/window.go b/pruner/archival/window.go index b89a779816..c70da19e55 100644 --- a/pruner/archival/window.go +++ b/pruner/archival/window.go @@ -1,5 +1,7 @@ package archival -import "github.com/celestiaorg/celestia-node/pruner" +import ( + "github.com/celestiaorg/celestia-node/pruner/full" +) -const Window = pruner.AvailabilityWindow(0) +const Window = full.Window diff --git a/pruner/full/pruner.go b/pruner/full/pruner.go index fac83e23fe..193dad00de 100644 --- a/pruner/full/pruner.go +++ b/pruner/full/pruner.go @@ -24,7 +24,7 @@ func NewPruner(store *store.Store) *Pruner { func (p *Pruner) Prune(ctx context.Context, eh *header.ExtendedHeader) error { log.Debugf("pruning header %s", eh.DAH.Hash()) - err := p.store.Remove(ctx, eh.Height(), eh.DAH.Hash()) + err := p.store.RemoveAll(ctx, eh.Height(), eh.DAH.Hash()) if err != nil { return err } diff --git a/store/file/ods.go b/store/file/ods.go index da7759959e..8dd72e3304 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -147,7 +147,7 @@ func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error { // If file is empty, the ErrEmptyFile is returned. // File must be closed after usage. func OpenODS(path string) (*ODS, error) { - f, err := os.Open(path) + f, err := os.OpenFile(path, os.O_RDWR, filePermissions) if err != nil { return nil, err } @@ -167,6 +167,21 @@ func (o *ODS) HasQ4() bool { return o.hdr.fileType == odsq4 } +func (o *ODS) ConvertToODS() error { + if !o.HasQ4() { + return nil + } + + // update header to change the file type + o.hdr.fileType = ods + w := io.NewOffsetWriter(o.fl, 0) + if err := writeHeader(w, o.hdr); err != nil { + return fmt.Errorf("writing header: %w", err) + } + + return nil +} + // Size returns EDS size stored in file's header. func (o *ODS) Size(context.Context) int { return o.size() diff --git a/store/metrics.go b/store/metrics.go index 637acb76ff..5a5cca99a7 100644 --- a/store/metrics.go +++ b/store/metrics.go @@ -24,7 +24,8 @@ type metrics struct { putExists metric.Int64Counter get metric.Float64Histogram has metric.Float64Histogram - remove metric.Float64Histogram + removeAll metric.Float64Histogram + removeQ4 metric.Float64Histogram unreg func() error } @@ -53,18 +54,24 @@ func (s *Store) WithMetrics() error { return err } - remove, err := meter.Float64Histogram("eds_store_remove_time_histogram", - metric.WithDescription("eds store remove time histogram(s)")) + removeAll, err := meter.Float64Histogram("eds_store_remove_all_time_histogram", + metric.WithDescription("eds store remove all data time histogram(s)")) if err != nil { return err } + removeQ4, err := meter.Float64Histogram("eds_store_remove_q4_time_histogram", + metric.WithDescription("eds store remove q4 data time histogram(s)")) + if err != nil { + return err + } s.metrics = &metrics{ put: put, putExists: putExists, get: get, has: has, - remove: remove, + removeAll: removeAll, + removeQ4: removeQ4, } return s.metrics.addCacheMetrics(s.cache) } @@ -130,7 +137,19 @@ func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool attribute.Bool(failedKey, failed))) } -func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) { +func (m *metrics) observeRemoveAll(ctx context.Context, dur time.Duration, failed bool) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.removeAll.Record(ctx, dur.Seconds(), metric.WithAttributes( + attribute.Bool(failedKey, failed))) +} + +func (m *metrics) observeRemoveQ4(ctx context.Context, dur time.Duration, failed bool) { if m == nil { return } @@ -138,7 +157,7 @@ func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed b ctx = context.Background() } - m.remove.Record(ctx, dur.Seconds(), metric.WithAttributes( + m.removeQ4.Record(ctx, dur.Seconds(), metric.WithAttributes( attribute.Bool(failedKey, failed))) } diff --git a/store/store.go b/store/store.go index 3b95322c3d..5c9d853c69 100644 --- a/store/store.go +++ b/store/store.go @@ -33,7 +33,7 @@ const ( var ErrNotFound = errors.New("eds not found in store") // Store is a storage for EDS files. It persists EDS files on disk in form of Q1Q4 files or ODS -// files. It provides methods to put, get and remove EDS files. It has two caches: recent eds cache +// files. It provides methods to put, get and removeAll EDS files. It has two caches: recent eds cache // and availability cache. Recent eds cache is used to cache recent blocks. Availability cache is // used to cache blocks that are accessed by sample requests. Store is thread-safe. type Store struct { @@ -149,11 +149,11 @@ func (s *Store) createFile( return true, nil } if err != nil { + // ensure we don't have partial writes if any operation fails + removeErr := s.removeAll(height, roots.Hash()) return false, errors.Join( fmt.Errorf("creating ODSQ4 file: %w", err), - // ensure we don't have partial writes - remove(pathODS), - remove(pathQ4), + removeErr, ) } @@ -161,11 +161,10 @@ func (s *Store) createFile( err = s.linkHeight(roots.Hash(), height) if err != nil { // ensure we don't have partial writes if any operation fails + removeErr := s.removeAll(height, roots.Hash()) return false, errors.Join( fmt.Errorf("hardlinking height: %w", err), - remove(pathODS), - remove(pathQ4), - s.removeLink(height), + removeErr, ) } return false, nil @@ -190,7 +189,7 @@ func (s *Store) populateEmptyFile() error { return fmt.Errorf("cleaning old empty EDS file: %w", err) } - err = file.CreateODSQ4(pathOds, pathQ4, share.EmptyEDSRoots(), eds.EmptyAccessor.ExtendedDataSquare) + err = file.CreateODS(pathOds, share.EmptyEDSRoots(), eds.EmptyAccessor.ExtendedDataSquare, false) if err != nil { return fmt.Errorf("creating fresh empty EDS file: %w", err) } @@ -310,51 +309,86 @@ func (s *Store) hasByHeight(height uint64) (bool, error) { return exists(pathODS) } -func (s *Store) Remove(ctx context.Context, height uint64, datahash share.DataHash) error { +func (s *Store) RemoveAll(ctx context.Context, height uint64, datahash share.DataHash) error { + lock := s.stripLock.byHashAndHeight(datahash, height) + lock.lock() + defer lock.unlock() + tNow := time.Now() - err := s.remove(height, datahash) - s.metrics.observeRemove(ctx, time.Since(tNow), err != nil) + err := s.removeAll(height, datahash) + s.metrics.observeRemoveAll(ctx, time.Since(tNow), err != nil) return err } -func (s *Store) remove(height uint64, datahash share.DataHash) error { - lock := s.stripLock.byHeight(height) - lock.Lock() - if err := s.removeLink(height); err != nil { - return fmt.Errorf("removing link: %w", err) +func (s *Store) removeAll(height uint64, datahash share.DataHash) error { + if err := s.removeODS(height, datahash); err != nil { + return fmt.Errorf("removing ODS: %w", err) } - lock.Unlock() - - dlock := s.stripLock.byHash(datahash) - dlock.Lock() - defer dlock.Unlock() - if err := s.removeFile(datahash); err != nil { - return fmt.Errorf("removing file: %w", err) + if err := s.removeQ4(height, datahash); err != nil { + return fmt.Errorf("removing Q4: %w", err) } return nil } -func (s *Store) removeLink(height uint64) error { +func (s *Store) removeODS(height uint64, datahash share.DataHash) error { if err := s.cache.Remove(height); err != nil { return fmt.Errorf("removing from cache: %w", err) } - pathODS := s.heightToPath(height, odsFileExt) - return remove(pathODS) + pathLink := s.heightToPath(height, odsFileExt) + if err := remove(pathLink); err != nil { + return fmt.Errorf("removing hardlink: %w", err) + } + + // remove the link only if the file is empty + if datahash.IsEmptyEDS() { + return nil + } + + pathODS := s.hashToPath(datahash, odsFileExt) + if err := remove(pathODS); err != nil { + return fmt.Errorf("removing ODS file: %w", err) + } + return nil } -func (s *Store) removeFile(hash share.DataHash) error { - // we don't need to remove the empty file, it should always be there - if hash.IsEmptyEDS() { +func (s *Store) RemoveQ4(ctx context.Context, height uint64, datahash share.DataHash) error { + lock := s.stripLock.byHashAndHeight(datahash, height) + lock.lock() + defer lock.unlock() + + tNow := time.Now() + err := s.removeQ4(height, datahash) + s.metrics.observeRemoveQ4(ctx, time.Since(tNow), err != nil) + return err +} + +func (s *Store) removeQ4(height uint64, datahash share.DataHash) error { + if err := s.cache.Remove(height); err != nil { + return fmt.Errorf("removing from cache: %w", err) + } + + // remove Q4 file + pathQ4File := s.hashToPath(datahash, q4FileExt) + if err := remove(pathQ4File); err != nil { + return fmt.Errorf("removing Q4 file: %w", err) + } + + // update ODS file to remove Q4 + pathODS := s.hashToPath(datahash, odsFileExt) + ods, err := file.OpenODS(pathODS) + if errors.Is(err, os.ErrNotExist) { return nil } + if err != nil { + return fmt.Errorf("opening Q4 file: %w", err) + } + defer utils.CloseAndLog(log, "ods", ods) - pathODS := s.hashToPath(hash, odsFileExt) - pathQ4 := s.hashToPath(hash, q4FileExt) - return errors.Join( - remove(pathODS), - remove(pathQ4), - ) + if err = ods.ConvertToODS(); err != nil { + return fmt.Errorf("converting to ODS: %w", err) + } + return nil } func (s *Store) hashToPath(datahash share.DataHash, ext string) string { diff --git a/store/store_test.go b/store/store_test.go index dd814afcf4..3e452f869b 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,6 +2,8 @@ package store import ( "context" + "os" + "path" "sync/atomic" "testing" "time" @@ -13,6 +15,7 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds/edstest" "github.com/celestiaorg/celestia-node/store/cache" + "github.com/celestiaorg/celestia-node/store/file" ) func TestEDSStore(t *testing.T) { @@ -24,26 +27,28 @@ func TestEDSStore(t *testing.T) { require.NoError(t, err) // disable cache - edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}) height := atomic.Uint64{} height.Store(100) t.Run("Put", func(t *testing.T) { + dir := t.TempDir() + edsStore, err := NewStore(paramsNoCache(), dir) + require.NoError(t, err) + eds, roots := randomEDS(t) height := height.Add(1) - err := edsStore.Put(ctx, roots, height, eds) + err = edsStore.Put(ctx, roots, height, eds) require.NoError(t, err) - // file should become available by hash - has, err := edsStore.HasByHash(ctx, roots.Hash()) - require.NoError(t, err) - require.True(t, has) + // file should exist in the store + hasByHashAndHeight(t, edsStore, ctx, roots.Hash(), height, true, true) - // file should become available by height - has, err = edsStore.HasByHeight(ctx, height) - require.NoError(t, err) - require.True(t, has) + // block folder should contain: + // - heights folder + // - empty file + // - ods+q4 files for the block + ensureAmountFileAndLinks(t, dir, 4, 1) }) t.Run("Cached after Put", func(t *testing.T) { @@ -70,16 +75,21 @@ func TestEDSStore(t *testing.T) { }) t.Run("Second Put should be noop", func(t *testing.T) { + dir := t.TempDir() + edsStore, err := NewStore(paramsNoCache(), dir) + require.NoError(t, err) + eds, roots := randomEDS(t) height := height.Add(1) - err := edsStore.Put(ctx, roots, height, eds) + err = edsStore.Put(ctx, roots, height, eds) require.NoError(t, err) err = edsStore.Put(ctx, roots, height, eds) require.NoError(t, err) - // TODO: check amount of files in the store after the second Put - // after store supports listing + + // ensure file is not duplicated + ensureAmountFileAndLinks(t, dir, 4, 1) }) t.Run("GetByHeight", func(t *testing.T) { @@ -122,13 +132,8 @@ func TestEDSStore(t *testing.T) { _, roots := randomEDS(t) height := height.Add(1) - has, err := edsStore.HasByHash(ctx, roots.Hash()) - require.NoError(t, err) - require.False(t, has) - - has, err = edsStore.HasByHeight(ctx, height) - require.NoError(t, err) - require.False(t, has) + // file does not exist + hasByHashAndHeight(t, edsStore, ctx, roots.Hash(), height, false, false) _, err = edsStore.GetByHeight(ctx, height) require.ErrorIs(t, err, ErrNotFound) @@ -137,40 +142,118 @@ func TestEDSStore(t *testing.T) { require.ErrorIs(t, err, ErrNotFound) }) - t.Run("Remove", func(t *testing.T) { - edsStore, err := NewStore(DefaultParameters(), t.TempDir()) - require.NoError(t, err) + t.Run("RemoveAll", func(t *testing.T) { + t.Run("empty file", func(t *testing.T) { + dir := t.TempDir() + edsStore, err := NewStore(DefaultParameters(), dir) + require.NoError(t, err) - // removing file that does not exist should be noop - missingHeight := height.Add(1) - err = edsStore.Remove(ctx, missingHeight, share.DataHash{0x01, 0x02}) - require.NoError(t, err) + height := height.Add(1) + hash := share.EmptyEDSDataHash() + err = edsStore.Put(ctx, share.EmptyEDSRoots(), height, share.EmptyEDS()) + require.NoError(t, err) + err = edsStore.RemoveAll(ctx, height, hash) + require.NoError(t, err) - eds, roots := randomEDS(t) - height := height.Add(1) - err = edsStore.Put(ctx, roots, height, eds) - require.NoError(t, err) + // file should be removed from cache + _, err = edsStore.cache.Get(height) + require.ErrorIs(t, err, cache.ErrCacheMiss) - err = edsStore.Remove(ctx, height, roots.Hash()) - require.NoError(t, err) + // empty file should be accessible by hash, but not by height + hasByHashAndHeight(t, edsStore, ctx, hash, height, true, false) - // file should be removed from cache - _, err = edsStore.cache.Get(height) - require.ErrorIs(t, err, cache.ErrCacheMiss) + // ensure link is removed from heights folder + ensureAmountFileAndLinks(t, dir, 2, 0) + }) - // file should not be accessible by hash - has, err := edsStore.HasByHash(ctx, roots.Hash()) - require.NoError(t, err) - require.False(t, has) + t.Run("non-empty file", func(t *testing.T) { + dir := t.TempDir() + edsStore, err := NewStore(DefaultParameters(), dir) + require.NoError(t, err) - // subsequent remove should be noop - err = edsStore.Remove(ctx, height, roots.Hash()) - require.NoError(t, err) + // removing file that does not exist should be noop + missingHeight := height.Add(1) + err = edsStore.RemoveAll(ctx, missingHeight, share.DataHash{0x01, 0x02}) + require.NoError(t, err) - // file should not be accessible by height - has, err = edsStore.HasByHeight(ctx, height) - require.NoError(t, err) - require.False(t, has) + eds, roots := randomEDS(t) + height := height.Add(1) + err = edsStore.Put(ctx, roots, height, eds) + require.NoError(t, err) + + err = edsStore.RemoveAll(ctx, height, roots.Hash()) + require.NoError(t, err) + + // file should be removed from cache + _, err = edsStore.cache.Get(height) + require.ErrorIs(t, err, cache.ErrCacheMiss) + + // file should not be accessible by hash or height + hasByHashAndHeight(t, edsStore, ctx, roots.Hash(), height, false, false) + + // ensure file and link are removed + ensureAmountFileAndLinks(t, dir, 2, 0) + + // subsequent removeAll should be noop + err = edsStore.RemoveAll(ctx, height, roots.Hash()) + require.NoError(t, err) + }) + }) + + t.Run("RemoveQ4", func(t *testing.T) { + t.Run("empty file", func(t *testing.T) { + dir := t.TempDir() + edsStore, err := NewStore(DefaultParameters(), dir) + require.NoError(t, err) + + height := height.Add(1) + hash := share.EmptyEDSDataHash() + err = edsStore.Put(ctx, share.EmptyEDSRoots(), height, share.EmptyEDS()) + require.NoError(t, err) + err = edsStore.RemoveQ4(ctx, height, hash) + require.NoError(t, err) + + // file should be removed from cache + _, err = edsStore.cache.Get(height) + require.ErrorIs(t, err, cache.ErrCacheMiss) + + // empty file should be accessible by hash and by height + hasByHashAndHeight(t, edsStore, ctx, hash, height, true, true) + + // ensure file and link are not removed + ensureAmountFileAndLinks(t, dir, 2, 1) + }) + + t.Run("non-empty file", func(t *testing.T) { + dir := t.TempDir() + edsStore, err := NewStore(DefaultParameters(), dir) + require.NoError(t, err) + + square, roots := randomEDS(t) + height := height.Add(1) + err = edsStore.Put(ctx, roots, height, square) + require.NoError(t, err) + + err = edsStore.RemoveQ4(ctx, height, roots.Hash()) + require.NoError(t, err) + + // file should be removed from cache + _, err = edsStore.cache.Get(height) + require.ErrorIs(t, err, cache.ErrCacheMiss) + + // Check of the Q4 file is removed + pathODS := edsStore.hashToPath(roots.Hash(), odsFileExt) + f, err := file.OpenODS(pathODS) + require.NoError(t, err) + require.False(t, f.HasQ4()) + require.NoError(t, f.Close()) + + // ODS file should be accessible by hash and by height + hasByHashAndHeight(t, edsStore, ctx, roots.Hash(), height, true, true) + + // ensure file and link are not removed + ensureAmountFileAndLinks(t, dir, 3, 1) + }) }) t.Run("empty EDS returned by hash", func(t *testing.T) { @@ -319,3 +402,34 @@ func randomEDS(t testing.TB) (*rsmt2d.ExtendedDataSquare, *share.AxisRoots) { return eds, roots } + +func ensureAmountFileAndLinks(t testing.TB, dir string, files, links int) { + // ensure empty file is not removed + blockPath := path.Join(dir, blocksPath) + entries, err := os.ReadDir(blockPath) + require.NoError(t, err) + require.Len(t, entries, files) + + // ensure link is not removed from heights folder + linksPath := path.Join(dir, heightsPath) + entries, err = os.ReadDir(linksPath) + require.NoError(t, err) + require.Len(t, entries, links) +} + +func hasByHashAndHeight( + t testing.TB, + store *Store, + ctx context.Context, + hash share.DataHash, + height uint64, + hasByHash, hasByHeight bool, +) { + has, err := store.HasByHash(ctx, hash) + require.NoError(t, err) + require.Equal(t, hasByHash, has) + + has, err = store.HasByHeight(ctx, height) + require.NoError(t, err) + require.Equal(t, hasByHeight, has) +}