Skip to content

Commit

Permalink
fix!: Implement SansIO
Browse files Browse the repository at this point in the history
  • Loading branch information
jakoschiko committed May 4, 2024
1 parent 9bcac63 commit 3db7d4c
Show file tree
Hide file tree
Showing 21 changed files with 1,287 additions and 874 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@ edition = "2021"
license = "MIT OR Apache-2.0"

[features]
default = ["stream"]
expose_stream = []
stream = ["dep:rustls", "dep:tokio", "dep:tokio-rustls"]

[dependencies]
bounded-static = "0.5.0"
bytes = "1.5.0"
imap-codec = { version = "2.0.0", features = ["quirk_crlf_relaxed", "bounded-static"] }
imap-types = { version = "2.0.0" }
rustls = { version = "0.21.7", optional = true }
thiserror = "1.0.49"
tokio = { version = "1.32.0", features = ["io-util"] }
tokio = { version = "1.32.0", optional = true, features = ["io-util", "macros", "net"] }
tokio-rustls = { version = "0.24.1", optional = true }
tracing = "0.1.40"

[dev-dependencies]
rand = "0.8.5"
tag-generator = { path = "tag-generator" }
tokio = { version = "1.32.0", features = ["macros", "net", "rt", "sync"] }
tokio = { version = "1.32.0", features = ["rt", "sync"] }

