Skip to content
This repository has been archived by the owner on Aug 18, 2020. It is now read-only.

Window based scheduling #67

Merged
merged 16 commits into from
Jul 17, 2020
19 changes: 6 additions & 13 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect

var selector WorkerSelector
if len(best) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
} else { // append to existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
var selector WorkerSelector
var err error
if len(existingPieces) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
} else { // use existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
Expand Down Expand Up @@ -300,10 +300,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke

// TODO: also consider where the unsealed data sits

selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing)
if err != nil {
return nil, xerrors.Errorf("creating path selector: %w", err)
}
selector := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing)

err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
Expand Down Expand Up @@ -417,11 +414,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
return err
}

fetchSel, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
if err != nil {
return xerrors.Errorf("creating fetchSel: %w", err)
}

fetchSel := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
moveUnsealed := unsealed
{
if len(keepUnsealed) == 0 {
Expand Down Expand Up @@ -496,8 +489,8 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro
return m.storage.FsStat(ctx, id)
}

func (m *Manager) Close() error {
return m.sched.Close()
func (m *Manager) Close(ctx context.Context) error {
return m.sched.Close(ctx)
}

var _ SectorManager = &Manager{}
4 changes: 4 additions & 0 deletions manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/filecoin-project/sector-storage/stores"
)

func init() {
logging.SetAllLoggers(logging.LevelDebug)
}

type testStorage stores.StorageConfig

func (t testStorage) DiskUsage(path string) (int64, error) {
Expand Down
Loading