Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamline restatectl connection to cluster #2592

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions crates/admin/protobuf/cluster_ctrl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ service ClusterCtrlSvc {

rpc DescribeLog(DescribeLogRequest) returns (DescribeLogResponse);

rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);

rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty);

rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest)
Expand Down Expand Up @@ -82,13 +80,6 @@ message DescribeLogResponse {
bytes nodes_configuration = 7;
}

message ListNodesRequest {}

message ListNodesResponse {
// Serialized restate_types::nodes_config::NodesConfiguration
bytes nodes_configuration = 1;
}

message TrimLogRequest {
uint32 log_id = 1;
uint64 trim_point = 2;
Expand Down
26 changes: 2 additions & 24 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ use crate::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc
use crate::cluster_controller::protobuf::{
ClusterStateRequest, ClusterStateResponse, CreatePartitionSnapshotRequest,
CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, FindTailRequest,
FindTailResponse, ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse,
SealAndExtendChainRequest, SealAndExtendChainResponse, SealedSegment, TailState,
TrimLogRequest,
FindTailResponse, ListLogsRequest, ListLogsResponse, SealAndExtendChainRequest,
SealAndExtendChainResponse, SealedSegment, TailState, TrimLogRequest,
};

use super::protobuf::{
Expand Down Expand Up @@ -143,27 +142,6 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
}))
}

async fn list_nodes(
&self,
_request: Request<ListNodesRequest>,
) -> Result<Response<ListNodesResponse>, Status> {
let nodes_config = self
.metadata_writer
.metadata_store_client()
.get::<NodesConfiguration>(NODES_CONFIG_KEY.clone())
.await
.map_err(|error| {
Status::unknown(format!(
"Failed to get nodes configuration metadata: {error:?}"
))
})?
.ok_or(Status::not_found("Missing nodes configuration"))?;

Ok(Response::new(ListNodesResponse {
nodes_configuration: serialize_value(nodes_config),
}))
}

/// Internal operations API to trigger the log truncation
async fn trim_log(&self, request: Request<TrimLogRequest>) -> Result<Response<()>, Status> {
let request = request.into_inner();
Expand Down
3 changes: 2 additions & 1 deletion crates/core/protobuf/node_ctl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ message GetMetadataResponse {

message ClusterHealthResponse {
string cluster_name = 1;
// Some value if the cluster has been configured to use the embedded metadata store
// Some value if the cluster has been configured to use the embedded metadata
// store
optional EmbeddedMetadataClusterHealth metadata_cluster_health = 2;
}

Expand Down
1 change: 1 addition & 0 deletions tools/restatectl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
strum = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, features = ["transport", "prost"] }
tracing = { workspace = true }
Expand Down
18 changes: 1 addition & 17 deletions tools/restatectl/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::str::FromStr;

use cling::prelude::*;

use restate_cli_util::CliContext;
use restate_cli_util::CommonOpts;
use restate_types::net::AdvertisedAddress;

use crate::commands::cluster::overview::ClusterStatusOpts;
use crate::commands::cluster::Cluster;
Expand All @@ -24,6 +21,7 @@ use crate::commands::node::Nodes;
use crate::commands::partition::Partitions;
use crate::commands::replicated_loglet::ReplicatedLoglet;
use crate::commands::snapshot::Snapshot;
use crate::connection::ConnectionInfo;

#[derive(Run, Parser, Clone)]
#[command(author, version = crate::build_info::version(), about, infer_subcommands = true)]
Expand All @@ -37,20 +35,6 @@ pub struct CliApp {
pub cmd: Command,
}

#[derive(Parser, Collect, Debug, Clone)]
pub struct ConnectionInfo {
// todo: rename this to be a node address for reusability across commands
/// Cluster Controller address
#[clap(
long,
value_hint = clap::ValueHint::Url,
default_value_t = AdvertisedAddress::from_str("http://localhost:5122/").unwrap(),
env = "RESTATE_CLUSTER_CONTROLLER_ADDRESS",
global = true
)]
pub cluster_controller: AdvertisedAddress,
}

#[derive(Run, Subcommand, Clone)]
pub enum Command {
/// Cluster operations
Expand Down
36 changes: 13 additions & 23 deletions tools/restatectl/src/commands/cluster/config/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,37 @@

use clap::Parser;
use cling::{Collect, Run};
use tonic::{codec::CompressionEncoding, Code};
use tonic::codec::CompressionEncoding;

use restate_admin::cluster_controller::protobuf::{
cluster_ctrl_svc_client::ClusterCtrlSvcClient, GetClusterConfigurationRequest,
};
use restate_cli_util::c_println;
use restate_types::nodes_config::Role;

use crate::{
app::ConnectionInfo, commands::cluster::config::cluster_config_string, util::grpc_channel,
};
use crate::{commands::cluster::config::cluster_config_string, connection::ConnectionInfo};

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "config_get")]
pub struct ConfigGetOpts {}

