Skip to content

Commit

Permalink
PCA: batch internal calls to Market::ActivateDeals and Registry::Clai…
Browse files Browse the repository at this point in the history
…mAllocations
  • Loading branch information
alexytsu committed May 25, 2023
1 parent 622b707 commit 78029ed
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 38 deletions.
28 changes: 27 additions & 1 deletion actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub enum Method {
GetDealProviderCollateralExported = frc42_dispatch::method_hash!("GetDealProviderCollateral"),
GetDealVerifiedExported = frc42_dispatch::method_hash!("GetDealVerified"),
GetDealActivationExported = frc42_dispatch::method_hash!("GetDealActivation"),
ActivateDealsBatchExported = frc42_dispatch::method_hash!("ActivateDealsBatch"),
}

/// Market Actor
Expand Down Expand Up @@ -530,12 +531,36 @@ impl Actor {
Ok(VerifyDealsForActivationReturn { sectors: sectors_data })
}

fn activate_deals_batch(
rt: &impl Runtime,
params: ActivateDealsBatchParams,
) -> Result<ActivateDealsBatchResult, ActorError> {
rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;

let mut sectors = Vec::new();

for (activate_params, sector_number) in params.sectors {
let expiry = activate_params.sector_expiry;
let res = Self::activate_deals_inner(rt, activate_params)?;
sectors.push((res, sector_number, expiry));
}

Ok(ActivateDealsBatchResult { sectors })
}

/// Activate a set of deals, returning the combined deal space and extra info for verified deals.
fn activate_deals(
rt: &impl Runtime,
params: ActivateDealsParams,
) -> Result<ActivateDealsResult, ActorError> {
rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
Self::activate_deals_inner(rt, params)
}

fn activate_deals_inner(
rt: &impl Runtime,
params: ActivateDealsParams,
) -> Result<ActivateDealsResult, ActorError> {
let miner_addr = rt.message().caller();
let curr_epoch = rt.curr_epoch();

Expand Down Expand Up @@ -608,7 +633,7 @@ impl Actor {
slash_epoch: EPOCH_UNDEFINED,
verified_claim: allocation,
},
));
))
}

st.put_deal_states(rt.store(), &deal_states)?;
Expand Down Expand Up @@ -1418,5 +1443,6 @@ impl ActorCode for Actor {
GetDealProviderCollateralExported => get_deal_provider_collateral,
GetDealVerifiedExported => get_deal_verified,
GetDealActivationExported => get_deal_activation,
ActivateDealsBatchExported => activate_deals_batch,
}
}
11 changes: 11 additions & 0 deletions actors/market/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use fvm_shared::clock::ChainEpoch;
use fvm_shared::deal::DealID;
use fvm_shared::econ::TokenAmount;
use fvm_shared::piece::PaddedPieceSize;
use fvm_shared::sector::SectorNumber;
use fvm_shared::ActorID;

