Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Implement sans I/O #158

Merged
merged 12 commits into from
May 7, 2024
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@ edition = "2021"
license = "MIT OR Apache-2.0"

[features]
default = ["stream"]
expose_stream = []
stream = ["dep:rustls", "dep:tokio", "dep:tokio-rustls"]
jakoschiko marked this conversation as resolved.
Show resolved Hide resolved

[dependencies]
bounded-static = "0.5.0"
bytes = "1.5.0"
imap-codec = { version = "2.0.0", features = ["quirk_crlf_relaxed", "bounded-static"] }
imap-types = { version = "2.0.0" }
rustls = { version = "0.21.11", optional = true }
thiserror = "1.0.49"
tokio = { version = "1.32.0", features = ["io-util"] }
tokio = { version = "1.32.0", optional = true, features = ["io-util", "macros", "net"] }
tokio-rustls = { version = "0.24.1", optional = true }
tracing = "0.1.40"

[dev-dependencies]
rand = "0.8.5"
tag-generator = { path = "tag-generator" }
tokio = { version = "1.32.0", features = ["macros", "net", "rt", "sync"] }
tokio = { version = "1.32.0", features = ["rt", "sync"] }

[workspace]
resolver = "2"
Expand Down
9 changes: 8 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,11 @@ allow-git = [
]

[licenses]
allow = [ "Apache-2.0", "MIT", "Unicode-DFS-2016" ]
allow = [ "Apache-2.0", "MIT", "Unicode-DFS-2016", "ISC", "OpenSSL" ]

