Skip to content

Commit

Permalink
Merge pull request #537 from ipfs-force-community/fix/import-deal-fro…
Browse files Browse the repository at this point in the history
…m-msg

feat: import deal by message
  • Loading branch information
simlecode authored Oct 16, 2024
2 parents 5be3521 + aa115c6 commit 3823949
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 15 deletions.
227 changes: 213 additions & 14 deletions cli/direct-deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var directDealCmds = &cli.Command{
updateDirectDealStateCmd,
importDirectDealCmd,
importDirectDealsCmd,
importDirectDealsFromMsgCmd,
},
}

Expand Down Expand Up @@ -372,19 +373,11 @@ var importDirectDealsCmd = &cli.Command{

ctx := cliCtx.Context
carDir := cliCtx.String("car-dir")

findCar := func(pieceCID cid.Cid) (string, error) {
carPath := filepath.Join(carDir, pieceCID.String())
if _, err := os.Stat(carPath); err == nil {
return carPath, nil
}

carPath = filepath.Join(carDir, pieceCID.String()+".car")
if _, err := os.Stat(carPath); err == nil {
return carPath, nil
if len(carDir) != 0 {
carDir, err = filepath.Abs(carDir)
if err != nil {
return fmt.Errorf("failed to get absolute path for car-dir: %w", err)
}

return "", fmt.Errorf("car %s file not found", pieceCID.String())
}

startEpoch, err := GetStartEpoch(cliCtx, fapi)
Expand Down Expand Up @@ -429,7 +422,7 @@ var importDirectDealsCmd = &cli.Command{
if len(carDir) == 0 {
return fmt.Errorf("must specify car-dir")
}
param.FilePath, err = findCar(pieceCid)
param.FilePath, err = findCar(pieceCid, carDir)
if err != nil {
return err
}
Expand Down Expand Up @@ -459,7 +452,7 @@ var importDirectDealsCmd = &cli.Command{
return fmt.Errorf("must specify car-dir")
}
if len(carDir) != 0 {
param.FilePath, err = findCar(param.PieceCID)
param.FilePath, err = findCar(param.PieceCID, carDir)
if err != nil {
return err
}
Expand All @@ -484,6 +477,166 @@ var importDirectDealsCmd = &cli.Command{
},
}

func findCar(pieceCID cid.Cid, carDir string) (string, error) {
carPath := filepath.Join(carDir, pieceCID.String())
if _, err := os.Stat(carPath); err == nil {
return carPath, nil
}

carPath = filepath.Join(carDir, pieceCID.String()+".car")
if _, err := os.Stat(carPath); err == nil {
return carPath, nil
}

return "", fmt.Errorf("car %s file not found", pieceCID.String())
}

var importDirectDealsFromMsgCmd = &cli.Command{
Name: "import-deals-from-msg",
Usage: "import direct deal from message",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "msg",
Usage: "message cid",
Required: true,
},
&cli.StringFlag{
Name: "car-dir",
Usage: "Car file directory",
Hidden: true,
},
&cli.StringFlag{
Name: "manifest",
Usage: "Manifest file path",
},
&cli.BoolFlag{
Name: "skip-commp",
Usage: "skip calculate the piece-cid, please use with caution",
},
&cli.BoolFlag{
Name: "skip-index",
Usage: "skip generate index",
},
&cli.BoolFlag{
Name: "no-copy-car-file",
Usage: "not copy car files to piece storage",
},
&cli.IntFlag{
Name: "start-epoch",
Usage: "start epoch by when the deal should be proved by provider on-chain (default: 8 days from now)",
},
},
Action: func(cliCtx *cli.Context) error {
api, closer, err := NewMarketNode(cliCtx)
if err != nil {
return err
}
defer closer()

fapi, fcloser, err := NewFullNode(cliCtx, OldMarketRepoPath)
if err != nil {
return err
}
defer fcloser()

ctx := cliCtx.Context
carDir := cliCtx.String("car-dir")
if len(carDir) != 0 {
carDir, err = filepath.Abs(carDir)
if err != nil {
return fmt.Errorf("failed to get absolute path for car-dir: %w", err)
}
}

msgCid, err := cid.Decode(cliCtx.String("msg"))
if err != nil {
return err
}

ml, err := fapi.StateSearchMsg(ctx, shared.EmptyTSK, msgCid, -1, true)
if err != nil {
return err
}

if ml.Receipt.ExitCode != 0 {
return fmt.Errorf("message execution failed with exit code %d", ml.Receipt.ExitCode)
}

tr := &shared.TransferReturn{}
err = tr.UnmarshalCBOR(bytes.NewReader(ml.Receipt.Return))
if err != nil {
return err
}

ar := &shared.AllocationsResponse{}
err = ar.UnmarshalCBOR(bytes.NewReader(tr.RecipientData))
if err != nil {
return err
}

msg, err := fapi.ChainGetMessage(ctx, msgCid)
if err != nil {
return err
}

startEpoch, err := GetStartEpoch(cliCtx, fapi)
if err != nil {
return err
}

payloadSizes, err := loadPayloadSizes(cliCtx.String("manifest"))
if err != nil {
return err
}

var directDealParams []types.DirectDealParam
for _, allocationID := range ar.NewAllocations {
a, err := fapi.StateGetAllocation(context.Background(), msg.From, allocationID, shared.EmptyTSK)
if err != nil {
return err
}
endEpoch, err := CheckAndGetEndEpoch(ctx, fapi, msg.From, uint64(allocationID), startEpoch)
if err != nil {
return err
}
payloadSize := payloadSizes[a.Data]
param := types.DirectDealParam{
DealUUID: uuid.New(),
AllocationID: uint64(allocationID),
Client: msg.From,
PieceCID: a.Data,
PayloadSize: payloadSize,
StartEpoch: startEpoch,
EndEpoch: endEpoch,
}
if param.PayloadSize == 0 && len(carDir) == 0 {
return fmt.Errorf("payload size is zero, allocation id: %d, piece cid: %s", allocationID, a.Data)
}
if len(carDir) != 0 {
param.FilePath, err = findCar(a.Data, carDir)
if err != nil {
return err
}
}
directDealParams = append(directDealParams, param)
}

params := types.DirectDealParams{
SkipCommP: cliCtx.Bool("skip-commp"),
SkipGenerateIndex: cliCtx.Bool("skip-generate-index"),
NoCopyCarFile: cliCtx.Bool("no-copy-car-file"),
DealParams: directDealParams,
}

if err := api.ImportDirectDeal(cliCtx.Context, &params); err != nil {
return err
}
fmt.Println("import deal success")

return nil
},
}

func CheckAndGetEndEpoch(ctx context.Context,
fapi v1api.FullNode,
client address.Address,
Expand Down Expand Up @@ -594,3 +747,49 @@ var updateDirectDealStateCmd = &cli.Command{
return api.UpdateDirectDealState(cliCtx.Context, dealUUID, state)
},
}

func loadPayloadSizes(path string) (map[cid.Cid]uint64, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}

records, err := csv.NewReader(f).ReadAll()
if err != nil {
return nil, err
}

payloadSizes := make(map[cid.Cid]uint64)
for i, record := range records {
// skip title
if i == 0 {
continue
}

if len(record) == 4 {
// payload_cid,piece_cid,payload_size,piece_size
pieceCID, err := cid.Parse(record[1])
if err != nil {
continue
}
payloadSize, err := strconv.ParseUint(record[2], 10, 64)
if err != nil {
continue
}
payloadSizes[pieceCID] = payloadSize
} else if len(record) >= 5 {
// payload_cid,filename,piece_cid,payload_size,piece_size,detail
pieceCID, err := cid.Parse(record[2])
if err != nil {
continue
}
payloadSize, err := strconv.ParseUint(record[3], 10, 64)
if err != nil {
continue
}
payloadSizes[pieceCID] = payloadSize
}
}

return payloadSizes, nil
}
10 changes: 9 additions & 1 deletion docs/zh/direct-on-boarding.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,12 @@ Creation ID AllocationId P

```
./droplet storage direct-deal update-state --state 1 07cd5814-02bf-494d-b81c-87df73b3422b
```
```

### 从消息导入订单

发送订单时程序退出,但订单没有导入到 `droplet`,这种情况可以从消息里面获取订单信息并导入到 `droplet`

```
./droplet storage direct-deal import-deals-from-msg --msg <msg cid> --manifest <manifest> --skip-commp --skip-index --no-copy-car-file
```

0 comments on commit 3823949

Please sign in to comment.