From 63926add825db47ecc95ee77e1fbd83e32f14957 Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Sun, 29 Jan 2023 17:43:10 +0800 Subject: [PATCH] feat:opt cmd --- cli/data_transfer.go | 239 +++ cli/market.go | 1277 ----------------- cli/retrieval-asks.go | 183 +++ cli/retrieval-cfg.go | 133 ++ cli/retrieval-deals.go | 263 +--- cli/storage-ask.go | 213 +++ cli/storage-cfg.go | 533 +++++++ cli/storage-deals.go | 418 ++++++ cmd/market-client/retrieval.go | 4 +- cmd/venus-market/main.go | 4 +- ...53\351\200\237\345\220\257\347\224\250.md" | 10 +- retrievalprovider/stream_handler.go | 6 +- 12 files changed, 1740 insertions(+), 1543 deletions(-) create mode 100644 cli/data_transfer.go delete mode 100644 cli/market.go create mode 100644 cli/retrieval-asks.go create mode 100644 cli/retrieval-cfg.go create mode 100644 cli/storage-ask.go create mode 100644 cli/storage-cfg.go create mode 100644 cli/storage-deals.go diff --git a/cli/data_transfer.go b/cli/data_transfer.go new file mode 100644 index 00000000..7b195d85 --- /dev/null +++ b/cli/data_transfer.go @@ -0,0 +1,239 @@ +package cli + +import ( + "context" + "errors" + "fmt" + "os" + "strconv" + "time" + + tm "github.com/buger/goterm" + + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/urfave/cli/v2" +) + +var DataTransfersCmd = &cli.Command{ + Name: "data-transfers", + Usage: "Manage data transfers", + Subcommands: []*cli.Command{ + transfersListCmd, + marketRestartTransfer, + marketCancelTransfer, + }, +} + +var marketRestartTransfer = &cli.Command{ + Name: "restart", + Usage: "Force restart a stalled data transfer", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "peerid", + Usage: "narrow to transfer with specific peer", + }, + &cli.BoolFlag{ + Name: "initiator", + Usage: "specify only transfers where peer is/is not initiator", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return cli.ShowCommandHelp(cctx, cctx.Command.Name) + } + nodeApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64) + if err != nil { + return fmt.Errorf("Error reading transfer ID: %w", err) + } + transferID := datatransfer.TransferID(transferUint) + initiator := cctx.Bool("initiator") + var other peer.ID + if pidstr := cctx.String("peerid"); pidstr != "" { + p, err := peer.Decode(pidstr) + if err != nil { + return err + } + other = p + } else { + channels, err := nodeApi.MarketListDataTransfers(ctx) + if err != nil { + return err + } + found := false + for _, channel := range channels { + if channel.IsInitiator == initiator && channel.TransferID == transferID { + other = channel.OtherPeer + found = true + break + } + } + if !found { + return errors.New("unable to find matching data transfer") + } + } + + return nodeApi.MarketRestartDataTransfer(ctx, transferID, other, initiator) + }, +} + +var marketCancelTransfer = &cli.Command{ + Name: "cancel", + Usage: "Force cancel a data transfer", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "peerid", + Usage: "narrow to transfer with specific peer", + }, + &cli.BoolFlag{ + Name: "initiator", + Usage: "specify only transfers where peer is/is not initiator", + Value: false, + }, + &cli.DurationFlag{ + Name: "cancel-timeout", + Usage: "time to wait for cancel to be sent to client", + Value: 5 * time.Second, + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return cli.ShowCommandHelp(cctx, cctx.Command.Name) + } + nodeApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64) + if err != nil { + return fmt.Errorf("Error reading transfer ID: %w", err) + } + transferID := datatransfer.TransferID(transferUint) + initiator := cctx.Bool("initiator") + var other peer.ID + if pidstr := cctx.String("peerid"); pidstr != "" { + p, err := peer.Decode(pidstr) + if err != nil { + return err + } + other = p + } else { + channels, err := nodeApi.MarketListDataTransfers(ctx) + if err != nil { + return err + } + found := false + for _, channel := range channels { + if channel.IsInitiator == initiator && channel.TransferID == transferID { + other = channel.OtherPeer + found = true + break + } + } + if !found { + return errors.New("unable to find matching data transfer") + } + } + + timeoutCtx, cancel := context.WithTimeout(ctx, cctx.Duration("cancel-timeout")) + defer cancel() + return nodeApi.MarketCancelDataTransfer(timeoutCtx, transferID, other, initiator) + }, +} + +var transfersListCmd = &cli.Command{ + Name: "list", + Usage: "List ongoing data transfers for this miner", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "verbose", + Aliases: []string{"v"}, + Usage: "print verbose transfer details", + }, + &cli.BoolFlag{ + Name: "color", + Usage: "use color in display output", + Value: true, + }, + &cli.BoolFlag{ + Name: "completed", + Usage: "show completed data transfers", + }, + &cli.BoolFlag{ + Name: "watch", + Usage: "watch deal updates in real-time, rather than a one time list", + }, + &cli.BoolFlag{ + Name: "show-failed", + Usage: "show failed/cancelled transfers", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + channels, err := api.MarketListDataTransfers(ctx) + if err != nil { + return err + } + + verbose := cctx.Bool("verbose") + completed := cctx.Bool("completed") + color := cctx.Bool("color") + watch := cctx.Bool("watch") + showFailed := cctx.Bool("show-failed") + if watch { + channelUpdates, err := api.MarketDataTransferUpdates(ctx) + if err != nil { + return err + } + + for { + tm.Clear() // Clear current screen + + tm.MoveCursor(1, 1) + + OutputDataTransferChannels(tm.Screen, channels, verbose, completed, color, showFailed) + + tm.Flush() + + select { + case <-ctx.Done(): + return nil + case channelUpdate := <-channelUpdates: + var found bool + for i, existing := range channels { + if existing.TransferID == channelUpdate.TransferID && + existing.OtherPeer == channelUpdate.OtherPeer && + existing.IsSender == channelUpdate.IsSender && + existing.IsInitiator == channelUpdate.IsInitiator { + channels[i] = channelUpdate + found = true + break + } + } + if !found { + channels = append(channels, channelUpdate) + } + } + } + } + OutputDataTransferChannels(os.Stdout, channels, verbose, completed, color, showFailed) + return nil + }, +} diff --git a/cli/market.go b/cli/market.go deleted file mode 100644 index 18a60193..00000000 --- a/cli/market.go +++ /dev/null @@ -1,1277 +0,0 @@ -package cli - -import ( - "bufio" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "log" - "os" - "path/filepath" - "sort" - "strconv" - "strings" - "text/tabwriter" - "time" - - tm "github.com/buger/goterm" - "github.com/docker/go-units" - "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/urfave/cli/v2" - - "github.com/filecoin-project/go-address" - cborutil "github.com/filecoin-project/go-cbor-util" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-state-types/abi" - - "github.com/filecoin-project/venus-market/v2/storageprovider" - - "github.com/filecoin-project/venus/pkg/constants" - "github.com/filecoin-project/venus/venus-shared/types" - "github.com/filecoin-project/venus/venus-shared/types/market" -) - -var storageDealSelectionCmd = &cli.Command{ - Name: "selection", - Usage: "Configure acceptance criteria for storage deal proposals", - Flags: []cli.Flag{ - minerFlag, - }, - Subcommands: []*cli.Command{ - storageDealSelectionShowCmd, - storageDealSelectionResetCmd, - storageDealSelectionRejectCmd, - }, -} - -var storageDealSelectionShowCmd = &cli.Command{ - Name: "list", - Usage: "List storage deal proposal selection criteria", - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - smapi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - onlineOk, err := smapi.DealsConsiderOnlineStorageDeals(DaemonContext(cctx), mAddr) - if err != nil { - return err - } - - offlineOk, err := smapi.DealsConsiderOfflineStorageDeals(DaemonContext(cctx), mAddr) - if err != nil { - return err - } - - verifiedOk, err := smapi.DealsConsiderVerifiedStorageDeals(DaemonContext(cctx), mAddr) - if err != nil { - return err - } - - unverifiedOk, err := smapi.DealsConsiderUnverifiedStorageDeals(DaemonContext(cctx), mAddr) - if err != nil { - return err - } - - fmt.Printf("considering online storage deals: %t\n", onlineOk) - fmt.Printf("considering offline storage deals: %t\n", offlineOk) - fmt.Printf("considering verified storage deals: %t\n", verifiedOk) - fmt.Printf("considering unverified storage deals: %t\n", unverifiedOk) - - return nil - }, -} - -var storageDealSelectionResetCmd = &cli.Command{ - Name: "reset", - Usage: "Reset storage deal proposal selection criteria to default values", - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - smapi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - err = smapi.DealsSetConsiderOnlineStorageDeals(DaemonContext(cctx), mAddr, true) - if err != nil { - return err - } - - err = smapi.DealsSetConsiderOfflineStorageDeals(DaemonContext(cctx), mAddr, true) - if err != nil { - return err - } - - err = smapi.DealsSetConsiderVerifiedStorageDeals(DaemonContext(cctx), mAddr, true) - if err != nil { - return err - } - - err = smapi.DealsSetConsiderUnverifiedStorageDeals(DaemonContext(cctx), mAddr, true) - if err != nil { - return err - } - - return nil - }, -} - -var storageDealSelectionRejectCmd = &cli.Command{ - Name: "reject", - Usage: "Configure criteria which necessitate automatic rejection", - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "online", - }, - &cli.BoolFlag{ - Name: "offline", - }, - &cli.BoolFlag{ - Name: "verified", - }, - &cli.BoolFlag{ - Name: "unverified", - }, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - smapi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - if cctx.Bool("online") { - err = smapi.DealsSetConsiderOnlineStorageDeals(DaemonContext(cctx), mAddr, false) - if err != nil { - return err - } - } - - if cctx.Bool("offline") { - err = smapi.DealsSetConsiderOfflineStorageDeals(DaemonContext(cctx), mAddr, false) - if err != nil { - return err - } - } - - if cctx.Bool("verified") { - err = smapi.DealsSetConsiderVerifiedStorageDeals(DaemonContext(cctx), mAddr, false) - if err != nil { - return err - } - } - - if cctx.Bool("unverified") { - err = smapi.DealsSetConsiderUnverifiedStorageDeals(DaemonContext(cctx), mAddr, false) - if err != nil { - return err - } - } - - return nil - }, -} - -var setAskCmd = &cli.Command{ - Name: "set-ask", - Usage: "Configure the miner's ask", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "price", - Usage: "Set the price of the ask for unverified deals (specified as FIL / GiB / Epoch) to `PRICE`.", - Required: true, - }, - &cli.StringFlag{ - Name: "verified-price", - Usage: "Set the price of the ask for verified deals (specified as FIL / GiB / Epoch) to `PRICE`", - Required: true, - }, - &cli.StringFlag{ - Name: "min-piece-size", - Usage: "Set minimum piece size (w/bit-padding, in bytes) in ask to `SIZE`", - DefaultText: "256B", - Value: "256B", - }, - &cli.StringFlag{ - Name: "max-piece-size", - Usage: "Set maximum piece size (w/bit-padding, in bytes) in ask to `SIZE`, eg. KiB, MiB, GiB, TiB, PiB", - DefaultText: "miner sector size", - }, - &cli.StringFlag{ - Name: "miner", - Required: true, - }, - }, - Action: func(cctx *cli.Context) error { - ctx := DaemonContext(cctx) - - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - pri, err := types.ParseFIL(cctx.String("price")) - if err != nil { - return err - } - - vpri, err := types.ParseFIL(cctx.String("verified-price")) - if err != nil { - return err - } - - dur, err := time.ParseDuration("720h0m0s") - if err != nil { - return fmt.Errorf("cannot parse duration: %w", err) - } - - qty := dur.Seconds() / float64(constants.MainNetBlockDelaySecs) - - min, err := units.RAMInBytes(cctx.String("min-piece-size")) - if err != nil { - return fmt.Errorf("cannot parse min-piece-size to quantity of bytes: %w", err) - } - - if min < 256 { - return errors.New("minimum piece size (w/bit-padding) is 256B") - } - - max, err := units.RAMInBytes(cctx.String("max-piece-size")) - if err != nil { - return fmt.Errorf("cannot parse max-piece-size to quantity of bytes: %w", err) - } - - maddr, err := address.NewFromString(cctx.String("miner")) - if err != nil { - return fmt.Errorf("para `miner` is invalid: %w", err) - } - - ssize, err := api.ActorSectorSize(ctx, maddr) - if err != nil { - return err - } - - smax := int64(ssize) - - if max == 0 { - max = smax - } - - if max > smax { - return fmt.Errorf("max piece size (w/bit-padding) %s cannot exceed miner sector size %s", types.SizeStr(types.NewInt(uint64(max))), types.SizeStr(types.NewInt(uint64(smax)))) - } - - return api.MarketSetAsk(ctx, maddr, types.BigInt(pri), types.BigInt(vpri), abi.ChainEpoch(qty), abi.PaddedPieceSize(min), abi.PaddedPieceSize(max)) - }, -} - -var getAskCmd = &cli.Command{ - Name: "get-ask", - Usage: "Print the miner's ask", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "miner", - Required: true, - }, - }, - Action: func(cctx *cli.Context) error { - ctx := DaemonContext(cctx) - - fnapi, closer, err := NewFullNode(cctx) - if err != nil { - return err - } - defer closer() - - smapi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - maddr, err := address.NewFromString(cctx.String("miner")) - if err != nil { - return fmt.Errorf("para `miner` is invalid: %w", err) - } - - sask, err := smapi.MarketGetAsk(ctx, maddr) - if err != nil { - return err - } - - var ask *storagemarket.StorageAsk - if sask != nil && sask.Ask != nil { - ask = sask.Ask - } - - w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) - fmt.Fprintf(w, "Price per GiB/Epoch\tVerified\tMin. Piece Size (padded)\tMax. Piece Size (padded)\tExpiry (Epoch)\tExpiry (Appx. Rem. Time)\tSeq. No.\n") - if ask == nil { - fmt.Fprintf(w, "\n") - return w.Flush() - } - - head, err := fnapi.ChainHead(ctx) - if err != nil { - return err - } - - dlt := ask.Expiry - head.Height() - rem := "" - if dlt > 0 { - rem = (time.Second * time.Duration(int64(dlt)*int64(constants.MainNetBlockDelaySecs))).String() - } - - fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\t%s\t%d\n", types.FIL(ask.Price), types.FIL(ask.VerifiedPrice), types.SizeStr(types.NewInt(uint64(ask.MinPieceSize))), types.SizeStr(types.NewInt(uint64(ask.MaxPieceSize))), ask.Expiry, rem, ask.SeqNo) - - return w.Flush() - }, -} - -var StorageDealsCmd = &cli.Command{ - Name: "storage-deals", - Usage: "Manage storage deals and related configuration", - Subcommands: []*cli.Command{ - dealsImportDataCmd, - importOfflineDealCmd, - dealsListCmd, - updateStorageDealStateCmd, - storageDealSelectionCmd, - setAskCmd, - getAskCmd, - setBlocklistCmd, - getBlocklistCmd, - resetBlocklistCmd, - expectedSealDurationCmd, - maxDealStartDelayCmd, - dealsPublishMsgPeriodCmd, - dealsPendingPublish, - }, -} - -var dealsImportDataCmd = &cli.Command{ - Name: "import-data", - Usage: "Manually import data for a deal", - ArgsUsage: " ", - Action: func(cctx *cli.Context) error { - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - ctx := DaemonContext(cctx) - - if cctx.Args().Len() < 2 { - return fmt.Errorf("must specify proposal CID and file path") - } - - propCid, err := cid.Decode(cctx.Args().Get(0)) - if err != nil { - return err - } - - fpath := cctx.Args().Get(1) - - return api.DealsImportData(ctx, propCid, fpath) - }, -} - -var importOfflineDealCmd = &cli.Command{ - Name: "import-offlinedeal", - Usage: "Manually import offline deal", - ArgsUsage: "", - Flags: []cli.Flag{ - // verbose - &cli.BoolFlag{ - Name: "verbose", - Usage: "Print verbose output", - Aliases: []string{ - "v", - }, - }, - }, - Action: func(cctx *cli.Context) error { - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - ctx := DaemonContext(cctx) - - if cctx.Args().Len() < 1 { - return fmt.Errorf("must specify the path of json file which records the deal") - } - - fpath := cctx.Args().Get(0) - - dealbyte, err := ioutil.ReadFile(fpath) - if err != nil { - return fmt.Errorf("read deal file(%s) fail %w", fpath, err) - } - - data := []market.MinerDeal{} - err = json.Unmarshal(dealbyte, &data) - if err != nil { - return fmt.Errorf("parse deal file(%s) fail %w", fpath, err) - } - - totalCount := len(data) - importedCount := 0 - - // if verbose, print the deal info - - for i := 0; i < totalCount; i++ { - err := api.OfflineDealImport(ctx, data[i]) - if err != nil { - if cctx.Bool("verbose") { - fmt.Printf("( %d / %d ) %s : fail : %v\n", i+1, totalCount, data[i].ProposalCid, err) - } - } else { - importedCount++ - if cctx.Bool("verbose") { - fmt.Printf("( %d / %d ) %s : success\n", i+1, totalCount, data[i].ProposalCid) - } - } - } - - fmt.Printf("import %d deals, %d deal success , %d deal fail .\n", totalCount, importedCount, totalCount-importedCount) - - return nil - }, -} - -var dealsListCmd = &cli.Command{ - Name: "list", - Usage: "List all deals for this miner", - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "verbose", - Aliases: []string{"v"}, - }, - &cli.BoolFlag{ - Name: "watch", - Usage: "watch deal updates in real-time, rather than a one time list", - }, - &cli.StringFlag{ - Name: "miner", - }, - }, - Action: func(cctx *cli.Context) error { - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - maddr := address.Undef - if cctx.IsSet("miner") { - maddr, err = address.NewFromString(cctx.String("miner")) - if err != nil { - return fmt.Errorf("para `miner` is invalid: %w", err) - } - } - - ctx := DaemonContext(cctx) - deals, err := api.MarketListIncompleteDeals(ctx, maddr) - if err != nil { - return err - } - - verbose := cctx.Bool("verbose") - watch := cctx.Bool("watch") - - if watch { - updates, err := api.MarketGetDealUpdates(ctx) - if err != nil { - return err - } - - for { - tm.Clear() - tm.MoveCursor(1, 1) - - err = outputStorageDeals(tm.Output, deals, verbose) - if err != nil { - return err - } - - tm.Flush() - - select { - case <-ctx.Done(): - return nil - case updated := <-updates: - var found bool - for i, existing := range deals { - if existing.ProposalCid.Equals(updated.ProposalCid) { - deals[i] = updated - found = true - break - } - } - if !found { - deals = append(deals, updated) - } - } - } - } - - return outputStorageDeals(os.Stdout, deals, verbose) - }, -} - -var dealStateUsage = func() string { - const c, spliter = 5, " | " - size := len(storageprovider.StringToStorageState) - states := make([]string, 0, size+size/c) - idx := 0 - for s := range storageprovider.StringToStorageState { - states = append(states, s) - idx++ - states = append(states, spliter) - if idx%c == 0 { - states = append(states, "\n\t") - continue - } - } - - usage := strings.Join(states, "") - { - size := len(usage) - if size > 3 && usage[size-3:] == spliter { - usage = usage[:size-3] - } - } - return usage + ", set to 'StorageDealUnknown' means no change" -} - -var updateStorageDealStateCmd = &cli.Command{ - Name: "update", - Usage: "update deal status", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "proposalcid", - Required: true, - }, - &cli.BoolFlag{ - Name: "really-do-it", - Usage: "Actually send transaction performing the action", - Value: false, - }, - &cli.StringFlag{ - Name: "piece-state", - Usage: "Undefine | Assigned | Packing | Proving, empty means no change", - }, - &cli.StringFlag{ - Name: "state", - Usage: dealStateUsage(), - }, - }, - Action: func(cctx *cli.Context) error { - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - ctx := DaemonContext(cctx) - proposalCid, err := cid.Decode(cctx.String("proposalcid")) - if err != nil { - return err - } - var isParamOk bool - var state storagemarket.StorageDealStatus - var pieceStatus market.PieceStatus - - if cctx.IsSet("state") { - isParamOk = true - state = storageprovider.StringToStorageState[cctx.String("state")] - } - - if cctx.IsSet("piece-state") { - pieceStatus = market.PieceStatus(cctx.String("piece-state")) - isParamOk = true - } - - if !isParamOk { - return fmt.Errorf("must set 'state' or 'piece-state'") - } - - if !cctx.Bool("really-do-it") { - fmt.Println("Pass --really-do-it to actually execute this action") - return nil - } - - return api.UpdateStorageDealStatus(ctx, proposalCid, state, pieceStatus) - }, -} - -func outputStorageDeals(out io.Writer, deals []market.MinerDeal, verbose bool) error { - sort.Slice(deals, func(i, j int) bool { - return deals[i].CreationTime.Time().Before(deals[j].CreationTime.Time()) - }) - - w := tabwriter.NewWriter(out, 2, 4, 2, ' ', 0) - - if verbose { - _, _ = fmt.Fprintf(w, "Creation\tVerified\tProposalCid\tDealId\tState\tPieceState\tClient\tProvider\tSize\tPrice\tDuration\tTransferChannelID\tAddFundCid\tPublishCid\tMessage\n") - } else { - _, _ = fmt.Fprintf(w, "ProposalCid\tDealId\tState\tPieceState\tClient\tProvider\tSize\tPrice\tDuration\n") - } - - for _, deal := range deals { - propcid := deal.ProposalCid.String() - if !verbose { - propcid = "..." + propcid[len(propcid)-8:] - } - - fil := types.FIL(types.BigMul(deal.Proposal.StoragePricePerEpoch, types.NewInt(uint64(deal.Proposal.Duration())))) - - if verbose { - _, _ = fmt.Fprintf(w, "%s\t%t\t", deal.CreationTime.Time().Format(time.Stamp), deal.Proposal.VerifiedDeal) - } - - _, _ = fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s", propcid, deal.DealID, storagemarket.DealStates[deal.State], deal.PieceStatus, - deal.Proposal.Client, deal.Proposal.Provider, units.BytesSize(float64(deal.Proposal.PieceSize)), fil, deal.Proposal.Duration()) - if verbose { - tchid := "" - if deal.TransferChannelID != nil { - tchid = deal.TransferChannelID.String() - } - - addFundcid := "" - if deal.AddFundsCid != nil { - addFundcid = deal.AddFundsCid.String() - } - - pubcid := "" - if deal.PublishCid != nil { - pubcid = deal.PublishCid.String() - } - - _, _ = fmt.Fprintf(w, "\t%s", tchid) - _, _ = fmt.Fprintf(w, "\t%s", addFundcid) - _, _ = fmt.Fprintf(w, "\t%s", pubcid) - _, _ = fmt.Fprintf(w, "\t%s", deal.Message) - } - - _, _ = fmt.Fprintln(w) - } - - return w.Flush() -} - -var getBlocklistCmd = &cli.Command{ - Name: "get-blocklist", - Usage: "List the contents of the miner's piece CID blocklist", - Flags: []cli.Flag{ - &CidBaseFlag, - minerFlag, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - blocklist, err := api.DealsPieceCidBlocklist(DaemonContext(cctx), mAddr) - if err != nil { - return err - } - - encoder, err := GetCidEncoder(cctx) - if err != nil { - return err - } - - for idx := range blocklist { - fmt.Println(encoder.Encode(blocklist[idx])) - } - - return nil - }, -} - -var setBlocklistCmd = &cli.Command{ - Name: "set-blocklist", - Usage: "Set the miner's list of blocklisted piece CIDs", - ArgsUsage: "[ (optional, will read from stdin if omitted)]", - Flags: []cli.Flag{ - minerFlag, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - scanner := bufio.NewScanner(os.Stdin) - if cctx.Args().Present() && cctx.Args().First() != "-" { - absPath, err := filepath.Abs(cctx.Args().First()) - if err != nil { - return err - } - - file, err := os.Open(absPath) - if err != nil { - log.Fatal(err) - } - defer file.Close() //nolint:errcheck - - scanner = bufio.NewScanner(file) - } - - var blocklist []cid.Cid - for scanner.Scan() { - decoded, err := cid.Decode(scanner.Text()) - if err != nil { - return err - } - - blocklist = append(blocklist, decoded) - } - - err = scanner.Err() - if err != nil { - return err - } - - return api.DealsSetPieceCidBlocklist(DaemonContext(cctx), mAddr, blocklist) - }, -} - -var resetBlocklistCmd = &cli.Command{ - Name: "reset-blocklist", - Usage: "Remove all entries from the miner's piece CID blocklist", - Flags: []cli.Flag{ - minerFlag, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - return api.DealsSetPieceCidBlocklist(DaemonContext(cctx), mAddr, []cid.Cid{}) - }, -} - -var expectedSealDurationCmd = &cli.Command{ - Name: "seal-duration", - Usage: "Configure the expected time, that you expect sealing sectors to take. Deals that start before this duration will be rejected.", - Flags: []cli.Flag{ - minerFlag, - }, - Subcommands: []*cli.Command{ - expectedSealDurationGetCmd, - expectedSealDurationSetCmd, - }, -} - -var expectedSealDurationGetCmd = &cli.Command{ - Name: "get", - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - marketApi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - ctx := ReqContext(cctx) - t, err := marketApi.SectorGetExpectedSealDuration(ctx, mAddr) - if err != nil { - return err - } - fmt.Println("seal-duration: ", t.String()) - return nil - }, -} - -var expectedSealDurationSetCmd = &cli.Command{ - Name: "set-seal-duration", - Usage: "eg. '1m','30s',...", - ArgsUsage: "", - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - marketApi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - ctx := ReqContext(cctx) - if cctx.Args().Len() != 1 { - return fmt.Errorf("must pass duration") - } - - d, err := time.ParseDuration(cctx.Args().Get(0)) - if err != nil { - return fmt.Errorf("could not parse duration: %w", err) - } - - return marketApi.SectorSetExpectedSealDuration(ctx, mAddr, d) - }, -} - -var maxDealStartDelayCmd = &cli.Command{ - Name: "max-start-delay", - Usage: "Configure the maximum amount of time proposed deal StartEpoch can be in future.", - Flags: []cli.Flag{ - minerFlag, - }, - Subcommands: []*cli.Command{ - maxDealStartDelayGetCmd, - maxDealStartDelaySetCmd, - }, -} - -var maxDealStartDelayGetCmd = &cli.Command{ - Name: "get", - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - marketApi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - ctx := ReqContext(cctx) - t, err := marketApi.DealsMaxStartDelay(ctx, mAddr) - if err != nil { - return err - } - fmt.Println("max start delay: ", t.String()) - return nil - }, -} - -var maxDealStartDelaySetCmd = &cli.Command{ - Name: "set", - Usage: "eg. '1m','30s',...", - ArgsUsage: "", - Flags: []cli.Flag{ - minerFlag, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - marketApi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - ctx := ReqContext(cctx) - if cctx.Args().Len() != 1 { - return fmt.Errorf("must pass duration") - } - - delay, err := time.ParseDuration(cctx.Args().Get(0)) - if err != nil { - return fmt.Errorf("could not parse duration: %w", err) - } - - return marketApi.DealsSetMaxStartDelay(ctx, mAddr, delay) - }, -} - -var dealsPublishMsgPeriodCmd = &cli.Command{ - Name: "max-start-delay", - Usage: "Configure the the amount of time to wait for more deals to be ready to publish before publishing them all as a batch.", - Flags: []cli.Flag{ - minerFlag, - }, - Subcommands: []*cli.Command{ - dealsPublishMsgPeriodGetCmd, - dealsPublishMsgPeriodSetCmd, - }, -} - -var dealsPublishMsgPeriodGetCmd = &cli.Command{ - Name: "get", - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - marketApi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - ctx := ReqContext(cctx) - t, err := marketApi.DealsPublishMsgPeriod(ctx, mAddr) - if err != nil { - return err - } - fmt.Println("publish msg period: ", t.String()) - return nil - }, -} - -var dealsPublishMsgPeriodSetCmd = &cli.Command{ - Name: "set", - Usage: "eg. '1m','30s',...", - ArgsUsage: "", - Flags: []cli.Flag{ - minerFlag, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - marketApi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - ctx := ReqContext(cctx) - if cctx.Args().Len() != 1 { - return fmt.Errorf("must pass duration") - } - - period, err := time.ParseDuration(cctx.Args().Get(0)) - if err != nil { - return fmt.Errorf("could not parse duration: %w", err) - } - return marketApi.DealsSetPublishMsgPeriod(ctx, mAddr, period) - }, -} - -var DataTransfersCmd = &cli.Command{ - Name: "data-transfers", - Usage: "Manage data transfers", - Subcommands: []*cli.Command{ - transfersListCmd, - marketRestartTransfer, - marketCancelTransfer, - }, -} - -var marketRestartTransfer = &cli.Command{ - Name: "restart", - Usage: "Force restart a stalled data transfer", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "peerid", - Usage: "narrow to transfer with specific peer", - }, - &cli.BoolFlag{ - Name: "initiator", - Usage: "specify only transfers where peer is/is not initiator", - Value: false, - }, - }, - Action: func(cctx *cli.Context) error { - if !cctx.Args().Present() { - return cli.ShowCommandHelp(cctx, cctx.Command.Name) - } - nodeApi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - ctx := ReqContext(cctx) - - transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64) - if err != nil { - return fmt.Errorf("Error reading transfer ID: %w", err) - } - transferID := datatransfer.TransferID(transferUint) - initiator := cctx.Bool("initiator") - var other peer.ID - if pidstr := cctx.String("peerid"); pidstr != "" { - p, err := peer.Decode(pidstr) - if err != nil { - return err - } - other = p - } else { - channels, err := nodeApi.MarketListDataTransfers(ctx) - if err != nil { - return err - } - found := false - for _, channel := range channels { - if channel.IsInitiator == initiator && channel.TransferID == transferID { - other = channel.OtherPeer - found = true - break - } - } - if !found { - return errors.New("unable to find matching data transfer") - } - } - - return nodeApi.MarketRestartDataTransfer(ctx, transferID, other, initiator) - }, -} - -var marketCancelTransfer = &cli.Command{ - Name: "cancel", - Usage: "Force cancel a data transfer", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "peerid", - Usage: "narrow to transfer with specific peer", - }, - &cli.BoolFlag{ - Name: "initiator", - Usage: "specify only transfers where peer is/is not initiator", - Value: false, - }, - &cli.DurationFlag{ - Name: "cancel-timeout", - Usage: "time to wait for cancel to be sent to client", - Value: 5 * time.Second, - }, - }, - Action: func(cctx *cli.Context) error { - if !cctx.Args().Present() { - return cli.ShowCommandHelp(cctx, cctx.Command.Name) - } - nodeApi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - ctx := ReqContext(cctx) - - transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64) - if err != nil { - return fmt.Errorf("Error reading transfer ID: %w", err) - } - transferID := datatransfer.TransferID(transferUint) - initiator := cctx.Bool("initiator") - var other peer.ID - if pidstr := cctx.String("peerid"); pidstr != "" { - p, err := peer.Decode(pidstr) - if err != nil { - return err - } - other = p - } else { - channels, err := nodeApi.MarketListDataTransfers(ctx) - if err != nil { - return err - } - found := false - for _, channel := range channels { - if channel.IsInitiator == initiator && channel.TransferID == transferID { - other = channel.OtherPeer - found = true - break - } - } - if !found { - return errors.New("unable to find matching data transfer") - } - } - - timeoutCtx, cancel := context.WithTimeout(ctx, cctx.Duration("cancel-timeout")) - defer cancel() - return nodeApi.MarketCancelDataTransfer(timeoutCtx, transferID, other, initiator) - }, -} - -var transfersListCmd = &cli.Command{ - Name: "list", - Usage: "List ongoing data transfers for this miner", - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "verbose", - Aliases: []string{"v"}, - Usage: "print verbose transfer details", - }, - &cli.BoolFlag{ - Name: "color", - Usage: "use color in display output", - Value: true, - }, - &cli.BoolFlag{ - Name: "completed", - Usage: "show completed data transfers", - }, - &cli.BoolFlag{ - Name: "watch", - Usage: "watch deal updates in real-time, rather than a one time list", - }, - &cli.BoolFlag{ - Name: "show-failed", - Usage: "show failed/cancelled transfers", - }, - }, - Action: func(cctx *cli.Context) error { - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - ctx := ReqContext(cctx) - - channels, err := api.MarketListDataTransfers(ctx) - if err != nil { - return err - } - - verbose := cctx.Bool("verbose") - completed := cctx.Bool("completed") - color := cctx.Bool("color") - watch := cctx.Bool("watch") - showFailed := cctx.Bool("show-failed") - if watch { - channelUpdates, err := api.MarketDataTransferUpdates(ctx) - if err != nil { - return err - } - - for { - tm.Clear() // Clear current screen - - tm.MoveCursor(1, 1) - - OutputDataTransferChannels(tm.Screen, channels, verbose, completed, color, showFailed) - - tm.Flush() - - select { - case <-ctx.Done(): - return nil - case channelUpdate := <-channelUpdates: - var found bool - for i, existing := range channels { - if existing.TransferID == channelUpdate.TransferID && - existing.OtherPeer == channelUpdate.OtherPeer && - existing.IsSender == channelUpdate.IsSender && - existing.IsInitiator == channelUpdate.IsInitiator { - channels[i] = channelUpdate - found = true - break - } - } - if !found { - channels = append(channels, channelUpdate) - } - } - } - } - OutputDataTransferChannels(os.Stdout, channels, verbose, completed, color, showFailed) - return nil - }, -} - -var dealsPendingPublish = &cli.Command{ - Name: "pending-publish", - Usage: "list deals waiting in publish queue", - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "publish-now", - Usage: "send a publish message now", - }, - }, - Action: func(cctx *cli.Context) error { - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - ctx := ReqContext(cctx) - - if cctx.Bool("publish-now") { - if err := api.MarketPublishPendingDeals(ctx); err != nil { - return fmt.Errorf("publishing deals: %w", err) - } - fmt.Println("triggered deal publishing") - return nil - } - - pendings, err := api.MarketPendingDeals(ctx) - if err != nil { - return fmt.Errorf("getting pending deals: %w", err) - } - - for _, pending := range pendings { - if len(pending.Deals) > 0 { - endsIn := time.Until(pending.PublishPeriodStart.Add(pending.PublishPeriod)) - w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) - _, _ = fmt.Fprintf(w, "Publish period: %s (ends in %s)\n", pending.PublishPeriod, endsIn.Round(time.Second)) - _, _ = fmt.Fprintf(w, "First deal queued at: %s\n", pending.PublishPeriodStart) - _, _ = fmt.Fprintf(w, "Deals will be published at: %s\n", pending.PublishPeriodStart.Add(pending.PublishPeriod)) - _, _ = fmt.Fprintf(w, "%d deals queued to be published:\n", len(pending.Deals)) - _, _ = fmt.Fprintf(w, "ProposalCID\tClient\tSize\n") - for _, deal := range pending.Deals { - proposalNd, err := cborutil.AsIpld(&deal) // nolint - if err != nil { - return err - } - - _, _ = fmt.Fprintf(w, "%s\t%s\t%s\n", proposalNd.Cid(), deal.Proposal.Client, units.BytesSize(float64(deal.Proposal.PieceSize))) - } - return w.Flush() - } - } - - fmt.Println("No deals queued to be published") - return nil - }, -} diff --git a/cli/retrieval-asks.go b/cli/retrieval-asks.go new file mode 100644 index 00000000..35a58ba7 --- /dev/null +++ b/cli/retrieval-asks.go @@ -0,0 +1,183 @@ +package cli + +import ( + "errors" + "fmt" + "os" + "strings" + "text/tabwriter" + + "github.com/filecoin-project/venus/venus-shared/types" + + "github.com/docker/go-units" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-state-types/abi" + "github.com/urfave/cli/v2" +) + +var retirevalAsksCmds = &cli.Command{ + Name: "ask", + Usage: "Configure retrieval asks", + Subcommands: []*cli.Command{ + retrievalGetAskCmd, + retrievalSetAskCmd, + }, +} + +var retrievalSetAskCmd = &cli.Command{ + Name: "set", + ArgsUsage: "", + Usage: "Configure(set/update)the provider's retrieval ask", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "price", + Usage: "Set the price of the ask for retrievals (FIL/GiB)", + Value: "0", + }, + &cli.StringFlag{ + Name: "unseal-price", + Usage: "Set the price to unseal", + Value: "0", + }, + &cli.StringFlag{ + Name: "payment-interval", + Usage: "Set the payment interval (in bytes) for retrieval", + Value: "1MiB", + }, + &cli.StringFlag{ + Name: "payment-interval-increase", + Usage: "Set the payment interval increase (in bytes) for retrieval", + Value: "1MiB", + }, + }, + Action: func(cctx *cli.Context) error { + ctx := DaemonContext(cctx) + + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := address.NewFromString(cctx.Args().Get(0)) + if err != nil { + return err + } + + isUpdate := true + ask, err := api.MarketGetRetrievalAsk(ctx, mAddr) + if err != nil { + if !strings.Contains(err.Error(), "record not found") { + return err + } + ask = &retrievalmarket.Ask{} + isUpdate = false + } + + if isUpdate { + if cctx.IsSet("price") { + v, err := types.ParseFIL(cctx.String("price")) + if err != nil { + return err + } + ask.PricePerByte = types.BigDiv(types.BigInt(v), types.NewInt(1<<30)) + } + if cctx.IsSet("unseal-price") { + v, err := types.ParseFIL(cctx.String("unseal-price")) + if err != nil { + return err + } + ask.UnsealPrice = abi.TokenAmount(v) + } + + if cctx.IsSet("payment-interval") { + v, err := units.RAMInBytes(cctx.String("payment-interval")) + if err != nil { + return err + } + ask.PaymentInterval = uint64(v) + } + + if cctx.IsSet("payment-interval-increase") { + v, err := units.RAMInBytes(cctx.String("payment-interval-increase")) + if err != nil { + return err + } + ask.PaymentIntervalIncrease = uint64(v) + } + } else { + price, err := types.ParseFIL(cctx.String("price")) + if err != nil { + return err + } + ask.PricePerByte = types.BigDiv(types.BigInt(price), types.NewInt(1<<30)) + + unsealPrice, err := types.ParseFIL(cctx.String("unseal-price")) + if err != nil { + return err + } + ask.UnsealPrice = abi.TokenAmount(unsealPrice) + + paymentInterval, err := units.RAMInBytes(cctx.String("payment-interval")) + if err != nil { + return err + } + ask.PaymentInterval = uint64(paymentInterval) + + paymentIntervalIncrease, err := units.RAMInBytes(cctx.String("payment-interval-increase")) + if err != nil { + return err + } + ask.PaymentIntervalIncrease = uint64(paymentIntervalIncrease) + } + + return api.MarketSetRetrievalAsk(ctx, mAddr, ask) + }, +} + +var retrievalGetAskCmd = &cli.Command{ + Name: "get", + ArgsUsage: "", + Usage: "Get the provider's current retrieval ask", + Action: func(cctx *cli.Context) error { + ctx := DaemonContext(cctx) + + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := address.NewFromString(cctx.Args().Get(0)) + if err != nil { + return err + } + + ask, err := api.MarketGetRetrievalAsk(ctx, mAddr) + if err != nil { + return err + } + + w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) + fmt.Fprintf(w, "Price per Byte\tUnseal Price\tPayment Interval\tPayment Interval Increase\n") + if ask == nil { + fmt.Fprintf(w, "\n") + return w.Flush() + } + + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", + types.FIL(ask.PricePerByte), + types.FIL(ask.UnsealPrice), + units.BytesSize(float64(ask.PaymentInterval)), + units.BytesSize(float64(ask.PaymentIntervalIncrease)), + ) + return w.Flush() + }, +} diff --git a/cli/retrieval-cfg.go b/cli/retrieval-cfg.go new file mode 100644 index 00000000..81233548 --- /dev/null +++ b/cli/retrieval-cfg.go @@ -0,0 +1,133 @@ +package cli + +import ( + "errors" + "fmt" + + "github.com/urfave/cli/v2" +) + +// If more configurations appear in the future, they need to be changed to retrieval-cfg. +var retrievalDealSelectionCmds = &cli.Command{ + Name: "selection", + Usage: "Configure acceptance criteria for retrieval deal proposals", + Subcommands: []*cli.Command{ + retrievalDealSelectionShowCmd, + retrievalDealSelectionResetCmd, + retrievalDealSelectionRejectCmd, + }, +} + +var retrievalDealSelectionShowCmd = &cli.Command{ + Name: "list", + ArgsUsage: "", + Usage: "List retrieval deal proposal selection criteria", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + smapi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + onlineOk, err := smapi.DealsConsiderOnlineRetrievalDeals(DaemonContext(cctx), mAddr) + if err != nil { + return err + } + + offlineOk, err := smapi.DealsConsiderOfflineRetrievalDeals(DaemonContext(cctx), mAddr) + if err != nil { + return err + } + + fmt.Printf("considering online retrieval deals: %t\n", onlineOk) + fmt.Printf("considering offline retrieval deals: %t\n", offlineOk) + + return nil + }, +} + +var retrievalDealSelectionResetCmd = &cli.Command{ + Name: "reset", + Usage: "Reset retrieval deal proposal selection criteria to default values", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + smapi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + err = smapi.DealsSetConsiderOnlineRetrievalDeals(DaemonContext(cctx), mAddr, true) + if err != nil { + return err + } + + err = smapi.DealsSetConsiderOfflineRetrievalDeals(DaemonContext(cctx), mAddr, true) + if err != nil { + return err + } + + return nil + }, +} + +var retrievalDealSelectionRejectCmd = &cli.Command{ + Name: "reject", + Usage: "Configure criteria which necessitate automatic rejection", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "online", + }, + &cli.BoolFlag{ + Name: "offline", + }, + }, + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + smapi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + if cctx.Bool("online") { + err = smapi.DealsSetConsiderOnlineRetrievalDeals(DaemonContext(cctx), mAddr, false) + if err != nil { + return err + } + } + + if cctx.Bool("offline") { + err = smapi.DealsSetConsiderOfflineRetrievalDeals(DaemonContext(cctx), mAddr, false) + if err != nil { + return err + } + } + + return nil + }, +} diff --git a/cli/retrieval-deals.go b/cli/retrieval-deals.go index 96cd45c9..a0207374 100644 --- a/cli/retrieval-deals.go +++ b/cli/retrieval-deals.go @@ -5,143 +5,25 @@ import ( "os" "text/tabwriter" - "github.com/filecoin-project/go-address" - - "github.com/docker/go-units" "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/go-state-types/abi" "github.com/urfave/cli/v2" - - "github.com/filecoin-project/venus/venus-shared/types" ) -var RetrievalDealsCmd = &cli.Command{ - Name: "retrieval-deals", +var RetrievalCmds = &cli.Command{ + Name: "retrieval", Usage: "Manage retrieval deals and related configuration", Subcommands: []*cli.Command{ - retrievalDealSelectionCmd, - retrievalDealsListCmd, - retrievalSetAskCmd, - retrievalGetAskCmd, + retrievalDealsCmds, + retirevalAsksCmds, + retrievalDealSelectionCmds, }, } -var retrievalDealSelectionCmd = &cli.Command{ - Name: "selection", - Usage: "Configure acceptance criteria for retrieval deal proposals", +var retrievalDealsCmds = &cli.Command{ + Name: "deal", + Usage: "Manage retrieval deals and related configuration", Subcommands: []*cli.Command{ - retrievalDealSelectionShowCmd, - retrievalDealSelectionResetCmd, - retrievalDealSelectionRejectCmd, - }, -} - -var retrievalDealSelectionShowCmd = &cli.Command{ - Name: "list", - Usage: "List retrieval deal proposal selection criteria", - Flags: []cli.Flag{ - minerFlag, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - smapi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - onlineOk, err := smapi.DealsConsiderOnlineRetrievalDeals(DaemonContext(cctx), mAddr) - if err != nil { - return err - } - - offlineOk, err := smapi.DealsConsiderOfflineRetrievalDeals(DaemonContext(cctx), mAddr) - if err != nil { - return err - } - - fmt.Printf("considering online retrieval deals: %t\n", onlineOk) - fmt.Printf("considering offline retrieval deals: %t\n", offlineOk) - - return nil - }, -} - -var retrievalDealSelectionResetCmd = &cli.Command{ - Name: "reset", - Usage: "Reset retrieval deal proposal selection criteria to default values", - Flags: []cli.Flag{ - minerFlag, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - smapi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - err = smapi.DealsSetConsiderOnlineRetrievalDeals(DaemonContext(cctx), mAddr, true) - if err != nil { - return err - } - - err = smapi.DealsSetConsiderOfflineRetrievalDeals(DaemonContext(cctx), mAddr, true) - if err != nil { - return err - } - - return nil - }, -} - -var retrievalDealSelectionRejectCmd = &cli.Command{ - Name: "reject", - Usage: "Configure criteria which necessitate automatic rejection", - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "online", - }, - &cli.BoolFlag{ - Name: "offline", - }, - minerFlag, - }, - Action: func(cctx *cli.Context) error { - mAddr, err := shouldAddress(cctx.String("miner"), false, false) - if err != nil { - return fmt.Errorf("invalid miner address: %w", err) - } - - smapi, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - if cctx.Bool("online") { - err = smapi.DealsSetConsiderOnlineRetrievalDeals(DaemonContext(cctx), mAddr, false) - if err != nil { - return err - } - } - - if cctx.Bool("offline") { - err = smapi.DealsSetConsiderOfflineRetrievalDeals(DaemonContext(cctx), mAddr, false) - if err != nil { - return err - } - } - - return nil + retrievalDealsListCmd, }, } @@ -183,130 +65,3 @@ var retrievalDealsListCmd = &cli.Command{ return w.Flush() }, } - -var retrievalSetAskCmd = &cli.Command{ - Name: "set-ask", - Usage: "Configure the provider's retrieval ask", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "price", - Usage: "Set the price of the ask for retrievals (FIL/GiB)", - }, - &cli.StringFlag{ - Name: "unseal-price", - Usage: "Set the price to unseal", - }, - &cli.StringFlag{ - Name: "payment-interval", - Usage: "Set the payment interval (in bytes) for retrieval", - DefaultText: "1MiB", - }, - &cli.StringFlag{ - Name: "payment-interval-increase", - Usage: "Set the payment interval increase (in bytes) for retrieval", - DefaultText: "1MiB", - }, - &cli.StringFlag{ - Name: "payment-addr", - Required: true, - }, - }, - Action: func(cctx *cli.Context) error { - ctx := DaemonContext(cctx) - - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - mAddr, err := address.NewFromString(cctx.String("payment-addr")) - if err != nil { - return err - } - - ask, err := api.MarketGetRetrievalAsk(ctx, mAddr) - if err != nil { - if err.Error() != "record not found" { - return err - } - ask = &retrievalmarket.Ask{} - } - - if cctx.IsSet("price") { - v, err := types.ParseFIL(cctx.String("price")) - if err != nil { - return err - } - ask.PricePerByte = types.BigDiv(types.BigInt(v), types.NewInt(1<<30)) - } - - if cctx.IsSet("unseal-price") { - v, err := types.ParseFIL(cctx.String("unseal-price")) - if err != nil { - return err - } - ask.UnsealPrice = abi.TokenAmount(v) - } - - if cctx.IsSet("payment-interval") { - v, err := units.RAMInBytes(cctx.String("payment-interval")) - if err != nil { - return err - } - ask.PaymentInterval = uint64(v) - } - - if cctx.IsSet("payment-interval-increase") { - v, err := units.RAMInBytes(cctx.String("payment-interval-increase")) - if err != nil { - return err - } - ask.PaymentIntervalIncrease = uint64(v) - } - - return api.MarketSetRetrievalAsk(ctx, mAddr, ask) - }, -} - -var retrievalGetAskCmd = &cli.Command{ - Name: "get-ask", - Usage: "Get the provider's current retrieval ask", - Flags: []cli.Flag{ - requiredMinerFlag, - }, - Action: func(cctx *cli.Context) error { - ctx := DaemonContext(cctx) - - api, closer, err := NewMarketNode(cctx) - if err != nil { - return err - } - defer closer() - - mAddr, err := address.NewFromString(cctx.String("miner")) - if err != nil { - return err - } - - ask, err := api.MarketGetRetrievalAsk(ctx, mAddr) - if err != nil { - return err - } - - w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) - fmt.Fprintf(w, "Price per Byte\tUnseal Price\tPayment Interval\tPayment Interval Increase\n") - if ask == nil { - fmt.Fprintf(w, "\n") - return w.Flush() - } - - fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", - types.FIL(ask.PricePerByte), - types.FIL(ask.UnsealPrice), - units.BytesSize(float64(ask.PaymentInterval)), - units.BytesSize(float64(ask.PaymentIntervalIncrease)), - ) - return w.Flush() - }, -} diff --git a/cli/storage-ask.go b/cli/storage-ask.go new file mode 100644 index 00000000..ca7558aa --- /dev/null +++ b/cli/storage-ask.go @@ -0,0 +1,213 @@ +package cli + +import ( + "errors" + "fmt" + "os" + "strings" + "text/tabwriter" + "time" + + "github.com/filecoin-project/venus/venus-shared/types/market" + + "github.com/docker/go-units" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/venus/pkg/constants" + "github.com/filecoin-project/venus/venus-shared/types" +) + +var storageAsksCmds = &cli.Command{ + Name: "ask", + Usage: "Configure storage asks", + Subcommands: []*cli.Command{ + setAskCmd, + getAskCmd, + }, +} + +var setAskCmd = &cli.Command{ + Name: "set", + ArgsUsage: "", + Usage: "Configure(set/update) the miner's ask", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "price", + Usage: "Set the price of the ask for unverified deals (specified as FIL / GiB / Epoch) to `PRICE`.", + Value: "0", + }, + &cli.StringFlag{ + Name: "verified-price", + Usage: "Set the price of the ask for verified deals (specified as FIL / GiB / Epoch) to `PRICE`", + Value: "0", + }, + &cli.StringFlag{ + Name: "min-piece-size", + Usage: "Set minimum piece size (w/bit-padding, in bytes) in ask to `SIZE`", + DefaultText: "256B", + Value: "256B", + }, + &cli.StringFlag{ + Name: "max-piece-size", + Usage: "Set maximum piece size (w/bit-padding, in bytes) in ask to `SIZE`, eg. KiB, MiB, GiB, TiB, PiB", + DefaultText: "miner sector size", + Value: "0", //default to use miner's size + }, + }, + Action: func(cctx *cli.Context) error { + ctx := DaemonContext(cctx) + + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := address.NewFromString(cctx.Args().Get(0)) + if err != nil { + return fmt.Errorf("para `miner` is invalid: %w", err) + } + + isUpdate := true + storageAsk, err := api.MarketGetAsk(ctx, mAddr) + if err != nil { + if !strings.Contains(err.Error(), "record not found") { + return err + } + storageAsk = &market.SignedStorageAsk{} + isUpdate = false + } + + pri, err := types.ParseFIL(cctx.String("price")) + if err != nil { + return err + } + + vpri, err := types.ParseFIL(cctx.String("verified-price")) + if err != nil { + return err + } + + dur, err := time.ParseDuration("720h0m0s") + if err != nil { + return fmt.Errorf("cannot parse duration: %w", err) + } + + qty := dur.Seconds() / float64(constants.MainNetBlockDelaySecs) + + min, err := units.RAMInBytes(cctx.String("min-piece-size")) + if err != nil { + return fmt.Errorf("cannot parse min-piece-size to quantity of bytes: %w", err) + } + + if min < 256 { + return errors.New("minimum piece size (w/bit-padding) is 256B") + } + + max, err := units.RAMInBytes(cctx.String("max-piece-size")) + if err != nil { + return fmt.Errorf("cannot parse max-piece-size to quantity of bytes: %w", err) + } + + ssize, err := api.ActorSectorSize(ctx, mAddr) + if err != nil { + return fmt.Errorf("get miner's size %w", err) + } + + smax := int64(ssize) + + if max == 0 { + max = smax + } + + if max > smax { + return fmt.Errorf("max piece size (w/bit-padding) %s cannot exceed miner sector size %s", types.SizeStr(types.NewInt(uint64(max))), types.SizeStr(types.NewInt(uint64(smax)))) + } + + if isUpdate { + if cctx.IsSet("price") { + storageAsk.Ask.Price = types.BigInt(pri) + } + if cctx.IsSet("verified-price") { + storageAsk.Ask.VerifiedPrice = types.BigInt(vpri) + } + if cctx.IsSet("min-piece-size") { + storageAsk.Ask.MinPieceSize = abi.PaddedPieceSize(min) + } + if cctx.IsSet("max-piece-size") { + storageAsk.Ask.MaxPieceSize = abi.PaddedPieceSize(max) + } + return api.MarketSetAsk(ctx, mAddr, storageAsk.Ask.Price, storageAsk.Ask.VerifiedPrice, abi.ChainEpoch(qty), storageAsk.Ask.MinPieceSize, storageAsk.Ask.MaxPieceSize) + } else { + return api.MarketSetAsk(ctx, mAddr, types.BigInt(pri), types.BigInt(vpri), abi.ChainEpoch(qty), abi.PaddedPieceSize(min), abi.PaddedPieceSize(max)) + } + }, +} + +var getAskCmd = &cli.Command{ + Name: "get", + Usage: "Print the miner's ask", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + ctx := DaemonContext(cctx) + + fnapi, closer, err := NewFullNode(cctx) + if err != nil { + return err + } + defer closer() + + smapi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := address.NewFromString(cctx.Args().Get(0)) + if err != nil { + return fmt.Errorf("para `miner` is invalid: %w", err) + } + + sask, err := smapi.MarketGetAsk(ctx, mAddr) + if err != nil { + return err + } + + var ask *storagemarket.StorageAsk + if sask != nil && sask.Ask != nil { + ask = sask.Ask + } + + w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) + fmt.Fprintf(w, "Price per GiB/Epoch\tVerified\tMin. Piece Size (padded)\tMax. Piece Size (padded)\tExpiry (Epoch)\tExpiry (Appx. Rem. Time)\tSeq. No.\n") + if ask == nil { + fmt.Fprintf(w, "\n") + return w.Flush() + } + + head, err := fnapi.ChainHead(ctx) + if err != nil { + return err + } + + dlt := ask.Expiry - head.Height() + rem := "" + if dlt > 0 { + rem = (time.Second * time.Duration(int64(dlt)*int64(constants.MainNetBlockDelaySecs))).String() + } + + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\t%s\t%d\n", types.FIL(ask.Price), types.FIL(ask.VerifiedPrice), types.SizeStr(types.NewInt(uint64(ask.MinPieceSize))), types.SizeStr(types.NewInt(uint64(ask.MaxPieceSize))), ask.Expiry, rem, ask.SeqNo) + + return w.Flush() + }, +} diff --git a/cli/storage-cfg.go b/cli/storage-cfg.go new file mode 100644 index 00000000..55e7f596 --- /dev/null +++ b/cli/storage-cfg.go @@ -0,0 +1,533 @@ +package cli + +import ( + "bufio" + "errors" + "fmt" + "log" + "os" + "path/filepath" + "time" + + "github.com/ipfs/go-cid" + + "github.com/urfave/cli/v2" +) + +var storageCfgCmds = &cli.Command{ + Name: "cfg", + Usage: "Configure storage config", + Subcommands: []*cli.Command{ + storageDealSelectionCmds, + blocksListCmds, + expectedSealDurationCmds, + maxDealStartDelayCmds, + dealsPublishMsgPeriodCmds, + }, +} + +var storageDealSelectionCmds = &cli.Command{ + Name: "selection", + Usage: "Configure acceptance criteria for storage deal proposals", + Subcommands: []*cli.Command{ + storageDealSelectionShowCmd, + storageDealSelectionResetCmd, + storageDealSelectionRejectCmd, + }, +} + +var storageDealSelectionShowCmd = &cli.Command{ + Name: "list", + Usage: "List storage deal proposal selection criteria", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + smapi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + onlineOk, err := smapi.DealsConsiderOnlineStorageDeals(DaemonContext(cctx), mAddr) + if err != nil { + return err + } + + offlineOk, err := smapi.DealsConsiderOfflineStorageDeals(DaemonContext(cctx), mAddr) + if err != nil { + return err + } + + verifiedOk, err := smapi.DealsConsiderVerifiedStorageDeals(DaemonContext(cctx), mAddr) + if err != nil { + return err + } + + unverifiedOk, err := smapi.DealsConsiderUnverifiedStorageDeals(DaemonContext(cctx), mAddr) + if err != nil { + return err + } + + fmt.Printf("considering online storage deals: %t\n", onlineOk) + fmt.Printf("considering offline storage deals: %t\n", offlineOk) + fmt.Printf("considering verified storage deals: %t\n", verifiedOk) + fmt.Printf("considering unverified storage deals: %t\n", unverifiedOk) + + return nil + }, +} + +var storageDealSelectionResetCmd = &cli.Command{ + Name: "reset", + Usage: "Reset storage deal proposal selection criteria to default values", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must set miner address argument") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + smapi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + err = smapi.DealsSetConsiderOnlineStorageDeals(DaemonContext(cctx), mAddr, true) + if err != nil { + return err + } + + err = smapi.DealsSetConsiderOfflineStorageDeals(DaemonContext(cctx), mAddr, true) + if err != nil { + return err + } + + err = smapi.DealsSetConsiderVerifiedStorageDeals(DaemonContext(cctx), mAddr, true) + if err != nil { + return err + } + + err = smapi.DealsSetConsiderUnverifiedStorageDeals(DaemonContext(cctx), mAddr, true) + if err != nil { + return err + } + + return nil + }, +} + +var storageDealSelectionRejectCmd = &cli.Command{ + Name: "reject", + Usage: "Configure criteria which necessitate automatic rejection", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "online", + }, + &cli.BoolFlag{ + Name: "offline", + }, + &cli.BoolFlag{ + Name: "verified", + }, + &cli.BoolFlag{ + Name: "unverified", + }, + }, + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + smapi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + if cctx.Bool("online") { + err = smapi.DealsSetConsiderOnlineStorageDeals(DaemonContext(cctx), mAddr, false) + if err != nil { + return err + } + } + + if cctx.Bool("offline") { + err = smapi.DealsSetConsiderOfflineStorageDeals(DaemonContext(cctx), mAddr, false) + if err != nil { + return err + } + } + + if cctx.Bool("verified") { + err = smapi.DealsSetConsiderVerifiedStorageDeals(DaemonContext(cctx), mAddr, false) + if err != nil { + return err + } + } + + if cctx.Bool("unverified") { + err = smapi.DealsSetConsiderUnverifiedStorageDeals(DaemonContext(cctx), mAddr, false) + if err != nil { + return err + } + } + + return nil + }, +} + +var blocksListCmds = &cli.Command{ + Name: "block-list", + Usage: "Configure miner's CID block list", + Subcommands: []*cli.Command{ + getBlocklistCmd, + setBlocklistCmd, + resetBlocklistCmd, + }, +} + +var getBlocklistCmd = &cli.Command{ + Name: "get", + Usage: "List the contents of the miner's piece CID blocklist", + ArgsUsage: "", + Flags: []cli.Flag{ + &CidBaseFlag, + minerFlag, + }, + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + blocklist, err := api.DealsPieceCidBlocklist(DaemonContext(cctx), mAddr) + if err != nil { + return err + } + + encoder, err := GetCidEncoder(cctx) + if err != nil { + return err + } + + for idx := range blocklist { + fmt.Println(encoder.Encode(blocklist[idx])) + } + + return nil + }, +} + +var setBlocklistCmd = &cli.Command{ + Name: "set", + Usage: "Set the miner's list of blocklisted piece CIDs", + ArgsUsage: "[ (optional, will read from stdin if omitted)]", + Flags: []cli.Flag{ + minerFlag, + }, + Action: func(cctx *cli.Context) error { + if cctx.NArg() == 0 { + return errors.New("need at least one argument for miner address argument") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + scanner := bufio.NewScanner(os.Stdin) + if cctx.NArg() == 2 && cctx.Args().Get(1) != "-" { + absPath, err := filepath.Abs(cctx.Args().Get(1)) + if err != nil { + return err + } + + file, err := os.Open(absPath) + if err != nil { + log.Fatal(err) + } + defer file.Close() //nolint:errcheck + + scanner = bufio.NewScanner(file) + } + + var blocklist []cid.Cid + for scanner.Scan() { + decoded, err := cid.Decode(scanner.Text()) + if err != nil { + return err + } + + blocklist = append(blocklist, decoded) + } + + err = scanner.Err() + if err != nil { + return err + } + + return api.DealsSetPieceCidBlocklist(DaemonContext(cctx), mAddr, blocklist) + }, +} + +var resetBlocklistCmd = &cli.Command{ + Name: "reset", + Usage: "Remove all entries from the miner's piece CID blocklist", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + return api.DealsSetPieceCidBlocklist(DaemonContext(cctx), mAddr, []cid.Cid{}) + }, +} + +var expectedSealDurationCmds = &cli.Command{ + Name: "seal-duration", + Usage: "Configure the expected time, that you expect sealing sectors to take. Deals that start before this duration will be rejected.", + Subcommands: []*cli.Command{ + expectedSealDurationGetCmd, + expectedSealDurationSetCmd, + }, +} + +var expectedSealDurationGetCmd = &cli.Command{ + Name: "get", + Usage: "set miner's expected seal duration", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + marketApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + t, err := marketApi.SectorGetExpectedSealDuration(ctx, mAddr) + if err != nil { + return err + } + fmt.Println("seal-duration: ", t.String()) + return nil + }, +} + +var expectedSealDurationSetCmd = &cli.Command{ + Name: "set", + Usage: "eg. '1m','30s',...", + ArgsUsage: " ", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 2 { + return errors.New("must miner address and time duration arguments") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + marketApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + if cctx.Args().Len() != 1 { + return fmt.Errorf("must pass duration") + } + + d, err := time.ParseDuration(cctx.Args().Get(1)) + if err != nil { + return fmt.Errorf("could not parse duration: %w", err) + } + + return marketApi.SectorSetExpectedSealDuration(ctx, mAddr, d) + }, +} + +var maxDealStartDelayCmds = &cli.Command{ + Name: "max-start-delay", + Usage: "Configure the maximum amount of time proposed deal StartEpoch can be in future.", + Subcommands: []*cli.Command{ + maxDealStartDelayGetCmd, + maxDealStartDelaySetCmd, + }, +} + +var maxDealStartDelayGetCmd = &cli.Command{ + Name: "get", + Usage: "config miner's max deal start delay", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + marketApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + t, err := marketApi.DealsMaxStartDelay(ctx, mAddr) + if err != nil { + return err + } + fmt.Println("max start delay: ", t.String()) + return nil + }, +} + +var maxDealStartDelaySetCmd = &cli.Command{ + Name: "set", + Usage: "eg. '1m','30s',...", + ArgsUsage: " ", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 2 { + return errors.New("must miner address and time duration arguments") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + marketApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + if cctx.Args().Len() != 1 { + return fmt.Errorf("must pass duration") + } + + delay, err := time.ParseDuration(cctx.Args().Get(1)) + if err != nil { + return fmt.Errorf("could not parse duration: %w", err) + } + + return marketApi.DealsSetMaxStartDelay(ctx, mAddr, delay) + }, +} + +var dealsPublishMsgPeriodCmds = &cli.Command{ + Name: "publish-period", + Usage: "Configure the the amount of time to wait for more deals to be ready to publish before publishing them all as a batch.", + Subcommands: []*cli.Command{ + dealsPublishMsgPeriodGetCmd, + dealsPublishMsgPeriodSetCmd, + }, +} + +var dealsPublishMsgPeriodGetCmd = &cli.Command{ + Name: "get", + Usage: "config miner's period of publishing message", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return errors.New("must specify one argument as miner address") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + marketApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + t, err := marketApi.DealsPublishMsgPeriod(ctx, mAddr) + if err != nil { + return err + } + fmt.Println("publish msg period: ", t.String()) + return nil + }, +} + +var dealsPublishMsgPeriodSetCmd = &cli.Command{ + Name: "set", + Usage: "eg. '1m','30s',...", + ArgsUsage: " ", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 2 { + return errors.New("must miner address and time duration arguments") + } + mAddr, err := shouldAddress(cctx.Args().Get(0), false, false) + if err != nil { + return fmt.Errorf("invalid miner address: %w", err) + } + + marketApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + if cctx.Args().Len() != 1 { + return fmt.Errorf("must pass duration") + } + + period, err := time.ParseDuration(cctx.Args().Get(1)) + if err != nil { + return fmt.Errorf("could not parse duration: %w", err) + } + return marketApi.DealsSetPublishMsgPeriod(ctx, mAddr, period) + }, +} diff --git a/cli/storage-deals.go b/cli/storage-deals.go new file mode 100644 index 00000000..73368ea6 --- /dev/null +++ b/cli/storage-deals.go @@ -0,0 +1,418 @@ +package cli + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "sort" + "strings" + "text/tabwriter" + "time" + + tm "github.com/buger/goterm" + "github.com/docker/go-units" + "github.com/ipfs/go-cid" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-markets/storagemarket" + + "github.com/filecoin-project/venus-market/v2/storageprovider" + + "github.com/filecoin-project/venus/venus-shared/types" + "github.com/filecoin-project/venus/venus-shared/types/market" +) + +var StorageCmds = &cli.Command{ + Name: "storage", + Usage: "Manage storage deals and related configuration", + Subcommands: []*cli.Command{ + storageDealsCmds, + storageAsksCmds, + storageCfgCmds, + }, +} + +var storageDealsCmds = &cli.Command{ + Name: "deal", + Usage: "Manage storage deals and related configuration", + Subcommands: []*cli.Command{ + dealsImportDataCmd, + importOfflineDealCmd, + dealsListCmd, + updateStorageDealStateCmd, + dealsPendingPublish, + }, +} + +var dealsImportDataCmd = &cli.Command{ + Name: "import-data", + Usage: "Manually import data for a deal", + ArgsUsage: " ", + Action: func(cctx *cli.Context) error { + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := DaemonContext(cctx) + + if cctx.Args().Len() < 2 { + return fmt.Errorf("must specify proposal CID and file path") + } + + propCid, err := cid.Decode(cctx.Args().Get(0)) + if err != nil { + return err + } + + fpath := cctx.Args().Get(1) + + return api.DealsImportData(ctx, propCid, fpath) + }, +} + +var importOfflineDealCmd = &cli.Command{ + Name: "import-offlinedeal", + Usage: "Manually import offline deal", + ArgsUsage: "", + Flags: []cli.Flag{ + // verbose + &cli.BoolFlag{ + Name: "verbose", + Usage: "Print verbose output", + Aliases: []string{ + "v", + }, + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := DaemonContext(cctx) + + if cctx.Args().Len() < 1 { + return fmt.Errorf("must specify the path of json file which records the deal") + } + + fpath := cctx.Args().Get(0) + + dealbyte, err := ioutil.ReadFile(fpath) + if err != nil { + return fmt.Errorf("read deal file(%s) fail %w", fpath, err) + } + + data := []market.MinerDeal{} + err = json.Unmarshal(dealbyte, &data) + if err != nil { + return fmt.Errorf("parse deal file(%s) fail %w", fpath, err) + } + + totalCount := len(data) + importedCount := 0 + + // if verbose, print the deal info + + for i := 0; i < totalCount; i++ { + err := api.OfflineDealImport(ctx, data[i]) + if err != nil { + if cctx.Bool("verbose") { + fmt.Printf("( %d / %d ) %s : fail : %v\n", i+1, totalCount, data[i].ProposalCid, err) + } + } else { + importedCount++ + if cctx.Bool("verbose") { + fmt.Printf("( %d / %d ) %s : success\n", i+1, totalCount, data[i].ProposalCid) + } + } + } + + fmt.Printf("import %d deals, %d deal success , %d deal fail .\n", totalCount, importedCount, totalCount-importedCount) + + return nil + }, +} + +var dealsListCmd = &cli.Command{ + Name: "list", + Usage: "List all deals for this miner", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "verbose", + Aliases: []string{"v"}, + }, + &cli.BoolFlag{ + Name: "watch", + Usage: "watch deal updates in real-time, rather than a one time list", + }, + &cli.StringFlag{ + Name: "miner", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + maddr := address.Undef + if cctx.IsSet("miner") { + maddr, err = address.NewFromString(cctx.String("miner")) + if err != nil { + return fmt.Errorf("para `miner` is invalid: %w", err) + } + } + + ctx := DaemonContext(cctx) + deals, err := api.MarketListIncompleteDeals(ctx, maddr) + if err != nil { + return err + } + + verbose := cctx.Bool("verbose") + watch := cctx.Bool("watch") + + if watch { + updates, err := api.MarketGetDealUpdates(ctx) + if err != nil { + return err + } + + for { + tm.Clear() + tm.MoveCursor(1, 1) + + err = outputStorageDeals(tm.Output, deals, verbose) + if err != nil { + return err + } + + tm.Flush() + + select { + case <-ctx.Done(): + return nil + case updated := <-updates: + var found bool + for i, existing := range deals { + if existing.ProposalCid.Equals(updated.ProposalCid) { + deals[i] = updated + found = true + break + } + } + if !found { + deals = append(deals, updated) + } + } + } + } + + return outputStorageDeals(os.Stdout, deals, verbose) + }, +} + +var dealStateUsage = func() string { + const c, spliter = 5, " | " + size := len(storageprovider.StringToStorageState) + states := make([]string, 0, size+size/c) + idx := 0 + for s := range storageprovider.StringToStorageState { + states = append(states, s) + idx++ + states = append(states, spliter) + if idx%c == 0 { + states = append(states, "\n\t") + continue + } + } + + usage := strings.Join(states, "") + { + size := len(usage) + if size > 3 && usage[size-3:] == spliter { + usage = usage[:size-3] + } + } + return usage + ", set to 'StorageDealUnknown' means no change" +} + +var updateStorageDealStateCmd = &cli.Command{ + Name: "update", + Usage: "update deal status", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "proposalcid", + Required: true, + }, + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "Actually send transaction performing the action", + Value: false, + }, + &cli.StringFlag{ + Name: "piece-state", + Usage: "Undefine | Assigned | Packing | Proving, empty means no change", + }, + &cli.StringFlag{ + Name: "state", + Usage: dealStateUsage(), + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := DaemonContext(cctx) + proposalCid, err := cid.Decode(cctx.String("proposalcid")) + if err != nil { + return err + } + var isParamOk bool + var state storagemarket.StorageDealStatus + var pieceStatus market.PieceStatus + + if cctx.IsSet("state") { + isParamOk = true + state = storageprovider.StringToStorageState[cctx.String("state")] + } + + if cctx.IsSet("piece-state") { + pieceStatus = market.PieceStatus(cctx.String("piece-state")) + isParamOk = true + } + + if !isParamOk { + return fmt.Errorf("must set 'state' or 'piece-state'") + } + + if !cctx.Bool("really-do-it") { + fmt.Println("Pass --really-do-it to actually execute this action") + return nil + } + + return api.UpdateStorageDealStatus(ctx, proposalCid, state, pieceStatus) + }, +} + +var dealsPendingPublish = &cli.Command{ + Name: "pending-publish", + Usage: "list deals waiting in publish queue", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "publish-now", + Usage: "send a publish message now", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + if cctx.Bool("publish-now") { + if err := api.MarketPublishPendingDeals(ctx); err != nil { + return fmt.Errorf("publishing deals: %w", err) + } + fmt.Println("triggered deal publishing") + return nil + } + + pendings, err := api.MarketPendingDeals(ctx) + if err != nil { + return fmt.Errorf("getting pending deals: %w", err) + } + + for _, pending := range pendings { + if len(pending.Deals) > 0 { + endsIn := time.Until(pending.PublishPeriodStart.Add(pending.PublishPeriod)) + w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) + _, _ = fmt.Fprintf(w, "Publish period: %s (ends in %s)\n", pending.PublishPeriod, endsIn.Round(time.Second)) + _, _ = fmt.Fprintf(w, "First deal queued at: %s\n", pending.PublishPeriodStart) + _, _ = fmt.Fprintf(w, "Deals will be published at: %s\n", pending.PublishPeriodStart.Add(pending.PublishPeriod)) + _, _ = fmt.Fprintf(w, "%d deals queued to be published:\n", len(pending.Deals)) + _, _ = fmt.Fprintf(w, "ProposalCID\tClient\tSize\n") + for _, deal := range pending.Deals { + proposalNd, err := cborutil.AsIpld(&deal) // nolint + if err != nil { + return err + } + + _, _ = fmt.Fprintf(w, "%s\t%s\t%s\n", proposalNd.Cid(), deal.Proposal.Client, units.BytesSize(float64(deal.Proposal.PieceSize))) + } + return w.Flush() + } + } + + fmt.Println("No deals queued to be published") + return nil + }, +} + +func outputStorageDeals(out io.Writer, deals []market.MinerDeal, verbose bool) error { + sort.Slice(deals, func(i, j int) bool { + return deals[i].CreationTime.Time().Before(deals[j].CreationTime.Time()) + }) + + w := tabwriter.NewWriter(out, 2, 4, 2, ' ', 0) + + if verbose { + _, _ = fmt.Fprintf(w, "Creation\tVerified\tProposalCid\tDealId\tState\tPieceState\tClient\tProvider\tSize\tPrice\tDuration\tTransferChannelID\tAddFundCid\tPublishCid\tMessage\n") + } else { + _, _ = fmt.Fprintf(w, "ProposalCid\tDealId\tState\tPieceState\tClient\tProvider\tSize\tPrice\tDuration\n") + } + + for _, deal := range deals { + propcid := deal.ProposalCid.String() + if !verbose { + propcid = "..." + propcid[len(propcid)-8:] + } + + fil := types.FIL(types.BigMul(deal.Proposal.StoragePricePerEpoch, types.NewInt(uint64(deal.Proposal.Duration())))) + + if verbose { + _, _ = fmt.Fprintf(w, "%s\t%t\t", deal.CreationTime.Time().Format(time.Stamp), deal.Proposal.VerifiedDeal) + } + + _, _ = fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s", propcid, deal.DealID, storagemarket.DealStates[deal.State], deal.PieceStatus, + deal.Proposal.Client, deal.Proposal.Provider, units.BytesSize(float64(deal.Proposal.PieceSize)), fil, deal.Proposal.Duration()) + if verbose { + tchid := "" + if deal.TransferChannelID != nil { + tchid = deal.TransferChannelID.String() + } + + addFundcid := "" + if deal.AddFundsCid != nil { + addFundcid = deal.AddFundsCid.String() + } + + pubcid := "" + if deal.PublishCid != nil { + pubcid = deal.PublishCid.String() + } + + _, _ = fmt.Fprintf(w, "\t%s", tchid) + _, _ = fmt.Fprintf(w, "\t%s", addFundcid) + _, _ = fmt.Fprintf(w, "\t%s", pubcid) + _, _ = fmt.Fprintf(w, "\t%s", deal.Message) + } + + _, _ = fmt.Fprintln(w) + } + + return w.Flush() +} diff --git a/cmd/market-client/retrieval.go b/cmd/market-client/retrieval.go index af28e8bd..a7e0f8ed 100644 --- a/cmd/market-client/retrieval.go +++ b/cmd/market-client/retrieval.go @@ -201,7 +201,7 @@ func outputRetrievalDeals(ctx context.Context, out io.Writer, localDeals []clien } var retrievalListCmd = &cli.Command{ - Name: "list-retrievals", + Name: "list", Usage: "List retrieval market deals", Flags: []cli.Flag{ &cli.BoolFlag{ @@ -318,7 +318,7 @@ var retrievalCancelCmd = &cli.Command{ } var clientQueryRetrievalAskCmd = &cli.Command{ - Name: "retrieval-ask", + Name: "query-ask", Usage: "Get a miner's retrieval ask", ArgsUsage: "[minerAddress] [data CID]", Flags: []cli.Flag{ diff --git a/cmd/venus-market/main.go b/cmd/venus-market/main.go index 244ebb61..de52d521 100644 --- a/cmd/venus-market/main.go +++ b/cmd/venus-market/main.go @@ -88,8 +88,8 @@ func main() { Commands: []*cli.Command{ runCmd, cli2.PiecesCmd, - cli2.RetrievalDealsCmd, - cli2.StorageDealsCmd, + cli2.RetrievalCmds, + cli2.StorageCmds, cli2.ActorCmd, cli2.NetCmd, cli2.DataTransfersCmd, diff --git "a/docs/zh/\345\277\253\351\200\237\345\220\257\347\224\250.md" "b/docs/zh/\345\277\253\351\200\237\345\220\257\347\224\250.md" index 2e897a3d..949a2b30 100644 --- "a/docs/zh/\345\277\253\351\200\237\345\220\257\347\224\250.md" +++ "b/docs/zh/\345\277\253\351\200\237\345\220\257\347\224\250.md" @@ -307,12 +307,12 @@ addr: /ip4/192.168.19.67/tcp/58418 ### 存储挂单 ```bash -./venus-market storage-deals set-ask --miner=t01041 --price=0.01fil --verified-price=0.02fil --min-piece-size=512b --max-piece-size=512M +./venus-market storage ask set --price=0.01fil --verified-price=0.02fil --min-piece-size=512b --max-piece-size=512M t01041 ``` 可以通过命令行工具检查定价信息: ```shell -./venus-market storage-deals get-ask --miner=t01041 +./venus-market storage ask get t01041 Price per GiB/Epoch Verified Min. Piece Size (padded) Max. Piece Size (padded) Expiry (Epoch) Expiry (Appx. Rem. Time) Seq. No. 0.01 FIL 0.02 FIL 512 B 521 MiB 161256 719h59m0s 0 ``` @@ -322,16 +322,16 @@ Price per GiB/Epoch Verified Min. Piece Size (padded) Max. Piece Size (padded 存储服务提供商至少应设置收款地址 ```bash -./venus-market retrieval-deals set-ask --payment-addr t3ueb62v5kbyuvwo5tuyzpvds2bfakdjeg2s33p47buvbfiyd7w5fwmeilobt5cqzi673s5z6i267igkgxum6a +./venus-market retrieve ask set t3ueb62v5kbyuvwo5tuyzpvds2bfakdjeg2s33p47buvbfiyd7w5fwmeilobt5cqzi673s5z6i267igkgxum6a ``` 同时,也可以设置数据检索订单的价格,不设置时,默认为0. ```bash -./venus-market retrieval-deals set-ask \ +./venus-market retrieve ask set \ --price 0.02fil \ --unseal-price 0.01fil \ --payment-interval 1MB \ ---payment-addr t3ueb62v5kbyuvwo5tuyzpvds2bfakdjeg2s33p47buvbfiyd7w5fwmeilobt5cqzi673s5z6i267igkgxum6a +t3ueb62v5kbyuvwo5tuyzpvds2bfakdjeg2s33p47buvbfiyd7w5fwmeilobt5cqzi673s5z6i267igkgxum6a ``` diff --git a/retrievalprovider/stream_handler.go b/retrievalprovider/stream_handler.go index 1d7af0c9..43c6a11b 100644 --- a/retrievalprovider/stream_handler.go +++ b/retrievalprovider/stream_handler.go @@ -63,6 +63,9 @@ func (p *RetrievalStreamHandler) HandleQueryStream(stream rmnet.RetrievalQuerySt } sendResp := func(resp retrievalmarket.QueryResponse) { + if resp.Status == retrievalmarket.QueryResponseError { + log.Errorf(resp.Message) + } if err := stream.WriteQueryResponse(resp); err != nil { log.Errorf("Retrieval query: writing query response: %s", err) } @@ -77,7 +80,6 @@ func (p *RetrievalStreamHandler) HandleQueryStream(stream rmnet.RetrievalQuerySt minerDeals, err := p.pieceInfo.GetPieceInfoFromCid(ctx, query.PayloadCID, query.PieceCID) if err != nil { - log.Errorf("Retrieval query: query ready data: %s", err) answer.Status = retrievalmarket.QueryResponseError if errors.Is(err, repo.ErrNotFound) { answer.Message = fmt.Sprintf("retrieve piece(%s) or payload(%s) failed, not found", @@ -97,7 +99,6 @@ func (p *RetrievalStreamHandler) HandleQueryStream(stream rmnet.RetrievalQuerySt answer.PieceCIDFound = retrievalmarket.QueryItemAvailable paymentAddr := address.Address(p.cfg.MinerProviderConfig(selectDeal.Proposal.Provider, true).RetrievalPaymentAddress) if paymentAddr == address.Undef { - log.Errorf("must specific payment address in venus-market") answer.Status = retrievalmarket.QueryResponseError answer.Message = "must specific payment address in venus-market" sendResp(answer) @@ -107,7 +108,6 @@ func (p *RetrievalStreamHandler) HandleQueryStream(stream rmnet.RetrievalQuerySt ask, err := p.askRepo.GetAsk(ctx, selectDeal.Proposal.Provider) if err != nil { - log.Errorf("Retrieval query: GetAsk: %s", err) answer.Status = retrievalmarket.QueryResponseError answer.Message = fmt.Sprintf("failed to price deal: %s", err) sendResp(answer)