Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply cleanups to solana-core for unified scheduler #4123

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"accounts-db/store-histogram",
"accounts-db/store-tool",
"banking-bench",
"banking-stage-ingress-types",
"banks-client",
"banks-interface",
"banks-server",
Expand Down Expand Up @@ -255,6 +256,7 @@ check-cfg = [

[workspace.dependencies]
Inflector = "0.11.4"
agave-banking-stage-ingress-types = { path = "banking-stage-ingress-types", version = "=2.2.0" }
agave-transaction-view = { path = "transaction-view", version = "=2.2.0" }
aquamarine = "0.3.3"
aes-gcm-siv = "0.11.1"
Expand Down
4 changes: 3 additions & 1 deletion banking-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
agave-banking-stage-ingress-types = { workspace = true }
assert_matches = { workspace = true }
clap = { version = "3.1.8", features = ["derive", "cargo"] }
crossbeam-channel = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
solana-client = { workspace = true }
solana-core = { workspace = true }
solana-core = { workspace = true, features = ["dev-context-only-utils"] }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }
Expand Down
33 changes: 18 additions & 15 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#![allow(clippy::arithmetic_side_effects)]
use {
agave_banking_stage_ingress_types::BankingPacketBatch,
assert_matches::assert_matches,
clap::{crate_description, crate_name, Arg, ArgEnum, Command},
crossbeam_channel::{unbounded, Receiver},
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::BankingStage,
banking_trace::{
BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
},
banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage},
banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
validator::BlockProductionMethod,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
Expand Down Expand Up @@ -349,7 +349,7 @@ fn main() {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = BankForks::new_rw_arc(bank0);
let mut bank = bank_forks.read().unwrap().working_bank();
let mut bank = bank_forks.read().unwrap().working_bank_with_scheduler();

// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker()
Expand Down Expand Up @@ -552,21 +552,24 @@ fn main() {
poh_time.stop();

let mut new_bank_time = Measure::start("new_bank");
if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
assert_matches!(result, Ok(_));
}
let new_slot = bank.slot() + 1;
let new_bank = Bank::new_from_parent(bank, &collector, new_slot);
let new_bank = Bank::new_from_parent(bank.clone(), &collector, new_slot);
new_bank_time.stop();

let mut insert_time = Measure::start("insert_time");
bank_forks.write().unwrap().insert(new_bank);
bank = bank_forks.read().unwrap().working_bank();
assert_matches!(poh_recorder.read().unwrap().bank(), None);
update_bank_forks_and_poh_recorder_for_new_tpu_bank(
&bank_forks,
&poh_recorder,
new_bank,
false,
);
bank = bank_forks.read().unwrap().working_bank_with_scheduler();
assert_matches!(poh_recorder.read().unwrap().bank(), Some(_));
insert_time.stop();

assert!(poh_recorder.read().unwrap().bank().is_none());
poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());
assert!(poh_recorder.read().unwrap().bank().is_some());
debug!(
"new_bank_time: {}us insert_time: {}us poh_time: {}us",
new_bank_time.as_us(),
Expand Down
14 changes: 14 additions & 0 deletions banking-stage-ingress-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "agave-banking-stage-ingress-types"
description = "Agave banking stage ingress types"
documentation = "https://docs.rs/agave-banking-stage-ingress-types"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are basically internal types (at least for now); I'm not sure we should publish this crate, wdyt?
i.e. add publish = false here

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe its' fine since it would make it easier for someone to write their own banking stage replacement (good)

[dependencies]
crossbeam-channel = { workspace = true }
solana-perf = { workspace = true }
4 changes: 4 additions & 0 deletions banking-stage-ingress-types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
use {crossbeam_channel::Receiver, solana_perf::packet::PacketBatch, std::sync::Arc};

pub type BankingPacketBatch = Arc<Vec<PacketBatch>>;
pub type BankingPacketReceiver = Receiver<BankingPacketBatch>;
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ edition = { workspace = true }
codecov = { repository = "solana-labs/solana", branch = "master", service = "github" }

[dependencies]
agave-banking-stage-ingress-types = { workspace = true }
ahash = { workspace = true }
anyhow = { workspace = true }
arrayvec = { workspace = true }
assert_matches = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
bs58 = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![feature(test)]

use {
agave_banking_stage_ingress_types::BankingPacketBatch,
solana_core::{banking_trace::Channels, validator::BlockProductionMethod},
solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction},
};
Expand All @@ -24,7 +25,7 @@ use {
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStage, BankingStageStats,
},
banking_trace::{BankingPacketBatch, BankingTracer},
banking_trace::BankingTracer,
},
solana_entry::entry::{next_hash, Entry},
solana_gossip::cluster_info::{ClusterInfo, Node},
Expand Down
5 changes: 3 additions & 2 deletions core/benches/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
extern crate test;

