Skip to content

Commit

Permalink
Streamline restatectl connection to cluster
Browse files Browse the repository at this point in the history
Summary:
Make it easier and cleaner to run the restatectl command
by accepting the restate nodes addresses via a unified argument `--address`

- Support passing multiple addresses
- The command will try to fetch the nodes configuration from the first reachable node
- All other subcommands can then use the connection info to use the nodes configuration directly
  or to connect to a specific node role
  • Loading branch information
muhamadazmy committed Jan 31, 2025
1 parent 7fab8f6 commit c68d2ad
Show file tree
Hide file tree
Showing 26 changed files with 576 additions and 354 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/core/protobuf/node_ctl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ message GetMetadataResponse {

message ClusterHealthResponse {
string cluster_name = 1;
// Some value if the cluster has been configured to use the embedded metadata store
// Some value if the cluster has been configured to use the embedded metadata
// store
optional EmbeddedMetadataClusterHealth metadata_cluster_health = 2;
}

Expand Down
1 change: 1 addition & 0 deletions tools/restatectl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
strum = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, features = ["transport", "prost"] }
tracing = { workspace = true }
Expand Down
18 changes: 1 addition & 17 deletions tools/restatectl/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::str::FromStr;

use cling::prelude::*;

use restate_cli_util::CliContext;
use restate_cli_util::CommonOpts;
use restate_types::net::AdvertisedAddress;

use crate::commands::cluster::overview::ClusterStatusOpts;
use crate::commands::cluster::Cluster;
Expand All @@ -24,6 +21,7 @@ use crate::commands::node::Nodes;
use crate::commands::partition::Partitions;
use crate::commands::replicated_loglet::ReplicatedLoglet;
use crate::commands::snapshot::Snapshot;
use crate::connection::ConnectionInfo;

#[derive(Run, Parser, Clone)]
#[command(author, version = crate::build_info::version(), about, infer_subcommands = true)]
Expand All @@ -37,20 +35,6 @@ pub struct CliApp {
pub cmd: Command,
}

#[derive(Parser, Collect, Debug, Clone)]
pub struct ConnectionInfo {
// todo: rename this to be a node address for reusability across commands
/// Cluster Controller address
#[clap(
long,
value_hint = clap::ValueHint::Url,
default_value_t = AdvertisedAddress::from_str("http://localhost:5122/").unwrap(),
env = "RESTATE_CLUSTER_CONTROLLER_ADDRESS",
global = true
)]
pub cluster_controller: AdvertisedAddress,
}

#[derive(Run, Subcommand, Clone)]
pub enum Command {
/// Cluster operations
Expand Down
36 changes: 13 additions & 23 deletions tools/restatectl/src/commands/cluster/config/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,37 @@

use clap::Parser;
use cling::{Collect, Run};
use tonic::{codec::CompressionEncoding, Code};
use restate_types::nodes_config::Role;
use tonic::codec::CompressionEncoding;

use restate_admin::cluster_controller::protobuf::{
cluster_ctrl_svc_client::ClusterCtrlSvcClient, GetClusterConfigurationRequest,
};
use restate_cli_util::c_println;

use crate::{
app::ConnectionInfo, commands::cluster::config::cluster_config_string, util::grpc_channel,
};
use crate::{commands::cluster::config::cluster_config_string, connection::ConnectionInfo};

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "config_get")]
pub struct ConfigGetOpts {}

async fn config_get(connection: &ConnectionInfo, _get_opts: &ConfigGetOpts) -> anyhow::Result<()> {
let channel = grpc_channel(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

let response = client
.get_cluster_configuration(GetClusterConfigurationRequest {})
.await;

let response = match response {
Ok(response) => response,
Err(status) if status.code() == Code::NotFound => {
c_println!("👻 Cluster is not configured");
return Ok(());
}
Err(status) => {
anyhow::bail!("Failed to get cluster configuration: {status}");
}
};
let response = connection
.try_each(Some(Role::Admin), |channel| async {
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

client
.get_cluster_configuration(GetClusterConfigurationRequest {})
.await
})
.await?;

let configuration = response.into_inner();
let cluster_configuration = configuration.cluster_configuration.expect("is set");

let output = cluster_config_string(&cluster_configuration)?;

c_println!("{}", output);

Ok(())
}
31 changes: 21 additions & 10 deletions tools/restatectl/src/commands/cluster/config/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use anyhow::Context;
use clap::Parser;
use cling::{Collect, Run};
use restate_types::nodes_config::Role;
use tonic::codec::CompressionEncoding;

use restate_admin::cluster_controller::protobuf::SetClusterConfigurationRequest;
Expand All @@ -25,7 +26,7 @@ use restate_types::replication::ReplicationProperty;

use crate::commands::cluster::config::cluster_config_string;
use crate::commands::cluster::provision::extract_default_provider;
use crate::{app::ConnectionInfo, util::grpc_channel};
use crate::connection::ConnectionInfo;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "config_set")]
Expand All @@ -51,12 +52,16 @@ pub struct ConfigSetOpts {
}

