Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve offline deals #1515

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type FullNode interface {
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error)
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref FileRef) error
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error)
ClientCalcCommP(ctx context.Context, inpath string, miner address.Address) (*CommPRet, error)
ClientGenCar(ctx context.Context, ref FileRef, outpath string) error

// ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string)
Expand Down Expand Up @@ -398,3 +400,8 @@ type BlockTemplate struct {
Epoch abi.ChainEpoch
Timestamp uint64
}

type CommPRet struct {
Root cid.Cid
Size abi.UnpaddedPieceSize
}
9 changes: 9 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type FullNodeStruct struct {
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref api.FileRef) error `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
ClientCalcCommP func(ctx context.Context, inpath string, miner address.Address) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`

StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"`
Expand Down Expand Up @@ -309,6 +311,13 @@ func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.Retrieval
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {
return c.Internal.ClientQueryAsk(ctx, p, miner)
}
func (c *FullNodeStruct) ClientCalcCommP(ctx context.Context, inpath string, miner address.Address) (*api.CommPRet, error) {
return c.Internal.ClientCalcCommP(ctx, inpath, miner)
}

func (c *FullNodeStruct) ClientGenCar(ctx context.Context, ref api.FileRef, outpath string) error {
return c.Internal.ClientGenCar(ctx, ref, outpath)
}

func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
return c.Internal.MpoolPending(ctx, tsk)
Expand Down
81 changes: 70 additions & 11 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ var clientCmd = &cli.Command{
Usage: "Make deals, store data, retrieve data",
Subcommands: []*cli.Command{
clientImportCmd,
clientCommPCmd,
clientLocalCmd,
clientDealCmd,
clientFindCmd,
clientRetrieveCmd,
clientQueryAskCmd,
clientListDeals,
clientCarGenCmd,
},
}

Expand All @@ -41,7 +43,7 @@ var clientImportCmd = &cli.Command{
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "car",
Usage: "export to a car file instead of a regular file",
Usage: "import from a car file instead of a regular file",
},
},
Action: func(cctx *cli.Context) error {
Expand Down Expand Up @@ -69,6 +71,68 @@ var clientImportCmd = &cli.Command{
},
}

var clientCommPCmd = &cli.Command{
Name: "commP",
Usage: "calculate the piece-cid (commP) of a CAR file",
ArgsUsage: "[inputFile minerAddress]",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

if cctx.Args().Len() != 2 {
return fmt.Errorf("usage: commP <inputPath> <minerAddr>")
}

miner, err := address.NewFromString(cctx.Args().Get(1))
if err != nil {
return err
}

ret, err := api.ClientCalcCommP(ctx, cctx.Args().Get(0), miner)

if err != nil {
return err
}
fmt.Println("CID: ", ret.Root)
fmt.Println("Piece size: ", ret.Size)
return nil
},
}

var clientCarGenCmd = &cli.Command{
Name: "generate-car",
Usage: "generate a car file from input",
ArgsUsage: "[inputPath outputPath]",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

if cctx.Args().Len() != 2 {
return fmt.Errorf("usage: generate-car <inputPath> <outputPath>")
}

ref := lapi.FileRef{
Path: cctx.Args().First(),
IsCAR: false,
}

op := cctx.Args().Get(1)

if err = api.ClientGenCar(ctx, ref, op); err != nil {
return err
}
return nil
},
}

