Skip to content

Commit

Permalink
feat(service-params): Rename host -> listen_addr, `client_addr -> a…
Browse files Browse the repository at this point in the history
…dvertise_addr` (#7530)

Rename `host -> listen_addr`, `client_addr -> advertise_addr` for clearer meaning of cmdline params.

Approved-By: CAJan93

Co-Authored-By: jon-chuang <jon-chuang@users.noreply.github.com>
Co-Authored-By: jon-chuang <9093549+jon-chuang@users.noreply.github.com>
  • Loading branch information
jon-chuang and jon-chuang authored Feb 3, 2023
1 parent bb673d4 commit 704cc4f
Show file tree
Hide file tree
Showing 19 changed files with 166 additions and 126 deletions.
10 changes: 5 additions & 5 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
image: "ghcr.io/risingwavelabs/risingwave:latest"
command:
- compactor-node
- "--host"
- "--listen-addr"
- "0.0.0.0:6660"
- "--prometheus-listener-addr"
- "0.0.0.0:1260"
Expand Down Expand Up @@ -42,11 +42,11 @@ services:
image: "ghcr.io/risingwavelabs/risingwave:latest"
command:
- compute-node
- "--host"
- "--listen-addr"
- "0.0.0.0:5688"
- "--prometheus-listener-addr"
- "0.0.0.0:1222"
- "--client-address"
- "--advertise-addr"
- "compute-node-0:5688"
- "--metrics-level"
- "1"
Expand Down Expand Up @@ -124,7 +124,7 @@ services:
image: "ghcr.io/risingwavelabs/risingwave:latest"
command:
- frontend-node
- "--host"
- "--listen-addr"
- "0.0.0.0:4566"
- "--meta-addr"
- "http://meta-node-0:5690"
Expand Down Expand Up @@ -181,7 +181,7 @@ services:
- meta-node
- "--listen-addr"
- "0.0.0.0:5690"
- "--meta-endpoint"
- "--advertise-addr"
- "meta-node-0:5690"
- "--dashboard-host"
- "0.0.0.0:5691"
Expand Down
16 changes: 8 additions & 8 deletions src/cmd_all/src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,23 @@ fn get_services(profile: &str) -> (Vec<RisingWaveService>, bool) {
"playground-3cn" => vec![
RisingWaveService::Meta(osstrs([])),
RisingWaveService::Compute(osstrs([
"--host",
"--listen-addr",
"127.0.0.1:5687",
"--state-store",
"hummock+memory-shared",
"--parallelism",
"4",
])),
RisingWaveService::Compute(osstrs([
"--host",
"--listen-addr",
"127.0.0.1:5688",
"--state-store",
"hummock+memory-shared",
"--parallelism",
"4",
])),
RisingWaveService::Compute(osstrs([
"--host",
"--listen-addr",
"127.0.0.1:5689",
"--state-store",
"hummock+memory-shared",
Expand All @@ -96,21 +96,21 @@ fn get_services(profile: &str) -> (Vec<RisingWaveService>, bool) {
RisingWaveService::Meta(osstrs([
"--listen-addr",
"0.0.0.0:5690",
"--meta-endpoint",
"--advertise-addr",
"127.0.0.1:5690",
"--dashboard-host",
"0.0.0.0:5691",
])),
RisingWaveService::Compute(osstrs([
"--host",
"--listen-addr",
"0.0.0.0:5688",
"--client-address",
"--advertise-addr",
"127.0.0.1:5688",
])),
RisingWaveService::Frontend(osstrs([
"--host",
"--listen-addr",
"0.0.0.0:4566",
"--client-address",
"--advertise-addr",
"127.0.0.1:4566",
])),
]
Expand Down
40 changes: 24 additions & 16 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,23 @@ use risingwave_common_proc_macro::OverrideConfig;
/// Command-line arguments for compute-node.
#[derive(Parser, Clone, Debug)]
pub struct ComputeNodeOpts {
// TODO: rename to listen_address and separate out the port.
#[clap(long, env = "RW_HOST", default_value = "127.0.0.1:5688")]
pub host: String,
// TODO: rename to listen_addr and separate out the port.
/// The address that this service listens to.
/// Usually the localhost + desired port.
#[clap(
long,
alias = "host",
env = "RW_LISTEN_ADDR",
default_value = "127.0.0.1:5688"
)]
pub listen_addr: String,

/// The address of the compute node's meta client.
///
/// Optional, we will use listen_address if not specified.
#[clap(long, env = "RW_CLIENT_ADDRESS")]
pub client_address: Option<String>,
/// The address for contacting this instance of the service.
/// This would be synonymous with the service's "public address"
/// or "identifying address".
/// Optional, we will use listen_addr if not specified.
#[clap(long, alias = "client_address", env = "RW_ADVERTISE_ADDR", long)]
pub advertise_addr: Option<String>,

#[clap(
long,
Expand Down Expand Up @@ -151,22 +159,22 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
tracing::info!("Compute node options: {:?}", opts);
validate_opts(&opts);

let listen_address = opts.host.parse().unwrap();
tracing::info!("Server Listening at {}", listen_address);
let listen_addr = opts.listen_addr.parse().unwrap();
tracing::info!("Server Listening at {}", listen_addr);

let client_address = opts
.client_address
let advertise_addr = opts
.advertise_addr
.as_ref()
.unwrap_or_else(|| {
tracing::warn!("Client address is not specified, defaulting to host address");
&opts.host
tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
&opts.listen_addr
})
.parse()
.unwrap();
tracing::info!("Client address is {}", client_address);
tracing::info!("advertise addr is {}", advertise_addr);

let (join_handle_vec, _shutdown_send) =
compute_node_serve(listen_address, client_address, opts).await;
compute_node_serve(listen_addr, advertise_addr, opts).await;

for join_handle in join_handle_vec {
join_handle.await.unwrap();
Expand Down
12 changes: 6 additions & 6 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use crate::ComputeNodeOpts;
/// Bootstraps the compute-node.
pub async fn compute_node_serve(
listen_addr: SocketAddr,
client_addr: HostAddr,
advertise_addr: HostAddr,
opts: ComputeNodeOpts,
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
Expand All @@ -85,7 +85,7 @@ pub async fn compute_node_serve(
let meta_client = MetaClient::register_new(
&opts.meta_address,
WorkerType::ComputeNode,
&client_addr,
&advertise_addr,
opts.parallelism,
)
.await
Expand Down Expand Up @@ -203,7 +203,7 @@ pub async fn compute_node_serve(
// Initialize the managers.
let batch_mgr = Arc::new(BatchManager::new(config.batch.clone()));
let stream_mgr = Arc::new(LocalStreamManager::new(
client_addr.clone(),
advertise_addr.clone(),
state_store.clone(),
streaming_metrics.clone(),
config.streaming.clone(),
Expand Down Expand Up @@ -234,7 +234,7 @@ pub async fn compute_node_serve(
let client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let batch_env = BatchEnvironment::new(
batch_mgr.clone(),
client_addr.clone(),
advertise_addr.clone(),
batch_config,
worker_id,
state_store.clone(),
Expand All @@ -249,7 +249,7 @@ pub async fn compute_node_serve(
};
// Initialize the streaming environment.
let stream_env = StreamEnvironment::new(
client_addr.clone(),
advertise_addr.clone(),
connector_params,
stream_config,
worker_id,
Expand Down Expand Up @@ -321,7 +321,7 @@ pub async fn compute_node_serve(
}

// All set, let the meta service know we're ready.
meta_client.activate(&client_addr).await.unwrap();
meta_client.activate(&advertise_addr).await.unwrap();

(join_handle_vec, shutdown_send)
}
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use risingwave_rpc_client::ComputeClient;
use serde_json;

pub async fn show_config(host: &str) -> anyhow::Result<()> {
let listen_address = HostAddr::try_from(host)?;
let client = ComputeClient::new(listen_address).await?;
let listen_addr = HostAddr::try_from(host)?;
let client = ComputeClient::new(listen_addr).await?;
let config_response = client.show_config().await?;
let batch_config: BatchConfig = serde_json::from_str(&config_response.batch_config)?;
let stream_config: StreamingConfig = serde_json::from_str(&config_response.stream_config)?;
Expand Down
27 changes: 19 additions & 8 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,29 @@ use session::SessionManagerImpl;
/// Command-line arguments for frontend-node.
#[derive(Parser, Clone, Debug)]
pub struct FrontendOpts {
// TODO: rename to listen_address and separate out the port.
#[clap(long, env = "RW_HOST", default_value = "127.0.0.1:4566")]
pub host: String,
// TODO: rename to listen_addr and separate out the port.
/// The address that this service listens to.
/// Usually the localhost + desired port.
#[clap(
long,
alias = "host",
env = "RW_LISTEN_ADDR",
default_value = "127.0.0.1:4566"
)]
pub listen_addr: String,

// Optional, we will use listen_address if not specified.
#[clap(long, env = "RW_CLIENT_ADDRESS")]
pub client_address: Option<String>,
/// The address for contacting this instance of the service.
/// This would be synonymous with the service's "public address"
/// or "identifying address".
/// Optional, we will use listen_addr if not specified.
#[clap(long, env = "RW_ADVERTISE_ADDR", alias = "client-address")]
pub advertise_addr: Option<String>,

// TODO: This is currently unused.
#[clap(long, env = "RW_PORT")]
pub port: Option<u16>,

/// The address via which we will attempt to connect to a leader meta node.
#[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
pub meta_addr: String,

Expand Down Expand Up @@ -138,9 +149,9 @@ pub fn start(opts: FrontendOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// WARNING: don't change the function signature. Making it `async fn` will cause
// slow compile in release mode.
Box::pin(async move {
let addr = opts.host.clone();
let listen_addr = opts.listen_addr.clone();
let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap());
pg_serve(&addr, session_mgr, Some(TlsConfig::new_default()))
pg_serve(&listen_addr, session_mgr, Some(TlsConfig::new_default()))
.await
.unwrap();
})
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ impl FrontendEnv {
let batch_config = config.batch;

let frontend_address: HostAddr = opts
.client_address
.advertise_addr
.as_ref()
.unwrap_or_else(|| {
tracing::warn!("Client address is not specified, defaulting to host address");
&opts.host
tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
&opts.listen_addr
})
.parse()
.unwrap();
tracing::info!("Client address is {}", frontend_address);
tracing::info!("advertise addr is {}", frontend_address);

// Register in meta by calling `AddWorkerNode` RPC.
let meta_client = MetaClient::register_new(
Expand Down
22 changes: 12 additions & 10 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ pub struct MetaNodeOpts {
#[clap(long, env = "RW_HOST")]
host: Option<String>,

/// The endpoint for this meta node, which also serves as its unique identifier in cluster
/// membership and leader election.
#[clap(long, env = "RW_META_ENDPOINT")]
meta_endpoint: Option<String>,
/// The address for contacting this instance of the service.
/// This would be synonymous with the service's "public address"
/// or "identifying address".
/// It will serve as a unique identifier in cluster
/// membership and leader election. Must be specified for etcd backend.
#[clap(long, env = "RW_ADVERTISE_ADDR", required_if_eq("backend", "etcd"))]
advertise_addr: Option<String>,

#[clap(long, env = "RW_DASHBOARD_HOST")]
dashboard_host: Option<String>,
Expand Down Expand Up @@ -136,13 +139,12 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let config = load_config(&opts.config_path, Some(opts.override_opts));
tracing::info!("Starting meta node with config {:?}", config);
let listen_addr: SocketAddr = opts.listen_addr.parse().unwrap();
let meta_addr = opts
.host
.map(|host| format!("{}:{}", host, listen_addr.port()))
.unwrap_or_else(|| opts.listen_addr.clone());
let meta_addr = opts.host.unwrap_or_else(|| listen_addr.ip().to_string());
let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
let prometheus_addr = opts.prometheus_host.map(|x| x.parse().unwrap());
let meta_endpoint = opts.meta_endpoint.unwrap_or(meta_addr);
let advertise_addr = opts
.advertise_addr
.unwrap_or_else(|| format!("{}:{}", meta_addr, listen_addr.port()));
let backend = match config.meta.backend {
MetaBackend::Etcd => MetaStoreBackend::Etcd {
endpoints: opts
Expand All @@ -167,7 +169,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

tracing::info!("Meta server listening at {}", listen_addr);
let add_info = AddressInfo {
meta_endpoint,
advertise_addr,
listen_addr,
prometheus_addr,
dashboard_addr,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/follower_svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn start_follower_srv(
let leader_srv = LeaderServiceImpl::new(
election_client,
MetaLeaderInfo {
node_address: address_info.meta_endpoint.to_string(),
node_address: address_info.advertise_addr.to_string(),
lease_id: 0,
},
);
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub enum MetaStoreBackend {

#[derive(Clone)]
pub struct AddressInfo {
pub meta_endpoint: String,
pub advertise_addr: String,
pub listen_addr: SocketAddr,
pub prometheus_addr: Option<SocketAddr>,
pub dashboard_addr: Option<SocketAddr>,
Expand All @@ -51,7 +51,7 @@ pub struct AddressInfo {
impl Default for AddressInfo {
fn default() -> Self {
Self {
meta_endpoint: "".to_string(),
advertise_addr: "".to_string(),
listen_addr: SocketAddr::V4("127.0.0.1:0000".parse().unwrap()),
prometheus_addr: None,
dashboard_addr: None,
Expand Down Expand Up @@ -92,7 +92,7 @@ pub async fn rpc_serve(
EtcdElectionClient::new(
endpoints,
Some(options),
address_info.meta_endpoint.clone(),
address_info.advertise_addr.clone(),
)
.await?,
);
Expand Down Expand Up @@ -192,7 +192,7 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
election_client.leader().await.unwrap().unwrap().into()
} else {
MetaLeaderInfo {
node_address: address_info.meta_endpoint.clone(),
node_address: address_info.advertise_addr.clone(),
lease_id: 0,
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/risedevtool/src/task/compactor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ impl CompactorService {
"compactor cannot use in-memory hummock if remote object store is not provided"
));
}
cmd.arg("--host")
cmd.arg("--listen-addr")
.arg(format!("{}:{}", config.listen_address, config.port))
.arg("--prometheus-listener-addr")
.arg(format!(
"{}:{}",
config.listen_address, config.exporter_port
))
.arg("--client-address")
.arg("--advertise-addr")
.arg(format!("{}:{}", config.address, config.port))
.arg("--metrics-level")
.arg("1")
Expand Down
Loading

0 comments on commit 704cc4f

Please sign in to comment.