Skip to content

Commit

Permalink
feat(core/source): update unicycle and add regression test
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Aug 4, 2024
1 parent ed11db4 commit db81827
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
2 changes: 1 addition & 1 deletion elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ once_cell = { version = "1.8.0", features = ["parking_lot"] }
serde_json = { version = "1.0.64", features = ["raw_value"] }
regex = "1.6.0"
thread_local = { version = "1.1.3", optional = true }
unicycle = "0.9.4"
unicycle = "0.10.2"
rmp-serde = { version = "1.1.0", optional = true }
humantime-serde = "1"

Expand Down
12 changes: 5 additions & 7 deletions elfo-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,11 @@ impl<S: ?Sized> StreamWithWaker<S> {
fn update_waker(self: Pin<&mut Self>, cx: &task::Context<'_>) {
let new_waker = cx.waker();

// Save the waker for reconfiguration if the stream isn't ready.
// NOTE: `unicycle` doesn't support `will_wake` for now:
// https://github.com/udoprog/unicycle/pull/15#issuecomment-1100680368
// NOTE: `unicycle` doesn't support `will_wake` (called by `clone_from()`)
// for now: https://github.com/udoprog/unicycle/pull/15#issuecomment-1100680368
// But we use it anyway to get benefits in the future.
if !self.waker.will_wake(new_waker) {
// SAFETY: `waker` is not pinned.
unsafe { self.get_unchecked_mut().waker.clone_from(new_waker) }
}
// SAFETY: `waker` is not pinned.
unsafe { self.get_unchecked_mut().waker.clone_from(new_waker) }
}

fn wake(&self) {
Expand Down Expand Up @@ -255,6 +252,7 @@ impl futures::Stream for UntypedSourceArc {
let result = guard.get_mut().stream().poll_recv(cx);

if result.is_pending() {
// The stream isn't ready, so we save waker for reconfiguration.
guard.get_mut().update_waker(cx);
} else if matches!(result, Poll::Ready(None)) || guard.status() == StreamStatus::Oneshot {
guard.get_mut().terminate();
Expand Down
37 changes: 36 additions & 1 deletion elfo/tests/source_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use derive_more::Constructor;
use elfo::{config::AnyConfig, prelude::*, scope, stream::Stream, tracing::TraceId};
use elfo::{config::AnyConfig, messages, prelude::*, scope, stream::Stream, tracing::TraceId};
use futures::StreamExt;
use parking_lot::Mutex;
use tokio::time;
Expand Down Expand Up @@ -308,3 +308,38 @@ async fn result() {
assert_msg_eq!(proxy.recv().await, Success(10));
assert_msg_eq!(proxy.recv().await, Failure(20));
}

// Prevents regression after [unicycle#30].
// [unicycle#30]: https://github.com/udoprog/unicycle/issues/30
#[tokio::test(start_paused = true)]
async fn drop_on_terminate() {
struct Guard(Arc<Mutex<bool>>);

impl Drop for Guard {
fn drop(&mut self) {
*self.0.lock() = true;
}
}

let dropped = Arc::new(Mutex::new(false));
let dropped_1 = dropped.clone();

let group = ActorGroup::new().exec(move |mut ctx| {
let dropped = dropped_1.clone();
async move {
ctx.attach(Stream::<messages::Impossible>::once(async move {
let _guard = Guard(dropped);
std::future::pending().await
}));

while ctx.recv().await.is_some() {}
}
});

let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
proxy.sync().await;
proxy.send(messages::Terminate::default()).await;
proxy.finished().await;

assert!(*dropped.lock());
}

0 comments on commit db81827

Please sign in to comment.