Skip to content

Commit

Permalink
feat(system param): sync param changes to worker nodes (#8271)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gun9niR authored Mar 5, 2023
1 parent cb2c85f commit 7969e36
Show file tree
Hide file tree
Showing 21 changed files with 236 additions and 50 deletions.
20 changes: 19 additions & 1 deletion dashboard/proto/gen/meta.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ enum SubscribeType {
FRONTEND = 1;
HUMMOCK = 2;
COMPACTOR = 3;
COMPUTE = 4;
}

// Below for notification service.
Expand Down Expand Up @@ -265,6 +266,7 @@ message SubscribeResponse {
hummock.HummockVersionDeltas hummock_version_deltas = 15;
MetaSnapshot snapshot = 16;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 17;
SystemParams system_params = 19;
}
}

Expand Down
27 changes: 17 additions & 10 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::time::Duration;

use risingwave_common::bail;
use risingwave_common::error::Result;
use risingwave_pb::meta::meta_snapshot::SnapshotVersion;
use risingwave_pb::meta::subscribe_response::Info;
use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
use risingwave_rpc_client::error::RpcError;
Expand Down Expand Up @@ -49,6 +48,13 @@ impl SubscribeTypeEnum for SubscribeCompactor {
}
}

pub struct SubscribeCompute {}
impl SubscribeTypeEnum for SubscribeCompute {
fn subscribe_type() -> SubscribeType {
SubscribeType::Compute
}
}

/// `ObserverManager` is used to update data based on notification from meta.
/// Call `start` to spawn a new asynchronous task
/// We can write the notification logic by implementing `ObserverNodeImpl`.
Expand Down Expand Up @@ -109,12 +115,6 @@ where
unreachable!();
};

let SnapshotVersion {
catalog_version,
parallel_unit_mapping_version,
worker_node_version,
} = info.version.clone().unwrap();

notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() {
Info::Database(_)
| Info::Schema(_)
Expand All @@ -124,14 +124,21 @@ where
| Info::Index(_)
| Info::View(_)
| Info::Function(_)
| Info::User(_) => notification.version > catalog_version,
Info::ParallelUnitMapping(_) => notification.version > parallel_unit_mapping_version,
Info::Node(_) => notification.version > worker_node_version,
| Info::User(_) => {
notification.version > info.version.as_ref().unwrap().catalog_version
}
Info::ParallelUnitMapping(_) => {
notification.version > info.version.as_ref().unwrap().parallel_unit_mapping_version
}
Info::Node(_) => {
notification.version > info.version.as_ref().unwrap().worker_node_version
}
Info::HummockVersionDeltas(version_delta) => {
version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id
}
Info::HummockSnapshot(_) => true,
Info::MetaBackupManifestId(_) => true,
Info::SystemParams(_) => true,
Info::Snapshot(_) => unreachable!(),
});

Expand Down
11 changes: 6 additions & 5 deletions src/common/src/system_param/local_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@ use tokio::sync::watch::{channel, Receiver, Sender};
use super::reader::SystemParamsReader;

pub type SystemParamsReaderRef = Arc<ArcSwap<SystemParamsReader>>;
pub type LocalSystemParamsManagerRef = Arc<LocalSystemParamsManager>;