var clientLocalCmd = &cli.Command{
Name: "local",
Usage: "List locally imported data",
Expand Down Expand Up @@ -96,17 +160,13 @@ var clientDealCmd = &cli.Command{
Usage: "Initialize storage deal with a miner",
ArgsUsage: "[dataCid miner price duration]",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "manual-transfer",
Usage: "data will be transferred out of band",
},
&cli.StringFlag{
Name: "manual-piece-cid",
Usage: "manually specify piece commitment for data",
Usage: "manually specify piece commitment for data (dataCid must be to a car file)",
},
&cli.Int64Flag{
Name: "manual-piece-size",
Usage: "if manually specifying piece cid, used to specify size",
Usage: "if manually specifying piece cid, used to specify size (dataCid must be to a car file)",
},
&cli.StringFlag{
Name: "from",
Expand All @@ -125,7 +185,7 @@ var clientDealCmd = &cli.Command{
return xerrors.New("expected 4 args: dataCid, miner, price, duration")
}

// [data, miner, dur]
// [data, miner, price, dur]

data, err := cid.Parse(cctx.Args().Get(0))
if err != nil {
Expand Down Expand Up @@ -166,9 +226,6 @@ var clientDealCmd = &cli.Command{
TransferType: storagemarket.TTGraphsync,
Root: data,
}
if cctx.Bool("manual-transfer") {
ref.TransferType = storagemarket.TTManual
}

if mpc := cctx.String("manual-piece-cid"); mpc != "" {
c, err := cid.Parse(mpc)
Expand All @@ -184,6 +241,8 @@ var clientDealCmd = &cli.Command{
}

ref.PieceSize = abi.UnpaddedPieceSize(psize)

ref.TransferType = storagemarket.TTManual
}

proposal, err := api.ClientStartDeal(ctx, &lapi.StartDealParams{
Expand Down
179 changes: 129 additions & 50 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package client
import (
"context"
"errors"
"github.com/filecoin-project/go-fil-markets/pieceio"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"

"io"
"os"

Expand Down Expand Up @@ -202,63 +207,15 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
}

func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) {
f, err := os.Open(ref.Path)
if err != nil {
return cid.Undef, err
}

stat, err := f.Stat()
if err != nil {
return cid.Undef, err
}

file, err := files.NewReaderPathFile(ref.Path, f, stat)
if err != nil {
return cid.Undef, err
}
if ref.IsCAR {
var store car.Store
if a.Filestore == nil {
store = a.Blockstore
} else {
store = (*filestore.Filestore)(a.Filestore)
}
result, err := car.LoadCar(store, file)
if err != nil {
return cid.Undef, err
}

if len(result.Roots) != 1 {
return cid.Undef, xerrors.New("cannot import car with more than one root")
}

return result.Roots[0], nil
}

bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG)
nd, err := a.clientImport(ref, bufferedDS)

params := ihelper.DagBuilderParams{
Maxlinks: build.UnixfsLinksPerLevel,
RawLeaves: true,
CidBuilder: nil,
Dagserv: bufferedDS,
NoCopy: true,
}

db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
if err != nil {
return cid.Undef, err
}
nd, err := balanced.Layout(db)
if err != nil {
return cid.Undef, err
}

if err := bufferedDS.Commit(); err != nil {
return cid.Undef, err
}

return nd.Cid(), nil
return nd, nil
}

func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) {
Expand Down Expand Up @@ -410,3 +367,125 @@ func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Addre
}
return signedAsk, nil
}

func (a *API) ClientCalcCommP(ctx context.Context, inpath string, miner address.Address) (*api.CommPRet, error) {
ssize, err := a.StateMinerSectorSize(ctx, miner, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed checking miners sector size: %w", err)
}

rt, _, err := ffiwrapper.ProofTypeFromSectorSize(ssize)
if err != nil {
return nil, xerrors.Errorf("bad sector size: %w", err)
}

rdr, err := os.Open(inpath)
if err != nil {
return nil, err
}

stat, err := rdr.Stat()
if err != nil {
return nil, err
}

commP, pieceSize, err := pieceio.GeneratePieceCommitment(rt, rdr, uint64(stat.Size()))

if err != nil {
return nil, xerrors.Errorf("computing commP failed: %w", err)
}

return &api.CommPRet{
Root: commP,
Size: pieceSize,
}, nil
}

func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {

bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG)
c, err := a.clientImport(ref, bufferedDS)

if err != nil {
return err
}

defer bufferedDS.Remove(ctx, c)
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())

// entire DAG selector
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

f, err := os.Create(outputPath)
defer f.Close()
if err != nil {
return err
}

sc := car.NewSelectiveCar(ctx, a.Blockstore, []car.Dag{{Root: c, Selector: allSelector}})
if err = sc.Write(f); err != nil {
return err
}

return nil
}

func (a *API) clientImport(ref api.FileRef, bufferedDS *ipld.BufferedDAG) (cid.Cid, error) {
f, err := os.Open(ref.Path)
if err != nil {
return cid.Undef, err
}

stat, err := f.Stat()
if err != nil {
return cid.Undef, err
}

file, err := files.NewReaderPathFile(ref.Path, f, stat)
if err != nil {
return cid.Undef, err
}

if ref.IsCAR {
var store car.Store
if a.Filestore == nil {
store = a.Blockstore
} else {
store = (*filestore.Filestore)(a.Filestore)
}
result, err := car.LoadCar(store, file)
if err != nil {
return cid.Undef, err
}

if len(result.Roots) != 1 {
return cid.Undef, xerrors.New("cannot import car with more than one root")
}

return result.Roots[0], nil
}

params := ihelper.DagBuilderParams{
Maxlinks: build.UnixfsLinksPerLevel,
RawLeaves: true,
CidBuilder: nil,
Dagserv: bufferedDS,
NoCopy: true,
}

db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
if err != nil {
return cid.Undef, err
}
nd, err := balanced.Layout(db)
if err != nil {
return cid.Undef, err
}

if err := bufferedDS.Commit(); err != nil {
return cid.Undef, err
}

return nd.Cid(), nil
}