Skip to content

Commit cacf896

Browse files
committed
feat: implement IMAP COMPRESS
1 parent 1954ce4 commit cacf896

File tree

6 files changed

+345
-2
lines changed

6 files changed

+345
-2
lines changed

.github/workflows/ci.yml

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ jobs:
3131
- name: check tokio
3232
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio
3333

34+
- name: check compress feature with tokio
35+
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio,compress
36+
37+
- name: check compress feature with async-std
38+
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std,compress
39+
3440
- name: check async-std examples
3541
working-directory: examples
3642
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std

Cargo.toml

+5-2
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ is-it-maintained-open-issues = { repository = "async-email/async-imap" }
2020

2121
[features]
2222
default = ["runtime-async-std"]
23+
compress = ["async-compression"]
2324

24-
runtime-async-std = ["async-std"]
25-
runtime-tokio = ["tokio"]
25+
runtime-async-std = ["async-std", "async-compression?/futures-io"]
26+
runtime-tokio = ["tokio", "async-compression?/tokio"]
2627

2728
[dependencies]
2829
async-channel = "2.0.0"
30+
async-compression = { git = "https://github.com/link2xt/async-compression.git", default-features = false, features = ["deflate"], optional = true, branch = "link2xt/miniz_oxide-consumes-all-input" }
2931
async-std = { version = "1.8.0", default-features = false, features = ["std", "unstable"], optional = true }
3032
base64 = "0.21"
3133
bytes = "1"
@@ -35,6 +37,7 @@ imap-proto = "0.16.4"
3537
log = "0.4.8"
3638
nom = "7.0"
3739
once_cell = "1.8.0"
40+
pin-project = "1"
3841
pin-utils = "0.1.0-alpha.4"
3942
self_cell = "1.0.1"
4043
stop-token = "0.7"

src/extensions/compress.rs