/// The system parameter manager on worker nodes. It provides two methods for other components to
/// read the latest system parameters:
/// - `get_params` returns a reference to the latest parameters that is atomically updated.
/// - `watch_params` returns a channel on which calling `recv` will get the latest parameters.
/// Compared with `get_params`, the caller can be explicitly notified of parameter change.
pub struct LocalSystemParamManager {
pub struct LocalSystemParamsManager {
/// The latest parameters.
params: SystemParamsReaderRef,

/// Sender of the latest parameters.
tx: Sender<SystemParamsReaderRef>,
}

impl LocalSystemParamManager {
impl LocalSystemParamsManager {
pub fn new(params: SystemParamsReader) -> Self {
let params = Arc::new(ArcSwap::from_pointee(params));
let (tx, _) = channel(params.clone());
Expand All @@ -56,7 +57,7 @@ impl LocalSystemParamManager {
}
}

pub fn watch_parmams(&self) -> Receiver<SystemParamsReaderRef> {
pub fn watch_params(&self) -> Receiver<SystemParamsReaderRef> {
self.tx.subscribe()
}
}
Expand All @@ -68,15 +69,15 @@ mod tests {
#[tokio::test]
async fn test_manager() {
let p = SystemParams::default().into();
let manager = LocalSystemParamManager::new(p);
let manager = LocalSystemParamsManager::new(p);
let shared_params = manager.get_params();

let new_params = SystemParams {
sstable_size_mb: Some(1),
..Default::default()
};

let mut params_rx = manager.watch_parmams();
let mut params_rx = manager.watch_params();

manager.try_set_params(new_params.clone());
params_rx.changed().await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
extern crate tracing;

pub mod memory_management;
pub mod observer;
pub mod rpc;
pub mod server;

Expand Down
15 changes: 15 additions & 0 deletions src/compute/src/observer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod observer_manager;
49 changes: 49 additions & 0 deletions src/compute/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::observer_manager::{ObserverState, SubscribeCompute};
use risingwave_pb::meta::subscribe_response::Info;
use risingwave_pb::meta::SubscribeResponse;

pub struct ComputeObserverNode {
system_params_manager: LocalSystemParamsManagerRef,
}

impl ObserverState for ComputeObserverNode {
type SubscribeType = SubscribeCompute;

fn handle_notification(&mut self, resp: SubscribeResponse) {
let Some(info) = resp.info.as_ref() else {
return;
};

match info.to_owned() {
Info::SystemParams(p) => self.system_params_manager.try_set_params(p),
_ => {
panic!("error type notification");
}
}
}

fn handle_initialization_notification(&mut self, _resp: SubscribeResponse) {}
}

impl ComputeObserverNode {
pub fn new(system_params_manager: LocalSystemParamsManagerRef) -> Self {
Self {
system_params_manager,
}
}
}
10 changes: 10 additions & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use risingwave_common::config::{
STREAM_WINDOW_SIZE,
};
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_service::metrics_manager::MetricsManager;
use risingwave_common_service::observer_manager::ObserverManager;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_hummock_sdk::compact::CompactorRuntimeConfig;
use risingwave_pb::common::WorkerType;
Expand Down Expand Up @@ -59,6 +61,7 @@ use crate::memory_management::memory_manager::{
GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB,
};
use crate::memory_management::policy::StreamingOnlyPolicy;
use crate::observer::observer_manager::ComputeObserverNode;
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics;
use crate::rpc::service::exchange_service::ExchangeServiceImpl;
Expand Down Expand Up @@ -246,6 +249,13 @@ pub async fn compute_node_serve(
// of lru manager.
stream_mgr.set_watermark_epoch(watermark_epoch).await;

// Initialize observer manager.
let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params));
let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
let observer_manager =
ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
observer_manager.start().await;

let grpc_await_tree_reg = await_tree_config
.map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));
let dml_mgr = Arc::new(DmlManager::default());
Expand Down
11 changes: 9 additions & 2 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
Expand All @@ -36,6 +37,7 @@ pub struct FrontendObserverNode {
user_info_manager: Arc<RwLock<UserInfoManager>>,
user_info_updated_tx: Sender<UserInfoVersion>,
hummock_snapshot_manager: HummockSnapshotManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
}

impl ObserverState for FrontendObserverNode {
Expand All @@ -46,7 +48,7 @@ impl ObserverState for FrontendObserverNode {
return;
};

match info {
match info.to_owned() {
Info::Database(_)
| Info::Schema(_)
| Info::Table(_)
Expand All @@ -58,7 +60,7 @@ impl ObserverState for FrontendObserverNode {
self.handle_catalog_notification(resp);
}
Info::Node(node) => {
self.update_worker_node_manager(resp.operation(), node.clone());
self.update_worker_node_manager(resp.operation(), node);
}
Info::User(_) => {
self.handle_user_notification(resp);
Expand All @@ -79,6 +81,9 @@ impl ObserverState for FrontendObserverNode {
Info::MetaBackupManifestId(_) => {
panic!("frontend node should not receive MetaBackupManifestId");
}
Info::SystemParams(p) => {
self.system_params_manager.try_set_params(p);
}
}
}

Expand Down Expand Up @@ -155,6 +160,7 @@ impl FrontendObserverNode {
user_info_manager: Arc<RwLock<UserInfoManager>>,
user_info_updated_tx: Sender<UserInfoVersion>,
hummock_snapshot_manager: HummockSnapshotManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
) -> Self {
Self {
worker_node_manager,
Expand All @@ -163,6 +169,7 @@ impl FrontendObserverNode {
user_info_manager,
user_info_updated_tx,
hummock_snapshot_manager,
system_params_manager,
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use risingwave_common::config::{load_config, BatchConfig};
use risingwave_common::error::{Result, RwError};
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::session_config::ConfigMap;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::stream_cancel::{stream_tripwire, Trigger, Tripwire};
Expand Down Expand Up @@ -182,7 +183,7 @@ impl FrontendEnv {
info!("advertise addr is {}", frontend_address);

// Register in meta by calling `AddWorkerNode` RPC.
let (meta_client, _) = MetaClient::register_new(
let (meta_client, system_params_reader) = MetaClient::register_new(
opts.meta_addr.clone().as_str(),
WorkerType::Frontend,
&frontend_address,
Expand Down Expand Up @@ -232,13 +233,15 @@ impl FrontendEnv {
user_info_updated_rx,
));

let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
let frontend_observer_node = FrontendObserverNode::new(
worker_node_manager.clone(),
catalog,
catalog_updated_tx,
user_info_manager,
user_info_updated_tx,
hummock_snapshot_manager.clone(),
system_params_manager,
);
let observer_manager =
ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
Expand Down
Loading

0 comments on commit 7969e36

Please sign in to comment.