Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task runner (mid-level API), part 1 #172

Merged
merged 7 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ stream = ["dep:rustls", "dep:tokio", "dep:tokio-rustls"]
[dependencies]
bounded-static = "0.5.0"
bytes = "1.5.0"
imap-codec = { version = "2.0.0", features = ["quirk_crlf_relaxed", "bounded-static"] }
imap-types = { version = "2.0.0" }
imap-codec = { version = "2.0.0", features = ["starttls", "quirk_crlf_relaxed", "bounded-static", "ext_condstore_qresync", "ext_login_referrals", "ext_mailbox_referrals", "ext_id", "ext_sort_thread", "ext_binary", "ext_metadata", "ext_uidplus"] }
imap-types = { version = "2.0.0", features = ["starttls", "ext_condstore_qresync", "ext_login_referrals", "ext_mailbox_referrals", "ext_id", "ext_sort_thread", "ext_binary", "ext_metadata", "ext_uidplus"] }
rustls = { version = "0.23.1", optional = true }
thiserror = "1.0.49"
tokio = { version = "1.32.0", optional = true, features = ["io-util", "macros", "net"] }
Expand All @@ -23,7 +23,7 @@ tracing = "0.1.40"
[dev-dependencies]
rand = "0.8.5"
tag-generator = { path = "tag-generator" }
tokio = { version = "1.32.0", features = ["rt", "sync"] }
tokio = { version = "1.37.0", features = ["full"] }

[workspace]
resolver = "2"
Expand Down
4 changes: 4 additions & 0 deletions tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ imap-flow = { path = ".." }
imap-types = "2.0.0"
tag-generator = { path = "../tag-generator" }
thiserror = "1.0.58"
tracing = "0.1.40"

