feat(frontend): support mask failed serving worker temporarily (#10328)
zwang28 authored Jun 16, 2023
1 parent 7dccfa3 commit 558cef5
Showing 6 changed files with 266 additions and 15 deletions.
29 changes: 24 additions & 5 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -18,6 +18,7 @@ use std::collections::HashMap;
use std::mem;
use std::rc::Rc;
use std::sync::Arc;
+ use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
@@ -816,21 +817,20 @@ impl StageRunner {
plan_fragment: PlanFragment,
worker: Option<WorkerNode>,
) -> SchedulerResult<Fuse<Streaming<TaskInfoResponse>>> {
- let worker_node_addr = worker
- .unwrap_or(self.worker_node_manager.next_random_worker()?)
- .host
- .unwrap();
-
+ let mut worker = worker.unwrap_or(self.worker_node_manager.next_random_worker()?);
+ let worker_node_addr = worker.host.take().unwrap();
let compute_client = self
.compute_client_pool
.get_by_addr((&worker_node_addr).into())
.await
+ .inspect_err(|_| self.mask_failed_serving_worker(&worker))
.map_err(|e| anyhow!(e))?;

let t_id = task_id.task_id;
let stream_status = compute_client
.create_task(task_id, plan_fragment, self.epoch.clone())
.await
+ .inspect_err(|_| self.mask_failed_serving_worker(&worker))
.map_err(|e| anyhow!(e))?
.fuse();

@@ -964,4 +964,23 @@ impl StageRunner {
fn is_root_stage(&self) -> bool {
self.stage.id == 0
}

+ fn mask_failed_serving_worker(&self, worker: &WorkerNode) {
+ if !worker.property.as_ref().map_or(false, |p| p.is_serving) {
+ return;
+ }
+ let duration = std::cmp::max(
+ Duration::from_secs(
+ self.ctx
+ .session
+ .env()
+ .meta_config()
+ .max_heartbeat_interval_secs as _,
+ ) / 10,
+ Duration::from_secs(1),
+ );
+ self.worker_node_manager
+ .manager
+ .mask_worker_node(worker.id, duration);
+ }
}
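
Aside: the mask duration computed in `mask_failed_serving_worker` above is one tenth of the meta node's `max_heartbeat_interval_secs`, floored at one second. A minimal standalone sketch of that heuristic (the `mask_duration` helper and the example interval values are illustrative, not part of this commit):

use std::time::Duration;

// Mask duration heuristic mirrored from `mask_failed_serving_worker`:
// one tenth of the heartbeat interval, but at least one second.
fn mask_duration(max_heartbeat_interval_secs: u64) -> Duration {
    std::cmp::max(
        Duration::from_secs(max_heartbeat_interval_secs) / 10,
        Duration::from_secs(1),
    )
}

fn main() {
    // e.g. a 60s heartbeat interval masks a failed worker for 6s.
    assert_eq!(mask_duration(60), Duration::from_secs(6));
    // Short intervals are floored: 5s / 10 = 0.5s, raised to 1s.
    assert_eq!(mask_duration(5), Duration::from_secs(1));
}
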
62 changes: 55 additions & 7 deletions src/frontend/src/scheduler/worker_node_manager.rs
@@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- use std::collections::HashMap;
- use std::sync::{Arc, RwLock};
+ use std::collections::{HashMap, HashSet};
+ use std::sync::{Arc, RwLock, RwLockReadGuard};
+ use std::time::Duration;

use rand::seq::SliceRandom;
use risingwave_common::bail;
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
+ use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
use risingwave_pb::common::{WorkerNode, WorkerType};

use crate::catalog::FragmentId;
@@ -27,6 +29,8 @@ use crate::scheduler::{SchedulerError, SchedulerResult};
/// `WorkerNodeManager` manages live worker nodes and table vnode mapping information.
pub struct WorkerNodeManager {
inner: RwLock<WorkerNodeManagerInner>,
+ /// Temporarily makes a worker invisible to the serving cluster.
+ worker_node_mask: Arc<RwLock<HashSet<u32>>>,
}

struct WorkerNodeManagerInner {
@@ -53,6 +57,7 @@ impl WorkerNodeManager {
streaming_fragment_vnode_mapping: Default::default(),
serving_fragment_vnode_mapping: Default::default(),
}),
+ worker_node_mask: Arc::new(Default::default()),
}
}

@@ -63,7 +68,10 @@ impl WorkerNodeManager {
streaming_fragment_vnode_mapping: HashMap::new(),
serving_fragment_vnode_mapping: HashMap::new(),
});
- Self { inner }
+ Self {
+ inner,
+ worker_node_mask: Arc::new(Default::default()),
+ }
}

pub fn list_worker_nodes(&self) -> Vec<WorkerNode> {
@@ -248,6 +256,26 @@ impl WorkerNodeManager {
guard.serving_fragment_vnode_mapping.remove(fragment_id);
}
}

+ fn worker_node_mask(&self) -> RwLockReadGuard<'_, HashSet<u32>> {
+ self.worker_node_mask.read().unwrap()
+ }
+
+ pub fn mask_worker_node(&self, worker_node_id: u32, duration: Duration) {
+ let mut worker_node_mask = self.worker_node_mask.write().unwrap();
+ if worker_node_mask.contains(&worker_node_id) {
+ return;
+ }
+ worker_node_mask.insert(worker_node_id);
+ let worker_node_mask_ref = self.worker_node_mask.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(duration).await;
+ worker_node_mask_ref
+ .write()
+ .unwrap()
+ .remove(&worker_node_id);
+ });
+ }
}

impl WorkerNodeManagerInner {
@@ -277,15 +305,16 @@ impl WorkerNodeSelector {
if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes().len()
} else {
- self.manager.list_serving_worker_nodes().len()
+ self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
+ .len()
}
}

pub fn schedule_unit_count(&self) -> usize {
let worker_nodes = if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes()
} else {
- self.manager.list_serving_worker_nodes()
+ self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
};
worker_nodes
.iter()
@@ -300,21 +329,40 @@
if self.enable_barrier_read {
self.manager.get_streaming_fragment_mapping(&fragment_id)
} else {
- self.manager.serving_fragment_mapping(fragment_id)
+ let origin = self.manager.serving_fragment_mapping(fragment_id)?;
+ if self.manager.worker_node_mask().is_empty() {
+ return Ok(origin);
+ }
+ let new_workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes());
+ let masked_mapping =
+ place_vnode(Some(&origin), &new_workers, origin.iter_unique().count());
+ masked_mapping.ok_or_else(|| SchedulerError::EmptyWorkerNodes)
}
}

pub fn next_random_worker(&self) -> SchedulerResult<WorkerNode> {
let worker_nodes = if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes()
} else {
- self.manager.list_serving_worker_nodes()
+ self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
};
worker_nodes
.choose(&mut rand::thread_rng())
.ok_or_else(|| SchedulerError::EmptyWorkerNodes)
.map(|w| (*w).clone())
}

+ fn apply_worker_node_mask(&self, origin: Vec<WorkerNode>) -> Vec<WorkerNode> {
+ if origin.len() <= 1 {
+ // If there is at most one worker, don't apply mask.
+ return origin;
+ }
+ let mask = self.manager.worker_node_mask();
+ origin
+ .into_iter()
+ .filter(|w| !mask.contains(&w.id))
+ .collect()
+ }
}

#[cfg(test)]
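
Aside: the mechanism above is self-expiring — `mask_worker_node` adds the worker id to a shared set and spawns a task that removes it once `duration` elapses, `apply_worker_node_mask` leaves the list untouched when at most one worker remains, and a non-empty mask causes serving vnode mappings to be recomputed over the surviving workers via `place_vnode`. A self-contained sketch of the mask-and-expire pattern, assuming a tokio runtime (the `MaskSet` type and everything in it is illustrative, not this crate's API):

use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use std::time::Duration;

// Illustrative stand-in for WorkerNodeManager's `worker_node_mask`.
struct MaskSet {
    masked: Arc<RwLock<HashSet<u32>>>,
}

impl MaskSet {
    fn new() -> Self {
        Self {
            masked: Arc::new(RwLock::new(HashSet::new())),
        }
    }

    // Masks `worker_id` for `duration`, then unmasks it automatically.
    fn mask(&self, worker_id: u32, duration: Duration) {
        // `insert` returns false if the id was already present,
        // matching the early return in `mask_worker_node`.
        if !self.masked.write().unwrap().insert(worker_id) {
            return;
        }
        let masked = self.masked.clone();
        tokio::spawn(async move {
            tokio::time::sleep(duration).await;
            masked.write().unwrap().remove(&worker_id);
        });
    }

    // Filters out masked workers, unless that would leave nothing to schedule on.
    fn apply(&self, workers: Vec<u32>) -> Vec<u32> {
        if workers.len() <= 1 {
            return workers;
        }
        let mask = self.masked.read().unwrap();
        workers.into_iter().filter(|w| !mask.contains(w)).collect()
    }
}

#[tokio::main]
async fn main() {
    let set = MaskSet::new();
    set.mask(2, Duration::from_millis(50));
    assert_eq!(set.apply(vec![1, 2, 3]), vec![1, 3]);
    // After the mask expires, worker 2 becomes schedulable again.
    tokio::time::sleep(Duration::from_millis(80)).await;
    assert_eq!(set.apply(vec![1, 2, 3]), vec![1, 2, 3]);
}

Note the early return when the id is already masked: a worker that fails again while masked keeps its original expiry rather than having the window extended.
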
12 changes: 10 additions & 2 deletions src/frontend/src/session.rs
@@ -31,7 +31,7 @@ use risingwave_common::catalog::DEFAULT_SCHEMA_NAME;
use risingwave_common::catalog::{
DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
};
- use risingwave_common::config::{load_config, BatchConfig};
+ use risingwave_common::config::{load_config, BatchConfig, MetaConfig};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::session_config::{ConfigMap, VisibilityMode};
@@ -111,6 +111,7 @@ pub struct FrontendEnv {
source_metrics: Arc<SourceMetrics>,

batch_config: BatchConfig,
+ meta_config: MetaConfig,

/// Track creating streaming jobs, used to cancel creating streaming job when cancel request
/// received.
@@ -157,6 +158,7 @@ impl FrontendEnv {
sessions_map: Arc::new(Mutex::new(HashMap::new())),
frontend_metrics: Arc::new(FrontendMetrics::for_test()),
batch_config: BatchConfig::default(),
+ meta_config: MetaConfig::default(),
source_metrics: Arc::new(SourceMetrics::default()),
creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
}
@@ -173,6 +175,7 @@ impl FrontendEnv {
info!("> version: {} ({})", RW_VERSION, GIT_SHA);

let batch_config = config.batch;
+ let meta_config = config.meta;

let frontend_address: HostAddr = opts
.advertise_addr
@@ -191,7 +194,7 @@
WorkerType::Frontend,
&frontend_address,
Default::default(),
- &config.meta,
+ &meta_config,
)
.await?;

@@ -321,6 +324,7 @@ impl FrontendEnv {
frontend_metrics,
sessions_map: Arc::new(Mutex::new(HashMap::new())),
batch_config,
+ meta_config,
source_metrics,
creating_streaming_job_tracker,
},
@@ -385,6 +389,10 @@ impl FrontendEnv {
&self.batch_config
}

+ pub fn meta_config(&self) -> &MetaConfig {
+ &self.meta_config
+ }

pub fn source_metrics(&self) -> Arc<SourceMetrics> {
self.source_metrics.clone()
}
6 changes: 5 additions & 1 deletion src/tests/simulation/src/cluster.rs
@@ -41,7 +41,7 @@ pub enum ConfigPath {
}

impl ConfigPath {
- fn as_str(&self) -> &str {
+ pub fn as_str(&self) -> &str {
match self {
ConfigPath::Regular(s) => s,
ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
@@ -526,6 +526,10 @@ impl Cluster {
self.config.clone()
}

+ pub fn handle(&self) -> &Handle {
+ &self.handle
+ }

/// Graceful shutdown all RisingWave nodes.
pub async fn graceful_shutdown(&self) {
let mut nodes = vec![];
(Diffs for the remaining 2 changed files did not load.)
