diff --git a/actors/market/src/lib.rs b/actors/market/src/lib.rs
index 7906d54acc..dd1d66a84b 100644
--- a/actors/market/src/lib.rs
+++ b/actors/market/src/lib.rs
@@ -93,6 +93,7 @@ pub enum Method {
     GetDealProviderCollateralExported = frc42_dispatch::method_hash!("GetDealProviderCollateral"),
     GetDealVerifiedExported = frc42_dispatch::method_hash!("GetDealVerified"),
     GetDealActivationExported = frc42_dispatch::method_hash!("GetDealActivation"),
+    ActivateDealsBatchExported = frc42_dispatch::method_hash!("ActivateDealsBatch"),
 }
 
 /// Market Actor
@@ -530,12 +531,36 @@ impl Actor {
         Ok(VerifyDealsForActivationReturn { sectors: sectors_data })
     }
 
+    fn activate_deals_batch(
+        rt: &impl Runtime,
+        params: ActivateDealsBatchParams,
+    ) -> Result<ActivateDealsBatchResult, ActorError> {
+        rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
+
+        let mut sectors = Vec::new();
+
+        for (activate_params, sector_number) in params.sectors {
+            let expiry = activate_params.sector_expiry;
+            let res = Self::activate_deals_inner(rt, activate_params)?;
+            sectors.push((res, sector_number, expiry));
+        }
+
+        Ok(ActivateDealsBatchResult { sectors })
+    }
+
     /// Activate a set of deals, returning the combined deal space and extra info for verified deals.
     fn activate_deals(
         rt: &impl Runtime,
        params: ActivateDealsParams,
     ) -> Result<ActivateDealsResult, ActorError> {
         rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
+        Self::activate_deals_inner(rt, params)
+    }
+
+    fn activate_deals_inner(
+        rt: &impl Runtime,
+        params: ActivateDealsParams,
+    ) -> Result<ActivateDealsResult, ActorError> {
         let miner_addr = rt.message().caller();
         let curr_epoch = rt.curr_epoch();
 
@@ -608,7 +633,7 @@ impl Actor {
                         slash_epoch: EPOCH_UNDEFINED,
                         verified_claim: allocation,
                     },
-                ));
+                ))
             }
 
             st.put_deal_states(rt.store(), &deal_states)?;
@@ -1418,5 +1443,6 @@ impl ActorCode for Actor {
         GetDealProviderCollateralExported => get_deal_provider_collateral,
         GetDealVerifiedExported => get_deal_verified,
         GetDealActivationExported => get_deal_activation,
+        ActivateDealsBatchExported => activate_deals_batch,
     }
 }
diff --git a/actors/market/src/types.rs b/actors/market/src/types.rs
index 7bc0d1d8bd..51ef3733bb 100644
--- a/actors/market/src/types.rs
+++ b/actors/market/src/types.rs
@@ -13,6 +13,7 @@ use fvm_shared::clock::ChainEpoch;
 use fvm_shared::deal::DealID;
 use fvm_shared::econ::TokenAmount;
 use fvm_shared::piece::PaddedPieceSize;
+use fvm_shared::sector::SectorNumber;
 use fvm_shared::ActorID;
 
 use crate::Label;
@@ -117,6 +118,16 @@ pub struct ActivateDealsResult {
     pub verified_infos: Vec<VerifiedDealInfo>,
 }
 
+#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
+pub struct ActivateDealsBatchParams {
+    pub sectors: Vec<(ActivateDealsParams, SectorNumber)>,
+}
+
+#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
+pub struct ActivateDealsBatchResult {
+    pub sectors: Vec<(ActivateDealsResult, SectorNumber, ChainEpoch)>,
+}
+
 #[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
 pub struct DealSpaces {
     #[serde(with = "bigint_ser")]
diff --git a/actors/miner/src/ext.rs b/actors/miner/src/ext.rs
index b151019fa9..57e79ad7a8 100644
--- a/actors/miner/src/ext.rs
+++ b/actors/miner/src/ext.rs
@@ -24,6 +24,8 @@ pub mod market {
     pub const ACTIVATE_DEALS_METHOD: u64 = 6;
     pub const ON_MINER_SECTORS_TERMINATE_METHOD: u64 = 7;
     pub const COMPUTE_DATA_COMMITMENT_METHOD: u64 = 8;
+    pub const ACTIVATE_DEALS_BATCH_EXPORTED: u64 =
+        frc42_dispatch::method_hash!("ActivateDealsBatch");
 
     #[derive(Serialize_tuple, Deserialize_tuple)]
     pub struct SectorDeals {
@@ -38,6 +40,16 @@ pub mod market {
         pub sector_expiry: ChainEpoch,
     }
 
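+    // These mirror ActivateDealsBatchParams / ActivateDealsBatchResult in actors/market/src/types.rs
+    // and must be kept in sync with them.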
+    #[derive(Serialize_tuple, Deserialize_tuple)]
+    pub struct ActivateDealsBatchParams {
+        pub sectors: Vec<(ActivateDealsParams, SectorNumber)>,
+    }
+
+    #[derive(Serialize_tuple, Deserialize_tuple)]
+    pub struct ActivateDealsBatchResult {
+        pub sectors: Vec<(ActivateDealsResult, SectorNumber, ChainEpoch)>,
+    }
+
     #[derive(Serialize_tuple, Deserialize_tuple, Clone)]
     pub struct VerifiedDealInfo {
         pub client: ActorID,
@@ -160,6 +172,8 @@ pub mod verifreg {
     pub const GET_CLAIMS_METHOD: u64 = 10;
     pub const CLAIM_ALLOCATIONS_METHOD: u64 = 9;
+    pub const CLAIM_ALLOCATIONS_BATCH_METHOD: u64 =
+        frc42_dispatch::method_hash!("ClaimAllocationsBatch");
 
     pub type ClaimID = u64;
     pub type AllocationID = u64;
@@ -216,4 +230,14 @@ pub mod verifreg {
         #[serde(with = "bigint_ser")]
         pub claimed_space: BigInt,
     }
+
+    #[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
+    pub struct ClaimAllocationsBatchParams {
+        pub claims: Vec<ClaimAllocationsParams>,
+    }
+
+    #[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
+    pub struct ClaimAllocationsBatchReturn {
+        pub claims: Vec<ClaimAllocationsReturn>,
+    }
 }
diff --git a/actors/miner/src/lib.rs b/actors/miner/src/lib.rs
index e85389c76b..d93e83c983 100644
--- a/actors/miner/src/lib.rs
+++ b/actors/miner/src/lib.rs
@@ -10,6 +10,8 @@ use std::ops::Neg;
 use anyhow::{anyhow, Error};
 use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
 use cid::Cid;
+use ext::market::ActivateDealsParams;
+use ext::verifreg::{ClaimAllocationsParams, SectorAllocationClaim};
 use fvm_ipld_bitfield::{BitField, Validate};
 use fvm_ipld_blockstore::Blockstore;
 use fvm_ipld_encoding::{from_slice, BytesDe, CborStore};
@@ -4795,43 +4797,29 @@ fn confirm_sector_proofs_valid_internal(
     // Ideally, we'd combine some of these operations, but at least we have
     // a constant number of them.
     let activation = rt.curr_epoch();
-    // Pre-commits for new sectors.
-    let mut valid_pre_commits = Vec::default();
-    for pre_commit in pre_commits {
-        match activate_deals_and_claim_allocations(
-            rt,
-            pre_commit.clone().info.deal_ids,
-            pre_commit.info.expiration,
-            pre_commit.info.sector_number,
-        )? {
-            None => {
-                info!(
-                    "failed to activate deals on sector {}, dropping from prove commit set",
-                    pre_commit.info.sector_number,
-                );
-                continue;
-            }
-            Some(deal_spaces) => valid_pre_commits.push((pre_commit, deal_spaces)),
-        };
-    }
-
-    // When all prove commits have failed abort early
-    if valid_pre_commits.is_empty() {
+    // Pre-commits for new sectors.
+    // TODO: either all the pre_commits are valid or the call aborts; we should handle the case where only some are valid.
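+    // Deals for the whole prove-commit set are activated with a single batch call; if that call
+    // (or the subsequent allocation claim) fails, the whole set is rejected below.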
+    let deal_spaces = batch_activate_deals_and_claim_allocations(rt, pre_commits.clone())?;
+    if deal_spaces.is_none() {
         return Err(actor_error!(illegal_argument, "all prove commits failed to validate"));
     }
 
+    // assume that when we get here all pre_commits are valid
+    let deal_spaces = deal_spaces.unwrap();
+
     let (total_pledge, newly_vested) = rt.transaction(|state: &mut State, rt| {
         let policy = rt.policy();
         let store = rt.store();
         let info = get_miner_info(store, state)?;
 
-        let mut new_sector_numbers = Vec::<SectorNumber>::with_capacity(valid_pre_commits.len());
+        let mut new_sector_numbers = Vec::<SectorNumber>::with_capacity(pre_commits.len());
         let mut deposit_to_unlock = TokenAmount::zero();
         let mut new_sectors = Vec::<SectorOnChainInfo>::new();
         let mut total_pledge = TokenAmount::zero();
 
-        for (pre_commit, deal_spaces) in valid_pre_commits {
+        // FIXME: this is hacky and wrong, the deal space needs to be calculated per sector
+        for (pre_commit, deal_spaces) in pre_commits.iter().zip(deal_spaces.iter()) {
             // compute initial pledge
             let duration = pre_commit.info.expiration - activation;
 
@@ -4844,8 +4832,8 @@ fn confirm_sector_proofs_valid_internal(
                 continue;
             }
 
-            let deal_weight = deal_spaces.deal_space * duration;
-            let verified_deal_weight = deal_spaces.verified_deal_space * duration;
+            let deal_weight = deal_spaces.deal_space.clone() * duration;
+            let verified_deal_weight = deal_spaces.verified_deal_space.clone() * duration;
 
             let power = qa_power_for_weight(
                 info.sector_size,
@@ -4886,7 +4874,7 @@ fn confirm_sector_proofs_valid_internal(
                 sector_number: pre_commit.info.sector_number,
                 seal_proof: pre_commit.info.seal_proof,
                 sealed_cid: pre_commit.info.sealed_cid,
-                deal_ids: pre_commit.info.deal_ids,
+                deal_ids: pre_commit.info.deal_ids.clone(),
                 expiration: pre_commit.info.expiration,
                 activation,
                 deal_weight,
@@ -4962,6 +4950,87 @@ fn confirm_sector_proofs_valid_internal(
     Ok(())
 }
 
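+// Batch counterpart of activate_deals_and_claim_allocations below: builds one ActivateDealsParams
+// per pre-commit, activates them all with a single ActivateDealsBatch call to the market actor,
+// then claims their verified allocations with a single ClaimAllocationsBatch call to the verified
+// registry. Returns Ok(None) if either batch call fails.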
+fn batch_activate_deals_and_claim_allocations(
+    rt: &impl Runtime,
+    pre_commits: Vec<SectorPreCommitOnChainInfo>,
+) -> Result<Option<Vec<ext::market::DealSpaces>>, ActorError> {
+    let deal_info = pre_commits
+        .iter()
+        .map(|pre_commit| {
+            (
+                ActivateDealsParams {
+                    deal_ids: pre_commit.info.deal_ids.clone(),
+                    sector_expiry: pre_commit.info.expiration,
+                },
+                pre_commit.info.sector_number,
+            )
+        })
+        .collect();
+
+    // check and activate storage deals. Abort if checks failed
+    let batch_activate_raw = extract_send_result(rt.send_simple(
+        &STORAGE_MARKET_ACTOR_ADDR,
+        ext::market::ACTIVATE_DEALS_BATCH_EXPORTED,
+        IpldBlock::serialize_cbor(&ext::market::ActivateDealsBatchParams { sectors: deal_info })?,
+        TokenAmount::zero(),
+    ));
+    let batch_activate_res: ext::market::ActivateDealsBatchResult = match batch_activate_raw {
+        Ok(res) => deserialize_block(res)?,
+        Err(e) => {
+            info!("error batch activating deals {}", e.msg());
+            return Ok(None);
+        }
+    };
+
+    let sector_claims: Vec<ClaimAllocationsParams> = batch_activate_res
+        .sectors
+        .iter()
+        .map(|(a, b, c)| ClaimAllocationsParams {
+            all_or_nothing: true,
+            sectors: a
+                .verified_infos
+                .iter()
+                .map(|i| SectorAllocationClaim {
+                    allocation_id: i.allocation_id,
+                    client: i.client,
+                    data: i.data,
+                    sector: *b,
+                    sector_expiry: *c,
+                    size: i.size,
+                })
+                .collect(),
+        })
+        .collect();
+
+    let claim_raw = extract_send_result(rt.send_simple(
+        &VERIFIED_REGISTRY_ACTOR_ADDR,
+        ext::verifreg::CLAIM_ALLOCATIONS_BATCH_METHOD,
+        IpldBlock::serialize_cbor(&ext::verifreg::ClaimAllocationsBatchParams {
+            claims: sector_claims,
+        })?,
+        TokenAmount::zero(),
+    ));
+    let claim_res: ext::verifreg::ClaimAllocationsBatchReturn = match claim_raw {
+        Ok(res) => deserialize_block(res)?,
+        Err(e) => {
+            info!("error claiming allocation: {}", e.msg());
+            return Ok(None);
+        }
+    };
+
+    let deal_spaces = claim_res
+        .claims
+        .iter()
+        .zip(batch_activate_res.sectors.iter())
+        .map(|(claim, sector)| ext::market::DealSpaces {
+            deal_space: sector.0.nonverified_deal_space.clone(),
+            verified_deal_space: claim.claimed_space.clone(),
+        })
+        .collect_vec();
+
+    Ok(Some(deal_spaces))
+}
+
 // activate deals with builtin market and claim allocations with verified registry actor
 // returns an error in case of a fatal programmer error
 // returns Ok(None) in case deal activation or verified allocation claim fails
diff --git a/actors/verifreg/src/lib.rs b/actors/verifreg/src/lib.rs
index 339ceea13d..9d712846f9 100644
--- a/actors/verifreg/src/lib.rs
+++ b/actors/verifreg/src/lib.rs
@@ -71,6 +71,7 @@ pub enum Method {
     ExtendClaimTermsExported = frc42_dispatch::method_hash!("ExtendClaimTerms"),
     RemoveExpiredClaimsExported = frc42_dispatch::method_hash!("RemoveExpiredClaims"),
     UniversalReceiverHook = frc42_dispatch::method_hash!("Receive"),
+    ClaimAllocationsBatch = frc42_dispatch::method_hash!("ClaimAllocationsBatch"),
 }
 
 pub struct Actor;
@@ -363,6 +364,110 @@ impl Actor {
         })
     }
 
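+    // Batch counterpart of claim_allocations: each ClaimAllocationsParams group is processed in
+    // its own state transaction, the datacap it claims is burned, and one ClaimAllocationsReturn
+    // is produced per group; all_or_nothing is enforced per group.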
+    pub fn claim_allocations_batch(
+        rt: &impl Runtime,
+        params: ClaimAllocationsBatchParams,
+    ) -> Result<ClaimAllocationsBatchReturn, ActorError> {
+        rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
+        let provider = rt.message().caller().id().unwrap();
+        if params.claims.iter().all(|claim| claim.sectors.is_empty()) {
+            return Err(actor_error!(illegal_argument, "claim allocations called with no claims"));
+        }
+
+        let mut rets: Vec<ClaimAllocationsReturn> = Vec::new();
+
+        for p in params.claims {
+            let mut datacap_claimed = DataCap::zero();
+            let mut ret_gen = BatchReturnGen::new(p.sectors.len());
+            let all_or_nothing = p.all_or_nothing;
+            rt.transaction(|st: &mut State, rt| {
+                let mut claims = st.load_claims(rt.store())?;
+                let mut allocs = st.load_allocs(rt.store())?;
+
+                for claim_alloc in p.sectors {
+                    let maybe_alloc = state::get_allocation(
+                        &mut allocs,
+                        claim_alloc.client,
+                        claim_alloc.allocation_id,
+                    )?;
+                    let alloc: &Allocation = match maybe_alloc {
+                        None => {
+                            ret_gen.add_fail(ExitCode::USR_NOT_FOUND);
+                            info!(
+                                "no allocation {} for client {}",
+                                claim_alloc.allocation_id, claim_alloc.client,
+                            );
+                            continue;
+                        }
+                        Some(a) => a,
+                    };
+
+                    if !can_claim_alloc(&claim_alloc, provider, alloc, rt.curr_epoch()) {
+                        ret_gen.add_fail(ExitCode::USR_FORBIDDEN);
+                        info!(
+                            "invalid sector {:?} for allocation {}",
+                            claim_alloc.sector, claim_alloc.allocation_id,
+                        );
+                        continue;
+                    }
+
+                    let new_claim = Claim {
+                        provider,
+                        client: alloc.client,
+                        data: alloc.data,
+                        size: alloc.size,
+                        term_min: alloc.term_min,
+                        term_max: alloc.term_max,
+                        term_start: rt.curr_epoch(),
+                        sector: claim_alloc.sector,
+                    };
+
+                    let inserted = claims
+                        .put_if_absent(provider, claim_alloc.allocation_id, new_claim)
+                        .context_code(
+                            ExitCode::USR_ILLEGAL_STATE,
+                            format!("failed to write claim {}", claim_alloc.allocation_id),
+                        )?;
+                    if !inserted {
+                        ret_gen.add_fail(ExitCode::USR_ILLEGAL_STATE);
+                        // should be unreachable since claim and alloc can't exist at once
+                        info!(
+                            "claim for allocation {} could not be inserted as it already exists",
+                            claim_alloc.allocation_id,
+                        );
+                        continue;
+                    }
+
+                    allocs.remove(claim_alloc.client, claim_alloc.allocation_id).context_code(
+                        ExitCode::USR_ILLEGAL_STATE,
+                        format!("failed to remove allocation {}", claim_alloc.allocation_id),
+                    )?;
+
+                    datacap_claimed += DataCap::from(claim_alloc.size.0);
+                    ret_gen.add_success();
+                }
+                st.save_allocs(&mut allocs)?;
+                st.save_claims(&mut claims)?;
+                Ok(())
+            })
+            .context("state transaction failed")?;
+            let batch_info = ret_gen.gen();
+            if all_or_nothing && !batch_info.all_ok() {
+                return Err(actor_error!(
+                    illegal_argument,
+                    "all or nothing call contained failures: {}",
+                    batch_info.to_string()
+                ));
+            }
+
+            // Burn the datacap tokens from verified registry's own balance.
+            burn(rt, &datacap_claimed)?;
+
+            rets.push(ClaimAllocationsReturn { batch_info, claimed_space: datacap_claimed });
+        }
+
+        Ok(ClaimAllocationsBatchReturn { claims: rets })
+    }
+
     // Called by storage provider actor to claim allocations for data provably committed to storage.
     // For each allocation claim, the registry checks that the provided piece CID
     // and size match that of the allocation.
@@ -1089,5 +1194,6 @@ impl ActorCode for Actor {
         ExtendClaimTerms|ExtendClaimTermsExported => extend_claim_terms,
         RemoveExpiredClaims|RemoveExpiredClaimsExported => remove_expired_claims,
         UniversalReceiverHook => universal_receiver_hook,
+        ClaimAllocationsBatch => claim_allocations_batch,
     }
 }
diff --git a/actors/verifreg/src/types.rs b/actors/verifreg/src/types.rs
index d413556096..e687514aa8 100644
--- a/actors/verifreg/src/types.rs
+++ b/actors/verifreg/src/types.rs
@@ -143,6 +143,16 @@ pub struct ClaimAllocationsReturn {
     pub claimed_space: BigInt,
 }
 
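+// Batch wrappers around the single-call types: one ClaimAllocationsParams group per request entry,
+// and one ClaimAllocationsReturn per entry in the response, index-aligned.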
+#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
+pub struct ClaimAllocationsBatchParams {
+    pub claims: Vec<ClaimAllocationsParams>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
+pub struct ClaimAllocationsBatchReturn {
+    pub claims: Vec<ClaimAllocationsReturn>,
+}
+
 #[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
 pub struct ClaimTerm {
     pub provider: ActorID,
diff --git a/test_vm/tests/batch_onboarding.rs b/test_vm/tests/batch_onboarding.rs
index fae347d75a..f379550f15 100644
--- a/test_vm/tests/batch_onboarding.rs
+++ b/test_vm/tests/batch_onboarding.rs
@@ -116,13 +116,13 @@ pub fn batch_onboarding_test(v: &dyn VM, v2: bool) {
                 v2,
             );
             precommmits.append(&mut new_precommits);
-            next_sector_no += item.pre_commit_sector_count;
+            next_sector_no += item.pre_commit_sector_count as u64;
             pre_committed_count += item.pre_commit_sector_count;
         }
 
         if item.prove_commit_sector_count > 0 {
-            let to_prove = precommmits[..item.prove_commit_sector_count as usize].to_vec();
-            precommmits = precommmits[item.prove_commit_sector_count as usize..].to_vec();
+            let to_prove = precommmits[..item.prove_commit_sector_count].to_vec();
+            precommmits = precommmits[item.prove_commit_sector_count..].to_vec();
             prove_commit_sectors(v, &worker, &id_addr, to_prove, item.prove_commit_aggregate_size);
             proven_count += item.prove_commit_sector_count;
         }
@@ -144,7 +144,10 @@ pub fn batch_onboarding_test(v: &dyn VM, v2: bool) {
 
     let network_stats = get_network_stats(v);
     let sector_size = seal_proof.sector_size().unwrap() as u64;
-    assert_eq!(network_stats.total_bytes_committed, BigInt::from(sector_size * proven_count));
+    assert_eq!(
+        network_stats.total_bytes_committed,
+        BigInt::from(sector_size * proven_count as u64)
+    );
     assert!(network_stats.total_pledge_collateral.is_positive());
 
     apply_ok(
@@ -158,7 +161,10 @@ pub fn batch_onboarding_test(v: &dyn VM, v2: bool) {
 
     let network_stats = get_network_stats(v);
     let sector_size = seal_proof.sector_size().unwrap() as u64;
-    assert_eq!(network_stats.total_bytes_committed, BigInt::from(sector_size * proven_count));
+    assert_eq!(
+        network_stats.total_bytes_committed,
+        BigInt::from(sector_size * proven_count as u64)
+    );
     assert!(network_stats.total_pledge_collateral.is_positive());
 
     expect_invariants(v, &[invariant_failure_patterns::REWARD_STATE_EPOCH_MISMATCH.to_owned()]);
diff --git a/test_vm/tests/batch_onboarding_deals_test.rs b/test_vm/tests/batch_onboarding_deals_test.rs
index 219c296e0d..1aa342daa0 100644
--- a/test_vm/tests/batch_onboarding_deals_test.rs
+++ b/test_vm/tests/batch_onboarding_deals_test.rs
@@ -148,7 +148,7 @@ fn publish_deals(
     count: usize,
 ) -> Vec<(DealID, DealProposal)> {
     let deal_opts = DealOptions {
-        piece_size: PaddedPieceSize(32 * 1 << 30),
+        piece_size: PaddedPieceSize(32 << 30),
         verified: true,
         deal_start: v.epoch() + max_prove_commit_duration(&Policy::default(), SEAL_PROOF).unwrap(),
         deal_lifetime: duration,
@@ -159,7 +159,7 @@ fn publish_deals(
     let ret = batcher.publish_ok(worker);
     let good_inputs = bf_all(ret.valid_deals);
     assert_eq!((0..count as u64).collect::<Vec<u64>>(), good_inputs);
-    return ret.ids.into_iter().zip(batcher.proposals().iter().map(|p| p.clone())).collect();
+    return ret.ids.into_iter().zip(batcher.proposals().iter().cloned()).collect();
 }
 
 // This method doesn't check any trace expectations.
diff --git a/test_vm/tests/publish_deals_test.rs b/test_vm/tests/publish_deals_test.rs
index 52405481b4..d34bdbeee8 100644
--- a/test_vm/tests/publish_deals_test.rs
+++ b/test_vm/tests/publish_deals_test.rs
@@ -34,8 +34,7 @@ use test_vm::util::serialize_ok;
 use test_vm::util::{
     apply_ok, bf_all, create_accounts, create_accounts_seeded, create_miner, verifreg_add_verifier,
 };
-use test_vm::workflow::{DealBatcher, DealOptions};
-use test_vm::{workflow, VM};
+use test_vm::VM;
 use test_vm::{ExpectInvocation, TestVM};
 
 struct Addrs {
diff --git a/test_vm/tests/replica_update_test.rs b/test_vm/tests/replica_update_test.rs
index d6ed0e0dac..056135c7ed 100644
--- a/test_vm/tests/replica_update_test.rs
+++ b/test_vm/tests/replica_update_test.rs
@@ -1028,7 +1028,7 @@ fn deal_included_in_multiple_sectors_failure_test(v: &dyn VM
     let first_sector_number = 100;
     let precommits = precommit_sectors(
         v,
-        policy.min_aggregated_sectors,
+        policy.min_aggregated_sectors as usize,
         policy.pre_commit_sector_batch_max_size,
         &worker,
         &maddr,