Skip to content

Commit

Permalink
fix(core/request_table): avoid all-request to multiple groups handing
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Jul 4, 2024
1 parent 13ff047 commit 714274f
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 63 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- tracing: improve performance of `TraceId::generate()`.
- dumping: remove unstable `Timestamp`.

### Fixed
- core/request: avoid all-request to multiple groups hanging in some cases ([#127]).

[#52]: https://github.com/elfo-rs/elfo/issues/52
[#127]: https://github.com/elfo-rs/elfo/pull/127
[#128]: https://github.com/elfo-rs/elfo/pull/128
Expand Down
17 changes: 4 additions & 13 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,14 @@ impl<C, K> Context<C, K> {
Ok(()) => success = true,
Err(err) => {
has_full |= err.is_full();
replace_unused(&mut unused, Some(err.into_inner()));
unused = Some(err.into_inner());
}
},
None => replace_unused(&mut unused, Some(envelope)),
None => unused = Some(envelope),
};
}

if success {
replace_unused(&mut unused, None);
Ok(())
} else if has_full {
Err(TrySendError::Full(e2m(unused.unwrap())))
Expand Down Expand Up @@ -305,7 +304,7 @@ impl<C, K> Context<C, K> {
let guard = EbrGuard::new();
let entry = self.book.get(recipient, &guard);
let object = ward!(entry, {
replace_unused(&mut unused, Some(envelope));
unused = Some(envelope);
continue;
});
Object::send(object, Addr::NULL, envelope)
Expand All @@ -314,14 +313,13 @@ impl<C, K> Context<C, K> {
.err()
.map(|err| err.0);

replace_unused(&mut unused, returned_envelope);
unused = returned_envelope;
if unused.is_none() {
success = true;
}
}

if success {
replace_unused(&mut unused, None);
Ok(())
} else {
Err(SendError(e2m(unused.unwrap())))
Expand Down Expand Up @@ -860,13 +858,6 @@ fn addrs_with_envelope(
})
}

fn replace_unused(dest: &mut Option<Envelope>, value: Option<Envelope>) {
if let Some(old) = dest.take() {
old.drop_as_unused();
}
*dest = value;
}

impl Context {
pub(crate) fn new(book: AddressBook, demux: Demux) -> Self {
Self {
Expand Down
18 changes: 2 additions & 16 deletions elfo-core/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,20 +265,6 @@ impl Envelope {
(message, kind)
}

pub(crate) fn drop_as_unused(mut self) {
// SAFETY: `self` is properly initialized.
let header = unsafe { self.0.as_mut() };

let fake_kind = MessageKind::Regular { sender: Addr::NULL };
let kind = mem::replace(&mut header.kind, fake_kind);

if let MessageKind::RequestAny(token) | MessageKind::RequestAll(token) = kind {
// FIXME: probably invalid for ALL requests, need to decrement remainder.
// REVIEW: DO NOT forget check & fix it before merging.
token.forget();
}
}

pub(crate) fn into_header_ptr(self) -> NonNull<EnvelopeHeader> {
let ptr = self.0;
mem::forget(self);
Expand Down Expand Up @@ -491,14 +477,14 @@ mod tests_miri {
drop(envelope2);
assert_eq!(Arc::strong_count(&counter), 3);

envelope3.drop_as_unused();
drop(envelope3);
assert_eq!(Arc::strong_count(&counter), 2);

let envelope4 = envelope.duplicate();
assert_eq!(Arc::strong_count(&counter), 3);
assert!(envelope4.is::<Sample>());

envelope.drop_as_unused();
drop(envelope);
assert_eq!(Arc::strong_count(&counter), 2);

drop(envelope4);
Expand Down
7 changes: 6 additions & 1 deletion elfo-core/src/request_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,20 @@ impl RequestData {
debug_assert!(self.responses.len() <= 1);

let is_ok = response.is_ok();

if self.responses.is_empty() {
self.responses.push(response);
}
// Priority: `Ok(_)` > `Err(Ignored)` > `Err(Failed)`
else if !matches!(&self.responses[0], Err(RequestError::Failed)) {
else if response.is_ok() {
debug_assert!(self.responses[0].is_err());
self.responses[0] = response;
} else if let Err(RequestError::Ignored) = response {
debug_assert!(self.responses[0].is_err());
self.responses[0] = response;
}

// Received `Ok`, so prevent further responses.
if is_ok {
self.remainder = 0;
}
Expand Down
147 changes: 114 additions & 33 deletions elfo/tests/request_routing.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,50 @@
#![cfg(feature = "test-util")]

use std::sync::Arc;
use std::time::Duration;

use tracing::info;

use elfo::{
prelude::*,
routers::{MapRouter, Outcome},
Topology,
RestartParams, RestartPolicy, Topology,
_priv::do_start,
};
use elfo_core::config::AnyConfig;
use tracing::info;

#[message(ret = u64)]
struct TestRequest;

#[message]
struct NeverSent;

fn setup_logger() {
let _ = tracing_subscriber::fmt()
.with_target(false)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_test_writer()
.try_init();
}

#[tokio::test]
async fn test_stealing_request_routing() {
let (tx, rx) = futures_intrusive::channel::shared::oneshot_channel();
let tx = Arc::new(tx);

let requester_blueprint = ActorGroup::new().exec(move |ctx| {
let tx = tx.clone();

async move {
info!("sent request");
match ctx.request(TestRequest).resolve().await {
Ok(_) => info!("got response"),
Err(_) => {
tx.send(false).unwrap();
return;
}
}
tx.send(true).unwrap();
}
async fn stealing() {
setup_logger();

let requester_blueprint = ActorGroup::new().exec(move |mut ctx| async move {
info!("sent request");
ctx.request(TestRequest).resolve().await.unwrap();

let envelope = ctx.recv().await.unwrap();
msg!(match envelope {
(TestRequest, token) => ctx.respond(token, 42),
_ => unreachable!(),
});
});

let responder_blueprint = ActorGroup::new().exec(move |mut ctx| async move {
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
(_req @ TestRequest, token) => {
(TestRequest, token) => {
ctx.respond(token, 42);
info!("replied to request");
}
Expand All @@ -61,15 +64,10 @@ async fn test_stealing_request_routing() {
panic!("thief should not be started");
});

let _ = tracing_subscriber::fmt()
.with_target(false)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_test_writer()
.try_init();

let topology = Topology::empty();
let configurers = topology.local("system.configurers").entrypoint();
let requester = topology.local("requester");
let requester_addr = requester.addr();
let responder = topology.local("responder");
let thief = topology.local("thief");

Expand All @@ -86,10 +84,93 @@ async fn test_stealing_request_routing() {
responder.mount(responder_blueprint);
thief.mount(thief_blueprint);

do_start(topology, false, |_, _| futures::future::ready(()))
.await
.expect("cannot start");
do_start(topology, false, |ctx, _| async move {
ctx.request_to(requester_addr, TestRequest).resolve().await
})
.await
.expect("cannot start")
.expect("requester actor failed");
}

#[tokio::test]
async fn multiple_failures() {
setup_logger();

fn success_blueprint(id: u64) -> Blueprint {
ActorGroup::new().exec(move |mut ctx| async move {
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
(TestRequest, token) => ctx.respond(token, id),
_ => unreachable!(),
})
}
})
}

fn failure_blueprint() -> Blueprint {
ActorGroup::new()
.restart_policy(RestartPolicy::on_failure(RestartParams::new(
Duration::from_secs(1000),
Duration::from_secs(1000),
)))
.exec(|_ctx| async move {
panic!("failure");
})
}

let topology = Topology::empty();
let configurers = topology.local("system.configurers").entrypoint();
let requester = topology.local("requester");
let requester_addr = requester.addr();
let responder_1_fail = topology.local("responder_1_fail");
let responder_2_fail = topology.local("responder_2_fail");
let responder_3_fail = topology.local("responder_3_fail");
let responder_4_succ = topology.local("responder_4_succ");
let responder_5_fail = topology.local("responder_5_fail");
let responder_6_succ = topology.local("responder_6_succ");
let responder_7_fail = topology.local("responder_7_fail");

requester.route_all_to(&responder_1_fail);
requester.route_all_to(&responder_2_fail);
requester.route_all_to(&responder_3_fail);
requester.route_all_to(&responder_4_succ);
requester.route_all_to(&responder_5_fail);
requester.route_all_to(&responder_6_succ);
requester.route_all_to(&responder_7_fail);

let requester_blueprint = ActorGroup::new().exec(move |mut ctx| async move {
for i in 0..10 {
info!("iter #{i}");
info!("sent any-request");
let response = ctx.request(TestRequest).resolve().await.unwrap();
assert!(response == 4 || response == 6);

info!("sent all-request");
let responses = ctx.request(TestRequest).all().resolve().await;
assert_eq!(responses.len(), 7, "{:?}", responses);
}

let envelope = ctx.recv().await.unwrap();
msg!(match envelope {
(TestRequest, token) => ctx.respond(token, 42),
_ => unreachable!(),
});
});

configurers.mount(elfo_configurer::fixture(&topology, AnyConfig::default()));
requester.mount(requester_blueprint);
responder_1_fail.mount(failure_blueprint());
responder_2_fail.mount(failure_blueprint());
responder_3_fail.mount(failure_blueprint());
responder_4_succ.mount(success_blueprint(4));
responder_5_fail.mount(failure_blueprint());
responder_6_succ.mount(success_blueprint(6));
responder_7_fail.mount(failure_blueprint());

let success = rx.receive().await.unwrap();
assert!(success);
do_start(topology, false, |ctx, _| async move {
ctx.request_to(requester_addr, TestRequest).resolve().await
})
.await
.expect("cannot start")
.expect("requester actor failed");
}

0 comments on commit 714274f

Please sign in to comment.