use {
agave_banking_stage_ingress_types::BankingPacketBatch,
solana_core::banking_trace::{
for_test::{
drop_and_clean_temp_dir_unless_suppressed, sample_packet_batch, terminate_tracer,
},
receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, Channels,
TraceError, TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
receiving_loop_with_minimized_sender_overhead, BankingTracer, Channels, TraceError,
TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
},
std::{
path::PathBuf,
Expand Down
25 changes: 16 additions & 9 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#![cfg(feature = "dev-context-only-utils")]
use {
crate::{
banking_stage::{BankingStage, LikeClusterInfo},
banking_stage::{
update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, LikeClusterInfo,
},
banking_trace::{
BankingPacketBatch, BankingTracer, ChannelLabel, Channels, TimedTracedEvent,
TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
BASENAME,
BankingTracer, ChannelLabel, Channels, TimedTracedEvent, TracedEvent, TracedSender,
TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME,
},
validator::BlockProductionMethod,
},
agave_banking_stage_ingress_types::BankingPacketBatch,
assert_matches::assert_matches,
bincode::deserialize_from,
crossbeam_channel::{unbounded, Sender},
itertools::Itertools,
Expand Down Expand Up @@ -450,6 +453,9 @@ impl SimulatorLoop {
info!("Bank::new_from_parent()!");

logger.log_jitter(&bank);
if let Some((result, _execute_timings)) = bank.wait_for_completed_scheduler() {
assert_matches!(result, Ok(()));
}
bank.freeze();
let new_slot = if bank.slot() == self.parent_slot {
info!("initial leader block!");
Expand Down Expand Up @@ -484,16 +490,17 @@ impl SimulatorLoop {
logger.log_frozen_bank_cost(&bank);
}
self.retransmit_slots_sender.send(bank.slot()).unwrap();
self.bank_forks.write().unwrap().insert(new_bank);
update_bank_forks_and_poh_recorder_for_new_tpu_bank(
&self.bank_forks,
&self.poh_recorder,
new_bank,
false,
);
bank = self
.bank_forks
.read()
.unwrap()
.working_bank_with_scheduler();
self.poh_recorder
.write()
.unwrap()
.set_bank(bank.clone_with_scheduler(), false);
} else {
logger.log_ongoing_bank_cost(&bank);
}
Expand Down
23 changes: 20 additions & 3 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.

#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
self::{
committer::Committer,
Expand All @@ -23,9 +25,9 @@ use {
scheduler_controller::SchedulerController, scheduler_error::SchedulerError,
},
},
banking_trace::BankingPacketReceiver,
validator::BlockProductionMethod,
},
agave_banking_stage_ingress_types::BankingPacketReceiver,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
histogram::Histogram,
solana_client::connection_cache::ConnectionCache,
Expand All @@ -35,7 +37,7 @@ use {
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
Expand Down Expand Up @@ -716,11 +718,26 @@ impl BankingStage {
}
}

#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank(
Copy link
Member Author

@ryoqun ryoqun Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bank_forks: &RwLock<BankForks>,
poh_recorder: &RwLock<PohRecorder>,
tpu_bank: Bank,
track_transaction_indexes: bool,
) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not returning BankWithScheduler is intentional because production code (i.e. relevant ReplayingStage) doesn't need it, while making dev call-sites more verbose a bit.

let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
poh_recorder
.write()
.unwrap()
.set_bank(tpu_bank, track_transaction_indexes);
}

#[cfg(test)]
mod tests {
use {
super::*,
crate::banking_trace::{BankingPacketBatch, BankingTracer, Channels},
crate::banking_trace::{BankingTracer, Channels},
agave_banking_stage_ingress_types::BankingPacketBatch,
crossbeam_channel::{unbounded, Receiver},
itertools::Itertools,
solana_entry::entry::{self, Entry, EntrySlice},
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
packet_filter::PacketFilterFailure,
},
crate::banking_trace::{BankingPacketBatch, BankingPacketReceiver},
agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
crossbeam_channel::RecvTimeoutError,
solana_perf::packet::PacketBatch,
solana_sdk::saturating_add_assign,
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
unprocessed_transaction_storage::UnprocessedTransactionStorage,
BankingStageStats,
},
crate::banking_trace::BankingPacketReceiver,
agave_banking_stage_ingress_types::BankingPacketReceiver,
crossbeam_channel::RecvTimeoutError,
solana_measure::{measure::Measure, measure_us},
solana_sdk::{saturating_add_assign, timing::timestamp},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,19 +435,17 @@ impl<C: LikeClusterInfo, R: ReceiveAndBuffer> SchedulerController<C, R> {
mod tests {
use {
super::*,
crate::{
banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
packet_deserializer::PacketDeserializer,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
},
crate::banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
packet_deserializer::PacketDeserializer,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
},
banking_trace::BankingPacketBatch,
},
agave_banking_stage_ingress_types::BankingPacketBatch,
crossbeam_channel::{unbounded, Receiver, Sender},
itertools::Itertools,
solana_gossip::cluster_info::ClusterInfo,
Expand Down
4 changes: 1 addition & 3 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use {
agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
bincode::serialize_into,
chrono::{DateTime, Local},
crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError},
rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender},
solana_perf::packet::PacketBatch,
solana_sdk::{hash::Hash, slot_history::Slot},
std::{
fs::{create_dir_all, remove_dir_all},
Expand All @@ -19,9 +19,7 @@ use {
thiserror::Error,
};

pub type BankingPacketBatch = Arc<Vec<PacketBatch>>;
pub type BankingPacketSender = TracedSender;
pub type BankingPacketReceiver = Receiver<BankingPacketBatch>;
pub type TracerThreadResult = Result<(), TraceError>;
pub type TracerThread = Option<JoinHandle<TracerThreadResult>>;
pub type DirByteLimit = u64;
Expand Down
3 changes: 2 additions & 1 deletion core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use {
crate::{
banking_trace::{BankingPacketBatch, BankingPacketSender},
banking_trace::BankingPacketSender,
consensus::vote_stake_tracker::VoteStakeTracker,
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
replay_stage::DUPLICATE_THRESHOLD,
result::{Error, Result},
sigverify,
},
agave_banking_stage_ingress_types::BankingPacketBatch,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Select, Sender},
log::*,
solana_gossip::{
Expand Down
Loading
Loading