Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass IDLE responses to caller. #186

Merged
merged 23 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
529401a
Implement a method to pass unilateral responses while IDLE.
mordak Mar 14, 2021
2874bfd
Add IDLE example.
mordak Mar 28, 2021
c9b7c0a
Update src/extensions/idle.rs
mordak Apr 5, 2021
5e3f087
Merge remote-tracking branch 'origin/master' into idle-responses
mordak Apr 5, 2021
bbff7d4
Remove deprecated wait_timeout()
mordak Apr 5, 2021
bb38142
Change callback_stop to stop_on_any.
mordak Apr 5, 2021
e8a7c91
Comment example where we turn on debugging.
mordak Apr 5, 2021
b8bd1e4
Reorder UnsolicitedResponse alphabetically so it is easier to follow.
mordak Apr 5, 2021
e1db863
Add helper function to transform a vec of flag strings into a vec of …
mordak Apr 5, 2021
064c2e0
Use drain() instead of reallocating.
mordak Apr 5, 2021
ff39ebf
Merge branch 'idle-responses' of github.com:mordak/rust-imap into idl…
mordak Apr 5, 2021
9126d3c
Improve documentation around unhandled responses.
mordak Apr 7, 2021
11adcfc
Tweak to how we handle incomplete parse.
mordak Apr 7, 2021
5942553
Use iterators for Flag::from_strs()
mordak Apr 7, 2021
7eb2cfd
Use bool instead of CallbackAction.
mordak Apr 7, 2021
efa02f0
Remove wrapper around ResponseCode.
mordak Apr 8, 2021
f2d7919
Do not wrap AttributeValue.
mordak Apr 8, 2021
584c954
Reorder variants alphabetically in try_from.
mordak Apr 8, 2021
692dcdd
Move buffer management into parse match arms.
mordak Apr 8, 2021
4232c77
wait to wait_while
mordak Apr 8, 2021
08de336
Move debug assertion.
mordak Apr 9, 2021
1cabb3b
Promote Unexpected error from ParseError to Error.
mordak Apr 20, 2021
48db461
Merge remote-tracking branch 'origin/master' into idle-responses
mordak Apr 20, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ lazy_static = "1.4"
lettre = "0.9"
lettre_email = "0.9"
rustls-connector = "0.13.0"
structopt = "0.3"

[[example]]
name = "basic"
Expand All @@ -40,6 +41,10 @@ required-features = ["default"]
name = "gmail_oauth2"
required-features = ["default"]

[[example]]
name = "idle"
required-features = ["default"]

[[test]]
name = "imap_integration"
required-features = ["default"]
83 changes: 83 additions & 0 deletions examples/idle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use imap::extensions::idle;
use native_tls::TlsConnector;
use structopt::StructOpt;

#[derive(StructOpt, Debug)]
#[structopt(name = "idle")]
struct Opt {
// The server name to connect to
#[structopt(short, long)]
server: String,

// The port to use
#[structopt(short, long, default_value = "993")]
port: u16,

// The account username
#[structopt(short, long)]
username: String,

// The account password. In a production system passwords
// would normally be in a config or fetched at runtime from
// a password manager or user prompt and not passed on the
// command line.
#[structopt(short = "w", long)]
password: String,

// The mailbox to IDLE on
#[structopt(short, long, default_value = "INBOX")]
mailbox: String,

#[structopt(
short = "x",
long,
help = "The number of responses to receive before exiting",
default_value = "5"
)]
max_responses: usize,
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
}

