Skip to content

Commit

Permalink
Calculate commp on sealing node (#541)
Browse files Browse the repository at this point in the history
* feat: lotus v16

* feat: calculate commp on sealing node

* get size for carv1

* test: carv1 vs carv2

* fix: bad merge

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
  • Loading branch information
dirkmc and nonsense authored Aug 18, 2022
1 parent d9e9ab7 commit 0dffd87
Show file tree
Hide file tree
Showing 19 changed files with 510 additions and 188 deletions.
2 changes: 1 addition & 1 deletion cmd/boostd/dummydeal.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ var dummydealCmd = &cli.Command{
return fmt.Errorf("generating commp for %s: %w", carFilepath, err)
}
pieceCid = cidAndSize.PieceCID
pieceSize = cidAndSize.PieceSize
pieceSize = cidAndSize.Size

if pieceCid.Prefix() != market.PieceCIDPrefix {
return fmt.Errorf("piece cid has wrong prefix: %s", pieceCid)
Expand Down
1 change: 1 addition & 0 deletions db/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func newDealAccessor(db *sql.DB, deal *types.ProviderDealState) *dealAccessor {
"CheckpointAt": &fielddef.FieldDef{F: &deal.CheckpointAt},
"Error": &fielddef.FieldDef{F: &deal.Err},
"Retry": &fielddef.FieldDef{F: &deal.Retry},

// Needed so the deal can be looked up by signed proposal cid
"SignedProposalCID": &fielddef.SignedPropFieldDef{Prop: deal.ClientDealProposal},
},
Expand Down
3 changes: 2 additions & 1 deletion itests/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (f *TestFramework) Start() error {
}
cfg.LotusFees.MaxPublishDealsFee = val
cfg.Dealmaking.MaxStagingDealsBytes = 4000000 // 4 MB
cfg.Dealmaking.RemoteCommp = true
cfg.Storage.ParallelFetchLimit = 10

err = lr.SetConfig(func(raw interface{}) {
Expand Down Expand Up @@ -491,7 +492,7 @@ func (f *TestFramework) MakeDummyDeal(dealUuid uuid.UUID, carFilepath string, ro
}
proposal := market.DealProposal{
PieceCID: cidAndSize.PieceCID,
PieceSize: cidAndSize.PieceSize,
PieceSize: cidAndSize.Size,
VerifiedDeal: false,
Client: f.ClientAddr,
Provider: f.MinerAddr,
Expand Down
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/boost/storagemanager"
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/dealfilter"
smtypes "github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
Expand Down Expand Up @@ -474,6 +475,7 @@ func ConfigBoost(cfg *config.Boost) Option {
Override(new(*indexprovider.Wrapper), indexprovider.NewWrapper(cfg.DAGStore)),

Override(new(*storagemarket.ChainDealManager), modules.NewChainDealManager),
Override(new(smtypes.CommpCalculator), From(new(lotus_modules.MinerStorageService))),

Override(new(*storagemarket.Provider), modules.NewStorageMarketProvider(walletMiner, cfg)),

Expand Down
4 changes: 2 additions & 2 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"strconv"
"time"

"github.com/ipfs/go-cid"

"github.com/filecoin-project/lotus/chain/types"
lotus_config "github.com/filecoin-project/lotus/node/config"
"github.com/ipfs/go-cid"
)

const (
Expand Down Expand Up @@ -98,6 +97,7 @@ func DefaultBoost() *Boost {
},

MaxTransferDuration: Duration(24 * 3600 * time.Second),
RemoteCommp: false,
},

LotusDealmaking: lotus_config.DealmakingConfig{
Expand Down
6 changes: 6 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type DealmakingConfig struct {

// The maximum amount of time a transfer can take before it fails
MaxTransferDuration Duration
// Whether to do commp on the Boost node (local) or on the Sealer (remote)
RemoteCommp bool
}

type FeeConfig struct {
Expand Down
13 changes: 6 additions & 7 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/logs"
"github.com/filecoin-project/boost/storagemarket/lp2pimpl"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/boost/transport/httptransport"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -392,23 +393,21 @@ func NewChainDealManager(a v1api.FullNode) *storagemarket.ChainDealManager {
return storagemarket.NewChainDealManager(a, cdmCfg)
}

func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, a v1api.FullNode,
sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager,
dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB,
dagst *mktsdagstore.Wrapper, ps lotus_dtypes.ProviderPieceStore, ip *indexprovider.Wrapper, lp lotus_storagemarket.StorageProvider,
cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {
func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, dagst *mktsdagstore.Wrapper, ps lotus_dtypes.ProviderPieceStore, ip *indexprovider.Wrapper, lp lotus_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {
return func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB,
fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, sps sealingpipeline.API,
fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks,
commpc types.CommpCalculator, sps sealingpipeline.API,
df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB,
dagst *mktsdagstore.Wrapper, ps lotus_dtypes.ProviderPieceStore, ip *indexprovider.Wrapper,
lp lotus_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {

prvCfg := storagemarket.Config{
MaxTransferDuration: time.Duration(cfg.Dealmaking.MaxTransferDuration),
RemoteCommp: cfg.Dealmaking.RemoteCommp,
}
dl := logs.NewDealLogger(logsDB)
tspt := httptransport.New(h, dl)
prov, err := storagemarket.NewProvider(prvCfg, sqldb, dealsDB, fundMgr, storageMgr, a, dp, provAddr, secb,
prov, err := storagemarket.NewProvider(prvCfg, sqldb, dealsDB, fundMgr, storageMgr, a, dp, provAddr, secb, commpc,
sps, cdm, df, logsSqlDB.db, logsDB, dagst, ps, ip, lp, &signatureVerifier{a}, dl, tspt)
if err != nil {
return nil, err
Expand Down
193 changes: 193 additions & 0 deletions storagemarket/deal_commp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package storagemarket

import (
"fmt"
"io"
"os"

"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/go-commp-utils/writer"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
)

// Verify that the commp provided in the deal proposal matches commp calculated
// over the downloaded file
func (p *Provider) verifyCommP(deal *types.ProviderDealState) *dealMakingError {
p.dealLogger.Infow(deal.DealUuid, "checking commP")
pieceCid, err := p.generatePieceCommitment(deal.InboundFilePath, deal.ClientDealProposal.Proposal.PieceSize)
if err != nil {
err.error = fmt.Errorf("failed to generate CommP: %w", err.error)
return err
}

clientPieceCid := deal.ClientDealProposal.Proposal.PieceCID
if pieceCid != clientPieceCid {
return &dealMakingError{
retry: types.DealRetryFatal,
error: fmt.Errorf("commP mismatch, expected=%s, actual=%s", clientPieceCid, pieceCid),
}
}

return nil
}

// generatePieceCommitment generates commp either locally or remotely,
// depending on config, and pads it as necessary to match the piece size.
func (p *Provider) generatePieceCommitment(filepath string, pieceSize abi.PaddedPieceSize) (cid.Cid, *dealMakingError) {
var pi *abi.PieceInfo
if p.config.RemoteCommp {
var err *dealMakingError
pi, err = p.remoteCommP(filepath)
if err != nil {
err.error = fmt.Errorf("performing remote commp: %w", err.error)
return cid.Undef, err
}
} else {
var err error
pi, err = GenerateCommP(filepath)
if err != nil {
return cid.Undef, &dealMakingError{
retry: types.DealRetryFatal,
error: fmt.Errorf("performing local commp: %w", err),
}
}
}

// if the data does not fill the whole piece
if pi.Size < pieceSize {
// pad the data so that it fills the piece
rawPaddedCommp, err := commp.PadCommP(
// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
pi.PieceCID.Hash()[len(pi.PieceCID.Hash())-32:],
uint64(pi.Size),
uint64(pieceSize),
)
if err != nil {
return cid.Undef, &dealMakingError{
retry: types.DealRetryFatal,
error: fmt.Errorf("failed to pad commp: %w", err),
}
}
pi.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
}

return pi.PieceCID, nil
}

// remoteCommP makes an API call to the sealing service to calculate commp
func (p *Provider) remoteCommP(filepath string) (*abi.PieceInfo, *dealMakingError) {
// Open the CAR file
rd, err := carv2.OpenReader(filepath)
if err != nil {
return nil, &dealMakingError{
retry: types.DealRetryFatal,
error: fmt.Errorf("failed to get CARv2 reader: %w", err),
}
}

defer func() {
if err := rd.Close(); err != nil {
log.Warnf("failed to close CARv2 reader for %s: %w", filepath, err)
}
}()

// Get the size of the CAR file
size, err := getCarSize(filepath, rd)
if err != nil {
return nil, &dealMakingError{retry: types.DealRetryFatal, error: err}
}

// Get the data portion of the CAR file
dataReader, err := rd.DataReader()
if err != nil {
return nil, &dealMakingError{
retry: types.DealRetryManual,
error: fmt.Errorf("getting CAR data reader to calculate commp: %w", err),
}
}

// The commp calculation requires the data to be of length
// pieceSize.Unpadded(), so add zeros until it reaches that size
pr, numBytes := padreader.New(dataReader, uint64(size))
log.Debugw("computing remote commp", "size", size, "padded-size", numBytes)
pi, err := p.commpCalc.ComputeDataCid(p.ctx, numBytes, pr)
if err != nil {
if p.ctx.Err() != nil {
return nil, &dealMakingError{
retry: types.DealRetryAuto,
error: fmt.Errorf("boost shutdown while making remote API call to calculate commp: %w", p.ctx.Err()),
}
}
return nil, &dealMakingError{
retry: types.DealRetryManual,
error: fmt.Errorf("making remote API call to calculate commp: %w", err),
}
}
return &pi, nil
}

// GenerateCommP calculates commp locally
func GenerateCommP(filepath string) (*abi.PieceInfo, error) {
rd, err := carv2.OpenReader(filepath)
if err != nil {
return nil, fmt.Errorf("failed to get CARv2 reader: %w", err)
}

defer func() {
if err := rd.Close(); err != nil {
log.Warnf("failed to close CARv2 reader for %s: %w", filepath, err)
}
}()

// dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP.
w := &writer.Writer{}
r, err := rd.DataReader()
if err != nil {
return nil, fmt.Errorf("getting data reader for CAR v1 from CAR v2: %w", err)
}

written, err := io.Copy(w, r)
if err != nil {
return nil, fmt.Errorf("failed to write to CommP writer: %w", err)
}

// get the size of the CAR file
size, err := getCarSize(filepath, rd)
if err != nil {
return nil, err
}

if written != size {
return nil, fmt.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, rd.Header.DataSize)
}

pi, err := w.Sum()
if err != nil {
return nil, fmt.Errorf("failed to calculate CommP: %w", err)
}

return &abi.PieceInfo{
Size: pi.PieceSize,
PieceCID: pi.PieceCID,
}, nil
}

func getCarSize(filepath string, rd *carv2.Reader) (int64, error) {
var size int64
switch rd.Version {
case 2:
size = int64(rd.Header.DataSize)
case 1:
st, err := os.Stat(filepath)
if err != nil {
return 0, fmt.Errorf("failed to get CARv1 file size: %w", err)
}
size = st.Size()
}
return size, nil
}
Loading

0 comments on commit 0dffd87

Please sign in to comment.