diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index bb69027b0..cc822275b 100644 Binary files a/build/openrpc/boost.json.gz and b/build/openrpc/boost.json.gz differ diff --git a/cmd/boost/deal_cmd.go b/cmd/boost/deal_cmd.go index 5b33aba78..7727f2949 100644 --- a/cmd/boost/deal_cmd.go +++ b/cmd/boost/deal_cmd.go @@ -78,14 +78,19 @@ var dealFlags = []cli.Flag{ Value: true, }, &cli.BoolFlag{ - Name: "fast-retrieval", - Usage: "indicates that data should be available for fast retrieval", - Value: true, + Name: "remove-unsealed-copy", + Usage: "indicates that an unsealed copy of the sector in not required for fast retrieval", + Value: false, }, &cli.StringFlag{ Name: "wallet", Usage: "wallet address to be used to initiate the deal", }, + &cli.BoolFlag{ + Name: "skip-ipni-announce", + Usage: "indicates that deal index should not be announced to the IPNI(Network Indexer)", + Value: false, + }, } var dealCmd = &cli.Command{ @@ -256,7 +261,8 @@ func dealCmdAction(cctx *cli.Context, isOnline bool) error { DealDataRoot: rootCid, IsOffline: !isOnline, Transfer: transfer, - FastRetrieval: cctx.Bool("fast-retrieval"), + RemoveUnsealedCopy: cctx.Bool("remove-unsealed-copy"), + SkipIPNIAnnounce: cctx.Bool("skip-ipni-announce"), } log.Debugw("about to submit deal proposal", "uuid", dealUuid.String()) diff --git a/db/deals.go b/db/deals.go index c719b5a25..ac13327be 100644 --- a/db/deals.go +++ b/db/deals.go @@ -79,6 +79,7 @@ func newDealAccessor(db *sql.DB, deal *types.ProviderDealState) *dealAccessor { "Error": &fielddef.FieldDef{F: &deal.Err}, "Retry": &fielddef.FieldDef{F: &deal.Retry}, "FastRetrieval": &fielddef.FieldDef{F: &deal.FastRetrieval}, + "AnnounceToIPNI": &fielddef.FieldDef{F: &deal.AnnounceToIPNI}, // Needed so the deal can be looked up by signed proposal cid "SignedProposalCID": &fielddef.SignedPropFieldDef{Prop: deal.ClientDealProposal}, diff --git a/db/fixtures.go b/db/fixtures.go index 0ca56eddb..956c5efe4 100644 --- a/db/fixtures.go +++ b/db/fixtures.go @@ -84,15 +84,16 @@ func GenerateNDeals(count int) ([]types.ProviderDealState, error) { Params: []byte(fmt.Sprintf(`{"url":"http://files.org/file%d.car"}`, rand.Intn(1000))), Size: uint64(rand.Intn(10000)), }, - ChainDealID: abi.DealID(rand.Intn(10000)), - PublishCID: &publishCid, - SectorID: abi.SectorNumber(rand.Intn(10000)), - Offset: abi.PaddedPieceSize(rand.Intn(1000000)), - Length: abi.PaddedPieceSize(rand.Intn(1000000)), - Checkpoint: dealcheckpoints.Accepted, - Retry: types.DealRetryAuto, - Err: dealErr, - FastRetrieval: false, + ChainDealID: abi.DealID(rand.Intn(10000)), + PublishCID: &publishCid, + SectorID: abi.SectorNumber(rand.Intn(10000)), + Offset: abi.PaddedPieceSize(rand.Intn(1000000)), + Length: abi.PaddedPieceSize(rand.Intn(1000000)), + Checkpoint: dealcheckpoints.Accepted, + Retry: types.DealRetryAuto, + Err: dealErr, + FastRetrieval: false, + AnnounceToIPNI: false, } deals = append(deals, deal) diff --git a/db/migrations/20230104230242_deal_announce_to_ipni.sql b/db/migrations/20230104230242_deal_announce_to_ipni.sql new file mode 100644 index 000000000..7490a3387 --- /dev/null +++ b/db/migrations/20230104230242_deal_announce_to_ipni.sql @@ -0,0 +1,10 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE Deals + ADD AnnounceToIPNI BOOL; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +SELECT 'down SQL query'; +-- +goose StatementEnd \ No newline at end of file diff --git a/db/migrations/20230104230411_deal_announce_to_ipni.go 
b/db/migrations/20230104230411_deal_announce_to_ipni.go new file mode 100644 index 000000000..eba9ef5fb --- /dev/null +++ b/db/migrations/20230104230411_deal_announce_to_ipni.go @@ -0,0 +1,24 @@ +package migrations + +import ( + "database/sql" + + "github.com/pressly/goose/v3" +) + +func init() { + goose.AddMigration(upSetdealsAnnounceToIPNI, downSetdealsAnnounceToIPNI) +} + +func upSetdealsAnnounceToIPNI(tx *sql.Tx) error { + _, err := tx.Exec("UPDATE Deals SET AnnounceToIPNI=?;", true) + if err != nil { + return err + } + return nil +} + +func downSetdealsAnnounceToIPNI(tx *sql.Tx) error { + // This code is executed when the migration is rolled back. + return nil +} diff --git a/db/migrations_tests/deals_announce_to_ipni_test.go b/db/migrations_tests/deals_announce_to_ipni_test.go new file mode 100644 index 000000000..086235fcc --- /dev/null +++ b/db/migrations_tests/deals_announce_to_ipni_test.go @@ -0,0 +1,46 @@ +package migrations_tests + +import ( + "context" + "testing" + + "github.com/filecoin-project/boost/db" + "github.com/filecoin-project/boost/db/migrations" + "github.com/pressly/goose/v3" + "github.com/stretchr/testify/require" +) + +func TestDealAnnounceToIPNI(t *testing.T) { + req := require.New(t) + ctx := context.Background() + + sqldb := db.CreateTestTmpDB(t) + req.NoError(db.CreateAllBoostTables(ctx, sqldb, sqldb)) + + // Run migrations up to the one that adds the AnnounceToIPNI field to Deals + goose.SetBaseFS(migrations.EmbedMigrations) + req.NoError(goose.SetDialect("sqlite3")) + req.NoError(goose.UpTo(sqldb, ".", 20230104230242)) + + // Generate 1 deal + dealsDB := db.NewDealsDB(sqldb) + deals, err := db.GenerateNDeals(1) + req.NoError(err) + + // Insert the deals in DB + err = dealsDB.Insert(ctx, &deals[0]) + require.NoError(t, err) + + // Get deal state + dealState, err := dealsDB.ByID(ctx, deals[0].DealUuid) + require.NoError(t, err) + require.False(t, dealState.AnnounceToIPNI) + + //Run migration + req.NoError(goose.UpByOne(sqldb, ".")) + + // Check the deal state again + dealState, err = dealsDB.ByID(ctx, deals[0].DealUuid) + require.NoError(t, err) + require.True(t, dealState.AnnounceToIPNI) +} diff --git a/db/migrations_tests/deals_fast_retrieval_test.go b/db/migrations_tests/deals_fast_retrieval_test.go index 466e31aea..28693471e 100644 --- a/db/migrations_tests/deals_fast_retrieval_test.go +++ b/db/migrations_tests/deals_fast_retrieval_test.go @@ -22,25 +22,41 @@ func TestDealFastRetrieval(t *testing.T) { req.NoError(goose.SetDialect("sqlite3")) req.NoError(goose.UpTo(sqldb, ".", 20221124191002)) - // Generate 2 deals - dealsDB := db.NewDealsDB(sqldb) + // Generate 1 deal deals, err := db.GenerateNDeals(1) req.NoError(err) - // Insert the deals in DB - err = dealsDB.Insert(ctx, &deals[0]) - require.NoError(t, err) + deal := deals[0] + + _, err = sqldb.Exec(`INSERT INTO Deals ("ID", "CreatedAt", "DealProposalSignature", "PieceCID", "PieceSize", + "VerifiedDeal", "IsOffline", "ClientAddress", "ProviderAddress","Label", "StartEpoch", "EndEpoch", + "StoragePricePerEpoch", "ProviderCollateral", "ClientCollateral", "ClientPeerID", "DealDataRoot", + "InboundFilePath", "TransferType", "TransferParams", "TransferSize", "ChainDealID", "PublishCID", + "SectorID", "Offset", "Length", "Checkpoint", "CheckpointAt", "Error", "Retry", "SignedProposalCID") + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`, + deal.DealUuid, deal.CreatedAt, []byte("test"), deal.ClientDealProposal.Proposal.PieceCID.String(), + deal.ClientDealProposal.Proposal.PieceSize, 
deal.ClientDealProposal.Proposal.VerifiedDeal, deal.IsOffline, + deal.ClientDealProposal.Proposal.Client.String(), deal.ClientDealProposal.Proposal.Provider.String(), "test", + deal.ClientDealProposal.Proposal.StartEpoch, deal.ClientDealProposal.Proposal.EndEpoch, deal.ClientDealProposal.Proposal.StoragePricePerEpoch.Uint64(), + deal.ClientDealProposal.Proposal.ProviderCollateral.Int64(), deal.ClientDealProposal.Proposal.ClientCollateral.Uint64(), deal.ClientPeerID.String(), + deal.DealDataRoot.String(), deal.InboundFilePath, deal.Transfer.Type, deal.Transfer.Params, deal.Transfer.Size, deal.ChainDealID, + deal.PublishCID.String(), deal.SectorID, deal.Offset, deal.Length, deal.Checkpoint, deal.CheckpointAt, deal.Err, deal.Retry, []byte("test")) - // Get deal state - dealState, err := dealsDB.ByID(ctx, deals[0].DealUuid) require.NoError(t, err) - require.False(t, dealState.FastRetrieval) //Run migration req.NoError(goose.UpByOne(sqldb, ".")) - // Check the deal state again - dealState, err = dealsDB.ByID(ctx, deals[0].DealUuid) + rows, err := sqldb.Query("SELECT FastRetrieval FROM Deals WHERE ID =?", deal.DealUuid) require.NoError(t, err) - require.True(t, dealState.FastRetrieval) + defer rows.Close() + + for rows.Next() { + + var ft bool + + err = rows.Scan(&ft) + require.NoError(t, err) + require.True(t, ft) + } } diff --git a/db/migrations_tests/storagetagged_set_host_test.go b/db/migrations_tests/storagetagged_set_host_test.go index 0516eb2d9..830f5e0aa 100644 --- a/db/migrations_tests/storagetagged_set_host_test.go +++ b/db/migrations_tests/storagetagged_set_host_test.go @@ -24,13 +24,6 @@ func TestStorageTaggedSetHost(t *testing.T) { req.NoError(goose.SetDialect("sqlite3")) req.NoError(goose.UpTo(sqldb, ".", 20220908122510)) - // Generate 2 deals - dealsDB := db.NewDealsDB(sqldb) - - // Add FastRetrieval to allow tests to works - _, err := sqldb.Exec(`ALTER TABLE Deals ADD FastRetrieval BOOL`) - require.NoError(t, err) - deals, err := db.GenerateNDeals(2) req.NoError(err) @@ -44,8 +37,21 @@ func TestStorageTaggedSetHost(t *testing.T) { Params: []byte(fmt.Sprintf(`{"url":"http://%s/file.car"}`, getHost(i))), Size: uint64(1024), } - err = dealsDB.Insert(ctx, &deal) - req.NoError(err) + _, err = sqldb.Exec(`INSERT INTO Deals ("ID", "CreatedAt", "DealProposalSignature", "PieceCID", "PieceSize", + "VerifiedDeal", "IsOffline", "ClientAddress", "ProviderAddress","Label", "StartEpoch", "EndEpoch", + "StoragePricePerEpoch", "ProviderCollateral", "ClientCollateral", "ClientPeerID", "DealDataRoot", + "InboundFilePath", "TransferType", "TransferParams", "TransferSize", "ChainDealID", "PublishCID", + "SectorID", "Offset", "Length", "Checkpoint", "CheckpointAt", "Error", "Retry", "SignedProposalCID") + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`, + deal.DealUuid, deal.CreatedAt, []byte("test"), deal.ClientDealProposal.Proposal.PieceCID.String(), + deal.ClientDealProposal.Proposal.PieceSize, deal.ClientDealProposal.Proposal.VerifiedDeal, deal.IsOffline, + deal.ClientDealProposal.Proposal.Client.String(), deal.ClientDealProposal.Proposal.Provider.String(), "test", + deal.ClientDealProposal.Proposal.StartEpoch, deal.ClientDealProposal.Proposal.EndEpoch, deal.ClientDealProposal.Proposal.StoragePricePerEpoch.Uint64(), + deal.ClientDealProposal.Proposal.ProviderCollateral.Int64(), deal.ClientDealProposal.Proposal.ClientCollateral.Uint64(), deal.ClientPeerID.String(), + deal.DealDataRoot.String(), deal.InboundFilePath, deal.Transfer.Type, deal.Transfer.Params, 
deal.Transfer.Size, deal.ChainDealID, + deal.PublishCID.String(), deal.SectorID, deal.Offset, deal.Length, deal.Checkpoint, deal.CheckpointAt, deal.Err, deal.Retry, []byte("test")) + + require.NoError(t, err) } // Simulate tagging a deal diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index 49e7d22a6..47f442066 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -396,7 +396,8 @@ Response: "Err": "string value", "Retry": "auto", "NBytesReceived": 9, - "FastRetrieval": true + "FastRetrieval": true, + "AnnounceToIPNI": true } ``` @@ -462,7 +463,8 @@ Response: "Err": "string value", "Retry": "auto", "NBytesReceived": 9, - "FastRetrieval": true + "FastRetrieval": true, + "AnnounceToIPNI": true } ``` @@ -507,7 +509,8 @@ Inputs: "Params": "Ynl0ZSBhcnJheQ==", "Size": 42 }, - "FastRetrieval": true + "RemoveUnsealedCopy": true, + "SkipIPNIAnnounce": true } ] ``` diff --git a/gql/resolver.go b/gql/resolver.go index 61acd1afb..9e3ed5fbd 100644 --- a/gql/resolver.go +++ b/gql/resolver.go @@ -5,7 +5,9 @@ import ( "encoding/hex" "errors" "fmt" + "math" + "github.com/dustin/go-humanize" "github.com/filecoin-project/boost/db" "github.com/filecoin-project/boost/fundmanager" gqltypes "github.com/filecoin-project/boost/gql/types" @@ -143,6 +145,7 @@ func (r *resolver) Deals(ctx context.Context, args dealsArgs) (*dealListResolver resolvers := make([]*dealResolver, 0, len(deals)) for _, deal := range deals { + deal.NBytesReceived = int64(r.provider.NBytesReceived(deal.DealUuid)) resolvers = append(resolvers, newDealResolver(&deal, r.provider, r.dealsDB, r.logsDB, r.spApi)) } @@ -376,6 +379,14 @@ func (dr *dealResolver) IsVerified() bool { return dr.ProviderDealState.ClientDealProposal.Proposal.VerifiedDeal } +func (dr *dealResolver) KeepUnsealedCopy() bool { + return dr.ProviderDealState.FastRetrieval +} + +func (dr *dealResolver) AnnounceToIPNI() bool { + return dr.ProviderDealState.AnnounceToIPNI +} + func (dr *dealResolver) ProposalLabel() (string, error) { l := dr.ProviderDealState.ClientDealProposal.Proposal.Label if l.IsString() { @@ -512,16 +523,25 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints. if dr.IsOffline { return "Awaiting Offline Data Import" } + var pct uint64 = math.MaxUint64 + if dr.ProviderDealState.Transfer.Size > 0 { + pct = (100 * dr.transferred) / dr.ProviderDealState.Transfer.Size + } switch { case dr.transferred == 0 && !dr.provider.IsTransferStalled(dr.DealUuid): return "Transfer Queued" - case dr.transferred == 100: - return "Transfer Complete" + case pct == 100: + return "Verifying Commp" default: - pct := (100 * dr.transferred) / dr.ProviderDealState.Transfer.Size isStalled := dr.provider.IsTransferStalled(dr.DealUuid) if isStalled { - return fmt.Sprintf("Transfer stalled at %d%% ", pct) + if pct == math.MaxUint64 { + return fmt.Sprintf("Transfer stalled at %s", humanize.Bytes(dr.transferred)) + } + return fmt.Sprintf("Transfer stalled at %d%%", pct) + } + if pct == math.MaxUint64 { + return fmt.Sprintf("Transferring %s", humanize.Bytes(dr.transferred)) } return fmt.Sprintf("Transferring %d%%", pct) } diff --git a/gql/schema.graphql b/gql/schema.graphql index eef4f1c97..c962c03a3 100644 --- a/gql/schema.graphql +++ b/gql/schema.graphql @@ -45,6 +45,8 @@ type Deal { PieceCid: String! PieceSize: Uint64! IsVerified: Boolean! + AnnounceToIPNI: Boolean! + KeepUnsealedCopy: Boolean! ProposalLabel: String! ProviderCollateral: Uint64! ClientCollateral: Uint64! 
diff --git a/itests/framework/framework.go b/itests/framework/framework.go index 4bc924ba9..da0012d71 100644 --- a/itests/framework/framework.go +++ b/itests/framework/framework.go @@ -541,7 +541,8 @@ func (f *TestFramework) MakeDummyDeal(dealUuid uuid.UUID, carFilepath string, ro Params: transferParamsJSON, Size: uint64(carFileinfo.Size()), }, - FastRetrieval: true, + RemoveUnsealedCopy: false, + SkipIPNIAnnounce: false, } return f.Client.StorageDeal(f.ctx, dealParams, peerID) diff --git a/react/src/DealDetail.js b/react/src/DealDetail.js index b61b4f3a4..fd9969738 100644 --- a/react/src/DealDetail.js +++ b/react/src/DealDetail.js @@ -149,6 +149,14 @@ export function DealDetail(props) { Verified {deal.IsVerified ? 'Yes' : 'No'} + + Keep Unsealed Copy + {deal.KeepUnsealedCopy ? 'Yes' : 'No'} + + + Announce To IPNI + {deal.AnnounceToIPNI ? 'Yes' : 'No'} + Piece CID {deal.PieceCid} diff --git a/react/src/Mpool.js b/react/src/Mpool.js index 24fc6d8b3..89950b346 100644 --- a/react/src/Mpool.js +++ b/react/src/Mpool.js @@ -55,7 +55,7 @@ function MpoolMessage(props) { Nonce - {msg.Nonce} + {msg.Nonce+''} Value diff --git a/react/src/SealingPipeline.js b/react/src/SealingPipeline.js index 37cc6e221..37f314c0e 100644 --- a/react/src/SealingPipeline.js +++ b/react/src/SealingPipeline.js @@ -162,9 +162,14 @@ function SealingType(props) { } function Workers(props) { + const workers = JSON.parse(JSON.stringify(props.workers)).sort((a, b) => { + if (a.Start > b.Start) return 1 + if (a.Start < b.Start) return -1 + return 0 + }) return
Workers
-        {props.workers.length === 0 ? …No active jobs… : (
+        {workers.length === 0 ? …No active jobs…
: ( @@ -173,7 +178,7 @@ function Workers(props) { - {props.workers.map(worker => ( + {workers.map(worker => ( diff --git a/react/src/gql.js b/react/src/gql.js index 052dab1ea..1ea689198 100644 --- a/react/src/gql.js +++ b/react/src/gql.js @@ -78,6 +78,8 @@ const DealsListQuery = gql` ClientAddress Checkpoint CheckpointAt + AnnounceToIPNI + KeepUnsealedCopy IsOffline Err Retry @@ -166,6 +168,8 @@ const DealSubscription = gql` IsOffline Checkpoint CheckpointAt + AnnounceToIPNI + KeepUnsealedCopy Retry Err Message @@ -314,6 +318,8 @@ const NewDealsSubscription = gql` CreatedAt PieceCid PieceSize + AnnounceToIPNI + KeepUnsealedCopy ClientAddress StartEpoch EndEpoch diff --git a/storagemarket/deal_execution.go b/storagemarket/deal_execution.go index 1e1a2f161..1e6e9e91a 100644 --- a/storagemarket/deal_execution.go +++ b/storagemarket/deal_execution.go @@ -222,7 +222,11 @@ func (p *Provider) execDealUptoAddPiece(ctx context.Context, deal *types.Provide err.error = fmt.Errorf("failed to add index and announce deal: %w", err.error) return err } - p.dealLogger.Infow(deal.DealUuid, "deal successfully indexed and announced") + if deal.AnnounceToIPNI { + p.dealLogger.Infow(deal.DealUuid, "deal successfully indexed and announced") + } else { + p.dealLogger.Infow(deal.DealUuid, "deal successfully indexed") + } } else { p.dealLogger.Infow(deal.DealUuid, "deal has already been indexed and announced") } @@ -582,15 +586,20 @@ func (p *Provider) indexAndAnnounce(ctx context.Context, pub event.Emitter, deal // if the index provider is enabled if p.ip.Enabled() { - // announce to the network indexer but do not fail the deal if the announcement fails - annCid, err := p.ip.AnnounceBoostDeal(ctx, deal) - if err != nil { - return &dealMakingError{ - retry: types.DealRetryAuto, - error: fmt.Errorf("failed to announce deal to network indexer: %w", err), + if deal.AnnounceToIPNI { + // announce to the network indexer but do not fail the deal if the announcement fails, + // just retry the next time boost restarts + annCid, err := p.ip.AnnounceBoostDeal(ctx, deal) + if err != nil { + return &dealMakingError{ + retry: types.DealRetryAuto, + error: fmt.Errorf("failed to announce deal to network indexer: %w", err), + } } + p.dealLogger.Infow(deal.DealUuid, "announced deal to network indexer", "announcement-cid", annCid) + } else { + p.dealLogger.Infow(deal.DealUuid, "didn't announce deal as requested in the deal proposal") } - p.dealLogger.Infow(deal.DealUuid, "announced deal to network indexer", "announcement-cid", annCid) } else { p.dealLogger.Infow(deal.DealUuid, "didn't announce deal because network indexer is disabled") } diff --git a/storagemarket/provider.go b/storagemarket/provider.go index 664a3aef3..45d0728de 100644 --- a/storagemarket/provider.go +++ b/storagemarket/provider.go @@ -272,7 +272,8 @@ func (p *Provider) ExecuteDeal(ctx context.Context, dp *types.DealParams, client Transfer: dp.Transfer, IsOffline: dp.IsOffline, Retry: smtypes.DealRetryAuto, - FastRetrieval: dp.FastRetrieval, + FastRetrieval: !dp.RemoveUnsealedCopy, + AnnounceToIPNI: !dp.SkipIPNIAnnounce, } // validate the deal proposal if err := p.validateDealProposal(ds); err != nil { @@ -368,6 +369,12 @@ func (p *Provider) Start() error { return fmt.Errorf("failed to migrate db: %w", err) } + // De-fragment the logs DB + _, err = p.logsSqlDB.Exec("Vacuum") + if err != nil { + log.Errorf("failed to de-fragment the logs db: %w", err) + } + log.Infow("db: initialized") // cleanup all completed deals in case Boost resumed before they were cleanedup 
@@ -393,7 +400,7 @@ func (p *Provider) Start() error { // cleanup all deals that have finished successfully for _, deal := range activeDeals { - // Make sure that deals that have reached the index and announce stage + // Make sure that deals that have reached the IndexedAndAnnounced stage // have their resources untagged // TODO Update this once we start listening for expired/slashed deals etc if deal.Checkpoint >= dealcheckpoints.IndexedAndAnnounced { diff --git a/storagemarket/provider_loop.go b/storagemarket/provider_loop.go index 60bbe9dc7..288e28d55 100644 --- a/storagemarket/provider_loop.go +++ b/storagemarket/provider_loop.go @@ -70,26 +70,7 @@ type acceptError struct { reason string } -func (p *Provider) processDealProposal(deal *types.ProviderDealState) *acceptError { - host, err := deal.Transfer.Host() - if err != nil { - return &acceptError{ - error: fmt.Errorf("failed to get deal transfer host: %w", err), - reason: fmt.Sprintf("server error: get deal transfer host: %s", err), - isSevereError: false, - } - } - - // Check that the deal proposal is unique - if aerr := p.checkDealPropUnique(deal); aerr != nil { - return aerr - } - - // Check that the deal uuid is unique - if aerr := p.checkDealUuidUnique(deal); aerr != nil { - return aerr - } - +func (p *Provider) runDealFilters(deal *types.ProviderDealState) *acceptError { // get current sealing pipeline status status, err := sealingpipeline.GetStatus(p.ctx, p.fullnodeApi, p.sps) if err != nil { @@ -128,6 +109,33 @@ func (p *Provider) processDealProposal(deal *types.ProviderDealState) *acceptErr isSevereError: false, } } + return nil +} + +func (p *Provider) processDealProposal(deal *types.ProviderDealState) *acceptError { + host, err := deal.Transfer.Host() + if err != nil { + return &acceptError{ + error: fmt.Errorf("failed to get deal transfer host: %w", err), + reason: fmt.Sprintf("server error: get deal transfer host: %s", err), + isSevereError: false, + } + } + + // Check that the deal proposal is unique + if aerr := p.checkDealPropUnique(deal); aerr != nil { + return aerr + } + + // Check that the deal uuid is unique + if aerr := p.checkDealUuidUnique(deal); aerr != nil { + return aerr + } + + // Run deal through the filter + if aerr := p.runDealFilters(deal); aerr != nil { + return aerr + } cleanup := func() { collat, pub, errf := p.fundManager.UntagFunds(p.ctx, deal.DealUuid) @@ -222,7 +230,7 @@ func (p *Provider) processDealProposal(deal *types.ProviderDealState) *acceptErr return nil } -// processOfflineDealProposal just saves the deal to the database. +// processOfflineDealProposal just saves the deal to the database after running deal filters // Execution resumes when processImportOfflineDealData is called. 
func (p *Provider) processOfflineDealProposal(ds *smtypes.ProviderDealState, dh *dealHandler) *acceptError { // Check that the deal proposal is unique @@ -235,6 +243,11 @@ func (p *Provider) processOfflineDealProposal(ds *smtypes.ProviderDealState, dh return aerr } + // Run deal through the filter + if aerr := p.runDealFilters(ds); aerr != nil { + return aerr + } + // Save deal to DB ds.CreatedAt = time.Now() ds.Checkpoint = dealcheckpoints.Accepted diff --git a/storagemarket/provider_resumption_test.go b/storagemarket/provider_resumption_test.go index 543d8ba61..8dcfff4ed 100644 --- a/storagemarket/provider_resumption_test.go +++ b/storagemarket/provider_resumption_test.go @@ -24,7 +24,7 @@ func TestDealCompletionOnProcessResumption(t *testing.T) { return h.newDealBuilder(t, 1).withBlockingHttpServer().build() }, stubAfterResumeF: func(tb *testDealBuilder) *testDeal { - return tb.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().build() + return tb.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build() }, unblockF: func(td *testDeal) { td.unblockTransfer() @@ -51,7 +51,7 @@ func TestDealCompletionOnProcessResumption(t *testing.T) { return h.newDealBuilder(t, 1).withCommpNonBlocking().withPublishBlocking().withNormalHttpServer().build() }, stubAfterResumeF: func(tb *testDealBuilder) *testDeal { - return tb.withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().build() + return tb.withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build() }, waitForAndAssertBeforeResumeF: func(t *testing.T, h *ProviderHarness, td *testDeal) { td.waitForAndAssert(t, ctx, dealcheckpoints.Transferred) @@ -63,7 +63,7 @@ func TestDealCompletionOnProcessResumption(t *testing.T) { return h.newDealBuilder(t, 1).withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmBlocking().withNormalHttpServer().build() }, stubAfterResumeF: func(tb *testDealBuilder) *testDeal { - return tb.withPublishConfirmNonBlocking().withAddPieceNonBlocking().build() + return tb.withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build() }, waitForAndAssertBeforeResumeF: func(t *testing.T, h *ProviderHarness, td *testDeal) { td.waitForAndAssert(t, ctx, dealcheckpoints.Published) @@ -75,7 +75,7 @@ func TestDealCompletionOnProcessResumption(t *testing.T) { return h.newDealBuilder(t, 1).withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceBlocking().withNormalHttpServer().build() }, stubAfterResumeF: func(tb *testDealBuilder) *testDeal { - return tb.withAddPieceNonBlocking().build() + return tb.withAddPieceNonBlocking().withAnnounceNonBlocking().build() }, waitForAndAssertBeforeResumeF: func(t *testing.T, h *ProviderHarness, td *testDeal) { td.waitForAndAssert(t, ctx, dealcheckpoints.PublishConfirmed) diff --git a/storagemarket/provider_test.go b/storagemarket/provider_test.go index 0e06f68f5..e3911f389 100644 --- a/storagemarket/provider_test.go +++ b/storagemarket/provider_test.go @@ -79,7 +79,7 @@ func TestSimpleDealHappy(t *testing.T) { if localCommp { // if commp is calculated locally, don't expect a remote call to commp // (expect calls to all other miner APIs) - tdBuilder.withPublishBlocking().withPublishConfirmBlocking().withAddPieceBlocking() + 
tdBuilder.withPublishBlocking().withPublishConfirmBlocking().withAddPieceBlocking().withAnnounceBlocking() } else { tdBuilder.withAllMinerCallsBlocking() } @@ -162,100 +162,6 @@ func TestMultipleDealsConcurrent(t *testing.T) { harness.EventuallyAssertNoTagged(t, ctx) } -func TestMultipleDealsConcurrentWithFundsAndStorage(t *testing.T) { - nDeals := 10 - ctx := context.Background() - - // setup the provider test harness - harness := NewHarness(t) - // start the provider test harness - harness.Start(t, ctx) - defer harness.Stop() - - var errGrp errgroup.Group - var tds []*testDeal - totalStorage := uint64(0) - totalCollat := abi.NewTokenAmount(0) - totalPublish := abi.NewTokenAmount(0) - // half the deals will finish, half will be blocked on the wait for publish call -> we will then assert that the funds and storage manager state is as expected - for i := 0; i < nDeals; i++ { - i := i - var td *testDeal - // for even numbered deals, we will never block - if i%2 == 0 { - // setup mock publish & add-piece expectations with non-blocking behaviours -> the associated tagged funds and storage will be released - td = harness.newDealBuilder(t, i).withAllMinerCallsNonBlocking().withNormalHttpServer().build() - } else { - // for odd numbered deals, we will block on the publish-confirm step - // setup mock publish & add-piece expectations with blocking wait-for-publish behaviours -> the associated tagged funds and storage will not be released - td = harness.newDealBuilder(t, i).withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmBlocking().withAddPieceBlocking().withNormalHttpServer().build() - totalStorage = totalStorage + td.params.Transfer.Size - totalCollat = abi.NewTokenAmount(totalCollat.Add(totalCollat.Int, td.params.ClientDealProposal.Proposal.ProviderCollateral.Int).Int64()) - totalPublish = abi.NewTokenAmount(totalPublish.Add(totalPublish.Int, harness.MinPublishFees.Int).Int64()) - } - - tds = append(tds, td) - - errGrp.Go(func() error { - err := td.executeAndSubscribe() - if err != nil { - return err - } - var checkpoint dealcheckpoints.Checkpoint - if i%2 == 0 { - checkpoint = dealcheckpoints.AddedPiece - } else { - checkpoint = dealcheckpoints.Published - } - if err := td.waitForCheckpoint(checkpoint); err != nil { - return err - } - - return nil - }) - } - require.NoError(t, errGrp.Wait()) - - for i := 0; i < nDeals; i++ { - td := tds[i] - if i%2 == 0 { - td.assertPieceAdded(t, ctx) - } else { - td.assertDealPublished(t, ctx) - } - } - - harness.EventuallyAssertStorageFundState(t, ctx, totalStorage, totalPublish, totalCollat) - - // now confirm the publish for remaining deals and assert funds and storage - for i := 0; i < nDeals; i++ { - td := tds[i] - if i%2 != 0 { - td.unblockWaitForPublish() - totalPublish = abi.NewTokenAmount(totalPublish.Sub(totalPublish.Int, harness.MinPublishFees.Int).Int64()) - totalCollat = abi.NewTokenAmount(totalCollat.Sub(totalCollat.Int, td.params.ClientDealProposal.Proposal.ProviderCollateral.Int).Int64()) - } - } - harness.EventuallyAssertStorageFundState(t, ctx, totalStorage, totalPublish, totalCollat) - - // now finish the remaining deals and assert funds and storage - for i := 0; i < nDeals; i++ { - td := tds[i] - if i%2 != 0 { - td.unblockAddPiece() - totalStorage = totalStorage - td.params.Transfer.Size - } - } - harness.EventuallyAssertNoTagged(t, ctx) - // assert that piece has been added for the deals - for i := 0; i < nDeals; i++ { - if i%2 != 0 { - td := tds[i] - td.assertPieceAdded(t, ctx) - } - } -} - func 
TestDealsRejectedForFunds(t *testing.T) { ctx := context.Background() // setup the provider test harness with configured publish fee per deal and a total wallet balance. @@ -580,7 +486,7 @@ func TestDealAutoRestartAfterAutoRecoverableErrors(t *testing.T) { expectedErr: "pubconferr", onResume: func(builder *testDealBuilder) *testDeal { // Simulate publish confirm success (and then success for all other calls to miner) - return builder.withPublishConfirmNonBlocking().withAddPieceNonBlocking().build() + return builder.withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build() }, }, { name: "add piece fails", @@ -591,7 +497,7 @@ func TestDealAutoRestartAfterAutoRecoverableErrors(t *testing.T) { expectedErr: "addpieceerr", onResume: func(builder *testDealBuilder) *testDeal { // Simulate add piece success - return builder.withAddPieceNonBlocking().build() + return builder.withAddPieceNonBlocking().withAnnounceNonBlocking().build() }, }} @@ -628,7 +534,7 @@ func TestDealAutoRestartAfterAutoRecoverableErrors(t *testing.T) { require.NotNil(t, dh) //Check for fast retrieval - require.False(t, td.params.FastRetrieval) + require.True(t, td.params.RemoveUnsealedCopy) sub, err := dh.subscribeUpdates() require.NoError(t, err) @@ -657,7 +563,7 @@ func TestOfflineDealRestartAfterManualRecoverableErrors(t *testing.T) { }, expectedErr: "mismatcherr", onResume: func(builder *testDealBuilder) *testDeal { - return builder.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().build() + return builder.withCommpNonBlocking().withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build() }, }} @@ -734,7 +640,7 @@ func TestDealRestartAfterManualRecoverableErrors(t *testing.T) { }, expectedErr: "puberr", onResume: func(builder *testDealBuilder) *testDeal { - return builder.withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().build() + return builder.withPublishNonBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().build() }, }} @@ -1102,6 +1008,31 @@ func TestDealVerification(t *testing.T) { } } +func TestIPNIAnnounce(t *testing.T) { + ctx := context.Background() + + runTest := func(announce bool) { + // setup the provider test harness + harness := NewHarness(t) + harness.Start(t, ctx) + defer harness.Stop() + + td := harness.newDealBuilder(t, 1).withAllMinerCallsNonBlocking().withNormalHttpServer(). 
+ withDealParamAnnounce(announce).build() + require.NoError(t, td.executeAndSubscribe()) + + td.waitForAndAssert(t, ctx, dealcheckpoints.IndexedAndAnnounced) + } + + t.Run("announce", func(t *testing.T) { + runTest(true) + }) + + t.Run("don't announce", func(t *testing.T) { + runTest(false) + }) +} + func (h *ProviderHarness) executeNDealsConcurrentAndWaitFor(t *testing.T, nDeals int, buildDeal func(i int) *testDeal, waitF func(i int, td *testDeal) error) []*testDeal { tds := make([]*testDeal, 0, nDeals) @@ -1163,6 +1094,11 @@ func (h *ProviderHarness) AssertPieceAdded(t *testing.T, ctx context.Context, dp //require.Empty(t, rg.CarPath) } +func (h *ProviderHarness) AssertDealIndexed(t *testing.T, ctx context.Context, dp *types.DealParams, so *smtestutil.StubbedMinerOutput) { + h.AssertEventuallyDealCleanedup(t, ctx, dp) + h.AssertDealDBState(t, ctx, dp, so.DealID, &so.FinalPublishCid, dealcheckpoints.IndexedAndAnnounced, so.SectorID, so.Offset, dp.ClientDealProposal.Proposal.PieceSize.Unpadded().Padded(), "") +} + func (h *ProviderHarness) EventuallyAssertNoTagged(t *testing.T, ctx context.Context) { h.EventuallyAssertStorageFundState(t, ctx, 0, abi.NewTokenAmount(0), abi.NewTokenAmount(0)) } @@ -1523,7 +1459,7 @@ func NewHarness(t *testing.T, opts ...harnessOpt) *ProviderHarness { }, } prov, err := NewProvider(prvCfg, sqldb, dealsDB, fm, sm, fn, minerStub, minerAddr, minerStub, minerStub, sps, minerStub, df, sqldb, - logsDB, pm, &NoOpIndexProvider{}, askStore, &mockSignatureVerifier{true, nil}, dl, tspt) + logsDB, pm, minerStub, askStore, &mockSignatureVerifier{true, nil}, dl, tspt) require.NoError(t, err) ph.Provider = prov @@ -1578,7 +1514,7 @@ func (h *ProviderHarness) shutdownAndCreateNewProvider(t *testing.T, opts ...har // construct a new provider with pre-existing state prov, err := NewProvider(h.Provider.config, h.Provider.db, h.Provider.dealsDB, h.Provider.fundManager, h.Provider.storageManager, h.Provider.fullnodeApi, h.MinerStub, h.MinerAddr, h.MinerStub, h.MinerStub, h.MockSealingPipelineAPI, h.MinerStub, - df, h.Provider.logsSqlDB, h.Provider.logsDB, h.Provider.piecedirectory, &NoOpIndexProvider{}, h.Provider.askGetter, + df, h.Provider.logsSqlDB, h.Provider.logsDB, h.Provider.piecedirectory, h.MinerStub, h.Provider.askGetter, h.Provider.sigVerifier, h.Provider.dealLogger, h.Provider.Transport) require.NoError(t, err) @@ -1788,7 +1724,7 @@ func (ph *ProviderHarness) newDealBuilder(t *testing.T, seed int, opts ...dealPr Params: xferParams, Size: uint64(carv2Fileinfo.Size()), }, - FastRetrieval: false, + RemoveUnsealedCopy: true, } td := &testDeal{ @@ -1825,6 +1761,7 @@ type testDealBuilder struct { msPublish *minerStubCall msPublishConfirm *minerStubCall msAddPiece *minerStubCall + msAnnounce *minerStubCall } func (tbuilder *testDealBuilder) withPublishFailing(err error) *testDealBuilder { @@ -1887,11 +1824,22 @@ func (tbuilder *testDealBuilder) withAddPieceNonBlocking() *testDealBuilder { return tbuilder } +func (tbuilder *testDealBuilder) withAnnounceBlocking() *testDealBuilder { + tbuilder.msAnnounce = &minerStubCall{blocking: true} + return tbuilder +} + +func (tbuilder *testDealBuilder) withAnnounceNonBlocking() *testDealBuilder { + tbuilder.msAnnounce = &minerStubCall{blocking: true} + return tbuilder +} + func (tbuilder *testDealBuilder) withAllMinerCallsNonBlocking() *testDealBuilder { tbuilder.msCommp = &minerStubCall{blocking: false} tbuilder.msPublish = &minerStubCall{blocking: false} tbuilder.msPublishConfirm = &minerStubCall{blocking: false} tbuilder.msAddPiece = 
&minerStubCall{blocking: false} + tbuilder.msAnnounce = &minerStubCall{blocking: false} return tbuilder } @@ -1900,6 +1848,7 @@ func (tbuilder *testDealBuilder) withAllMinerCallsBlocking() *testDealBuilder { tbuilder.msPublish = &minerStubCall{blocking: true} tbuilder.msPublishConfirm = &minerStubCall{blocking: true} tbuilder.msAddPiece = &minerStubCall{blocking: true} + tbuilder.msAnnounce = &minerStubCall{blocking: true} return tbuilder } @@ -1934,12 +1883,17 @@ func (tbuilder *testDealBuilder) setTransferParams(serverURL string) { tbuilder.td.params.Transfer.Params = transferParamsJSON } +func (tbuilder *testDealBuilder) withDealParamAnnounce(announce bool) *testDealBuilder { + tbuilder.td.params.SkipIPNIAnnounce = !announce + return tbuilder +} + func (tbuilder *testDealBuilder) build() *testDeal { // if the miner stub is supposed to be a no-op, setup a no-op and don't build any other stub behaviour if tbuilder.msNoOp { tbuilder.ms.SetupNoOp() } else { - tbuilder.buildCommp().buildPublish().buildPublishConfirm().buildAddPiece() + tbuilder.buildCommp().buildPublish().buildPublishConfirm().buildAddPiece().buildAnnounce() } testDeal := tbuilder.td @@ -1996,6 +1950,13 @@ func (tbuilder *testDealBuilder) buildAddPiece() *testDealBuilder { return tbuilder } +func (tbuilder *testDealBuilder) buildAnnounce() *testDealBuilder { + if tbuilder.msAnnounce != nil { + tbuilder.ms.SetupAnnounce(tbuilder.msAnnounce.blocking, !tbuilder.td.params.SkipIPNIAnnounce) + } + return tbuilder +} + type testDeal struct { ph *ProviderHarness params *types.DealParams @@ -2140,6 +2101,8 @@ func (td *testDeal) waitForAndAssert(t *testing.T, ctx context.Context, cp dealc td.ph.AssertPublishConfirmed(t, ctx, td.params, td.stubOutput) case dealcheckpoints.AddedPiece: td.ph.AssertPieceAdded(t, ctx, td.params, td.stubOutput, td.carv2FilePath) + case dealcheckpoints.IndexedAndAnnounced: + td.ph.AssertDealIndexed(t, ctx, td.params, td.stubOutput) default: t.Fail() } @@ -2169,10 +2132,6 @@ func (td *testDeal) assertPieceAdded(t *testing.T, ctx context.Context) { td.ph.AssertPieceAdded(t, ctx, td.params, td.stubOutput, td.carv2FilePath) } -func (td *testDeal) assertDealPublished(t *testing.T, ctx context.Context) { - td.ph.AssertPublished(t, ctx, td.params, td.stubOutput) -} - func (td *testDeal) assertDealFailedTransferNonRecoverable(t *testing.T, ctx context.Context, errStr string) { td.ph.AssertDealFailedTransferNonRecoverable(t, ctx, td.params, errStr) } @@ -2191,20 +2150,6 @@ func (td *testDeal) assertDealFailedNonRecoverable(t *testing.T, ctx context.Con require.EqualValues(t, types.DealRetryFatal, dbState.Retry) } -type NoOpIndexProvider struct{} - -func (n *NoOpIndexProvider) Enabled() bool { - return true -} - -func (n *NoOpIndexProvider) AnnounceBoostDeal(ctx context.Context, pds *types.ProviderDealState) (cid.Cid, error) { - return testutil.GenerateCid(), nil -} - -func (n *NoOpIndexProvider) Start(_ context.Context) { - -} - type mockAskStore struct { ask *storagemarket.StorageAsk } diff --git a/storagemarket/provider_transfer_test.go b/storagemarket/provider_transfer_test.go index 5ed35fa95..39757ba2f 100644 --- a/storagemarket/provider_transfer_test.go +++ b/storagemarket/provider_transfer_test.go @@ -146,7 +146,7 @@ func TestCancelTransferForTransferredDealFails(t *testing.T) { defer harness.Stop() // build the deal proposal - td := harness.newDealBuilder(t, 1).withCommpNonBlocking().withPublishBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withNormalHttpServer().build() + td := 
harness.newDealBuilder(t, 1).withCommpNonBlocking().withPublishBlocking().withPublishConfirmNonBlocking().withAddPieceNonBlocking().withAnnounceNonBlocking().withNormalHttpServer().build() // execute deal err := td.executeAndSubscribe() diff --git a/storagemarket/smtestutil/mocks.go b/storagemarket/smtestutil/mocks.go index 7fe101b9d..a07d3172e 100644 --- a/storagemarket/smtestutil/mocks.go +++ b/storagemarket/smtestutil/mocks.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/boost/storagemarket/types" "github.com/filecoin-project/boost/storagemarket/types/mock_types" + "github.com/filecoin-project/boost/testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/builtin/v9/market" @@ -23,12 +24,14 @@ type MinerStub struct { *mock_types.MockChainDealManager *mock_types.MockPieceAdder *mock_types.MockCommpCalculator + *mock_types.MockIndexProvider lk sync.Mutex unblockCommp map[uuid.UUID]chan struct{} unblockPublish map[uuid.UUID]chan struct{} unblockWaitForPublish map[uuid.UUID]chan struct{} unblockAddPiece map[uuid.UUID]chan struct{} + unblockAnnounce map[uuid.UUID]chan struct{} } func NewMinerStub(ctrl *gomock.Controller) *MinerStub { @@ -37,11 +40,13 @@ func NewMinerStub(ctrl *gomock.Controller) *MinerStub { MockDealPublisher: mock_types.NewMockDealPublisher(ctrl), MockChainDealManager: mock_types.NewMockChainDealManager(ctrl), MockPieceAdder: mock_types.NewMockPieceAdder(ctrl), + MockIndexProvider: mock_types.NewMockIndexProvider(ctrl), unblockCommp: make(map[uuid.UUID]chan struct{}), unblockPublish: make(map[uuid.UUID]chan struct{}), unblockWaitForPublish: make(map[uuid.UUID]chan struct{}), unblockAddPiece: make(map[uuid.UUID]chan struct{}), + unblockAnnounce: make(map[uuid.UUID]chan struct{}), } } @@ -99,14 +104,6 @@ type MinerStubBuilder struct { rb *[]byte } -func (mb *MinerStubBuilder) SetupAllNonBlocking() *MinerStubBuilder { - return mb.SetupCommp(false).SetupPublish(false).SetupPublishConfirm(false).SetupAddPiece(false) -} - -func (mb *MinerStubBuilder) SetupAllBlocking() *MinerStubBuilder { - return mb.SetupCommp(true).SetupPublish(true).SetupPublishConfirm(true).SetupAddPiece(true) -} - func (mb *MinerStubBuilder) SetupNoOp() *MinerStubBuilder { mb.stub.MockCommpCalculator.EXPECT().ComputeDataCid(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any()).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { return abi.PieceInfo{ @@ -130,6 +127,10 @@ func (mb *MinerStubBuilder) SetupNoOp() *MinerStubBuilder { return mb.sectorId, mb.offset, nil }).AnyTimes() + mb.stub.MockIndexProvider.EXPECT().Enabled().Return(true).AnyTimes() + mb.stub.MockIndexProvider.EXPECT().Start(gomock.Any()).AnyTimes() + mb.stub.MockIndexProvider.EXPECT().AnnounceBoostDeal(gomock.Any(), gomock.Any()).Return(testutil.GenerateCid(), nil).AnyTimes() + return mb } @@ -262,7 +263,7 @@ func (mb *MinerStubBuilder) SetupAddPiece(blocking bool) *MinerStubBuilder { StartEpoch: mb.dp.ClientDealProposal.Proposal.StartEpoch, EndEpoch: mb.dp.ClientDealProposal.Proposal.EndEpoch, }, - KeepUnsealed: mb.dp.FastRetrieval, + KeepUnsealed: !mb.dp.RemoveUnsealedCopy, } var readBytes []byte @@ -300,7 +301,7 @@ func (mb *MinerStubBuilder) SetupAddPieceFailure(err error) { StartEpoch: mb.dp.ClientDealProposal.Proposal.StartEpoch, EndEpoch: mb.dp.ClientDealProposal.Proposal.EndEpoch, }, - KeepUnsealed: mb.dp.FastRetrieval, + KeepUnsealed: 
!mb.dp.RemoveUnsealedCopy, } mb.stub.MockPieceAdder.EXPECT().AddPiece(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { @@ -308,6 +309,41 @@ func (mb *MinerStubBuilder) SetupAddPieceFailure(err error) { }) } +func (mb *MinerStubBuilder) SetupAnnounce(blocking bool, announce bool) *MinerStubBuilder { + mb.stub.lk.Lock() + if blocking { + mb.stub.unblockAnnounce[mb.dp.DealUUID] = make(chan struct{}) + } + mb.stub.lk.Unlock() + + var callCount int + if announce { + callCount = 1 + } + + mb.stub.MockIndexProvider.EXPECT().Enabled().AnyTimes().Return(true) + mb.stub.MockIndexProvider.EXPECT().Start(gomock.Any()).AnyTimes() + mb.stub.MockIndexProvider.EXPECT().AnnounceBoostDeal(gomock.Any(), gomock.Any()).Times(callCount).DoAndReturn(func(ctx context.Context, _ *types.ProviderDealState) (cid.Cid, error) { + mb.stub.lk.Lock() + ch := mb.stub.unblockAddPiece[mb.dp.DealUUID] + mb.stub.lk.Unlock() + if ch != nil { + select { + case <-ctx.Done(): + return cid.Undef, ctx.Err() + case <-ch: + } + + } + if ctx.Err() != nil { + return cid.Undef, ctx.Err() + } + + return testutil.GenerateCid(), nil + }) + return mb +} + func (mb *MinerStubBuilder) Output() *StubbedMinerOutput { return &StubbedMinerOutput{ PublishCid: mb.publishCid, diff --git a/storagemarket/types/deal_state.go b/storagemarket/types/deal_state.go index d41b41ee6..092536b0d 100644 --- a/storagemarket/types/deal_state.go +++ b/storagemarket/types/deal_state.go @@ -61,6 +61,9 @@ type ProviderDealState struct { // Keep unsealed copy of the data FastRetrieval bool + + //Announce deal to the IPNI(Index Provider) + AnnounceToIPNI bool } func (d *ProviderDealState) String() string { diff --git a/storagemarket/types/mock_types/mocks.go b/storagemarket/types/mock_types/mocks.go index 399d93653..942864cd6 100644 --- a/storagemarket/types/mock_types/mocks.go +++ b/storagemarket/types/mock_types/mocks.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/filecoin-project/boost/storagemarket/types (interfaces: PieceAdder,CommpCalculator,DealPublisher,ChainDealManager) +// Source: github.com/filecoin-project/boost/storagemarket/types (interfaces: PieceAdder,CommpCalculator,DealPublisher,ChainDealManager,IndexProvider) // Package mock_types is a generated GoMock package. package mock_types @@ -9,6 +9,7 @@ import ( io "io" reflect "reflect" + types "github.com/filecoin-project/boost/storagemarket/types" storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" abi "github.com/filecoin-project/go-state-types/abi" market "github.com/filecoin-project/go-state-types/builtin/v9/market" @@ -169,3 +170,67 @@ func (mr *MockChainDealManagerMockRecorder) WaitForPublishDeals(arg0, arg1, arg2 mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForPublishDeals", reflect.TypeOf((*MockChainDealManager)(nil).WaitForPublishDeals), arg0, arg1, arg2) } + +// MockIndexProvider is a mock of IndexProvider interface. +type MockIndexProvider struct { + ctrl *gomock.Controller + recorder *MockIndexProviderMockRecorder +} + +// MockIndexProviderMockRecorder is the mock recorder for MockIndexProvider. +type MockIndexProviderMockRecorder struct { + mock *MockIndexProvider +} + +// NewMockIndexProvider creates a new mock instance. 
+func NewMockIndexProvider(ctrl *gomock.Controller) *MockIndexProvider { + mock := &MockIndexProvider{ctrl: ctrl} + mock.recorder = &MockIndexProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockIndexProvider) EXPECT() *MockIndexProviderMockRecorder { + return m.recorder +} + +// AnnounceBoostDeal mocks base method. +func (m *MockIndexProvider) AnnounceBoostDeal(arg0 context.Context, arg1 *types.ProviderDealState) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AnnounceBoostDeal", arg0, arg1) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AnnounceBoostDeal indicates an expected call of AnnounceBoostDeal. +func (mr *MockIndexProviderMockRecorder) AnnounceBoostDeal(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AnnounceBoostDeal", reflect.TypeOf((*MockIndexProvider)(nil).AnnounceBoostDeal), arg0, arg1) +} + +// Enabled mocks base method. +func (m *MockIndexProvider) Enabled() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Enabled") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Enabled indicates an expected call of Enabled. +func (mr *MockIndexProviderMockRecorder) Enabled() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enabled", reflect.TypeOf((*MockIndexProvider)(nil).Enabled)) +} + +// Start mocks base method. +func (m *MockIndexProvider) Start(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start", arg0) +} + +// Start indicates an expected call of Start. +func (mr *MockIndexProviderMockRecorder) Start(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockIndexProvider)(nil).Start), arg0) +} diff --git a/storagemarket/types/types.go b/storagemarket/types/types.go index a70b49a87..516ca48e8 100644 --- a/storagemarket/types/types.go +++ b/storagemarket/types/types.go @@ -24,7 +24,7 @@ import ( ) //go:generate cbor-gen-for --map-encoding StorageAsk DealParams Transfer DealResponse DealStatusRequest DealStatusResponse DealStatus -//go:generate go run github.com/golang/mock/mockgen -destination=mock_types/mocks.go -package=mock_types . PieceAdder,CommpCalculator,DealPublisher,ChainDealManager +//go:generate go run github.com/golang/mock/mockgen -destination=mock_types/mocks.go -package=mock_types . PieceAdder,CommpCalculator,DealPublisher,ChainDealManager,IndexProvider // StorageAsk defines the parameters by which a miner will choose to accept or // reject a deal. 
Note: making a storage deal proposal which matches the miner's @@ -83,7 +83,8 @@ type DealParams struct { ClientDealProposal market.ClientDealProposal DealDataRoot cid.Cid Transfer Transfer // Transfer params will be the zero value if this is an offline deal - FastRetrieval bool + RemoveUnsealedCopy bool + SkipIPNIAnnounce bool } type DealFilterParams struct { diff --git a/storagemarket/types/types_cbor_gen.go b/storagemarket/types/types_cbor_gen.go index 63787de2b..4a25e7206 100644 --- a/storagemarket/types/types_cbor_gen.go +++ b/storagemarket/types/types_cbor_gen.go @@ -228,7 +228,7 @@ func (t *DealParams) MarshalCBOR(w io.Writer) error { cw := cbg.NewCborWriter(w) - if _, err := cw.Write([]byte{166}); err != nil { + if _, err := cw.Write([]byte{167}); err != nil { return err } @@ -320,19 +320,35 @@ func (t *DealParams) MarshalCBOR(w io.Writer) error { return err } - // t.FastRetrieval (bool) (bool) - if len("FastRetrieval") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"FastRetrieval\" was too long") + // t.RemoveUnsealedCopy (bool) (bool) + if len("RemoveUnsealedCopy") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"RemoveUnsealedCopy\" was too long") } - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("FastRetrieval"))); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoveUnsealedCopy"))); err != nil { return err } - if _, err := io.WriteString(w, string("FastRetrieval")); err != nil { + if _, err := io.WriteString(w, string("RemoveUnsealedCopy")); err != nil { return err } - if err := cbg.WriteBool(w, t.FastRetrieval); err != nil { + if err := cbg.WriteBool(w, t.RemoveUnsealedCopy); err != nil { + return err + } + + // t.SkipIPNIAnnounce (bool) (bool) + if len("SkipIPNIAnnounce") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"SkipIPNIAnnounce\" was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("SkipIPNIAnnounce"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("SkipIPNIAnnounce")); err != nil { + return err + } + + if err := cbg.WriteBool(w, t.SkipIPNIAnnounce); err != nil { return err } return nil @@ -451,8 +467,26 @@ func (t *DealParams) UnmarshalCBOR(r io.Reader) (err error) { } } - // t.FastRetrieval (bool) (bool) - case "FastRetrieval": + // t.RemoveUnsealedCopy (bool) (bool) + case "RemoveUnsealedCopy": + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.RemoveUnsealedCopy = false + case 21: + t.RemoveUnsealedCopy = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + // t.SkipIPNIAnnounce (bool) (bool) + case "SkipIPNIAnnounce": maj, extra, err = cr.ReadHeader() if err != nil { @@ -463,9 +497,9 @@ func (t *DealParams) UnmarshalCBOR(r io.Reader) (err error) { } switch extra { case 20: - t.FastRetrieval = false + t.SkipIPNIAnnounce = false case 21: - t.FastRetrieval = true + t.SkipIPNIAnnounce = true default: return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) }
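Taken together, the client-facing change is just two new booleans on DealParams, which the provider inverts into its existing internal fields. The sketch below is illustrative only (the helper name is not part of this patch; the real mapping is done inline in Provider.ExecuteDeal above). Because both flags default to false, clients that do not set them keep the previous behaviour: an unsealed copy is kept and the deal is announced to IPNI.

```go
package example

import "github.com/filecoin-project/boost/storagemarket/types"

// providerFlagsFromParams shows how the two new wire-level flags map onto the
// provider's internal deal state, mirroring the ExecuteDeal change above.
// FastRetrieval is later passed to AddPiece as KeepUnsealed, and AnnounceToIPNI
// gates the AnnounceBoostDeal call in indexAndAnnounce.
func providerFlagsFromParams(dp *types.DealParams) (fastRetrieval, announceToIPNI bool) {
	return !dp.RemoveUnsealedCopy, !dp.SkipIPNIAnnounce
}
```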
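On the client side, nothing else about deal construction changes; the zero value of both new fields reproduces the old defaults. A minimal sketch, assuming the usual DealParams fields are filled in as before (the helper name and placeholder arguments are illustrative, not part of the patch):

```go
package example

import (
	"github.com/filecoin-project/boost/storagemarket/types"
	"github.com/google/uuid"
)

// buildDealParams mirrors what cmd/boost/deal_cmd.go and the itest framework do
// above: leaving RemoveUnsealedCopy and SkipIPNIAnnounce at false keeps an
// unsealed copy of the sector and announces the deal to the network indexer.
func buildDealParams(dealUuid uuid.UUID, isOffline, removeUnsealedCopy, skipIPNIAnnounce bool) types.DealParams {
	return types.DealParams{
		DealUUID:           dealUuid,
		IsOffline:          isOffline,
		RemoveUnsealedCopy: removeUnsealedCopy, // true => provider sets FastRetrieval=false (no unsealed copy kept)
		SkipIPNIAnnounce:   skipIPNIAnnounce,   // true => provider sets AnnounceToIPNI=false (no IPNI announcement)
		// ClientDealProposal, DealDataRoot and Transfer are populated exactly as
		// before this change.
	}
}
```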