Skip to content

Commit

Permalink
refactor Clock::stream_slots to drop async-stream crate and requi…
Browse files Browse the repository at this point in the history
…rement to `pin!`
  • Loading branch information
ralexstokes committed Apr 21, 2024
1 parent 0f7b619 commit 3f46549
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 26 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ resolver = "2"
members = ["beacon-api-client", "ethereum-consensus", "spec-gen", "spec-tests"]

[workspace.dependencies]
tokio = { version = "1.18.2", features = ["full"] }
tokio-stream = "0.1.8"
async-stream = "0.3.3"
tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = "0.1.15"
tracing = "0.1"
reqwest = { version = "0.11.10", default-features = false, features = ["json"] }
url = "2.2.2"
Expand Down
3 changes: 1 addition & 2 deletions ethereum-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
[features]
default = ["serde", "async"]
serde = ["hex", "serde_json", "serde_yaml"]
async = ["tokio", "tokio-stream", "async-stream"]
async = ["tokio", "tokio-stream"]
optimized = ["shuffling"]
shuffling = [] # supports optimized shuffling routines
secret-key-debug = [
Expand Down Expand Up @@ -50,7 +50,6 @@ serde_yaml = { workspace = true, optional = true }
hex = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
tokio-stream = { workspace = true, optional = true }
async-stream = { workspace = true, optional = true }
bs58 = { workspace = true }
clap = { workspace = true, optional = true }
eyre = { workspace = true, optional = true }
Expand Down
103 changes: 82 additions & 21 deletions ethereum-consensus/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ pub fn convert_timestamp_nanos_to_slot(
Some(u128_to_u64(delta / seconds_per_slot))
}

/// Convert the `slot` to the number of seconds since the `UNIX_EPOCH`.
pub fn convert_slot_to_timestamp(slot: Slot, seconds_per_slot: u128, genesis_time: u128) -> u64 {
let slot_in_seconds = slot * Duration::from_nanos(u128_to_u64(seconds_per_slot)).as_secs();
slot_in_seconds + Duration::from_nanos(u128_to_u64(genesis_time)).as_secs()
}

pub fn get_current_unix_time_in_nanos() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).expect("after `UNIX_EPOCH`").as_nanos()
}
Expand Down Expand Up @@ -154,6 +160,11 @@ impl<T: TimeProvider + Send + Sync> Clock<T> {
convert_timestamp_nanos_to_slot(current_time, self.genesis_time, self.seconds_per_slot)
}

#[inline]
pub fn timestamp_at_slot(&self, slot: Slot) -> u64 {
convert_slot_to_timestamp(slot, self.seconds_per_slot, self.genesis_time)
}

