Skip to content

Commit

Permalink
Merge pull request #2454 from subspace/construct-bundle
Browse files Browse the repository at this point in the history
Try to include more tx in the bundle and skip including tx that already included in previous bundle
  • Loading branch information
NingLin-P authored Feb 1, 2024
2 parents 0d97b2c + 71935c7 commit e8ed377
Show file tree
Hide file tree
Showing 9 changed files with 500 additions and 346 deletions.
30 changes: 12 additions & 18 deletions crates/subspace-malicious-operator/src/malicious_bundle_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ impl MaliciousOperatorStatus {
*self = MaliciousOperatorStatus::NoStatus
}

fn registered_operator(&self) -> Option<(&OperatorId, &OperatorPublicKey)> {
fn registered_operator(&self) -> Option<(OperatorId, OperatorPublicKey)> {
match self {
MaliciousOperatorStatus::Registered {
operator_id,
signing_key,
} => Some((operator_id, signing_key)),
} => Some((*operator_id, signing_key.clone())),
_ => None,
}
}
Expand Down Expand Up @@ -106,7 +106,10 @@ where
CClient::Api: DomainsApi<CBlock, <DomainBlock as BlockT>::Header>
+ BundleProducerElectionApi<CBlock, Balance>
+ AccountNonceApi<CBlock, AccountId, Nonce>,
TransactionPool: sc_transaction_pool_api::TransactionPool<Block = DomainBlock> + 'static,
TransactionPool: sc_transaction_pool_api::TransactionPool<
Block = DomainBlock,
Hash = <DomainBlock as BlockT>::Hash,
> + 'static,
{
pub fn new(
domain_id: DomainId,
Expand Down Expand Up @@ -161,24 +164,15 @@ where
}

async fn handle_new_slot(
&self,
&mut self,
operator_id: OperatorId,
new_slot_info: OperatorSlotInfo,
) -> Option<OpaqueBundleFor<DomainBlock, CBlock>> {
let slot = new_slot_info.slot;
let consensus_block_info = {
let info = self.consensus_client.info();
sp_blockchain::HashAndNumber {
number: info.best_number,
hash: info.best_hash,
}
};
self.bundle_producer
.clone()
.produce_bundle(operator_id, consensus_block_info.clone(), new_slot_info)
.produce_bundle(operator_id, new_slot_info)
.unwrap_or_else(move |error| {
tracing::error!(
?consensus_block_info,
?slot,
?operator_id,
?error,
Expand All @@ -200,7 +194,7 @@ where
{
let maybe_opaque_bundle = self
.handle_new_slot(
*operator_id,
operator_id,
OperatorSlotInfo {
slot,
global_randomness,
Expand All @@ -211,7 +205,7 @@ where
if let Some(mut opaque_bundle) = maybe_opaque_bundle {
if let Err(err) = self
.malicious_bundle_tamper
.maybe_tamper_bundle(&mut opaque_bundle, signing_key)
.maybe_tamper_bundle(&mut opaque_bundle, &signing_key)
{
tracing::error!(?err, "Got error when try to tamper bundle");
}
Expand Down Expand Up @@ -245,7 +239,7 @@ where
if let Some((malicious_operator_id, _)) =
self.malicious_operator_status.registered_operator()
{
if next_operators.contains(malicious_operator_id) {
if next_operators.contains(&malicious_operator_id) {
return Ok(());
} else {
tracing::info!(
Expand All @@ -255,7 +249,7 @@ where
// Remove the current malicious operator to not account its stake toward
// `current_total_stake` otherwise the next malicious operator will stake
// more and more fund
current_operators.remove(malicious_operator_id);
current_operators.remove(&malicious_operator_id);
self.malicious_operator_status.no_status();
}
}
Expand Down
12 changes: 6 additions & 6 deletions domains/client/domain-operator/src/domain_bundle_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use domain_runtime_primitives::DomainCoreApi;
use sc_client_api::{AuxStore, BlockBackend};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder;
use sp_blockchain::{HashAndNumber, HeaderBackend};
use sp_blockchain::HeaderBackend;
use sp_domains::{
Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey,
OperatorSignature, SealedBundleHeader,
Expand Down Expand Up @@ -74,7 +74,8 @@ where
Client::Api: BlockBuilder<Block> + DomainCoreApi<Block> + TaggedTransactionQueue<Block>,
CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
CClient::Api: DomainsApi<CBlock, Block::Header> + BundleProducerElectionApi<CBlock, Balance>,
TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block>,
TransactionPool:
sc_transaction_pool_api::TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -109,9 +110,8 @@ where
}

pub async fn produce_bundle(
self,
&mut self,
operator_id: OperatorId,
consensus_block_info: HashAndNumber<CBlock>,
slot_info: OperatorSlotInfo,
) -> sp_blockchain::Result<Option<OpaqueBundle<Block, CBlock>>> {
let OperatorSlotInfo {
Expand Down Expand Up @@ -145,7 +145,7 @@ where
if let Some((proof_of_election, operator_signing_key)) =
self.bundle_producer_election_solver.solve_challenge(
slot,
consensus_block_info.hash,
consensus_chain_best_hash,
self.domain_id,
operator_id,
global_randomness,
Expand All @@ -156,7 +156,7 @@ where
let tx_range = self
.consensus_client
.runtime_api()
.domain_tx_range(consensus_block_info.hash, self.domain_id)
.domain_tx_range(consensus_chain_best_hash, self.domain_id)
.map_err(|error| {
sp_blockchain::Error::Application(Box::from(format!(
"Error getting tx range: {error}"
Expand Down
94 changes: 85 additions & 9 deletions domains/client/domain-operator/src/domain_bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,63 @@ use sp_domains::{
BundleHeader, DomainId, DomainsApi, ExecutionReceipt, HeaderHashingFor, ProofOfElection,
};
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor, One, Zero};
use sp_runtime::Percent;
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use sp_weights::Weight;
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time;
use subspace_core_primitives::U256;
use subspace_runtime_primitives::Balance;

pub struct DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool> {
/// If the bundle utilization is below `BUNDLE_UTILIZATION_THRESHOLD` we will attempt to push
/// at most `MAX_SKIPPED_TRANSACTIONS` number of transactions before quitting for real.
const MAX_SKIPPED_TRANSACTIONS: usize = 8;

const BUNDLE_UTILIZATION_THRESHOLD: Percent = Percent::from_percent(95);

// `PreviousBundledTx` used to keep track of tx that have included in previous bundle and avoid
// to re-include the these tx in the following bundle to reduce deplicated tx.
struct PreviousBundledTx<Block: BlockT, CBlock: BlockT> {
bundled_at: <CBlock as BlockT>::Hash,
tx_hashes: HashSet<<Block as BlockT>::Hash>,
}

impl<Block: BlockT, CBlock: BlockT> PreviousBundledTx<Block, CBlock> {
fn new() -> Self {
PreviousBundledTx {
bundled_at: Default::default(),
tx_hashes: HashSet::new(),
}
}

fn already_bundled(&self, tx_hash: &<Block as BlockT>::Hash) -> bool {
self.tx_hashes.contains(tx_hash)
}

fn maybe_clear(&mut self, consensus_hash: <CBlock as BlockT>::Hash) {
if self.bundled_at != consensus_hash {
self.bundled_at = consensus_hash;
self.tx_hashes.clear();
}
}

fn add_bundled(&mut self, tx_hash: <Block as BlockT>::Hash) {
self.tx_hashes.insert(tx_hash);
}
}

pub struct DomainBundleProposer<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> {
domain_id: DomainId,
client: Arc<Client>,
consensus_client: Arc<CClient>,
transaction_pool: Arc<TransactionPool>,
previous_bundled_tx: PreviousBundledTx<Block, CBlock>,
_phantom_data: PhantomData<(Block, CBlock)>,
}

impl<Block, Client, CBlock, CClient, TransactionPool> Clone
impl<Block: BlockT, Client, CBlock: BlockT, CClient, TransactionPool> Clone
for DomainBundleProposer<Block, Client, CBlock, CClient, TransactionPool>
{
fn clone(&self) -> Self {
Expand All @@ -36,6 +76,7 @@ impl<Block, Client, CBlock, CClient, TransactionPool> Clone
client: self.client.clone(),
consensus_client: self.consensus_client.clone(),
transaction_pool: self.transaction_pool.clone(),
previous_bundled_tx: PreviousBundledTx::new(),
_phantom_data: self._phantom_data,
}
}
Expand All @@ -56,7 +97,8 @@ where
Client::Api: BlockBuilder<Block> + DomainCoreApi<Block> + TaggedTransactionQueue<Block>,
CClient: HeaderBackend<CBlock> + ProvideRuntimeApi<CBlock>,
CClient::Api: DomainsApi<CBlock, Block::Header>,
TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block>,
TransactionPool:
sc_transaction_pool_api::TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
pub fn new(
domain_id: DomainId,
Expand All @@ -69,12 +111,13 @@ where
client,
consensus_client,
transaction_pool,
previous_bundled_tx: PreviousBundledTx::new(),
_phantom_data: PhantomData,
}
}

pub(crate) async fn propose_bundle_at(
&self,
&mut self,
proof_of_election: ProofOfElection<CBlock::Hash>,
tx_range: U256,
) -> sp_blockchain::Result<ProposeBundleOutput<Block, CBlock>> {
Expand All @@ -96,6 +139,12 @@ where
}
};

// Clear the previous bundled tx info whenever the consensus chain tip is changed,
// this allow the operator to retry for the previous bundled tx in case the previous
// bundle fail to submit to the consensus chain due to any reason.
self.previous_bundled_tx
.maybe_clear(self.consensus_client.info().best_hash);

let bundle_vrf_hash = U256::from_be_bytes(proof_of_election.vrf_hash());
let domain_block_limit = self
.consensus_client
Expand All @@ -109,6 +158,7 @@ where
let mut extrinsics = Vec::new();
let mut estimated_bundle_weight = Weight::default();
let mut bundle_size = 0u32;
let mut skipped = 0;

// Seperate code block to make sure that runtime api instance is dropped after validation is done.
{
Expand All @@ -132,6 +182,14 @@ where
continue;
}

// Skip the tx if is is already bundled by a recent bundle
if self
.previous_bundled_tx
.already_bundled(&self.transaction_pool.hash_of(pending_tx_data))
{
continue;
}

let tx_weight = runtime_api_instance
.extrinsic_weight(parent_hash, pending_tx_data)
.map_err(|error| {
Expand All @@ -142,17 +200,30 @@ where
let next_estimated_bundle_weight =
estimated_bundle_weight.saturating_add(tx_weight);
if next_estimated_bundle_weight.any_gt(domain_block_limit.max_block_weight) {
break;
if skipped < MAX_SKIPPED_TRANSACTIONS
&& Percent::from_rational(
estimated_bundle_weight.ref_time(),
domain_block_limit.max_block_weight.ref_time(),
) < BUNDLE_UTILIZATION_THRESHOLD
{
skipped += 1;
} else {
break;
}
}

let next_bundle_size = bundle_size + pending_tx_data.encoded_size() as u32;
if next_bundle_size > domain_block_limit.max_block_size {
break;
if skipped < MAX_SKIPPED_TRANSACTIONS
&& Percent::from_rational(bundle_size, domain_block_limit.max_block_size)
< BUNDLE_UTILIZATION_THRESHOLD
{
skipped += 1;
} else {
break;
}
}

estimated_bundle_weight = next_estimated_bundle_weight;
bundle_size = next_bundle_size;

// Double check the transaction validity, because the tx pool are re-validate the transaction
// in pool asynchronously so there is race condition that the operator imported a domain block
// and start producing bundle immediately before the re-validation based on the latest block
Expand Down Expand Up @@ -181,7 +252,12 @@ where
continue;
}

estimated_bundle_weight = next_estimated_bundle_weight;
bundle_size = next_bundle_size;
extrinsics.push(pending_tx_data.clone());

self.previous_bundled_tx
.add_bundled(self.transaction_pool.hash_of(pending_tx_data));
}
}

Expand Down
Loading

0 comments on commit e8ed377

Please sign in to comment.