Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curio: feat: break trees task into TreeD(prefetch) and TreeRC #11895

Merged
merged 11 commits into from
May 2, 2024
5 changes: 3 additions & 2 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
activeTasks = append(activeTasks, sdrTask)
}
if cfg.Subsystems.EnableSealSDRTrees {
treesTask := seal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
treeDTask := seal.NewTreeDTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
treeRCTask := seal.NewTreeRCTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
finalizeTask := seal.NewFinalizeTask(cfg.Subsystems.FinalizeMaxTasks, sp, slr, db)
activeTasks = append(activeTasks, treesTask, finalizeTask)
activeTasks = append(activeTasks, treeDTask, treeRCTask, finalizeTask)
}
if cfg.Subsystems.EnableSendPrecommitMsg {
precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee)
Expand Down
146 changes: 89 additions & 57 deletions curiosrc/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type storageProvider struct {
}

func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(), err error) {
var paths, storageIDs storiface.SectorPaths
var sectorPaths, storageIDs storiface.SectorPaths
var releaseStorage func()

var ok bool
Expand All @@ -77,7 +77,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask

log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate)

paths = resv.Paths
sectorPaths = resv.Paths
storageIDs = resv.PathIDs
releaseStorage = resv.Release

Expand All @@ -87,7 +87,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
// present locally. Note that we do not care about 'allocate' reqeuests, those files don't exist, and are just
// proposed paths with a reservation of space.

_, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: paths, IDs: storageIDs}))
_, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: sectorPaths, IDs: storageIDs}))
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: %w", err)
}
Expand All @@ -101,20 +101,20 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
// No related reservation, acquire storage as usual

var err error
paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
sectorPaths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, err
}

releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MinFreeStoragePercentage)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
}
}

log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, sectorPaths)

return paths, storageIDs, func() {
return sectorPaths, storageIDs, func() {
releaseStorage()

for _, fileType := range storiface.PathTypes {
Expand Down Expand Up @@ -194,80 +194,69 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID
return nil
}

func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (scid cid.Cid, ucid cid.Cid, err error) {
func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid) (scid cid.Cid, ucid cid.Cid, err error) {
p1o, err := sb.makePhase1Out(unsealed, sector.ProofType)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
}

paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()

defer func() {
if err != nil {
clerr := removeDRCTrees(paths.Cache)
clerr := removeDRCTrees(fspaths.Cache, false)
if clerr != nil {
log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", paths.Cache)
log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache)
}
}
}()

treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(paths.Cache, proofpaths.TreeDName), size)
// create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("building tree-d: %w", err)
}

if treeDUnsealed != unsealed {
return cid.Undef, cid.Undef, xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid")
return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err)
}

{
// create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err)
}

{
// copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector
// copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector

// first try reflink + truncate, that should be way faster
err := reflink.Always(filepath.Join(paths.Cache, proofpaths.TreeDName), paths.Sealed)
if err == nil {
err = os.Truncate(paths.Sealed, int64(ssize))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err)
}
} else {
log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", paths.Cache, "sealed", paths.Sealed)
// first try reflink + truncate, that should be way faster
err := reflink.Always(filepath.Join(fspaths.Cache, proofpaths.TreeDName), fspaths.Sealed)
if err == nil {
err = os.Truncate(fspaths.Sealed, int64(ssize))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err)
}
} else {
log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", fspaths.Cache, "sealed", fspaths.Sealed)

// fallback to slow copy, copy ssize bytes from treed to sealed
dst, err := os.OpenFile(paths.Sealed, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err)
}
src, err := os.Open(filepath.Join(paths.Cache, proofpaths.TreeDName))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err)
}
// fallback to slow copy, copy ssize bytes from treed to sealed
dst, err := os.OpenFile(fspaths.Sealed, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err)
}
src, err := os.Open(filepath.Join(fspaths.Cache, proofpaths.TreeDName))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err)
}

_, err = io.CopyN(dst, src, int64(ssize))
derr := dst.Close()
_ = src.Close()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err)
}
if derr != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr)
}
_, err = io.CopyN(dst, src, int64(ssize))
derr := dst.Close()
_ = src.Close()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err)
}
if derr != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr)
}
}
}

sl, uns, err := ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed)
sl, uns, err := ffi.SealPreCommitPhase2(p1o, fspaths.Cache, fspaths.Sealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err)
}
Expand All @@ -283,22 +272,28 @@ func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sect
return sl, uns, nil
}

func removeDRCTrees(cache string) error {
// list files in cache
func removeDRCTrees(cache string, isDTree bool) error {
files, err := os.ReadDir(cache)
if err != nil {
return xerrors.Errorf("listing cache: %w", err)
}

var testFunc func(string) bool

if isDTree {
testFunc = proofpaths.IsTreeDFile
} else {
testFunc = proofpaths.IsTreeRCFile
}

for _, file := range files {
if proofpaths.IsTreeFile(file.Name()) {
if testFunc(file.Name()) {
err := os.Remove(filepath.Join(cache, file.Name()))
if err != nil {
return xerrors.Errorf("removing tree file: %w", err)
}
}
}

return nil
}

Expand Down Expand Up @@ -625,3 +620,40 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec

return true, storiface.PathStorage, nil
}

// PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks
func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(), err error) {
fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err)
}
// Don't release the storage locks. They will be released in TreeD func()
return
}

