diff --git a/lightning-background-processor/Cargo.toml b/lightning-background-processor/Cargo.toml index 6a4925a0be0..af8bb25e55f 100644 --- a/lightning-background-processor/Cargo.toml +++ b/lightning-background-processor/Cargo.toml @@ -14,7 +14,7 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [features] -futures = [ "futures-util" ] +futures = [ ] std = ["lightning/std", "lightning-rapid-gossip-sync/std"] default = ["std"] @@ -23,7 +23,6 @@ default = ["std"] bitcoin = { version = "0.29.0", default-features = false } lightning = { version = "0.0.114", path = "../lightning", default-features = false } lightning-rapid-gossip-sync = { version = "0.0.114", path = "../lightning-rapid-gossip-sync", default-features = false } -futures-util = { version = "0.3", default-features = false, features = ["async-await-macro"], optional = true } [dev-dependencies] lightning = { version = "0.0.114", path = "../lightning", features = ["_test_utils"] } diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 665bdf54384..884a7c22664 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -7,7 +7,7 @@ #![deny(private_intra_doc_links)] #![deny(missing_docs)] -#![deny(unsafe_code)] +#![cfg_attr(not(feature = "futures"), deny(unsafe_code))] #![cfg_attr(docsrs, feature(doc_auto_cfg))] @@ -52,8 +52,6 @@ use std::thread::{self, JoinHandle}; #[cfg(feature = "std")] use std::time::Instant; -#[cfg(feature = "futures")] -use futures_util::{select_biased, future::FutureExt, task}; #[cfg(not(feature = "std"))] use alloc::vec::Vec; @@ -384,6 +382,50 @@ macro_rules! define_run_body { } } } +#[cfg(feature = "futures")] +pub(crate) mod futures_util { + use core::future::Future; + use core::task::{Poll, Waker, RawWaker, RawWakerVTable}; + use core::pin::Pin; + use core::marker::Unpin; + pub(crate) struct Selector + Unpin, B: Future + Unpin> { + pub a: A, + pub b: B, + } + pub(crate) enum SelectorOutput { + A, B(bool), + } + + impl + Unpin, B: Future + Unpin> Future for Selector { + type Output = SelectorOutput; + fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { + match Pin::new(&mut self.a).poll(ctx) { + Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.b).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); }, + Poll::Pending => {}, + } + Poll::Pending + } + } + + // If we want to poll a future without an async context to figure out if it has completed or + // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values + // but sadly there's a good bit of boilerplate here. + fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) } + fn dummy_waker_action(_: *const ()) { } + + const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action); + pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } +} +#[cfg(feature = "futures")] +use futures_util::{Selector, SelectorOutput, dummy_waker}; +#[cfg(feature = "futures")] +use core::task; + /// Processes background events in a future. /// /// `sleeper` should return a future which completes in the given amount of time and returns a @@ -470,16 +512,20 @@ where chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { - select_biased! { - _ = channel_manager.get_persistable_update_future().fuse() => true, - exit = sleeper(Duration::from_millis(100)).fuse() => { + let fut = Selector { + a: channel_manager.get_persistable_update_future(), + b: sleeper(Duration::from_millis(100)), + }; + match fut.await { + SelectorOutput::A => true, + SelectorOutput::B(exit) => { should_break = exit; false } } }, |t| sleeper(Duration::from_secs(t)), |fut: &mut SleepFuture, _| { - let mut waker = task::noop_waker(); + let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); core::pin::Pin::new(fut).poll(&mut ctx).is_ready() }) diff --git a/lightning-block-sync/Cargo.toml b/lightning-block-sync/Cargo.toml index 9d42968d866..59f8c235605 100644 --- a/lightning-block-sync/Cargo.toml +++ b/lightning-block-sync/Cargo.toml @@ -20,7 +20,6 @@ rpc-client = [ "serde_json", "chunked_transfer" ] [dependencies] bitcoin = "0.29.0" lightning = { version = "0.0.114", path = "../lightning" } -futures-util = { version = "0.3" } tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true } serde_json = { version = "1.0", optional = true } chunked_transfer = { version = "1.4", optional = true } diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs index c73b23b600c..4300893013c 100644 --- a/lightning-block-sync/src/rest.rs +++ b/lightning-block-sync/src/rest.rs @@ -7,15 +7,14 @@ use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse}; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; -use futures_util::lock::Mutex; - use std::convert::TryFrom; use std::convert::TryInto; +use std::sync::Mutex; /// A simple REST client for requesting resources using HTTP `GET`. pub struct RestClient { endpoint: HttpEndpoint, - client: Mutex, + client: Mutex>, } impl RestClient { @@ -23,8 +22,7 @@ impl RestClient { /// /// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest). pub fn new(endpoint: HttpEndpoint) -> std::io::Result { - let client = Mutex::new(HttpClient::connect(&endpoint)?); - Ok(Self { endpoint, client }) + Ok(Self { endpoint, client: Mutex::new(None) }) } /// Requests a resource encoded in `F` format and interpreted as type `T`. @@ -32,7 +30,11 @@ impl RestClient { where F: TryFrom, Error = std::io::Error> + TryInto { let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path); - self.client.lock().await.get::(&uri, &host).await?.try_into() + let mut client = if let Some(client) = self.client.lock().unwrap().take() { client } + else { HttpClient::connect(&self.endpoint)? }; + let res = client.get::(&uri, &host).await?.try_into(); + *self.client.lock().unwrap() = Some(client); + res } } diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index e1dc43c8f28..4c4706cb1cd 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -7,7 +7,7 @@ use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; -use futures_util::lock::Mutex; +use std::sync::Mutex; use serde_json; @@ -41,7 +41,7 @@ impl Error for RpcError {} pub struct RpcClient { basic_auth: String, endpoint: HttpEndpoint, - client: Mutex, + client: Mutex>, id: AtomicUsize, } @@ -50,11 +50,10 @@ impl RpcClient { /// credentials should be a base64 encoding of a user name and password joined by a colon, as is /// required for HTTP basic access authentication. pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result { - let client = Mutex::new(HttpClient::connect(&endpoint)?); Ok(Self { basic_auth: "Basic ".to_string() + credentials, endpoint, - client, + client: Mutex::new(None), id: AtomicUsize::new(0), }) } @@ -73,7 +72,12 @@ impl RpcClient { "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() }); - let mut response = match self.client.lock().await.post::(&uri, &host, &self.basic_auth, content).await { + let mut client = if let Some(client) = self.client.lock().unwrap().take() { client } + else { HttpClient::connect(&self.endpoint)? }; + let http_response = client.post::(&uri, &host, &self.basic_auth, content).await; + *self.client.lock().unwrap() = Some(client); + + let mut response = match http_response { Ok(JsonResponse(response)) => response, Err(e) if e.kind() == std::io::ErrorKind::Other => { match e.get_ref().unwrap().downcast_ref::() {