Skip to content

Commit

Permalink
fix: join actor when dropping (#7263)
Browse files Browse the repository at this point in the history
Ensure actor task is completely dropped after calling `drop_all_actors`, see #7208 (comment).
Also remove unused method `wait_all`.

Approved-By: yezizp2012

Co-Authored-By: zwang28 <84491488@qq.com>
Co-Authored-By: Runji Wang <wangrunji0408@163.com>
  • Loading branch information
3 people authored Jan 10, 2023
1 parent 49dc120 commit 5d25f9c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 8 additions & 12 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl LocalStreamManager {

/// Force stop all actors on this worker.
pub async fn stop_all_actors(&self) -> StreamResult<()> {
self.core.lock().await.drop_all_actors();
self.core.lock().await.drop_all_actors().await;
// Clear shared buffer in storage to release memory
self.clear_storage_buffer().await;
self.clear_all_senders_and_collect_rx();
Expand All @@ -333,15 +333,6 @@ impl LocalStreamManager {
core.update_actors(actors, hanging_channels)
}

/// This function was called while [`LocalStreamManager`] exited.
pub async fn wait_all(self) -> StreamResult<()> {
let handles = self.core.lock().await.take_all_handles()?;
for (_id, handle) in handles {
handle.await.unwrap();
}
Ok(())
}

/// This function could only be called once during the lifecycle of `LocalStreamManager` for
/// now.
pub async fn update_actor_info(&self, actor_infos: &[ActorInfo]) -> StreamResult<()> {
Expand Down Expand Up @@ -783,11 +774,16 @@ impl LocalStreamManagerCore {
}

/// `drop_all_actors` is invoked by meta node via RPC for recovery purpose.
fn drop_all_actors(&mut self) {
for (actor_id, handle) in self.handles.drain() {
async fn drop_all_actors(&mut self) {
for (actor_id, handle) in &self.handles {
tracing::debug!("force stopping actor {}", actor_id);
handle.abort();
}
for (actor_id, handle) in self.handles.drain() {
tracing::debug!("join actor {}", actor_id);
let result = handle.await;
assert!(result.is_ok() || result.unwrap_err().is_cancelled());
}
self.actors.clear();
self.context.clear_channels();
if let Some(m) = self.stack_trace_manager.as_mut() {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ etcd-client = { version = "0.2.11", package = "madsim-etcd-client" }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
glob = "0.3"
itertools = "0.10"
madsim = "0.2.12"
madsim = "0.2.13"
paste = "1"
rand = "0.8"
rdkafka = { package = "madsim-rdkafka", version = "=0.2.8-alpha", features = ["cmake-build"] }
Expand Down

0 comments on commit 5d25f9c

Please sign in to comment.