Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POSTPONED] fix: Fix panic when sending 30 offline messages #4419

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deltachat-jsonrpc/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
pub use deltachat::accounts::Accounts;
use deltachat::message::get_msg_read_receipts;
use deltachat::qr::Qr;
use deltachat::tools;
use deltachat::{
chat::{
self, add_contact_to_chat, forward_msgs, get_chat_media, get_chat_msgs, get_chat_msgs_ex,
Expand Down Expand Up @@ -152,7 +153,7 @@ impl CommandApi {
impl CommandApi {
/// Test function.
async fn sleep(&self, delay: f64) {
tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await
tools::sleep(std::time::Duration::from_secs_f64(delay)).await
}

// ---------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use deltachat::chat::{self, ChatId};
use deltachat::chatlist::*;
use deltachat::config;
use deltachat::contact::*;
use deltachat::context::*;
use deltachat::message::Message;
use deltachat::stock_str::StockStrings;
use deltachat::{chatlist::*, tools};
use deltachat::{EventType, Events};
use tempfile::tempdir;

Expand Down Expand Up @@ -80,7 +80,7 @@ async fn main() {
}

// wait for the message to be sent out
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tools::sleep(std::time::Duration::from_secs(1)).await;

log::info!("fetching chats..");
let chats = Chatlist::try_load(&ctx, 0, None, None).await.unwrap();
Expand Down
13 changes: 13 additions & 0 deletions python/tests/test_3_offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,19 @@ def test_create_contact(self, acfactory):
contact2 = ac1.create_contact("display1 <x@example.org>", "real")
assert contact2.name == "real"

def test_send_lots_of_offline_msgs(self, acfactory):
ac1 = acfactory.get_pseudo_configured_account()
ac1.set_config("configured_mail_server", "example.org")
ac1.set_config("configured_mail_user", "example.org")
ac1.set_config("configured_mail_pw", "example.org")
ac1.set_config("configured_send_server", "example.org")
ac1.set_config("configured_send_user", "example.org")
ac1.set_config("configured_send_pw", "example.org")
ac1.start_io()
chat = ac1.create_contact("some1@example.org", name="some1").create_chat()
for i in range(50):
chat.send_text("hello")

def test_create_chat_simple(self, acfactory):
ac1 = acfactory.get_pseudo_configured_account()
contact1 = ac1.create_contact("some1@example.org", name="some1")
Expand Down
7 changes: 5 additions & 2 deletions src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ impl AccountConfig {
#[cfg(test)]
mod tests {
use super::*;
use crate::stock_str::{self, StockMessage};
use crate::{
stock_str::{self, StockMessage},
tools,
};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_account_new_open() {
Expand Down Expand Up @@ -732,7 +735,7 @@ mod tests {

// Test that event emitter does not return `None` immediately.
let duration = std::time::Duration::from_millis(1);
assert!(tokio::time::timeout(duration, event_emitter.recv())
assert!(tools::timeout(duration, event_emitter.recv())
.await
.is_err());

Expand Down
2 changes: 1 addition & 1 deletion src/authres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ Authentication-Results: dkim=";

// Sleep to make sure key reset is ignored because of DKIM failure
// and not because reordering is suspected.
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

let bob2 = tcm.unconfigured().await;
bob2.configure_addr("bob@example.net").await;
Expand Down
15 changes: 8 additions & 7 deletions src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3798,6 +3798,7 @@ mod tests {
use crate::message::delete_msgs;
use crate::receive_imf::receive_imf;
use crate::test_utils::TestContext;
use crate::tools;
use tokio::fs;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -4107,17 +4108,17 @@ mod tests {

add_contact_to_chat(&alice, alice_chat_id, claire_id).await?;
let add2 = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

add_contact_to_chat(&alice, alice_chat_id, daisy_id).await?;
let add3 = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 4);

remove_contact_from_chat(&alice, alice_chat_id, claire_id).await?;
let remove1 = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

remove_contact_from_chat(&alice, alice_chat_id, daisy_id).await?;
let remove2 = alice.pop_sent_msg().await;
Expand Down Expand Up @@ -4154,11 +4155,11 @@ mod tests {

send_text_msg(&alice, alice_chat_id, "populate".to_string()).await?;
let add = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

remove_contact_from_chat(&alice, alice_chat_id, claire_id).await?;
let remove1 = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

remove_contact_from_chat(&alice, alice_chat_id, daisy_id).await?;
let remove2 = alice.pop_sent_msg().await;
Expand Down Expand Up @@ -4752,9 +4753,9 @@ mod tests {
.await
.unwrap()
.chat_id;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
tools::sleep(std::time::Duration::from_millis(1000)).await;
let chat_id2 = t.get_self_chat().await.id;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
tools::sleep(std::time::Duration::from_millis(1000)).await;
let chat_id3 = create_group_chat(&t, ProtectionStatus::Unprotected, "foo")
.await
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/contact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use regex::Regex;
use rusqlite::OptionalExtension;
use serde::{Deserialize, Serialize};
use tokio::task;
use tokio::time::{timeout, Duration};
use tokio::time::Duration;

use crate::aheader::EncryptPreference;
use crate::chat::ChatId;
Expand All @@ -32,6 +32,7 @@ use crate::mimeparser::AvatarAction;
use crate::param::{Param, Params};
use crate::peerstate::{Peerstate, PeerstateVerifiedStatus};
use crate::sql::{self, params_iter};
use crate::tools::timeout;
use crate::tools::{
duration_to_str, get_abs_path, improve_single_line_input, strip_rtlo_characters, time,
EmailAddress,
Expand Down
2 changes: 1 addition & 1 deletion src/ephemeral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{ensure, Result};
use async_channel::Receiver;
use serde::{Deserialize, Serialize};
use tokio::time::timeout;

use crate::chat::{send_msg, ChatId};
use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH};
Expand All @@ -84,6 +83,7 @@ use crate::message::{Message, MessageState, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::sql::{self, params_iter};
use crate::stock_str;
use crate::tools::timeout;
use crate::tools::{duration_to_str, time};

/// Ephemeral timer value.
Expand Down
3 changes: 2 additions & 1 deletion src/imap/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures_lite::FutureExt;
use super::session::Session;
use super::Imap;
use crate::imap::{client::IMAP_TIMEOUT, FolderMeaning};
use crate::tools;
use crate::{context::Context, scheduler::InterruptInfo};

const IDLE_TIMEOUT: Duration = Duration::from_secs(23 * 60);
Expand Down Expand Up @@ -97,7 +98,7 @@ impl Session {
}
}

let mut session = tokio::time::timeout(Duration::from_secs(15), handle.done())
let mut session = tools::timeout(Duration::from_secs(15), handle.done())
.await
.with_context(|| format!("{folder_name}: IMAP IDLE protocol timed out"))?
.with_context(|| format!("{folder_name}: IMAP IDLE failed"))?;
Expand Down
3 changes: 2 additions & 1 deletion src/imex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ mod tests {
use crate::pgp::{split_armored_data, HEADER_AUTOCRYPT, HEADER_SETUPCODE};
use crate::stock_str::StockMessage;
use crate::test_utils::{alice_keypair, TestContext};
use crate::tools;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_render_setup_file() {
Expand Down Expand Up @@ -963,7 +964,7 @@ mod tests {
// The database is still unconfigured;
// fill the config cache with the old value.
context2.is_configured().await.ok();
tokio::time::sleep(Duration::from_micros(1)).await;
tools::sleep(Duration::from_micros(1)).await;
}

// Assert that the config cache has the new value now.
Expand Down
3 changes: 2 additions & 1 deletion src/imex/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ mod tests {
use crate::chat::{get_chat_msgs, send_msg, ChatItem};
use crate::message::{Message, Viewtype};
use crate::test_utils::TestContextManager;
use crate::tools;

use super::*;

Expand Down Expand Up @@ -629,7 +630,7 @@ mod tests {
get_backup(&ctx1, provider.qr()).await.unwrap();

// Make sure the provider finishes without an error.
tokio::time::timeout(Duration::from_secs(30), provider)
tools::timeout(Duration::from_secs(30), provider)
.await
.expect("timed out")
.expect("error in provider");
Expand Down
2 changes: 1 addition & 1 deletion src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::time::Duration;
use anyhow::{ensure, Context as _, Result};
use async_channel::Receiver;
use quick_xml::events::{BytesEnd, BytesStart, BytesText};
use tokio::time::timeout;

use crate::chat::{self, ChatId};
use crate::contact::ContactId;
Expand All @@ -15,6 +14,7 @@ use crate::events::EventType;
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::stock_str;
use crate::tools::timeout;
use crate::tools::{duration_to_str, time};

/// Location record.
Expand Down
2 changes: 1 addition & 1 deletion src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use std::time::Duration;

use anyhow::{Context as _, Error, Result};
use tokio::net::{lookup_host, TcpStream};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;

use crate::context::Context;
use crate::tools::time;
use crate::tools::timeout;

pub(crate) mod http;
pub(crate) mod session;
Expand Down
10 changes: 5 additions & 5 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use crate::context::Context;
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType;
use crate::imap::{FolderMeaning, Imap};
use crate::job;
use crate::location;
use crate::log::LogExt;
use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql;
use crate::tools::time;
use crate::tools::{duration_to_str, maybe_add_time_based_warnings};
use crate::{job, tools};

pub(crate) mod connectivity;

Expand Down Expand Up @@ -674,7 +674,7 @@ async fn smtp_loop(
"smtp got rate limited, waiting for {} until can send again",
duration_to_str(duration_until_can_send)
);
tokio::time::timeout(duration_until_can_send, async {
tools::timeout(duration_until_can_send, async {
idle_interrupt_receiver.recv().await.unwrap_or_default()
})
.await
Expand All @@ -701,7 +701,7 @@ async fn smtp_loop(
"smtp has messages to retry, planning to retry {} seconds later", timeout
);
let duration = std::time::Duration::from_secs(timeout);
tokio::time::timeout(duration, async {
tools::timeout(duration, async {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Simon-Laux and me assume that this was the offending line:

At

timeout = Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3)))

we multiply the timeout by 3 every time a message can't be sent, whether . So, if the user sends 30 messages, idle_interrupt_receiver receives an interrupt every time, the message fails to be sent every time, and the timeout is multiplied by 3 every time.

idle_interrupt_receiver.recv().await.unwrap_or_default()
})
.await
Expand Down Expand Up @@ -865,12 +865,12 @@ impl Scheduler {
// Actually shutdown tasks.
let timeout_duration = std::time::Duration::from_secs(30);
for b in once(self.inbox).chain(self.oboxes.into_iter()) {
tokio::time::timeout(timeout_duration, b.handle)
tools::timeout(timeout_duration, b.handle)
.await
.log_err(context)
.ok();
}
tokio::time::timeout(timeout_duration, self.smtp_handle)
tools::timeout(timeout_duration, self.smtp_handle)
.await
.log_err(context)
.ok();
Expand Down
6 changes: 3 additions & 3 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::message::{update_msg_state, Message, MessageState, MsgId, Viewtype};
use crate::mimeparser::MimeMessage;
use crate::receive_imf::receive_imf;
use crate::stock_str::StockStrings;
use crate::tools::EmailAddress;
use crate::tools::{self, EmailAddress};

#[allow(non_upper_case_globals)]
pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avatar900x900.png");
Expand Down Expand Up @@ -432,7 +432,7 @@ impl TestContext {
break row;
}
if start.elapsed() < timeout {
tokio::time::sleep(Duration::from_millis(100)).await;
tools::sleep(Duration::from_millis(100)).await;
} else {
return None;
}
Expand Down Expand Up @@ -954,7 +954,7 @@ impl EventTracker {
/// If no matching events are ready this will wait for new events to arrive and time out
/// after 10 seconds.
pub async fn get_matching<F: Fn(&EventType) -> bool>(&self, event_matcher: F) -> EventType {
tokio::time::timeout(Duration::from_secs(10), async move {
tools::timeout(Duration::from_secs(10), async move {
loop {
let event = self.recv().await.unwrap();
if event_matcher(&event.typ) {
Expand Down
19 changes: 18 additions & 1 deletion src/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#![allow(missing_docs)]

use std::borrow::Cow;
use std::cmp::min;
use std::fmt;
use std::io::{Cursor, Write};
use std::mem;
Expand All @@ -14,7 +15,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{bail, Context as _, Result};
use base64::Engine as _;
use chrono::{Local, NaiveDateTime, NaiveTime, TimeZone};
use futures::{StreamExt, TryStreamExt};
use futures::{Future, StreamExt, TryStreamExt};
use mailparse::dateparse;
use mailparse::headers::Headers;
use mailparse::MailHeaderMap;
Expand Down Expand Up @@ -710,6 +711,22 @@ pub(crate) fn strip_rtlo_characters(input_str: &str) -> String {
input_str.replace(|char| RTLO_CHARACTERS.contains(&char), "")
}

// Workaround for https://github.com/tokio-rs/tokio/issues/5183:
// If we sleep for a very long time (like, u64::MAX/10), then tokio panics.
// So, only sleep for up to 30 years.
const THIRTY_YEARS: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 10);

pub fn sleep(duration: Duration) -> tokio::time::Sleep {
tokio::time::sleep(min(THIRTY_YEARS, duration))
}

pub fn timeout<F>(duration: Duration, future: F) -> tokio::time::Timeout<F>
where
F: Future,
{
tokio::time::timeout(min(THIRTY_YEARS, duration), future)
}

#[cfg(test)]
mod tests {
#![allow(clippy::indexing_slicing)]
Expand Down