async fn config_get(connection: &ConnectionInfo, _get_opts: &ConfigGetOpts) -> anyhow::Result<()> {
let channel = grpc_channel(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

let response = client
.get_cluster_configuration(GetClusterConfigurationRequest {})
.await;

let response = match response {
Ok(response) => response,
Err(status) if status.code() == Code::NotFound => {
c_println!("👻 Cluster is not configured");
return Ok(());
}
Err(status) => {
anyhow::bail!("Failed to get cluster configuration: {status}");
}
};
let response = connection
.try_each(Some(Role::Admin), |channel| async {
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

client
.get_cluster_configuration(GetClusterConfigurationRequest {})
.await
})
.await?;

let configuration = response.into_inner();
let cluster_configuration = configuration.cluster_configuration.expect("is set");

let output = cluster_config_string(&cluster_configuration)?;

c_println!("{}", output);

Ok(())
}
31 changes: 21 additions & 10 deletions tools/restatectl/src/commands/cluster/config/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use restate_cli_util::_comfy_table::{Cell, Color, Table};
use restate_cli_util::ui::console::{confirm_or_exit, StyledTable};
use restate_cli_util::{c_println, c_warn};
use restate_types::logs::metadata::{ProviderConfiguration, ProviderKind};
use restate_types::nodes_config::Role;
use restate_types::replication::ReplicationProperty;

use crate::commands::cluster::config::cluster_config_string;
use crate::commands::cluster::provision::extract_default_provider;
use crate::{app::ConnectionInfo, util::grpc_channel};
use crate::connection::ConnectionInfo;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "config_set")]
Expand All @@ -51,12 +52,16 @@ pub struct ConfigSetOpts {
}

async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> {
let channel = grpc_channel(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

let response = client
.get_cluster_configuration(GetClusterConfigurationRequest {})
let response = connection
.try_each(Some(Role::Admin), |channel| async {
let mut client = ClusterCtrlSvcClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip);

client
.get_cluster_configuration(GetClusterConfigurationRequest {})
.await
})
.await
.context("Failed to get cluster configuration")?
.into_inner();
Expand Down Expand Up @@ -123,10 +128,16 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an
cluster_configuration: Some(current),
};

client
.set_cluster_configuration(request)
connection
.try_each(Some(Role::Admin), |channel| async {
let mut client = ClusterCtrlSvcClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip);

client.set_cluster_configuration(request.clone()).await
})
.await
.map_err(|err| anyhow::anyhow!("Failed to set configuration: {}", err.message()))?;
.context("Failed to set configuration")?;

c_println!("✅ Configuration updated successfully");

Expand Down
2 changes: 1 addition & 1 deletion tools/restatectl/src/commands/cluster/overview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use cling::{Collect, Run};

use restate_cli_util::c_println;

use crate::app::ConnectionInfo;
use crate::commands::log::list_logs::{list_logs, ListLogsOpts};
use crate::commands::node::list_nodes::{list_nodes, ListNodesOpts};
use crate::commands::partition::list::{list_partitions, ListPartitionsOpts};
use crate::connection::ConnectionInfo;

#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "cluster_status")]
Expand Down
18 changes: 7 additions & 11 deletions tools/restatectl/src/commands/cluster/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::app::ConnectionInfo;
use crate::commands::cluster::config::cluster_config_string;
use crate::connection::ConnectionInfo;
use crate::util::grpc_channel;
use clap::Parser;
use cling::{Collect, Run};
Expand All @@ -18,7 +18,6 @@ use restate_cli_util::{c_error, c_println, c_warn};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest;
use restate_types::logs::metadata::{ProviderConfiguration, ProviderKind, ReplicatedLogletConfig};
use restate_types::net::AdvertisedAddress;
use restate_types::replication::ReplicationProperty;
use std::num::NonZeroU16;
use tonic::codec::CompressionEncoding;
Expand All @@ -27,10 +26,6 @@ use tonic::Code;
#[derive(Run, Parser, Collect, Clone, Debug)]
#[cling(run = "cluster_provision")]
pub struct ProvisionOpts {
/// Address of the node that should be provisioned
#[clap(long)]
address: Option<AdvertisedAddress>,

/// Number of partitions
#[clap(long)]
num_partitions: Option<NonZeroU16>,
Expand All @@ -56,13 +51,14 @@ pub struct ProvisionOpts {
}

async fn cluster_provision(
connection_info: &ConnectionInfo,
connection: &ConnectionInfo,
provision_opts: &ProvisionOpts,
) -> anyhow::Result<()> {
let node_address = provision_opts
.address
.clone()
.unwrap_or_else(|| connection_info.cluster_controller.clone());
if connection.addresses.len() != 1 {
anyhow::bail!("Only one address must be provided for provision");
}
let node_address = connection.addresses[0].clone();

let channel = grpc_channel(node_address.clone());

let mut client = NodeCtlSvcClient::new(channel)
Expand Down
Loading
Loading