Skip to content

Commit ebfe917

Browse files
committed
Fix tokio panic
This might not be needed after all because of tokio-rs/tokio#5710, but since I already wrote the code, I created a PR anyway.
1 parent 5b435d1 commit ebfe917

16 files changed

+65
-29
lines changed

deltachat-jsonrpc/src/api/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
66
pub use deltachat::accounts::Accounts;
77
use deltachat::message::get_msg_read_receipts;
88
use deltachat::qr::Qr;
9+
use deltachat::tools;
910
use deltachat::{
1011
chat::{
1112
self, add_contact_to_chat, forward_msgs, get_chat_media, get_chat_msgs, get_chat_msgs_ex,
@@ -152,7 +153,7 @@ impl CommandApi {
152153
impl CommandApi {
153154
/// Test function.
154155
async fn sleep(&self, delay: f64) {
155-
tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await
156+
tools::sleep(std::time::Duration::from_secs_f64(delay)).await
156157
}
157158

158159
// ---------------------------------------------

examples/simple.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use deltachat::chat::{self, ChatId};
2-
use deltachat::chatlist::*;
32
use deltachat::config;
43
use deltachat::contact::*;
54
use deltachat::context::*;
65
use deltachat::message::Message;
76
use deltachat::stock_str::StockStrings;
7+
use deltachat::{chatlist::*, tools};
88
use deltachat::{EventType, Events};
99
use tempfile::tempdir;
1010

@@ -80,7 +80,7 @@ async fn main() {
8080
}
8181

8282
// wait for the message to be sent out
83-
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
83+
tools::sleep(std::time::Duration::from_secs(1)).await;
8484

8585
log::info!("fetching chats..");
8686
let chats = Chatlist::try_load(&ctx, 0, None, None).await.unwrap();

python/tests/test_3_offline.py

+13
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,19 @@ def test_create_contact(self, acfactory):
481481
contact2 = ac1.create_contact("display1 <x@example.org>", "real")
482482
assert contact2.name == "real"
483483

484+
def test_send_lots_of_offline_msgs(self, acfactory):
485+
ac1 = acfactory.get_pseudo_configured_account()
486+
ac1.set_config("configured_mail_server", "example.org")
487+
ac1.set_config("configured_mail_user", "example.org")
488+
ac1.set_config("configured_mail_pw", "example.org")
489+
ac1.set_config("configured_send_server", "example.org")
490+
ac1.set_config("configured_send_user", "example.org")
491+
ac1.set_config("configured_send_pw", "example.org")
492+
ac1.start_io()
493+
chat = ac1.create_contact("some1@example.org", name="some1").create_chat()
494+
for i in range(50):
495+
chat.send_text("hello")
496+
484497
def test_create_chat_simple(self, acfactory):
485498
ac1 = acfactory.get_pseudo_configured_account()
486499
contact1 = ac1.create_contact("some1@example.org", name="some1")

src/accounts.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,10 @@ impl AccountConfig {
510510
#[cfg(test)]
511511
mod tests {
512512
use super::*;
513-
use crate::stock_str::{self, StockMessage};
513+
use crate::{
514+
stock_str::{self, StockMessage},
515+
tools,
516+
};
514517

515518
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
516519
async fn test_account_new_open() {
@@ -732,7 +735,7 @@ mod tests {
732735

733736
// Test that event emitter does not return `None` immediately.
734737
let duration = std::time::Duration::from_millis(1);
735-
assert!(tokio::time::timeout(duration, event_emitter.recv())
738+
assert!(tools::timeout(duration, event_emitter.recv())
736739
.await
737740
.is_err());
738741

src/authres.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ Authentication-Results: dkim=";
669669

670670
// Sleep to make sure key reset is ignored because of DKIM failure
671671
// and not because reordering is suspected.
672-
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
672+
tools::sleep(std::time::Duration::from_millis(1100)).await;
673673

674674
let bob2 = tcm.unconfigured().await;
675675
bob2.configure_addr("bob@example.net").await;

src/chat.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -3798,6 +3798,7 @@ mod tests {
37983798
use crate::message::delete_msgs;
37993799
use crate::receive_imf::receive_imf;
38003800
use crate::test_utils::TestContext;
3801+
use crate::tools;
38013802
use tokio::fs;
38023803

38033804
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -4107,17 +4108,17 @@ mod tests {
41074108

41084109
add_contact_to_chat(&alice, alice_chat_id, claire_id).await?;
41094110
let add2 = alice.pop_sent_msg().await;
4110-
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
4111+
tools::sleep(std::time::Duration::from_millis(1100)).await;
41114112

41124113
add_contact_to_chat(&alice, alice_chat_id, daisy_id).await?;
41134114
let add3 = alice.pop_sent_msg().await;
4114-
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
4115+
tools::sleep(std::time::Duration::from_millis(1100)).await;
41154116

41164117
assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 4);
41174118

41184119
remove_contact_from_chat(&alice, alice_chat_id, claire_id).await?;
41194120
let remove1 = alice.pop_sent_msg().await;
4120-
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
4121+
tools::sleep(std::time::Duration::from_millis(1100)).await;
41214122

41224123
remove_contact_from_chat(&alice, alice_chat_id, daisy_id).await?;
41234124
let remove2 = alice.pop_sent_msg().await;
@@ -4154,11 +4155,11 @@ mod tests {
41544155

41554156
send_text_msg(&alice, alice_chat_id, "populate".to_string()).await?;
41564157
let add = alice.pop_sent_msg().await;
4157-
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
4158+
tools::sleep(std::time::Duration::from_millis(1100)).await;
41584159

41594160
remove_contact_from_chat(&alice, alice_chat_id, claire_id).await?;
41604161
let remove1 = alice.pop_sent_msg().await;
4161-
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
4162+
tools::sleep(std::time::Duration::from_millis(1100)).await;
41624163

41634164
remove_contact_from_chat(&alice, alice_chat_id, daisy_id).await?;
41644165
let remove2 = alice.pop_sent_msg().await;
@@ -4752,9 +4753,9 @@ mod tests {
47524753
.await
47534754
.unwrap()
47544755
.chat_id;
4755-
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
4756+
tools::sleep(std::time::Duration::from_millis(1000)).await;
47564757
let chat_id2 = t.get_self_chat().await.id;
4757-
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
4758+
tools::sleep(std::time::Duration::from_millis(1000)).await;
47584759
let chat_id3 = create_group_chat(&t, ProtectionStatus::Unprotected, "foo")
47594760
.await
47604761
.unwrap();

src/contact.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use regex::Regex;
1616
use rusqlite::OptionalExtension;
1717
use serde::{Deserialize, Serialize};
1818
use tokio::task;
19-
use tokio::time::{timeout, Duration};
19+
use tokio::time::Duration;
2020

2121
use crate::aheader::EncryptPreference;
2222
use crate::chat::ChatId;
@@ -32,6 +32,7 @@ use crate::mimeparser::AvatarAction;
3232
use crate::param::{Param, Params};
3333
use crate::peerstate::{Peerstate, PeerstateVerifiedStatus};
3434
use crate::sql::{self, params_iter};
35+
use crate::tools::timeout;
3536
use crate::tools::{
3637
duration_to_str, get_abs_path, improve_single_line_input, strip_rtlo_characters, time,
3738
EmailAddress,

src/ephemeral.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
7171
use anyhow::{ensure, Result};
7272
use async_channel::Receiver;
7373
use serde::{Deserialize, Serialize};
74-
use tokio::time::timeout;
7574

7675
use crate::chat::{send_msg, ChatId};
7776
use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH};
@@ -84,6 +83,7 @@ use crate::message::{Message, MessageState, MsgId, Viewtype};
8483
use crate::mimeparser::SystemMessage;
8584
use crate::sql::{self, params_iter};
8685
use crate::stock_str;
86+
use crate::tools::timeout;
8787
use crate::tools::{duration_to_str, time};
8888

8989
/// Ephemeral timer value.

src/imap/idle.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use futures_lite::FutureExt;
88
use super::session::Session;
99
use super::Imap;
1010
use crate::imap::{client::IMAP_TIMEOUT, FolderMeaning};
11+
use crate::tools;
1112
use crate::{context::Context, scheduler::InterruptInfo};
1213

1314
const IDLE_TIMEOUT: Duration = Duration::from_secs(23 * 60);
@@ -97,7 +98,7 @@ impl Session {
9798
}
9899
}
99100

100-
let mut session = tokio::time::timeout(Duration::from_secs(15), handle.done())
101+
let mut session = tools::timeout(Duration::from_secs(15), handle.done())
101102
.await
102103
.with_context(|| format!("{folder_name}: IMAP IDLE protocol timed out"))?
103104
.with_context(|| format!("{folder_name}: IMAP IDLE failed"))?;

src/imex.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,7 @@ mod tests {
790790
use crate::pgp::{split_armored_data, HEADER_AUTOCRYPT, HEADER_SETUPCODE};
791791
use crate::stock_str::StockMessage;
792792
use crate::test_utils::{alice_keypair, TestContext};
793+
use crate::tools;
793794

794795
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
795796
async fn test_render_setup_file() {
@@ -963,7 +964,7 @@ mod tests {
963964
// The database is still unconfigured;
964965
// fill the config cache with the old value.
965966
context2.is_configured().await.ok();
966-
tokio::time::sleep(Duration::from_micros(1)).await;
967+
tools::sleep(Duration::from_micros(1)).await;
967968
}
968969

969970
// Assert that the config cache has the new value now.

src/imex/transfer.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@ mod tests {
598598
use crate::chat::{get_chat_msgs, send_msg, ChatItem};
599599
use crate::message::{Message, Viewtype};
600600
use crate::test_utils::TestContextManager;
601+
use crate::tools;
601602

602603
use super::*;
603604

@@ -629,7 +630,7 @@ mod tests {
629630
get_backup(&ctx1, provider.qr()).await.unwrap();
630631

631632
// Make sure the provider finishes without an error.
632-
tokio::time::timeout(Duration::from_secs(30), provider)
633+
tools::timeout(Duration::from_secs(30), provider)
633634
.await
634635
.expect("timed out")
635636
.expect("error in provider");

src/location.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::time::Duration;
66
use anyhow::{ensure, Context as _, Result};
77
use async_channel::Receiver;
88
use quick_xml::events::{BytesEnd, BytesStart, BytesText};
9-
use tokio::time::timeout;
109

1110
use crate::chat::{self, ChatId};
1211
use crate::contact::ContactId;
@@ -15,6 +14,7 @@ use crate::events::EventType;
1514
use crate::message::{Message, MsgId, Viewtype};
1615
use crate::mimeparser::SystemMessage;
1716
use crate::stock_str;
17+
use crate::tools::timeout;
1818
use crate::tools::{duration_to_str, time};
1919

2020
/// Location record.

src/net.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ use std::time::Duration;
66

77
use anyhow::{Context as _, Error, Result};
88
use tokio::net::{lookup_host, TcpStream};
9-
use tokio::time::timeout;
109
use tokio_io_timeout::TimeoutStream;
1110

1211
use crate::context::Context;
1312
use crate::tools::time;
13+
use crate::tools::timeout;
1414

1515
pub(crate) mod http;
1616
pub(crate) mod session;

src/scheduler.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ use crate::context::Context;
1616
use crate::ephemeral::{self, delete_expired_imap_messages};
1717
use crate::events::EventType;
1818
use crate::imap::{FolderMeaning, Imap};
19-
use crate::job;
2019
use crate::location;
2120
use crate::log::LogExt;
2221
use crate::smtp::{send_smtp_messages, Smtp};
2322
use crate::sql;
2423
use crate::tools::time;
2524
use crate::tools::{duration_to_str, maybe_add_time_based_warnings};
25+
use crate::{job, tools};
2626

2727
pub(crate) mod connectivity;
2828

@@ -674,7 +674,7 @@ async fn smtp_loop(
674674
"smtp got rate limited, waiting for {} until can send again",
675675
duration_to_str(duration_until_can_send)
676676
);
677-
tokio::time::timeout(duration_until_can_send, async {
677+
tools::timeout(duration_until_can_send, async {
678678
idle_interrupt_receiver.recv().await.unwrap_or_default()
679679
})
680680
.await
@@ -701,7 +701,7 @@ async fn smtp_loop(
701701
"smtp has messages to retry, planning to retry {} seconds later", timeout
702702
);
703703
let duration = std::time::Duration::from_secs(timeout);
704-
tokio::time::timeout(duration, async {
704+
tools::timeout(duration, async {
705705
idle_interrupt_receiver.recv().await.unwrap_or_default()
706706
})
707707
.await
@@ -865,12 +865,12 @@ impl Scheduler {
865865
// Actually shutdown tasks.
866866
let timeout_duration = std::time::Duration::from_secs(30);
867867
for b in once(self.inbox).chain(self.oboxes.into_iter()) {
868-
tokio::time::timeout(timeout_duration, b.handle)
868+
tools::timeout(timeout_duration, b.handle)
869869
.await
870870
.log_err(context)
871871
.ok();
872872
}
873-
tokio::time::timeout(timeout_duration, self.smtp_handle)
873+
tools::timeout(timeout_duration, self.smtp_handle)
874874
.await
875875
.log_err(context)
876876
.ok();

src/test_utils.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::message::{update_msg_state, Message, MessageState, MsgId, Viewtype};
3737
use crate::mimeparser::MimeMessage;
3838
use crate::receive_imf::receive_imf;
3939
use crate::stock_str::StockStrings;
40-
use crate::tools::EmailAddress;
40+
use crate::tools::{self, EmailAddress};
4141

4242
#[allow(non_upper_case_globals)]
4343
pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avatar900x900.png");
@@ -432,7 +432,7 @@ impl TestContext {
432432
break row;
433433
}
434434
if start.elapsed() < timeout {
435-
tokio::time::sleep(Duration::from_millis(100)).await;
435+
tools::sleep(Duration::from_millis(100)).await;
436436
} else {
437437
return None;
438438
}
@@ -954,7 +954,7 @@ impl EventTracker {
954954
/// If no matching events are ready this will wait for new events to arrive and time out
955955
/// after 10 seconds.
956956
pub async fn get_matching<F: Fn(&EventType) -> bool>(&self, event_matcher: F) -> EventType {
957-
tokio::time::timeout(Duration::from_secs(10), async move {
957+
tools::timeout(Duration::from_secs(10), async move {
958958
loop {
959959
let event = self.recv().await.unwrap();
960960
if event_matcher(&event.typ) {

src/tools.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#![allow(missing_docs)]
55

66
use std::borrow::Cow;
7+
use std::cmp::min;
78
use std::fmt;
89
use std::io::{Cursor, Write};
910
use std::mem;
@@ -14,7 +15,7 @@ use std::time::{Duration, SystemTime};
1415
use anyhow::{bail, Context as _, Result};
1516
use base64::Engine as _;
1617
use chrono::{Local, NaiveDateTime, NaiveTime, TimeZone};
17-
use futures::{StreamExt, TryStreamExt};
18+
use futures::{Future, StreamExt, TryStreamExt};
1819
use mailparse::dateparse;
1920
use mailparse::headers::Headers;
2021
use mailparse::MailHeaderMap;
@@ -710,6 +711,19 @@ pub(crate) fn strip_rtlo_characters(input_str: &str) -> String {
710711
input_str.replace(|char| RTLO_CHARACTERS.contains(&char), "")
711712
}
712713

714+
const THIRTY_YEARS: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 10);
715+
716+
pub fn sleep(duration: Duration) -> tokio::time::Sleep {
717+
tokio::time::sleep(min(THIRTY_YEARS, duration))
718+
}
719+
720+
pub fn timeout<F>(duration: Duration, future: F) -> tokio::time::Timeout<F>
721+
where
722+
F: Future,
723+
{
724+
tokio::time::timeout(min(THIRTY_YEARS, duration), future)
725+
}
726+
713727
#[cfg(test)]
714728
mod tests {
715729
#![allow(clippy::indexing_slicing)]

0 commit comments

Comments
 (0)