Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: curio: Cleanup data copies after seal ops #11847

Merged
merged 4 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion curiosrc/ffi/piece_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID, pieceID storiface.PieceNumber, size int64, data io.Reader) error {
// todo: config(?): allow setting PathStorage for this
// todo storage reservations
paths, done, err := sb.sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
paths, _, done, err := sb.sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
if err != nil {
return err
}
Expand Down
53 changes: 43 additions & 10 deletions curiosrc/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type storageProvider struct {
storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
}

func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(), err error) {
var paths, storageIDs storiface.SectorPaths
var releaseStorage func()

Expand All @@ -72,7 +72,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
if ok && resv != nil {
if resv.Alloc != allocate || resv.Existing != existing {
// this should never happen, only when task definition is wrong
return storiface.SectorPaths{}, nil, xerrors.Errorf("storage reservation type mismatch")
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("storage reservation type mismatch")
}

log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate)
Expand All @@ -89,12 +89,12 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask

_, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: paths, IDs: storageIDs}))
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: %w", err)
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: %w", err)
}

// assert that checkPathIDs is the same as storageIDs
if storageIDs.Subset(existing) != checkPathIDs.Subset(existing) {
return storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: pathIDs mismatch %#v != %#v", storageIDs, checkPathIDs)
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: pathIDs mismatch %#v != %#v", storageIDs, checkPathIDs)
}
}
} else {
Expand All @@ -103,18 +103,18 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
var err error
paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
if err != nil {
return storiface.SectorPaths{}, nil, err
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, err
}

releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
}
}

log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)

return paths, func() {
return paths, storageIDs, func() {
releaseStorage()

for _, fileType := range storiface.PathTypes {
Expand All @@ -131,7 +131,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
}

func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand All @@ -158,6 +158,31 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID,
return xerrors.Errorf("generating SDR %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}

if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache); err != nil {
return xerrors.Errorf("ensure one copy: %w", err)
}

return nil
}

// ensureOneCopy makes sure that there is only one version of sector data.
// Usually called after a successful operation was done successfully on sector data.
func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathIDs storiface.SectorPaths, fts storiface.SectorFileType) error {
if !pathIDs.HasAllSet(fts) {
return xerrors.Errorf("ensure one copy: not all paths are set")
}

for _, fileType := range fts.AllSet() {
pid := storiface.PathByType(pathIDs, fileType)
keepIn := []storiface.ID{storiface.ID(pid)}

log.Debugw("ensureOneCopy", "sector", sid, "type", fileType, "keep", keepIn)

if err := sb.sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
return err
}
}

return nil
}

Expand All @@ -167,7 +192,7 @@ func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sect
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
}

paths, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -243,6 +268,10 @@ func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sect
return cid.Undef, cid.Undef, xerrors.Errorf("unsealed cid changed after sealing")
}

if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache|storiface.FTSealed); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("ensure one copy: %w", err)
}

return sl, uns, nil
}

Expand Down Expand Up @@ -425,7 +454,7 @@ func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.Sector
alloc = storiface.FTUnsealed
}

sectorPaths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, alloc, storiface.PathSealing)
sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, alloc, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -514,6 +543,10 @@ afterUnsealedMove:
return xerrors.Errorf("clearing cache: %w", err)
}

if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache|alloc); err != nil {
return xerrors.Errorf("ensure one copy: %w", err)
}

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion storage/paths/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,8 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ storiface.Sec
return xerrors.New("delete expects one file type")
}

log.Debugw("Remove called", "sid", sid, "type", typ, "force", force, "keepIn", keepIn)

si, err := st.index.StorageFindSector(ctx, sid, typ, 0, false)
if err != nil {
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
Expand Down Expand Up @@ -739,7 +741,7 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ storifa
}

spath := p.sectorPath(sid, typ)
log.Infof("remove %s", spath)
log.Infow("remove", "path", spath, "id", sid, "type", typ, "storage", storage)

if err := os.RemoveAll(spath); err != nil {
log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err)
Expand Down
Loading