diff --git a/quickwit/quickwit-actors/src/actor_state.rs b/quickwit/quickwit-actors/src/actor_state.rs index 8b9e4190f60..0fb68156fa4 100644 --- a/quickwit/quickwit-actors/src/actor_state.rs +++ b/quickwit/quickwit-actors/src/actor_state.rs @@ -71,7 +71,7 @@ impl ActorState { } } -pub struct AtomicState(AtomicU32); +pub(crate) struct AtomicState(AtomicU32); impl Default for AtomicState { fn default() -> Self { @@ -80,7 +80,7 @@ impl Default for AtomicState { } impl AtomicState { - pub fn process(&self) { + pub(crate) fn process(&self) { let _ = self.0.compare_exchange( ActorState::Idle as u32, ActorState::Processing as u32, @@ -89,7 +89,7 @@ impl AtomicState { ); } - pub fn idle(&self) { + pub(crate) fn idle(&self) { let _ = self.0.compare_exchange( ActorState::Processing as u32, ActorState::Idle as u32, @@ -98,7 +98,7 @@ impl AtomicState { ); } - pub fn pause(&self) { + pub(crate) fn pause(&self) { let _ = self .0 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { @@ -109,7 +109,7 @@ impl AtomicState { }); } - pub fn resume(&self) { + pub(crate) fn resume(&self) { let _ = self.0.compare_exchange( ActorState::Paused as u32, ActorState::Processing as u32, diff --git a/quickwit/quickwit-actors/src/spawn_builder.rs b/quickwit/quickwit-actors/src/spawn_builder.rs index 42d94f43ad4..c3f0b114b46 100644 --- a/quickwit/quickwit-actors/src/spawn_builder.rs +++ b/quickwit/quickwit-actors/src/spawn_builder.rs @@ -201,7 +201,13 @@ impl SpawnBuilder { } } -/// Returns `None` if no message is available at the moment. +/// Receives an envelope from either the high priority queue or the low priority queue. +/// +/// In the paused state, the actor will only attempt to receive high priority messages. +/// +/// If no message is available, this function will yield until a message arrives. +/// If a high priority message is arrives first it is guaranteed to be processed first. +/// This other way around is however not guaranteed. async fn recv_envelope(inbox: &mut Inbox, ctx: &ActorContext) -> Envelope { if ctx.state().is_running() { ctx.protect_future(inbox.recv()).await.expect(