-
-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: Implement sans I/O #158
Conversation
0a5c698
to
3db7d4c
Compare
Pull Request Test Coverage Report for Build 8974528434Details
💛 - Coveralls |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So neat! Looks awesome 🙂
Thanks, Jakob! I really like where this is going! 🎉
I would like to put the proxy and the examples under test tomorrow. Also, I'd like to have a better understanding of the TLS code. Maybe it makes sense to discuss this offline. So far, this looks all very promising :-) |
I started to play a bit with the new SansIO API within Himalaya directly, I think I understood well the concept and it's awesome. Yet I have a question, let's take the following code from your example: let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let client = ClientFlow::new(ClientFlowOptions::default());
let mut scheduler = Scheduler::new(client);
let handle1 = scheduler.enqueue_task(CapabilityTask::default());
loop {
match stream.progress(&mut scheduler).await.unwrap() {
SchedulerEvent::TaskFinished(mut token) => {
if let Some(capability) = handle1.resolve(&mut token) {
println!("handle1: {capability:?}");
break;
}
}
SchedulerEvent::Unsolicited(unsolicited) => {
println!("unsolicited: {unsolicited:?}");
if let Response::Status(Status::Bye { .. }) = unsolicited {
break;
}
}
}
} The scheduler takes care of enqueueing the task and processing the event, and the client (the lib consumer) needs to make the flow progress by running The problem: the previous code needs to be written for every single task, and it needs to be done client side because the scheduler does not know anymore how to progress (async). Only stream knows, but stream does not know how to process a task or an event. I see that a 3rd actor could be beneficial here, to prevent lib consumers to write their own helpers. If a lib consumer uses the scheduler and the stream (tokio + rustls), then the lib should be able to provide those helpers. But where to put it and how to name it? Just to illustrate what I try to say: let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let stream = Stream::insecure(stream);
let client = ClientFlow::new(ClientFlowOptions::default());
let scheduler = Scheduler::new(client);
// this is the 3rd actor I was thinking about
let mut session = Session::new(stream, scheduler)
let capabilities = session.capability().await.unwrap(); |
@soywod: I don't see how the situation has changed with sans I/O. But I would agree that we need better abstractions. Here another proposal: let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let stream = Stream::insecure(stream);
let client = ClientFlow::new(ClientFlowOptions::default());
let scheduler = Scheduler::new(client);
// Session is also sans I/O
let mut session = Session::new(scheduler);
// Borrows &mut session
let mut capability_request = session.capability();
let capabilities = stream.progress(&mut capability_request).await.unwrap(); |
In my initial PR, I added a
It is not really a better abstraction, more like a missing one, because I really like the existing you are proposing here.
Alright, I will give it a shot, thank you! |
@soywod: If you add another method to // Session is also sans I/O
let mut session = Session::new(scheduler);
// Stream::complete takes ownership of the given Flow
let capabilities = stream.complete(session.capability()).await.unwrap(); |
I see what you mean, but by taking the ownership of the Flow, you also take ownership of the current scheduler which prevents you to run multiple complete in a row. I'm experimenting around but cannot find yet sth satisfying. |
No, only the ownership of the result of |
But how to implement then the pub struct TaskResolver<T: Task> {
scheduler: Arc<Mutex<Scheduler>>,
handle: TaskHandle<T>,
}
impl<T: Task> TaskResolver<T> {
pub fn new(scheduler: Arc<Mutex<Scheduler>>, task: T) -> Self {
let handle = scheduler.lock().unwrap().enqueue_task(task);
Self { scheduler, handle }
}
}
impl<T: Task> Flow for TaskResolver<T> {
type Event = T::Output;
type Error = SchedulerError;
fn enqueue_input(&mut self, bytes: &[u8]) {
self.scheduler.lock().unwrap().enqueue_input(bytes);
}
fn progress(&mut self) -> Result<Self::Event, FlowInterrupt<Self::Error>> {
loop {
match self.scheduler.lock().unwrap().progress()? {
SchedulerEvent::TaskFinished(mut token) => {
if let Some(output) = self.handle.resolve(&mut token) {
break Ok(output);
}
}
SchedulerEvent::Unsolicited(unsolicited) => {
if let Response::Status(Status::Bye(bye)) = unsolicited {
break Err(SchedulerError::UnexpectedByeResponse(bye).into());
} else {
println!("unsolicited: {unsolicited:?}");
}
}
}
}
}
} Which is then used this way: pub struct Resolver {
scheduler: Arc<Mutex<Scheduler>>,
}
impl Resolver {
pub fn capability(&mut self) -> TaskResolver<CapabilityTask> {
TaskResolver::new(self.scheduler.clone(), CapabilityTask::new())
}
} And allows sth like: let scheduler = Scheduler::new(client);
let mut resolver = Resolver::new(scheduler);
let capabilities = stream.consume(resolver.capability()).await.unwrap();
let auth = stream
.consume(resolver.authenticate_plain("a", "b"))
.await
.unwrap();
Yes indeed, the |
Wait I have a better solution, I will come back to you this afternoon. |
Hm, maybe I didn't understand your use-case correctly. Do you want to process tasks sequentially or in parallel? I committed some small changes. pub async fn progress<F: Flow>(
&mut self,
mut flow: F,
) -> Result<F::Event, StreamError<F::Error>> And I added this implementation: impl<F: Flow> Flow for &mut F Now you can decide whether you move or borrow the // Borrows flow
stream.progress(&mut flow).await.unwrap();
// Moves flow
stream.progress(flow).await.unwrap(); This should give you a lot of flexibility. Let's discuss your |
Co-authored-by: Manos Pitsidianakis <manos@pitsidianak.is>
Co-authored-by: Damian Poddebniak <poddebniak@mailbox.org>
I merge it now. Thank you all. |
Thank you all! Awesome team effort :-) |
These types are now sans I/O:
imap_flow::client::ClientFlow
imap_flow::server::ServerFlow
imap_tasks::Scheduler
All of them implement the new trait
imap_flow::Flow
which provides just enough interface for implementing I/O utilities.For interacting with I/O there is now the optional utility
imap_flow::stream::Stream
based ontokio
andtokio_rustls
. This utility and its dependencies can be enabled/disabled with the feature flagstream
which is enabled by default.The current usage looks like this:
This PR is only the first (big) step. I think there are a lot of smaller API changes and follow-up refactorings that we can and should do. But for now let's focus on the rough design.
Closes #154
Closes #37
Closes #134
Relates to #98