Skip to content

Commit

Permalink
perform batch activation in a single state transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
alexytsu committed May 26, 2023
1 parent ccb9eb5 commit 43923d2
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 16 deletions.
122 changes: 114 additions & 8 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use fvm_shared::econ::TokenAmount;
use fvm_shared::error::ExitCode;
use fvm_shared::piece::PieceInfo;
use fvm_shared::reward::ThisEpochRewardReturn;
use fvm_shared::sector::{RegisteredSealProof, SectorSize, StoragePower};
use fvm_shared::sector::{RegisteredSealProof, SectorNumber, SectorSize, StoragePower};
use fvm_shared::{ActorID, METHOD_CONSTRUCTOR, METHOD_SEND};
use integer_encoding::VarInt;
use log::info;
Expand Down Expand Up @@ -531,21 +531,127 @@ impl Actor {
Ok(VerifyDealsForActivationReturn { sectors: sectors_data })
}

/// Activate a batch of deals across sectors, returning the combined deal space and extra info
/// for sectors that were successfully activated
fn activate_deals_batch(
rt: &impl Runtime,
params: ActivateDealsBatchParams,
) -> Result<ActivateDealsBatchResult, ActorError> {
rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
Self::activate_deals_batch_inner(rt, params)
}

let mut sectors = Vec::new();
fn activate_deals_batch_inner(
rt: &impl Runtime,
params: ActivateDealsBatchParams,
) -> Result<ActivateDealsBatchResult, ActorError> {
let miner_addr = rt.message().caller();
let curr_epoch = rt.curr_epoch();

for (activate_params, sector_number) in params.sectors {
let expiry = activate_params.sector_expiry;
let res = Self::activate_deals_inner(rt, activate_params)?;
sectors.push((res, sector_number, expiry));
}
let activated_sectors = rt.transaction(|st: &mut State, rt| {
let mut activated_sectors: Vec<(ActivateDealsResult, SectorNumber, ChainEpoch)> =
Vec::new();

for (sector_params, sector_number) in params.sectors {
let proposal_array = st.get_proposal_array(rt.store())?;
let proposals =
get_proposals(&proposal_array, &sector_params.deal_ids, st.next_id)?;

// Check deals in this sector, skipping activation if unable to validate
let deal_spaces = match validate_and_return_deal_space(
&proposals,
&miner_addr,
sector_params.sector_expiry,
curr_epoch,
None,
) {
Ok(deal_spaces) => deal_spaces,
Err(e) => {
info!(
"failed to validate deal proposals from sector {} for activation: {}",
sector_number, e
);
continue;
}
};

// Update deal states
let mut verified_infos = Vec::new();
let mut deal_states: Vec<(DealID, DealState)> = vec![];

for (deal_id, proposal) in proposals {
// This construction could be replaced with a single "update deal state"
// state method, possibly batched over all deal ids at once.
let s = st.find_deal_state(rt.store(), deal_id)?;

if s.is_some() {
return Err(actor_error!(
illegal_argument,
"deal {} already activated",
deal_id
));
}

let propc = rt_deal_cid(rt, &proposal)?;

// Confirm the deal is in the pending proposals queue.
// It will be removed from this queue later, during cron.
let has = st.has_pending_deal(rt.store(), propc)?;

if !has {
return Err(actor_error!(
illegal_state,
"tried to activate deal that was not in the pending set ({})",
propc
));
}

// Extract and remove any verified allocation ID for the pending deal.
let allocation = st
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))?
.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID))
.1;

if allocation != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: allocation,
data: proposal.piece_cid,
size: proposal.piece_size,
})
}

deal_states.push((
deal_id,
DealState {
sector_start_epoch: curr_epoch,
last_updated_epoch: EPOCH_UNDEFINED,
slash_epoch: EPOCH_UNDEFINED,
verified_claim: allocation,
},
))
}

st.put_deal_states(rt.store(), &deal_states)?;

activated_sectors.push((
ActivateDealsResult {
nonverified_deal_space: deal_spaces.deal_space,
verified_infos,
},
sector_number,
sector_params.sector_expiry,
));
}

if activated_sectors.is_empty() {
return Err(actor_error!(illegal_argument, "all sectors failed to activated"));
}

Ok(activated_sectors)
})?;

Ok(ActivateDealsBatchResult { sectors })
Ok(ActivateDealsBatchResult { sectors: activated_sectors })
}

