Skip to content

Commit

Permalink
feat: runnable tasks for background polling of block committer data
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc committed Sep 5, 2024
1 parent 5a6eed0 commit be66fb4
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2135](https://github.com/FuelLabs/fuel-core/pull/2135): Added metrics logging for number of blocks served over the p2p req/res protocol.
- [2155](https://github.com/FuelLabs/fuel-core/pull/2155): Added trait declaration for block committer data
- [2142](https://github.com/FuelLabs/fuel-core/pull/2142): Added benchmarks for varied forms of db lookups to assist in optimizations.
- [2163](https://github.com/FuelLabs/fuel-core/pull/2163): Added runnable task for fetching block committer data.

## [Version 0.35.0]

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/services/gas_price_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ fuel-core-types = { workspace = true, features = ["std"] }
fuel-gas-price-algorithm = { workspace = true }
futures = { workspace = true }
num_enum = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,11 @@ pub trait L2BlockSource: Send + Sync {
async fn get_l2_block(&mut self, height: BlockHeight) -> Result<BlockInfo>;
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct DaCommitDetails {
pub l2_block_range: core::ops::Range<u32>,
pub blob_size_bytes: u32,
pub blob_cost_wei: u32,
pub partial_block_heights: Option<[u32; 2]>,
}

pub trait DaCommitSource: Send + Sync {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,176 @@
use crate::fuel_gas_price_updater::{
DaCommitDetails,
DaCommitSource,
Result as GasPriceUpdaterResult,
use crate::{
fuel_gas_price_updater,
fuel_gas_price_updater::{
fuel_da_source_adapter::service::{
DaMetadataGetter,
DaMetadataResponse,
DaSourceService,
},
DaCommitDetails,
DaCommitSource,
Error::CouldNotFetchDARecord,
},
};
use anyhow::anyhow;
use fuel_core_services::{
RunnableService,
Service,
ServiceRunner,
StateWatcher,
};
use std::time::Duration;

mod block_committer_ingestor;
mod service;

// Exists to help merge the ServiceRunner and DaCommitSource traits into one type impl
pub struct FuelDaSourceService<T: DaMetadataGetter + Default + Send + Sync + 'static>(
ServiceRunner<DaSourceService<T>>,
);

#[derive(Default, Clone)]
pub struct FuelDaSource;
// we decouple the DaCommitDetails that the algorithm uses with
// the responses we get from the ingestors.
impl From<DaMetadataResponse> for DaCommitDetails {
fn from(value: DaMetadataResponse) -> Self {
DaCommitDetails {
l2_block_range: value.l2_block_range,
blob_size_bytes: value.blob_size_bytes,
blob_cost_wei: value.blob_cost,
}
}
}

impl DaCommitSource for FuelDaSource {
impl<T> DaCommitSource for FuelDaSourceService<T>
where
T: Send + Sync + Default + DaMetadataGetter,
{
fn get_da_commit_details(
&mut self,
) -> GasPriceUpdaterResult<Option<DaCommitDetails>> {
todo!() // TODO(#2139): pending research on how to get the data from the block committer
) -> fuel_gas_price_updater::Result<Option<DaCommitDetails>> {
let mut metadata_guard = self.0.shared.cached_data.try_lock().map_err(|err| {
CouldNotFetchDARecord(anyhow!(
"Failed to lock shared metadata state: {:?}",
err
))
})?;

let commit_details = metadata_guard.clone().map(Into::into);

// now mark it as consumed because we don't want to serve the same data
// multiple times
*metadata_guard = None;

Ok(commit_details)
}
}

/// This is a blanket trait created for FuelDaSourceService fn impls
/// Using async_trait makes it cleaner vs a plain impl
#[async_trait::async_trait]
pub trait FuelDaMetadataServiceCompat<T>
where
Self: Sized,
T: Default + DaMetadataGetter,
{
async fn try_new(
ingestor: T,
watcher: &mut StateWatcher,
polling_interval: Duration,
) -> anyhow::Result<Self>;
fn start(&self) -> anyhow::Result<()>;
fn stop(&self) -> bool;
}

#[async_trait::async_trait]
impl<T> FuelDaMetadataServiceCompat<T> for FuelDaSourceService<T>
where
T: DaMetadataGetter + Default + Send + Sync,
{
async fn try_new(
ingestor: T,
state_watcher: &mut StateWatcher,
polling_interval: Duration,
) -> anyhow::Result<Self> {
let service = DaSourceService::new(ingestor, polling_interval);

let task = service.into_task(state_watcher, ()).await?;
let service_runner = ServiceRunner::new(task);

Ok(Self(service_runner))
}

fn start(&self) -> anyhow::Result<()> {
self.0.start()
}

fn stop(&self) -> bool {
self.0.stop()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[derive(Default)]
struct FakeMetadataIngestor;

#[async_trait::async_trait]
impl DaMetadataGetter for FakeMetadataIngestor {
async fn get_da_metadata(&mut self) -> anyhow::Result<DaMetadataResponse> {
Ok(DaMetadataResponse::default())
}
}

#[derive(Default)]
struct FakeErroringMetadataIngestor;

#[async_trait::async_trait]
impl DaMetadataGetter for FakeErroringMetadataIngestor {
async fn get_da_metadata(&mut self) -> anyhow::Result<DaMetadataResponse> {
Err(anyhow!("boo!"))
}
}

#[tokio::test]
async fn test_service_sets_cache_when_request_succeeds() {
// given
let mut service = FuelDaSourceService::try_new(
FakeMetadataIngestor,
&mut StateWatcher::started(),
Duration::from_millis(1),
)
.await
.unwrap();

// when
service.start().unwrap();
tokio::time::sleep(Duration::from_millis(5)).await;
service.stop();

// then
let data_availability_metadata = service.get_da_commit_details().unwrap();
assert!(data_availability_metadata.is_some());
}

#[tokio::test]
async fn test_service_does_not_set_cache_when_request_fails() {
// given
let mut service = FuelDaSourceService::try_new(
FakeErroringMetadataIngestor,
&mut StateWatcher::started(),
Duration::from_millis(1),
)
.await
.unwrap();

// when
service.start().unwrap();
tokio::time::sleep(Duration::from_millis(5)).await;
service.stop();

// then
let data_availability_metadata = service.get_da_commit_details().unwrap();
assert!(data_availability_metadata.is_none());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::fuel_gas_price_updater::fuel_da_source_adapter::service::{
DaMetadataGetter,
DaMetadataResponse,
};
use anyhow::anyhow;
use reqwest::Url;

/// This struct is used to denote the block committer ingestor,
/// which receives data from the block committer (only http api for now)
pub struct BlockCommitterIngestor {
client: reqwest::Client,
url: Url,
}

#[async_trait::async_trait]
impl DaMetadataGetter for BlockCommitterIngestor {
async fn get_da_metadata(&mut self) -> anyhow::Result<DaMetadataResponse> {
let response = self.client.get(self.url.clone()).send().await?;
if !response.status().is_success() {
return Err(anyhow!("failed with response: {}", response.status()));
}
response
.json::<DaMetadataResponse>()
.await
.map_err(|err| anyhow!(err))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use fuel_core_services::{
RunnableService,
RunnableTask,
StateWatcher,
};
use serde::{
Deserialize,
Serialize,
};
use std::{
sync::Arc,
time::Duration,
};
use tokio::sync::Mutex;

/// This struct is used to denote the data returned
/// by da metadata providers, this can be the block committer, or some
/// other provider
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)]
pub struct DaMetadataResponse {
pub l2_block_range: core::ops::Range<u32>,
pub blob_size_bytes: u32,
pub blob_cost: u32,
}

/// This struct denotes the shared state that can be accessed
/// However, there already exists an api to get the shared_data,
/// provided by the DaCommitSource trait
#[derive(Clone, Default, Debug)]
pub struct DaSharedState {
pub(crate) cached_data: Arc<Mutex<Option<DaMetadataResponse>>>,
}

/// This struct houses the shared_state, polling interval
/// and a metadata_ingestor, which does the actual fetching of the data
/// we expect the ingestor to perform all the serde required
#[derive(Clone, Default)]
pub struct DaSourceService<T>
where
T: Default + DaMetadataGetter,
{
shared_state: DaSharedState,
poll_interval: Duration,
metadata_ingestor: T,
}

impl<T> DaSourceService<T>
where
T: Default + DaMetadataGetter,
{
pub fn new(metadata_ingestor: T, poll_interval: Duration) -> Self {
Self {
shared_state: DaSharedState {
cached_data: Arc::new(Mutex::new(None)),
},
poll_interval,
metadata_ingestor,
}
}
}

/// This trait is implemented by metadata_ingestors to obtain the
/// da metadata in a way they see fit
#[async_trait::async_trait]
pub trait DaMetadataGetter {
async fn get_da_metadata(&mut self) -> anyhow::Result<DaMetadataResponse>;
}

#[async_trait::async_trait]
impl<T> RunnableService for DaSourceService<T>
where
T: DaMetadataGetter + Send + Sync + Default,
{
const NAME: &'static str = "DataAvailabilitySource";

type SharedData = DaSharedState;

type Task = Self;

type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
self.shared_state.clone()
}

async fn into_task(
mut self,
_: &StateWatcher,
_: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
Ok(self)
}
}

#[async_trait::async_trait]
impl<T> RunnableTask for DaSourceService<T>
where
T: DaMetadataGetter + Send + Sync + Default,
{
/// This function polls the metadata ingestor according to a polling interval
/// described by the DaSourceService
async fn run(&mut self, state_watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let continue_running;
let interval = tokio::time::interval(self.poll_interval);

tokio::pin!(interval);
tokio::select! {
biased;
_ = state_watcher.while_started() => {
continue_running = false;
}
_ = interval.tick() => {
let metadata_response = self.metadata_ingestor.get_da_metadata().await?;
let mut cached_metadata_response = self.shared_state.cached_data.lock().await;
*cached_metadata_response = Some(metadata_response);
continue_running = true;
}
}
Ok(continue_running)
}

/// There are no shutdown hooks required by the metadata ingestors *yet*
/// and they should be added here if so, in the future.
async fn shutdown(self) -> anyhow::Result<()> {
Ok(())
}
}

0 comments on commit be66fb4

Please sign in to comment.