Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch deal activations #1310

Merged
merged 23 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
95f55e2
wip: batch onboarding deals test works
alexytsu Jun 8, 2023
d9032fe
fix activate deals failures tests
alexytsu Jun 8, 2023
2d1e125
verified deal activation test
alexytsu Jun 8, 2023
c859afe
fix market tests
alexytsu Jun 8, 2023
12a4ae8
refactor to a map based pattern to ensure parallel return structure
alexytsu Jun 8, 2023
3d66878
fix deal failure test expectations
alexytsu Jun 8, 2023
97f07d0
adjust market tests for new failure expectations
alexytsu Jun 8, 2023
ad36fb8
cron epoch test
alexytsu Jun 10, 2023
b063030
fix the tests
alexytsu Jun 12, 2023
4a16333
remain ActivateDealsResult to DealActivation
alexytsu Jun 15, 2023
724205f
commit deal states into state once for all sectors
alexytsu Jun 15, 2023
a1195c6
use sectordeals for batchactivate
alexytsu Jun 15, 2023
31c5e33
cleanup logic for marketactor::BatchActivateDealsResult shortcut
alexytsu Jun 15, 2023
2fbfd11
refactor Market::BatchActivateDeals to use BatchReturn
alexytsu Jun 15, 2023
0796d28
revert verifreg to use BatchReturn
alexytsu Jun 15, 2023
4c305bf
better error context when deal activation fails
alexytsu Jun 15, 2023
de178e9
remove shortcut path, market actor already handles empty sectors
alexytsu Jun 15, 2023
c5b5fc2
don't activate sectors with duplicate deals
alexytsu Jun 16, 2023
519a8a9
use a batch activation helper
alexytsu Jun 19, 2023
efa14cb
de duplicate harness deal activation paths
alexytsu Jun 19, 2023
65e185d
drop Copy requirement on BatchGen::success
alexytsu Jun 19, 2023
c2a5cdc
simple tests for batch_activate_deals
alexytsu Jun 20, 2023
ddc6934
fix tests
alexytsu Jun 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 117 additions & 66 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use std::cmp::min;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};

use cid::multihash::{Code, MultihashDigest, MultihashGeneric};
use cid::Cid;
use fil_actors_runtime::{extract_send_result, FIRST_ACTOR_SPECIFIC_EXIT_CODE};
use fil_actors_runtime::{extract_send_result, BatchReturnGen, FIRST_ACTOR_SPECIFIC_EXIT_CODE};
use frc46_token::token::types::{BalanceReturn, TransferFromParams, TransferFromReturn};
use fvm_ipld_bitfield::BitField;
use fvm_ipld_blockstore::Blockstore;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub enum Method {
WithdrawBalance = 3,
PublishStorageDeals = 4,
VerifyDealsForActivation = 5,
ActivateDeals = 6,
BatchActivateDeals = 6,
OnMinerSectorsTerminate = 7,
ComputeDataCommitment = 8,
CronTick = 9,
Expand Down Expand Up @@ -530,93 +530,145 @@ impl Actor {
Ok(VerifyDealsForActivationReturn { sectors: sectors_data })
}

/// Activate a set of deals, returning the combined deal space and extra info for verified deals.
fn activate_deals(
/// Activate a set of deals, returning the deal space and extra info for sectors containing
/// verified deals. Sectors are activated in parameter-defined order and can fail independently of
/// each other with the responsible ExitCode recorded in a BatchReturn.
fn batch_activate_deals(
rt: &impl Runtime,
params: ActivateDealsParams,
) -> Result<ActivateDealsResult, ActorError> {
params: BatchActivateDealsParams,
) -> Result<BatchActivateDealsResult, ActorError> {
rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
let miner_addr = rt.message().caller();
let curr_epoch = rt.curr_epoch();

let (deal_spaces, verified_infos) = rt.transaction(|st: &mut State, rt| {
let proposal_array = st.get_proposal_array(rt.store())?;
let proposals = get_proposals(&proposal_array, &params.deal_ids, st.next_id)?;
let (activations, batch_ret) = rt.transaction(|st: &mut State, rt| {
let mut deal_states: Vec<(DealID, DealState)> = vec![];
let mut batch_gen = BatchReturnGen::new(params.sectors.len());
let mut activations: Vec<DealActivation> = vec![];
let mut activated_deals: HashSet<DealID> = HashSet::new();

for p in params.sectors {
let proposal_array = st.get_proposal_array(rt.store())?;

if p.deal_ids.iter().any(|id| activated_deals.contains(id)) {
log::warn!(
"failed to activate sector containing duplicate deals {:?}",
p.deal_ids
);
batch_gen.add_fail(ExitCode::USR_ILLEGAL_ARGUMENT);
continue;
}

let deal_spaces = {
validate_and_return_deal_space(
let proposals = match get_proposals(&proposal_array, &p.deal_ids, st.next_id) {
Ok(proposals) => proposals,
Err(e) => {
log::warn!("failed to get proposals for deals {:?}: {:?}", p.deal_ids, e);
batch_gen.add_fail(e.exit_code());
continue;
}
};

let deal_spaces = match validate_and_return_deal_space(
&proposals,
&miner_addr,
params.sector_expiry,
p.sector_expiry,
curr_epoch,
None,
)
.context("failed to validate deal proposals for activation")?
};
) {
Ok(ds) => ds,
Err(e) => {
log::warn!("failed validate deals {:?}: {}", p.deal_ids, e);
batch_gen.add_fail(e.exit_code());
continue;
}
};

// Update deal states
let mut verified_infos = Vec::new();
let mut deal_states: Vec<(DealID, DealState)> = vec![];
// Update deal states
let mut verified_infos = Vec::new();

for (deal_id, proposal) in proposals {
// This construction could be replaced with a single "update deal state"
// state method, possibly batched over all deal ids at once.
let s = st.find_deal_state(rt.store(), deal_id)?;
let update_result: Result<(), ActorError> =
proposals.into_iter().try_for_each(|(deal_id, proposal)| {
let s = st
.find_deal_state(rt.store(), deal_id)
.context(format!("error looking up deal state for {}", deal_id))?;

if s.is_some() {
return Err(actor_error!(
illegal_argument,
"deal {} already activated",
deal_id
));
}
if s.is_some() {
return Err(actor_error!(
illegal_argument,
"deal {} already activated",
deal_id
));
}

let propc = rt_deal_cid(rt, &proposal)?;
let propc = rt_deal_cid(rt, &proposal)?;

// Confirm the deal is in the pending proposals queue.
// It will be removed from this queue later, during cron.
let has = st.has_pending_deal(rt.store(), propc)?;
// Confirm the deal is in the pending proposals queue.
// It will be removed from this queue later, during cron.
let has = st.has_pending_deal(rt.store(), propc)?;

if !has {
return Err(actor_error!(
illegal_state,
"tried to activate deal that was not in the pending set ({})",
propc
));
}
if !has {
return Err(actor_error!(
illegal_state,
"tried to activate deal that was not in the pending set ({})",
propc
));
}

// Extract and remove any verified allocation ID for the pending deal.
let allocation = st
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))?
.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID))
.1;

if allocation != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: allocation,
data: proposal.piece_cid,
size: proposal.piece_size,
})
}
// Extract and remove any verified allocation ID for the pending deal.
let allocation = st
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))
.context(format!(
"failed to remove pending deal allocation id {}",
deal_id
))?
.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID))
.1;

