feat: lpseal: SDR Storage reservations #11658

Merged · 1 commit · Feb 28, 2024
61 changes: 43 additions & 18 deletions provider/lpffi/sdr_funcs.go
@@ -11,13 +11,15 @@ import (
 	"github.com/KarpelesLab/reflink"
 	"github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log/v2"
+	"github.com/puzpuzpuz/xsync/v2"
 	"golang.org/x/xerrors"
 
 	ffi "github.com/filecoin-project/filecoin-ffi"
 	commcid "github.com/filecoin-project/go-fil-commcid"
 	"github.com/filecoin-project/go-state-types/abi"
 	proof2 "github.com/filecoin-project/go-state-types/proof"
 
+	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
 	"github.com/filecoin-project/lotus/provider/lpproof"
 	"github.com/filecoin-project/lotus/storage/paths"
 	"github.com/filecoin-project/lotus/storage/sealer/proofpaths"
@@ -43,28 +45,51 @@ type SealCalls struct {
 func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCalls {
 	return &SealCalls{
 		sectors: &storageProvider{
-			storage:    st,
-			localStore: ls,
-			sindex:     si,
+			storage:             st,
+			localStore:          ls,
+			sindex:              si,
+			storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, *StorageReservation](),
 		},
 	}
 }
 
 type storageProvider struct {
-	storage    paths.Store
-	localStore *paths.Local
-	sindex     paths.SectorIndex
+	storage             paths.Store
+	localStore          *paths.Local
+	sindex              paths.SectorIndex
+	storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
 }
 
-func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
-	paths, storageIDs, err := l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
-	if err != nil {
-		return storiface.SectorPaths{}, nil, err
+func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
+	var paths, storageIDs storiface.SectorPaths
+	var releaseStorage func()
+
+	var ok bool
+	var resv *StorageReservation
+	if taskID != nil {
+		resv, ok = l.storageReservations.Load(*taskID)
 	}
+	if ok {
+		if resv.Alloc != allocate || resv.Existing != existing {
+			// this should never happen, only when task definition is wrong
+			return storiface.SectorPaths{}, nil, xerrors.Errorf("storage reservation type mismatch")
+		}
 
-	releaseStorage, err := l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
-	if err != nil {
-		return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
+		log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate)
+
+		paths = resv.Paths
+		releaseStorage = resv.Release
+	} else {
+		var err error
+		paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
+		if err != nil {
+			return storiface.SectorPaths{}, nil, err
+		}
+
+		releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
+		if err != nil {
+			return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
+		}
	}
 
 	log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
@@ -85,8 +110,8 @@ func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.Se
 	}, nil
 }
 
-func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
-	paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
+func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
+	paths, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
 	if err != nil {
 		return xerrors.Errorf("acquiring sector paths: %w", err)
 	}
@@ -120,7 +145,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size
 	maybeUns := storiface.FTNone
 	// todo sectors with data
 
-	paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, maybeUns, storiface.PathSealing)
+	paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, maybeUns, storiface.PathSealing)
 	if err != nil {
 		return cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
 	}
@@ -135,7 +160,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns
 		return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
 	}
 
-	paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
+	paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
 	if err != nil {
 		return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
 	}
@@ -331,7 +356,7 @@ func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.Sector
 		alloc = storiface.FTUnsealed
 	}
 
-	sectorPaths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, alloc, storiface.PathSealing)
+	sectorPaths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, alloc, storiface.PathSealing)
 	if err != nil {
 		return xerrors.Errorf("acquiring sector paths: %w", err)
 	}
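
The signature changes above thread the harmonytask task identity down into the storage layer: a task that reserved space in Claim passes its TaskID so AcquireSector can pick that reservation up, while later phases that work against already-allocated files (TreeD, TreeRC, FinalizeSector) pass nil and fall back to the acquire-then-reserve path. A minimal caller-side sketch, assuming a hypothetical SDRTask wrapper (the struct, its fields, and the taskToSectorRef lookup are invented for illustration; GenerateSDR and the Do signature come from this diff and lotus's harmonytask package):

package lpseal // hypothetical package for the sketch

import (
	"context"

	"github.com/ipfs/go-cid"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
	"github.com/filecoin-project/lotus/provider/lpffi"
)

// SDRTask is a hypothetical task wrapper; only the fields this sketch needs.
type SDRTask struct {
	sc              *lpffi.SealCalls
	ticket          abi.SealRandomness // would come from chain randomness in real code
	commK           cid.Cid            // CommK CID, assumed already computed
	taskToSectorRef func(harmonytask.TaskID) (lpffi.SectorRef, error)
}

func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (bool, error) {
	sectorRef, err := s.taskToSectorRef(taskID)
	if err != nil {
		return false, xerrors.Errorf("getting sector ref: %w", err)
	}

	// Passing taskID lets AcquireSector find the storage reservation made for
	// exactly this task in TaskStorage.Claim (task_storage.go below).
	if err := s.sc.GenerateSDR(context.Background(), taskID, sectorRef.Ref(), s.ticket, s.commK); err != nil {
		return false, xerrors.Errorf("generating SDR: %w", err)
	}
	return true, nil
}
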
241 changes: 241 additions & 0 deletions provider/lpffi/task_storage.go
@@ -0,0 +1,241 @@
package lpffi

import (
	"context"
	"time"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
	"github.com/filecoin-project/lotus/lib/harmony/resources"
	"github.com/filecoin-project/lotus/lib/must"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

type SectorRef struct {
	SpID         int64                   `db:"sp_id"`
	SectorNumber int64                   `db:"sector_number"`
	RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"`
}

func (sr SectorRef) ID() abi.SectorID {
	return abi.SectorID{
		Miner:  abi.ActorID(sr.SpID),
		Number: abi.SectorNumber(sr.SectorNumber),
	}
}

func (sr SectorRef) Ref() storiface.SectorRef {
	return storiface.SectorRef{
		ID:        sr.ID(),
		ProofType: sr.RegSealProof,
	}
}

type TaskStorage struct {
	sc *SealCalls

	alloc, existing storiface.SectorFileType
	ssize           abi.SectorSize
	pathType        storiface.PathType

	taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error)
}

type ReleaseStorageFunc func() // free storage reservation

type StorageReservation struct {
	SectorRef SectorRef
	Release   ReleaseStorageFunc
	Paths     storiface.SectorPaths
	PathIDs   storiface.SectorPaths

	Alloc, Existing storiface.SectorFileType
}

func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) *TaskStorage {
	return &TaskStorage{
		sc:              sb,
		alloc:           alloc,
		existing:        existing,
		ssize:           ssize,
		pathType:        pathType,
		taskToSectorRef: taskToSectorRef,
	}
}

func (t *TaskStorage) HasCapacity() bool {
	ctx := context.Background()

	paths, err := t.sc.sectors.sindex.StorageBestAlloc(ctx, t.alloc, t.ssize, t.pathType)
	if err != nil {
		log.Errorf("finding best alloc in HasCapacity: %+v", err)
		return false
	}

	local, err := t.sc.sectors.localStore.Local(ctx)
	if err != nil {
		log.Errorf("getting local storage: %+v", err)
		return false
	}

	for _, path := range paths {
		if t.pathType == storiface.PathStorage && !path.CanStore {
			continue // we want to store, and this isn't a store path
		}
		if t.pathType == storiface.PathSealing && !path.CanSeal {
			continue // we want to seal, and this isn't a seal path
		}

		// check if this path is on this node
		var found bool
		for _, storagePath := range local {
			if storagePath.ID == path.ID {
				found = true
				break
			}
		}
		if !found {
			// this path isn't on this node
			continue
		}

		// StorageBestAlloc already checks that there is enough space; Not atomic like reserving space, but it's
		// good enough for HasCapacity
		return true
	}

	return false // no path found
}

func (t *TaskStorage) Claim(taskID int) error {
	ctx := context.Background()

	sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
	if err != nil {
		return xerrors.Errorf("getting sector ref: %w", err)
	}

	// storage writelock sector
	lkctx, cancel := context.WithCancel(ctx)

	allocate := storiface.FTCache

	lockAcquireTimeout := time.Second * 10
	lockAcquireTimer := time.NewTimer(lockAcquireTimeout)

	go func() {
		defer cancel()

		select {
		case <-lockAcquireTimer.C:
		case <-ctx.Done():
		}
	}()

	if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, allocate); err != nil {
		// timer will expire
		return xerrors.Errorf("claim StorageLock: %w", err)
	}

	if !lockAcquireTimer.Stop() {
		// timer expired, so lkctx is done, and that means the lock was acquired and dropped..
		return xerrors.Errorf("failed to acquire lock")
	}
	defer func() {
		// make sure we release the sector lock
		lockAcquireTimer.Reset(0)
	}()

	// find anywhere
	// if found return nil, for now
	s, err := t.sc.sectors.sindex.StorageFindSector(ctx, sectorRef.ID(), allocate, must.One(sectorRef.RegSealProof.SectorSize()), false)
	if err != nil {
		return xerrors.Errorf("claim StorageFindSector: %w", err)
	}

	lp, err := t.sc.sectors.localStore.Local(ctx)
	if err != nil {
		return err
	}

	// see if there are any non-local sector files in storage
	for _, info := range s {
		isLocal := false
		for _, l := range lp {
			if l.ID == info.ID {
				isLocal = true
				break
			}
		}
		if !isLocal {
			// TODO: Create reservation for fetching; This will require quite a bit more refactoring, but for now we'll
			// only care about new allocations
			return nil
		}
	}

// acquire a path to make a reservation in
pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, allocate, storiface.PathSealing, storiface.AcquireMove)
if err != nil {
return err
}

// reserve the space
release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), allocate, pathIDs, storiface.FSOverheadSeal)
if err != nil {
return err
}

sres := &StorageReservation{
SectorRef: sectorRef,
Release: release,
Paths: pathsFs,
PathIDs: pathIDs,

Alloc: t.alloc,
Existing: t.existing,
}

t.sc.sectors.storageReservations.Store(harmonytask.TaskID(taskID), sres)

log.Debugw("claimed storage", "task_id", taskID, "sector", sectorRef.ID(), "paths", pathsFs)

// note: we drop the sector writelock on return; THAT IS INTENTIONAL, this code runs in CanAccept, which doesn't
// guarantee that the work for this sector will happen on this node; SDR CanAccept just ensures that the node can
// run the job, harmonytask is what ensures that only one SDR runs at a time
return nil
}

func (t *TaskStorage) MarkComplete(taskID int) error {
// MarkComplete is ALWAYS called after the task is done or not scheduled
// If Claim is called and returns without errors, MarkComplete with the same
// taskID is guaranteed to eventually be called

sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
if err != nil {
return xerrors.Errorf("getting sector ref: %w", err)
}

sres, ok := t.sc.sectors.storageReservations.Load(harmonytask.TaskID(taskID))
if !ok {
return xerrors.Errorf("no reservation found for task %d", taskID)
}

if sectorRef != sres.SectorRef {
return xerrors.Errorf("reservation sector ref doesn't match task sector ref: %+v != %+v", sectorRef, sres.SectorRef)
}

log.Debugw("marking storage complete", "task_id", taskID, "sector", sectorRef.ID(), "paths", sres.Paths)

// remove the reservation
t.sc.sectors.storageReservations.Delete(harmonytask.TaskID(taskID))

// release the reservation
sres.Release()

// note: this only frees the reservation, allocated sectors are declared in AcquireSector which is aware of
// the reservation
return nil
}

var _ resources.Storage = &TaskStorage{}
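
TaskStorage satisfies the resources.Storage interface asserted above, so a task opts into the reservation machinery through its resource declaration. Below is a wiring sketch, not part of this diff: it reuses the hypothetical SDRTask from the earlier sketch, assumes the TaskTypeDetails/Resources field names from lotus's harmonytask and resources packages, and uses placeholder Cpu/Ram numbers.

// (imports assumed as in the earlier sketch, plus lotus's resources package)
func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails {
	ssize := abi.SectorSize(32 << 30) // example sector size

	return harmonytask.TaskTypeDetails{
		Name: "SDR",
		Cost: resources.Resources{
			Cpu: 4,        // placeholder value
			Ram: 54 << 30, // placeholder; SDR is memory-heavy
			// SDR allocates a fresh cache directory and starts from nothing:
			// alloc=FTCache, existing=FTNone, on a sealing path.
			Storage: s.sc.Storage(s.taskToSectorRef, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing),
		},
	}
}

With this in place, harmonytask can consult HasCapacity and call Claim while deciding whether this node should accept a task, AcquireSector consumes the reservation via the task ID during the work itself, and MarkComplete frees it afterwards.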
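
One subtlety worth spelling out is the write-lock acquisition pattern in Claim: a goroutine cancels the lock context once a timer fires, and a failed timer.Stop() afterwards signals that the lock, even if it was briefly acquired, has already been dropped. A standalone sketch of that pattern under invented names (this is not a lotus API):

package sketch

import (
	"context"
	"fmt"
	"time"
)

// acquireWithTimeout calls lock under a context that a timer cancels after
// timeout; cancelling that context is also what ultimately releases the lock.
func acquireWithTimeout(ctx context.Context, lock func(context.Context) error, timeout time.Duration) (release func(), err error) {
	lkctx, cancel := context.WithCancel(ctx)
	timer := time.NewTimer(timeout)

	go func() {
		// cancelling lkctx fails a pending lock call, or releases a held lock
		defer cancel()
		select {
		case <-timer.C:
		case <-ctx.Done():
		}
	}()

	if err := lock(lkctx); err != nil {
		return nil, fmt.Errorf("lock: %w", err) // the goroutine above cleans up
	}

	if !timer.Stop() {
		// Stop() == false: the timer already fired, lkctx is cancelled, and
		// the lock we just "won" is gone; report failure.
		return nil, fmt.Errorf("failed to acquire lock within %s", timeout)
	}

	// Firing the timer wakes the goroutine, which cancels lkctx and releases
	// the lock; Claim above does this immediately via defer, because only the
	// storage reservation, not the sector lock, must outlive the call.
	return func() { timer.Reset(0) }, nil
}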