fn main() {
let opt = Opt::from_args();

let ssl_conn = TlsConnector::builder().build().unwrap();
let client = imap::connect((opt.server.clone(), opt.port), opt.server, &ssl_conn)
.expect("Could not connect to imap server");
let mut imap = client
.login(opt.username, opt.password)
.expect("Could not authenticate");

// Turn on debug output so we can see the actual traffic coming
// from the server and how it is handled in our callback.
// This wouldn't be turned on in a production build, but is helpful
// in examples and for debugging.
imap.debug = true;
jonhoo marked this conversation as resolved.
Show resolved Hide resolved

imap.select(opt.mailbox).expect("Could not select mailbox");

let idle = imap.idle().expect("Could not IDLE");
jonhoo marked this conversation as resolved.
Show resolved Hide resolved

// Implement a trivial counter that causes the IDLE callback to end the IDLE
// after a fixed number of responses.
//
// A threaded client could use channels or shared data to interact with the
// rest of the program and update mailbox state, decide to exit the IDLE, etc.
let mut num_responses = 0;
let max_responses = opt.max_responses;
let idle_result = idle.wait_keepalive(|response| {
num_responses += 1;
println!("IDLE response #{}: {:?}", num_responses, response);
if num_responses >= max_responses {
idle::CallbackAction::Stop
} else {
idle::CallbackAction::Continue
}
});

match idle_result {
Ok(()) => println!("IDLE finished normally"),
Err(e) => println!("IDLE finished with error {:?}", e),
}

imap.logout().expect("Could not log out");
}
154 changes: 121 additions & 33 deletions src/extensions/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use crate::client::Session;
use crate::error::{Error, Result};
use crate::parse::parse_idle;
use crate::types::UnsolicitedResponse;
#[cfg(feature = "tls")]
use native_tls::TlsStream;
use std::io::{self, Read, Write};
Expand All @@ -13,8 +15,31 @@ use std::time::Duration;
///
/// The handle blocks using the [`IDLE` command](https://tools.ietf.org/html/rfc2177#section-3)
/// specificed in [RFC 2177](https://tools.ietf.org/html/rfc2177) until the underlying server state
/// changes in some way. While idling does inform the client what changes happened on the server,
/// this implementation will currently just block until _anything_ changes, and then notify the
/// changes in some way.
///
/// Each of the `wait` functions takes a callback function which receives any responses
/// that arrive on the channel while IDLE. The callback function implements whatever
/// logic is needed to handle the IDLE response, and then returns a [`CallbackAction`]
/// to `Continue` or `Stop` listening on the channel.
/// For users that want the IDLE to exit on any change (the behavior proior to version 3.0),
/// a convenience callback function [`stop_on_any`] is provided.
///
/// ```no_run
/// # use native_tls::TlsConnector;
/// use imap::extensions::idle;
/// let ssl_conn = TlsConnector::builder().build().unwrap();
/// let client = imap::connect(("example.com", 993), "example.com", &ssl_conn)
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
/// .expect("Could not connect to imap server");
/// let mut imap = client.login("user@example.com", "password")
/// .expect("Could not authenticate");
/// imap.select("INBOX")
/// .expect("Could not select mailbox");
///
/// let idle = imap.idle().expect("Could not IDLE");
///
/// // Exit on any mailbox change
/// let result = idle.wait_keepalive(idle::stop_on_any);
/// ```
///
/// Note that the server MAY consider a client inactive if it has an IDLE command running, and if
/// such a server has an inactivity timeout it MAY log the client off implicitly at the end of its
Expand All @@ -40,6 +65,21 @@ pub enum WaitOutcome {
MailboxChanged,
}

/// Return type for IDLE response callbacks. Tells the IDLE connection
/// if it should continue monitoring the connection or not.
#[derive(Debug, PartialEq, Eq)]
pub enum CallbackAction {
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
/// Continue receiving responses from the IDLE connection.
Continue,
/// Stop receiving responses, and exit the IDLE wait.
Stop,
}

/// A convenience function to always cause the IDLE handler to exit on any change.
pub fn stop_on_any(_response: UnsolicitedResponse) -> CallbackAction {
CallbackAction::Stop
}