[[licenses.clarify]]
name = "ring"
expression = "MIT AND ISC AND OpenSSL"
license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 }
]
17 changes: 11 additions & 6 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use imap_flow::{
client::{ClientFlow, ClientFlowEvent, ClientFlowOptions},
stream::AnyStream,
stream::Stream,
};
use imap_types::{
command::{Command, CommandBody},
Expand All @@ -11,11 +11,16 @@ use tokio::net::TcpStream;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let mut client = ClientFlow::new(ClientFlowOptions::default());

let greeting = loop {
match stream.progress(&mut client).await.unwrap() {
ClientFlowEvent::GreetingReceived { greeting } => break greeting,
event => println!("unexpected event: {event:?}"),
}
};

let (mut client, greeting) =
ClientFlow::receive_greeting(AnyStream::new(stream), ClientFlowOptions::default())
.await
.unwrap();
println!("received greeting: {greeting:?}");

let handle = client.enqueue_command(Command {
Expand All @@ -24,7 +29,7 @@ async fn main() {
});

loop {
match client.progress().await.unwrap() {
match stream.progress(&mut client).await.unwrap() {
ClientFlowEvent::CommandSent {
handle: got_handle,
command,
Expand Down
24 changes: 13 additions & 11 deletions examples/client_authenticate.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{collections::VecDeque, error::Error};
use std::collections::VecDeque;

use imap_flow::{
client::{ClientFlow, ClientFlowEvent, ClientFlowOptions},
stream::AnyStream,
stream::Stream,
};
use imap_types::{
auth::{AuthMechanism, AuthenticateData},
Expand All @@ -12,13 +12,17 @@ use tag_generator::TagGenerator;
use tokio::net::TcpStream;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
let stream = AnyStream::new(TcpStream::connect("127.0.0.1:12345").await?);
async fn main() {
let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let mut client = ClientFlow::new(ClientFlowOptions::default());

let (mut client, greeting) =
ClientFlow::receive_greeting(stream, ClientFlowOptions::default()).await?;

println!("{greeting:?}");
loop {
match stream.progress(&mut client).await.unwrap() {
ClientFlowEvent::GreetingReceived { .. } => break,
event => println!("unexpected event: {event:?}"),
}
}

let mut tag_generator = TagGenerator::new();

Expand All @@ -34,7 +38,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
]);

loop {
let event = client.progress().await?;
let event = stream.progress(&mut client).await.unwrap();
println!("{event:?}");

match event {
Expand All @@ -53,6 +57,4 @@ async fn main() -> Result<(), Box<dyn Error>> {
_ => {}
}
}

Ok(())
}
18 changes: 11 additions & 7 deletions examples/client_idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::BufRead;

use imap_flow::{
client::{ClientFlow, ClientFlowEvent, ClientFlowOptions},
stream::AnyStream,
stream::Stream,
};
use imap_types::{
command::{Command, CommandBody},
Expand All @@ -14,11 +14,15 @@ use tokio::{net::TcpStream, sync::mpsc::Receiver};
#[tokio::main(flavor = "current_thread")]
async fn main() {
let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let mut client = ClientFlow::new(ClientFlowOptions::default());

let (mut client, _) =
ClientFlow::receive_greeting(AnyStream::new(stream), ClientFlowOptions::default())
.await
.unwrap();
loop {
match stream.progress(&mut client).await.unwrap() {
ClientFlowEvent::GreetingReceived { .. } => break,
event => println!("unexpected event: {event:?}"),
}
}

println!("Press ENTER to stop IDLE");
let mut lines = Lines::new();
Expand All @@ -31,7 +35,7 @@ async fn main() {

loop {
tokio::select! {
event = client.progress() => {
event = stream.progress(&mut client) => {
match event.unwrap() {
ClientFlowEvent::IdleCommandSent { .. } => {
println!("IDLE command sent")
Expand Down Expand Up @@ -69,7 +73,7 @@ async fn main() {
}

loop {
match client.progress().await.unwrap() {
match stream.progress(&mut client).await.unwrap() {
ref event @ ClientFlowEvent::StatusReceived {
status:
Status::Tagged(Tagged {
Expand Down
90 changes: 90 additions & 0 deletions examples/client_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::{
io::{Read, Write},
net::TcpStream,
};

use imap_flow::{
client::{ClientFlow, ClientFlowEvent, ClientFlowOptions},
FlowInterrupt, FlowIo,
};
use imap_types::{
command::{Command, CommandBody},
core::Tag,
};

fn main() {
let mut stream = TcpStream::connect("127.0.0.1:12345").unwrap();
let mut read_buffer = [0; 128];
let mut client = ClientFlow::new(ClientFlowOptions::default());

let greeting = loop {
match client.progress() {
Err(interrupt) => match interrupt {
FlowInterrupt::Io(FlowIo::NeedMoreInput) => {
let count = stream.read(&mut read_buffer).unwrap();
client.enqueue_input(&read_buffer[0..count]);
}
interrupt => panic!("unexpected interrupt: {interrupt:?}"),
},
Ok(event) => match event {
ClientFlowEvent::GreetingReceived { greeting } => break greeting,
event => println!("unexpected event: {event:?}"),
},
}
};

println!("received greeting: {greeting:?}");

let handle = client.enqueue_command(Command {
tag: Tag::try_from("A1").unwrap(),
body: CommandBody::login("Al¹cE", "pa²²w0rd").unwrap(),
});

loop {
match client.progress() {
Err(interrupt) => match interrupt {
FlowInterrupt::Io(FlowIo::NeedMoreInput) => {
let count = stream.read(&mut read_buffer).unwrap();
client.enqueue_input(&read_buffer[0..count]);
}
FlowInterrupt::Io(FlowIo::Output(bytes)) => {
stream.write_all(&bytes).unwrap();
}
FlowInterrupt::Error(error) => {
panic!("unexpected error: {error:?}");
}
},
Ok(event) => match event {
ClientFlowEvent::CommandSent {
handle: got_handle,
command,
} => {
println!("command sent: {got_handle:?}, {command:?}");
assert_eq!(handle, got_handle);
}
ClientFlowEvent::CommandRejected {
handle: got_handle,
command,
status,
} => {
println!("command rejected: {got_handle:?}, {command:?}, {status:?}");
assert_eq!(handle, got_handle);
}
ClientFlowEvent::DataReceived { data } => {
println!("data received: {data:?}");
}
ClientFlowEvent::StatusReceived { status } => {
println!("status received: {status:?}");
}
ClientFlowEvent::ContinuationRequestReceived {
continuation_request,
} => {
println!("unexpected continuation request received: {continuation_request:?}");
}
event => {
println!("{event:?}");
}
},
}
}
}
42 changes: 23 additions & 19 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
use std::collections::VecDeque;

use imap_flow::{
server::{ServerFlow, ServerFlowEvent, ServerFlowOptions},
stream::AnyStream,
stream::Stream,
};
use imap_types::response::{Greeting, Status};
use tokio::net::TcpListener;

#[tokio::main(flavor = "current_thread")]
async fn main() {
let (mut server, _) = {
let stream = {
let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
stream
};
let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
let mut stream = Stream::insecure(stream);
let mut server = ServerFlow::new(
ServerFlowOptions::default(),
Greeting::ok(None, "server (example)").unwrap(),
);

ServerFlow::send_greeting(
AnyStream::new(stream),
ServerFlowOptions::default(),
Greeting::ok(None, "server (example)").unwrap(),
)
.await
.unwrap()
};
loop {
match stream.progress(&mut server).await.unwrap() {
ServerFlowEvent::GreetingSent { greeting } => {
println!("greeting sent: {greeting:?}");
break;
}
event => println!("unexpected event: {event:?}"),
}
}

let mut handle = None;
let mut handles = VecDeque::new();

loop {
match server.progress().await.unwrap() {
match stream.progress(&mut server).await.unwrap() {
ServerFlowEvent::CommandReceived { command } => {
println!("command received: {command:?}");
handle = Some(
handles.push_back(
server.enqueue_status(Status::no(Some(command.tag), None, "...").unwrap()),
);
}
Expand All @@ -38,7 +42,7 @@ async fn main() {
response,
} => {
println!("response sent: {response:?}");
assert_eq!(handle, Some(got_handle));
assert_eq!(handles.pop_front(), Some(got_handle));
}
event => {
println!("{event:?}");
Expand Down
Loading
Loading