Skip to content

Commit

Permalink
feat(common): move observer_manager to common service (#3754)
Browse files Browse the repository at this point in the history
* mv observer_manager to common_service

* mv observer_manager to common_service

* rename
  • Loading branch information
xxhZs authored Jul 11, 2022
1 parent 33c3611 commit a8d0c57
Show file tree
Hide file tree
Showing 16 changed files with 244 additions and 140 deletions.
48 changes: 41 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"src/cmd",
"src/cmd_all",
"src/common",
"src/common/common_service",
"src/compute",
"src/connector",
"src/ctl",
Expand Down
3 changes: 0 additions & 3 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ either = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = "0.2"
humantime = "2.1"
hyper = "0.14"
itertools = "0.10"
lazy_static = "1"
log = "0.4"
Expand All @@ -45,8 +44,6 @@ tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = ["rt"
tokio-stream = "0.1"
toml = "0.5"
tonic = { version = "=0.2.0-alpha.3", package = "madsim-tonic" }
tower = { version = "0.4", features = ["util", "load-shed"] }
tower-http = { version = "0.3", features = ["add-extension", "cors"] }
tracing = { version = "0.1" }
twox-hash = "1"
url = "2"
Expand Down
17 changes: 17 additions & 0 deletions src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "risingwave_common_service"
version = "0.1.10"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
hyper = "0.14"
log = "0.4"
prometheus = { version = "0.13" }
risingwave_common = { path = "../" }
risingwave_pb = { path = "../../prost" }
risingwave_rpc_client = { path = "../../rpc_client" }
tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
tower = { version = "0.4", features = ["util", "load-shed"] }
tower-http = { version = "0.2", features = ["add-extension", "cors"] }
tracing = { version = "0.1" }
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod metrics_manager;
// This is a stub lib.rs.

#![feature(lint_reasons)]

pub mod metrics_manager;
pub mod observer_manager;

pub use metrics_manager::MetricsManager;
File renamed without changes.
111 changes: 111 additions & 0 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::SubscribeResponse;
use risingwave_rpc_client::{MetaClient, NotificationStream};
use tokio::task::JoinHandle;

/// `ObserverManager` is used to update data based on notification from meta.
/// Call `start` to spawn a new asynchronous task
/// We can write the notification logic by implementing `ObserverNodeImpl`.
pub struct ObserverManager {
rx: Box<dyn NotificationStream>,
meta_client: MetaClient,
addr: HostAddr,
worker_type: WorkerType,
observer_states: Box<dyn ObserverNodeImpl + Send>,
}

pub trait ObserverNodeImpl {
/// modify data after receiving notification from meta
fn handle_notification(&mut self, resp: SubscribeResponse);

/// Initialize data from the meta. It will be called at start or resubscribe
fn handle_initialization_notification(&mut self, resp: SubscribeResponse) -> Result<()>;
}

impl ObserverManager {
pub async fn new(
meta_client: MetaClient,
addr: HostAddr,
observer_states: Box<dyn ObserverNodeImpl + Send>,
worker_type: WorkerType,
) -> Self {
let rx = meta_client.subscribe(&addr, worker_type).await.unwrap();
Self {
rx,
meta_client,
addr,
worker_type,
observer_states,
}
}

/// `start` is used to spawn a new asynchronous task which receives meta's notification and
/// call the `handle_initialization_notification` and `handle_notification` to update node data.
pub async fn start(mut self) -> Result<JoinHandle<()>> {
let first_resp = self.rx.next().await?.ok_or_else(|| {
ErrorCode::InternalError(
"ObserverManager start failed, Stream of notification terminated at the start."
.to_string(),
)
})?;
self.observer_states
.handle_initialization_notification(first_resp)?;
let handle = tokio::spawn(async move {
loop {
if let Ok(resp) = self.rx.next().await {
if resp.is_none() {
tracing::error!("Stream of notification terminated.");
self.re_subscribe().await;
continue;
}
self.observer_states.handle_notification(resp.unwrap());
}
}
});
Ok(handle)
}

/// `re_subscribe` is used to re-subscribe to the meta's notification.
async fn re_subscribe(&mut self) {
loop {
match self
.meta_client
.subscribe(&self.addr, self.worker_type)
.await
{
Ok(rx) => {
tracing::debug!("re-subscribe success");
self.rx = rx;
if let Ok(Some(snapshot_resp)) = self.rx.next().await {
self.observer_states
.handle_initialization_notification(snapshot_resp)
.expect("handle snapshot notification failed after re-subscribe");
break;
}
}
Err(_) => {
tokio::time::sleep(RE_SUBSCRIBE_RETRY_INTERVAL).await;
}
}
}
}
}
const RE_SUBSCRIBE_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1 change: 0 additions & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ pub mod config;
pub mod field_generator;
pub mod hash;
pub mod monitor;
pub mod service;
pub mod session_config;
#[cfg(test)]
pub mod test_utils;
Expand Down
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ prost = "0.10"
rdkafka = { version = "0.28", features = ["cmake-build"] }
risingwave_batch = { path = "../batch" }
risingwave_common = { path = "../common" }
risingwave_common_service = { path = "../common/common_service" }
risingwave_connector = { path = "../connector" }
risingwave_pb = { path = "../prost" }
risingwave_rpc_client = { path = "../rpc_client" }
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
use risingwave_batch::task::{BatchEnvironment, BatchManager};
use risingwave_common::config::ComputeNodeConfig;
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::service::MetricsManager;
use risingwave_common::util::addr::HostAddr;
use risingwave_common_service::metrics_manager::MetricsManager;
use risingwave_pb::common::WorkerType;
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ prost = "0.10"
rand = "0.8"
risingwave_batch = { path = "../batch" }
risingwave_common = { path = "../common" }
risingwave_common_service = { path = "../common/common_service" }
risingwave_expr = { path = "../expr" }
risingwave_pb = { path = "../prost" }
risingwave_rpc_client = { path = "../rpc_client" }
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use crate::catalog::table_catalog::TableCatalog;
use crate::catalog::{pg_catalog, DatabaseId, SchemaId};

/// Root catalog of database catalog. Manage all database/schema/table in memory on frontend. it
/// is protected by a `RwLock`. only [`crate::observer::observer_manager::ObserverManager`] will get
/// its mut reference and do write to sync with the meta catalog. Other situations it is read only
/// with a read guard.
/// is protected by a `RwLock`. only [`crate::observer::observer_manager::FrontendObserverNode`]
/// will get its mut reference and do write to sync with the meta catalog. Other situations it is
/// read only with a read guard.
///
/// - catalog (root catalog)
/// - database catalog
Expand Down
Loading

0 comments on commit a8d0c57

Please sign in to comment.