Skip to content

Commit a513a7b

Browse files
committed
feat: implement IMAP COMPRESS
1 parent 1954ce4 commit a513a7b

File tree

5 files changed

+210
-2
lines changed

5 files changed

+210
-2
lines changed

.github/workflows/ci.yml

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ jobs:
3131
- name: check tokio
3232
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio
3333

34+
- name: check compress feature with tokio
35+
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio,compress
36+
37+
- name: check compress feature with async-std
38+
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std,compress
39+
3440
- name: check async-std examples
3541
working-directory: examples
3642
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std

Cargo.toml

+5-2
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ is-it-maintained-open-issues = { repository = "async-email/async-imap" }
2020

2121
[features]
2222
default = ["runtime-async-std"]
23+
compress = ["async-compression"]
2324

24-
runtime-async-std = ["async-std"]
25-
runtime-tokio = ["tokio"]
25+
runtime-async-std = ["async-std", "async-compression?/futures-io"]
26+
runtime-tokio = ["tokio", "async-compression?/tokio"]
2627

2728
[dependencies]
2829
async-channel = "2.0.0"
30+
async-compression = { version = "0.4.15", default-features = false, features = ["deflate"], optional = true }
2931
async-std = { version = "1.8.0", default-features = false, features = ["std", "unstable"], optional = true }
3032
base64 = "0.21"
3133
bytes = "1"
@@ -35,6 +37,7 @@ imap-proto = "0.16.4"
3537
log = "0.4.8"
3638
nom = "7.0"
3739
once_cell = "1.8.0"
40+
pin-project = "1"
3841
pin-utils = "0.1.0-alpha.4"
3942
self_cell = "1.0.1"
4043
stop-token = "0.7"

src/extensions/compress.rs

