diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 49b6e9f13919c..2c11d7a5f4964 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -5,7 +5,7 @@ services: image: "ghcr.io/risingwavelabs/risingwave:latest" command: - compactor-node - - "--host" + - "--listen-addr" - "0.0.0.0:6660" - "--prometheus-listener-addr" - "0.0.0.0:1260" @@ -42,11 +42,11 @@ services: image: "ghcr.io/risingwavelabs/risingwave:latest" command: - compute-node - - "--host" + - "--listen-addr" - "0.0.0.0:5688" - "--prometheus-listener-addr" - "0.0.0.0:1222" - - "--client-address" + - "--advertise-addr" - "compute-node-0:5688" - "--metrics-level" - "1" @@ -124,7 +124,7 @@ services: image: "ghcr.io/risingwavelabs/risingwave:latest" command: - frontend-node - - "--host" + - "--listen-addr" - "0.0.0.0:4566" - "--meta-addr" - "http://meta-node-0:5690" @@ -181,7 +181,7 @@ services: - meta-node - "--listen-addr" - "0.0.0.0:5690" - - "--meta-endpoint" + - "--advertise-addr" - "meta-node-0:5690" - "--dashboard-host" - "0.0.0.0:5691" diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs index 5dbb65c3a87dc..d730361eb9dbe 100644 --- a/src/cmd_all/src/playground.rs +++ b/src/cmd_all/src/playground.rs @@ -66,7 +66,7 @@ fn get_services(profile: &str) -> (Vec, bool) { "playground-3cn" => vec![ RisingWaveService::Meta(osstrs([])), RisingWaveService::Compute(osstrs([ - "--host", + "--listen-addr", "127.0.0.1:5687", "--state-store", "hummock+memory-shared", @@ -74,7 +74,7 @@ fn get_services(profile: &str) -> (Vec, bool) { "4", ])), RisingWaveService::Compute(osstrs([ - "--host", + "--listen-addr", "127.0.0.1:5688", "--state-store", "hummock+memory-shared", @@ -82,7 +82,7 @@ fn get_services(profile: &str) -> (Vec, bool) { "4", ])), RisingWaveService::Compute(osstrs([ - "--host", + "--listen-addr", "127.0.0.1:5689", "--state-store", "hummock+memory-shared", @@ -96,21 +96,21 @@ fn get_services(profile: &str) -> (Vec, bool) { RisingWaveService::Meta(osstrs([ "--listen-addr", "0.0.0.0:5690", - "--meta-endpoint", + "--advertise-addr", "127.0.0.1:5690", "--dashboard-host", "0.0.0.0:5691", ])), RisingWaveService::Compute(osstrs([ - "--host", + "--listen-addr", "0.0.0.0:5688", - "--client-address", + "--advertise-addr", "127.0.0.1:5688", ])), RisingWaveService::Frontend(osstrs([ - "--host", + "--listen-addr", "0.0.0.0:4566", - "--client-address", + "--advertise-addr", "127.0.0.1:4566", ])), ] diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 6a9cf8407be62..a0737992cee1c 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -37,15 +37,23 @@ use risingwave_common_proc_macro::OverrideConfig; /// Command-line arguments for compute-node. #[derive(Parser, Clone, Debug)] pub struct ComputeNodeOpts { - // TODO: rename to listen_address and separate out the port. - #[clap(long, env = "RW_HOST", default_value = "127.0.0.1:5688")] - pub host: String, + // TODO: rename to listen_addr and separate out the port. + /// The address that this service listens to. + /// Usually the localhost + desired port. + #[clap( + long, + alias = "host", + env = "RW_LISTEN_ADDR", + default_value = "127.0.0.1:5688" + )] + pub listen_addr: String, - /// The address of the compute node's meta client. - /// - /// Optional, we will use listen_address if not specified. - #[clap(long, env = "RW_CLIENT_ADDRESS")] - pub client_address: Option, + /// The address for contacting this instance of the service. + /// This would be synonymous with the service's "public address" + /// or "identifying address". + /// Optional, we will use listen_addr if not specified. + #[clap(long, alias = "client_address", env = "RW_ADVERTISE_ADDR", long)] + pub advertise_addr: Option, #[clap( long, @@ -151,22 +159,22 @@ pub fn start(opts: ComputeNodeOpts) -> Pin + Send>> tracing::info!("Compute node options: {:?}", opts); validate_opts(&opts); - let listen_address = opts.host.parse().unwrap(); - tracing::info!("Server Listening at {}", listen_address); + let listen_addr = opts.listen_addr.parse().unwrap(); + tracing::info!("Server Listening at {}", listen_addr); - let client_address = opts - .client_address + let advertise_addr = opts + .advertise_addr .as_ref() .unwrap_or_else(|| { - tracing::warn!("Client address is not specified, defaulting to host address"); - &opts.host + tracing::warn!("advertise addr is not specified, defaulting to listen_addr"); + &opts.listen_addr }) .parse() .unwrap(); - tracing::info!("Client address is {}", client_address); + tracing::info!("advertise addr is {}", advertise_addr); let (join_handle_vec, _shutdown_send) = - compute_node_serve(listen_address, client_address, opts).await; + compute_node_serve(listen_addr, advertise_addr, opts).await; for join_handle in join_handle_vec { join_handle.await.unwrap(); diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 938f7ca07c2bb..a13edcd9bc2a0 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -66,7 +66,7 @@ use crate::ComputeNodeOpts; /// Bootstraps the compute-node. pub async fn compute_node_serve( listen_addr: SocketAddr, - client_addr: HostAddr, + advertise_addr: HostAddr, opts: ComputeNodeOpts, ) -> (Vec>, Sender<()>) { // Load the configuration. @@ -85,7 +85,7 @@ pub async fn compute_node_serve( let meta_client = MetaClient::register_new( &opts.meta_address, WorkerType::ComputeNode, - &client_addr, + &advertise_addr, opts.parallelism, ) .await @@ -203,7 +203,7 @@ pub async fn compute_node_serve( // Initialize the managers. let batch_mgr = Arc::new(BatchManager::new(config.batch.clone())); let stream_mgr = Arc::new(LocalStreamManager::new( - client_addr.clone(), + advertise_addr.clone(), state_store.clone(), streaming_metrics.clone(), config.streaming.clone(), @@ -234,7 +234,7 @@ pub async fn compute_node_serve( let client_pool = Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); let batch_env = BatchEnvironment::new( batch_mgr.clone(), - client_addr.clone(), + advertise_addr.clone(), batch_config, worker_id, state_store.clone(), @@ -249,7 +249,7 @@ pub async fn compute_node_serve( }; // Initialize the streaming environment. let stream_env = StreamEnvironment::new( - client_addr.clone(), + advertise_addr.clone(), connector_params, stream_config, worker_id, @@ -321,7 +321,7 @@ pub async fn compute_node_serve( } // All set, let the meta service know we're ready. - meta_client.activate(&client_addr).await.unwrap(); + meta_client.activate(&advertise_addr).await.unwrap(); (join_handle_vec, shutdown_send) } diff --git a/src/ctl/src/cmd_impl/compute.rs b/src/ctl/src/cmd_impl/compute.rs index 5941939cdf06b..61cde8ffa85df 100644 --- a/src/ctl/src/cmd_impl/compute.rs +++ b/src/ctl/src/cmd_impl/compute.rs @@ -19,8 +19,8 @@ use risingwave_rpc_client::ComputeClient; use serde_json; pub async fn show_config(host: &str) -> anyhow::Result<()> { - let listen_address = HostAddr::try_from(host)?; - let client = ComputeClient::new(listen_address).await?; + let listen_addr = HostAddr::try_from(host)?; + let client = ComputeClient::new(listen_addr).await?; let config_response = client.show_config().await?; let batch_config: BatchConfig = serde_json::from_str(&config_response.batch_config)?; let stream_config: StreamingConfig = serde_json::from_str(&config_response.stream_config)?; diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 3e81c49d77dea..9954f6d003775 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -69,18 +69,29 @@ use session::SessionManagerImpl; /// Command-line arguments for frontend-node. #[derive(Parser, Clone, Debug)] pub struct FrontendOpts { - // TODO: rename to listen_address and separate out the port. - #[clap(long, env = "RW_HOST", default_value = "127.0.0.1:4566")] - pub host: String, + // TODO: rename to listen_addr and separate out the port. + /// The address that this service listens to. + /// Usually the localhost + desired port. + #[clap( + long, + alias = "host", + env = "RW_LISTEN_ADDR", + default_value = "127.0.0.1:4566" + )] + pub listen_addr: String, - // Optional, we will use listen_address if not specified. - #[clap(long, env = "RW_CLIENT_ADDRESS")] - pub client_address: Option, + /// The address for contacting this instance of the service. + /// This would be synonymous with the service's "public address" + /// or "identifying address". + /// Optional, we will use listen_addr if not specified. + #[clap(long, env = "RW_ADVERTISE_ADDR", alias = "client-address")] + pub advertise_addr: Option, // TODO: This is currently unused. #[clap(long, env = "RW_PORT")] pub port: Option, + /// The address via which we will attempt to connect to a leader meta node. #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")] pub meta_addr: String, @@ -138,9 +149,9 @@ pub fn start(opts: FrontendOpts) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. Box::pin(async move { - let addr = opts.host.clone(); + let listen_addr = opts.listen_addr.clone(); let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap()); - pg_serve(&addr, session_mgr, Some(TlsConfig::new_default())) + pg_serve(&listen_addr, session_mgr, Some(TlsConfig::new_default())) .await .unwrap(); }) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 216b288b1319b..eddc62dec84da 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -156,15 +156,15 @@ impl FrontendEnv { let batch_config = config.batch; let frontend_address: HostAddr = opts - .client_address + .advertise_addr .as_ref() .unwrap_or_else(|| { - tracing::warn!("Client address is not specified, defaulting to host address"); - &opts.host + tracing::warn!("advertise addr is not specified, defaulting to listen_addr"); + &opts.listen_addr }) .parse() .unwrap(); - tracing::info!("Client address is {}", frontend_address); + tracing::info!("advertise addr is {}", frontend_address); // Register in meta by calling `AddWorkerNode` RPC. let meta_client = MetaClient::register_new( diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 3f66b05c663bc..425bb5dbf0fb6 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -65,10 +65,13 @@ pub struct MetaNodeOpts { #[clap(long, env = "RW_HOST")] host: Option, - /// The endpoint for this meta node, which also serves as its unique identifier in cluster - /// membership and leader election. - #[clap(long, env = "RW_META_ENDPOINT")] - meta_endpoint: Option, + /// The address for contacting this instance of the service. + /// This would be synonymous with the service's "public address" + /// or "identifying address". + /// It will serve as a unique identifier in cluster + /// membership and leader election. Must be specified for etcd backend. + #[clap(long, env = "RW_ADVERTISE_ADDR", required_if_eq("backend", "etcd"))] + advertise_addr: Option, #[clap(long, env = "RW_DASHBOARD_HOST")] dashboard_host: Option, @@ -136,13 +139,12 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { let config = load_config(&opts.config_path, Some(opts.override_opts)); tracing::info!("Starting meta node with config {:?}", config); let listen_addr: SocketAddr = opts.listen_addr.parse().unwrap(); - let meta_addr = opts - .host - .map(|host| format!("{}:{}", host, listen_addr.port())) - .unwrap_or_else(|| opts.listen_addr.clone()); + let meta_addr = opts.host.unwrap_or_else(|| listen_addr.ip().to_string()); let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap()); let prometheus_addr = opts.prometheus_host.map(|x| x.parse().unwrap()); - let meta_endpoint = opts.meta_endpoint.unwrap_or(meta_addr); + let advertise_addr = opts + .advertise_addr + .unwrap_or_else(|| format!("{}:{}", meta_addr, listen_addr.port())); let backend = match config.meta.backend { MetaBackend::Etcd => MetaStoreBackend::Etcd { endpoints: opts @@ -167,7 +169,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { tracing::info!("Meta server listening at {}", listen_addr); let add_info = AddressInfo { - meta_endpoint, + advertise_addr, listen_addr, prometheus_addr, dashboard_addr, diff --git a/src/meta/src/rpc/follower_svc.rs b/src/meta/src/rpc/follower_svc.rs index 8cc0e3b929bc0..aa25e133b98e1 100644 --- a/src/meta/src/rpc/follower_svc.rs +++ b/src/meta/src/rpc/follower_svc.rs @@ -37,7 +37,7 @@ pub async fn start_follower_srv( let leader_srv = LeaderServiceImpl::new( election_client, MetaLeaderInfo { - node_address: address_info.meta_endpoint.to_string(), + node_address: address_info.advertise_addr.to_string(), lease_id: 0, }, ); diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 45ac46dfc2d4e..1b08c0c40a2ac 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -41,7 +41,7 @@ pub enum MetaStoreBackend { #[derive(Clone)] pub struct AddressInfo { - pub meta_endpoint: String, + pub advertise_addr: String, pub listen_addr: SocketAddr, pub prometheus_addr: Option, pub dashboard_addr: Option, @@ -51,7 +51,7 @@ pub struct AddressInfo { impl Default for AddressInfo { fn default() -> Self { Self { - meta_endpoint: "".to_string(), + advertise_addr: "".to_string(), listen_addr: SocketAddr::V4("127.0.0.1:0000".parse().unwrap()), prometheus_addr: None, dashboard_addr: None, @@ -92,7 +92,7 @@ pub async fn rpc_serve( EtcdElectionClient::new( endpoints, Some(options), - address_info.meta_endpoint.clone(), + address_info.advertise_addr.clone(), ) .await?, ); @@ -192,7 +192,7 @@ pub async fn rpc_serve_with_store( election_client.leader().await.unwrap().unwrap().into() } else { MetaLeaderInfo { - node_address: address_info.meta_endpoint.clone(), + node_address: address_info.advertise_addr.clone(), lease_id: 0, } }; diff --git a/src/risedevtool/src/task/compactor_service.rs b/src/risedevtool/src/task/compactor_service.rs index b590e2d5ffd7e..4b891b6fcf6c3 100644 --- a/src/risedevtool/src/task/compactor_service.rs +++ b/src/risedevtool/src/task/compactor_service.rs @@ -58,14 +58,14 @@ impl CompactorService { "compactor cannot use in-memory hummock if remote object store is not provided" )); } - cmd.arg("--host") + cmd.arg("--listen-addr") .arg(format!("{}:{}", config.listen_address, config.port)) .arg("--prometheus-listener-addr") .arg(format!( "{}:{}", config.listen_address, config.exporter_port )) - .arg("--client-address") + .arg("--advertise-addr") .arg(format!("{}:{}", config.address, config.port)) .arg("--metrics-level") .arg("1") diff --git a/src/risedevtool/src/task/compute_node_service.rs b/src/risedevtool/src/task/compute_node_service.rs index 9fee8740452c5..bb4cf1f305775 100644 --- a/src/risedevtool/src/task/compute_node_service.rs +++ b/src/risedevtool/src/task/compute_node_service.rs @@ -47,14 +47,14 @@ impl ComputeNodeService { config: &ComputeNodeConfig, hummock_in_memory_strategy: HummockInMemoryStrategy, ) -> Result<()> { - cmd.arg("--host") + cmd.arg("--listen-addr") .arg(format!("{}:{}", config.listen_address, config.port)) .arg("--prometheus-listener-addr") .arg(format!( "{}:{}", config.listen_address, config.exporter_port )) - .arg("--client-address") + .arg("--advertise-addr") .arg(format!("{}:{}", config.address, config.port)) .arg("--metrics-level") .arg("1") diff --git a/src/risedevtool/src/task/frontend_service.rs b/src/risedevtool/src/task/frontend_service.rs index 73a89da6696d0..19833911933db 100644 --- a/src/risedevtool/src/task/frontend_service.rs +++ b/src/risedevtool/src/task/frontend_service.rs @@ -42,9 +42,9 @@ impl FrontendService { /// Apply command args according to config pub fn apply_command_args(cmd: &mut Command, config: &FrontendConfig) -> Result<()> { - cmd.arg("--host") + cmd.arg("--listen-addr") .arg(format!("{}:{}", config.listen_address, config.port)) - .arg("--client-address") + .arg("--advertise-addr") .arg(format!("{}:{}", config.address, config.port)) .arg("--prometheus-listener-addr") .arg(format!( diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs index 81f06cbae60ec..3c5d24a268863 100644 --- a/src/risedevtool/src/task/meta_node_service.rs +++ b/src/risedevtool/src/task/meta_node_service.rs @@ -44,7 +44,7 @@ impl MetaNodeService { pub fn apply_command_args(cmd: &mut Command, config: &MetaNodeConfig) -> Result<()> { cmd.arg("--listen-addr") .arg(format!("{}:{}", config.listen_address, config.port)) - .arg("--meta-endpoint") + .arg("--advertise-addr") .arg(format!("{}:{}", config.address, config.port)) .arg("--dashboard-host") .arg(format!( diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 7c34518c80296..ac7b55a9435b0 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -24,13 +24,23 @@ use crate::server::compactor_serve; /// Command-line arguments for compute-node. #[derive(Parser, Clone, Debug)] pub struct CompactorOpts { - // TODO: rename to listen_address and separate out the port. - #[clap(long, env = "RW_HOST", default_value = "127.0.0.1:6660")] - pub host: String, + // TODO: rename to listen_addr and separate out the port. + /// The address that this service listens to. + /// Usually the localhost + desired port. + #[clap( + long, + alias = "host", + env = "RW_LISTEN_ADDR", + default_value = "127.0.0.1:6660" + )] + pub listen_addr: String, - // Optional, we will use listen_address if not specified. - #[clap(long, env = "RW_CLIENT_ADDRESS")] - pub client_address: Option, + /// The address for contacting this instance of the service. + /// This would be synonymous with the service's "public address" + /// or "identifying address". + /// Optional, we will use listen_addr if not specified. + #[clap(long, env = "RW_ADVERTISE_ADDR", alias = "client-address")] + pub advertise_addr: Option, // TODO: This is currently unused. #[clap(long, env = "RW_PORT")] @@ -92,22 +102,22 @@ pub fn start(opts: CompactorOpts) -> Pin + Send>> { tracing::info!("Compactor node options: {:?}", opts); tracing::info!("meta address: {}", opts.meta_address.clone()); - let listen_address = opts.host.parse().unwrap(); - tracing::info!("Server Listening at {}", listen_address); + let listen_addr = opts.listen_addr.parse().unwrap(); + tracing::info!("Server Listening at {}", listen_addr); - let client_address = opts - .client_address + let advertise_addr = opts + .advertise_addr .as_ref() .unwrap_or_else(|| { - tracing::warn!("Client address is not specified, defaulting to host address"); - &opts.host + tracing::warn!("advertise addr is not specified, defaulting to listen address"); + &opts.listen_addr }) .parse() .unwrap(); - tracing::info!("Client address is {}", client_address); + tracing::info!(" address is {}", advertise_addr); let (join_handle, observer_join_handle, _shutdown_sender) = - compactor_serve(listen_address, client_address, opts).await; + compactor_serve(listen_addr, advertise_addr, opts).await; join_handle.await.unwrap(); observer_join_handle.await.unwrap(); diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 85d08c7220c16..ecdd30cea95c0 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -45,7 +45,7 @@ use crate::CompactorOpts; /// Fetches and runs compaction tasks. pub async fn compactor_serve( listen_addr: SocketAddr, - client_addr: HostAddr, + advertise_addr: HostAddr, opts: CompactorOpts, ) -> (JoinHandle<()>, JoinHandle<()>, Sender<()>) { let config = load_config(&opts.config_path, Some(opts.override_config)); @@ -56,12 +56,16 @@ pub async fn compactor_serve( ); // Register to the cluster. - let meta_client = - MetaClient::register_new(&opts.meta_address, WorkerType::Compactor, &client_addr, 0) - .await - .unwrap(); + let meta_client = MetaClient::register_new( + &opts.meta_address, + WorkerType::Compactor, + &advertise_addr, + 0, + ) + .await + .unwrap(); tracing::info!("Assigned compactor id {}", meta_client.worker_id()); - meta_client.activate(&client_addr).await.unwrap(); + meta_client.activate(&advertise_addr).await.unwrap(); // Boot compactor let registry = prometheus::Registry::new(); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 04bf3b2552583..8a0e14084409d 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -70,7 +70,7 @@ impl CompactionTestMetrics { /// `./risedev compaction-test --state-store hummock+s3://your-bucket -t ` pub async fn compaction_test_main( _listen_addr: SocketAddr, - client_addr: HostAddr, + advertise_addr: HostAddr, opts: CompactionTestOpts, ) -> anyhow::Result<()> { let meta_listen_addr = opts @@ -90,7 +90,7 @@ pub async fn compaction_test_main( let (compactor_thrd, compactor_shutdown_tx) = start_compactor_thread( opts.meta_address.clone(), - client_addr.to_string(), + advertise_addr.to_string(), opts.state_store.clone(), opts.config_path.clone(), ); @@ -101,7 +101,7 @@ pub async fn compaction_test_main( init_metadata_for_replay( original_meta_endpoint, &opts.meta_address, - &client_addr, + &advertise_addr, opts.ci_mode, &mut table_id, ) @@ -109,7 +109,7 @@ pub async fn compaction_test_main( assert_ne!(0, table_id, "Invalid table_id for correctness checking"); - let version_deltas = pull_version_deltas(original_meta_endpoint, &client_addr).await?; + let version_deltas = pull_version_deltas(original_meta_endpoint, &advertise_addr).await?; tracing::info!( "Pulled delta logs from Meta: len(logs): {}", @@ -153,7 +153,7 @@ pub async fn start_meta_node(listen_addr: String, config_path: String) { async fn start_compactor_node( meta_rpc_endpoint: String, - client_addr: String, + advertise_addr: String, state_store: String, config_path: String, ) { @@ -161,8 +161,8 @@ async fn start_compactor_node( "compactor-node", "--host", "127.0.0.1:5550", - "--client-address", - &client_addr, + "--advertise-addr", + &advertise_addr, "--meta-address", &meta_rpc_endpoint, "--state-store", @@ -175,7 +175,7 @@ async fn start_compactor_node( pub fn start_compactor_thread( meta_endpoint: String, - client_addr: String, + advertise_addr: String, state_store: String, config_path: String, ) -> (JoinHandle<()>, std::sync::mpsc::Sender<()>) { @@ -188,7 +188,7 @@ pub fn start_compactor_thread( runtime.block_on(async { tokio::spawn(async { tracing::info!("Starting compactor node"); - start_compactor_node(meta_endpoint, client_addr, state_store, config_path).await + start_compactor_node(meta_endpoint, advertise_addr, state_store, config_path).await }); rx.recv().unwrap(); }); @@ -218,7 +218,7 @@ fn start_replay_thread( async fn init_metadata_for_replay( cluster_meta_endpoint: &str, new_meta_endpoint: &str, - client_addr: &HostAddr, + advertise_addr: &HostAddr, ci_mode: bool, table_id: &mut u32, ) -> anyhow::Result<()> { @@ -234,20 +234,20 @@ async fn init_metadata_for_replay( tracing::info!("Ctrl+C received, now exiting"); std::process::exit(0); }, - ret = MetaClient::register_new(cluster_meta_endpoint, WorkerType::RiseCtl, client_addr, 0) => { + ret = MetaClient::register_new(cluster_meta_endpoint, WorkerType::RiseCtl, advertise_addr, 0) => { meta_client = ret.unwrap(); }, } let worker_id = meta_client.worker_id(); tracing::info!("Assigned init worker id {}", worker_id); - meta_client.activate(client_addr).await.unwrap(); + meta_client.activate(advertise_addr).await.unwrap(); let tables = meta_client.risectl_list_state_tables().await?; let compaction_groups = meta_client.risectl_list_compaction_group().await?; let new_meta_client = - MetaClient::register_new(new_meta_endpoint, WorkerType::RiseCtl, client_addr, 0).await?; - new_meta_client.activate(client_addr).await.unwrap(); + MetaClient::register_new(new_meta_endpoint, WorkerType::RiseCtl, advertise_addr, 0).await?; + new_meta_client.activate(advertise_addr).await.unwrap(); if ci_mode { let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap(); *table_id = table_to_check.id; @@ -266,16 +266,20 @@ async fn init_metadata_for_replay( async fn pull_version_deltas( cluster_meta_endpoint: &str, - client_addr: &HostAddr, + advertise_addr: &HostAddr, ) -> anyhow::Result> { // Register to the cluster. // We reuse the RiseCtl worker type here - let meta_client = - MetaClient::register_new(cluster_meta_endpoint, WorkerType::RiseCtl, client_addr, 0) - .await?; + let meta_client = MetaClient::register_new( + cluster_meta_endpoint, + WorkerType::RiseCtl, + advertise_addr, + 0, + ) + .await?; let worker_id = meta_client.worker_id(); tracing::info!("Assigned pull worker id {}", worker_id); - meta_client.activate(client_addr).await.unwrap(); + meta_client.activate(advertise_addr).await.unwrap(); let (handle, shutdown_tx) = MetaClient::start_heartbeat_loop( meta_client.clone(), @@ -302,10 +306,10 @@ async fn start_replay( table_to_check: u32, version_delta_logs: Vec, ) -> anyhow::Result<()> { - let client_addr = "127.0.0.1:7770".parse().unwrap(); + let advertise_addr = "127.0.0.1:7770".parse().unwrap(); tracing::info!( - "Start to replay. Client address is {}, Table id {}", - client_addr, + "Start to replay. Advertise address is {}, Table id {}", + advertise_addr, table_to_check ); @@ -320,10 +324,11 @@ async fn start_replay( // Register to the cluster. // We reuse the RiseCtl worker type here let meta_client = - MetaClient::register_new(&opts.meta_address, WorkerType::RiseCtl, &client_addr, 0).await?; + MetaClient::register_new(&opts.meta_address, WorkerType::RiseCtl, &advertise_addr, 0) + .await?; let worker_id = meta_client.worker_id(); tracing::info!("Assigned replay worker id {}", worker_id); - meta_client.activate(&client_addr).await.unwrap(); + meta_client.activate(&advertise_addr).await.unwrap(); let sub_tasks = vec![MetaClient::start_heartbeat_loop( meta_client.clone(), diff --git a/src/tests/compaction_test/src/lib.rs b/src/tests/compaction_test/src/lib.rs index 4848b6e121338..bbd1174d0c7fe 100644 --- a/src/tests/compaction_test/src/lib.rs +++ b/src/tests/compaction_test/src/lib.rs @@ -38,7 +38,7 @@ pub struct CompactionTestOpts { #[clap(long, default_value = "127.0.0.1:6660")] pub host: String, - // Optional, we will use listen_address if not specified. + // Optional, we will use listen_addr if not specified. #[clap(long)] pub client_address: Option, @@ -99,8 +99,8 @@ pub fn start(opts: CompactionTestOpts) -> Pin + Send panic!("Invalid state store"); } } - let listen_address = opts.host.parse().unwrap(); - tracing::info!("Server Listening at {}", listen_address); + let listen_addr = opts.host.parse().unwrap(); + tracing::info!("Server Listening at {}", listen_addr); let client_address = opts .client_address @@ -112,7 +112,7 @@ pub fn start(opts: CompactionTestOpts) -> Pin + Send .parse() .unwrap(); - let ret = compaction_test_main(listen_address, client_address, opts).await; + let ret = compaction_test_main(listen_addr, client_address, opts).await; match ret { Ok(_) => { tracing::info!("Success"); diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 6d6e0e289371f..53e2e4a3515c6 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -175,7 +175,7 @@ impl Cluster { &conf.config_path, "--listen-addr", "0.0.0.0:5690", - "--meta-endpoint", + "--advertise-addr", &format!("192.168.1.{i}:5690"), "--backend", "etcd", @@ -199,9 +199,9 @@ impl Cluster { "frontend-node", "--config-path", &conf.config_path, - "--host", + "--listen-addr", "0.0.0.0:4566", - "--client-address", + "--advertise-addr", &format!("192.168.2.{i}:4566"), "--meta-addr", "meta:5690", @@ -220,9 +220,9 @@ impl Cluster { "compute-node", "--config-path", &conf.config_path, - "--host", + "--listen-addr", "0.0.0.0:5688", - "--client-address", + "--advertise-addr", &format!("192.168.3.{i}:5688"), "--meta-address", "meta:5690", @@ -246,9 +246,9 @@ impl Cluster { "compactor-node", "--config-path", &conf.config_path, - "--host", + "--listen-addr", "0.0.0.0:6660", - "--client-address", + "--advertise-addr", &format!("192.168.4.{i}:6660"), "--meta-address", "meta:5690",