Skip to content

Commit

Permalink
[FEATURE]: use JoinSet to run transaction checks concurrently (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
haroune-mohammedi authored Dec 6, 2023
1 parent f7228f3 commit f81a0b6
Showing 1 changed file with 36 additions and 34 deletions.
70 changes: 36 additions & 34 deletions src/actions/shoot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use crate::utils::{
use color_eyre::eyre::Context;
use color_eyre::{eyre::eyre, Report as EyreReport, Result};

use log::{debug, error, info, warn};
use log::{debug, info, warn};
use starknet::core::types::contract::SierraClass;

use futures::stream::StreamExt;
use std::collections::HashMap;
use std::path::Path;
use tokio::task::JoinSet;

use crate::metrics::BenchmarkReport;

Expand Down Expand Up @@ -41,7 +41,7 @@ use url::Url;
// Used to bypass validation
pub static MAX_FEE: FieldElement = felt!("0xffffffff");
pub static CHECK_INTERVAL: Duration = Duration::from_millis(500);
pub static TX_VALIDATION_CHUNKS: usize = 10;
pub static TX_VALIDATION_CONCURRENCY: usize = 10;

type StarknetAccount = SingleOwnerAccount<Arc<JsonRpcClient<HttpTransport>>, LocalWallet>;

Expand Down Expand Up @@ -218,40 +218,42 @@ impl GatlingShooter {
let mut accepted_txs = Vec::new();
let mut errors = Vec::new();

// Verify transactions in parallel
let chunk_size = transactions.len() / TX_VALIDATION_CHUNKS;
let transactions_sets = transactions.chunks(chunk_size).map(|chunk| chunk.to_vec());

let fetches = futures::stream::iter(transactions_sets.map(|transactions_set| {
// Should we clone the rpc client or is it ok to share it between threads ?
let starknet_rpc = self.starknet_rpc.clone();
tokio::spawn(async move {
let mut results = Vec::new();
for tx in transactions_set {
let res = wait_for_tx(&starknet_rpc, tx, CHECK_INTERVAL)
let mut set = JoinSet::new();

let mut transactions = transactions.into_iter();

for _ in 0..TX_VALIDATION_CONCURRENCY {
if let Some(transaction) = transactions.next() {
let starknet_rpc = Arc::clone(&self.starknet_rpc);
set.spawn(async move {
wait_for_tx(&starknet_rpc, transaction, CHECK_INTERVAL)
.await
.map(|_| transaction)
.map_err(|err| (err, transaction))
});
}
}

while let Some(res) = set.join_next().await {
if let Some(transaction) = transactions.next() {
let starknet_rpc = Arc::clone(&self.starknet_rpc);
set.spawn(async move {
wait_for_tx(&starknet_rpc, transaction, CHECK_INTERVAL)
.await
.map(|_| tx);
debug!("Transaction {:#064x} result: {:?}", tx, res);
results.push(res);
.map(|_| transaction)
.map_err(|err| (err, transaction))
});
}

match res.unwrap() {
Ok(transaction) => {
accepted_txs.push(transaction);
debug!("Transaction {:#064x} accepted", transaction)
}
results
})
}))
.buffer_unordered(TX_VALIDATION_CHUNKS) // Adjust the concurrency level to the number of connections
.collect::<Vec<_>>();

// fetches.await will resolve to a Vec<Result<Vec<Result<Transaction, Error>>, JoinError>>
for fetch_result in fetches.await {
match fetch_result {
Ok(results) => {
for res in results {
match res {
Ok(tx) => accepted_txs.push(tx),
Err(e) => errors.push(e),
}
}
Err((err, transaction)) => {
errors.push(err);
debug!("Transaction {:#064x} rejected", transaction)
}
Err(e) => error!("JoinError {:?}", e),
}
}

Expand Down

0 comments on commit f81a0b6

Please sign in to comment.