-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streamline restatectl connection to cluster #2592
base: main
Are you sure you want to change the base?
Conversation
c68d2ad
to
512163f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really nice work @muhamadazmy. I like how the ConnectionInfo
now contains the logic to retrieve the available nodes :-)
I left a few smaller comments. The one thing which is potentially a bit dangerous is how the cluster-provision
works right now. It would be great if we can ensure that it behaves idempotently when retrying it.
let Some((_, channel)) = connection_info.channels().next() else { | ||
anyhow::bail!("At least one node address is required"); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What we have to make sure for the cluster-provision command is that the call is idempotent. So if the user retries, then it shouldn't result into another node being provisioned (unless he explicitly ask us to do so). Accidentally provisioning multiple nodes is a very tricky situation to be in. Moreover, if the user is using the embedded metadata store, then he needs to hit a node that has the metadata-server
role. If he uses an external metadata store, then he can hit any node.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tillrohrmann I understand. This is why I am only taking the first address provided by the command line. This should be idempotent unless the user changed the params they are providing to the command line.
An improvement maybe to fail if multiple addresses are given and ask the user to only pass one and to stick to it maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not quite following this discussion - this calls into NodeCtlSvcHandler::provision_cluster
, from where we make a series of updates to cluster metadata using the metadata client configuration on the node on which we land. The updates themselves are conditional updates to versioned metadata values, and we appear to cater for retries. In particular, writing the initial nodes config is conditional on Precondition::DoesNotExist
. What am I missing?
chain_table.add_row(vec![ | ||
Cell::new(log_id), | ||
Cell::new(err.code()).fg(Color::DarkRed), | ||
Cell::new(err.message()).fg(Color::DarkRed), | ||
]); | ||
continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intended to remove this logic if cannot find the tail for a given log_id
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is intended. I can partially restore it
let mut response = connection | ||
.try_each(None, |channel| async { | ||
let mut client = NodeCtlSvcClient::new(channel) | ||
.accept_compressed(CompressionEncoding::Gzip) | ||
.send_compressed(CompressionEncoding::Gzip); | ||
|
||
client.get_metadata(req).await | ||
}) | ||
.await? | ||
.into_inner(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make sure to get the latest Logs
if we can obtain it from multiple nodes?
.try_each(None, |channel| async { | ||
let mut client = NodeCtlSvcClient::new(channel) | ||
.accept_compressed(CompressionEncoding::Gzip) | ||
.send_compressed(CompressionEncoding::Gzip); | ||
|
||
client.get_metadata(req).await | ||
}) | ||
.await? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question here regarding what is the latest Logs
version.
tools/restatectl/src/connection.rs
Outdated
None => Either::Right(nodes_config.iter()), | ||
} | ||
.sorted_by(|left, right| { | ||
// nodes that we already have channels open for, gets higher presence. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// nodes that we already have channels open for, gets higher presence. | |
// nodes that we already have channels open for get higher presence. |
tools/restatectl/src/connection.rs
Outdated
|
||
#[derive(Debug, thiserror::Error)] | ||
pub enum ConnectionInfoError { | ||
#[error("Could not retrieve nodes configuration. Possible un provisioned cluster!")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#[error("Could not retrieve nodes configuration. Possible un provisioned cluster!")] | |
#[error("Could not retrieve nodes configuration. Has the cluster been provisioned yet?")] |
tools/restatectl/src/connection.rs
Outdated
#[derive(Debug, thiserror::Error)] | ||
pub enum ConnectionInfoError { | ||
#[error("Could not retrieve nodes configuration. Possible un provisioned cluster!")] | ||
MissingNodesConfiguration, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Follow-up: Wondering whether a CodedError with more details and a short description how to provision the cluster could be helpful here.
tools/restatectl/src/connection.rs
Outdated
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
writeln!( | ||
f, | ||
"Encountered multiple errors trying to retrieve nodes configurations:" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Encountered multiple errors trying to retrieve nodes configurations:" | |
"Encountered multiple errors trying to retrieve nodes' configuration:" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We seem to use NodesErrors
also for the try_each
call where we are not necessarily trying to retrieve the NodesConfiguration
. Maybe this needs an update of the Display
implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, makes sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice improvement, @muhamadazmy! Some nitpicks but overall looks good to me.
One question I had was whether it's useful to report the node from which we obtained the response reported by a particular invocation? The node sorting should make that somewhat repeatable but it may help operators in case different nodes have diverged in their views.
tools/restatectl/src/connection.rs
Outdated
None => Either::Right(nodes_config.iter()), | ||
} | ||
.sorted_by(|left, right| { | ||
// nodes that we already have channels open for, gets higher presence. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe?
// nodes that we already have channels open for, gets higher presence. | |
// nodes for which we already have open channels get higher precedence. |
tools/restatectl/src/connection.rs
Outdated
@@ -0,0 +1,267 @@ | |||
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. | |
// Copyright (c) 2025 Restate Software, Inc., Restate GmbH. |
pub async fn try_each<F, T, Fut>( | ||
&self, | ||
role: Option<Role>, | ||
mut closure: F, | ||
) -> Result<Response<T>, ConnectionInfoError> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we only ever construct either ClusterCtrlSvcClient
or NodeCtlSvcClient
maybe it would be useful to just have two convenience wrappers that pass the closure a pre-instantiated client? That would eliminate a fair amount of duplicated code across the various commands.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually tried that but it's much harder than it sounds because of the compiler complaining about captured variable cannot escape FnMut
closure body, etc.. so I let it go for now. I will definitely try to do this later.
tools/restatectl/src/connection.rs
Outdated
Some(role) => Either::Left(nodes_config.iter_role(role)), | ||
None => Either::Right(nodes_config.iter()), | ||
} | ||
.sorted_by(|left, right| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The left/right naming threw me for a second because the stream is of type Either
- I'd maybe just use a
/b
here instead :-)
tools/restatectl/src/connection.rs
Outdated
|
||
#[derive(Debug, thiserror::Error)] | ||
pub enum ConnectionInfoError { | ||
#[error("Could not retrieve nodes configuration. Possible un provisioned cluster!")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unprovisioned sounds a bit awkward, maybe something like this is more consistent with the tone we take elsewhere?
#[error("Could not retrieve nodes configuration. Possible un provisioned cluster!")] | |
#[error("Could not retrieve nodes configuration. Have you provisioned this cluster?")] |
tools/restatectl/src/connection.rs
Outdated
} | ||
|
||
if !answer { | ||
// all nodes has errored |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: *have! :-)
// all nodes has errored | |
// all nodes have returned error |
}; | ||
|
||
let mut response = match client.get_metadata(req).await { | ||
Ok(response) => response.into_inner(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we should somehow inform the operator about what node was used to render the response? Maybe not by default but only if debug logging / verbose mode is active?
16571cc
to
d3e4791
Compare
Summary: Make it easier and cleaner to run the restatectl command by accepting the restate nodes addresses via a unified argument `--address` - Support passing multiple addresses - The command will try to fetch the nodes configuration from the first reachable node - All other subcommands can then use the connection info to use the nodes configuration directly or to connect to a specific node role
Thank you @pcholakov for your very helpful review. I think I processed all your comments. Thanks again :) |
Thank you so much @tillrohrmann for your insightful review as always. I already processed all your comments, specially the ones regarding Logs version. I did some refactoring on how we fetch the logs. for the Would love if you can do another pass over the code. Thanks again |
Streamline restatectl connection to cluster
Summary:
Make it easier and cleaner to run the restatectl command
by accepting the restate nodes addresses via a unified argument
--address
or to connect to a specific node role