diff --git a/cmd/lotus-miner/init.go b/cmd/lotus-miner/init.go index c109e85b980..1b76960e999 100644 --- a/cmd/lotus-miner/init.go +++ b/cmd/lotus-miner/init.go @@ -463,7 +463,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix)) - si := paths.NewIndex(nil) + si := paths.NewMemIndex(nil) lstor, err := paths.NewLocal(ctx, lr, si, nil) if err != nil { diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 779345dff54..efe6cd96164 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -269,7 +269,7 @@ type Deps struct { as *ctladdr.AddressSelector maddrs []dtypes.MinerAddress stor *paths.Remote - si *paths.IndexProxy + si *paths.DBIndex localStore *paths.Local listenAddr string } @@ -348,7 +348,7 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, } al := alerting.NewAlertingSystem(j) - si := paths.NewIndexProxy(al, db, true) + si := paths.NewDBIndex(al, db) bls := &paths.BasicLocalStorage{ PathToJSON: cctx.String("storage-json"), } diff --git a/itests/harmonydb_test.go b/itests/harmonydb_test.go index 7c0c22d888d..8b1b6123431 100644 --- a/itests/harmonydb_test.go +++ b/itests/harmonydb_test.go @@ -16,6 +16,7 @@ func withSetup(t *testing.T, f func(*kit.TestMiner)) { _, miner, _ := kit.EnsembleMinimal(t, kit.LatestActorsAt(-1), kit.MockProofs(), + kit.WithSectorIndexDB(), ) f(miner) diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go index d588a2490e9..ab54cbef42f 100644 --- a/itests/harmonytask_test.go +++ b/itests/harmonytask_test.go @@ -30,6 +30,7 @@ func withDbSetup(t *testing.T, f func(*kit.TestMiner)) { _, miner, _ := kit.EnsembleMinimal(t, kit.LatestActorsAt(-1), kit.MockProofs(), + kit.WithSectorIndexDB(), ) logging.SetLogLevel("harmonytask", "debug") diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index daca6d2d4e5..3c83ba8962c 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -607,6 +607,7 @@ func (n *Ensemble) Start() *Ensemble { cfg.Subsystems.EnableMining = m.options.subsystems.Has(SMining) cfg.Subsystems.EnableSealing = m.options.subsystems.Has(SSealing) cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage) + cfg.Subsystems.EnableSectorIndexDB = m.options.subsystems.Has(SHarmony) cfg.Dealmaking.MaxStagingDealsBytes = m.options.maxStagingDealsBytes if m.options.mainMiner != nil { @@ -787,7 +788,9 @@ func (n *Ensemble) Start() *Ensemble { n.t.Cleanup(func() { _ = stop(context.Background()) }) mCopy := m n.t.Cleanup(func() { - mCopy.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB.ITestDeleteAll() + if mCopy.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB != nil { + mCopy.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB.ITestDeleteAll() + } }) m.BaseAPI = m.StorageMiner diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index 4b81c9df0bd..ee2ee3eaae2 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -37,6 +37,8 @@ const ( SSealing SSectorStorage + SHarmony + MinerSubsystems = iota ) diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 6469c0a3076..9af284148c4 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -89,6 +89,13 @@ func WithAllSubsystems() NodeOpt { } } +func WithSectorIndexDB() NodeOpt { + return func(opts *nodeOpts) error { + opts.subsystems = opts.subsystems.Add(SHarmony) + return nil + } +} + func WithSubsystems(systems ...MinerSubsystem) NodeOpt { return func(opts *nodeOpts) error { for _, s := range systems { diff --git a/node/builder_miner.go b/node/builder_miner.go index dd35c6becf9..08c71ba1976 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -104,9 +104,6 @@ func ConfigStorageMiner(c interface{}) Option { If(cfg.Subsystems.EnableSectorStorage, Error(xerrors.Errorf("sealing can only be enabled on a mining node"))), ), - Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) { - return harmonydb.NewFromConfigWithITestID(cfg)(id) - }), If(cfg.Subsystems.EnableMining, If(!cfg.Subsystems.EnableSealing, Error(xerrors.Errorf("sealing can't be disabled on a mining node yet"))), If(!cfg.Subsystems.EnableSectorStorage, Error(xerrors.Errorf("sealing can't be disabled on a mining node yet"))), @@ -136,8 +133,20 @@ func ConfigStorageMiner(c interface{}) Option { If(cfg.Subsystems.EnableSectorStorage, // Sector storage - Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(cfg.Subsystems.EnableSectorIndexDB)), - Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))), + If(cfg.Subsystems.EnableSectorIndexDB, + Override(new(*paths.DBIndex), paths.NewDBIndex), + Override(new(paths.SectorIndex), From(new(*paths.DBIndex))), + + // sector index db is the only thing on lotus-miner that will use harmonydb + Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) { + return harmonydb.NewFromConfigWithITestID(cfg)(id) + }), + ), + If(!cfg.Subsystems.EnableSectorIndexDB, + Override(new(*paths.MemIndex), paths.NewMemIndex), + Override(new(paths.SectorIndex), From(new(*paths.MemIndex))), + ), + Override(new(*sectorstorage.Manager), modules.SectorStorage), Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b675772cf39..2ce42c32715 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -124,7 +124,7 @@ type StorageMinerAPI struct { GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc `optional:"true"` SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc `optional:"true"` - HarmonyDB *harmonydb.DB + HarmonyDB *harmonydb.DB `optional:"true"` } var _ api.StorageMiner = &StorageMinerAPI{} diff --git a/storage/paths/index.go b/storage/paths/index.go index bc26bddb420..49ee11e0960 100644 --- a/storage/paths/index.go +++ b/storage/paths/index.go @@ -61,7 +61,7 @@ type storageEntry struct { heartbeatErr error } -type Index struct { +type MemIndex struct { *indexLocks lk sync.RWMutex @@ -73,8 +73,8 @@ type Index struct { stores map[storiface.ID]*storageEntry } -func NewIndex(al *alerting.Alerting) *Index { - return &Index{ +func NewMemIndex(al *alerting.Alerting) *MemIndex { + return &MemIndex{ indexLocks: &indexLocks{ locks: map[abi.SectorID]*sectorLock{}, }, @@ -87,7 +87,7 @@ func NewIndex(al *alerting.Alerting) *Index { } } -func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { +func (i *MemIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -116,7 +116,7 @@ func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.D return out, nil } -func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { +func (i *MemIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) if _, hasAlert := i.pathAlerts[si.ID]; i.alerting != nil && !hasAlert { @@ -219,7 +219,7 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st return nil } -func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) error { +func (i *MemIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error { i.lk.Lock() defer i.lk.Unlock() @@ -307,7 +307,7 @@ func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) return nil } -func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { +func (i *MemIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { i.lk.Lock() defer i.lk.Unlock() @@ -350,7 +350,7 @@ func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report return nil } -func (i *Index) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { +func (i *MemIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { i.lk.Lock() defer i.lk.Unlock() @@ -382,7 +382,7 @@ loop: return nil } -func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { +func (i *MemIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { i.lk.Lock() defer i.lk.Unlock() @@ -416,7 +416,7 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID storiface.ID, s return nil } -func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { +func (i *MemIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -564,7 +564,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif return out, nil } -func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { +func (i *MemIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -576,7 +576,7 @@ func (i *Index) StorageInfo(ctx context.Context, id storiface.ID) (storiface.Sto return *si.info, nil } -func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { +func (i *MemIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -641,7 +641,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorF return out, nil } -func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]storiface.ID, error) { +func (i *MemIndex) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]storiface.ID, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -660,4 +660,4 @@ func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]sto return out, nil } -var _ SectorIndex = &Index{} +var _ SectorIndex = &MemIndex{} diff --git a/storage/paths/index_proxy.go b/storage/paths/index_proxy.go deleted file mode 100644 index 06097b665ce..00000000000 --- a/storage/paths/index_proxy.go +++ /dev/null @@ -1,118 +0,0 @@ -package paths - -import ( - "context" - - "github.com/filecoin-project/go-state-types/abi" - - "github.com/filecoin-project/lotus/journal/alerting" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/filecoin-project/lotus/storage/sealer/fsutil" - "github.com/filecoin-project/lotus/storage/sealer/storiface" -) - -type IndexProxy struct { - memIndex *Index - dbIndex *DBIndex - enableSectorIndexDB bool -} - -func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy { - return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy { - return NewIndexProxy(al, db, enableSectorIndexDB) - } -} - -func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy { - return &IndexProxy{ - memIndex: NewIndex(al), - dbIndex: NewDBIndex(al, db), - enableSectorIndexDB: enableSectorIndexDB, - } -} - -func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageAttach(ctx, info, stat) - } - return ip.memIndex.StorageAttach(ctx, info, stat) -} - -func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDetach(ctx, id, url) - } - return ip.memIndex.StorageDetach(ctx, id, url) -} - -func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageInfo(ctx, id) - } - return ip.memIndex.StorageInfo(ctx, id) -} - -func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageReportHealth(ctx, id, report) - } - return ip.memIndex.StorageReportHealth(ctx, id, report) -} - -func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary) - } - return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary) -} - -func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft) - } - return ip.memIndex.StorageDropSector(ctx, storageID, s, ft) -} - -func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch) - } - return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch) -} - -func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType) - } - return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType) -} - -func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageLock(ctx, sector, read, write) - } - return ip.memIndex.StorageLock(ctx, sector, read, write) -} - -func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageTryLock(ctx, sector, read, write) - } - return ip.memIndex.StorageTryLock(ctx, sector, read, write) -} - -func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageGetLocks(ctx) - } - return ip.memIndex.StorageGetLocks(ctx) -} - -func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { - if ip.enableSectorIndexDB { - return ip.dbIndex.StorageList(ctx) - } - return ip.memIndex.StorageList(ctx) -} - -var _ SectorIndex = &IndexProxy{} diff --git a/storage/paths/index_test.go b/storage/paths/index_test.go index 9a241da23e0..96e17ce7db0 100644 --- a/storage/paths/index_test.go +++ b/storage/paths/index_test.go @@ -42,7 +42,7 @@ const s32g = 32 << 30 func TestFindSimple(t *testing.T) { ctx := context.Background() - i := NewIndex(nil) + i := NewMemIndex(nil) stor1 := newTestStorage() stor2 := newTestStorage() @@ -79,7 +79,7 @@ func TestFindSimple(t *testing.T) { func TestFindNoAllow(t *testing.T) { ctx := context.Background() - i := NewIndex(nil) + i := NewMemIndex(nil) stor1 := newTestStorage() stor1.AllowTo = []storiface.Group{"grp1"} stor2 := newTestStorage() @@ -111,7 +111,7 @@ func TestFindNoAllow(t *testing.T) { func TestFindAllow(t *testing.T) { ctx := context.Background() - i := NewIndex(nil) + i := NewMemIndex(nil) stor1 := newTestStorage() stor1.AllowTo = []storiface.Group{"grp1"} diff --git a/storage/paths/local_test.go b/storage/paths/local_test.go index bfa138ff6f3..4bc2642dc2d 100644 --- a/storage/paths/local_test.go +++ b/storage/paths/local_test.go @@ -80,7 +80,7 @@ func TestLocalStorage(t *testing.T) { root: root, } - index := NewIndex(nil) + index := NewMemIndex(nil) st, err := NewLocal(ctx, tstor, index, nil) require.NoError(t, err) diff --git a/storage/paths/remote_test.go b/storage/paths/remote_test.go index e3376e6fa10..7aea637296c 100644 --- a/storage/paths/remote_test.go +++ b/storage/paths/remote_test.go @@ -59,7 +59,7 @@ func createTestStorage(t *testing.T, p string, seal bool, att ...*paths.Local) s func TestMoveShared(t *testing.T) { logging.SetAllLoggers(logging.LevelDebug) - index := paths.NewIndex(nil) + index := paths.NewMemIndex(nil) ctx := context.Background() diff --git a/storage/sealer/manager_test.go b/storage/sealer/manager_test.go index 7c3e1a1f2b1..d76424d5ea4 100644 --- a/storage/sealer/manager_test.go +++ b/storage/sealer/manager_test.go @@ -100,10 +100,10 @@ func (t *testStorage) Stat(path string) (fsutil.FsStat, error) { var _ paths.LocalStorage = &testStorage{} -func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Manager, *paths.Local, *paths.Remote, *paths.Index, func()) { +func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Manager, *paths.Local, *paths.Remote, *paths.MemIndex, func()) { st := newTestStorage(t) - si := paths.NewIndex(nil) + si := paths.NewMemIndex(nil) lstor, err := paths.NewLocal(ctx, st, si, nil) require.NoError(t, err) diff --git a/storage/sealer/piece_provider_test.go b/storage/sealer/piece_provider_test.go index de1e07a7849..a8c243379f3 100644 --- a/storage/sealer/piece_provider_test.go +++ b/storage/sealer/piece_provider_test.go @@ -180,7 +180,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) { type pieceProviderTestHarness struct { ctx context.Context - index *paths.Index + index *paths.MemIndex pp PieceProvider sector storiface.SectorRef mgr *Manager @@ -210,7 +210,7 @@ func newPieceProviderTestHarness(t *testing.T, mgrConfig config.SealerConfig, se require.NoError(t, err) // create index, storage, local store & remote store. - index := paths.NewIndex(nil) + index := paths.NewMemIndex(nil) storage := newTestStorage(t) localStore, err := paths.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"}) require.NoError(t, err) diff --git a/storage/sealer/sched_test.go b/storage/sealer/sched_test.go index 2e2b05ab2c3..03e947b8a54 100644 --- a/storage/sealer/sched_test.go +++ b/storage/sealer/sched_test.go @@ -187,7 +187,7 @@ func (s *schedTestWorker) Close() error { var _ Worker = &schedTestWorker{} -func addTestWorker(t *testing.T, sched *Scheduler, index *paths.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) { +func addTestWorker(t *testing.T, sched *Scheduler, index *paths.MemIndex, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) { w := &schedTestWorker{ name: name, taskTypes: taskTypes, @@ -231,7 +231,7 @@ func TestSchedStartStop(t *testing.T) { require.NoError(t, err) go sched.runSched() - addTestWorker(t, sched, paths.NewIndex(nil), "fred", nil, decentWorkerResources, false) + addTestWorker(t, sched, paths.NewMemIndex(nil), "fred", nil, decentWorkerResources, false) require.NoError(t, sched.Close(context.TODO())) } @@ -264,13 +264,13 @@ func TestSched(t *testing.T) { wg sync.WaitGroup } - type task func(*testing.T, *Scheduler, *paths.Index, *runMeta) + type task func(*testing.T, *Scheduler, *paths.MemIndex, *runMeta) sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *paths.MemIndex, rm *runMeta) { done := make(chan struct{}) rm.done[taskName] = done @@ -324,7 +324,7 @@ func TestSched(t *testing.T) { taskStarted := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *paths.MemIndex, rm *runMeta) { select { case rm.done[name] <- struct{}{}: case <-ctx.Done(): @@ -336,7 +336,7 @@ func TestSched(t *testing.T) { taskDone := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *paths.MemIndex, rm *runMeta) { select { case rm.done[name] <- struct{}{}: case <-ctx.Done(): @@ -349,7 +349,7 @@ func TestSched(t *testing.T) { taskNotScheduled := func(name string) task { _, _, l, _ := runtime.Caller(1) _, _, l2, _ := runtime.Caller(2) - return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) { + return func(t *testing.T, sched *Scheduler, index *paths.MemIndex, rm *runMeta) { select { case rm.done[name] <- struct{}{}: t.Fatal("not expected", l, l2) @@ -360,7 +360,7 @@ func TestSched(t *testing.T) { testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) { return func(t *testing.T) { - index := paths.NewIndex(nil) + index := paths.NewMemIndex(nil) sched, err := newScheduler(ctx, "") require.NoError(t, err) @@ -389,7 +389,7 @@ func TestSched(t *testing.T) { } multTask := func(tasks ...task) task { - return func(t *testing.T, s *Scheduler, index *paths.Index, meta *runMeta) { + return func(t *testing.T, s *Scheduler, index *paths.MemIndex, meta *runMeta) { for _, tsk := range tasks { tsk(t, s, index, meta) } @@ -503,7 +503,7 @@ func TestSched(t *testing.T) { } diag := func() task { - return func(t *testing.T, s *Scheduler, index *paths.Index, meta *runMeta) { + return func(t *testing.T, s *Scheduler, index *paths.MemIndex, meta *runMeta) { time.Sleep(20 * time.Millisecond) for _, request := range s.diag().Requests { log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType)