Skip to content

Stop spawn_clean_orderpool_job on shutdown #349

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 42 additions & 26 deletions crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,33 +334,49 @@ where
let handle = tokio::spawn(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually this jobs need to be spawned as blocking and not as async tasks and its time consuming and blocks executor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its probably fine here as this job happens between slots.

info!("Clean orderpool job: started");

while let Some(header) = header_receiver.recv().await {
let block_number = header.number;
set_current_block(block_number);
let state = match provider_factory.latest() {
Ok(state) => state,
Err(err) => {
error!("Failed to get latest state: {}", err);
// @Metric error count
continue;
loop {
tokio::select! {
header = header_receiver.recv() => {
if let Some(header) = header {
let block_number = header.number;
set_current_block(block_number);
let state = match provider_factory.latest() {
Ok(state) => state,
Err(err) => {
error!("Failed to get latest state: {}", err);
// @Metric error count
continue;
}
};

let mut orderpool = orderpool.lock();
let start = Instant::now();

orderpool.head_updated(block_number, &state);

let update_time = start.elapsed();
let (tx_count, bundle_count) = orderpool.content_count();
set_ordepool_count(tx_count, bundle_count);
debug!(
block_number,
tx_count,
bundle_count,
update_time_ms = update_time.as_millis(),
"Cleaned orderpool",
);
} else {
info!("Clean orderpool job: channel ended");
if !global_cancellation.is_cancelled(){
error!("Clean orderpool job: channel ended with no cancellation");
}
break;
}
},
_ = global_cancellation.cancelled() => {
info!("Clean orderpool job: received cancellation signal");
break;
}
};

let mut orderpool = orderpool.lock();
let start = Instant::now();

orderpool.head_updated(block_number, &state);

let update_time = start.elapsed();
let (tx_count, bundle_count) = orderpool.content_count();
set_ordepool_count(tx_count, bundle_count);
debug!(
block_number,
tx_count,
bundle_count,
update_time_ms = update_time.as_millis(),
"Cleaned orderpool",
);
}
}

global_cancellation.cancel();
Expand Down
2 changes: 2 additions & 0 deletions scripts/ci/benchmark-in-ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ function run_benchmark() {

# Switch back to current commit and run benchmarks again
echo "Switching back to $HEAD_SHA_SHORT and running benchmarks ..."
# Reset to ensure any changes (e.g. to Cargo.lock) are discarded before attempting to checkout.
git reset --hard
git checkout $HEAD_SHA
cargo bench --workspace
fi
Expand Down
Loading