From dbe67fedfe4eaf1879cf9cbfd44c65f008a85066 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Sat, 1 Feb 2025 10:11:38 +0100 Subject: [PATCH] Streamline restatectl connection to cluster Summary: Make it easier and cleaner to run the restatectl command by accepting the restate nodes addresses via a unified argument `--address` - Support passing multiple addresses - The command will try to fetch the nodes configuration from the first reachable node - All other subcommands can then use the connection info to use the nodes configuration directly or to connect to a specific node role --- Cargo.lock | 1 + crates/admin/protobuf/cluster_ctrl_svc.proto | 9 - .../cluster_controller/grpc_svc_handler.rs | 26 +- crates/core/protobuf/node_ctl_svc.proto | 3 +- tools/restatectl/Cargo.toml | 1 + tools/restatectl/src/app.rs | 18 +- .../src/commands/cluster/config/get.rs | 36 +- .../src/commands/cluster/config/set.rs | 31 +- .../src/commands/cluster/overview.rs | 2 +- .../src/commands/cluster/provision.rs | 18 +- .../src/commands/log/describe_log.rs | 59 +-- .../restatectl/src/commands/log/find_tail.rs | 34 +- .../restatectl/src/commands/log/list_logs.rs | 19 +- .../src/commands/log/reconfigure.rs | 49 ++- tools/restatectl/src/commands/log/trim_log.rs | 18 +- tools/restatectl/src/commands/metadata/get.rs | 12 +- tools/restatectl/src/commands/metadata/mod.rs | 48 +-- .../restatectl/src/commands/metadata/patch.rs | 11 +- tools/restatectl/src/commands/metadata/put.rs | 5 +- .../src/commands/metadata/status.rs | 179 +++++---- .../src/commands/node/list_nodes.rs | 21 +- .../restatectl/src/commands/partition/list.rs | 32 +- .../src/commands/replicated_loglet/digest.rs | 29 +- .../src/commands/replicated_loglet/info.rs | 23 +- .../src/commands/snapshot/create_snapshot.rs | 21 +- tools/restatectl/src/connection.rs | 341 ++++++++++++++++++ tools/restatectl/src/lib.rs | 1 + 27 files changed, 636 insertions(+), 411 deletions(-) create mode 100644 tools/restatectl/src/connection.rs diff --git a/Cargo.lock b/Cargo.lock index 45c78b777..6ddd07aef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7578,6 +7578,7 @@ dependencies = [ "serde_json", "strum", "tempfile", + "thiserror 2.0.11", "tokio", "tonic", "tracing", diff --git a/crates/admin/protobuf/cluster_ctrl_svc.proto b/crates/admin/protobuf/cluster_ctrl_svc.proto index a4342bd28..626a7c3f4 100644 --- a/crates/admin/protobuf/cluster_ctrl_svc.proto +++ b/crates/admin/protobuf/cluster_ctrl_svc.proto @@ -22,8 +22,6 @@ service ClusterCtrlSvc { rpc DescribeLog(DescribeLogRequest) returns (DescribeLogResponse); - rpc ListNodes(ListNodesRequest) returns (ListNodesResponse); - rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty); rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest) @@ -82,13 +80,6 @@ message DescribeLogResponse { bytes nodes_configuration = 7; } -message ListNodesRequest {} - -message ListNodesResponse { - // Serialized restate_types::nodes_config::NodesConfiguration - bytes nodes_configuration = 1; -} - message TrimLogRequest { uint32 log_id = 1; uint64 trim_point = 2; diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs index 0f2607b64..b473b4617 100644 --- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs +++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs @@ -29,9 +29,8 @@ use crate::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc use crate::cluster_controller::protobuf::{ ClusterStateRequest, ClusterStateResponse, CreatePartitionSnapshotRequest, CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, FindTailRequest, - FindTailResponse, ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse, - SealAndExtendChainRequest, SealAndExtendChainResponse, SealedSegment, TailState, - TrimLogRequest, + FindTailResponse, ListLogsRequest, ListLogsResponse, SealAndExtendChainRequest, + SealAndExtendChainResponse, SealedSegment, TailState, TrimLogRequest, }; use super::protobuf::{ @@ -143,27 +142,6 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { })) } - async fn list_nodes( - &self, - _request: Request, - ) -> Result, Status> { - let nodes_config = self - .metadata_writer - .metadata_store_client() - .get::(NODES_CONFIG_KEY.clone()) - .await - .map_err(|error| { - Status::unknown(format!( - "Failed to get nodes configuration metadata: {error:?}" - )) - })? - .ok_or(Status::not_found("Missing nodes configuration"))?; - - Ok(Response::new(ListNodesResponse { - nodes_configuration: serialize_value(nodes_config), - })) - } - /// Internal operations API to trigger the log truncation async fn trim_log(&self, request: Request) -> Result, Status> { let request = request.into_inner(); diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto index 0256e5bed..4097c0d77 100644 --- a/crates/core/protobuf/node_ctl_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -78,7 +78,8 @@ message GetMetadataResponse { message ClusterHealthResponse { string cluster_name = 1; - // Some value if the cluster has been configured to use the embedded metadata store + // Some value if the cluster has been configured to use the embedded metadata + // store optional EmbeddedMetadataClusterHealth metadata_cluster_health = 2; } diff --git a/tools/restatectl/Cargo.toml b/tools/restatectl/Cargo.toml index 705928e51..75753adfe 100644 --- a/tools/restatectl/Cargo.toml +++ b/tools/restatectl/Cargo.toml @@ -47,6 +47,7 @@ serde = { workspace = true } serde_json = { workspace = true } strum = { workspace = true } tempfile = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } tonic = { workspace = true, features = ["transport", "prost"] } tracing = { workspace = true } diff --git a/tools/restatectl/src/app.rs b/tools/restatectl/src/app.rs index ff14a3481..417325a66 100644 --- a/tools/restatectl/src/app.rs +++ b/tools/restatectl/src/app.rs @@ -8,13 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::str::FromStr; - use cling::prelude::*; use restate_cli_util::CliContext; use restate_cli_util::CommonOpts; -use restate_types::net::AdvertisedAddress; use crate::commands::cluster::overview::ClusterStatusOpts; use crate::commands::cluster::Cluster; @@ -24,6 +21,7 @@ use crate::commands::node::Nodes; use crate::commands::partition::Partitions; use crate::commands::replicated_loglet::ReplicatedLoglet; use crate::commands::snapshot::Snapshot; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Clone)] #[command(author, version = crate::build_info::version(), about, infer_subcommands = true)] @@ -37,20 +35,6 @@ pub struct CliApp { pub cmd: Command, } -#[derive(Parser, Collect, Debug, Clone)] -pub struct ConnectionInfo { - // todo: rename this to be a node address for reusability across commands - /// Cluster Controller address - #[clap( - long, - value_hint = clap::ValueHint::Url, - default_value_t = AdvertisedAddress::from_str("http://localhost:5122/").unwrap(), - env = "RESTATE_CLUSTER_CONTROLLER_ADDRESS", - global = true - )] - pub cluster_controller: AdvertisedAddress, -} - #[derive(Run, Subcommand, Clone)] pub enum Command { /// Cluster operations diff --git a/tools/restatectl/src/commands/cluster/config/get.rs b/tools/restatectl/src/commands/cluster/config/get.rs index e376e2cc9..ed572a17d 100644 --- a/tools/restatectl/src/commands/cluster/config/get.rs +++ b/tools/restatectl/src/commands/cluster/config/get.rs @@ -10,40 +10,31 @@ use clap::Parser; use cling::{Collect, Run}; -use tonic::{codec::CompressionEncoding, Code}; +use tonic::codec::CompressionEncoding; use restate_admin::cluster_controller::protobuf::{ cluster_ctrl_svc_client::ClusterCtrlSvcClient, GetClusterConfigurationRequest, }; use restate_cli_util::c_println; +use restate_types::nodes_config::Role; -use crate::{ - app::ConnectionInfo, commands::cluster::config::cluster_config_string, util::grpc_channel, -}; +use crate::{commands::cluster::config::cluster_config_string, connection::ConnectionInfo}; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "config_get")] pub struct ConfigGetOpts {} async fn config_get(connection: &ConnectionInfo, _get_opts: &ConfigGetOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - - let response = client - .get_cluster_configuration(GetClusterConfigurationRequest {}) - .await; - - let response = match response { - Ok(response) => response, - Err(status) if status.code() == Code::NotFound => { - c_println!("👻 Cluster is not configured"); - return Ok(()); - } - Err(status) => { - anyhow::bail!("Failed to get cluster configuration: {status}"); - } - }; + let response = connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = + ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + + client + .get_cluster_configuration(GetClusterConfigurationRequest {}) + .await + }) + .await?; let configuration = response.into_inner(); let cluster_configuration = configuration.cluster_configuration.expect("is set"); @@ -51,6 +42,5 @@ async fn config_get(connection: &ConnectionInfo, _get_opts: &ConfigGetOpts) -> a let output = cluster_config_string(&cluster_configuration)?; c_println!("{}", output); - Ok(()) } diff --git a/tools/restatectl/src/commands/cluster/config/set.rs b/tools/restatectl/src/commands/cluster/config/set.rs index d79e151bc..2620c963a 100644 --- a/tools/restatectl/src/commands/cluster/config/set.rs +++ b/tools/restatectl/src/commands/cluster/config/set.rs @@ -21,11 +21,12 @@ use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::ui::console::{confirm_or_exit, StyledTable}; use restate_cli_util::{c_println, c_warn}; use restate_types::logs::metadata::{ProviderConfiguration, ProviderKind}; +use restate_types::nodes_config::Role; use restate_types::replication::ReplicationProperty; use crate::commands::cluster::config::cluster_config_string; use crate::commands::cluster::provision::extract_default_provider; -use crate::{app::ConnectionInfo, util::grpc_channel}; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "config_set")] @@ -51,12 +52,16 @@ pub struct ConfigSetOpts { } async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - - let response = client - .get_cluster_configuration(GetClusterConfigurationRequest {}) + let response = connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = ClusterCtrlSvcClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + client + .get_cluster_configuration(GetClusterConfigurationRequest {}) + .await + }) .await .context("Failed to get cluster configuration")? .into_inner(); @@ -123,10 +128,16 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an cluster_configuration: Some(current), }; - client - .set_cluster_configuration(request) + connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = ClusterCtrlSvcClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + client.set_cluster_configuration(request.clone()).await + }) .await - .map_err(|err| anyhow::anyhow!("Failed to set configuration: {}", err.message()))?; + .context("Failed to set configuration")?; c_println!("✅ Configuration updated successfully"); diff --git a/tools/restatectl/src/commands/cluster/overview.rs b/tools/restatectl/src/commands/cluster/overview.rs index e1c0e5db3..7ee5c7a56 100644 --- a/tools/restatectl/src/commands/cluster/overview.rs +++ b/tools/restatectl/src/commands/cluster/overview.rs @@ -13,10 +13,10 @@ use cling::{Collect, Run}; use restate_cli_util::c_println; -use crate::app::ConnectionInfo; use crate::commands::log::list_logs::{list_logs, ListLogsOpts}; use crate::commands::node::list_nodes::{list_nodes, ListNodesOpts}; use crate::commands::partition::list::{list_partitions, ListPartitionsOpts}; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "cluster_status")] diff --git a/tools/restatectl/src/commands/cluster/provision.rs b/tools/restatectl/src/commands/cluster/provision.rs index 69caaaaed..a0fab2f92 100644 --- a/tools/restatectl/src/commands/cluster/provision.rs +++ b/tools/restatectl/src/commands/cluster/provision.rs @@ -8,8 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::app::ConnectionInfo; use crate::commands::cluster::config::cluster_config_string; +use crate::connection::ConnectionInfo; use crate::util::grpc_channel; use clap::Parser; use cling::{Collect, Run}; @@ -18,7 +18,6 @@ use restate_cli_util::{c_error, c_println, c_warn}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest; use restate_types::logs::metadata::{ProviderConfiguration, ProviderKind, ReplicatedLogletConfig}; -use restate_types::net::AdvertisedAddress; use restate_types::replication::ReplicationProperty; use std::num::NonZeroU16; use tonic::codec::CompressionEncoding; @@ -27,10 +26,6 @@ use tonic::Code; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "cluster_provision")] pub struct ProvisionOpts { - /// Address of the node that should be provisioned - #[clap(long)] - address: Option, - /// Number of partitions #[clap(long)] num_partitions: Option, @@ -56,13 +51,14 @@ pub struct ProvisionOpts { } async fn cluster_provision( - connection_info: &ConnectionInfo, + connection: &ConnectionInfo, provision_opts: &ProvisionOpts, ) -> anyhow::Result<()> { - let node_address = provision_opts - .address - .clone() - .unwrap_or_else(|| connection_info.cluster_controller.clone()); + if connection.addresses.len() != 1 { + anyhow::bail!("Only one address must be provided for provision"); + } + let node_address = connection.addresses[0].clone(); + let channel = grpc_channel(node_address.clone()); let mut client = NodeCtlSvcClient::new(channel) diff --git a/tools/restatectl/src/commands/log/describe_log.rs b/tools/restatectl/src/commands/log/describe_log.rs index aff3240c3..4a28a545e 100644 --- a/tools/restatectl/src/commands/log/describe_log.rs +++ b/tools/restatectl/src/commands/log/describe_log.rs @@ -8,33 +8,28 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use anyhow::Context; use cling::prelude::*; use itertools::Itertools; use log::render_loglet_params; -use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; -use restate_admin::cluster_controller::protobuf::{DescribeLogRequest, ListLogsRequest}; use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; -use restate_types::logs::metadata::{Chain, Logs, ProviderKind, Segment, SegmentIndex}; +use restate_types::logs::metadata::{Logs, ProviderKind, Segment, SegmentIndex}; +use restate_types::logs::LogId; use restate_types::nodes_config::NodesConfiguration; use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams}; -use restate_types::storage::StorageCodec; -use tonic::codec::CompressionEncoding; -use tonic::transport::Channel; +use restate_types::Versioned; use super::LogIdRange; -use crate::app::ConnectionInfo; use crate::commands::log; -use crate::util::grpc_channel; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "describe_logs")] pub struct DescribeLogIdOpts { /// The log id or range to describe, e.g. "0", "1-4"; all logs are shown by default - #[arg( value_parser = value_parser!(LogIdRange))] + #[arg(value_parser = value_parser!(LogIdRange))] log_id: Vec, /// The first segment id to display @@ -71,18 +66,11 @@ async fn describe_logs( connection: &ConnectionInfo, opts: &DescribeLogIdOpts, ) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); + let nodes_config = connection.get_nodes_configuration().await?; - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + let logs = connection.get_logs().await?; let log_ids = if opts.log_id.is_empty() { - let list_response = client - .list_logs(ListLogsRequest::default()) - .await? - .into_inner(); - let mut buf = list_response.logs; - let logs = StorageCodec::decode::(&mut buf)?; logs.iter() .sorted_by(|a, b| Ord::cmp(a.0, b.0)) .map(|(id, _)| LogIdRange::from(id)) @@ -93,7 +81,7 @@ async fn describe_logs( for range in log_ids { for log_id in range.iter() { - describe_log(log_id, &mut client, opts).await?; + describe_log(opts, &nodes_config, &logs, log_id.into()).await?; } } @@ -101,17 +89,16 @@ async fn describe_logs( } async fn describe_log( - log_id: u32, - client: &mut ClusterCtrlSvcClient, opts: &DescribeLogIdOpts, + nodes_configuration: &NodesConfiguration, + logs: &Logs, + log_id: LogId, ) -> anyhow::Result<()> { - let req = DescribeLogRequest { log_id }; - let mut response = client.describe_log(req).await?.into_inner(); - - let mut buf = response.chain.clone(); - let chain = StorageCodec::decode::(&mut buf).context("Failed to decode log chain")?; + let chain = logs + .chain(&log_id) + .ok_or_else(|| anyhow::anyhow!("Failed to get log chain"))?; - c_println!("Log Id: {} (v{})", log_id, response.logs_version); + c_println!("Log Id: {} ({})", log_id, logs.version()); let mut chain_table = Table::new_styled(); let mut header_row = vec![ @@ -129,11 +116,7 @@ async fn describe_log( } chain_table.set_styled_header(header_row); - let last_segment = chain - .iter() - .last() - .map(|s| s.index()) - .unwrap_or(SegmentIndex::from(u32::MAX)); + let last_segment = chain.tail_index(); let mut first_segment_rendered = None; let mut last_segment_rendered = None; @@ -152,17 +135,13 @@ async fn describe_log( }; let segments: Box> = if opts.all { - Box::new(segments) + segments } else if opts.head.is_some() { Box::new(segments.take(opts.head.unwrap())) } else { Box::new(segments.tail(opts.tail.unwrap())) }; - let nodes_configuration = - StorageCodec::decode::(&mut response.nodes_configuration) - .context("Failed to decode nodes configuration")?; - for segment in segments { if first_segment_rendered.is_none() { first_segment_rendered = Some(segment.index()); @@ -182,10 +161,10 @@ async fn describe_log( render_loglet_params(¶ms, |p| Cell::new(p.loglet_id)), render_loglet_params(¶ms, |p| Cell::new(format!("{:#}", p.replication))), render_loglet_params(¶ms, |p| { - render_sequencer(is_tail_segment, p, &nodes_configuration) + render_sequencer(is_tail_segment, p, nodes_configuration) }), render_loglet_params(¶ms, |p| { - render_effective_nodeset(is_tail_segment, p, &nodes_configuration) + render_effective_nodeset(is_tail_segment, p, nodes_configuration) }), ]; if opts.extra { diff --git a/tools/restatectl/src/commands/log/find_tail.rs b/tools/restatectl/src/commands/log/find_tail.rs index a0b6123d6..c9de8c23e 100644 --- a/tools/restatectl/src/commands/log/find_tail.rs +++ b/tools/restatectl/src/commands/log/find_tail.rs @@ -9,18 +9,17 @@ // by the Apache License, Version 2.0. use cling::prelude::*; -use restate_cli_util::_comfy_table::{Cell, Color, Table}; -use restate_cli_util::ui::console::StyledTable; use tonic::codec::CompressionEncoding; use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; use restate_admin::cluster_controller::protobuf::{FindTailRequest, TailState}; +use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; - -use crate::app::ConnectionInfo; -use crate::util::grpc_channel; +use restate_cli_util::ui::console::StyledTable; +use restate_types::nodes_config::Role; use super::LogIdRange; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "find_tail")] @@ -31,10 +30,6 @@ pub struct FindTailOpts { } async fn find_tail(connection: &ConnectionInfo, opts: &FindTailOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - let mut chain_table = Table::new_styled(); let header_row = vec!["LOG-ID", "SEGMENT", "STATE", "TAIL-LSN"]; @@ -42,17 +37,28 @@ async fn find_tail(connection: &ConnectionInfo, opts: &FindTailOpts) -> anyhow:: for log_id in opts.log_id.clone().into_iter().flatten() { let find_tail_request = FindTailRequest { log_id }; - let response = match client.find_tail(find_tail_request).await { - Ok(response) => response.into_inner(), + let response = match connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = ClusterCtrlSvcClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + client.find_tail(find_tail_request).await + }) + .await + { + Ok(response) => response, Err(err) => { chain_table.add_row(vec![ Cell::new(log_id), - Cell::new(err.code()).fg(Color::DarkRed), - Cell::new(err.message()).fg(Color::DarkRed), + Cell::new("").fg(Color::DarkRed), + Cell::new(err).fg(Color::DarkRed), ]); + continue; } - }; + } + .into_inner(); chain_table.add_row(vec![ Cell::new(response.log_id), diff --git a/tools/restatectl/src/commands/log/list_logs.rs b/tools/restatectl/src/commands/log/list_logs.rs index 44dee89c0..547648c2a 100644 --- a/tools/restatectl/src/commands/log/list_logs.rs +++ b/tools/restatectl/src/commands/log/list_logs.rs @@ -11,21 +11,16 @@ use std::collections::BTreeMap; use cling::prelude::*; -use tonic::codec::CompressionEncoding; -use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; -use restate_admin::cluster_controller::protobuf::ListLogsRequest; use restate_cli_util::_comfy_table::{Cell, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; -use restate_types::logs::metadata::{Chain, Logs}; +use restate_types::logs::metadata::Chain; use restate_types::logs::LogId; -use restate_types::storage::StorageCodec; use restate_types::Versioned; -use crate::app::ConnectionInfo; use crate::commands::log::{deserialize_replicated_log_params, render_loglet_params}; -use crate::util::grpc_channel; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[clap(visible_alias = "ls")] @@ -33,18 +28,10 @@ use crate::util::grpc_channel; pub struct ListLogsOpts {} pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - - let req = ListLogsRequest::default(); - let response = client.list_logs(req).await?.into_inner(); + let logs = connection.get_logs().await?; let mut logs_table = Table::new_styled(); - let mut buf = response.logs; - let logs = StorageCodec::decode::(&mut buf)?; - c_println!("Log Configuration ({})", logs.version()); c_println!( diff --git a/tools/restatectl/src/commands/log/reconfigure.rs b/tools/restatectl/src/commands/log/reconfigure.rs index b983e53cd..9560916b7 100644 --- a/tools/restatectl/src/commands/log/reconfigure.rs +++ b/tools/restatectl/src/commands/log/reconfigure.rs @@ -13,23 +13,19 @@ use std::num::{NonZeroU32, NonZeroU8}; use anyhow::Context; use cling::prelude::*; use tonic::codec::CompressionEncoding; -use tonic::transport::Channel; use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; -use restate_admin::cluster_controller::protobuf::{ - ChainExtension, ListLogsRequest, SealAndExtendChainRequest, -}; +use restate_admin::cluster_controller::protobuf::{ChainExtension, SealAndExtendChainRequest}; use restate_cli_util::{c_eprintln, c_println}; -use restate_types::logs::metadata::{Logs, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{ProviderKind, SegmentIndex}; use restate_types::logs::{LogId, LogletId}; +use restate_types::nodes_config::Role; use restate_types::protobuf::common::Version; use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_types::replication::{NodeSet, ReplicationProperty}; -use restate_types::storage::StorageCodec; use restate_types::{GenerationalNodeId, PlainNodeId}; -use crate::app::ConnectionInfo; -use crate::util::grpc_channel; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "reconfigure")] @@ -67,11 +63,6 @@ pub struct ReconfigureOpts { } async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - let extension = match opts.provider { Some(provider) => { let params = match provider { @@ -79,7 +70,7 @@ async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> any #[cfg(any(test, feature = "memory-loglet"))] ProviderKind::InMemory => rand::random::().to_string(), #[cfg(feature = "replicated-loglet")] - ProviderKind::Replicated => replicated_loglet_params(&mut client, opts).await?, + ProviderKind::Replicated => replicated_loglet_params(connection, opts).await?, }; Some(ChainExtension { @@ -91,13 +82,20 @@ async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> any None => None, }; - let response = client - .seal_and_extend_chain(SealAndExtendChainRequest { - log_id: opts.log_id, - min_version: Some(Version { - value: opts.min_version.get(), - }), - extension, + let request = SealAndExtendChainRequest { + log_id: opts.log_id, + min_version: Some(Version { + value: opts.min_version.get(), + }), + extension, + }; + + let response = connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = + ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + + client.seal_and_extend_chain(request.clone()).await }) .await? .into_inner(); @@ -134,16 +132,11 @@ async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> any } async fn replicated_loglet_params( - client: &mut ClusterCtrlSvcClient, + connection: &ConnectionInfo, opts: &ReconfigureOpts, ) -> anyhow::Result { - let mut logs_response = client - .list_logs(ListLogsRequest {}) - .await - .context("Failed to get logs metadata")? - .into_inner(); + let logs = connection.get_logs().await?; - let logs = StorageCodec::decode::(&mut logs_response.logs)?; let log_id = LogId::from(opts.log_id); let chain = logs .chain(&log_id) diff --git a/tools/restatectl/src/commands/log/trim_log.rs b/tools/restatectl/src/commands/log/trim_log.rs index 03f2807d0..145176187 100644 --- a/tools/restatectl/src/commands/log/trim_log.rs +++ b/tools/restatectl/src/commands/log/trim_log.rs @@ -15,9 +15,9 @@ use tonic::codec::CompressionEncoding; use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; use restate_admin::cluster_controller::protobuf::TrimLogRequest; use restate_cli_util::c_println; +use restate_types::nodes_config::Role; -use crate::app::ConnectionInfo; -use crate::util::grpc_channel; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[clap()] @@ -33,16 +33,18 @@ pub struct TrimLogOpts { } async fn trim_log(connection: &ConnectionInfo, opts: &TrimLogOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - let trim_request = TrimLogRequest { log_id: opts.log_id, trim_point: opts.trim_point, }; - client - .trim_log(trim_request) + + connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = + ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + + client.trim_log(trim_request).await + }) .await .with_context(|| "failed to submit trim request")? .into_inner(); diff --git a/tools/restatectl/src/commands/metadata/get.rs b/tools/restatectl/src/commands/metadata/get.rs index 4b6d23aae..b86d2df9b 100644 --- a/tools/restatectl/src/commands/metadata/get.rs +++ b/tools/restatectl/src/commands/metadata/get.rs @@ -20,6 +20,7 @@ use restate_types::live::Live; use crate::commands::metadata::{ create_metadata_store_client, GenericMetadataValue, MetadataAccessMode, MetadataCommonOpts, }; +use crate::connection::ConnectionInfo; use crate::environment::metadata_store; use crate::environment::task_center::run_in_task_center; @@ -35,9 +36,9 @@ pub struct GetValueOpts { key: String, } -async fn get_value(opts: &GetValueOpts) -> anyhow::Result<()> { +async fn get_value(connection: &ConnectionInfo, opts: &GetValueOpts) -> anyhow::Result<()> { let value = match opts.metadata.access_mode { - MetadataAccessMode::Remote => get_value_remote(opts).await?, + MetadataAccessMode::Remote => get_value_remote(connection, opts).await?, MetadataAccessMode::Direct => get_value_direct(opts).await?, }; @@ -47,8 +48,11 @@ async fn get_value(opts: &GetValueOpts) -> anyhow::Result<()> { Ok(()) } -async fn get_value_remote(opts: &GetValueOpts) -> anyhow::Result> { - let metadata_store_client = create_metadata_store_client(&opts.metadata).await?; +async fn get_value_remote( + connection: &ConnectionInfo, + opts: &GetValueOpts, +) -> anyhow::Result> { + let metadata_store_client = create_metadata_store_client(connection, &opts.metadata).await?; metadata_store_client .get(ByteString::from(opts.key.as_str())) diff --git a/tools/restatectl/src/commands/metadata/mod.rs b/tools/restatectl/src/commands/metadata/mod.rs index d0987c0db..89765cd5b 100644 --- a/tools/restatectl/src/commands/metadata/mod.rs +++ b/tools/restatectl/src/commands/metadata/mod.rs @@ -8,21 +8,22 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use anyhow::Context; -use cling::prelude::*; +mod get; +mod patch; +mod put; +mod status; + use std::path::PathBuf; -use std::str::FromStr; + +use cling::prelude::*; +use restate_types::nodes_config::Role; use restate_core::metadata_store::MetadataStoreClient; use restate_metadata_server::create_client; use restate_types::config::MetadataStoreClientOptions; -use restate_types::net::AdvertisedAddress; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; -mod get; -mod patch; -mod put; -mod status; +use crate::connection::ConnectionInfo; #[derive(Run, Subcommand, Clone)] pub enum Metadata { @@ -39,15 +40,14 @@ pub enum Metadata { #[derive(Args, Clone, Debug)] #[clap()] pub struct MetadataCommonOpts { - /// Metadata store server addresses + /// Etcd store server addresses #[arg( short, - long = "addresses", - default_values = &["http://127.0.0.1:5122"], - env = "RESTATE_METADATA_ADDRESSES", - value_delimiter = ',' + long = "etcd", + value_delimiter = ',', + required_if_eq("remote_service_type", "etcd") )] - addresses: Vec, + etcd: Vec, /// Metadata store access mode #[arg(long, default_value_t)] @@ -109,20 +109,20 @@ impl Versioned for GenericMetadataValue { } pub async fn create_metadata_store_client( + connection: &ConnectionInfo, opts: &MetadataCommonOpts, ) -> anyhow::Result { let client = match opts.remote_service_type { - RemoteServiceType::Restate => restate_types::config::MetadataStoreClient::Embedded { - addresses: opts - .addresses - .iter() - .map(|address| { - AdvertisedAddress::from_str(address).context("failed to parse address") - }) - .collect::, _>>()?, - }, + RemoteServiceType::Restate => { + let nodes = connection.get_nodes_configuration().await?; + let addresses = nodes + .iter_role(Role::MetadataServer) + .map(|(_, node)| node.address.clone()) + .collect(); + restate_types::config::MetadataStoreClient::Embedded { addresses } + } RemoteServiceType::Etcd => restate_types::config::MetadataStoreClient::Etcd { - addresses: opts.addresses.clone(), + addresses: opts.etcd.clone(), }, }; diff --git a/tools/restatectl/src/commands/metadata/patch.rs b/tools/restatectl/src/commands/metadata/patch.rs index 94326e7b7..30cd98163 100644 --- a/tools/restatectl/src/commands/metadata/patch.rs +++ b/tools/restatectl/src/commands/metadata/patch.rs @@ -24,6 +24,7 @@ use restate_types::Version; use crate::commands::metadata::{ create_metadata_store_client, GenericMetadataValue, MetadataAccessMode, MetadataCommonOpts, }; +use crate::connection::ConnectionInfo; use crate::environment::metadata_store::start_metadata_server; use crate::environment::task_center::run_in_task_center; @@ -51,12 +52,15 @@ pub struct PatchValueOpts { pub dry_run: bool, } -pub(crate) async fn patch_value(opts: &PatchValueOpts) -> anyhow::Result<()> { +pub(crate) async fn patch_value( + connection: &ConnectionInfo, + opts: &PatchValueOpts, +) -> anyhow::Result<()> { let patch = serde_json::from_str(opts.patch.as_str()) .map_err(|e| anyhow::anyhow!("Parsing JSON patch: {}", e))?; let value = match opts.metadata.access_mode { - MetadataAccessMode::Remote => patch_value_remote(opts, patch).await?, + MetadataAccessMode::Remote => patch_value_remote(connection, opts, patch).await?, MetadataAccessMode::Direct => patch_value_direct(opts, patch).await?, }; @@ -67,10 +71,11 @@ pub(crate) async fn patch_value(opts: &PatchValueOpts) -> anyhow::Result<()> { } async fn patch_value_remote( + connection: &ConnectionInfo, opts: &PatchValueOpts, patch: Patch, ) -> anyhow::Result> { - let metadata_store_client = create_metadata_store_client(&opts.metadata).await?; + let metadata_store_client = create_metadata_store_client(connection, &opts.metadata).await?; Ok(Some( patch_value_inner(opts, &patch, &metadata_store_client).await?, )) diff --git a/tools/restatectl/src/commands/metadata/put.rs b/tools/restatectl/src/commands/metadata/put.rs index 0ec21b57e..31313499e 100644 --- a/tools/restatectl/src/commands/metadata/put.rs +++ b/tools/restatectl/src/commands/metadata/put.rs @@ -17,6 +17,7 @@ use serde_json::Value; use crate::commands::metadata::patch::{patch_value, PatchValueOpts}; use crate::commands::metadata::MetadataCommonOpts; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[clap()] @@ -41,7 +42,7 @@ pub struct PutValueOpts { dry_run: bool, } -async fn put_value(opts: &PutValueOpts) -> anyhow::Result<()> { +async fn put_value(connection: &ConnectionInfo, opts: &PutValueOpts) -> anyhow::Result<()> { let opts = opts.clone(); let doc_body = opts.doc.contents()?; @@ -69,5 +70,5 @@ async fn put_value(opts: &PutValueOpts) -> anyhow::Result<()> { dry_run: opts.dry_run, }; - patch_value(&patch_opts).await + patch_value(connection, &patch_opts).await } diff --git a/tools/restatectl/src/commands/metadata/status.rs b/tools/restatectl/src/commands/metadata/status.rs index d8d1d3f6c..37e43420b 100644 --- a/tools/restatectl/src/commands/metadata/status.rs +++ b/tools/restatectl/src/commands/metadata/status.rs @@ -8,26 +8,25 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::app::ConnectionInfo; -use crate::util::grpc_channel; +use std::collections::BTreeMap; + use clap::Parser; use cling::{Collect, Run}; use itertools::Itertools; +use tonic::codec::CompressionEncoding; +use tonic::IntoRequest; + use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; -use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::GetMetadataRequest; use restate_metadata_server::grpc::metadata_server_svc_client::MetadataServerSvcClient; use restate_metadata_server::MemberId; -use restate_types::net::metadata::MetadataKind; -use restate_types::nodes_config::{NodesConfiguration, Role}; +use restate_types::nodes_config::Role; use restate_types::protobuf::common::MetadataServerStatus; -use restate_types::storage::StorageCodec; use restate_types::{PlainNodeId, Version}; -use std::collections::BTreeMap; -use tonic::codec::CompressionEncoding; -use tonic::IntoRequest; + +use crate::connection::ConnectionInfo; +use crate::util::grpc_channel; #[derive(Run, Parser, Collect, Clone, Debug)] #[clap()] @@ -35,15 +34,7 @@ use tonic::IntoRequest; pub struct StatusOpts {} async fn status(connection: &ConnectionInfo) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - let req = GetMetadataRequest { - kind: MetadataKind::NodesConfiguration.into(), - sync: false, - }; - let mut response = client.get_metadata(req).await?.into_inner(); - let nodes_configuration = StorageCodec::decode::(&mut response.encoded)?; - + let nodes_configuration = connection.get_nodes_configuration().await?; let mut metadata_nodes_table = Table::new_styled(); let header = vec![ "NODE", @@ -62,82 +53,80 @@ async fn status(connection: &ConnectionInfo) -> anyhow::Result<()> { let mut unreachable_nodes = BTreeMap::default(); - for (node_id, node_config) in nodes_configuration.iter() { - if node_config.roles.contains(Role::MetadataServer) { - let metadata_channel = grpc_channel(node_config.address.clone()); - let mut metadata_client = MetadataServerSvcClient::new(metadata_channel) - .accept_compressed(CompressionEncoding::Gzip); - - let metadata_store_status = metadata_client.status(().into_request()).await; - - let status = match metadata_store_status { - Ok(response) => response.into_inner(), - Err(err) => { - unreachable_nodes.insert(node_id, err.to_string()); - continue; - } - }; - - metadata_nodes_table.add_row(vec![ - Cell::new(node_id), - render_metadata_server_status(status.status()), - Cell::new( - status - .configuration - .as_ref() - .and_then(|config| config.version.map(Version::from)) - .unwrap_or(Version::INVALID), - ), - Cell::new( - status - .leader - .map(|leader_id| PlainNodeId::new(leader_id).to_string()) - .unwrap_or("-".to_owned()), - ), - Cell::new( - status - .configuration - .map(|config| { - format!( - "[{}]", - config - .members - .into_iter() - .map(|(node_id, storage_id)| MemberId::new( - PlainNodeId::from(node_id), - storage_id - )) - .map(|member_id| member_id.to_string()) - .join(",") - ) - }) - .unwrap_or("[]".to_owned()), - ), - Cell::new(status.raft.map(|raft| raft.applied).unwrap_or_default()), - Cell::new(status.raft.map(|raft| raft.committed).unwrap_or_default()), - Cell::new(status.raft.map(|raft| raft.term).unwrap_or_default()), - // first and last index are inclusive - Cell::new( - status - .raft - .map(|raft| (raft.last_index + 1) - raft.first_index) - .unwrap_or_default(), - ), - Cell::new( - status - .snapshot - .map(|snapshot| snapshot.index) - .unwrap_or_default(), - ), - Cell::new(bytesize::to_string( - status - .snapshot - .map(|snapshot| snapshot.size) - .unwrap_or_default(), - true, - )), - ]); - } + for (node_id, node_config) in nodes_configuration.iter_role(Role::MetadataServer) { + let metadata_channel = grpc_channel(node_config.address.clone()); + let mut metadata_client = MetadataServerSvcClient::new(metadata_channel) + .accept_compressed(CompressionEncoding::Gzip); + + let metadata_store_status = metadata_client.status(().into_request()).await; + + let status = match metadata_store_status { + Ok(response) => response.into_inner(), + Err(err) => { + unreachable_nodes.insert(node_id, err.to_string()); + continue; + } + }; + + metadata_nodes_table.add_row(vec![ + Cell::new(node_id), + render_metadata_server_status(status.status()), + Cell::new( + status + .configuration + .as_ref() + .and_then(|config| config.version.map(Version::from)) + .unwrap_or(Version::INVALID), + ), + Cell::new( + status + .leader + .map(|leader_id| PlainNodeId::new(leader_id).to_string()) + .unwrap_or("-".to_owned()), + ), + Cell::new( + status + .configuration + .map(|config| { + format!( + "[{}]", + config + .members + .into_iter() + .map(|(node_id, storage_id)| MemberId::new( + PlainNodeId::from(node_id), + storage_id + )) + .map(|member_id| member_id.to_string()) + .join(",") + ) + }) + .unwrap_or("[]".to_owned()), + ), + Cell::new(status.raft.map(|raft| raft.applied).unwrap_or_default()), + Cell::new(status.raft.map(|raft| raft.committed).unwrap_or_default()), + Cell::new(status.raft.map(|raft| raft.term).unwrap_or_default()), + // first and last index are inclusive + Cell::new( + status + .raft + .map(|raft| (raft.last_index + 1) - raft.first_index) + .unwrap_or_default(), + ), + Cell::new( + status + .snapshot + .map(|snapshot| snapshot.index) + .unwrap_or_default(), + ), + Cell::new(bytesize::to_string( + status + .snapshot + .map(|snapshot| snapshot.size) + .unwrap_or_default(), + true, + )), + ]); } c_println!("{}", metadata_nodes_table); diff --git a/tools/restatectl/src/commands/node/list_nodes.rs b/tools/restatectl/src/commands/node/list_nodes.rs index 11609ef61..3154f2e1b 100644 --- a/tools/restatectl/src/commands/node/list_nodes.rs +++ b/tools/restatectl/src/commands/node/list_nodes.rs @@ -17,8 +17,6 @@ use itertools::Itertools; use tokio::task::JoinSet; use tonic::codec::CompressionEncoding; -use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; -use restate_admin::cluster_controller::protobuf::ListNodesRequest; use restate_cli_util::_comfy_table::{Cell, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; @@ -26,10 +24,9 @@ use restate_cli_util::ui::{duration_to_human_rough, Tense}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; use restate_core::protobuf::node_ctl_svc::IdentResponse; use restate_types::nodes_config::NodesConfiguration; -use restate_types::storage::StorageCodec; use restate_types::PlainNodeId; -use crate::app::ConnectionInfo; +use crate::connection::ConnectionInfo; use crate::util::grpc_channel; // Default timeout for the [optional] GetIdent call made to all nodes @@ -46,18 +43,8 @@ pub struct ListNodesOpts { } pub async fn list_nodes(connection: &ConnectionInfo, opts: &ListNodesOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - - let mut response = client - .list_nodes(ListNodesRequest::default()) - .await - .map_err(|e| anyhow::anyhow!("failed to list nodes: {:?}", e))? - .into_inner(); - - let nodes_configuration = - StorageCodec::decode::(&mut response.nodes_configuration)?; + let nodes_configuration = connection.get_nodes_configuration().await?; + let nodes = nodes_configuration.iter().collect::>(); let nodes_extra_info = if opts.extra { @@ -78,7 +65,7 @@ pub async fn list_nodes(connection: &ConnectionInfo, opts: &ListNodesOpts) -> an } nodes_table.set_styled_header(header); - for (node_id, node_config) in nodes.clone() { + for (node_id, node_config) in nodes { let mut node_row = vec![ Cell::new(node_id.to_string()), Cell::new(node_config.current_generation.generation().to_string()), diff --git a/tools/restatectl/src/commands/partition/list.rs b/tools/restatectl/src/commands/partition/list.rs index 805809953..4423505fd 100644 --- a/tools/restatectl/src/commands/partition/list.rs +++ b/tools/restatectl/src/commands/partition/list.rs @@ -13,6 +13,7 @@ use std::collections::{BTreeMap, HashMap}; use cling::prelude::*; use itertools::Itertools; +use restate_types::nodes_config::Role; use tonic::codec::CompressionEncoding; use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; @@ -29,10 +30,9 @@ use restate_types::protobuf::cluster::{ use restate_types::storage::StorageCodec; use restate_types::{GenerationalNodeId, PlainNodeId, Version}; -use crate::app::ConnectionInfo; use crate::commands::display_util::render_as_duration; use crate::commands::log::deserialize_replicated_log_params; -use crate::util::grpc_channel; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug, Default)] #[cling(run = "list_partitions")] @@ -71,21 +71,33 @@ pub async fn list_partitions( connection: &ConnectionInfo, opts: &ListPartitionsOpts, ) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + let cluster_state = connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = ClusterCtrlSvcClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); - let cluster_state_request = ClusterStateRequest::default(); - let cluster_state = client - .get_cluster_state(cluster_state_request) + client + .get_cluster_state(ClusterStateRequest::default()) + .await + }) .await? .into_inner() .cluster_state .ok_or_else(|| anyhow::anyhow!("no cluster state returned"))?; // we need the logs to show the current sequencer for each partition's log - let list_logs_request = ListLogsRequest::default(); - let list_logs_response = client.list_logs(list_logs_request).await?.into_inner(); + let list_logs_response = connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = ClusterCtrlSvcClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + client.list_logs(ListLogsRequest::default()).await + }) + .await? + .into_inner(); + let mut buf = list_logs_response.logs; let logs = StorageCodec::decode::(&mut buf)?; let logs: HashMap = logs.iter().map(|(id, chain)| (*id, chain)).collect(); diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs index 2f8fd496e..485f6f2c8 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs @@ -19,21 +19,16 @@ use restate_bifrost::providers::replicated_loglet::replication::NodeSetChecker; use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; -use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::GetMetadataRequest; -use restate_core::MetadataKind; use restate_log_server::protobuf::log_server_svc_client::LogServerSvcClient; use restate_log_server::protobuf::GetDigestRequest; -use restate_types::logs::metadata::Logs; use restate_types::logs::{LogletId, LogletOffset, SequenceNumber, TailState}; use restate_types::net::log_server::RecordStatus; -use restate_types::nodes_config::{NodesConfiguration, Role}; +use restate_types::nodes_config::Role; use restate_types::replicated_loglet::LogNodeSetExt; -use restate_types::storage::StorageCodec; use restate_types::Versioned; -use crate::app::ConnectionInfo; use crate::commands::replicated_loglet::digest_util::DigestsHelper; +use crate::connection::ConnectionInfo; use crate::util::grpc_channel; #[derive(Run, Parser, Collect, Clone, Debug)] @@ -41,9 +36,7 @@ use crate::util::grpc_channel; pub struct DigestOpts { /// The replicated loglet id loglet_id: LogletId, - /// Sync metadata from metadata store first - #[arg(long)] - sync_metadata: bool, + /// From offset. Requests from oldest if unset. #[arg(long, default_value = "1")] from: u32, @@ -53,26 +46,14 @@ pub struct DigestOpts { } async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - let req = GetMetadataRequest { - kind: MetadataKind::Logs.into(), - sync: opts.sync_metadata, - }; - let mut response = client.get_metadata(req).await?.into_inner(); - let logs = StorageCodec::decode::(&mut response.encoded)?; + let logs = connection.get_logs().await?; c_println!("Log Configuration ({})", logs.version()); let Some(loglet_ref) = logs.get_replicated_loglet(&opts.loglet_id) else { return Err(anyhow::anyhow!("loglet {} not found", opts.loglet_id)); }; - let req = GetMetadataRequest { - kind: MetadataKind::NodesConfiguration.into(), - sync: opts.sync_metadata, - }; - let mut response = client.get_metadata(req).await?.into_inner(); - let nodes_config = StorageCodec::decode::(&mut response.encoded)?; + let nodes_config = connection.get_nodes_configuration().await?; c_println!("Nodes Configuration ({})", nodes_config.version()); c_println!(); diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs index fb565f4fe..12c0fb011 100644 --- a/tools/restatectl/src/commands/replicated_loglet/info.rs +++ b/tools/restatectl/src/commands/replicated_loglet/info.rs @@ -9,41 +9,22 @@ // by the Apache License, Version 2.0. use cling::prelude::*; -use tonic::codec::CompressionEncoding; use restate_cli_util::{c_indentln, c_println}; -use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::GetMetadataRequest; -use restate_core::MetadataKind; -use restate_types::logs::metadata::Logs; use restate_types::logs::LogletId; -use restate_types::storage::StorageCodec; use restate_types::Versioned; -use crate::app::ConnectionInfo; -use crate::util::grpc_channel; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "get_info")] pub struct InfoOpts { /// The replicated loglet id loglet_id: LogletId, - /// Sync metadata from metadata store first - #[arg(long)] - sync_metadata: bool, } async fn get_info(connection: &ConnectionInfo, opts: &InfoOpts) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - - let req = GetMetadataRequest { - kind: MetadataKind::Logs.into(), - sync: opts.sync_metadata, - }; - let mut response = client.get_metadata(req).await?.into_inner(); - - let logs = StorageCodec::decode::(&mut response.encoded)?; + let logs = connection.get_logs().await?; c_println!("Log Configuration ({})", logs.version()); let Some(loglet_ref) = logs.get_replicated_loglet(&opts.loglet_id) else { diff --git a/tools/restatectl/src/commands/snapshot/create_snapshot.rs b/tools/restatectl/src/commands/snapshot/create_snapshot.rs index 36de7efb8..95aa03ec2 100644 --- a/tools/restatectl/src/commands/snapshot/create_snapshot.rs +++ b/tools/restatectl/src/commands/snapshot/create_snapshot.rs @@ -8,15 +8,16 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use anyhow::Context; use cling::prelude::*; use tonic::codec::CompressionEncoding; use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; use restate_admin::cluster_controller::protobuf::CreatePartitionSnapshotRequest; use restate_cli_util::c_println; +use restate_types::nodes_config::Role; -use crate::app::ConnectionInfo; -use crate::util::grpc_channel; +use crate::connection::ConnectionInfo; #[derive(Run, Parser, Collect, Clone, Debug)] #[clap(visible_alias = "create")] @@ -31,18 +32,20 @@ async fn create_snapshot( connection: &ConnectionInfo, opts: &CreateSnapshotOpts, ) -> anyhow::Result<()> { - let channel = grpc_channel(connection.cluster_controller.clone()); - let mut client = - ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); - let request = CreatePartitionSnapshotRequest { partition_id: opts.partition_id as u32, }; - let response = client - .create_partition_snapshot(request) + let response = connection + .try_each(Some(Role::Admin), |channel| async { + let mut client = ClusterCtrlSvcClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + client.create_partition_snapshot(request).await + }) .await - .map_err(|e| anyhow::anyhow!("failed to request snapshot: {:?}", e))? + .context("Failed to request snapshot")? .into_inner(); c_println!("Snapshot created: {}", response.snapshot_id); diff --git a/tools/restatectl/src/connection.rs b/tools/restatectl/src/connection.rs new file mode 100644 index 000000000..922914eab --- /dev/null +++ b/tools/restatectl/src/connection.rs @@ -0,0 +1,341 @@ +// Copyright (c) 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::{cmp::Ordering, collections::HashMap, fmt::Display, future::Future, sync::Arc}; + +use cling::{prelude::Parser, Collect}; +use itertools::{Either, Itertools}; +use tokio::sync::{Mutex, MutexGuard}; +use tonic::{codec::CompressionEncoding, transport::Channel, Response, Status}; +use tracing::debug; + +use restate_core::protobuf::node_ctl_svc::{ + node_ctl_svc_client::NodeCtlSvcClient, GetMetadataRequest, IdentResponse, +}; +use restate_types::{ + logs::metadata::Logs, + net::AdvertisedAddress, + nodes_config::{NodesConfiguration, Role}, + protobuf::common::{MetadataKind, NodeStatus}, + storage::{StorageCodec, StorageDecode, StorageDecodeError}, + Version, Versioned, +}; + +use crate::util::grpc_channel; + +#[derive(Clone, Parser, Collect, Debug)] +pub struct ConnectionInfo { + // todo: rename this to be a node address for reusability across commands + /// Cluster Controller address + #[clap( + long="address", + value_hint = clap::ValueHint::Url, + default_value = "http://localhost:5122/", + env = "RESTATE_ADDRESS", + global = true + )] + pub addresses: Vec, + + /// Sync metadata from metadata store first + #[arg(long)] + pub sync_metadata: bool, + + #[clap(skip)] + nodes_configuration: Arc>>, + + #[clap(skip)] + logs: Arc>>, + + #[clap(skip)] + cache: Arc>>, +} + +impl ConnectionInfo { + /// Gets NodesConfiguration object. This function tries all provided addresses and makes sure + /// nodes configuration is cached. + pub async fn get_nodes_configuration(&self) -> Result { + if self.addresses.is_empty() { + return Err(ConnectionInfoError::NoAvailableNodes(NoRoleError(None))); + } + + let guard = self.nodes_configuration.lock().await; + + // get nodes configuration will always use the addresses seed + // provided via the cmdline + self.get_latest( + self.addresses.iter(), + self.addresses.len(), + GetMetadataRequest { + kind: MetadataKind::NodesConfiguration.into(), + sync: false, + }, + guard, + |ident| Version::from(ident.nodes_config_version), + ) + .await + } + + /// Gets Logs object. + /// + /// This function will try multiple nodes learned from nodes_configuration + /// to get the best guess of the latest logs version is. + pub async fn get_logs(&self) -> Result { + let nodes_config = self.get_nodes_configuration().await?; + + let guard = self.logs.lock().await; + + let nodes_addresses = nodes_config + .iter() + .map(|(_, node)| &node.address) + .collect::>(); + + let cluster_size = nodes_addresses.len(); + let cached = self.cache.lock().await.keys().cloned().collect::>(); + + assert!(!cached.is_empty(), "must have cached connections"); + + let try_nodes = cached + .iter() + .chain( + nodes_addresses + .into_iter() + .filter(|address| !cached.contains(address)), + ) + .collect::>(); + + // To be sure we landed on the best guess of the latest version of the logs + // we need to ask multiple nodes in the nodes config. + // We make sure cached nodes has higher precedence + 50% of node set size trimmed to + // a total of 50% + 1 nodes. + + self.get_latest( + try_nodes.into_iter(), + (cluster_size / 2) + 1, + GetMetadataRequest { + kind: MetadataKind::Logs.into(), + sync: false, + }, + guard, + |ident| Version::from(ident.logs_version), + ) + .await + } + + /// Gets NodesConfiguration object. This function will try `try_best_of` nodes from the provided + /// addresses. Worst case all addresses will be tried if some or all nodes did not satisfy the query + async fn get_latest( + &self, + addresses: impl Iterator, + mut try_best_of: usize, + request: GetMetadataRequest, + mut guard: MutexGuard<'_, Option>, + version_map: M, + ) -> Result + where + T: StorageDecode + Versioned + Clone, + M: Fn(&IdentResponse) -> Version, + { + if let Some(meta) = &*guard { + return Ok(meta.clone()); + } + + let mut latest_meta: Option = None; + let mut answer = false; + let mut errors = NodesErrors::default(); + let mut cache = self.cache.lock().await; + + for address in addresses { + let channel = cache.entry(address.clone()).or_insert_with(|| { + debug!("connecting to {address}"); + grpc_channel(address.clone()) + }); + + let mut client = NodeCtlSvcClient::new(channel.clone()) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + let response = match client.get_ident(()).await { + Ok(response) => response.into_inner(), + Err(status) => { + errors.error(address.clone(), status); + continue; + } + }; + + // node is reachable and has answered + + answer = true; + if response.status != NodeStatus::Alive as i32 { + // node did not join the cluster yet. + continue; + } + + if version_map(&response) + <= latest_meta + .as_ref() + .map(|c| c.version()) + .unwrap_or(Version::INVALID) + { + // has older version than we have. + continue; + } + + let mut response = match client.get_metadata(request).await { + Ok(response) => response.into_inner(), + Err(status) => { + errors.error(address.clone(), status); + continue; + } + }; + + let meta = StorageCodec::decode::(&mut response.encoded).map_err(|err| { + ConnectionInfoError::InvalidNodesConfiguration(address.clone(), err) + })?; + + if meta.version() + > latest_meta + .as_ref() + .map(|c| c.version()) + .unwrap_or(Version::INVALID) + { + latest_meta = Some(meta); + } + + try_best_of -= 1; + if try_best_of == 0 { + break; + } + } + + if !answer { + // all nodes have returned error + return Err(ConnectionInfoError::NodesErrors(errors)); + } + + *guard = latest_meta.clone(); + latest_meta.ok_or(ConnectionInfoError::MissingMetadata) + } + + pub async fn try_each( + &self, + role: Option, + mut closure: F, + ) -> Result, ConnectionInfoError> + where + F: FnMut(Channel) -> Fut, + Fut: Future, Status>>, + { + let nodes_config = self.get_nodes_configuration().await?; + let mut channels = self.cache.lock().await; + + let iterator = match role { + Some(role) => Either::Left(nodes_config.iter_role(role)), + None => Either::Right(nodes_config.iter()), + } + .sorted_by(|a, b| { + // nodes for which we already have open channels get higher precedence. + match ( + channels.contains_key(&a.1.address), + channels.contains_key(&b.1.address), + ) { + (true, false) => Ordering::Less, + (false, true) => Ordering::Greater, + (_, _) => a.0.cmp(&b.0), + } + }); + + let mut errors = NodesErrors::default(); + + for (_, node) in iterator { + // avoid creating new channels on each iteration. Instead cheaply copy the channels + let channel = channels + .entry(node.address.clone()) + .or_insert_with(|| grpc_channel(node.address.clone())); + + let result = closure(channel.clone()).await; + match result { + Ok(response) => return Ok(response), + Err(status) => { + errors.error(node.address.clone(), status); + } + } + } + + if errors.is_empty() { + Err(ConnectionInfoError::NoAvailableNodes(NoRoleError(role))) + } else { + Err(ConnectionInfoError::NodesErrors(errors)) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ConnectionInfoError { + #[error("Could not retrieve cluster metadata. Has the cluster been provisioned yet?")] + MissingMetadata, + + #[error("Failed to decode nodes configuration from node {0}: {1}")] + InvalidNodesConfiguration(AdvertisedAddress, StorageDecodeError), + + #[error(transparent)] + NodesErrors(NodesErrors), + + #[error(transparent)] + NoAvailableNodes(NoRoleError), +} + +#[derive(Debug, thiserror::Error)] +pub struct NoRoleError(Option); + +impl Display for NoRoleError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.0 { + Some(role) => { + write!(f, "No available {role} nodes to satisfy the request")?; + } + None => { + write!(f, "No available nodes to satisfy the request")?; + } + } + + Ok(()) + } +} + +#[derive(Debug, Default)] +pub struct NodesErrors { + node_status: Vec<(AdvertisedAddress, Status)>, +} + +impl NodesErrors { + fn error(&mut self, node: AdvertisedAddress, status: Status) { + self.node_status.push((node, status)); + } + + fn is_empty(&self) -> bool { + self.node_status.is_empty() + } +} + +impl Display for NodesErrors { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "Encountered multiple errors:")?; + for (address, status) in &self.node_status { + writeln!(f, " - {address} -> {status}")?; + } + Ok(()) + } +} + +impl std::error::Error for NodesErrors { + fn description(&self) -> &str { + "aggregated nodes error" + } +} diff --git a/tools/restatectl/src/lib.rs b/tools/restatectl/src/lib.rs index 18dfb0ee6..17ddc16f3 100644 --- a/tools/restatectl/src/lib.rs +++ b/tools/restatectl/src/lib.rs @@ -13,4 +13,5 @@ pub(crate) mod commands; pub(crate) mod util; pub use app::CliApp; mod build_info; +pub(crate) mod connection; pub(crate) mod environment;