diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index fca691f29c..0741f12326 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -6,6 +6,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; pub use deltachat::accounts::Accounts; use deltachat::message::get_msg_read_receipts; use deltachat::qr::Qr; +use deltachat::tools; use deltachat::{ chat::{ self, add_contact_to_chat, forward_msgs, get_chat_media, get_chat_msgs, get_chat_msgs_ex, @@ -152,7 +153,7 @@ impl CommandApi { impl CommandApi { /// Test function. async fn sleep(&self, delay: f64) { - tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await + tools::sleep(std::time::Duration::from_secs_f64(delay)).await } // --------------------------------------------- diff --git a/examples/simple.rs b/examples/simple.rs index cca50bb279..0310eab5e8 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,10 +1,10 @@ use deltachat::chat::{self, ChatId}; -use deltachat::chatlist::*; use deltachat::config; use deltachat::contact::*; use deltachat::context::*; use deltachat::message::Message; use deltachat::stock_str::StockStrings; +use deltachat::{chatlist::*, tools}; use deltachat::{EventType, Events}; use tempfile::tempdir; @@ -80,7 +80,7 @@ async fn main() { } // wait for the message to be sent out - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tools::sleep(std::time::Duration::from_secs(1)).await; log::info!("fetching chats.."); let chats = Chatlist::try_load(&ctx, 0, None, None).await.unwrap(); diff --git a/python/tests/test_3_offline.py b/python/tests/test_3_offline.py index e9a2976815..2eefa9e3c7 100644 --- a/python/tests/test_3_offline.py +++ b/python/tests/test_3_offline.py @@ -481,6 +481,19 @@ def test_create_contact(self, acfactory): contact2 = ac1.create_contact("display1 ", "real") assert contact2.name == "real" + def test_send_lots_of_offline_msgs(self, acfactory): + ac1 = acfactory.get_pseudo_configured_account() + ac1.set_config("configured_mail_server", "example.org") + ac1.set_config("configured_mail_user", "example.org") + ac1.set_config("configured_mail_pw", "example.org") + ac1.set_config("configured_send_server", "example.org") + ac1.set_config("configured_send_user", "example.org") + ac1.set_config("configured_send_pw", "example.org") + ac1.start_io() + chat = ac1.create_contact("some1@example.org", name="some1").create_chat() + for i in range(50): + chat.send_text("hello") + def test_create_chat_simple(self, acfactory): ac1 = acfactory.get_pseudo_configured_account() contact1 = ac1.create_contact("some1@example.org", name="some1") diff --git a/src/accounts.rs b/src/accounts.rs index 9c214b371e..8f2b6631da 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -510,7 +510,10 @@ impl AccountConfig { #[cfg(test)] mod tests { use super::*; - use crate::stock_str::{self, StockMessage}; + use crate::{ + stock_str::{self, StockMessage}, + tools, + }; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_account_new_open() { @@ -732,7 +735,7 @@ mod tests { // Test that event emitter does not return `None` immediately. let duration = std::time::Duration::from_millis(1); - assert!(tokio::time::timeout(duration, event_emitter.recv()) + assert!(tools::timeout(duration, event_emitter.recv()) .await .is_err()); diff --git a/src/authres.rs b/src/authres.rs index cc684e544e..67b7c0bf05 100644 --- a/src/authres.rs +++ b/src/authres.rs @@ -669,7 +669,7 @@ Authentication-Results: dkim="; // Sleep to make sure key reset is ignored because of DKIM failure // and not because reordering is suspected. - tokio::time::sleep(std::time::Duration::from_millis(1100)).await; + tools::sleep(std::time::Duration::from_millis(1100)).await; let bob2 = tcm.unconfigured().await; bob2.configure_addr("bob@example.net").await; diff --git a/src/chat.rs b/src/chat.rs index d5649b0056..9800094bc2 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -3798,6 +3798,7 @@ mod tests { use crate::message::delete_msgs; use crate::receive_imf::receive_imf; use crate::test_utils::TestContext; + use crate::tools; use tokio::fs; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -4107,17 +4108,17 @@ mod tests { add_contact_to_chat(&alice, alice_chat_id, claire_id).await?; let add2 = alice.pop_sent_msg().await; - tokio::time::sleep(std::time::Duration::from_millis(1100)).await; + tools::sleep(std::time::Duration::from_millis(1100)).await; add_contact_to_chat(&alice, alice_chat_id, daisy_id).await?; let add3 = alice.pop_sent_msg().await; - tokio::time::sleep(std::time::Duration::from_millis(1100)).await; + tools::sleep(std::time::Duration::from_millis(1100)).await; assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 4); remove_contact_from_chat(&alice, alice_chat_id, claire_id).await?; let remove1 = alice.pop_sent_msg().await; - tokio::time::sleep(std::time::Duration::from_millis(1100)).await; + tools::sleep(std::time::Duration::from_millis(1100)).await; remove_contact_from_chat(&alice, alice_chat_id, daisy_id).await?; let remove2 = alice.pop_sent_msg().await; @@ -4154,11 +4155,11 @@ mod tests { send_text_msg(&alice, alice_chat_id, "populate".to_string()).await?; let add = alice.pop_sent_msg().await; - tokio::time::sleep(std::time::Duration::from_millis(1100)).await; + tools::sleep(std::time::Duration::from_millis(1100)).await; remove_contact_from_chat(&alice, alice_chat_id, claire_id).await?; let remove1 = alice.pop_sent_msg().await; - tokio::time::sleep(std::time::Duration::from_millis(1100)).await; + tools::sleep(std::time::Duration::from_millis(1100)).await; remove_contact_from_chat(&alice, alice_chat_id, daisy_id).await?; let remove2 = alice.pop_sent_msg().await; @@ -4752,9 +4753,9 @@ mod tests { .await .unwrap() .chat_id; - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + tools::sleep(std::time::Duration::from_millis(1000)).await; let chat_id2 = t.get_self_chat().await.id; - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + tools::sleep(std::time::Duration::from_millis(1000)).await; let chat_id3 = create_group_chat(&t, ProtectionStatus::Unprotected, "foo") .await .unwrap(); diff --git a/src/contact.rs b/src/contact.rs index 78269f0035..735214c4a7 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -16,7 +16,7 @@ use regex::Regex; use rusqlite::OptionalExtension; use serde::{Deserialize, Serialize}; use tokio::task; -use tokio::time::{timeout, Duration}; +use tokio::time::Duration; use crate::aheader::EncryptPreference; use crate::chat::ChatId; @@ -32,6 +32,7 @@ use crate::mimeparser::AvatarAction; use crate::param::{Param, Params}; use crate::peerstate::{Peerstate, PeerstateVerifiedStatus}; use crate::sql::{self, params_iter}; +use crate::tools::timeout; use crate::tools::{ duration_to_str, get_abs_path, improve_single_line_input, strip_rtlo_characters, time, EmailAddress, diff --git a/src/ephemeral.rs b/src/ephemeral.rs index 22c3b6303f..5d6bbd66da 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -71,7 +71,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::{ensure, Result}; use async_channel::Receiver; use serde::{Deserialize, Serialize}; -use tokio::time::timeout; use crate::chat::{send_msg, ChatId}; use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH}; @@ -84,6 +83,7 @@ use crate::message::{Message, MessageState, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; use crate::sql::{self, params_iter}; use crate::stock_str; +use crate::tools::timeout; use crate::tools::{duration_to_str, time}; /// Ephemeral timer value. diff --git a/src/imap/idle.rs b/src/imap/idle.rs index fbce499b76..0ad85027d9 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -8,6 +8,7 @@ use futures_lite::FutureExt; use super::session::Session; use super::Imap; use crate::imap::{client::IMAP_TIMEOUT, FolderMeaning}; +use crate::tools; use crate::{context::Context, scheduler::InterruptInfo}; const IDLE_TIMEOUT: Duration = Duration::from_secs(23 * 60); @@ -97,7 +98,7 @@ impl Session { } } - let mut session = tokio::time::timeout(Duration::from_secs(15), handle.done()) + let mut session = tools::timeout(Duration::from_secs(15), handle.done()) .await .with_context(|| format!("{folder_name}: IMAP IDLE protocol timed out"))? .with_context(|| format!("{folder_name}: IMAP IDLE failed"))?; diff --git a/src/imex.rs b/src/imex.rs index cfb6d76cf4..dd2986fbb6 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -790,6 +790,7 @@ mod tests { use crate::pgp::{split_armored_data, HEADER_AUTOCRYPT, HEADER_SETUPCODE}; use crate::stock_str::StockMessage; use crate::test_utils::{alice_keypair, TestContext}; + use crate::tools; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_render_setup_file() { @@ -963,7 +964,7 @@ mod tests { // The database is still unconfigured; // fill the config cache with the old value. context2.is_configured().await.ok(); - tokio::time::sleep(Duration::from_micros(1)).await; + tools::sleep(Duration::from_micros(1)).await; } // Assert that the config cache has the new value now. diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index e3433e3987..04e36b9c16 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -598,6 +598,7 @@ mod tests { use crate::chat::{get_chat_msgs, send_msg, ChatItem}; use crate::message::{Message, Viewtype}; use crate::test_utils::TestContextManager; + use crate::tools; use super::*; @@ -629,7 +630,7 @@ mod tests { get_backup(&ctx1, provider.qr()).await.unwrap(); // Make sure the provider finishes without an error. - tokio::time::timeout(Duration::from_secs(30), provider) + tools::timeout(Duration::from_secs(30), provider) .await .expect("timed out") .expect("error in provider"); diff --git a/src/location.rs b/src/location.rs index cae938cf87..87ddc5854e 100644 --- a/src/location.rs +++ b/src/location.rs @@ -6,7 +6,6 @@ use std::time::Duration; use anyhow::{ensure, Context as _, Result}; use async_channel::Receiver; use quick_xml::events::{BytesEnd, BytesStart, BytesText}; -use tokio::time::timeout; use crate::chat::{self, ChatId}; use crate::contact::ContactId; @@ -15,6 +14,7 @@ use crate::events::EventType; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; use crate::stock_str; +use crate::tools::timeout; use crate::tools::{duration_to_str, time}; /// Location record. diff --git a/src/net.rs b/src/net.rs index c4b0f3686a..8e54c555b2 100644 --- a/src/net.rs +++ b/src/net.rs @@ -6,11 +6,11 @@ use std::time::Duration; use anyhow::{Context as _, Error, Result}; use tokio::net::{lookup_host, TcpStream}; -use tokio::time::timeout; use tokio_io_timeout::TimeoutStream; use crate::context::Context; use crate::tools::time; +use crate::tools::timeout; pub(crate) mod http; pub(crate) mod session; diff --git a/src/scheduler.rs b/src/scheduler.rs index 7962227025..bebc988143 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -16,13 +16,13 @@ use crate::context::Context; use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::events::EventType; use crate::imap::{FolderMeaning, Imap}; -use crate::job; use crate::location; use crate::log::LogExt; use crate::smtp::{send_smtp_messages, Smtp}; use crate::sql; use crate::tools::time; use crate::tools::{duration_to_str, maybe_add_time_based_warnings}; +use crate::{job, tools}; pub(crate) mod connectivity; @@ -674,7 +674,7 @@ async fn smtp_loop( "smtp got rate limited, waiting for {} until can send again", duration_to_str(duration_until_can_send) ); - tokio::time::timeout(duration_until_can_send, async { + tools::timeout(duration_until_can_send, async { idle_interrupt_receiver.recv().await.unwrap_or_default() }) .await @@ -701,7 +701,7 @@ async fn smtp_loop( "smtp has messages to retry, planning to retry {} seconds later", timeout ); let duration = std::time::Duration::from_secs(timeout); - tokio::time::timeout(duration, async { + tools::timeout(duration, async { idle_interrupt_receiver.recv().await.unwrap_or_default() }) .await @@ -865,12 +865,12 @@ impl Scheduler { // Actually shutdown tasks. let timeout_duration = std::time::Duration::from_secs(30); for b in once(self.inbox).chain(self.oboxes.into_iter()) { - tokio::time::timeout(timeout_duration, b.handle) + tools::timeout(timeout_duration, b.handle) .await .log_err(context) .ok(); } - tokio::time::timeout(timeout_duration, self.smtp_handle) + tools::timeout(timeout_duration, self.smtp_handle) .await .log_err(context) .ok(); diff --git a/src/test_utils.rs b/src/test_utils.rs index d6278e206a..bf8faebe20 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -37,7 +37,7 @@ use crate::message::{update_msg_state, Message, MessageState, MsgId, Viewtype}; use crate::mimeparser::MimeMessage; use crate::receive_imf::receive_imf; use crate::stock_str::StockStrings; -use crate::tools::EmailAddress; +use crate::tools::{self, EmailAddress}; #[allow(non_upper_case_globals)] pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avatar900x900.png"); @@ -432,7 +432,7 @@ impl TestContext { break row; } if start.elapsed() < timeout { - tokio::time::sleep(Duration::from_millis(100)).await; + tools::sleep(Duration::from_millis(100)).await; } else { return None; } @@ -954,7 +954,7 @@ impl EventTracker { /// If no matching events are ready this will wait for new events to arrive and time out /// after 10 seconds. pub async fn get_matching bool>(&self, event_matcher: F) -> EventType { - tokio::time::timeout(Duration::from_secs(10), async move { + tools::timeout(Duration::from_secs(10), async move { loop { let event = self.recv().await.unwrap(); if event_matcher(&event.typ) { diff --git a/src/tools.rs b/src/tools.rs index 23b66c9349..a9b9db9bf7 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -4,6 +4,7 @@ #![allow(missing_docs)] use std::borrow::Cow; +use std::cmp::min; use std::fmt; use std::io::{Cursor, Write}; use std::mem; @@ -14,7 +15,7 @@ use std::time::{Duration, SystemTime}; use anyhow::{bail, Context as _, Result}; use base64::Engine as _; use chrono::{Local, NaiveDateTime, NaiveTime, TimeZone}; -use futures::{StreamExt, TryStreamExt}; +use futures::{Future, StreamExt, TryStreamExt}; use mailparse::dateparse; use mailparse::headers::Headers; use mailparse::MailHeaderMap; @@ -710,6 +711,22 @@ pub(crate) fn strip_rtlo_characters(input_str: &str) -> String { input_str.replace(|char| RTLO_CHARACTERS.contains(&char), "") } +// Workaround for https://github.com/tokio-rs/tokio/issues/5183: +// If we sleep for a very long time (like, u64::MAX/10), then tokio panics. +// So, only sleep for up to 30 years. +const THIRTY_YEARS: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 10); + +pub fn sleep(duration: Duration) -> tokio::time::Sleep { + tokio::time::sleep(min(THIRTY_YEARS, duration)) +} + +pub fn timeout(duration: Duration, future: F) -> tokio::time::Timeout +where + F: Future, +{ + tokio::time::timeout(min(THIRTY_YEARS, duration), future) +} + #[cfg(test)] mod tests { #![allow(clippy::indexing_slicing)]