[workspace]
resolver = "2"
Expand Down
17 changes: 11 additions & 6 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use imap_flow::{
client::{ClientFlow, ClientFlowEvent, ClientFlowOptions},
stream::AnyStream,
stream::Stream,
};
use imap_types::{
command::{Command, CommandBody},
Expand All @@ -11,11 +11,16 @@ use tokio::net::TcpStream;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let mut client = ClientFlow::new(ClientFlowOptions::default());

let greeting = loop {
match stream.progress(&mut client).await.unwrap() {
ClientFlowEvent::GreetingReceived { greeting } => break greeting,
event => println!("unexpected event: {event:?}"),
}
};

let (mut client, greeting) =
ClientFlow::receive_greeting(AnyStream::new(stream), ClientFlowOptions::default())
.await
.unwrap();
println!("received greeting: {greeting:?}");

let handle = client.enqueue_command(Command {
Expand All @@ -24,7 +29,7 @@ async fn main() {
});

loop {
match client.progress().await.unwrap() {
match stream.progress(&mut client).await.unwrap() {
ClientFlowEvent::CommandSent {
handle: got_handle,
command,
Expand Down
24 changes: 13 additions & 11 deletions examples/client_authenticate.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{collections::VecDeque, error::Error};
use std::collections::VecDeque;

use imap_flow::{
client::{ClientFlow, ClientFlowEvent, ClientFlowOptions},
stream::AnyStream,
stream::Stream,
};
use imap_types::{
auth::{AuthMechanism, AuthenticateData},
Expand All @@ -12,13 +12,17 @@ use tag_generator::TagGenerator;
use tokio::net::TcpStream;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
let stream = AnyStream::new(TcpStream::connect("127.0.0.1:12345").await?);
async fn main() {
let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let mut client = ClientFlow::new(ClientFlowOptions::default());

let (mut client, greeting) =
ClientFlow::receive_greeting(stream, ClientFlowOptions::default()).await?;

println!("{greeting:?}");
loop {
match stream.progress(&mut client).await.unwrap() {
ClientFlowEvent::GreetingReceived { .. } => break,
event => println!("unexpected event: {event:?}"),
}
}

let mut tag_generator = TagGenerator::new();

Expand All @@ -34,7 +38,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
]);

loop {
let event = client.progress().await?;
let event = stream.progress(&mut client).await.unwrap();
println!("{event:?}");

match event {
Expand All @@ -53,6 +57,4 @@ async fn main() -> Result<(), Box<dyn Error>> {
_ => {}
}
}

Ok(())
}
18 changes: 11 additions & 7 deletions examples/client_idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::BufRead;

use imap_flow::{
client::{ClientFlow, ClientFlowEvent, ClientFlowOptions},
stream::AnyStream,
stream::Stream,
};
use imap_types::{
command::{Command, CommandBody},
Expand All @@ -14,11 +14,15 @@ use tokio::{net::TcpStream, sync::mpsc::Receiver};
#[tokio::main(flavor = "current_thread")]
async fn main() {
let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let mut client = ClientFlow::new(ClientFlowOptions::default());

let (mut client, _) =
ClientFlow::receive_greeting(AnyStream::new(stream), ClientFlowOptions::default())
.await
.unwrap();
loop {
match stream.progress(&mut client).await.unwrap() {
ClientFlowEvent::GreetingReceived { .. } => break,
event => println!("unexpected event: {event:?}"),
}
}

println!("Press ENTER to stop IDLE");
let mut lines = Lines::new();
Expand All @@ -31,7 +35,7 @@ async fn main() {

loop {
tokio::select! {
event = client.progress() => {
event = stream.progress(&mut client) => {
match event.unwrap() {
ClientFlowEvent::IdleCommandSent { .. } => {
println!("IDLE command sent")
Expand Down Expand Up @@ -69,7 +73,7 @@ async fn main() {
}

loop {
match client.progress().await.unwrap() {
match stream.progress(&mut client).await.unwrap() {
ref event @ ClientFlowEvent::StatusReceived {
status:
Status::Tagged(Tagged {
Expand Down
90 changes: 90 additions & 0 deletions examples/client_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::{
io::{Read, Write},
net::TcpStream,
};

use imap_flow::{
client::{ClientFlow, ClientFlowEvent, ClientFlowOptions},
FlowInterrupt, FlowIo,
};
use imap_types::{
command::{Command, CommandBody},
core::Tag,
};

fn main() {
let mut stream = TcpStream::connect("127.0.0.1:12345").unwrap();
let mut read_buffer = [0; 128];
let mut client = ClientFlow::new(ClientFlowOptions::default());

let greeting = loop {
match client.progress() {
Err(interrupt) => match interrupt {
FlowInterrupt::Io(FlowIo::NeedMoreInput) => {
let count = stream.read(&mut read_buffer).unwrap();
client.enqueue_input(&read_buffer[0..count]);
}
interrupt => panic!("unexpected interrupt: {interrupt:?}"),
},
Ok(event) => match event {
ClientFlowEvent::GreetingReceived { greeting } => break greeting,
event => println!("unexpected event: {event:?}"),
},
}
};

println!("received greeting: {greeting:?}");

let handle = client.enqueue_command(Command {
tag: Tag::try_from("A1").unwrap(),
body: CommandBody::login("Al¹cE", "pa²²w0rd").unwrap(),
});

loop {
match client.progress() {
Err(interrupt) => match interrupt {
FlowInterrupt::Io(FlowIo::NeedMoreInput) => {
let count = stream.read(&mut read_buffer).unwrap();
client.enqueue_input(&read_buffer[0..count]);
}
FlowInterrupt::Io(FlowIo::Output(bytes)) => {
stream.write_all(&bytes).unwrap();
}
FlowInterrupt::Error(error) => {
panic!("unexpected error: {error:?}");
}
},
Ok(event) => match event {
ClientFlowEvent::CommandSent {
handle: got_handle,
command,
} => {
println!("command sent: {got_handle:?}, {command:?}");
assert_eq!(handle, got_handle);
}
ClientFlowEvent::CommandRejected {
handle: got_handle,
command,
status,
} => {
println!("command rejected: {got_handle:?}, {command:?}, {status:?}");
assert_eq!(handle, got_handle);
}
ClientFlowEvent::DataReceived { data } => {
println!("data received: {data:?}");
}
ClientFlowEvent::StatusReceived { status } => {
println!("status received: {status:?}");
}
ClientFlowEvent::ContinuationRequestReceived {
continuation_request,
} => {
println!("unexpected continuation request received: {continuation_request:?}");
}
event => {
println!("{event:?}");
}
},
}
}
}
42 changes: 23 additions & 19 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
use std::collections::VecDeque;

use imap_flow::{
server::{ServerFlow, ServerFlowEvent, ServerFlowOptions},
stream::AnyStream,
stream::Stream,
};
use imap_types::response::{Greeting, Status};
use tokio::net::TcpListener;

#[tokio::main(flavor = "current_thread")]
async fn main() {
let (mut server, _) = {
let stream = {
let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
stream
};
let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
let mut stream = Stream::insecure(stream);
let mut server = ServerFlow::new(
ServerFlowOptions::default(),
Greeting::ok(None, "server (example)").unwrap(),
);

ServerFlow::send_greeting(
AnyStream::new(stream),
ServerFlowOptions::default(),
Greeting::ok(None, "server (example)").unwrap(),
)
.await
.unwrap()
};
loop {
match stream.progress(&mut server).await.unwrap() {
ServerFlowEvent::GreetingSent { greeting } => {
println!("greeting sent: {greeting:?}");
break;
}
event => println!("unexpected event: {event:?}"),
}
}

let mut handle = None;
let mut handles = VecDeque::new();

loop {
match server.progress().await.unwrap() {
match stream.progress(&mut server).await.unwrap() {
ServerFlowEvent::CommandReceived { command } => {
println!("command received: {command:?}");
handle = Some(
handles.push_back(
server.enqueue_status(Status::no(Some(command.tag), None, "...").unwrap()),
);
}
Expand All @@ -38,7 +42,7 @@ async fn main() {
response,
} => {
println!("response sent: {response:?}");
assert_eq!(handle, Some(got_handle));
assert_eq!(handles.pop_front(), Some(got_handle));
}
event => {
println!("{event:?}");
Expand Down
Loading

0 comments on commit 3db7d4c

Please sign in to comment.