func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool, fspaths, pathIDs storiface.SectorPaths) error {
var err error
defer func() {
if err != nil {
clerr := removeDRCTrees(fspaths.Cache, true)
if clerr != nil {
log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache)
}
}
}()

treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(fspaths.Cache, proofpaths.TreeDName), size)
if err != nil {
return xerrors.Errorf("building tree-d: %w", err)
}

if treeDUnsealed != unsealed {
return xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid")
}

if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache); err != nil {
return xerrors.Errorf("ensure one copy: %w", err)
}

return nil
}
20 changes: 12 additions & 8 deletions curiosrc/ffi/task_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type TaskStorage struct {
pathType storiface.PathType

taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error)

// Minimum free storage percentage cutoff for reservation rejection
MinFreeStoragePercentage float64
}

type ReleaseStorageFunc func() // free storage reservation
Expand All @@ -56,14 +59,15 @@ type StorageReservation struct {
Alloc, Existing storiface.SectorFileType
}

func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) *TaskStorage {
func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, MinFreeStoragePercentage float64) *TaskStorage {
return &TaskStorage{
sc: sb,
alloc: alloc,
existing: existing,
ssize: ssize,
pathType: pathType,
taskToSectorRef: taskToSectorRef,
sc: sb,
alloc: alloc,
existing: existing,
ssize: ssize,
pathType: pathType,
taskToSectorRef: taskToSectorRef,
MinFreeStoragePercentage: MinFreeStoragePercentage,
}
}

Expand Down Expand Up @@ -166,7 +170,7 @@ func (t *TaskStorage) Claim(taskID int) error {
}

// reserve the space
release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal)
release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.MinFreeStoragePercentage)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions curiosrc/gc/storage_endpoint_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool

// Remove dead URLs from storage_path entries and handle path cleanup
for _, du := range deadURLs {
du := du
// Fetch the current URLs for the storage path
var URLs string
err = tx.QueryRow("SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID).Scan(&URLs)
Expand Down
3 changes: 2 additions & 1 deletion curiosrc/piece/task_park_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

Expand Down Expand Up @@ -185,7 +186,7 @@ func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails {
Cpu: 1,
Gpu: 0,
Ram: 64 << 20,
Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing),
Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing, paths.MinFreeStoragePercentage),
},
MaxFailures: 10,
}
Expand Down
43 changes: 31 additions & 12 deletions curiosrc/seal/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ var log = logging.Logger("lpseal")

const (
pollerSDR = iota
pollerTrees
pollerTreeD
pollerTreeRC
pollerPrecommitMsg
pollerPoRep
pollerCommitMsg
Expand Down Expand Up @@ -154,7 +155,8 @@ func (s *SealPoller) poll(ctx context.Context) error {
}

s.pollStartSDR(ctx, task)
s.pollStartSDRTrees(ctx, task)
s.pollStartSDRTreeD(ctx, task)
s.pollStartSDRTreeRC(ctx, task)
s.pollStartPrecommitMsg(ctx, task)
s.mustPoll(s.pollPrecommitMsgLanded(ctx, task))
s.pollStartPoRep(ctx, task, ts)
Expand Down Expand Up @@ -187,14 +189,31 @@ func (t pollTask) afterSDR() bool {
return t.AfterSDR
}

func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) {
if !task.AfterTreeD && !task.AfterTreeC && !task.AfterTreeR &&
task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil &&
s.pollers[pollerTrees].IsSet() && task.AfterSDR {
func (s *SealPoller) pollStartSDRTreeD(ctx context.Context, task pollTask) {
if !task.AfterTreeD && task.TaskTreeD == nil && s.pollers[pollerTreeD].IsSet() && task.afterSDR() {
s.pollers[pollerTreeD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1 WHERE sp_id = $2 AND sector_number = $3 AND after_sdr = TRUE AND task_id_tree_d IS NULL`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}

return true, nil
})
}
}

func (t pollTask) afterTreeD() bool {
return t.AfterTreeD && t.afterSDR()
}

s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1
WHERE sp_id = $2 AND sector_number = $3 AND after_sdr = TRUE AND task_id_tree_d IS NULL AND task_id_tree_c IS NULL AND task_id_tree_r IS NULL`, id, task.SpID, task.SectorNumber)
func (s *SealPoller) pollStartSDRTreeRC(ctx context.Context, task pollTask) {
if !task.AfterTreeC && !task.AfterTreeR && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTreeRC].IsSet() && task.afterTreeD() {
s.pollers[pollerTreeRC].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_c = $1, task_id_tree_r = $1
WHERE sp_id = $2 AND sector_number = $3 AND after_tree_d = TRUE AND task_id_tree_c IS NULL AND task_id_tree_r IS NULL`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
Expand All @@ -207,12 +226,12 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) {
}
}

func (t pollTask) afterTrees() bool {
return t.AfterTreeD && t.AfterTreeC && t.AfterTreeR && t.afterSDR()
func (t pollTask) afterTreeRC() bool {
return t.AfterTreeC && t.AfterTreeR && t.afterTreeD()
}

func (t pollTask) afterPrecommitMsg() bool {
return t.AfterPrecommitMsg && t.afterTrees()
return t.AfterPrecommitMsg && t.afterTreeRC()
}

func (t pollTask) afterPrecommitMsgSuccess() bool {
Expand Down
Loading