Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] Use futures_timer crate and fix unstable tests #1511

Merged
merged 5 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mm2src/coins/lightning/ln_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl LightningEventHandler {

fn handle_pending_htlcs_forwards(&self, time_forwardable: Duration) {
info!("Handling PendingHTLCsForwardable event!");
let min_wait_time = time_forwardable.as_millis() as u32;
let min_wait_time = time_forwardable.as_millis() as u64;
let channel_manager = self.channel_manager.clone();
self.platform.spawner().spawn(async move {
let millis_to_sleep = rand::thread_rng().gen_range(min_wait_time, min_wait_time * 5);
Expand Down
1 change: 1 addition & 0 deletions mm2src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fnv = "1.0.6"
futures01 = { version = "0.1", package = "futures" }
futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] }
futures-cpupool = "0.1"
futures-timer = "3.0"
hex = "0.4.2"
http = "0.2"
http-body = "0.1"
Expand Down
39 changes: 0 additions & 39 deletions mm2src/common/custom_futures.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
/// Custom future combinators/implementations - some of standard do not match our requirements.
use crate::executor::Timer;
use crate::now_float;

use futures::future::{select, Either};
use futures::lock::Mutex as AsyncMutex;
use futures::task::Poll as Poll03;
use futures::Future as Future03;
use futures01::future::{self, loop_fn, Either as Either01, IntoFuture, Loop};
Expand Down Expand Up @@ -197,42 +194,6 @@ where
}
}

pub struct TimedMutexGuard<'a, T>(futures::lock::MutexGuard<'a, T>);
//impl<'a, T> Drop for TimedMutexGuard<'a, T> {fn drop (&mut self) {}}

/// Like `AsyncMutex` but periodically invokes a callback,
/// allowing the application to implement timeouts, status updates and shutdowns.
pub struct TimedAsyncMutex<T>(AsyncMutex<T>);
impl<T> TimedAsyncMutex<T> {
pub fn new(v: T) -> TimedAsyncMutex<T> { TimedAsyncMutex(AsyncMutex::new(v)) }

/// Like `AsyncMutex::lock` but invokes the `tick` callback periodically.
/// `tick` returns a time till the next tick, or an error to abort the locking attempt.
/// `tick` parameters are the time when the locking attempt has started and the current time
/// (they are equal on the first invocation of `tick`).
pub async fn lock<F, E>(&self, mut tick: F) -> Result<TimedMutexGuard<'_, T>, E>
where
F: FnMut(f64, f64) -> Result<f64, E>,
{
let start = now_float();
let mut now = start;
let mut l = self.0.lock();
let l = loop {
let tick_after = tick(start, now)?;
let t = Timer::till(now + tick_after);
let rc = select(l, t).await;
match rc {
Either::Left((l, _t)) => break l,
Either::Right((_t, lʹ)) => {
now = now_float();
l = lʹ
},
}
};
Ok(TimedMutexGuard(l))
}
}

