
[Core] Enable concurrent connections between nodes #2664

Merged

merged 2 commits into main on Feb 8, 2025
Conversation

AhmedSoliman
Contributor

@AhmedSoliman AhmedSoliman commented Feb 7, 2025

This enables nodes to maintain multiple concurrent connections to a peer, each over a separate TCP connection, to increase message-processing concurrency. This is controlled by a new configuration option in the `[networking]` section.

This also wraps a few operations in `tokio::task::unconstrained` to reduce unnecessary coop-driven yields on some hot paths.

```
// intentionally empty
```

Stack created with Sapling. Best reviewed with ReviewStack.
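As a hedged sketch of how the new option might appear in a node's configuration file: the option name and value below are assumptions for illustration (the PR text does not spell them out), so check the released configuration reference for the actual name.

```toml
# Illustrative only: the option name below is a guess, not confirmed by this PR.
[networking]
# Hypothetical: number of concurrent connections to maintain per peer.
num-concurrent-connections = 3
```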


github-actions bot commented Feb 7, 2025

Test Results

7 files ±0, 7 suites ±0; runtime 2m 55s ⏱️ (−1m 41s)
47 tests ±0: 46 ✅ ±0, 1 💤 ±0, 0 ❌ ±0
182 runs ±0: 179 ✅ ±0, 3 💤 ±0, 0 ❌ ±0

Results for commit 96b61a0. ± Comparison against base commit 765636b.

♻️ This comment has been updated with latest results.

Contributor

@tillrohrmann tillrohrmann left a comment


LGTM. +1 for merging :-)

Comment on lines +219 to +221
```rust
// If we are already at the metadata version, avoid tokio's yielding to
// improve tail latencies when this is used in latency-sensitive operations.
let v = tokio::task::unconstrained(recv.wait_for(|v| *v >= min_version))
```
Contributor


TIL

```diff
@@ -669,8 +687,8 @@ where
     global::get_text_map_propagator(|propagator| propagator.extract(span_ctx))
 });

-if let Err(e) = router
-    .call(
+if let Err(e) = tokio::task::unconstrained(
```
Contributor


Maybe add a comment why this task should be unconstrained for my future self.

```diff
@@ -421,7 +421,7 @@ impl<M: Targeted + WireEncode> Outgoing<M, HasConnection> {
     let connection = bail_on_error!(self, self.try_upgrade());
     let permit = bail_on_none!(
         self,
-        connection.reserve().await,
+        tokio::task::unconstrained(connection.reserve()).await,
```
Contributor


Did you observe an effect of cooperative scheduling w/o unconstrained?

Contributor Author


Yes, although it of course varies depending on the workload. I was able to observe a 30% improvement in tail latencies of `GetSequencerState` processing when the system is under heavy load.

Comment on lines +83 to +85
```diff
+    #[strum(props(runtime = "default"))]
+    SocketHandler,
-    #[strum(props(OnError = "log"))]
+    #[strum(props(OnError = "log", runtime = "default"))]
```
Contributor


Did you observe an effect of cross runtime overhead (global task queue delays)?

Contributor Author


There are two reasons for this change:
1. We don't want connections to drop when a partition processor is terminated.
2. The majority of writers to the network run in the default runtime (bifrost, metadata syncing, cluster controller); the only exception is ingress. The receiving end has a somewhat similar, albeit slightly different, story.

Contributor Author


To answer your question: the effect I measured was that the unnecessary connection drops during leadership changes, which may cause extra retries, have disappeared.

Comment on lines 69 to 70
```rust
/// This is used as a guiding value for how many connections every node can
/// maintain with its peers. With more connections, concurrency of network message
```
Contributor


Maybe "maintain with each peer" to make it clearer that it's the number of connections per peer. Maybe num_concurrent_connections_per_peer (a bit of a mouthful)?

Contributor Author


I'll update the documentation, but I'll probably stick with the short name.

@AhmedSoliman AhmedSoliman merged commit 45e4664 into main Feb 8, 2025
34 checks passed
@AhmedSoliman AhmedSoliman deleted the pr2664 branch February 8, 2025 12:47