if allocation != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: allocation,
data: proposal.piece_cid,
size: proposal.piece_size,
})
}

deal_states.push((
deal_id,
DealState {
sector_start_epoch: curr_epoch,
last_updated_epoch: EPOCH_UNDEFINED,
slash_epoch: EPOCH_UNDEFINED,
verified_claim: allocation,
},
));
deal_states.push((
deal_id,
DealState {
sector_start_epoch: curr_epoch,
last_updated_epoch: EPOCH_UNDEFINED,
slash_epoch: EPOCH_UNDEFINED,
verified_claim: allocation,
},
));
activated_deals.insert(deal_id);
Ok(())
});

match update_result {
Ok(_) => {
activations.push(DealActivation {
nonverified_deal_space: deal_spaces.deal_space,
verified_infos,
});
batch_gen.add_success();
}
Err(e) => {
log::warn!("failed to activate deals {:?}: {}", p.deal_ids, e);
batch_gen.add_fail(e.exit_code());
}
}
}

st.put_deal_states(rt.store(), &deal_states)?;

Ok((deal_spaces, verified_infos))
Ok((activations, batch_gen.gen()))
})?;

Ok(ActivateDealsResult { nonverified_deal_space: deal_spaces.deal_space, verified_infos })
Ok(BatchActivateDealsResult { activations, activation_results: batch_ret })
}

/// Terminate a set of deals in response to their containing sector being terminated.
Expand All @@ -634,7 +686,6 @@ impl Actor {

for id in params.deal_ids {
let deal = st.find_proposal(rt.store(), id)?;

// The deal may have expired and been deleted before the sector is terminated.
// Nothing to do, but continue execution for the other deals.
if deal.is_none() {
Expand Down Expand Up @@ -1403,7 +1454,7 @@ impl ActorCode for Actor {
WithdrawBalance|WithdrawBalanceExported => withdraw_balance,
PublishStorageDeals|PublishStorageDealsExported => publish_storage_deals,
VerifyDealsForActivation => verify_deals_for_activation,
ActivateDeals => activate_deals,
BatchActivateDeals => batch_activate_deals,
OnMinerSectorsTerminate => on_miner_sectors_terminate,
ComputeDataCommitment => compute_data_commitment,
CronTick => cron_tick,
Expand Down
15 changes: 11 additions & 4 deletions actors/market/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use super::ext::verifreg::AllocationID;
use cid::Cid;
use fil_actors_runtime::Array;
use fil_actors_runtime::BatchReturn;
use fvm_ipld_bitfield::BitField;
use fvm_ipld_encoding::strict_bytes;
use fvm_ipld_encoding::tuple::*;
Expand Down Expand Up @@ -97,9 +98,9 @@ pub struct SectorDealData {
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct ActivateDealsParams {
pub deal_ids: Vec<DealID>,
pub sector_expiry: ChainEpoch,
#[serde(transparent)]
pub struct BatchActivateDealsParams {
pub sectors: Vec<SectorDeals>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sector deals introduces sector_type: RegisteredSealProof which is unecessary in this PR but will be used in #1312 to optimise ProveReplicaUpdates

}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
Expand All @@ -111,12 +112,18 @@ pub struct VerifiedDealInfo {
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct ActivateDealsResult {
pub struct DealActivation {
#[serde(with = "bigint_ser")]
pub nonverified_deal_space: BigInt,
pub verified_infos: Vec<VerifiedDealInfo>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct BatchActivateDealsResult {
pub activation_results: BatchReturn,
pub activations: Vec<DealActivation>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct DealSpaces {
#[serde(with = "bigint_ser")]
Expand Down
Loading