async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> {
let channel = grpc_channel(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

let response = client
.get_cluster_configuration(GetClusterConfigurationRequest {})
let response = connection
.try_each(Some(Role::Admin), |channel| async {
let mut client = ClusterCtrlSvcClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip);

client
.get_cluster_configuration(GetClusterConfigurationRequest {})
.await
})
.await
.context("Failed to get cluster configuration")?
.into_inner();
Expand Down Expand Up @@ -123,10 +128,16 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an
cluster_configuration: Some(current),
};

client
.set_cluster_configuration(request)
connection
.try_each(Some(Role::Admin), |channel| async {
let mut client = ClusterCtrlSvcClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip);

client.set_cluster_configuration(request.clone()).await
})
.await
.map_err(|err| anyhow::anyhow!("Failed to set configuration: {}", err.message()))?;
.context("Failed to set configuration")?;

c_println!("✅ Configuration updated successfully");

Expand Down
2 changes: 1 addition & 1 deletion tools/restatectl/src/commands/cluster/overview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use cling::{Collect, Run};

use restate_cli_util::c_println;

use crate::app::ConnectionInfo;
use crate::commands::log::list_logs::{list_logs, ListLogsOpts};
use crate::commands::node::list_nodes::{list_nodes, ListNodesOpts};
use crate::commands::partition::list::{list_partitions, ListPartitionsOpts};
use crate::connection::ConnectionInfo;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "cluster_status")]
Expand Down
16 changes: 4 additions & 12 deletions tools/restatectl/src/commands/cluster/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::app::ConnectionInfo;
use crate::commands::cluster::config::cluster_config_string;
use crate::util::grpc_channel;
use crate::connection::ConnectionInfo;
use clap::Parser;
use cling::{Collect, Run};
use restate_cli_util::ui::console::confirm_or_exit;
use restate_cli_util::{c_error, c_println, c_warn};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest;
use restate_types::logs::metadata::{ProviderConfiguration, ProviderKind, ReplicatedLogletConfig};
use restate_types::net::AdvertisedAddress;
use restate_types::replication::ReplicationProperty;
use std::num::NonZeroU16;
use tonic::codec::CompressionEncoding;
Expand All @@ -27,10 +25,6 @@ use tonic::Code;
#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "cluster_provision")]
pub struct ProvisionOpts {
/// Address of the node that should be provisioned
#[clap(long)]
address: Option<AdvertisedAddress>,

/// Number of partitions
#[clap(long)]
num_partitions: Option<NonZeroU16>,
Expand Down Expand Up @@ -59,11 +53,9 @@ async fn cluster_provision(
connection_info: &ConnectionInfo,
provision_opts: &ProvisionOpts,
) -> anyhow::Result<()> {
let node_address = provision_opts
.address
.clone()
.unwrap_or_else(|| connection_info.cluster_controller.clone());
let channel = grpc_channel(node_address.clone());
let Some((_, channel)) = connection_info.channels().next() else {
anyhow::bail!("At least one node address is required");
};

let mut client = NodeCtlSvcClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
Expand Down
61 changes: 20 additions & 41 deletions tools/restatectl/src/commands/log/describe_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,28 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::Context;
use cling::prelude::*;
use itertools::Itertools;
use log::render_loglet_params;

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_admin::cluster_controller::protobuf::{DescribeLogRequest, ListLogsRequest};
use restate_cli_util::_comfy_table::{Cell, Color, Table};
use restate_cli_util::c_println;
use restate_cli_util::ui::console::StyledTable;
use restate_types::logs::metadata::{Chain, Logs, ProviderKind, Segment, SegmentIndex};
use restate_types::logs::metadata::{Logs, ProviderKind, Segment, SegmentIndex};
use restate_types::logs::LogId;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams};
use restate_types::storage::StorageCodec;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use restate_types::Versioned;

use super::LogIdRange;
use crate::app::ConnectionInfo;
use super::{get_logs, LogIdRange};
use crate::commands::log;
use crate::util::grpc_channel;
use crate::connection::ConnectionInfo;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "describe_logs")]
pub struct DescribeLogIdOpts {
/// The log id or range to describe, e.g. "0", "1-4"; all logs are shown by default
#[arg( value_parser = value_parser!(LogIdRange))]
#[arg(value_parser = value_parser!(LogIdRange))]
log_id: Vec<LogIdRange>,

/// The first segment id to display
Expand Down Expand Up @@ -71,18 +66,11 @@ async fn describe_logs(
connection: &ConnectionInfo,
opts: &DescribeLogIdOpts,
) -> anyhow::Result<()> {
let channel = grpc_channel(connection.cluster_controller.clone());
let nodes_config = connection.get_nodes_configuration().await?;

let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);
let logs = get_logs(connection).await?;

let log_ids = if opts.log_id.is_empty() {
let list_response = client
.list_logs(ListLogsRequest::default())
.await?
.into_inner();
let mut buf = list_response.logs;
let logs = StorageCodec::decode::<Logs, _>(&mut buf)?;
logs.iter()
.sorted_by(|a, b| Ord::cmp(a.0, b.0))
.map(|(id, _)| LogIdRange::from(id))
Expand All @@ -93,25 +81,24 @@ async fn describe_logs(

for range in log_ids {
for log_id in range.iter() {
describe_log(log_id, &mut client, opts).await?;
describe_log(opts, &nodes_config, &logs, log_id.into()).await?;
}
}

Ok(())
}

async fn describe_log(
log_id: u32,
client: &mut ClusterCtrlSvcClient<Channel>,
opts: &DescribeLogIdOpts,
nodes_configuration: &NodesConfiguration,
logs: &Logs,
log_id: LogId,
) -> anyhow::Result<()> {
let req = DescribeLogRequest { log_id };
let mut response = client.describe_log(req).await?.into_inner();

let mut buf = response.chain.clone();
let chain = StorageCodec::decode::<Chain, _>(&mut buf).context("Failed to decode log chain")?;
let chain = logs
.chain(&log_id)
.ok_or_else(|| anyhow::anyhow!("Failed to get log chain"))?;

c_println!("Log Id: {} (v{})", log_id, response.logs_version);
c_println!("Log Id: {} ({})", log_id, logs.version());

let mut chain_table = Table::new_styled();
let mut header_row = vec![
Expand All @@ -129,11 +116,7 @@ async fn describe_log(
}
chain_table.set_styled_header(header_row);

let last_segment = chain
.iter()
.last()
.map(|s| s.index())
.unwrap_or(SegmentIndex::from(u32::MAX));
let last_segment = chain.tail_index();

let mut first_segment_rendered = None;
let mut last_segment_rendered = None;
Expand All @@ -152,17 +135,13 @@ async fn describe_log(
};

let segments: Box<dyn Iterator<Item = Segment>> = if opts.all {
Box::new(segments)
segments
} else if opts.head.is_some() {
Box::new(segments.take(opts.head.unwrap()))
} else {
Box::new(segments.tail(opts.tail.unwrap()))
};

let nodes_configuration =
StorageCodec::decode::<NodesConfiguration, _>(&mut response.nodes_configuration)
.context("Failed to decode nodes configuration")?;

for segment in segments {
if first_segment_rendered.is_none() {
first_segment_rendered = Some(segment.index());
Expand All @@ -182,10 +161,10 @@ async fn describe_log(
render_loglet_params(&params, |p| Cell::new(p.loglet_id)),
render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.replication))),
render_loglet_params(&params, |p| {
render_sequencer(is_tail_segment, p, &nodes_configuration)
render_sequencer(is_tail_segment, p, nodes_configuration)
}),
render_loglet_params(&params, |p| {
render_effective_nodeset(is_tail_segment, p, &nodes_configuration)
render_effective_nodeset(is_tail_segment, p, nodes_configuration)
}),
];
if opts.extra {
Expand Down
Loading

0 comments on commit c68d2ad

Please sign in to comment.