[dev-dependencies]
static_assertions = "1.1.0"
tokio = { version = "1.37.0", features = ["full"] }
47 changes: 47 additions & 0 deletions tasks/examples/client-resolver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use imap_flow::{
client::{ClientFlow, ClientFlowOptions},
stream::Stream,
};
use tasks::{
resolver::Resolver,
tasks::{authenticate::AuthenticateTask, capability::CapabilityTask, logout::LogoutTask},
};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() {
let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let client = ClientFlow::new(ClientFlowOptions::default());
let mut resolver = Resolver::new(client);

let capability = stream
.progress(resolver.resolve(CapabilityTask::new()))
.await
.unwrap()
.unwrap();

println!("pre-auth capability: {capability:?}");

let capability = stream
.progress(resolver.resolve(AuthenticateTask::plain("alice", "pa²²w0rd", true)))
.await
.unwrap()
.unwrap();

println!("maybe post-auth capability: {capability:?}");

let capability = stream
.progress(resolver.resolve(CapabilityTask::new()))
.await
.unwrap()
.unwrap();

println!("post-auth capability: {capability:?}");

stream
.progress(resolver.resolve(LogoutTask::default()))
.await
.unwrap()
.unwrap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ async fn main() {
let client = ClientFlow::new(ClientFlowOptions::default());
let mut scheduler = Scheduler::new(client);

let handle1 = scheduler.enqueue_task(CapabilityTask::default());
let capability_handle = scheduler.enqueue_task(CapabilityTask::default());

loop {
match stream.progress(&mut scheduler).await.unwrap() {
SchedulerEvent::TaskFinished(mut token) => {
if let Some(capability) = handle1.resolve(&mut token) {
println!("handle1: {capability:?}");
if let Some(capability) = capability_handle.resolve(&mut token) {
println!("capability: {capability:?}");
break;
}
}
Expand All @@ -36,18 +36,18 @@ async fn main() {
}
}

let handle2 = scheduler.enqueue_task(AuthenticateTask::plain("alice", "pa²²w0rd", true));
let handle3 = scheduler.enqueue_task(LogoutTask::default());
let auth_handle = scheduler.enqueue_task(AuthenticateTask::plain("alice", "pa²²w0rd", true));
let logout_handle = scheduler.enqueue_task(LogoutTask::default());

loop {
match stream.progress(&mut scheduler).await.unwrap() {
SchedulerEvent::TaskFinished(mut token) => {
if let Some(auth) = handle2.resolve(&mut token) {
println!("handle2: {auth:?}");
if let Some(auth) = auth_handle.resolve(&mut token) {
println!("auth: {auth:?}");
}

if let Some(logout) = handle3.resolve(&mut token) {
println!("handle3: {logout:?}");
if let Some(logout) = logout_handle.resolve(&mut token) {
println!("logout: {logout:?}");
break;
}
}
Expand Down
24 changes: 18 additions & 6 deletions tasks/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod resolver;
pub mod tasks;

use std::{
Expand Down Expand Up @@ -26,11 +27,11 @@ use thiserror::Error;
/// and move out uninteresting responses (returning `Some(...)`).
///
/// If no active task is interested in a given response, we call this response "unsolicited".
pub trait Task: 'static {
pub trait Task: Send + 'static {
soywod marked this conversation as resolved.
Show resolved Hide resolved
/// Output of the task.
///
/// Returned in [`Self::process_tagged`].
type Output;
type Output: Any + Send;

/// Returns the [`CommandBody`] to issue for this task.
///
Expand Down Expand Up @@ -342,6 +343,8 @@ pub enum SchedulerError {
/// It's better to halt the execution to avoid damage.
#[error("unexpected tag in command completion result")]
UnexpectedTaggedResponse(Tagged<'static>),
#[error("unexpected BYE response")]
UnexpectedByeResponse(Bye<'static>),
}

#[derive(Eq)]
Expand Down Expand Up @@ -398,7 +401,7 @@ impl<T: Task> TaskHandle<T> {
#[derive(Debug)]
pub struct TaskToken {
handle: ClientFlowCommandHandle,
output: Option<Box<dyn Any>>,
output: Option<Box<dyn Any + Send>>,
}

// -------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -432,7 +435,7 @@ where
///
/// * doesn't have an associated type and uses [`Any`] in [`Self::process_tagged`]
/// * is an object-safe "subset" of [`Task`]
trait TaskAny {
trait TaskAny: Send {
fn process_data(&mut self, data: Data<'static>) -> Option<Data<'static>>;

fn process_untagged(&mut self, status_body: StatusBody<'static>)
Expand All @@ -450,7 +453,7 @@ trait TaskAny {

fn process_bye(&mut self, bye: Bye<'static>) -> Option<Bye<'static>>;

fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any>;
fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any + Send>;
}

impl<T> TaskAny for T
Expand Down Expand Up @@ -487,7 +490,16 @@ where
}

/// Returns [`Any`] instead of [`Task::Output`].
fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any> {
fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any + Send> {
Box::new(T::process_tagged(*self, status_body))
}
}

#[cfg(test)]
mod tests {
use static_assertions::assert_impl_all;

use super::Scheduler;

assert_impl_all!(Scheduler: Send);
}
87 changes: 87 additions & 0 deletions tasks/src/resolver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use imap_flow::{client::ClientFlow, Flow, FlowInterrupt};
use imap_types::response::{Response, Status};
use tracing::warn;

use crate::{Scheduler, SchedulerError, SchedulerEvent, Task, TaskHandle};

/// The resolver is a scheduler than manages one task at a time.
pub struct Resolver {
scheduler: Scheduler,
}

impl Flow for Resolver {
type Event = SchedulerEvent;
type Error = SchedulerError;

fn enqueue_input(&mut self, bytes: &[u8]) {
self.scheduler.enqueue_input(bytes);
}

fn progress(&mut self) -> Result<Self::Event, FlowInterrupt<Self::Error>> {
self.scheduler.progress()
}
}

impl Resolver {
/// Create a new resolver.
pub fn new(flow: ClientFlow) -> Self {
jakoschiko marked this conversation as resolved.
Show resolved Hide resolved
Self {
scheduler: Scheduler::new(flow),
}
}

/// Enqueue a [`Task`] for immediate resolution.
pub fn resolve<T: Task>(&mut self, task: T) -> ResolvingTask<T> {
let handle = self.scheduler.enqueue_task(task);

ResolvingTask {
resolver: self,
handle,
}
}
}

pub struct ResolvingTask<'a, T: Task> {
resolver: &'a mut Resolver,
handle: TaskHandle<T>,
}

impl<T: Task> Flow for ResolvingTask<'_, T> {
type Event = T::Output;
type Error = SchedulerError;

fn enqueue_input(&mut self, bytes: &[u8]) {
self.resolver.enqueue_input(bytes);
}

fn progress(&mut self) -> Result<Self::Event, FlowInterrupt<Self::Error>> {
loop {
match self.resolver.progress()? {
SchedulerEvent::TaskFinished(mut token) => {
if let Some(output) = self.handle.resolve(&mut token) {
break Ok(output);
} else {
warn!(?token, "received unexpected task token")
}
}
SchedulerEvent::Unsolicited(unsolicited) => {
if let Response::Status(Status::Bye(bye)) = unsolicited {
let err = SchedulerError::UnexpectedByeResponse(bye);
break Err(FlowInterrupt::Error(err));
} else {
warn!(?unsolicited, "received unsolicited");
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use static_assertions::assert_impl_all;

use super::Resolver;

assert_impl_all!(Resolver: Send);
}
Loading