Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler forward packets #898

Merged
merged 14 commits into from
Apr 26, 2024
31 changes: 31 additions & 0 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
crate::{
banking_stage::{
consume_worker::ConsumeWorker,
forward_worker::ForwardWorker,
packet_deserializer::PacketDeserializer,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphScheduler,
Expand Down Expand Up @@ -582,6 +583,34 @@ impl BankingStage {
)
}

// Spawn the forward worker threads.
let (forward_work_sender, forward_work_receiver) = unbounded();
let (finished_forward_work_sender, finished_forwrard_work_receiver) = unbounded();
for id in 0..num_workers {
apfitzge marked this conversation as resolved.
Show resolved Hide resolved
let id = id + 2;
apfitzge marked this conversation as resolved.
Show resolved Hide resolved
let forwarder = Forwarder::new(
poh_recorder.clone(),
bank_forks.clone(),
cluster_info.clone(),
connection_cache.clone(),
data_budget.clone(),
);
let forward_worker = ForwardWorker::new(
forward_work_receiver.clone(),
ForwardOption::ForwardTransaction, // non-votes through central scheduler
forwarder,
finished_forward_work_sender.clone(),
);
bank_thread_hdls.push(
apfitzge marked this conversation as resolved.
Show resolved Hide resolved
Builder::new()
.name(format!("solFwWorker{id:02}"))
.spawn(move || {
let _ = forward_worker.run();
})
.unwrap(),
);
}

// Spawn the central scheduler thread
bank_thread_hdls.push({
let packet_deserializer =
Expand All @@ -593,6 +622,8 @@ impl BankingStage {
bank_forks,
scheduler,
worker_metrics,
forward_work_sender,
finished_forwrard_work_receiver,
);
Builder::new()
.name("solBnkTxSched".to_string())
Expand Down
8 changes: 8 additions & 0 deletions core/src/banking_stage/forward_packet_batches_by_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl ForwardBatch {
pub fn is_empty(&self) -> bool {
self.forwardable_packets.is_empty()
}

pub fn take_packets(self) -> Vec<Arc<ImmutableDeserializedPacket>> {
self.forwardable_packets
}
}

/// To avoid forward queue being saturated by transactions for single hot account,
Expand Down Expand Up @@ -130,6 +134,10 @@ impl ForwardPacketBatchesByAccounts {
pub fn iter_batches(&self) -> impl Iterator<Item = &ForwardBatch> {
self.forward_batches.iter()
}

pub fn take_iter_batches(self) -> impl Iterator<Item = ForwardBatch> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I send the packets from these ForwardBatch, created by scheduler, to workers. Instead of cloning the packets, just take ownership.

self.forward_batches.into_iter()
}
}

#[cfg(test)]
Expand Down
12 changes: 1 addition & 11 deletions core/src/banking_stage/forward_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ mod tests {
super::*,
crate::banking_stage::{
immutable_deserialized_packet::ImmutableDeserializedPacket,
scheduler_messages::TransactionId,
tests::{create_slow_genesis_config, new_test_cluster_info, simulate_poh},
},
crossbeam_channel::unbounded,
Expand Down Expand Up @@ -200,9 +199,6 @@ mod tests {
system_transaction::transfer(mint_keypair, &pubkey2, 2, genesis_config.hash()),
];

let id1 = TransactionId::new(1);
let id2 = TransactionId::new(0);

let packets = to_packet_batches(&txs, 2);
assert_eq!(packets.len(), 1);
let packets = packets[0]
Expand All @@ -211,14 +207,8 @@ mod tests {
.map(|p| ImmutableDeserializedPacket::new(p).unwrap())
.map(Arc::new)
.collect();
forward_sender
.send(ForwardWork {
packets,
ids: vec![id1, id2],
})
.unwrap();
forward_sender.send(ForwardWork { packets }).unwrap();
let forwarded = forwarded_receiver.recv().unwrap();
assert_eq!(forwarded.work.ids, vec![id1, id2]);
assert!(forwarded.successful);

drop(test_frame);
Expand Down
1 change: 0 additions & 1 deletion core/src/banking_stage/scheduler_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ pub struct ConsumeWork {
/// Message: [Scheduler -> Worker]
/// Transactions to be forwarded to the next leader(s)
pub struct ForwardWork {
pub ids: Vec<TransactionId>,
pub packets: Vec<Arc<ImmutableDeserializedPacket>>,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,18 @@ impl Batches {
mod tests {
use {
super::*,
crate::banking_stage::consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
crate::banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
immutable_deserialized_packet::ImmutableDeserializedPacket,
},
crossbeam_channel::{unbounded, Receiver},
itertools::Itertools,
solana_sdk::{
compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, pubkey::Pubkey,
signature::Keypair, signer::Signer, system_instruction, transaction::Transaction,
compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, packet::Packet,
pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction,
transaction::Transaction,
},
std::borrow::Borrow,
std::{borrow::Borrow, sync::Arc},
};

macro_rules! txid {
Expand Down Expand Up @@ -570,6 +574,12 @@ mod tests {
lamports,
compute_unit_price,
);
let packet = Arc::new(
ImmutableDeserializedPacket::new(
Packet::from_data(None, transaction.to_versioned_transaction()).unwrap(),
)
.unwrap(),
);
let transaction_ttl = SanitizedTransactionTTL {
transaction,
max_age_slot: Slot::MAX,
Expand All @@ -578,6 +588,7 @@ mod tests {
container.insert_new_transaction(
id,
transaction_ttl,
packet,
compute_unit_price,
TEST_TRANSACTION_COST,
);
Expand Down
Loading
Loading