diff --git a/.circleci/config.yml b/.circleci/config.yml index 85bcd045cdd..a1970327201 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1030,7 +1030,7 @@ workflows: requires: - build suite: utest-unit-rest - target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./provider/... ./tools/..." + target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./tools/..." - test: name: test-unit-storage diff --git a/Makefile b/Makefile index 0d986485e31..68d97227b88 100644 --- a/Makefile +++ b/Makefile @@ -128,7 +128,7 @@ an existing lotus binary in your PATH. This may cause problems if you don't run .PHONY: build -install: install-daemon install-miner install-worker +install: install-daemon install-miner install-worker install-provider install-daemon: install -C ./lotus /usr/local/bin/lotus diff --git a/build/actors/v12.tar.zst b/build/actors/v12.tar.zst index def521bd7e8..859431b58d6 100644 Binary files a/build/actors/v12.tar.zst and b/build/actors/v12.tar.zst differ diff --git a/build/bootstrap/butterflynet.pi b/build/bootstrap/butterflynet.pi index 88ff5069f3e..3f5033361f9 100644 --- a/build/bootstrap/butterflynet.pi +++ b/build/bootstrap/butterflynet.pi @@ -1,2 +1,2 @@ -/dns4/bootstrap-0.butterfly.fildev.network/tcp/1347/p2p/12D3KooWNwAkUtWuLtKCyyFP2vBzmpTHSrQao7KQx7Xfa8YvSg1N -/dns4/bootstrap-1.butterfly.fildev.network/tcp/1347/p2p/12D3KooWPn8BDeNcctAcAGuxxiic8uMw2pAi3G5vgdFtfgRs5zBu +/dns4/bootstrap-0.butterfly.fildev.network/tcp/1347/p2p/12D3KooWRaoPgwJuZdPSN4A2iTeh8xzkZGCEBxan9vMkidHisUgn +/dns4/bootstrap-1.butterfly.fildev.network/tcp/1347/p2p/12D3KooWMjLCZeEf3VzSWvQYuhe9VzCcrN6RENX9FgmQqiJfQDWs diff --git a/build/genesis/butterflynet.car b/build/genesis/butterflynet.car index 1ba3a8a1be3..962964c353c 100644 Binary files a/build/genesis/butterflynet.car and b/build/genesis/butterflynet.car differ diff --git a/cli/net.go b/cli/net.go index f25799e9503..99ee92aefc0 100644 --- a/cli/net.go +++ b/cli/net.go @@ -847,7 +847,8 @@ var NetStatCmd = &cli.Command{ }) for _, stat := range stats { - printScope(&stat.stat, name+stat.name) + tmp := stat.stat + printScope(&tmp, name+stat.name) } } diff --git a/cmd/lotus-miner/proving.go b/cmd/lotus-miner/proving.go index 3ecc58ba7af..2fc1427b520 100644 --- a/cmd/lotus-miner/proving.go +++ b/cmd/lotus-miner/proving.go @@ -559,7 +559,8 @@ var provingCheckProvableCmd = &cli.Command{ for parIdx, par := range partitions { sectors := make(map[abi.SectorNumber]struct{}) - sectorInfos, err := api.StateMinerSectors(ctx, addr, &par.LiveSectors, types.EmptyTSK) + tmp := par.LiveSectors + sectorInfos, err := api.StateMinerSectors(ctx, addr, &tmp, types.EmptyTSK) if err != nil { return err } diff --git a/cmd/lotus-provider/config.go b/cmd/lotus-provider/config.go index 2371b79dcec..fa637fcf316 100644 --- a/cmd/lotus-provider/config.go +++ b/cmd/lotus-provider/config.go @@ -12,6 +12,7 @@ import ( "github.com/BurntSushi/toml" "github.com/urfave/cli/v2" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/node/config" @@ -42,19 +43,26 @@ var configDefaultCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - c := config.DefaultLotusProvider() - - cb, err := config.ConfigUpdate(c, nil, config.Commented(!cctx.Bool("no-comment")), config.DefaultKeepUncommented(), config.NoEnv()) + comment := !cctx.Bool("no-comment") + cfg, err := getDefaultConfig(comment) if err != nil { return err } - - fmt.Print(string(cb)) + fmt.Print(cfg) return nil }, } +func getDefaultConfig(comment bool) (string, error) { + c := config.DefaultLotusProvider() + cb, err := config.ConfigUpdate(c, nil, config.Commented(comment), config.DefaultKeepUncommented(), config.NoEnv()) + if err != nil { + return "", err + } + return string(cb), nil +} + var configSetCmd = &cli.Command{ Name: "set", Aliases: []string{"add"}, @@ -190,7 +198,7 @@ var configRmCmd = &cli.Command{ var configViewCmd = &cli.Command{ Name: "interpret", Aliases: []string{"view", "stacked", "stack"}, - Usage: "Interpret stacked config layers by this version of lotus-provider.", + Usage: "Interpret stacked config layers by this version of lotus-provider, with system-generated comments.", ArgsUsage: "a list of layers to be interpreted as the final config", Flags: []cli.Flag{ &cli.StringSliceFlag{ @@ -209,10 +217,12 @@ var configViewCmd = &cli.Command{ if err != nil { return err } - - e := toml.NewEncoder(os.Stdout) - e.Indent = " " - return e.Encode(lp) + cb, err := config.ConfigUpdate(lp, config.DefaultLotusProvider(), config.Commented(true), config.DefaultKeepUncommented(), config.NoEnv()) + if err != nil { + return xerrors.Errorf("cannot interpret config: %w", err) + } + fmt.Println(string(cb)) + return nil }, } diff --git a/cmd/lotus-provider/main.go b/cmd/lotus-provider/main.go index a7b26040244..9eda6db870f 100644 --- a/cmd/lotus-provider/main.go +++ b/cmd/lotus-provider/main.go @@ -43,6 +43,7 @@ func main() { runCmd, stopCmd, configCmd, + testCmd, //backupCmd, //lcli.WithCategory("chain", actorCmd), //lcli.WithCategory("storage", sectorsCmd), diff --git a/cmd/lotus-provider/migrate.go b/cmd/lotus-provider/migrate.go index 674a537dfd3..fc1e930aaee 100644 --- a/cmd/lotus-provider/migrate.go +++ b/cmd/lotus-provider/migrate.go @@ -181,7 +181,12 @@ environment variable LOTUS_WORKER_WINDOWPOST. } if !lo.Contains(titles, "base") { - _, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '')") + cfg, err := getDefaultConfig(true) + if err != nil { + return xerrors.Errorf("Cannot get default config: %w", err) + } + _, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '$1')", cfg) + if err != nil { return err } diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go new file mode 100644 index 00000000000..bbe816e80ce --- /dev/null +++ b/cmd/lotus-provider/proving.go @@ -0,0 +1,180 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/dline" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/provider" +) + +var testCmd = &cli.Command{ + Name: "test", + Usage: "Utility functions for testing", + Subcommands: []*cli.Command{ + //provingInfoCmd, + wdPostCmd, + }, +} + +var wdPostCmd = &cli.Command{ + Name: "window-post", + Aliases: []string{"wd", "windowpost", "wdpost"}, + Usage: "Compute a proof-of-spacetime for a sector (requires the sector to be pre-sealed). These will not send to the chain.", + Subcommands: []*cli.Command{ + wdPostHereCmd, + wdPostTaskCmd, + }, +} + +var wdPostTaskCmd = &cli.Command{ + Name: "task", + Aliases: []string{"scheduled", "schedule", "async", "asynchronous"}, + Usage: "Test the windowpost scheduler by running it on the next available lotus-provider. ", + Flags: []cli.Flag{ + &cli.Uint64Flag{ + Name: "deadline", + Usage: "deadline to compute WindowPoSt for ", + Value: 0, + }, + &cli.StringSliceFlag{ + Name: "layers", + Usage: "list of layers to be interpreted (atop defaults). Default: base", + Value: cli.NewStringSlice("base"), + }, + }, + Action: func(cctx *cli.Context) error { + ctx := context.Background() + + deps, err := getDeps(ctx, cctx) + if err != nil { + return err + } + + ts, err := deps.full.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("cannot get chainhead %w", err) + } + ht := ts.Height() + + addr, err := address.NewFromString(deps.cfg.Addresses.MinerAddresses[0]) + if err != nil { + return xerrors.Errorf("cannot get miner address %w", err) + } + maddr, err := address.IDFromAddress(addr) + if err != nil { + return xerrors.Errorf("cannot get miner id %w", err) + } + did, err := deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + _, err = tx.Exec(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123)`) + if err != nil { + log.Error("inserting harmony_task: ", err) + return false, xerrors.Errorf("inserting harmony_task: %w", err) + } + var id int64 + if err = tx.QueryRow(`SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1`).Scan(&id); err != nil { + log.Error("getting inserted id: ", err) + return false, xerrors.Errorf("getting inserted id: %w", err) + } + _, err = tx.Exec(`INSERT INTO wdpost_partition_tasks + (task_id, sp_id, proving_period_start, deadline_index, partition_index) VALUES ($1, $2, $3, $4, $5)`, + id, maddr, ht, cctx.Uint64("deadline"), 0) + if err != nil { + log.Error("inserting wdpost_partition_tasks: ", err) + return false, xerrors.Errorf("inserting wdpost_partition_tasks: %w", err) + } + _, err = tx.Exec("INSERT INTO harmony_test (task_id) VALUES ($1)", id) + if err != nil { + return false, xerrors.Errorf("inserting into harmony_tests: %w", err) + } + return true, nil + }) + if err != nil { + return xerrors.Errorf("writing SQL transaction: %w", err) + } + log.Infof("Inserted task %v", did) + log.Infof("Check your lotus-provider logs for more details.") + return nil + }, +} + +var wdPostHereCmd = &cli.Command{ + Name: "here", + Aliases: []string{"cli"}, + Usage: "Compute WindowPoSt for performance and configuration testing.", + Description: `Note: This command is intended to be used to verify PoSt compute performance. +It will not send any messages to the chain. Since it can compute any deadline, output may be incorrectly timed for the chain.`, + ArgsUsage: "[deadline index]", + Flags: []cli.Flag{ + &cli.Uint64Flag{ + Name: "deadline", + Usage: "deadline to compute WindowPoSt for ", + Value: 0, + }, + &cli.StringSliceFlag{ + Name: "layers", + Usage: "list of layers to be interpreted (atop defaults). Default: base", + Value: cli.NewStringSlice("base"), + }, + &cli.StringFlag{ + Name: "storage-json", + Usage: "path to json file containing storage config", + Value: "~/.lotus-provider/storage.json", + }, + &cli.Uint64Flag{ + Name: "partition", + Usage: "partition to compute WindowPoSt for", + Value: 0, + }, + }, + Action: func(cctx *cli.Context) error { + + ctx := context.Background() + deps, err := getDeps(ctx, cctx) + if err != nil { + return err + } + + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw, + deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks) + if err != nil { + return err + } + _, _ = wdPoStSubmitTask, derlareRecoverTask + + if len(deps.maddrs) == 0 { + return errors.New("no miners to compute WindowPoSt for") + } + head, err := deps.full.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("failed to get chain head: %w", err) + } + + di := dline.NewInfo(head.Height(), cctx.Uint64("deadline"), 0, 0, 0, 10 /*challenge window*/, 0, 0) + + for _, maddr := range deps.maddrs { + out, err := wdPostTask.DoPartition(ctx, head, address.Address(maddr), di, cctx.Uint64("partition")) + if err != nil { + fmt.Println("Error computing WindowPoSt for miner", maddr, err) + continue + } + fmt.Println("Computed WindowPoSt for miner", maddr, ":") + err = json.NewEncoder(os.Stdout).Encode(out) + if err != nil { + fmt.Println("Could not encode WindowPoSt output for miner", maddr, err) + continue + } + } + + return nil + }, +} diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 2d1bc99cd21..9b3b6f6a337 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/base64" "fmt" "net" @@ -15,6 +16,7 @@ import ( ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/pkg/errors" + "github.com/samber/lo" "github.com/urfave/cli/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -40,6 +42,7 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/provider" + "github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/provider/lpwinning" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer" @@ -113,6 +116,12 @@ var runCmd = &cli.Command{ tag.Insert(metrics.Commit, build.CurrentCommit), tag.Insert(metrics.NodeType, "provider"), ) + shutdownChan := make(chan struct{}) + ctx, ctxclose := context.WithCancel(ctx) + go func() { + <-shutdownChan + ctxclose() + }() // Register all metric views /* if err := view.Register( @@ -130,116 +139,12 @@ var runCmd = &cli.Command{ } } - // Open repo - - repoPath := cctx.String(FlagRepoPath) - fmt.Println("repopath", repoPath) - r, err := repo.NewFS(repoPath) - if err != nil { - return err - } - - ok, err := r.Exists() - if err != nil { - return err - } - if !ok { - if err := r.Init(repo.Provider); err != nil { - return err - } - } - - db, err := makeDB(cctx) - if err != nil { - return err - } - shutdownChan := make(chan struct{}) - - const unspecifiedAddress = "0.0.0.0" - listenAddr := cctx.String("listen") - addressSlice := strings.Split(listenAddr, ":") - if ip := net.ParseIP(addressSlice[0]); ip != nil { - if ip.String() == unspecifiedAddress { - rip, err := db.GetRoutableIP() - if err != nil { - return err - } - listenAddr = rip + ":" + addressSlice[1] - } - } - - /////////////////////////////////////////////////////////////////////// - ///// Dependency Setup - /////////////////////////////////////////////////////////////////////// - - // The config feeds into task runners & their helpers - cfg, err := getConfig(cctx, db) - if err != nil { - return err - } - - log.Debugw("config", "config", cfg) - - var verif storiface.Verifier = ffiwrapper.ProofVerifier - - as, err := provider.AddressSelector(&cfg.Addresses)() - if err != nil { - return err - } - - de, err := journal.ParseDisabledEvents(cfg.Journal.DisabledEvents) - if err != nil { - return err - } - j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de) - if err != nil { - return err - } - defer j.Close() + deps, err := getDeps(ctx, cctx) - full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo) if err != nil { return err } - defer fullCloser() - - sa, err := StorageAuth(cfg.Apis.StorageRPCSecret) - if err != nil { - return xerrors.Errorf(`'%w' while parsing the config toml's - [Apis] - StorageRPCSecret=%v -Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, cfg.Apis.StorageRPCSecret) - } - - al := alerting.NewAlertingSystem(j) - si := paths.NewIndexProxy(al, db, true) - bls := &paths.BasicLocalStorage{ - PathToJSON: cctx.String("storage-json"), - } - localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"}) - if err != nil { - return err - } - - stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) - - wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore())) - - // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper - // maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably - // don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask) - lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, wstates) - - var maddrs []dtypes.MinerAddress - for _, s := range cfg.Addresses.MinerAddresses { - addr, err := address.NewFromString(s) - if err != nil { - return err - } - maddrs = append(maddrs, dtypes.MinerAddress(addr)) - } - - log.Infow("providers handled", "maddrs", maddrs) + cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore /////////////////////////////////////////////////////////////////////// ///// Task Selection @@ -261,7 +166,11 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, activeTasks = append(activeTasks, winPoStTask) } } - taskEngine, err := harmonytask.New(db, activeTasks, listenAddr) + log.Infow("This lotus_provider instance handles", + "miner_addresses", maddrs, + "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) + + taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr) if err != nil { return err } @@ -306,7 +215,6 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, //node.ShutdownHandler{Component: "provider", StopFunc: stop}, <-finishCh - return nil }, } @@ -351,3 +259,147 @@ func StorageAuth(apiKey string) (sealer.StorageAuth, error) { headers.Add("Authorization", "Bearer "+string(token)) return sealer.StorageAuth(headers), nil } + +type Deps struct { + cfg *config.LotusProviderConfig + db *harmonydb.DB + full api.FullNode + verif storiface.Verifier + lw *sealer.LocalWorker + as *ctladdr.AddressSelector + maddrs []dtypes.MinerAddress + stor *paths.Remote + si *paths.IndexProxy + localStore *paths.Local + listenAddr string +} + +func getDeps(ctx context.Context, cctx *cli.Context) (*Deps, error) { + // Open repo + + repoPath := cctx.String(FlagRepoPath) + fmt.Println("repopath", repoPath) + r, err := repo.NewFS(repoPath) + if err != nil { + return nil, err + } + + ok, err := r.Exists() + if err != nil { + return nil, err + } + if !ok { + if err := r.Init(repo.Provider); err != nil { + return nil, err + } + } + + db, err := makeDB(cctx) + if err != nil { + return nil, err + } + + /////////////////////////////////////////////////////////////////////// + ///// Dependency Setup + /////////////////////////////////////////////////////////////////////// + + // The config feeds into task runners & their helpers + cfg, err := getConfig(cctx, db) + if err != nil { + return nil, err + } + + log.Debugw("config", "config", cfg) + + var verif storiface.Verifier = ffiwrapper.ProofVerifier + + as, err := provider.AddressSelector(&cfg.Addresses)() + if err != nil { + return nil, err + } + + de, err := journal.ParseDisabledEvents(cfg.Journal.DisabledEvents) + if err != nil { + return nil, err + } + j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de) + if err != nil { + return nil, err + } + + full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo) + if err != nil { + return nil, err + } + + go func() { + select { + case <-ctx.Done(): + fullCloser() + _ = j.Close() + } + }() + sa, err := StorageAuth(cfg.Apis.StorageRPCSecret) + if err != nil { + return nil, xerrors.Errorf(`'%w' while parsing the config toml's + [Apis] + StorageRPCSecret=%v +Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, cfg.Apis.StorageRPCSecret) + } + + al := alerting.NewAlertingSystem(j) + si := paths.NewIndexProxy(al, db, true) + bls := &paths.BasicLocalStorage{ + PathToJSON: cctx.String("storage-json"), + } + + listenAddr := cctx.String("listen") + const unspecifiedAddress = "0.0.0.0" + addressSlice := strings.Split(listenAddr, ":") + if ip := net.ParseIP(addressSlice[0]); ip != nil { + if ip.String() == unspecifiedAddress { + rip, err := db.GetRoutableIP() + if err != nil { + return nil, err + } + listenAddr = rip + ":" + addressSlice[1] + } + } + localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"}) + if err != nil { + return nil, err + } + + stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) + + wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore())) + + // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper + // maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably + // don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask) + lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, wstates) + + var maddrs []dtypes.MinerAddress + for _, s := range cfg.Addresses.MinerAddresses { + addr, err := address.NewFromString(s) + if err != nil { + return nil, err + } + maddrs = append(maddrs, dtypes.MinerAddress(addr)) + } + + return &Deps{ // lint: intentionally not-named so it will fail if one is forgotten + cfg, + db, + full, + verif, + lw, + as, + maddrs, + stor, + si, + localStore, + listenAddr, + }, nil + +} diff --git a/cmd/lotus-shed/terminations.go b/cmd/lotus-shed/terminations.go index c5f35995a4f..563c1ba3a77 100644 --- a/cmd/lotus-shed/terminations.go +++ b/cmd/lotus-shed/terminations.go @@ -157,7 +157,8 @@ var terminationsCmd = &cli.Command{ } for _, t := range termParams.Terminations { - sectors, err := minerSt.LoadSectors(&t.Sectors) + tmp := t.Sectors + sectors, err := minerSt.LoadSectors(&tmp) if err != nil { return err } diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go index 7867ca19410..d588a2490e9 100644 --- a/itests/harmonytask_test.go +++ b/itests/harmonytask_test.go @@ -72,7 +72,13 @@ func (t *task1) Adder(add harmonytask.AddTaskFunc) { } } +func init() { + //logging.SetLogLevel("harmonydb", "debug") + //logging.SetLogLevel("harmonytask", "debug") +} + func TestHarmonyTasks(t *testing.T) { + //t.Parallel() withDbSetup(t, func(m *kit.TestMiner) { cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB t1 := &task1{ @@ -82,7 +88,7 @@ func TestHarmonyTasks(t *testing.T) { harmonytask.POLL_DURATION = time.Millisecond * 100 e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1") require.NoError(t, err) - time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. + time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE. e.GracefullyTerminate(time.Minute) expected := []string{"taskResult56", "taskResult73"} sort.Strings(t1.WorkCompleted) @@ -154,6 +160,7 @@ func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru { } func TestHarmonyTasksWith2PartiesPolling(t *testing.T) { + //t.Parallel() withDbSetup(t, func(m *kit.TestMiner) { cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB senderParty := fooLetterAdder(t, cdb) @@ -164,7 +171,7 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) { require.NoError(t, err) worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2") require.NoError(t, err) - time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. + time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE. sender.GracefullyTerminate(time.Second * 5) worker.GracefullyTerminate(time.Second * 5) sort.Strings(dest) @@ -173,14 +180,15 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) { } func TestWorkStealing(t *testing.T) { + //t.Parallel() withDbSetup(t, func(m *kit.TestMiner) { cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB ctx := context.Background() // The dead worker will be played by a few SQL INSERTS. _, err := cdb.Exec(ctx, `INSERT INTO harmony_machines - (id, last_contact,host_and_port, cpu, ram, gpu, gpuram) - VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1, 1000000)`) + (id, last_contact,host_and_port, cpu, ram, gpu) + VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1)`) require.ErrorIs(t, err, nil) _, err = cdb.Exec(ctx, `INSERT INTO harmony_task (id, name, owner_id, posted_time, added_by) @@ -194,13 +202,14 @@ func TestWorkStealing(t *testing.T) { var dest []string worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2") require.ErrorIs(t, err, nil) - time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. + time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE. worker.GracefullyTerminate(time.Second * 5) require.Equal(t, []string{"M"}, dest) }) } func TestTaskRetry(t *testing.T) { + //t.Parallel() withDbSetup(t, func(m *kit.TestMiner) { cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB senderParty := fooLetterAdder(t, cdb) @@ -232,7 +241,7 @@ func TestTaskRetry(t *testing.T) { } rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2") require.NoError(t, err) - time.Sleep(3 * time.Second) + time.Sleep(time.Second) sender.GracefullyTerminate(time.Hour) rcv.GracefullyTerminate(time.Hour) sort.Strings(dest) diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go index 6d99707327b..0fed176d232 100644 --- a/lib/harmony/harmonydb/harmonydb.go +++ b/lib/harmony/harmonydb/harmonydb.go @@ -3,7 +3,6 @@ package harmonydb import ( "context" "embed" - "errors" "fmt" "math/rand" "net" @@ -17,6 +16,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/node/config" ) @@ -205,16 +205,16 @@ func ensureSchemaExists(connString, schema string) error { p, err := pgx.Connect(ctx, connString) defer cncl() if err != nil { - return fmt.Errorf("unable to connect to db: %s, err: %v", connString, err) + return xerrors.Errorf("unable to connect to db: %s, err: %v", connString, err) } defer func() { _ = p.Close(context.Background()) }() if len(schema) < 5 || !schemaRE.MatchString(schema) { - return errors.New("schema must be of the form " + schemaREString + "\n Got: " + schema) + return xerrors.New("schema must be of the form " + schemaREString + "\n Got: " + schema) } _, err = p.Exec(context.Background(), "CREATE SCHEMA IF NOT EXISTS "+schema) if err != nil { - return fmt.Errorf("cannot create schema: %w", err) + return xerrors.Errorf("cannot create schema: %w", err) } return nil } @@ -232,7 +232,7 @@ func (db *DB) upgrade() error { )`) if err != nil { logger.Error("Upgrade failed.") - return err + return xerrors.Errorf("Cannot create base table %w", err) } // __Run scripts in order.__ @@ -243,10 +243,10 @@ func (db *DB) upgrade() error { err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base") if err != nil { logger.Error("Cannot read entries: " + err.Error()) - return err + return xerrors.Errorf("cannot read entries: %w", err) } for _, l := range landedEntries { - landed[l.Entry] = true + landed[l.Entry[:8]] = true } } dir, err := fs.ReadDir("sql") @@ -261,7 +261,11 @@ func (db *DB) upgrade() error { } for _, e := range dir { name := e.Name() - if landed[name] || !strings.HasSuffix(name, ".sql") { + if !strings.HasSuffix(name, ".sql") { + logger.Debug("Must have only SQL files here, found: " + name) + continue + } + if landed[name[:8]] { logger.Debug("DB Schema " + name + " already applied.") continue } @@ -278,15 +282,15 @@ func (db *DB) upgrade() error { if err != nil { msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error()) logger.Error(msg) - return errors.New(msg) // makes devs lives easier by placing message at the end. + return xerrors.New(msg) // makes devs lives easier by placing message at the end. } } // Mark Completed. - _, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name) + _, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name[:8]) if err != nil { logger.Error("Cannot update base: " + err.Error()) - return fmt.Errorf("cannot insert into base: %w", err) + return xerrors.Errorf("cannot insert into base: %w", err) } } return nil diff --git a/lib/harmony/harmonydb/sql/20231120-testing1.sql b/lib/harmony/harmonydb/sql/20231120-testing1.sql new file mode 100644 index 00000000000..0aeb4fc58c9 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20231120-testing1.sql @@ -0,0 +1,6 @@ +CREATE TABLE harmony_test ( + task_id bigint + constraint harmony_test_pk + primary key, + options text +); \ No newline at end of file diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index ffab5903c17..788ca4a34e5 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -111,7 +111,9 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, er var commit bool defer func() { // Panic clean-up. if !commit { - retErr = tx.Rollback(ctx) + if tmp := tx.Rollback(ctx); tmp != nil { + retErr = tmp + } } }() commit, err = f(&Tx{tx, ctx}) @@ -119,7 +121,7 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, er return false, err } if commit { - err := tx.Commit(ctx) + err = tx.Commit(ctx) if err != nil { return false, err } diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 7ec47d32a7c..79a156fef12 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -198,7 +198,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done } } _, err = tx.Exec(`INSERT INTO harmony_task_history - (task_id, name, posted, work_start, work_end, result, by_host_and_port, err) + (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, workEnd, done, h.TaskEngine.hostAndPort, result) if err != nil { return false, fmt.Errorf("could not write history: %w", err) diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 8288aaf024e..b129496d8bd 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -93,8 +93,9 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { } func CleanupMachines(ctx context.Context, db *harmonydb.DB) int { - ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`, - time.Now().Add(-1*LOOKS_DEAD_TIMEOUT).UTC()) + ct, err := db.Exec(ctx, + `DELETE FROM harmony_machines WHERE last_contact < CURRENT_TIMESTAMP - INTERVAL '1 MILLISECOND' * $1 `, + LOOKS_DEAD_TIMEOUT.Milliseconds()) // ms enables unit testing to change timeout. if err != nil { logger.Warn("unable to delete old machines: ", err) } diff --git a/node/repo/repo_test.go b/node/repo/repo_test.go index 16c101d44b9..c78afa9db3d 100644 --- a/node/repo/repo_test.go +++ b/node/repo/repo_test.go @@ -16,7 +16,7 @@ import ( func basicTest(t *testing.T, repo Repo) { apima, err := repo.APIEndpoint() if assert.Error(t, err) { - assert.Equal(t, ErrNoAPIEndpoint, err) + assert.ErrorContains(t, err, ErrNoAPIEndpoint.Error()) } assert.Nil(t, apima, "with no api endpoint, return should be nil") @@ -72,7 +72,7 @@ func basicTest(t *testing.T, repo Repo) { apima, err = repo.APIEndpoint() if assert.Error(t, err) { - assert.Equal(t, ErrNoAPIEndpoint, err, "after closing repo, api should be nil") + assert.ErrorContains(t, err, ErrNoAPIEndpoint.Error(), "after closing repo, api should be nil") } assert.Nil(t, apima, "with closed repo, apima should be set back to nil") diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index 5123410c957..f75f2a4b8ad 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -18,6 +18,10 @@ import ( var log = logging.Logger("lpmessage") +type str string // makes ctx value collissions impossible + +var CtxTaskID str = "task_id" + type SenderAPI interface { StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) @@ -47,8 +51,7 @@ func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender { return &Sender{ api: api, signer: signer, - - db: db, + db: db, } } @@ -100,6 +103,14 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS var sigMsg *types.SignedMessage + var idCount int + err = s.db.QueryRow(ctx, `SELECT COUNT(*) FROM harmony_test WHERE task_id=$1`, + ctx.Value(CtxTaskID)).Scan(&idCount) + if err != nil { + return cid.Undef, xerrors.Errorf("reading harmony_test: %w", err) + } + noSend := idCount == 1 + // start db tx c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { // assign nonce (max(api.MpoolGetNonce, db nonce+1)) @@ -137,6 +148,18 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return false, xerrors.Errorf("marshaling message: %w", err) } + if noSend { + log.Errorw("SKIPPED SENDING MESSAGE PER ENVIRONMENT VARIABLE - NOT PRODUCTION SAFE", + "from_key", fromA.String(), + "nonce", msg.Nonce, + "to_addr", msg.To.String(), + "signed_data", data, + "signed_json", string(jsonBytes), + "signed_cid", sigMsg.Cid(), + "send_reason", reason, + ) + return true, nil // nothing committed + } // write to db c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`, fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason) @@ -153,6 +176,9 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS if err != nil || !c { return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err) } + if noSend { + return sigMsg.Cid(), nil + } // push to mpool _, err = s.api.MpoolPush(ctx, sigMsg) @@ -168,7 +194,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return cid.Undef, xerrors.Errorf("updating db record: %w", err) } if cn != 1 { - return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c) + return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", cn) } log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit) diff --git a/provider/lpwindow/compute_do.go b/provider/lpwindow/compute_do.go index ba13cdc2b53..800f23c57c3 100644 --- a/provider/lpwindow/compute_do.go +++ b/provider/lpwindow/compute_do.go @@ -31,7 +31,7 @@ import ( const disablePreChecks = false // todo config -func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr address.Address, di *dline.Info, partIdx uint64) (out *miner2.SubmitWindowedPoStParams, err error) { +func (t *WdPostTask) DoPartition(ctx context.Context, ts *types.TipSet, maddr address.Address, di *dline.Info, partIdx uint64) (out *miner2.SubmitWindowedPoStParams, err error) { defer func() { if r := recover(); r != nil { log.Errorf("recover: %s", r) @@ -207,8 +207,9 @@ func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr ad time.Sleep(5 * time.Second) continue todo retry loop */ } else if !correct { + _ = correct /*log.Errorw("generated incorrect window post proof", "post", postOut, "error", err) - continue todo retry loop */ + continue todo retry loop*/ } // Proof generation successful, stop retrying @@ -322,11 +323,11 @@ func (t *WdPostTask) sectorsForProof(ctx context.Context, maddr address.Address, if err := allSectors.ForEach(func(sectorNo uint64) error { if info, found := sectorByID[sectorNo]; found { proofSectors = append(proofSectors, info) - } else { - //skip - // todo: testing: old logic used to put 'substitute' sectors here - // that probably isn't needed post nv19, but we do need to check that - } + } //else { + //skip + // todo: testing: old logic used to put 'substitute' sectors here + // that probably isn't needed post nv19, but we do need to check that + //} return nil }); err != nil { return nil, xerrors.Errorf("iterating partition sector bitmap: %w", err) diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index 83bf67a3104..c3385533c66 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -74,10 +74,10 @@ type WdPostTask struct { } type wdTaskIdentity struct { - Sp_id uint64 - Proving_period_start abi.ChainEpoch - Deadline_index uint64 - Partition_index uint64 + SpID uint64 `db:"sp_id"` + ProvingPeriodStart abi.ChainEpoch `db:"proving_period_start"` + DeadlineIndex uint64 `db:"deadline_index"` + PartitionIndex uint64 `db:"partition_index"` } func NewWdPostTask(db *harmonydb.DB, @@ -150,7 +150,7 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, err } - postOut, err := t.doPartition(context.Background(), ts, maddr, deadline, partIdx) + postOut, err := t.DoPartition(context.Background(), ts, maddr, deadline, partIdx) if err != nil { log.Errorf("WdPostTask.Do() failed to doPartition: %v", err) return false, err @@ -206,11 +206,11 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng // GetData for tasks type wdTaskDef struct { - Task_id harmonytask.TaskID - Sp_id uint64 - Proving_period_start abi.ChainEpoch - Deadline_index uint64 - Partition_index uint64 + TaskID harmonytask.TaskID + SpID uint64 + ProvingPeriodStart abi.ChainEpoch + DeadlineIndex uint64 + PartitionIndex uint64 dlInfo *dline.Info `pgx:"-"` openTs *types.TipSet @@ -232,10 +232,10 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng // Accept those past deadline, then delete them in Do(). for i := range tasks { - tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].Proving_period_start, tasks[i].Deadline_index, ts.Height()) + tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].ProvingPeriodStart, tasks[i].DeadlineIndex, ts.Height()) if tasks[i].dlInfo.PeriodElapsed() { - return &tasks[i].Task_id, nil + return &tasks[i].TaskID, nil } tasks[i].openTs, err = t.api.ChainGetTipSetAfterHeight(context.Background(), tasks[i].dlInfo.Open, ts.Key()) @@ -281,7 +281,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng var r int err := t.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_task_history - WHERE task_id = $1 AND result = false`, d.Task_id).Scan(&r) + WHERE task_id = $1 AND result = false`, d.TaskID).Scan(&r) if err != nil { log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err) } @@ -293,7 +293,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng return tasks[i].dlInfo.Open < tasks[j].dlInfo.Open }) - return &tasks[0].Task_id, nil + return &tasks[0].TaskID, nil } var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt] @@ -353,10 +353,10 @@ func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types for pidx := range partitions { tid := wdTaskIdentity{ - Sp_id: aid, - Proving_period_start: di.PeriodStart, - Deadline_index: di.Index, - Partition_index: uint64(pidx), + SpID: aid, + ProvingPeriodStart: di.PeriodStart, + DeadlineIndex: di.Index, + PartitionIndex: uint64(pidx), } tf := t.windowPoStTF.Val(ctx) @@ -384,10 +384,10 @@ func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIden partition_index ) VALUES ($1, $2, $3, $4, $5)`, taskId, - taskIdent.Sp_id, - taskIdent.Proving_period_start, - taskIdent.Deadline_index, - taskIdent.Partition_index, + taskIdent.SpID, + taskIdent.ProvingPeriodStart, + taskIdent.DeadlineIndex, + taskIdent.PartitionIndex, ) if err != nil { return false, xerrors.Errorf("insert partition task: %w", err) diff --git a/provider/lpwindow/compute_task_test.go b/provider/lpwindow/compute_task_test.go deleted file mode 100644 index 0f7a1335bce..00000000000 --- a/provider/lpwindow/compute_task_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package lpwindow - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-state-types/dline" - - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/filecoin-project/lotus/lib/harmony/harmonytask" - "github.com/filecoin-project/lotus/node/config" -) - -// test to create WDPostTask, invoke AddTask and check if the task is added to the DB -func TestAddTask(t *testing.T) { - db, err := harmonydb.NewFromConfig(config.HarmonyDB{ - Hosts: []string{"localhost"}, - Port: "5433", - Username: "yugabyte", - Password: "yugabyte", - Database: "yugabyte", - }) - require.NoError(t, err) - wdPostTask := NewWdPostTask(db, nil, 0) - taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300") - _ = taskEngine - ts := types.TipSet{} - deadline := dline.Info{} - - require.NoError(t, err) -} diff --git a/provider/lpwindow/recover_task.go b/provider/lpwindow/recover_task.go index fafe76e569b..6006f3c35ca 100644 --- a/provider/lpwindow/recover_task.go +++ b/provider/lpwindow/recover_task.go @@ -166,7 +166,7 @@ func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, stillOwned func recDecl := miner.RecoveryDeclaration{ Deadline: dlIdx, - Partition: uint64(partIdx), + Partition: partIdx, Sectors: recovered, } @@ -187,6 +187,9 @@ func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, stillOwned func } msg, mss, err := preparePoStMessage(w.api, w.as, maddr, msg, abi.TokenAmount(w.maxDeclareRecoveriesGasFee)) + if err != nil { + return false, xerrors.Errorf("sending declare recoveries message: %w", err) + } mc, err := w.sender.Send(ctx, msg, mss, "declare-recoveries") if err != nil { @@ -279,10 +282,10 @@ func (w *WdPostRecoverDeclareTask) processHeadChange(ctx context.Context, revert } tid := wdTaskIdentity{ - Sp_id: aid, - Proving_period_start: pps, - Deadline_index: declDeadline, - Partition_index: uint64(pidx), + SpID: aid, + ProvingPeriodStart: pps, + DeadlineIndex: declDeadline, + PartitionIndex: uint64(pidx), } tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { @@ -304,10 +307,10 @@ func (w *WdPostRecoverDeclareTask) addTaskToDB(taskId harmonytask.TaskID, taskId partition_index ) VALUES ($1, $2, $3, $4, $5)`, taskId, - taskIdent.Sp_id, - taskIdent.Proving_period_start, - taskIdent.Deadline_index, - taskIdent.Partition_index, + taskIdent.SpID, + taskIdent.ProvingPeriodStart, + taskIdent.DeadlineIndex, + taskIdent.PartitionIndex, ) if err != nil { return false, xerrors.Errorf("insert partition task: %w", err) diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index ba0f63f6d83..fab3a270809 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -149,14 +149,15 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("preparing proof message: %w", err) } - smsg, err := w.sender.Send(context.Background(), msg, mss, "wdpost") + ctx := context.WithValue(context.Background(), lpmessage.CtxTaskID, taskID) + smsg, err := w.sender.Send(ctx, msg, mss, "wdpost") if err != nil { return false, xerrors.Errorf("sending proof message: %w", err) } // set message_cid in the wdpost_proofs entry - _, err = w.db.Exec(context.Background(), `UPDATE wdpost_proofs SET message_cid = $1 WHERE sp_id = $2 AND proving_period_start = $3 AND deadline = $4 AND partition = $5`, smsg.String(), spID, pps, deadline, partition) + _, err = w.db.Exec(ctx, `UPDATE wdpost_proofs SET message_cid = $1 WHERE sp_id = $2 AND proving_period_start = $3 AND deadline = $4 AND partition = $5`, smsg.String(), spID, pps, deadline, partition) if err != nil { return true, xerrors.Errorf("updating wdpost_proofs: %w", err) } @@ -257,7 +258,7 @@ func preparePoStMessage(w MsgPrepAPI, as *ctladdr.AddressSelector, maddr address msg.From = mi.Worker mss := &api.MessageSendSpec{ - MaxFee: abi.TokenAmount(maxFee), + MaxFee: maxFee, } // (optimal) initial estimation with some overestimation that guarantees