Connection Manager Rewrite #806
Conversation
Force-pushed 7ca5f4b to 3ef3925
Force-pushed 3ef3925 to 5850758
But I feel like I must be overlooking something lol. Deferring to @allada 😅
Reviewed 4 of 5 files at r1, 1 of 1 files at r2, all commit messages.
Reviewable status: 1 of 1 LGTMs obtained, and 1 discussions need to be resolved
nativelink-util/src/grpc_utils.rs
line 94 at r2 (raw file):
/// Connected channels that are waiting for poll_ready for the Service.
/// This has a key of the endpoint index and iteration to the Channel value.
pending_channels: HashMap<ChannelIdentifier, Channel>,
nit: I believe it's safe to use `ahash::HashMap` instead of the `std::collections::HashMap` here: https://github.com/tkaitchuck/ahash
Reviewable status: 1 of 1 LGTMs obtained, and 26 discussions need to be resolved
a discussion (no related file):
It feels like we are doing a lot of bookkeeping with `connecting_channels`, `available_channels`, `pending_channels`, `connecting_channels`, and `waiting_connections`. I feel like there's a pattern that could have this auto-managed for us. Maybe we can use a `Semaphore` or `tokio::sync::watch` to lease out connections & channels and let `drop()` put them back into the available pool.
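As a rough illustration of the lease-on-drop idea (not code from this PR — `ChannelLease` and the pool size are made up), a `tokio::sync::Semaphore` permit held inside a wrapper returns capacity automatically when the wrapper is dropped:

```rust
// Minimal sketch of leasing connections via a semaphore, assuming a fixed pool size.
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

struct ChannelLease {
    // Held permit; dropping the lease releases it back to the pool.
    _permit: OwnedSemaphorePermit,
    channel_id: usize,
}

#[tokio::main]
async fn main() {
    const POOL_SIZE: usize = 4;
    let pool = Arc::new(Semaphore::new(POOL_SIZE));

    // Acquiring a connection waits until a permit is free.
    let permit = pool.clone().acquire_owned().await.expect("semaphore closed");
    let lease = ChannelLease { _permit: permit, channel_id: 0 };
    println!("leased channel {}", lease.channel_id);

    // Dropping the lease returns capacity; no explicit bookkeeping needed.
    drop(lease);
    assert_eq!(pool.available_permits(), POOL_SIZE);
}
```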
nativelink-config/src/schedulers.rs
line 145 at r2 (raw file):
/// The number of connections to make to each specified endpoint to balance
/// the load over multiple TCP connections. Zero is always changed to 1.
nit: Suggest also saying `Default: 1`.
nativelink-config/src/stores.rs
line 565 at r2 (raw file):
/// The number of connections to make to each specified endpoint to balance
/// the load over multiple TCP connections. Zero is always changed to 1.
ditto.
nativelink-scheduler/src/grpc_scheduler.rs
line 168 at r2 (raw file):
self.perform_request(instance_name, |instance_name| async move {
    // Not in the cache, lookup the capabilities with the upstream.
    let channel = self.connection_manager.connection().await?;
nit: Can we `.err_tip()` here for debug tracing?
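For illustration only — a self-contained sketch of the kind of context-attaching this nit asks for. `TipExt`/`tip` below are hypothetical stand-ins, not nativelink's actual `err_tip` API:

```rust
// Sketch: attach a debug tip to the error path of a connection call.
#[derive(Debug)]
struct Error(String);

trait TipExt<T> {
    fn tip(self, tip: impl FnOnce() -> String) -> Result<T, Error>;
}

impl<T> TipExt<T> for Result<T, Error> {
    fn tip(self, tip: impl FnOnce() -> String) -> Result<T, Error> {
        // Append the tip so the original error keeps its context.
        self.map_err(|Error(msg)| Error(format!("{msg} : {}", tip())))
    }
}

fn connection() -> Result<u32, Error> {
    Err(Error("connect refused".into()))
}

fn main() {
    // Analogous in spirit to:
    //     self.connection_manager.connection().await
    //         .err_tip(|| "in GrpcScheduler::get_capabilities")?;
    let err = connection()
        .tip(|| "in GrpcScheduler::get_capabilities".into())
        .unwrap_err();
    println!("{err:?}");
}
```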
nativelink-scheduler/src/grpc_scheduler.rs
line 221 at r2 (raw file):
let result_stream = self
    .perform_request(request, |request| async move {
        let channel = self.connection_manager.connection().await?;
ditto.
nativelink-scheduler/src/grpc_scheduler.rs
line 241 at r2 (raw file):
let result_stream = self
    .perform_request(request, |request| async move {
        let channel = self.connection_manager.connection().await?;
ditto.
nativelink-store/src/grpc_store.rs
line 149 at r2 (raw file):
request.instance_name = self.instance_name.clone();
self.perform_request(request, |request| async move {
    let channel = self.connection_manager.connection().await?;
ditto.
nativelink-store/src/grpc_store.rs
line 170 at r2 (raw file):
request.instance_name = self.instance_name.clone();
self.perform_request(request, |request| async move {
    let channel = self.connection_manager.connection().await?;
ditto.
nativelink-store/src/grpc_store.rs
line 191 at r2 (raw file):
request.instance_name = self.instance_name.clone();
self.perform_request(request, |request| async move {
    let channel = self.connection_manager.connection().await?;
ditto.
nativelink-store/src/grpc_store.rs
line 212 at r2 (raw file):
request.instance_name = self.instance_name.clone();
self.perform_request(request, |request| async move {
    let channel = self.connection_manager.connection().await?;
ditto.
nativelink-store/src/grpc_store.rs
line 235 at r2 (raw file):
    request: ReadRequest,
) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
    let channel = self.connection_manager.connection().await?;
ditto.
nativelink-store/src/grpc_store.rs
line 349 at r2 (raw file):
self.perform_request(request, |request| async move {
    let channel = self.connection_manager.connection().await?;
ditto.
nativelink-store/src/grpc_store.rs
line 365 at r2 (raw file):
request.instance_name = self.instance_name.clone();
self.perform_request(request, |request| async move {
    let channel = self.connection_manager.connection().await?;
ditto.
nativelink-store/src/grpc_store.rs
line 381 at r2 (raw file):
request.instance_name = self.instance_name.clone();
self.perform_request(request, |request| async move {
    let channel = self.connection_manager.connection().await?;
ditto.
nativelink-util/src/grpc_utils.rs
line 1 at r2 (raw file):
// Copyright 2024 The NativeLink Authors. All rights reserved.
It looks like this file is now all about `GrpcConnectionManager`. We should probably rename this file.
nativelink-util/src/grpc_utils.rs
line 32 at r2 (raw file):
pub struct ConnectionManager {
    // The channel to request connections from the worker.
    worker_tx: UnboundedSender<WorkerRequest>,
It feels like it would be a good idea to use a bounded channel instead so backpressure can be created. I'm always afraid of unbounded channels for this reason. If we did, I suggest we don't make it configurable and just hard-code a high limit. Thoughts?
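A minimal sketch of what the bounded variant might look like, assuming `tokio::sync::mpsc`; the `WorkerRequest` enum and the limit of 1024 are placeholders, not values from this PR:

```rust
// Sketch: a bounded worker channel with a hard-coded high limit.
use tokio::sync::mpsc;

#[derive(Debug)]
enum WorkerRequest {
    RequestConnection,
}

#[tokio::main]
async fn main() {
    // A generous fixed bound instead of mpsc::unbounded_channel().
    let (worker_tx, mut worker_rx) = mpsc::channel::<WorkerRequest>(1024);

    tokio::spawn(async move {
        while let Some(req) = worker_rx.recv().await {
            // ... handle the request ...
            println!("got {req:?}");
        }
    });

    // `send` now awaits when the queue is full, creating backpressure.
    worker_tx.send(WorkerRequest::RequestConnection).await.unwrap();
}
```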
nativelink-util/src/grpc_utils.rs
line 36 at r2 (raw file):
/// The requests that can be made from the ConnectionManager to the
/// ConnectionManagerWorker such as requesting
nit: Incomplete comment.
nativelink-util/src/grpc_utils.rs
line 47 at r2 (raw file):
endpoint_index: usize,
/// A unique identifier for this particular connection to the Endpoint.
iteration: usize,
nit: Let's rename this `connection_index`. It took me a while to figure out what this was.
nativelink-util/src/grpc_utils.rs
line 82 at r2 (raw file):
/// The endpoints to establish Channels and the current iteration of
/// connection to that endpoint.
endpoints: Vec<(usize, Endpoint)>,
nit: Let's alias this `usize` to something like `EndpointIndex` (maybe do the same for the `usize`s above).
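A small sketch of the suggested aliasing; `Endpoint` is stubbed out so the snippet stands alone, and `ConnectionIndex` is included only to mirror the rename suggested earlier:

```rust
// Sketch: readable aliases for the index types.
type EndpointIndex = usize;
type ConnectionIndex = usize;

struct Endpoint; // stand-in for tonic::transport::Endpoint

struct ConnectionManagerWorker {
    // Reads more clearly than Vec<(usize, Endpoint)>.
    endpoints: Vec<(EndpointIndex, Endpoint)>,
}

#[derive(Hash, PartialEq, Eq)]
struct ChannelIdentifier {
    endpoint_index: EndpointIndex,
    connection_index: ConnectionIndex,
}

fn main() {
    let _worker = ConnectionManagerWorker { endpoints: vec![(0, Endpoint)] };
}
```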
nativelink-util/src/grpc_utils.rs
line 87 at r2 (raw file):
/// If the number of connections are restricted, then the number of
/// connections that are currently allowed to be made.
available_connections: Option<usize>,
nit: Let's make this a `usize` and make it `usize::MAX` if zero.
Or maybe better yet, always enforce some sane limit. I personally like enforcing limits on users by default and letting them set higher/lower values when they discover limitations.
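A sketch of the zero-means-unlimited normalization being suggested; the function name is illustrative:

```rust
// Sketch: treat a configured value of zero as "effectively unlimited"
// (or clamp to a sane ceiling, per the suggestion above) instead of Option<usize>.
fn normalize_connection_limit(configured: usize) -> usize {
    if configured == 0 {
        usize::MAX // or a hard-coded sane ceiling
    } else {
        configured
    }
}

fn main() {
    assert_eq!(normalize_connection_limit(0), usize::MAX);
    assert_eq!(normalize_connection_limit(16), 16);
}
```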
nativelink-util/src/grpc_utils.rs
line 94 at r2 (raw file):
Previously, aaronmondal (Aaron Siddhartha Mondal) wrote…
nit: I believe it's safe to use `ahash::HashMap` instead of the `std::collections::HashMap` here: https://github.com/tkaitchuck/ahash
We use `hashbrown::HashMap` for non-std hashmaps.
nativelink-util/src/grpc_utils.rs
line 103 at r2 (raw file):
/// set of Endpoints. This will restrict the number of concurrent requests
/// assuming that the user of this connection manager uses the connection
/// only once and reports all errors.
What does the last sentence mean here? I'm not sure I understand. Shouldn't this be wrapped to prevent a user from abusing it?
nativelink-util/src/grpc_utils.rs
line 121 at r2 (raw file):
    waiting_connections: VecDeque::new(),
};
let connections_per_endpoint = if connections_per_endpoint > 0 {
nit: For readability, just make `connections_per_endpoint` mut and modify it in the if-statement.
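The two shapes being compared, as a standalone sketch with placeholder values:

```rust
fn main() {
    // Current shape: shadowing via an if-expression.
    let connections_per_endpoint = 0usize;
    let connections_per_endpoint =
        if connections_per_endpoint > 0 { connections_per_endpoint } else { 1 };
    assert_eq!(connections_per_endpoint, 1);

    // Suggested shape: take it as mut and fix it up in place.
    let mut connections_per_endpoint = 0usize;
    if connections_per_endpoint == 0 {
        connections_per_endpoint = 1;
    }
    assert_eq!(connections_per_endpoint, 1);
}
```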
nativelink-util/src/grpc_utils.rs
line 148 at r2 (raw file):
impl ConnectionManagerWorker {
    async fn execute(
nit: Suggest a better name. Maybe `service_requests`?
nativelink-util/src/grpc_utils.rs
line 187 at r2 (raw file):
        _ = self.connect_next() => {}
    }
}
nit: Add `// Unreachable.` after the loop.
nativelink-util/src/grpc_utils.rs
line 356 at r2 (raw file):
identifier: ChannelIdentifier,
/// The wrapped future that actually does the work.
inner: channel::ResponseFuture,
nit: Generally it's a good idea to put `inner` objects first in the list so the optimizer can combine/inline functions that have the same index offsets.
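A sketch of the suggested declaration order, with stand-in types; note that the default `repr(Rust)` layout may still reorder fields, so this follows the reviewer's suggestion rather than guaranteeing a particular layout:

```rust
// Sketch: declare the wrapped future first, per the suggestion above.
struct ResponseFuture; // stand-in for tonic's channel::ResponseFuture

struct ChannelIdentifier {
    endpoint_index: usize,
    connection_index: usize,
}

struct ConnectionFuture {
    // The wrapped future that actually does the work, declared first.
    inner: ResponseFuture,
    identifier: ChannelIdentifier,
}

fn main() {
    let _f = ConnectionFuture {
        inner: ResponseFuture,
        identifier: ChannelIdentifier { endpoint_index: 0, connection_index: 0 },
    };
}
```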
Reviewable status: 1 of 1 LGTMs obtained, and 26 discussions need to be resolved
a discussion (no related file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
It feels like we are doing a lot of bookkeeping with `connecting_channels`, `available_channels`, `pending_channels`, `connecting_channels`, and `waiting_connections`. I feel like there's a pattern that could have this auto-managed for us. Maybe we can use a `Semaphore` or `tokio::sync::watch` to lease out connections & channels and let `drop()` put them back into the available pool.
The concept is heavily based off the implementation in tower, which has to perform all this management. I did start off trying to use the tower implementation, but it didn't give access to the errors and we'd end up with the same compromise as before. I don't think that a watch or a Semaphore would help simplify this at all.
I also considered a state machine, but we actually want different management of the Channel, for example connecting needs to be a monitored future, available needs to be a queue, pending could be simply managed by the Connection, but then we'd need a separate list of errors which can happen asynchronously as Channels are cloned to reuse a connection multiple times. The waiting connections could probably be handled by a Semaphore, but I feel it's much simpler to reason about with reconnection attempts.
I designed this to be totally lock free and simply mutable on a single worker with a simple channel input and output. I welcome library suggestions, but I haven't seen anything that fits any parts of the use case yet.
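A stripped-down sketch of that single-worker, message-passing shape (all names here are illustrative, not the PR's actual types): one task owns all mutable state and everyone else talks to it over a channel, so no locks are required:

```rust
// Sketch: a single worker task owns the state; requests arrive over a channel.
use tokio::sync::{mpsc, oneshot};

enum WorkerRequest {
    GetConnection(oneshot::Sender<usize>),
}

struct Worker {
    available_channels: Vec<usize>, // stand-in for the real Channel queue
}

impl Worker {
    async fn service_requests(mut self, mut rx: mpsc::Receiver<WorkerRequest>) {
        while let Some(req) = rx.recv().await {
            match req {
                WorkerRequest::GetConnection(reply) => {
                    // Only this task ever touches `available_channels`.
                    if let Some(channel) = self.available_channels.pop() {
                        let _ = reply.send(channel);
                    }
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(64);
    tokio::spawn(Worker { available_channels: vec![0, 1] }.service_requests(rx));

    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(WorkerRequest::GetConnection(reply_tx)).await.unwrap();
    println!("leased channel {}", reply_rx.await.unwrap());
}
```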
nativelink-util/src/grpc_utils.rs
line 187 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Add `// Unreachable.` after the loop.
It is reachable when the ConnectionManager is dropped. There could be cleanup work afterwards, none is needed though.
nativelink-util/src/grpc_utils.rs
line 356 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Generally it's a good idea to put `inner` objects first in the list so the optimizer can combine/inline functions that have the same index offsets.
Oh, nice.. hadn't thought of that
Reviewable status: 1 of 1 LGTMs obtained, and 26 discussions need to be resolved
nativelink-util/src/grpc_utils.rs
line 32 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
It feels like it would be a good idea to use a bounded channel instead so backpressure can be created. I'm always afraid of unbounded channels for this reason. If we did, I suggest we don't make it configurable and just hard-code a high limit. Thoughts?
Yeah, I'd thought that too. I already planned to swap it out, just haven't yet.
nativelink-util/src/grpc_utils.rs
line 36 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Incomplete comment.
Yeah, apparently I drifted off...
Reviewable status: 1 of 1 LGTMs obtained, and 24 discussions need to be resolved
a discussion (no related file):
Previously, chrisstaite (Chris) wrote…
The concept is heavily based off the implementation in tower, which has to perform all this management. I did start off trying to use the tower implementation, but it didn't give access to the errors and we'd end up with the same compromise as before. I don't think that a watch or a Semaphore would help simplify this at all.
I also considered a state machine, but we actually want different management of the Channel, for example connecting needs to be a monitored future, available needs to be a queue, pending could be simply managed by the Connection, but then we'd need a separate list of errors which can happen asynchronously as Channels are cloned to reuse a connection multiple times. The waiting connections could probably be handled by a Semaphore, but I feel it's much simpler to reason about with reconnection attempts.
I designed this to be totally lock free and simply mutable on a single worker with a simple channel input and output. I welcome library suggestions, but I haven't seen anything that fits any parts of the use case yet.
Could you link which implementation it's based on?
I'm going to look to see if maybe we could upstream some changes to reduce our burden of maintenance. My initial search didn't yield much.
Previously, allada (Nathan (Blaise) Bruer) wrote…
https://docs.rs/tower/latest/src/tower/balance/p2c/service.rs.html#32-45
Reviewed 4 of 5 files at r1, all commit messages.
Reviewable status: 1 of 1 LGTMs obtained, and 26 discussions need to be resolved
nativelink-util/src/grpc_utils.rs
line 194 at r2 (raw file):
// Sleep for a really long time, this will be cancelled by the
// select in execute.
tokio::time::sleep(Duration::from_secs(100000)).await;
Which thread is this running in and how many possible threads could hit this sleep?
nativelink-util/src/grpc_utils.rs
line 222 at r2 (raw file):
}
self.connecting_channels.push(Box::pin(async move {
    if is_backoff {
Can we utilize the retry.rs backoff utility code vs one off sleeping?
Reviewable status: 1 of 1 LGTMs obtained, and 26 discussions need to be resolved
nativelink-util/src/grpc_utils.rs
line 194 at r2 (raw file):
Previously, adam-singer (Adam Singer) wrote…
Which thread is this running in and how many possible threads could hit this sleep?
It's not running in any thread, it's a tokio sleep. Essentially, we want to suspend this future forever such that it always returns Pending. It will get cancelled by the select that called it and the function re-entered, at which time `connecting_channels` can be reassessed.
nativelink-util/src/grpc_utils.rs
line 222 at r2 (raw file):
Previously, adam-singer (Adam Singer) wrote…
Can we utilize the retry.rs backoff utility code vs one off sleeping?
I considered it, but that's a lot of config for reconnecting, also retry.rs wraps a function... Maybe we could do it for the whole future...
Force-pushed 5850758 to 5ef37d1
Reviewable status: 1 of 1 LGTMs obtained, and 18 discussions need to be resolved
nativelink-config/src/stores.rs
line 565 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-scheduler/src/grpc_scheduler.rs
line 221 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-scheduler/src/grpc_scheduler.rs
line 241 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-store/src/grpc_store.rs
line 149 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-store/src/grpc_store.rs
line 170 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-store/src/grpc_store.rs
line 191 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-store/src/grpc_store.rs
line 212 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-store/src/grpc_store.rs
line 235 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-store/src/grpc_store.rs
line 349 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-store/src/grpc_store.rs
line 365 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-store/src/grpc_store.rs
line 381 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
ditto.
Done.
nativelink-util/src/grpc_utils.rs
line 1 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
It looks like this file is now all about `GrpcConnectionManager`. We should probably rename this file.
Done.
nativelink-util/src/grpc_utils.rs
line 32 at r2 (raw file):
Previously, chrisstaite (Chris) wrote…
Yeah, I'd thought that too. I already planned to swap it out, just haven't yet.
Done.
nativelink-util/src/grpc_utils.rs
line 94 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
We use `hashbrown::HashMap` for non-std hashmaps.
I think I'll leave this as-is since the std HashMap is now hashbrown anyway.
nativelink-util/src/grpc_utils.rs
line 103 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
What does the last sentence mean here? I'm not sure I understand. Shouldn't this be wrapped to prevent a user from abusing it?
Yeah, this is left over from the old implementation that required user input; I've re-written this for the new implementation.
nativelink-util/src/grpc_utils.rs
line 222 at r2 (raw file):
Previously, chrisstaite (Chris) wrote…
I considered it, but that's a lot of config for reconnecting, also retry.rs wraps a function... Maybe we could do it for the whole future...
There are a few issues with this, but I think I've overcome them. Firstly, the `Retrier` requires the result to be an `Error`, so I wrapped it to provide the `ConnectionIdentifier` context in the case of error. Second, the `Retrier` does not have a concept of infinite retries, so in the case that it runs out of attempts, it simply re-enters. The annoying point is that we have to make a clone of the `Retrier` because of the lifetime of `self`; `connecting_channels` will be dropped before the retrier and therefore it should be safe, but I can't convince the checker without introducing a lifetime on the worker.
Reviewable status: 1 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, Local / ubuntu-22.04, Publish image, Vercel, docker-compose-compiles-nativelink (20.04), pre-commit-checks, ubuntu-20.04 / stable, ubuntu-22.04, vale, and 17 discussions need to be resolved
nativelink-util/src/grpc_utils.rs
line 194 at r2 (raw file):
Previously, chrisstaite (Chris) wrote…
It's not running in any thread, it's a tokio sleep. Essentially, we want to suspend this future forever such that it always returns Pending. It will get cancelled by the select that called it and the function re-entered, at which time `connecting_channels` can be reassessed.
Moved to `futures::future::pending::<()>().await` instead.
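A minimal sketch of the pattern, with a stand-in timer as the other `select!` arm:

```rust
// Sketch: instead of sleeping for an arbitrarily long time, park the branch
// with a future that never resolves; the surrounding select! cancels it
// whenever another arm fires.
use std::time::Duration;

#[tokio::main]
async fn main() {
    tokio::select! {
        // This arm never completes on its own...
        _ = futures::future::pending::<()>() => unreachable!(),
        // ...so the select resolves via the other arms (a stand-in timer here).
        _ = tokio::time::sleep(Duration::from_millis(10)) => println!("another arm fired"),
    }
}
```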
Force-pushed 52d8b9d to c36a4bb
Reviewable status: 1 of 1 LGTMs obtained, and pending CI: Analyze (javascript-typescript), Analyze (python), Bazel Dev / ubuntu-22.04, Cargo Dev / macos-13, Cargo Dev / ubuntu-22.04, Local / ubuntu-22.04, Publish image, Publish nativelink-worker-lre-cc, Remote / large-ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), integration-tests (20.04), integration-tests (22.04), macos-13, pre-commit-checks, ubuntu-20.04 / stable, ubuntu-22.04, ubuntu-22.04 / stable, vale, windows-2022 / stable, zig-cc ubuntu-20.04, zig-cc ubuntu-22.04, and 17 discussions need to be resolved
a discussion (no related file):
Previously, chrisstaite (Chris) wrote…
https://docs.rs/tower/latest/src/tower/balance/p2c/service.rs.html#32-45
Moved `pending_channels` to passing the channel to the `Connection` on creation and then passing it back on `Connected` or `Dropped` using `.take()`. The downside is that we don't have visibility of the connection if a transport error happens on one stream first while it's pending in another `Connection`; however, the error should occur again in the other `Connection` and resolve itself, so I don't see it as an issue.
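A hedged, self-contained sketch of that hand-over-and-`take()` idea; `Channel`, `Connection`, and the return channel here are stand-ins, not the PR's actual types:

```rust
// Sketch: the Connection holds the Channel in an Option and gives it back
// exactly once, either when it becomes Connected or when it is Dropped.
use std::sync::mpsc;

struct Channel(usize); // stand-in for tonic::transport::Channel

struct Connection {
    channel: Option<Channel>,
    return_tx: mpsc::Sender<Channel>,
}

impl Connection {
    fn on_connected(&mut self) {
        // Hand the channel back to the worker once the connection is usable.
        if let Some(channel) = self.channel.take() {
            let _ = self.return_tx.send(channel);
        }
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        // If we never reached Connected, return it on Drop instead.
        if let Some(channel) = self.channel.take() {
            let _ = self.return_tx.send(channel);
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut conn = Connection { channel: Some(Channel(7)), return_tx: tx.clone() };
    conn.on_connected();
    drop(conn); // take() already emptied it, so Drop sends nothing extra
    assert_eq!(rx.recv().unwrap().0, 7);
    assert!(rx.try_recv().is_err());
}
```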
Force-pushed 52b3346 to 4ec1cb2
Have deployed this out to our fleet and it appears to be functioning well.
Reviewable status: 1 of 1 LGTMs obtained, and 17 discussions need to be resolved
Force-pushed 4ec1cb2 to 62cf43d
Reviewed 5 of 8 files at r3, 2 of 3 files at r4.
Reviewable status: 1 of 1 LGTMs obtained, and pending CI: Analyze (javascript-typescript), Bazel Dev / ubuntu-22.04, Local / ubuntu-22.04, Publish image, Vercel, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), pre-commit-checks, ubuntu-20.04 / stable, and 18 discussions need to be resolved
nativelink-util/src/connection_manager.rs
line 442 at r5 (raw file):
}
Err(err) => {
    debug!("Error while creating connection on channel: {err:?}");
I think this should be `warn!` or `error!`; is it expected to be noisy?
Previously, adam-singer (Adam Singer) wrote…
Because a …
Reviewed all commit messages.
Reviewable status: 1 of 1 LGTMs obtained, and pending CI: docker-compose-compiles-nativelink (20.04), integration-tests (20.04), integration-tests (22.04), and 15 discussions need to be resolved
Previously, chrisstaite-menlo (Chris Staite) wrote…
So, it looks like some of this can be implemented by:
Which will make multiple connections to each … Then we have three issues left: …
Previously, chrisstaite (Chris) wrote…
Looks like there might be more issues with the upstream …
Force-pushed 62cf43d to 3925b37
Reviewable status: 1 of 1 LGTMs obtained, and 1 discussions need to be resolved
a discussion (no related file):
FYI, sorry for the delay on this. I'll make sure we get everything done for this this week. I was subconsciously avoiding giving an LGTM until after we cut the 0.3.0 release. It should be out in the next day or two, and then I'm happy to land this.
Again sorry, refactors at this level are just super scary to me :-(
nativelink-util/src/grpc_utils.rs
line 94 at r2 (raw file):
Previously, chrisstaite-menlo (Chris Staite) wrote…
I think I'll leave this as-is since the std HashMap is now hashbrown anyway.
🤯
Sorry about the delay. Now that 0.3.0 is out, I'm happy to land this.
Reviewable status: complete! 2 of 1 LGTMs obtained
Just waiting on @adam-singer
Reviewed 1 of 1 files at r5, 1 of 1 files at r6, all commit messages.
Reviewable status: complete! 3 of 1 LGTMs obtained
Description
The connection manager written in grpc_utils made incorrect assumptions about how the tonic and tower implementations were written and is not suitable for maintaining multiple connections and ensuring stability.
Completely rewrite this to manage the tonic::Channel for each tonic::Endpoint itself to make a simpler external API and ensure that connection errors are handled correctly. This is performed by using a single worker loop that manages all of the connections and wrapping each connection to inform state to the worker.
Type of change
Please delete options that aren't relevant.
How Has This Been Tested?
Run up on my US proxy service which is now capable of completing a fresh clean build rather than stalling out on concurrent uploads.
Checklist
- `bazel test //...` passes locally
- `git amend` (see some docs)