use crate::Label;
Expand Down Expand Up @@ -117,6 +118,16 @@ pub struct ActivateDealsResult {
pub verified_infos: Vec<VerifiedDealInfo>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct ActivateDealsBatchParams {
pub sectors: Vec<(ActivateDealsParams, SectorNumber)>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct ActivateDealsBatchResult {
pub sectors: Vec<(ActivateDealsResult, SectorNumber, ChainEpoch)>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct DealSpaces {
#[serde(with = "bigint_ser")]
Expand Down
24 changes: 24 additions & 0 deletions actors/miner/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub mod market {
pub const ACTIVATE_DEALS_METHOD: u64 = 6;
pub const ON_MINER_SECTORS_TERMINATE_METHOD: u64 = 7;
pub const COMPUTE_DATA_COMMITMENT_METHOD: u64 = 8;
pub const ACTIVATE_DEALS_BATCH_EXPORTED: u64 =
frc42_dispatch::method_hash!("ActivateDealsBatch");

#[derive(Serialize_tuple, Deserialize_tuple)]
pub struct SectorDeals {
Expand All @@ -38,6 +40,16 @@ pub mod market {
pub sector_expiry: ChainEpoch,
}

#[derive(Serialize_tuple, Deserialize_tuple)]
pub struct ActivateDealsBatchParams {
pub sectors: Vec<(ActivateDealsParams, SectorNumber)>,
}

#[derive(Serialize_tuple, Deserialize_tuple)]
pub struct ActivateDealsBatchResult {
pub sectors: Vec<(ActivateDealsResult, SectorNumber, ChainEpoch)>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Clone)]
pub struct VerifiedDealInfo {
pub client: ActorID,
Expand Down Expand Up @@ -160,6 +172,8 @@ pub mod verifreg {

pub const GET_CLAIMS_METHOD: u64 = 10;
pub const CLAIM_ALLOCATIONS_METHOD: u64 = 9;
pub const CLAIM_ALLOCATIONS_BATCH_METHOD: u64 =
frc42_dispatch::method_hash!("ClaimAllocationsBatch");

pub type ClaimID = u64;
pub type AllocationID = u64;
Expand Down Expand Up @@ -216,4 +230,14 @@ pub mod verifreg {
#[serde(with = "bigint_ser")]
pub claimed_space: BigInt,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ClaimAllocationsBatchParams {
pub claims: Vec<ClaimAllocationsParams>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ClaimAllocationsBatchReturn {
pub claims: Vec<ClaimAllocationsReturn>,
}
}
123 changes: 96 additions & 27 deletions actors/miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::ops::Neg;
use anyhow::{anyhow, Error};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use cid::Cid;
use ext::market::ActivateDealsParams;
use ext::verifreg::{ClaimAllocationsParams, SectorAllocationClaim};
use fvm_ipld_bitfield::{BitField, Validate};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::{from_slice, BytesDe, CborStore};
Expand Down Expand Up @@ -4795,43 +4797,29 @@ fn confirm_sector_proofs_valid_internal(
// Ideally, we'd combine some of these operations, but at least we have
// a constant number of them.
let activation = rt.curr_epoch();
// Pre-commits for new sectors.
let mut valid_pre_commits = Vec::default();

for pre_commit in pre_commits {
match activate_deals_and_claim_allocations(
rt,
pre_commit.clone().info.deal_ids,
pre_commit.info.expiration,
pre_commit.info.sector_number,
)? {
None => {
info!(
"failed to activate deals on sector {}, dropping from prove commit set",
pre_commit.info.sector_number,
);
continue;
}
Some(deal_spaces) => valid_pre_commits.push((pre_commit, deal_spaces)),
};
}

// When all prove commits have failed abort early
if valid_pre_commits.is_empty() {
// Pre-commits for new sectors.
// TODO: either all the pre_commits are valid or it aborts. we should handle the case where only some are valid
let deal_spaces = batch_activate_deals_and_claim_allocations(rt, pre_commits.clone())?;
if deal_spaces.is_none() {
return Err(actor_error!(illegal_argument, "all prove commits failed to validate"));
}

// assume that when we get here all pre_commits are valid
let deal_spaces = deal_spaces.unwrap();

let (total_pledge, newly_vested) = rt.transaction(|state: &mut State, rt| {
let policy = rt.policy();
let store = rt.store();
let info = get_miner_info(store, state)?;

let mut new_sector_numbers = Vec::<SectorNumber>::with_capacity(valid_pre_commits.len());
let mut new_sector_numbers = Vec::<SectorNumber>::with_capacity(pre_commits.len());
let mut deposit_to_unlock = TokenAmount::zero();
let mut new_sectors = Vec::<SectorOnChainInfo>::new();
let mut total_pledge = TokenAmount::zero();

for (pre_commit, deal_spaces) in valid_pre_commits {
// FIXME: this is hacky and wrong, the deal space needs to be calculated per sector
for (pre_commit, deal_spaces) in pre_commits.iter().zip(deal_spaces.iter()) {
// compute initial pledge
let duration = pre_commit.info.expiration - activation;

Expand All @@ -4844,8 +4832,8 @@ fn confirm_sector_proofs_valid_internal(
continue;
}

let deal_weight = deal_spaces.deal_space * duration;
let verified_deal_weight = deal_spaces.verified_deal_space * duration;
let deal_weight = deal_spaces.deal_space.clone() * duration;
let verified_deal_weight = deal_spaces.verified_deal_space.clone() * duration;

let power = qa_power_for_weight(
info.sector_size,
Expand Down Expand Up @@ -4886,7 +4874,7 @@ fn confirm_sector_proofs_valid_internal(
sector_number: pre_commit.info.sector_number,
seal_proof: pre_commit.info.seal_proof,
sealed_cid: pre_commit.info.sealed_cid,
deal_ids: pre_commit.info.deal_ids,
deal_ids: pre_commit.info.deal_ids.clone(),
expiration: pre_commit.info.expiration,
activation,
deal_weight,
Expand Down Expand Up @@ -4962,6 +4950,87 @@ fn confirm_sector_proofs_valid_internal(
Ok(())
}

fn batch_activate_deals_and_claim_allocations(
rt: &impl Runtime,
pre_commits: Vec<SectorPreCommitOnChainInfo>,
) -> Result<Option<Vec<ext::market::DealSpaces>>, ActorError> {
let deal_info = pre_commits
.iter()
.map(|pre_commit| {
(
ActivateDealsParams {
deal_ids: pre_commit.info.deal_ids.clone(),
sector_expiry: pre_commit.info.expiration,
},
pre_commit.info.sector_number,
)
})
.collect();

// check and activate storage deals. Abort if checks failed
let batch_activate_raw = extract_send_result(rt.send_simple(
&STORAGE_MARKET_ACTOR_ADDR,
ext::market::ACTIVATE_DEALS_BATCH_EXPORTED,
IpldBlock::serialize_cbor(&ext::market::ActivateDealsBatchParams { sectors: deal_info })?,
TokenAmount::zero(),
));
let batch_activate_res: ext::market::ActivateDealsBatchResult = match batch_activate_raw {
Ok(res) => deserialize_block(res)?,
Err(e) => {
info!("error batch activating deals {}", e.msg());
return Ok(None);
}
};

let sector_claims: Vec<ext::verifreg::ClaimAllocationsParams> = batch_activate_res
.sectors
.iter()
.map(|(a, b, c)| ClaimAllocationsParams {
all_or_nothing: true,
sectors: a
.verified_infos
.iter()
.map(|i| SectorAllocationClaim {
allocation_id: i.allocation_id,
client: i.client,
data: i.data,
sector: *b,
sector_expiry: *c,
size: i.size,
})
.collect(),
})
.collect();

let claim_raw = extract_send_result(rt.send_simple(
&VERIFIED_REGISTRY_ACTOR_ADDR,
ext::verifreg::CLAIM_ALLOCATIONS_BATCH_METHOD,
IpldBlock::serialize_cbor(&ext::verifreg::ClaimAllocationsBatchParams {
claims: sector_claims,
})?,
TokenAmount::zero(),
));
let claim_res: ext::verifreg::ClaimAllocationsBatchReturn = match claim_raw {
Ok(res) => deserialize_block(res)?,
Err(e) => {
info!("error claiming allocation: {}", e.msg());
return Ok(None);
}
};

let deal_spaces = claim_res
.claims
.iter()
.zip(batch_activate_res.sectors.iter())
.map(|(claim, sector)| ext::market::DealSpaces {
deal_space: sector.0.nonverified_deal_space.clone(),
verified_deal_space: claim.claimed_space.clone(),
})
.collect_vec();

Ok(Some(deal_spaces))
}

// activate deals with builtin market and claim allocations with verified registry actor
// returns an error in case of a fatal programmer error
// returns Ok(None) in case deal activation or verified allocation claim fails
Expand Down
Loading

0 comments on commit 78029ed

Please sign in to comment.