From 3f46549f9b94e790e5afd2e7f44002b63d0245fd Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Sun, 21 Apr 2024 09:51:32 -0600 Subject: [PATCH] refactor `Clock::stream_slots` to drop `async-stream` crate and requirement to `pin!` --- Cargo.toml | 5 +- ethereum-consensus/Cargo.toml | 3 +- ethereum-consensus/src/clock.rs | 103 +++++++++++++++++++++++++------- 3 files changed, 85 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e7651efc8..1a6f3ce74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,9 +4,8 @@ resolver = "2" members = ["beacon-api-client", "ethereum-consensus", "spec-gen", "spec-tests"] [workspace.dependencies] -tokio = { version = "1.18.2", features = ["full"] } -tokio-stream = "0.1.8" -async-stream = "0.3.3" +tokio = { version = "1.37.0", features = ["full"] } +tokio-stream = "0.1.15" tracing = "0.1" reqwest = { version = "0.11.10", default-features = false, features = ["json"] } url = "2.2.2" diff --git a/ethereum-consensus/Cargo.toml b/ethereum-consensus/Cargo.toml index 8dcfd3687..fee91112b 100644 --- a/ethereum-consensus/Cargo.toml +++ b/ethereum-consensus/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0" [features] default = ["serde", "async"] serde = ["hex", "serde_json", "serde_yaml"] -async = ["tokio", "tokio-stream", "async-stream"] +async = ["tokio", "tokio-stream"] optimized = ["shuffling"] shuffling = [] # supports optimized shuffling routines secret-key-debug = [ @@ -50,7 +50,6 @@ serde_yaml = { workspace = true, optional = true } hex = { workspace = true, optional = true } tokio = { workspace = true, optional = true } tokio-stream = { workspace = true, optional = true } -async-stream = { workspace = true, optional = true } bs58 = { workspace = true } clap = { workspace = true, optional = true } eyre = { workspace = true, optional = true } diff --git a/ethereum-consensus/src/clock.rs b/ethereum-consensus/src/clock.rs index 5205ea60b..77618b68d 100644 --- a/ethereum-consensus/src/clock.rs +++ b/ethereum-consensus/src/clock.rs @@ -45,6 +45,12 @@ pub fn convert_timestamp_nanos_to_slot( Some(u128_to_u64(delta / seconds_per_slot)) } +/// Convert the `slot` to the number of seconds since the `UNIX_EPOCH`. +pub fn convert_slot_to_timestamp(slot: Slot, seconds_per_slot: u128, genesis_time: u128) -> u64 { + let slot_in_seconds = slot * Duration::from_nanos(u128_to_u64(seconds_per_slot)).as_secs(); + slot_in_seconds + Duration::from_nanos(u128_to_u64(genesis_time)).as_secs() +} + pub fn get_current_unix_time_in_nanos() -> u128 { SystemTime::now().duration_since(UNIX_EPOCH).expect("after `UNIX_EPOCH`").as_nanos() } @@ -154,6 +160,11 @@ impl Clock { convert_timestamp_nanos_to_slot(current_time, self.genesis_time, self.seconds_per_slot) } + #[inline] + pub fn timestamp_at_slot(&self, slot: Slot) -> u64 { + convert_slot_to_timestamp(slot, self.seconds_per_slot, self.genesis_time) + } + // Return the current epoch, or `None` if before genesis. pub fn current_epoch(&self) -> Option { let current_slot = self.current_slot()?; @@ -195,18 +206,51 @@ impl Clock { pub type SystemClock = Clock; +#[cfg(feature = "async")] +use std::{ + future::Future, + pin::Pin, + task::{self, Poll}, +}; +#[cfg(feature = "async")] +use tokio::time::{sleep, Sleep}; #[cfg(feature = "async")] use tokio_stream::Stream; +#[cfg(feature = "async")] +/// Implements `futures_core::Stream` yielding the slots tracked by the `clock`. +/// NOTE: the first poll will return the current slot, even if it is not aligned to the slot start. +/// All future polls will be aligned to the start of the slot. +pub struct SlotStream { + delay: Pin>, + clock: Clock, + first_slot: Option, +} + #[cfg(feature = "async")] impl Clock { - pub fn stream_slots(&self) -> impl Stream + '_ { - async_stream::stream! { - loop { - let slot = self.current_slot().expect("after genesis"); - yield slot; - let duration_until_next_slot = self.duration_until_slot(slot + 1); - tokio::time::sleep(duration_until_next_slot).await; + pub fn into_stream(self) -> SlotStream { + let delay = Box::pin(sleep(self.duration_until_next_slot())); + let current_slot = self.current_slot(); + SlotStream { delay, clock: self, first_slot: current_slot } + } +} + +#[cfg(feature = "async")] +impl Stream for SlotStream { + type Item = Slot; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let current_slot = self.first_slot.take(); + if let Some(slot) = current_slot { + Poll::Ready(Some(slot)) + } else { + match Pin::new(&mut self.delay).poll(cx) { + Poll::Ready(_) => { + self.delay = Box::pin(sleep(self.clock.duration_until_next_slot())); + Poll::Ready(self.clock.current_slot()) + } + Poll::Pending => Poll::Pending, } } } @@ -217,6 +261,10 @@ mod tests { use super::*; use std::sync::Mutex; + fn duration_since_unix_epoch() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() + } + struct Ticker { tick: Mutex, seconds_per_slot: u128, @@ -228,9 +276,13 @@ mod tests { *tick += Duration::from_secs(1).as_nanos(); } - fn tick_slot(&self) { + // tick until we are one before the smallest resolution until the next slot + fn tick_until_next_slot(&self) { let mut tick = self.tick.lock().unwrap(); - *tick += self.seconds_per_slot; + let current_slot = *tick / self.seconds_per_slot; + let next_slot = current_slot + 1; + let next_tick = next_slot * self.seconds_per_slot; + *tick = next_tick - 1; } } @@ -255,7 +307,7 @@ mod tests { time_provider.tick(); assert_eq!(clock.duration_until_next_slot().as_secs(), 11); assert_eq!(clock.current_slot().unwrap(), 0); - for _ in 0..12 { + for _ in 0..seconds_per_slot { time_provider.tick(); } assert_eq!(clock.duration_until_next_slot().as_secs(), 11); @@ -288,23 +340,29 @@ mod tests { async fn test_slot_stream() { use tokio_stream::StreamExt; - // note: make this very large so it is clear if the `TimeProvider` mocking is broken - let seconds_per_slot: u64 = 1200000000; + // NOTE: make this very large so it is clear if the `TimeProvider` mocking is broken + let seconds_per_slot = u64::MAX; let time_provider = new_ticker(seconds_per_slot); let clock = Clock::new(0, seconds_per_slot, 12, time_provider.clone()); - let slot_stream = clock.stream_slots(); - - tokio::pin!(slot_stream); + let clock_handle = clock.clone(); + // fast-forward the world state until there is the minimum delay, as `into_stream` will + // start from this state. + time_provider.tick_until_next_slot(); + let mut slot_stream = clock_handle.into_stream(); let current_slot = clock.current_slot().unwrap(); let target_slot = current_slot + 3; let mut slots = vec![]; while let Some(slot) = slot_stream.next().await { - if slot >= target_slot { + // forward time to align with the slot just yielded + time_provider.tick(); + if slot == target_slot { break } slots.push(slot); - time_provider.tick_slot(); + // jump world state ahead to just before next slot + time_provider.tick_until_next_slot(); + assert_eq!(clock.duration_until_next_slot(), Duration::from_nanos(1)); } assert_eq!(slots, (current_slot..target_slot).collect::>()); } @@ -316,14 +374,17 @@ mod tests { use tokio_stream::StreamExt; let clock = for_mainnet(); - let slot_stream = clock.stream_slots(); + let clock_handle = clock.clone(); + let mut slot_stream = clock_handle.into_stream(); - tokio::pin!(slot_stream); - - let current_slot = clock.current_slot().unwrap(); + let current_slot = clock.current_slot().expect("past genesis"); let target_slot = current_slot + 3; let mut slots = vec![]; while let Some(slot) = slot_stream.next().await { + if slot != current_slot { + // ignore first slot, as it is not necessarily aligned to wall clock time + assert_eq!(duration_since_unix_epoch(), clock.timestamp_at_slot(slot)); + } if slot >= target_slot { break }