Skip to content

Commit

Permalink
fix: fix several issues of graceful shutdown of meta (#7738)
Browse files Browse the repository at this point in the history
Fix several issues with the graceful shutdown of meta:
1. abort the election-related handles in both the leader and the follower.
2. close all subscriber connections in the notification service.
3. abort the barrier manager via a timeout while it is still recovering.

Approved-By: shanicky
  • Loading branch information
yezizp2012 authored Feb 7, 2023
1 parent d785ab2 commit adae2d2
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 85 deletions.
4 changes: 2 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,9 @@ where
});

let result = try_join_all(collect_futures).await;
barrier_complete_tx
let _ = barrier_complete_tx
.send((prev_epoch, result.map_err(Into::into)))
.unwrap();
.inspect_err(|err| tracing::warn!("failed to complete barrier: {err}"));
}

/// Changes the state to `Complete`, and try to commit all epoch that state is `Complete` in
Expand Down
10 changes: 7 additions & 3 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
dashboard_addr,
ui_path: opts.dashboard_ui_path,
};
let (join_handle, leader_lost_handle, _shutdown_send) = rpc_serve(
let (join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
add_info,
backend,
max_heartbeat_interval,
Expand Down Expand Up @@ -212,8 +212,12 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

if let Some(leader_lost_handle) = leader_lost_handle {
tokio::select! {
_ = join_handle => {},
_ = leader_lost_handle => {},
_ = tokio::signal::ctrl_c() => {
tracing::info!("receive ctrl+c");
shutdown_send.send(()).unwrap();
join_handle.await.unwrap();
leader_lost_handle.abort();
},
}
} else {
join_handle.await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ where
_ = monitor_interval.tick() => {},
// Shutdown monitor
_ = &mut shutdown_rx => {
tracing::info!("Worker number monitor is stopped");
return;
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/meta/src/manager/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ where
}
}

pub async fn abort_all(&self) {
let mut guard = self.core.lock().await;
*guard = NotificationManagerCore::new();
guard.exiting = true;
}

#[inline(always)]
fn notify(
&self,
Expand Down Expand Up @@ -210,6 +216,10 @@ where
sender: UnboundedSender<Notification>,
) {
let mut core_guard = self.core.lock().await;
if core_guard.exiting {
tracing::warn!("notification manager exiting.");
return;
}
let senders = match subscribe_type {
SubscribeType::Frontend => &mut core_guard.frontend_senders,
SubscribeType::Hummock => &mut core_guard.hummock_senders,
Expand All @@ -222,6 +232,10 @@ where

pub async fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
let mut core_guard = self.core.lock().await;
if core_guard.exiting {
tracing::warn!("(l)notification manager exiting.");
return;
}
core_guard.local_senders.push(sender);
}

Expand All @@ -240,6 +254,7 @@ struct NotificationManagerCore {
compactor_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
/// The notification sender to local subscribers.
local_senders: Vec<UnboundedSender<LocalNotification>>,
exiting: bool,
}

impl NotificationManagerCore {
Expand All @@ -249,6 +264,7 @@ impl NotificationManagerCore {
hummock_senders: HashMap::new(),
compactor_senders: HashMap::new(),
local_senders: vec![],
exiting: false,
}
}

Expand Down
71 changes: 0 additions & 71 deletions src/meta/src/rpc/follower_svc.rs

This file was deleted.

47 changes: 38 additions & 9 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,18 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
let join_handle = tokio::spawn(async move {
if let Some(election_client) = election_client.clone() {
let mut is_leader_watcher = election_client.subscribe();
let svc_shutdown_rx_clone = svc_shutdown_rx.clone();
let mut svc_shutdown_rx_clone = svc_shutdown_rx.clone();
let (follower_shutdown_tx, follower_shutdown_rx) = OneChannel::<()>();

let _resp = is_leader_watcher.changed().await;
tokio::select! {
_ = svc_shutdown_rx_clone.changed() => return,
res = is_leader_watcher.changed() => {
if let Err(err) = res {
tracing::error!("leader watcher recv failed {}", err.to_string());
}
}
}
let svc_shutdown_rx_clone = svc_shutdown_rx.clone();

// If not the leader, spawn a follower.
let follower_handle: Option<JoinHandle<()>> = if !*is_leader_watcher.borrow() {
Expand All @@ -206,9 +214,17 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
None
};

let mut svc_shutdown_rx_clone = svc_shutdown_rx.clone();
while !*is_leader_watcher.borrow_and_update() {
if let Err(e) = is_leader_watcher.changed().await {
tracing::error!("leader watcher recv failed {}", e.to_string());
tokio::select! {
_ = svc_shutdown_rx_clone.changed() => {
return;
}
res = is_leader_watcher.changed() => {
if let Err(err) = res {
tracing::error!("leader watcher recv failed {}", err.to_string());
}
}
}
}

Expand Down Expand Up @@ -252,7 +268,6 @@ pub async fn start_service_as_election_follower(
.add_service(HealthServer::new(health_srv))
.serve_with_shutdown(address_info.listen_addr, async move {
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
// shutdown service if all services should be shut down
res = svc_shutdown_rx.changed() => {
match res {
Expand Down Expand Up @@ -507,14 +522,29 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
.await,
);

let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
let notification_mgr = env.notification_manager_ref();
let abort_notification_handler = tokio::spawn(async move {
abort_recv.await.unwrap();
notification_mgr.abort_all().await;
});
sub_tasks.push((abort_notification_handler, abort_sender));

let shutdown_all = async move {
for (join_handle, shutdown_sender) in sub_tasks {
if let Err(_err) = shutdown_sender.send(()) {
// Maybe it is already shut down
continue;
}
if let Err(err) = join_handle.await {
tracing::warn!("Failed to join shutdown: {:?}", err);
            // The barrier manager can't be shut down gracefully while it is still
            // recovering, so try to abort it using a timeout.
match tokio::time::timeout(Duration::from_secs(1), join_handle).await {
Ok(Err(err)) => {
tracing::warn!("Failed to join shutdown: {:?}", err);
}
Err(e) => {
tracing::warn!("Join shutdown timeout: {:?}", e);
}
_ => {}
}
}
};
Expand All @@ -534,7 +564,6 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
.add_service(BackupServiceServer::new(backup_srv))
.serve_with_shutdown(address_info.listen_addr, async move {
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
res = svc_shutdown_rx.changed() => {
match res {
Ok(_) => tracing::info!("Shutting down services"),
Expand Down

0 comments on commit adae2d2

Please sign in to comment.