+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
//! IMAP COMPRESS extension specified in [RFC4978](https://www.rfc-editor.org/rfc/rfc4978.html).
2+
3+
use std::fmt;
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
7+
use pin_project::pin_project;
8+
9+
use crate::client::Session;
10+
use crate::error::Result;
11+
use crate::imap_stream::ImapStream;
12+
use crate::shared_stream::SharedStream;
13+
use crate::types::IdGenerator;
14+
use crate::Connection;
15+
16+
#[cfg(feature = "runtime-async-std")]
17+
use async_std::io::{BufReader, Read, Write};
18+
#[cfg(feature = "runtime-tokio")]
19+
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, BufReader, ReadBuf};
20+
21+
#[cfg(feature = "runtime-tokio")]
22+
use async_compression::tokio::bufread::DeflateDecoder;
23+
#[cfg(feature = "runtime-tokio")]
24+
use async_compression::tokio::write::DeflateEncoder;
25+
26+
#[cfg(feature = "runtime-async-std")]
27+
use async_compression::futures::bufread::DeflateDecoder;
28+
#[cfg(feature = "runtime-async-std")]
29+
use async_compression::futures::write::DeflateEncoder;
30+
31+
/// IMAP stream
32+
#[derive(Debug)]
33+
#[pin_project]
34+
pub struct DeflateStream<T: Read + Write + Unpin + fmt::Debug> {
35+
/// Shared stream reference to allow direct access
36+
/// to the underlying stream.
37+
stream: SharedStream<T>,
38+
39+
#[pin]
40+
decoder: DeflateDecoder<BufReader<SharedStream<T>>>,
41+
42+
#[pin]
43+
encoder: DeflateEncoder<SharedStream<T>>,
44+
}
45+
46+
impl<T: Read + Write + Unpin + fmt::Debug> DeflateStream<T> {
47+
pub(crate) fn new(stream: T) -> Self {
48+
let stream = SharedStream::new(stream);
49+
let decoder = DeflateDecoder::new(BufReader::new(stream.clone()));
50+
let encoder = DeflateEncoder::new(stream.clone());
51+
Self {
52+
stream,
53+
decoder,
54+
encoder,
55+
}
56+
}
57+
58+
/// Runs provided function while holding the lock on the underlying stream.
59+
///
60+
/// This allows to access the underlying stream while ensuring
61+
/// that no data is read from the stream or written into the stream at the same time.
62+
pub fn with_lock<R>(&self, f: impl FnOnce(Pin<&mut T>) -> R) -> R {
63+
self.stream.with_lock(f)
64+
}
65+
}
66+
67+
#[cfg(feature = "runtime-tokio")]
68+
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
69+
fn poll_read(
70+
self: Pin<&mut Self>,
71+
cx: &mut Context<'_>,
72+
buf: &mut ReadBuf<'_>,
73+
) -> Poll<std::io::Result<()>> {
74+
self.project().decoder.poll_read(cx, buf)
75+
}
76+
}
77+
78+
#[cfg(feature = "runtime-async-std")]
79+
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
80+
fn poll_read(
81+
self: Pin<&mut Self>,
82+
cx: &mut Context<'_>,
83+
buf: &mut [u8],
84+
) -> Poll<async_std::io::Result<usize>> {
85+
self.project().decoder.poll_read(cx, buf)
86+
}
87+
}
88+
89+
#[cfg(feature = "runtime-tokio")]
90+
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
91+
fn poll_write(
92+
self: Pin<&mut Self>,
93+
cx: &mut std::task::Context<'_>,
94+
buf: &[u8],
95+
) -> Poll<std::io::Result<usize>> {
96+
self.project().encoder.poll_write(cx, buf)
97+
}
98+
99+
fn poll_flush(
100+
self: Pin<&mut Self>,
101+
cx: &mut std::task::Context<'_>,
102+
) -> Poll<std::io::Result<()>> {
103+
self.project().encoder.poll_flush(cx)
104+
}
105+
106+
fn poll_shutdown(
107+
self: Pin<&mut Self>,
108+
cx: &mut std::task::Context<'_>,
109+
) -> Poll<std::io::Result<()>> {
110+
self.project().encoder.poll_shutdown(cx)
111+
}
112+
}
113+
114+
#[cfg(feature = "runtime-async-std")]
115+
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
116+
fn poll_write(
117+
self: Pin<&mut Self>,
118+
cx: &mut std::task::Context<'_>,
119+
buf: &[u8],
120+
) -> Poll<async_std::io::Result<usize>> {
121+
self.project().encoder.poll_write(cx, buf)
122+
}
123+
124+
fn poll_flush(
125+
self: Pin<&mut Self>,
126+
cx: &mut std::task::Context<'_>,
127+
) -> Poll<async_std::io::Result<()>> {
128+
self.project().encoder.poll_flush(cx)
129+
}
130+
131+
fn poll_close(
132+
self: Pin<&mut Self>,
133+
cx: &mut std::task::Context<'_>,
134+
) -> Poll<async_std::io::Result<()>> {
135+
self.project().encoder.poll_close(cx)
136+
}
137+
}
138+
139+
impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
140+
/// Runs `COMPRESS DEFLATE` command.
141+
pub async fn compress<F, S>(self, f: F) -> Result<Session<S>>
142+
where
143+
S: Read + Write + Unpin + fmt::Debug,
144+
F: FnOnce(DeflateStream<T>) -> S,
145+
{
146+
let Self {
147+
mut conn,
148+
unsolicited_responses_tx,
149+
unsolicited_responses,
150+
} = self;
151+
conn.run_command_and_check_ok("COMPRESS DEFLATE", Some(unsolicited_responses_tx.clone()))
152+
.await?;
153+
154+
let stream = conn.into_inner();
155+
let deflate_stream = DeflateStream::new(stream);
156+
let stream = ImapStream::new(f(deflate_stream));
157+
let conn = Connection {
158+
stream,
159+
request_ids: IdGenerator::new(),
160+
};
161+
let session = Session {
162+
conn,
163+
unsolicited_responses_tx,
164+
unsolicited_responses,
165+
};
166+
Ok(session)
167+
}
168+
}

src/extensions/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
//! Implementations of various IMAP extensions.
2+
#[cfg(feature = "compress")]
3+
pub mod compress;
4+
25
pub mod idle;
36

47
pub mod quota;

src/lib.rs

+6
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ mod imap_stream;
9595
mod parse;
9696
pub mod types;
9797

98+
#[cfg(feature = "compress")]
99+
pub use crate::extensions::compress::DeflateStream;
100+
101+
#[cfg(feature = "compress")]
102+
mod shared_stream;
103+
98104
pub use crate::authenticator::Authenticator;
99105
pub use crate::client::*;
100106

src/shared_stream.rs