/// Activate a set of deals, returning the combined deal space and extra info for verified deals.
Expand Down
8 changes: 3 additions & 5 deletions actors/miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4799,7 +4799,6 @@ fn confirm_sector_proofs_valid_internal(
let activation = rt.curr_epoch();

// Pre-commits for new sectors.
// TODO: either all the pre_commits are valid or it aborts. we should handle the case where only some are valid
let deal_spaces = batch_activate_deals_and_claim_allocations(rt, pre_commits.clone())?;
if deal_spaces.is_none() {
return Err(actor_error!(illegal_argument, "all prove commits failed to validate"));
Expand All @@ -4818,7 +4817,6 @@ fn confirm_sector_proofs_valid_internal(
let mut new_sectors = Vec::<SectorOnChainInfo>::new();
let mut total_pledge = TokenAmount::zero();

// FIXME: this is hacky and wrong, the deal space needs to be calculated per sector
for (pre_commit, deal_spaces) in pre_commits.iter().zip(deal_spaces.iter()) {
// compute initial pledge
let duration = pre_commit.info.expiration - activation;
Expand Down Expand Up @@ -4967,7 +4965,7 @@ fn batch_activate_deals_and_claim_allocations(
})
.collect();

// check and activate storage deals. Abort if checks failed
// check and activate storage deals, returns the activated claims
let batch_activate_raw = extract_send_result(rt.send_simple(
&STORAGE_MARKET_ACTOR_ADDR,
ext::market::ACTIVATE_DEALS_BATCH_EXPORTED,
Expand All @@ -4982,7 +4980,7 @@ fn batch_activate_deals_and_claim_allocations(
}
};

let sector_claims: Vec<ext::verifreg::ClaimAllocationsParams> = batch_activate_res
let activated_claims: Vec<ext::verifreg::ClaimAllocationsParams> = batch_activate_res
.sectors
.iter()
.map(|(a, b, c)| ClaimAllocationsParams {
Expand All @@ -5006,7 +5004,7 @@ fn batch_activate_deals_and_claim_allocations(
&VERIFIED_REGISTRY_ACTOR_ADDR,
ext::verifreg::CLAIM_ALLOCATIONS_BATCH_METHOD,
IpldBlock::serialize_cbor(&ext::verifreg::ClaimAllocationsBatchParams {
claims: sector_claims,
claims: activated_claims,
})?,
TokenAmount::zero(),
));
Expand Down
13 changes: 10 additions & 3 deletions test_vm/tests/batch_onboarding_deals_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use num_traits::Zero;

use fil_actor_market::{deal_id_key, DealProposal};
use fil_actor_miner::{
max_prove_commit_duration, power_for_sector, CompactCommD, SectorPreCommitOnChainInfo,
State as MinerState,
max_prove_commit_duration, power_for_sector, CompactCommD, SectorPreCommitInfo,
SectorPreCommitOnChainInfo, State as MinerState,
};
use fil_actor_miner::{Method as MinerMethod, ProveCommitAggregateParams};
use fil_actors_runtime::runtime::policy::policy_constants::PRE_COMMIT_CHALLENGE_DELAY;
Expand Down Expand Up @@ -94,7 +94,7 @@ pub fn batch_onboarding_deals_test<BS: Blockstore>(v: &dyn VM<BS>) {
.collect();

// Pre-commit as single batch.
let precommits = precommit_sectors_v2(
let mut precommits = precommit_sectors_v2(
v,
BATCH_SIZE,
BATCH_SIZE,
Expand All @@ -109,6 +109,13 @@ pub fn batch_onboarding_deals_test<BS: Blockstore>(v: &dyn VM<BS>) {
);
let first_sector_no = precommits[0].info.sector_number;

// add a bad precommit to the batch
precommits.push(SectorPreCommitOnChainInfo {
info: SectorPreCommitInfo { ..Default::default() },
pre_commit_deposit: TokenAmount::from_atto(1000),
pre_commit_epoch: ChainEpoch::default(),
});

// Prove-commit as a single aggregate.
v.set_epoch(v.epoch() + PRE_COMMIT_CHALLENGE_DELAY + 1);
prove_commit_aggregate(v, &worker, &miner, precommits);
Expand Down

0 comments on commit 43923d2

Please sign in to comment.