From 714274fbbe6d2929bc32bcabb61ac29b15ba454d Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Thu, 4 Jul 2024 19:12:09 +0400 Subject: [PATCH] fix(core/request_table): avoid all-request to multiple groups hanging --- CHANGELOG.md | 3 + elfo-core/src/context.rs | 17 +--- elfo-core/src/envelope.rs | 18 +--- elfo-core/src/request_table.rs | 7 +- elfo/tests/request_routing.rs | 147 +++++++++++++++++++++++++-------- 5 files changed, 129 insertions(+), 63 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5edd8359..5d6a1799 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - tracing: improve performance of `TraceId::generate()`. - dumping: remove unstable `Timestamp`. +### Fixed +- core/request: avoid all-request to multiple groups hanging in some cases ([#127]). + [#52]: https://github.com/elfo-rs/elfo/issues/52 [#127]: https://github.com/elfo-rs/elfo/pull/127 [#128]: https://github.com/elfo-rs/elfo/pull/128 diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs index c9f1f8da..05fea7dd 100644 --- a/elfo-core/src/context.rs +++ b/elfo-core/src/context.rs @@ -214,15 +214,14 @@ impl Context { Ok(()) => success = true, Err(err) => { has_full |= err.is_full(); - replace_unused(&mut unused, Some(err.into_inner())); + unused = Some(err.into_inner()); } }, - None => replace_unused(&mut unused, Some(envelope)), + None => unused = Some(envelope), }; } if success { - replace_unused(&mut unused, None); Ok(()) } else if has_full { Err(TrySendError::Full(e2m(unused.unwrap()))) @@ -305,7 +304,7 @@ impl Context { let guard = EbrGuard::new(); let entry = self.book.get(recipient, &guard); let object = ward!(entry, { - replace_unused(&mut unused, Some(envelope)); + unused = Some(envelope); continue; }); Object::send(object, Addr::NULL, envelope) @@ -314,14 +313,13 @@ impl Context { .err() .map(|err| err.0); - replace_unused(&mut unused, returned_envelope); + unused = 
returned_envelope; if unused.is_none() { success = true; } } if success { - replace_unused(&mut unused, None); Ok(()) } else { Err(SendError(e2m(unused.unwrap()))) @@ -860,13 +858,6 @@ fn addrs_with_envelope( }) } -fn replace_unused(dest: &mut Option<Envelope>, value: Option<Envelope>) { - if let Some(old) = dest.take() { - old.drop_as_unused(); - } - *dest = value; -} - impl Context { pub(crate) fn new(book: AddressBook, demux: Demux) -> Self { Self { diff --git a/elfo-core/src/envelope.rs b/elfo-core/src/envelope.rs index 257627cb..f3c38ba7 100644 --- a/elfo-core/src/envelope.rs +++ b/elfo-core/src/envelope.rs @@ -265,20 +265,6 @@ impl Envelope { (message, kind) } - pub(crate) fn drop_as_unused(mut self) { - // SAFETY: `self` is properly initialized. - let header = unsafe { self.0.as_mut() }; - - let fake_kind = MessageKind::Regular { sender: Addr::NULL }; - let kind = mem::replace(&mut header.kind, fake_kind); - - if let MessageKind::RequestAny(token) | MessageKind::RequestAll(token) = kind { - // FIXME: probably invalid for ALL requests, need to decrement remainder. - // REVIEW: DO NOT forget check & fix it before merging. 
- token.forget(); - } - } - pub(crate) fn into_header_ptr(self) -> NonNull { let ptr = self.0; mem::forget(self); @@ -491,14 +477,14 @@ mod tests_miri { drop(envelope2); assert_eq!(Arc::strong_count(&counter), 3); - envelope3.drop_as_unused(); + drop(envelope3); assert_eq!(Arc::strong_count(&counter), 2); let envelope4 = envelope.duplicate(); assert_eq!(Arc::strong_count(&counter), 3); assert!(envelope4.is::()); - envelope.drop_as_unused(); + drop(envelope); assert_eq!(Arc::strong_count(&counter), 2); drop(envelope4); diff --git a/elfo-core/src/request_table.rs b/elfo-core/src/request_table.rs index afe7bad1..55dc0885 100644 --- a/elfo-core/src/request_table.rs +++ b/elfo-core/src/request_table.rs @@ -77,15 +77,20 @@ impl RequestData { debug_assert!(self.responses.len() <= 1); let is_ok = response.is_ok(); + if self.responses.is_empty() { self.responses.push(response); } // Priority: `Ok(_)` > `Err(Ignored)` > `Err(Failed)` - else if !matches!(&self.responses[0], Err(RequestError::Failed)) { + else if response.is_ok() { + debug_assert!(self.responses[0].is_err()); + self.responses[0] = response; + } else if let Err(RequestError::Ignored) = response { debug_assert!(self.responses[0].is_err()); self.responses[0] = response; } + // Received `Ok`, so prevent further responses. 
if is_ok { self.remainder = 0; } diff --git a/elfo/tests/request_routing.rs b/elfo/tests/request_routing.rs index 2d479529..94920fff 100644 --- a/elfo/tests/request_routing.rs +++ b/elfo/tests/request_routing.rs @@ -1,15 +1,16 @@ #![cfg(feature = "test-util")] -use std::sync::Arc; +use std::time::Duration; + +use tracing::info; use elfo::{ prelude::*, routers::{MapRouter, Outcome}, - Topology, + RestartParams, RestartPolicy, Topology, _priv::do_start, }; use elfo_core::config::AnyConfig; -use tracing::info; #[message(ret = u64)] struct TestRequest; @@ -17,31 +18,33 @@ struct TestRequest; #[message] struct NeverSent; +fn setup_logger() { + let _ = tracing_subscriber::fmt() + .with_target(false) + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_test_writer() + .try_init(); +} + #[tokio::test] -async fn test_stealing_request_routing() { - let (tx, rx) = futures_intrusive::channel::shared::oneshot_channel(); - let tx = Arc::new(tx); - - let requester_blueprint = ActorGroup::new().exec(move |ctx| { - let tx = tx.clone(); - - async move { - info!("sent request"); - match ctx.request(TestRequest).resolve().await { - Ok(_) => info!("got response"), - Err(_) => { - tx.send(false).unwrap(); - return; - } - } - tx.send(true).unwrap(); - } +async fn stealing() { + setup_logger(); + + let requester_blueprint = ActorGroup::new().exec(move |mut ctx| async move { + info!("sent request"); + ctx.request(TestRequest).resolve().await.unwrap(); + + let envelope = ctx.recv().await.unwrap(); + msg!(match envelope { + (TestRequest, token) => ctx.respond(token, 42), + _ => unreachable!(), + }); }); let responder_blueprint = ActorGroup::new().exec(move |mut ctx| async move { while let Some(envelope) = ctx.recv().await { msg!(match envelope { - (_req @ TestRequest, token) => { + (TestRequest, token) => { ctx.respond(token, 42); info!("replied to request"); } @@ -61,15 +64,10 @@ async fn test_stealing_request_routing() { panic!("thief should not be started"); }); - 
let _ = tracing_subscriber::fmt() - .with_target(false) - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .with_test_writer() - .try_init(); - let topology = Topology::empty(); let configurers = topology.local("system.configurers").entrypoint(); let requester = topology.local("requester"); + let requester_addr = requester.addr(); let responder = topology.local("responder"); let thief = topology.local("thief"); @@ -86,10 +84,93 @@ async fn test_stealing_request_routing() { responder.mount(responder_blueprint); thief.mount(thief_blueprint); - do_start(topology, false, |_, _| futures::future::ready(())) - .await - .expect("cannot start"); + do_start(topology, false, |ctx, _| async move { + ctx.request_to(requester_addr, TestRequest).resolve().await + }) + .await + .expect("cannot start") + .expect("requester actor failed"); +} + +#[tokio::test] +async fn multiple_failures() { + setup_logger(); + + fn success_blueprint(id: u64) -> Blueprint { + ActorGroup::new().exec(move |mut ctx| async move { + while let Some(envelope) = ctx.recv().await { + msg!(match envelope { + (TestRequest, token) => ctx.respond(token, id), + _ => unreachable!(), + }) + } + }) + } + + fn failure_blueprint() -> Blueprint { + ActorGroup::new() + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(1000), + Duration::from_secs(1000), + ))) + .exec(|_ctx| async move { + panic!("failure"); + }) + } + + let topology = Topology::empty(); + let configurers = topology.local("system.configurers").entrypoint(); + let requester = topology.local("requester"); + let requester_addr = requester.addr(); + let responder_1_fail = topology.local("responder_1_fail"); + let responder_2_fail = topology.local("responder_2_fail"); + let responder_3_fail = topology.local("responder_3_fail"); + let responder_4_succ = topology.local("responder_4_succ"); + let responder_5_fail = topology.local("responder_5_fail"); + let responder_6_succ = 
topology.local("responder_6_succ"); + let responder_7_fail = topology.local("responder_7_fail"); + + requester.route_all_to(&responder_1_fail); + requester.route_all_to(&responder_2_fail); + requester.route_all_to(&responder_3_fail); + requester.route_all_to(&responder_4_succ); + requester.route_all_to(&responder_5_fail); + requester.route_all_to(&responder_6_succ); + requester.route_all_to(&responder_7_fail); + + let requester_blueprint = ActorGroup::new().exec(move |mut ctx| async move { + for i in 0..10 { + info!("iter #{i}"); + info!("sent any-request"); + let response = ctx.request(TestRequest).resolve().await.unwrap(); + assert!(response == 4 || response == 6); + + info!("sent all-request"); + let responses = ctx.request(TestRequest).all().resolve().await; + assert_eq!(responses.len(), 7, "{:?}", responses); + } + + let envelope = ctx.recv().await.unwrap(); + msg!(match envelope { + (TestRequest, token) => ctx.respond(token, 42), + _ => unreachable!(), + }); + }); + + configurers.mount(elfo_configurer::fixture(&topology, AnyConfig::default())); + requester.mount(requester_blueprint); + responder_1_fail.mount(failure_blueprint()); + responder_2_fail.mount(failure_blueprint()); + responder_3_fail.mount(failure_blueprint()); + responder_4_succ.mount(success_blueprint(4)); + responder_5_fail.mount(failure_blueprint()); + responder_6_succ.mount(success_blueprint(6)); + responder_7_fail.mount(failure_blueprint()); - let success = rx.receive().await.unwrap(); - assert!(success); + do_start(topology, false, |ctx, _| async move { + ctx.request_to(requester_addr, TestRequest).resolve().await + }) + .await + .expect("cannot start") + .expect("requester actor failed"); }