+193
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
//! IMAP COMPRESS extension specified in [RFC4978](https://www.rfc-editor.org/rfc/rfc4978.html).
2+
3+
use std::fmt;
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
7+
use pin_project::pin_project;
8+
9+
use crate::client::Session;
10+
use crate::error::Result;
11+
use crate::imap_stream::ImapStream;
12+
use crate::types::IdGenerator;
13+
use crate::Connection;
14+
15+
#[cfg(feature = "runtime-async-std")]
16+
use async_std::io::{IoSlice, IoSliceMut, Read, Write};
17+
#[cfg(feature = "runtime-async-std")]
18+
use futures::io::BufReader;
19+
#[cfg(feature = "runtime-tokio")]
20+
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, BufReader, ReadBuf};
21+
22+
#[cfg(feature = "runtime-tokio")]
23+
use async_compression::tokio::bufread::DeflateDecoder;
24+
#[cfg(feature = "runtime-tokio")]
25+
use async_compression::tokio::write::DeflateEncoder;
26+
27+
#[cfg(feature = "runtime-async-std")]
28+
use async_compression::futures::bufread::DeflateDecoder;
29+
#[cfg(feature = "runtime-async-std")]
30+
use async_compression::futures::write::DeflateEncoder;
31+
32+
/// Network stream compressed with DEFLATE.
33+
#[derive(Debug)]
34+
#[pin_project]
35+
pub struct DeflateStream<T: Read + Write + Unpin + fmt::Debug> {
36+
#[pin]
37+
inner: DeflateDecoder<BufReader<DeflateEncoder<T>>>,
38+
}
39+
40+
impl<T: Read + Write + Unpin + fmt::Debug> DeflateStream<T> {
41+
pub(crate) fn new(stream: T) -> Self {
42+
let stream = DeflateEncoder::new(stream);
43+
let stream = BufReader::new(stream);
44+
let stream = DeflateDecoder::new(stream);
45+
Self { inner: stream }
46+
}
47+
48+
/// Gets a reference to the underlying stream.
49+
pub fn get_ref(&self) -> &T {
50+
self.inner.get_ref().get_ref().get_ref()
51+
}
52+
53+
/// Gets a mutable reference to the underlying stream.
54+
pub fn get_mut(&mut self) -> &mut T {
55+
self.inner.get_mut().get_mut().get_mut()
56+
}
57+
58+
/// Consumes `DeflateStream` and returns underlying stream.
59+
pub fn into_inner(self) -> T {
60+
self.inner.into_inner().into_inner().into_inner()
61+
}
62+
}
63+
64+
#[cfg(feature = "runtime-tokio")]
65+
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
66+
fn poll_read(
67+
self: Pin<&mut Self>,
68+
cx: &mut Context<'_>,
69+
buf: &mut ReadBuf<'_>,
70+
) -> Poll<std::io::Result<()>> {
71+
self.project().inner.poll_read(cx, buf)
72+
}
73+
}
74+
75+
#[cfg(feature = "runtime-async-std")]
76+
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
77+
fn poll_read(
78+
self: Pin<&mut Self>,
79+
cx: &mut Context<'_>,
80+
buf: &mut [u8],
81+
) -> Poll<async_std::io::Result<usize>> {
82+
self.project().inner.poll_read(cx, buf)
83+
}
84+
85+
fn poll_read_vectored(
86+
self: Pin<&mut Self>,
87+
cx: &mut Context<'_>,
88+
bufs: &mut [IoSliceMut<'_>],
89+
) -> Poll<async_std::io::Result<usize>> {
90+
self.project().inner.poll_read_vectored(cx, bufs)
91+
}
92+
}
93+
94+
#[cfg(feature = "runtime-tokio")]
95+
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
96+
fn poll_write(
97+
self: Pin<&mut Self>,
98+
cx: &mut std::task::Context<'_>,
99+
buf: &[u8],
100+
) -> Poll<std::io::Result<usize>> {
101+
self.project().inner.get_pin_mut().poll_write(cx, buf)
102+
}
103+
104+
fn poll_flush(
105+
self: Pin<&mut Self>,
106+
cx: &mut std::task::Context<'_>,
107+
) -> Poll<std::io::Result<()>> {
108+
self.project().inner.poll_flush(cx)
109+
}
110+
111+
fn poll_shutdown(
112+
self: Pin<&mut Self>,
113+
cx: &mut std::task::Context<'_>,
114+
) -> Poll<std::io::Result<()>> {
115+
self.project().inner.poll_shutdown(cx)
116+
}
117+
118+
fn poll_write_vectored(
119+
self: Pin<&mut Self>,
120+
cx: &mut Context<'_>,
121+
bufs: &[std::io::IoSlice<'_>],
122+
) -> Poll<std::io::Result<usize>> {
123+
self.project().inner.poll_write_vectored(cx, bufs)
124+
}
125+
126+
fn is_write_vectored(&self) -> bool {
127+
self.inner.is_write_vectored()
128+
}
129+
}
130+
131+
#[cfg(feature = "runtime-async-std")]
132+
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
133+
fn poll_write(
134+
self: Pin<&mut Self>,
135+
cx: &mut std::task::Context<'_>,
136+
buf: &[u8],
137+
) -> Poll<async_std::io::Result<usize>> {
138+
self.project().inner.as_mut().poll_write(cx, buf)
139+
}
140+
141+
fn poll_flush(
142+
self: Pin<&mut Self>,
143+
cx: &mut std::task::Context<'_>,
144+
) -> Poll<async_std::io::Result<()>> {
145+
self.project().inner.poll_flush(cx)
146+
}
147+
148+
fn poll_close(
149+
self: Pin<&mut Self>,
150+
cx: &mut std::task::Context<'_>,
151+
) -> Poll<async_std::io::Result<()>> {
152+
self.project().inner.poll_close(cx)
153+
}
154+
155+
fn poll_write_vectored(
156+
self: Pin<&mut Self>,
157+
cx: &mut Context<'_>,
158+
bufs: &[IoSlice<'_>],
159+
) -> Poll<async_std::io::Result<usize>> {
160+
self.project().inner.poll_write_vectored(cx, bufs)
161+
}
162+
}
163+
164+
impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
165+
/// Runs `COMPRESS DEFLATE` command.
166+
pub async fn compress<F, S>(self, f: F) -> Result<Session<S>>
167+
where
168+
S: Read + Write + Unpin + fmt::Debug,
169+
F: FnOnce(DeflateStream<T>) -> S,
170+
{
171+
let Self {
172+
mut conn,
173+
unsolicited_responses_tx,
174+
unsolicited_responses,
175+
} = self;
176+
conn.run_command_and_check_ok("COMPRESS DEFLATE", Some(unsolicited_responses_tx.clone()))
177+
.await?;
178+
179+
let stream = conn.into_inner();
180+
let deflate_stream = DeflateStream::new(stream);
181+
let stream = ImapStream::new(f(deflate_stream));
182+
let conn = Connection {
183+
stream,
184+
request_ids: IdGenerator::new(),
185+
};
186+
let session = Session {
187+
conn,
188+
unsolicited_responses_tx,
189+
unsolicited_responses,
190+
};
191+
Ok(session)
192+
}
193+
}

src/extensions/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
//! Implementations of various IMAP extensions.
2+
#[cfg(feature = "compress")]
3+
pub mod compress;
4+
25
pub mod idle;
36

47
pub mod quota;

src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ mod imap_stream;
9595
mod parse;
9696
pub mod types;
9797

98+
#[cfg(feature = "compress")]
99+
pub use crate::extensions::compress::DeflateStream;
100+
98101
pub use crate::authenticator::Authenticator;
99102
pub use crate::client::*;
100103

0 commit comments

Comments
 (0)