// Return the current epoch, or `None` if before genesis.
pub fn current_epoch(&self) -> Option<Epoch> {
let current_slot = self.current_slot()?;
Expand Down Expand Up @@ -195,18 +206,51 @@ impl<T: TimeProvider + Send + Sync> Clock<T> {

pub type SystemClock = Clock<SystemTimeProvider>;

#[cfg(feature = "async")]
use std::{
future::Future,
pin::Pin,
task::{self, Poll},
};
#[cfg(feature = "async")]
use tokio::time::{sleep, Sleep};
#[cfg(feature = "async")]
use tokio_stream::Stream;

#[cfg(feature = "async")]
/// Implements `futures_core::Stream` yielding the slots tracked by the `clock`.
/// NOTE: the first poll will return the current slot, even if it is not aligned to the slot start.
/// All future polls will be aligned to the start of the slot.
pub struct SlotStream<T: TimeProvider + Send + Sync> {
delay: Pin<Box<Sleep>>,
clock: Clock<T>,
first_slot: Option<Slot>,
}

#[cfg(feature = "async")]
impl<T: TimeProvider + Send + Sync> Clock<T> {
pub fn stream_slots(&self) -> impl Stream<Item = Slot> + '_ {
async_stream::stream! {
loop {
let slot = self.current_slot().expect("after genesis");
yield slot;
let duration_until_next_slot = self.duration_until_slot(slot + 1);
tokio::time::sleep(duration_until_next_slot).await;
pub fn into_stream(self) -> SlotStream<T> {
let delay = Box::pin(sleep(self.duration_until_next_slot()));
let current_slot = self.current_slot();
SlotStream { delay, clock: self, first_slot: current_slot }
}
}

#[cfg(feature = "async")]
impl<T: TimeProvider + Send + Sync> Stream for SlotStream<T> {
type Item = Slot;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let current_slot = self.first_slot.take();
if let Some(slot) = current_slot {
Poll::Ready(Some(slot))
} else {
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
self.delay = Box::pin(sleep(self.clock.duration_until_next_slot()));
Poll::Ready(self.clock.current_slot())
}
Poll::Pending => Poll::Pending,
}
}
}
Expand All @@ -217,6 +261,10 @@ mod tests {
use super::*;
use std::sync::Mutex;

fn duration_since_unix_epoch() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
}

struct Ticker {
tick: Mutex<u128>,
seconds_per_slot: u128,
Expand All @@ -228,9 +276,13 @@ mod tests {
*tick += Duration::from_secs(1).as_nanos();
}

fn tick_slot(&self) {
// tick until we are one before the smallest resolution until the next slot
fn tick_until_next_slot(&self) {
let mut tick = self.tick.lock().unwrap();
*tick += self.seconds_per_slot;
let current_slot = *tick / self.seconds_per_slot;
let next_slot = current_slot + 1;
let next_tick = next_slot * self.seconds_per_slot;
*tick = next_tick - 1;
}
}

Expand All @@ -255,7 +307,7 @@ mod tests {
time_provider.tick();
assert_eq!(clock.duration_until_next_slot().as_secs(), 11);
assert_eq!(clock.current_slot().unwrap(), 0);
for _ in 0..12 {
for _ in 0..seconds_per_slot {
time_provider.tick();
}
assert_eq!(clock.duration_until_next_slot().as_secs(), 11);
Expand Down Expand Up @@ -288,23 +340,29 @@ mod tests {
async fn test_slot_stream() {
use tokio_stream::StreamExt;

// note: make this very large so it is clear if the `TimeProvider` mocking is broken
let seconds_per_slot: u64 = 1200000000;
// NOTE: make this very large so it is clear if the `TimeProvider` mocking is broken
let seconds_per_slot = u64::MAX;
let time_provider = new_ticker(seconds_per_slot);
let clock = Clock::new(0, seconds_per_slot, 12, time_provider.clone());
let slot_stream = clock.stream_slots();

tokio::pin!(slot_stream);
let clock_handle = clock.clone();
// fast-forward the world state until there is the minimum delay, as `into_stream` will
// start from this state.
time_provider.tick_until_next_slot();
let mut slot_stream = clock_handle.into_stream();

let current_slot = clock.current_slot().unwrap();
let target_slot = current_slot + 3;
let mut slots = vec![];
while let Some(slot) = slot_stream.next().await {
if slot >= target_slot {
// forward time to align with the slot just yielded
time_provider.tick();
if slot == target_slot {
break
}
slots.push(slot);
time_provider.tick_slot();
// jump world state ahead to just before next slot
time_provider.tick_until_next_slot();
assert_eq!(clock.duration_until_next_slot(), Duration::from_nanos(1));
}
assert_eq!(slots, (current_slot..target_slot).collect::<Vec<_>>());
}
Expand All @@ -316,14 +374,17 @@ mod tests {
use tokio_stream::StreamExt;

let clock = for_mainnet();
let slot_stream = clock.stream_slots();
let clock_handle = clock.clone();
let mut slot_stream = clock_handle.into_stream();

tokio::pin!(slot_stream);

let current_slot = clock.current_slot().unwrap();
let current_slot = clock.current_slot().expect("past genesis");
let target_slot = current_slot + 3;
let mut slots = vec![];
while let Some(slot) = slot_stream.next().await {
if slot != current_slot {
// ignore first slot, as it is not necessarily aligned to wall clock time
assert_eq!(duration_since_unix_epoch(), clock.timestamp_at_slot(slot));
}
if slot >= target_slot {
break
}
Expand Down

0 comments on commit 3f46549

Please sign in to comment.