+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use std::pin::Pin;
2+
use std::sync::{Arc, Mutex};
3+
use std::task::{Context, Poll};
4+
5+
#[cfg(feature = "runtime-async-std")]
6+
use async_std::io::{Read, Write};
7+
#[cfg(feature = "runtime-tokio")]
8+
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, ReadBuf};
9+
10+
#[cfg(feature = "runtime-tokio")]
11+
#[derive(Debug)]
12+
pub(crate) struct SharedStream<T: std::fmt::Debug> {
13+
inner: Arc<Mutex<T>>,
14+
is_write_vectored: bool,
15+
}
16+
17+
#[cfg(feature = "runtime-async-std")]
18+
#[derive(Debug)]
19+
pub(crate) struct SharedStream<T: std::fmt::Debug> {
20+
inner: Arc<Mutex<T>>,
21+
}
22+
23+
#[cfg(feature = "runtime-tokio")]
24+
impl<T: std::fmt::Debug> Clone for SharedStream<T> {
25+
fn clone(&self) -> Self {
26+
Self {
27+
inner: Arc::clone(&self.inner),
28+
is_write_vectored: self.is_write_vectored,
29+
}
30+
}
31+
}
32+
33+
#[cfg(feature = "runtime-async-std")]
34+
impl<T: std::fmt::Debug> Clone for SharedStream<T> {
35+
fn clone(&self) -> Self {
36+
Self {
37+
inner: Arc::clone(&self.inner),
38+
}
39+
}
40+
}
41+
42+
#[cfg(feature = "runtime-tokio")]
43+
impl<T: std::fmt::Debug> SharedStream<T>
44+
where
45+
T: Read + Write,
46+
{
47+
pub(crate) fn new(stream: T) -> SharedStream<T> {
48+
let is_write_vectored = stream.is_write_vectored();
49+
50+
let inner = Arc::new(Mutex::new(stream));
51+
52+
Self {
53+
inner,
54+
is_write_vectored,
55+
}
56+
}
57+
}
58+
59+
#[cfg(feature = "runtime-async-std")]
60+
impl<T: std::fmt::Debug> SharedStream<T>
61+
where
62+
T: Read + Write,
63+
{
64+
pub(crate) fn new(stream: T) -> SharedStream<T> {
65+
let inner = Arc::new(Mutex::new(stream));
66+
67+
Self { inner }
68+
}
69+
}
70+
71+
impl<T: Unpin + std::fmt::Debug> SharedStream<T> {
72+
pub(crate) fn with_lock<R>(&self, f: impl FnOnce(Pin<&mut T>) -> R) -> R {
73+
let mut guard = self.inner.lock().unwrap();
74+
let stream = Pin::new(&mut *guard);
75+
f(stream)
76+
}
77+
}
78+
79+
#[cfg(feature = "runtime-tokio")]
80+
impl<T: Read + Unpin + std::fmt::Debug> Read for SharedStream<T> {
81+
fn poll_read(
82+
self: Pin<&mut Self>,
83+
cx: &mut Context<'_>,
84+
buf: &mut ReadBuf<'_>,
85+
) -> Poll<std::io::Result<()>> {
86+
self.with_lock(|stream| stream.poll_read(cx, buf))
87+
}
88+
}
89+
90+
#[cfg(feature = "runtime-async-std")]
91+
impl<T: Read + Unpin + std::fmt::Debug> Read for SharedStream<T> {
92+
fn poll_read(
93+
self: Pin<&mut Self>,
94+
cx: &mut Context<'_>,
95+
buf: &mut [u8],
96+
) -> Poll<async_std::io::Result<usize>> {
97+
self.with_lock(|stream| stream.poll_read(cx, buf))
98+
}
99+
}
100+
101+
#[cfg(feature = "runtime-tokio")]
102+
impl<T: Write + Unpin + std::fmt::Debug> Write for SharedStream<T> {
103+
fn poll_write(
104+
self: Pin<&mut Self>,
105+
cx: &mut Context<'_>,
106+
buf: &[u8],
107+
) -> Poll<std::io::Result<usize>> {
108+
self.with_lock(|stream| stream.poll_write(cx, buf))
109+
}
110+
111+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
112+
self.with_lock(|stream| stream.poll_flush(cx))
113+
}
114+
115+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
116+
self.with_lock(|stream| stream.poll_shutdown(cx))
117+
}
118+
119+
fn poll_write_vectored(
120+
self: Pin<&mut Self>,
121+
cx: &mut Context<'_>,
122+
bufs: &[std::io::IoSlice<'_>],
123+
) -> Poll<std::io::Result<usize>> {
124+
self.with_lock(|stream| stream.poll_write_vectored(cx, bufs))
125+
}
126+
127+
fn is_write_vectored(&self) -> bool {
128+
self.is_write_vectored
129+
}
130+
}
131+
132+
#[cfg(feature = "runtime-async-std")]
133+
impl<T: Write + Unpin + std::fmt::Debug> Write for SharedStream<T> {
134+
fn poll_write(
135+
self: Pin<&mut Self>,
136+
cx: &mut Context<'_>,
137+
buf: &[u8],
138+
) -> Poll<std::io::Result<usize>> {
139+
self.with_lock(|stream| stream.poll_write(cx, buf))
140+
}
141+
142+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<async_std::io::Result<()>> {
143+
self.with_lock(|stream| stream.poll_flush(cx))
144+
}
145+
146+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<async_std::io::Result<()>> {
147+
self.with_lock(|stream| stream.poll_close(cx))
148+
}
149+
150+
fn poll_write_vectored(
151+
self: Pin<&mut Self>,
152+
cx: &mut Context<'_>,
153+
bufs: &[async_std::io::IoSlice<'_>],
154+
) -> Poll<async_std::io::Result<usize>> {
155+
self.with_lock(|stream| stream.poll_write_vectored(cx, bufs))
156+
}
157+
}

0 commit comments

Comments
 (0)