add archival pruning support
walldiss committed Aug 23, 2024
1 parent 75af48b · commit 4bbe418
Showing 8 changed files with 299 additions and 96 deletions.
4 changes: 4 additions & 0 deletions nodebuilder/pruner/module.go
@@ -61,6 +61,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
     }
     return fx.Module("prune",
         baseComponents,
+        prunerService,
+        fxutil.ProvideAs(archival.NewPruner, new(pruner.Pruner)),
         fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
             return pruner.DetectPreviousRun(ctx, ds)
         }),
@@ -78,6 +80,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
     }
     return fx.Module("prune",
         baseComponents,
+        prunerService,
+        fxutil.ProvideAs(archival.NewPruner, new(pruner.Pruner)),
         fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
             return pruner.DetectPreviousRun(ctx, ds)
         }),
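Note on the wiring above: both archival branches of ConstructModule now run the pruner service and register the archival pruner as the pruner.Pruner dependency. A minimal sketch of what that binding amounts to in plain go.uber.org/fx terms, assuming fxutil.ProvideAs simply annotates the constructor so its result is provided as the interface (package name and helper name below are illustrative):

    package nodebuilder // illustrative package name

    import (
        "go.uber.org/fx"

        "github.com/celestiaorg/celestia-node/pruner"
        "github.com/celestiaorg/celestia-node/pruner/archival"
    )

    // archivalPrunerOption sketches the equivalent of
    // fxutil.ProvideAs(archival.NewPruner, new(pruner.Pruner)):
    // provide *archival.Pruner to the DI graph as the pruner.Pruner interface.
    func archivalPrunerOption() fx.Option {
        return fx.Provide(
            fx.Annotate(
                archival.NewPruner,
                fx.As(new(pruner.Pruner)),
            ),
        )
    }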
23 changes: 19 additions & 4 deletions pruner/archival/pruner.go
@@ -3,18 +3,33 @@ package archival
 import (
     "context"
 
+    logging "github.com/ipfs/go-log/v2"
+
     "github.com/celestiaorg/celestia-node/header"
+    "github.com/celestiaorg/celestia-node/store"
 )
 
+var log = logging.Logger("pruner/archival")
+
 // Pruner is a noop implementation of the pruner.Factory interface
 // that allows archival nodes to sync and retain historical data
 // that is out of the availability window.
-type Pruner struct{}
+type Pruner struct {
+    store *store.Store
+}
 
-func NewPruner() *Pruner {
-    return &Pruner{}
+func NewPruner(store *store.Store) *Pruner {
+    return &Pruner{
+        store: store,
+    }
 }
 
-func (p *Pruner) Prune(context.Context, *header.ExtendedHeader) error {
+func (p *Pruner) Prune(ctx context.Context, eh *header.ExtendedHeader) error {
+    log.Debugf("pruning header %s", eh.DAH.Hash())
+
+    // Archival nodes should keep ODS data indefinitely. Reduce the file to only ODS data by removing the Q4 part.
+    err := p.store.RemoveQ4(ctx, eh.Height(), eh.DAH.Hash())
+    if err != nil {
+        return err
+    }
     return nil
 }
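The archival pruner is no longer a pure no-op: once a header falls out of the availability window, it strips the Q4 quadrants via store.RemoveQ4 and keeps only the ODS data. A small sketch of the contract it now fulfils; the interface shape and package name are assumptions inferred from the signatures in this commit:

    package pruning // illustrative package name

    import (
        "context"

        "github.com/celestiaorg/celestia-node/header"
        "github.com/celestiaorg/celestia-node/pruner/archival"
    )

    // prunerIface mirrors the assumed shape of the pruner.Pruner interface
    // that archival.NewPruner is registered as in nodebuilder/pruner/module.go.
    type prunerIface interface {
        Prune(ctx context.Context, eh *header.ExtendedHeader) error
    }

    // Compile-time check that *archival.Pruner still satisfies the interface
    // now that Prune trims Q4 data instead of returning nil immediately.
    var _ prunerIface = (*archival.Pruner)(nil)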
6 changes: 4 additions & 2 deletions pruner/archival/window.go
@@ -1,5 +1,7 @@
 package archival
 
-import "github.com/celestiaorg/celestia-node/pruner"
+import (
+    "github.com/celestiaorg/celestia-node/pruner/full"
+)
 
-const Window = pruner.AvailabilityWindow(0)
+const Window = full.Window
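With the window set to full.Window instead of a zero value, archival nodes share the full node's availability window, so the Q4 trimming above runs on the same schedule as full-node pruning. As a rough illustration of how such a window is typically applied, assuming it can be expressed as a time.Duration (the helper below is hypothetical and not part of this commit):

    package pruning // illustrative package name

    import (
        "time"

        "github.com/celestiaorg/celestia-node/header"
    )

    // isPastWindow reports whether a header has fallen out of the availability
    // window and is therefore a candidate for pruning (Q4 trimming on archival
    // nodes, full removal on pruned nodes).
    func isPastWindow(eh *header.ExtendedHeader, window time.Duration) bool {
        return time.Since(eh.Time()) > window
    }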
2 changes: 1 addition & 1 deletion pruner/full/pruner.go
@@ -24,7 +24,7 @@ func NewPruner(store *store.Store) *Pruner {
 func (p *Pruner) Prune(ctx context.Context, eh *header.ExtendedHeader) error {
     log.Debugf("pruning header %s", eh.DAH.Hash())
 
-    err := p.store.Remove(ctx, eh.Height(), eh.DAH.Hash())
+    err := p.store.RemoveAll(ctx, eh.Height(), eh.DAH.Hash())
     if err != nil {
         return err
     }
17 changes: 16 additions & 1 deletion store/file/ods.go
@@ -147,7 +147,7 @@ func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error {
 // If file is empty, the ErrEmptyFile is returned.
 // File must be closed after usage.
 func OpenODS(path string) (*ODS, error) {
-    f, err := os.Open(path)
+    f, err := os.OpenFile(path, os.O_RDWR, filePermissions)
     if err != nil {
         return nil, err
     }
@@ -167,6 +167,21 @@ func (o *ODS) HasQ4() bool {
     return o.hdr.fileType == odsq4
 }
 
+func (o *ODS) ConvertToODS() error {
+    if !o.HasQ4() {
+        return nil
+    }
+
+    // update header to change the file type
+    o.hdr.fileType = ods
+    w := io.NewOffsetWriter(o.fl, 0)
+    if err := writeHeader(w, o.hdr); err != nil {
+        return fmt.Errorf("writing header: %w", err)
+    }
+
+    return nil
+}
+
 // Size returns EDS size stored in file's header.
 func (o *ODS) Size(context.Context) int {
     return o.size()
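ConvertToODS only rewrites the file header in place, flipping the file type from odsq4 to ods, so the ODS payload already on disk stays untouched; this is also why OpenODS now opens the file in read-write mode. A hedged usage sketch (the helper is hypothetical, and the Close call is an assumption based on the "File must be closed after usage" doc comment):

    package file // illustrative; mirrors store/file

    import "fmt"

    // stripQ4 shows the intended call sequence: open the file read-write,
    // rewrite its header so it is treated as a plain ODS file, and close it.
    func stripQ4(path string) error {
        ods, err := OpenODS(path)
        if err != nil {
            return fmt.Errorf("opening ODS: %w", err)
        }
        defer ods.Close()

        return ods.ConvertToODS()
    }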
31 changes: 25 additions & 6 deletions store/metrics.go
@@ -24,7 +24,8 @@ type metrics struct {
     putExists metric.Int64Counter
     get       metric.Float64Histogram
     has       metric.Float64Histogram
-    remove    metric.Float64Histogram
+    removeAll metric.Float64Histogram
+    removeQ4  metric.Float64Histogram
     unreg func() error
 }
@@ -53,18 +54,24 @@ func (s *Store) WithMetrics() error {
         return err
     }
 
-    remove, err := meter.Float64Histogram("eds_store_remove_time_histogram",
-        metric.WithDescription("eds store remove time histogram(s)"))
+    removeAll, err := meter.Float64Histogram("eds_store_remove_all_time_histogram",
+        metric.WithDescription("eds store remove all data time histogram(s)"))
     if err != nil {
         return err
     }
 
+    removeQ4, err := meter.Float64Histogram("eds_store_remove_q4_time_histogram",
+        metric.WithDescription("eds store remove q4 data time histogram(s)"))
+    if err != nil {
+        return err
+    }
     s.metrics = &metrics{
         put:       put,
         putExists: putExists,
         get:       get,
         has:       has,
-        remove:    remove,
+        removeAll: removeAll,
+        removeQ4:  removeQ4,
     }
     return s.metrics.addCacheMetrics(s.cache)
 }
@@ -130,15 +137,27 @@ func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool) {
         attribute.Bool(failedKey, failed)))
 }
 
-func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) {
+func (m *metrics) observeRemoveAll(ctx context.Context, dur time.Duration, failed bool) {
     if m == nil {
         return
     }
     if ctx.Err() != nil {
         ctx = context.Background()
     }
 
+    m.removeAll.Record(ctx, dur.Seconds(), metric.WithAttributes(
+        attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observeRemoveQ4(ctx context.Context, dur time.Duration, failed bool) {
+    if m == nil {
+        return
+    }
+    if ctx.Err() != nil {
+        ctx = context.Background()
+    }
+
-    m.remove.Record(ctx, dur.Seconds(), metric.WithAttributes(
+    m.removeQ4.Record(ctx, dur.Seconds(), metric.WithAttributes(
         attribute.Bool(failedKey, failed)))
 }
 
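The single remove histogram is split into removeAll and removeQ4 so the two pruning paths can be observed separately. Both follow the usual OpenTelemetry pattern; the standalone sketch below condenses it (meter name and package are illustrative, and in the real store the instrument is created once in WithMetrics rather than per call):

    package metricsexample

    import (
        "context"
        "time"

        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/metric"
    )

    // recordRemoveQ4 records how long a Q4 removal took, tagged with whether
    // it failed, mirroring observeRemoveQ4 above.
    func recordRemoveQ4(ctx context.Context, dur time.Duration, failed bool) error {
        meter := otel.Meter("eds_store")
        hist, err := meter.Float64Histogram("eds_store_remove_q4_time_histogram",
            metric.WithDescription("eds store remove q4 data time histogram(s)"))
        if err != nil {
            return err
        }
        hist.Record(ctx, dur.Seconds(), metric.WithAttributes(
            attribute.Bool("failed", failed)))
        return nil
    }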
104 changes: 69 additions & 35 deletions store/store.go
@@ -33,7 +33,7 @@ const (
 var ErrNotFound = errors.New("eds not found in store")
 
 // Store is a storage for EDS files. It persists EDS files on disk in form of Q1Q4 files or ODS
-// files. It provides methods to put, get and remove EDS files. It has two caches: recent eds cache
+// files. It provides methods to put, get and removeAll EDS files. It has two caches: recent eds cache
 // and availability cache. Recent eds cache is used to cache recent blocks. Availability cache is
 // used to cache blocks that are accessed by sample requests. Store is thread-safe.
 type Store struct {
@@ -149,23 +149,22 @@ func (s *Store) createFile(
         return true, nil
     }
     if err != nil {
+        // ensure we don't have partial writes if any operation fails
+        removeErr := s.removeAll(height, roots.Hash())
         return false, errors.Join(
             fmt.Errorf("creating ODSQ4 file: %w", err),
-            // ensure we don't have partial writes
-            remove(pathODS),
-            remove(pathQ4),
+            removeErr,
         )
     }
 
     // create hard link with height as name
     err = s.linkHeight(roots.Hash(), height)
     if err != nil {
+        // ensure we don't have partial writes if any operation fails
+        removeErr := s.removeAll(height, roots.Hash())
         return false, errors.Join(
             fmt.Errorf("hardlinking height: %w", err),
-            remove(pathODS),
-            remove(pathQ4),
-            s.removeLink(height),
+            removeErr,
         )
     }
     return false, nil
@@ -190,7 +189,7 @@ func (s *Store) populateEmptyFile() error {
         return fmt.Errorf("cleaning old empty EDS file: %w", err)
     }
 
-    err = file.CreateODSQ4(pathOds, pathQ4, share.EmptyEDSRoots(), eds.EmptyAccessor.ExtendedDataSquare)
+    err = file.CreateODS(pathOds, share.EmptyEDSRoots(), eds.EmptyAccessor.ExtendedDataSquare, false)
     if err != nil {
         return fmt.Errorf("creating fresh empty EDS file: %w", err)
     }
@@ -310,51 +309,86 @@ func (s *Store) hasByHeight(height uint64) (bool, error) {
     return exists(pathODS)
 }
 
-func (s *Store) Remove(ctx context.Context, height uint64, datahash share.DataHash) error {
+func (s *Store) RemoveAll(ctx context.Context, height uint64, datahash share.DataHash) error {
     lock := s.stripLock.byHashAndHeight(datahash, height)
     lock.lock()
     defer lock.unlock()
 
     tNow := time.Now()
-    err := s.remove(height, datahash)
-    s.metrics.observeRemove(ctx, time.Since(tNow), err != nil)
+    err := s.removeAll(height, datahash)
+    s.metrics.observeRemoveAll(ctx, time.Since(tNow), err != nil)
     return err
 }
 
-func (s *Store) remove(height uint64, datahash share.DataHash) error {
-    lock := s.stripLock.byHeight(height)
-    lock.Lock()
-    if err := s.removeLink(height); err != nil {
-        return fmt.Errorf("removing link: %w", err)
+func (s *Store) removeAll(height uint64, datahash share.DataHash) error {
+    if err := s.removeODS(height, datahash); err != nil {
+        return fmt.Errorf("removing ODS: %w", err)
     }
-    lock.Unlock()
 
-    dlock := s.stripLock.byHash(datahash)
-    dlock.Lock()
-    defer dlock.Unlock()
-    if err := s.removeFile(datahash); err != nil {
-        return fmt.Errorf("removing file: %w", err)
+    if err := s.removeQ4(height, datahash); err != nil {
+        return fmt.Errorf("removing Q4: %w", err)
     }
     return nil
 }
 
-func (s *Store) removeLink(height uint64) error {
+func (s *Store) removeODS(height uint64, datahash share.DataHash) error {
     if err := s.cache.Remove(height); err != nil {
         return fmt.Errorf("removing from cache: %w", err)
     }
 
-    pathODS := s.heightToPath(height, odsFileExt)
-    return remove(pathODS)
+    pathLink := s.heightToPath(height, odsFileExt)
+    if err := remove(pathLink); err != nil {
+        return fmt.Errorf("removing hardlink: %w", err)
+    }
+
+    // remove the link only if the file is empty
+    if datahash.IsEmptyEDS() {
+        return nil
+    }
+
+    pathODS := s.hashToPath(datahash, odsFileExt)
+    if err := remove(pathODS); err != nil {
+        return fmt.Errorf("removing ODS file: %w", err)
+    }
+    return nil
 }
 
-func (s *Store) removeFile(hash share.DataHash) error {
-    // we don't need to remove the empty file, it should always be there
-    if hash.IsEmptyEDS() {
+func (s *Store) RemoveQ4(ctx context.Context, height uint64, datahash share.DataHash) error {
+    lock := s.stripLock.byHashAndHeight(datahash, height)
+    lock.lock()
+    defer lock.unlock()
+
+    tNow := time.Now()
+    err := s.removeQ4(height, datahash)
+    s.metrics.observeRemoveQ4(ctx, time.Since(tNow), err != nil)
+    return err
+}
+
+func (s *Store) removeQ4(height uint64, datahash share.DataHash) error {
+    if err := s.cache.Remove(height); err != nil {
+        return fmt.Errorf("removing from cache: %w", err)
+    }
+
+    // remove Q4 file
+    pathQ4File := s.hashToPath(datahash, q4FileExt)
+    if err := remove(pathQ4File); err != nil {
+        return fmt.Errorf("removing Q4 file: %w", err)
+    }
+
+    // update ODS file to remove Q4
+    pathODS := s.hashToPath(datahash, odsFileExt)
+    ods, err := file.OpenODS(pathODS)
+    if errors.Is(err, os.ErrNotExist) {
        return nil
     }
+    if err != nil {
+        return fmt.Errorf("opening Q4 file: %w", err)
+    }
+    defer utils.CloseAndLog(log, "ods", ods)
 
-    pathODS := s.hashToPath(hash, odsFileExt)
-    pathQ4 := s.hashToPath(hash, q4FileExt)
-    return errors.Join(
-        remove(pathODS),
-        remove(pathQ4),
-    )
+    if err = ods.ConvertToODS(); err != nil {
+        return fmt.Errorf("converting to ODS: %w", err)
+    }
+    return nil
 }
 
 func (s *Store) hashToPath(datahash share.DataHash, ext string) string {
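Taken together, the store now exposes two removal paths: RemoveAll (used by the full pruner) drops the height hard link plus the ODS and Q4 files, while RemoveQ4 (used by the archival pruner) deletes only the Q4 file and rewrites the remaining file's header via ConvertToODS. An illustrative helper contrasting the two, not part of this commit (the archival flag and package name are hypothetical; the calls mirror the Prune implementations above):

    package pruning // illustrative package name

    import (
        "context"

        "github.com/celestiaorg/celestia-node/header"
        "github.com/celestiaorg/celestia-node/store"
    )

    // pruneHeader contrasts the two removal paths introduced in this commit:
    // archival nodes keep the ODS and trim only Q4, pruned nodes drop everything.
    func pruneHeader(ctx context.Context, s *store.Store, eh *header.ExtendedHeader, archival bool) error {
        if archival {
            return s.RemoveQ4(ctx, eh.Height(), eh.DAH.Hash())
        }
        return s.RemoveAll(ctx, eh.Height(), eh.DAH.Hash())
    }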