#[derive(Debug)]
pub struct TimeoutError {
pub duration: Duration,
Expand Down
6 changes: 3 additions & 3 deletions mm2src/common/executor/abortable_system/abortable_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,10 @@ mod tests {
let abortable_system = AbortableQueue::default();
let spawner = abortable_system.weak_spawner();

let settings = AbortSettings::default().critical_timout_s(0.3);
let settings = AbortSettings::default().critical_timout_s(0.4);

let fut1 = async move {
Timer::sleep(0.5).await;
Timer::sleep(0.6).await;
unsafe { F1_FINISHED = true };
};
spawner.spawn_with_settings(fut1, settings.clone());
Expand All @@ -268,7 +268,7 @@ mod tests {

abortable_system.abort_all();

block_on(Timer::sleep(1.));
block_on(Timer::sleep(1.2));
// `fut1` must not complete.
assert!(unsafe { !F1_FINISHED });
// `fut` must complete.
Expand Down
12 changes: 6 additions & 6 deletions mm2src/common/executor/abortable_system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ mod tests {

let super_system = AbortableQueue::default();
super_system.weak_spawner().spawn(async move {
Timer::sleep(0.2).await;
Timer::sleep(0.5).await;
unsafe { SUPER_FINISHED = true };
});

let sub_system: AbortableQueue = super_system.create_subsystem();
sub_system.weak_spawner().spawn(async move {
Timer::sleep(0.2).await;
Timer::sleep(0.5).await;
unsafe { SUB_FINISHED = true };
});

block_on(Timer::sleep(0.1));
drop(sub_system);
block_on(Timer::sleep(0.2));
block_on(Timer::sleep(0.8));

// Only the super system should finish as the sub system has been aborted.
unsafe {
Expand All @@ -96,19 +96,19 @@ mod tests {

let super_system = AbortableQueue::default();
super_system.weak_spawner().spawn(async move {
Timer::sleep(0.2).await;
Timer::sleep(0.5).await;
unsafe { SUPER_FINISHED = true };
});

let sub_system: AbortableQueue = super_system.create_subsystem();
sub_system.weak_spawner().spawn(async move {
Timer::sleep(0.2).await;
Timer::sleep(0.5).await;
unsafe { SUB_FINISHED = true };
});

block_on(Timer::sleep(0.1));
drop(super_system);
block_on(Timer::sleep(0.2));
block_on(Timer::sleep(0.8));

// Nothing should finish as the super system has been aborted.
unsafe {
Expand Down
10 changes: 5 additions & 5 deletions mm2src/common/executor/abortable_system/simple_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,20 @@ mod tests {
let mut guard = abortable_system.lock();

guard.spawn_or_ignore("F1".to_string(), async move {
Timer::sleep(0.2).await;
Timer::sleep(0.1).await;
unsafe { F1_FINISHED = true };
});
assert!(guard.contains("F1"));
assert!(!guard.contains("F2"));
guard.spawn_or_ignore("F2".to_string(), async move {
Timer::sleep(0.4).await;
Timer::sleep(0.5).await;
unsafe { F2_FINISHED = true };
});

drop(guard);
block_on(Timer::sleep(0.3));
abortable_system.abort_all();
block_on(Timer::sleep(0.2));
block_on(Timer::sleep(0.4));

unsafe {
assert!(F1_FINISHED);
Expand All @@ -158,13 +158,13 @@ mod tests {
});

drop(guard);
block_on(Timer::sleep(0.1));
block_on(Timer::sleep(0.05));

let mut guard = abortable_system.lock();
guard.abort_future("F1");
assert!(!guard.contains("F1"));

block_on(Timer::sleep(0.2));
block_on(Timer::sleep(0.3));

unsafe {
assert!(!F1_FINISHED);
Expand Down
126 changes: 27 additions & 99 deletions mm2src/common/executor/native_executor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use futures::task::Context;
use futures::task::Poll as Poll03;
use futures::Future as Future03;
use gstuff::now_float;
use futures_timer::Delay;
use std::pin::Pin;
use std::thread;
use std::time::Duration;

/// # Important
Expand All @@ -12,118 +11,47 @@ use std::time::Duration;
/// Please consider using `AbortableQueue`, `AbortableSimpleMap` or `spawn_abortable` instead.
pub fn spawn(future: impl Future03<Output = ()> + Send + 'static) { crate::wio::CORE.0.spawn(future); }

/// Schedule the given `future` to be executed shortly after the given `utc` time is reached.
fn spawn_after(utc: f64, future: impl Future03<Output = ()> + Send + 'static) {
use crossbeam::channel;
use gstuff::Constructible;
use std::collections::BTreeMap;
use std::sync::Once;

type SheduleChannelItem = (f64, Pin<Box<dyn Future03<Output = ()> + Send + 'static>>);
static START: Once = Once::new();
static SCHEDULE: Constructible<channel::Sender<SheduleChannelItem>> = Constructible::new();
START.call_once(|| {
thread::Builder::new()
.name("spawn_after".into())
.spawn(move || {
let (tx, rx) = channel::bounded(0);
SCHEDULE.pin(tx).expect("spawn_after] Can't pin the channel");
type Task = Pin<Box<dyn Future03<Output = ()> + Send + 'static>>;
let mut tasks: BTreeMap<Duration, Vec<Task>> = BTreeMap::new();
let mut ready = Vec::with_capacity(4);
loop {
let now = Duration::from_secs_f64(now_float());
let mut next_stop = Duration::from_secs_f64(0.1);
for (utc, _) in tasks.iter() {
if *utc <= now {
ready.push(*utc)
} else {
next_stop = *utc - now;
break;
}
}
for utc in ready.drain(..) {
let v = match tasks.remove(&utc) {
Some(v) => v,
None => continue,
};
//log! ("spawn_after] spawning " (v.len()) " tasks at " [utc]);
for f in v {
spawn(f);
}
}
let (utc, f) = match rx.recv_timeout(next_stop) {
Ok(t) => t,
Err(channel::RecvTimeoutError::Disconnected) => break,
Err(channel::RecvTimeoutError::Timeout) => continue,
};
tasks
.entry(Duration::from_secs_f64(utc))
.or_insert_with(Vec::new)
.push(f)
}
})
.expect("Can't spawn a spawn_after thread");
});
loop {
match SCHEDULE.as_option() {
None => {
thread::yield_now();
continue;
},
Some(tx) => {
tx.send((utc, Box::pin(future))).expect("Can't reach spawn_after");
break;
},
}
}
}

/// A future that completes at a given time.
/// A future that completes at a given time.
#[must_use]
pub struct Timer {
till_utc: f64,
delay: Delay,
}

impl Timer {
pub fn till(till_utc: f64) -> Timer { Timer { till_utc } }
pub fn sleep(seconds: f64) -> Timer {
Timer {
till_utc: now_float() + seconds,
delay: Delay::new(Duration::from_secs_f64(seconds)),
}
}
pub fn sleep_ms(ms: u32) -> Timer {
let seconds = gstuff::duration_to_float(Duration::from_millis(ms as u64));

pub fn sleep_ms(ms: u64) -> Timer {
Timer {
till_utc: now_float() + seconds,
delay: Delay::new(Duration::from_millis(ms)),
}
}
pub fn till_utc(&self) -> f64 { self.till_utc }
}

impl Future03 for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> {
let delta = self.till_utc - now_float();
if delta <= 0. {
return Poll03::Ready(());
}
// NB: We should get a new `Waker` on every `poll` in case the future migrates between executors.
// cf. https://rust-lang.github.io/async-book/02_execution/03_wakeups.html
let waker = cx.waker().clone();
spawn_after(self.till_utc, async move { waker.wake() });
Poll03::Pending
}

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> { Pin::new(&mut self.delay).poll(cx) }
}

#[test]
fn test_timer() {
let started = now_float();
let ti = Timer::sleep(0.2);
let delta = now_float() - started;
assert!(delta < 0.04, "{}", delta);
crate::block_on(ti);
let delta = now_float() - started;
println!("time delta is {}", delta);
assert!(delta > 0.2);
assert!(delta < 0.4)
#[cfg(test)]
mod tests {
use super::*;
use crate::now_float;

#[test]
fn test_timer() {
let started = now_float();
let ti = Timer::sleep(0.2);
let delta = now_float() - started;
assert!(delta < 0.04, "{}", delta);
crate::block_on(ti);
let delta = now_float() - started;
println!("time delta is {}", delta);
assert!(delta > 0.2);
assert!(delta < 0.4)
}
}
2 changes: 1 addition & 1 deletion mm2src/common/executor/wasm_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Timer {

pub fn sleep(secs: f64) -> Timer {
let dur = Duration::from_secs_f64(secs);
let delay_ms = gstuff::duration_to_ms(dur) as u32;
let delay_ms = gstuff::duration_to_ms(dur);
Timer::sleep_ms(delay_ms)
}

Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_test_helpers/src/for_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ where
F: Fn(&str) -> bool,
{
let start = now_float();
let ms = 50.min((timeout_sec * 1000.) as u32 / 20 + 10);
let ms = 50.min((timeout_sec * 1000.) as u64 / 20 + 10);
let mut buf = String::with_capacity(128);
let mut found = false;
loop {
Expand Down