diff --git a/Cargo.toml b/Cargo.toml index 18bdd1e9..1ed1dcd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,8 @@ stream = ["dep:rustls", "dep:tokio", "dep:tokio-rustls"] [dependencies] bounded-static = "0.5.0" bytes = "1.5.0" -imap-codec = { version = "2.0.0", features = ["quirk_crlf_relaxed", "bounded-static"] } -imap-types = { version = "2.0.0" } +imap-codec = { version = "2.0.0", features = ["starttls", "quirk_crlf_relaxed", "bounded-static", "ext_condstore_qresync", "ext_login_referrals", "ext_mailbox_referrals", "ext_id", "ext_sort_thread", "ext_binary", "ext_metadata", "ext_uidplus"] } +imap-types = { version = "2.0.0", features = ["starttls", "ext_condstore_qresync", "ext_login_referrals", "ext_mailbox_referrals", "ext_id", "ext_sort_thread", "ext_binary", "ext_metadata", "ext_uidplus"] } rustls = { version = "0.23.1", optional = true } thiserror = "1.0.49" tokio = { version = "1.32.0", optional = true, features = ["io-util", "macros", "net"] } @@ -23,7 +23,7 @@ tracing = "0.1.40" [dev-dependencies] rand = "0.8.5" tag-generator = { path = "tag-generator" } -tokio = { version = "1.32.0", features = ["rt", "sync"] } +tokio = { version = "1.37.0", features = ["full"] } [workspace] resolver = "2" diff --git a/tasks/Cargo.toml b/tasks/Cargo.toml index cc2b03c9..cdcdde6c 100644 --- a/tasks/Cargo.toml +++ b/tasks/Cargo.toml @@ -9,4 +9,8 @@ imap-flow = { path = ".." } imap-types = "2.0.0" tag-generator = { path = "../tag-generator" } thiserror = "1.0.58" +tracing = "0.1.40" + +[dev-dependencies] +static_assertions = "1.1.0" tokio = { version = "1.37.0", features = ["full"] } diff --git a/tasks/examples/client-resolver.rs b/tasks/examples/client-resolver.rs new file mode 100644 index 00000000..733be033 --- /dev/null +++ b/tasks/examples/client-resolver.rs @@ -0,0 +1,47 @@ +use imap_flow::{ + client::{ClientFlow, ClientFlowOptions}, + stream::Stream, +}; +use tasks::{ + resolver::Resolver, + tasks::{authenticate::AuthenticateTask, capability::CapabilityTask, logout::LogoutTask}, +}; +use tokio::net::TcpStream; + +#[tokio::main] +async fn main() { + let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap(); + let mut stream = Stream::insecure(stream); + let client = ClientFlow::new(ClientFlowOptions::default()); + let mut resolver = Resolver::new(client); + + let capability = stream + .progress(resolver.resolve(CapabilityTask::new())) + .await + .unwrap() + .unwrap(); + + println!("pre-auth capability: {capability:?}"); + + let capability = stream + .progress(resolver.resolve(AuthenticateTask::plain("alice", "pa²²w0rd", true))) + .await + .unwrap() + .unwrap(); + + println!("maybe post-auth capability: {capability:?}"); + + let capability = stream + .progress(resolver.resolve(CapabilityTask::new())) + .await + .unwrap() + .unwrap(); + + println!("post-auth capability: {capability:?}"); + + stream + .progress(resolver.resolve(LogoutTask::default())) + .await + .unwrap() + .unwrap(); +} diff --git a/tasks/examples/client-tasks.rs b/tasks/examples/client-scheduler.rs similarity index 70% rename from tasks/examples/client-tasks.rs rename to tasks/examples/client-scheduler.rs index 62df076f..fb2f79ab 100644 --- a/tasks/examples/client-tasks.rs +++ b/tasks/examples/client-scheduler.rs @@ -16,13 +16,13 @@ async fn main() { let client = ClientFlow::new(ClientFlowOptions::default()); let mut scheduler = Scheduler::new(client); - let handle1 = scheduler.enqueue_task(CapabilityTask::default()); + let capability_handle = scheduler.enqueue_task(CapabilityTask::default()); loop { match stream.progress(&mut scheduler).await.unwrap() { SchedulerEvent::TaskFinished(mut token) => { - if let Some(capability) = handle1.resolve(&mut token) { - println!("handle1: {capability:?}"); + if let Some(capability) = capability_handle.resolve(&mut token) { + println!("capability: {capability:?}"); break; } } @@ -36,18 +36,18 @@ async fn main() { } } - let handle2 = scheduler.enqueue_task(AuthenticateTask::plain("alice", "pa²²w0rd", true)); - let handle3 = scheduler.enqueue_task(LogoutTask::default()); + let auth_handle = scheduler.enqueue_task(AuthenticateTask::plain("alice", "pa²²w0rd", true)); + let logout_handle = scheduler.enqueue_task(LogoutTask::default()); loop { match stream.progress(&mut scheduler).await.unwrap() { SchedulerEvent::TaskFinished(mut token) => { - if let Some(auth) = handle2.resolve(&mut token) { - println!("handle2: {auth:?}"); + if let Some(auth) = auth_handle.resolve(&mut token) { + println!("auth: {auth:?}"); } - if let Some(logout) = handle3.resolve(&mut token) { - println!("handle3: {logout:?}"); + if let Some(logout) = logout_handle.resolve(&mut token) { + println!("logout: {logout:?}"); break; } } diff --git a/tasks/src/lib.rs b/tasks/src/lib.rs index 7f82cf39..b4dcba8d 100644 --- a/tasks/src/lib.rs +++ b/tasks/src/lib.rs @@ -1,3 +1,4 @@ +pub mod resolver; pub mod tasks; use std::{ @@ -26,11 +27,11 @@ use thiserror::Error; /// and move out uninteresting responses (returning `Some(...)`). /// /// If no active task is interested in a given response, we call this response "unsolicited". -pub trait Task: 'static { +pub trait Task: Send + 'static { /// Output of the task. /// /// Returned in [`Self::process_tagged`]. - type Output; + type Output: Any + Send; /// Returns the [`CommandBody`] to issue for this task. /// @@ -342,6 +343,8 @@ pub enum SchedulerError { /// It's better to halt the execution to avoid damage. #[error("unexpected tag in command completion result")] UnexpectedTaggedResponse(Tagged<'static>), + #[error("unexpected BYE response")] + UnexpectedByeResponse(Bye<'static>), } #[derive(Eq)] @@ -398,7 +401,7 @@ impl TaskHandle { #[derive(Debug)] pub struct TaskToken { handle: ClientFlowCommandHandle, - output: Option>, + output: Option>, } // ------------------------------------------------------------------------------------------------- @@ -432,7 +435,7 @@ where /// /// * doesn't have an associated type and uses [`Any`] in [`Self::process_tagged`] /// * is an object-safe "subset" of [`Task`] -trait TaskAny { +trait TaskAny: Send { fn process_data(&mut self, data: Data<'static>) -> Option>; fn process_untagged(&mut self, status_body: StatusBody<'static>) @@ -450,7 +453,7 @@ trait TaskAny { fn process_bye(&mut self, bye: Bye<'static>) -> Option>; - fn process_tagged(self: Box, status_body: StatusBody<'static>) -> Box; + fn process_tagged(self: Box, status_body: StatusBody<'static>) -> Box; } impl TaskAny for T @@ -487,7 +490,16 @@ where } /// Returns [`Any`] instead of [`Task::Output`]. - fn process_tagged(self: Box, status_body: StatusBody<'static>) -> Box { + fn process_tagged(self: Box, status_body: StatusBody<'static>) -> Box { Box::new(T::process_tagged(*self, status_body)) } } + +#[cfg(test)] +mod tests { + use static_assertions::assert_impl_all; + + use super::Scheduler; + + assert_impl_all!(Scheduler: Send); +} diff --git a/tasks/src/resolver.rs b/tasks/src/resolver.rs new file mode 100644 index 00000000..2c41e3fe --- /dev/null +++ b/tasks/src/resolver.rs @@ -0,0 +1,87 @@ +use imap_flow::{client::ClientFlow, Flow, FlowInterrupt}; +use imap_types::response::{Response, Status}; +use tracing::warn; + +use crate::{Scheduler, SchedulerError, SchedulerEvent, Task, TaskHandle}; + +/// The resolver is a scheduler than manages one task at a time. +pub struct Resolver { + scheduler: Scheduler, +} + +impl Flow for Resolver { + type Event = SchedulerEvent; + type Error = SchedulerError; + + fn enqueue_input(&mut self, bytes: &[u8]) { + self.scheduler.enqueue_input(bytes); + } + + fn progress(&mut self) -> Result> { + self.scheduler.progress() + } +} + +impl Resolver { + /// Create a new resolver. + pub fn new(flow: ClientFlow) -> Self { + Self { + scheduler: Scheduler::new(flow), + } + } + + /// Enqueue a [`Task`] for immediate resolution. + pub fn resolve(&mut self, task: T) -> ResolvingTask { + let handle = self.scheduler.enqueue_task(task); + + ResolvingTask { + resolver: self, + handle, + } + } +} + +pub struct ResolvingTask<'a, T: Task> { + resolver: &'a mut Resolver, + handle: TaskHandle, +} + +impl Flow for ResolvingTask<'_, T> { + type Event = T::Output; + type Error = SchedulerError; + + fn enqueue_input(&mut self, bytes: &[u8]) { + self.resolver.enqueue_input(bytes); + } + + fn progress(&mut self) -> Result> { + loop { + match self.resolver.progress()? { + SchedulerEvent::TaskFinished(mut token) => { + if let Some(output) = self.handle.resolve(&mut token) { + break Ok(output); + } else { + warn!(?token, "received unexpected task token") + } + } + SchedulerEvent::Unsolicited(unsolicited) => { + if let Response::Status(Status::Bye(bye)) = unsolicited { + let err = SchedulerError::UnexpectedByeResponse(bye); + break Err(FlowInterrupt::Error(err)); + } else { + warn!(?unsolicited, "received unsolicited"); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use static_assertions::assert_impl_all; + + use super::Resolver; + + assert_impl_all!(Resolver: Send); +}