/// Must be implemented for a transport in order for a `Session` using that transport to support
/// operations with timeouts.
///
Expand Down Expand Up @@ -100,37 +140,74 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
/// Internal helper that doesn't consume self.
///
/// This is necessary so that we can keep using the inner `Session` in `wait_keepalive`.
fn wait_inner(&mut self, reconnect: bool) -> Result<WaitOutcome> {
fn wait_inner<F>(&mut self, reconnect: bool, mut callback: F) -> Result<WaitOutcome>
where
F: FnMut(UnsolicitedResponse) -> CallbackAction,
{
let mut v = Vec::new();
loop {
let result = match self.session.readline(&mut v).map(|_| ()) {
let result = loop {
let rest = match self.session.readline(&mut v) {
Err(Error::Io(ref e))
if e.kind() == io::ErrorKind::TimedOut
|| e.kind() == io::ErrorKind::WouldBlock =>
{
if reconnect {
self.terminate()?;
self.init()?;
return self.wait_inner(reconnect);
break Ok(WaitOutcome::TimedOut);
}
Ok(_len) => {
// Handle Dovecot's imap_idle_notify_interval message
if v.eq_ignore_ascii_case(b"* OK Still here\r\n") {
v.clear();
continue;
}
match parse_idle(&v) {
(_rest, Some(Err(r))) => break Err(r),
(rest, Some(Ok(response))) => {
if let CallbackAction::Stop = callback(response) {
break Ok(WaitOutcome::MailboxChanged);
}
rest
}
(rest, None) => rest,
}
Ok(WaitOutcome::TimedOut)
}
Ok(()) => Ok(WaitOutcome::MailboxChanged),
Err(r) => Err(r),
}?;
Err(r) => break Err(r),
};

// Handle Dovecot's imap_idle_notify_interval message
if v.eq_ignore_ascii_case(b"* OK Still here\r\n") {
// Update remaining data with unparsed data if needed.
if rest.is_empty() {
v.clear();
} else {
break Ok(result);
// Assert on partial parse in debug builds - we expect to always parse all
// or none of the input buffer. On release builds, we still do the right thing.
debug_assert!(
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
rest.len() != v.len(),
"Unexpected partial parse: input: {:?}, output: {:?}",
v,
rest
);
let used = v.len() - rest.len();
v.drain(0..used);
}
};

// Reconnect on timeout if needed
match (reconnect, result) {
(true, Ok(WaitOutcome::TimedOut)) => {
self.terminate()?;
self.init()?;
self.wait_inner(reconnect, callback)
}
(_, result) => result,
}
}

/// Block until the selected mailbox changes.
pub fn wait(mut self) -> Result<()> {
self.wait_inner(true).map(|_| ())
/// Block until the given callback returns `Stop`, or until a response
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
pub fn wait<F>(mut self, callback: F) -> Result<()>
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
where
F: FnMut(UnsolicitedResponse) -> CallbackAction,
{
self.wait_inner(true, callback).map(|_| ())
}
}

Expand All @@ -142,15 +219,19 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
self.keepalive = interval;
}

/// Block until the selected mailbox changes.
/// Block until the given callback returns `Stop`, or until a response
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
///
/// This method differs from [`Handle::wait`] in that it will periodically refresh the IDLE
/// connection, to prevent the server from timing out our connection. The keepalive interval is
/// set to 29 minutes by default, as dictated by RFC 2177, but can be changed using
/// [`Handle::set_keepalive`].
///
/// This is the recommended method to use for waiting.
pub fn wait_keepalive(self) -> Result<()> {
pub fn wait_keepalive<F>(self, callback: F) -> Result<()>
where
F: FnMut(UnsolicitedResponse) -> CallbackAction,
{
// The server MAY consider a client inactive if it has an IDLE command
// running, and if such a server has an inactivity timeout it MAY log
// the client off implicitly at the end of its timeout period. Because
Expand All @@ -159,26 +240,33 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
// This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals.
let keepalive = self.keepalive;
self.timed_wait(keepalive, true).map(|_| ())
}

/// Block until the selected mailbox changes, or until the given amount of time has expired.
#[deprecated(note = "use wait_with_timeout instead")]
pub fn wait_timeout(self, timeout: Duration) -> Result<()> {
self.wait_with_timeout(timeout).map(|_| ())
self.timed_wait(keepalive, true, callback).map(|_| ())
}

/// Block until the selected mailbox changes, or until the given amount of time has expired.
pub fn wait_with_timeout(self, timeout: Duration) -> Result<WaitOutcome> {
self.timed_wait(timeout, false)
/// Block until the given given amount of time has elapsed, the given callback
/// returns `Stop`, or until a response arrives that is not explicitly handled
/// by [`UnsolicitedResponse`].
pub fn wait_with_timeout<F>(self, timeout: Duration, callback: F) -> Result<WaitOutcome>
where
F: FnMut(UnsolicitedResponse) -> CallbackAction,
{
self.timed_wait(timeout, false, callback)
}

fn timed_wait(mut self, timeout: Duration, reconnect: bool) -> Result<WaitOutcome> {
fn timed_wait<F>(
mut self,
timeout: Duration,
reconnect: bool,
callback: F,
) -> Result<WaitOutcome>
where
F: FnMut(UnsolicitedResponse) -> CallbackAction,
{
self.session
.stream
.get_mut()
.set_read_timeout(Some(timeout))?;
let res = self.wait_inner(reconnect);
let res = self.wait_inner(reconnect, callback);
let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok();
res
}
Expand Down
Loading