Skip to content

Commit

Permalink
test: smoke
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc committed Sep 11, 2024
1 parent 0240ba7 commit 96af5d0
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 80 deletions.
31 changes: 16 additions & 15 deletions crates/fuel-core/src/service/sub_services/algorithm_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use fuel_core_gas_price_service::{
fuel_gas_price_updater::{
da_source_adapter::{
dummy_costs::DummyDaBlockCosts,
service::DaBlockCostsService,
DaBlockCostsProvider,
DaBlockCostsSharedState,
},
fuel_core_storage_adapter::{
storage::GasPriceMetadata,
Expand All @@ -39,6 +39,7 @@ use fuel_core_gas_price_service::{
use fuel_core_services::{
stream::BoxStream,
RunnableService,
Service,
StateWatcher,
};
use fuel_core_storage::{
Expand All @@ -62,7 +63,7 @@ use fuel_core_types::{
type Updater = FuelGasPriceUpdater<
FuelL2BlockSource<ConsensusParametersProvider>,
MetadataStorageAdapter,
DaBlockCostsProvider,
DaBlockCostsSharedState,
>;

pub struct InitializeTask {
Expand All @@ -73,7 +74,7 @@ pub struct InitializeTask {
pub on_chain_db: Database<OnChain, RegularStage<OnChain>>,
pub block_stream: BoxStream<SharedImportResult>,
pub shared_algo: SharedGasPriceAlgo<Algorithm>,
pub da_block_costs_service: DaBlockCostsService<DummyDaBlockCosts>,
pub da_block_costs_provider: DaBlockCostsProvider<DummyDaBlockCosts>,
}

type MetadataStorageAdapter =
Expand All @@ -100,8 +101,8 @@ impl InitializeTask {
// there's no use of this source yet, so we can safely return an error
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("Not used")));
let da_block_costs_service =
DaBlockCostsService::new(da_block_costs_source, None);
let da_block_costs_provider =
DaBlockCostsProvider::new(da_block_costs_source, None);

let task = Self {
config,
Expand All @@ -111,7 +112,7 @@ impl InitializeTask {
on_chain_db,
block_stream,
shared_algo,
da_block_costs_service,
da_block_costs_provider,
};
Ok(task)
}
Expand Down Expand Up @@ -173,15 +174,15 @@ impl RunnableService for InitializeTask {
self.gas_price_db,
self.on_chain_db,
self.block_stream,
self.da_block_costs_service.shared_data(),
self.da_block_costs_provider.shared_state,
)?;
let inner_service = GasPriceService::new(
starting_height,
updater,
self.shared_algo,
self.da_block_costs_service,
)
.await;

self.da_block_costs_provider
.service
.start_and_await()
.await?;
let inner_service =
GasPriceService::new(starting_height, updater, self.shared_algo).await;
Ok(inner_service)
}
}
Expand All @@ -193,7 +194,7 @@ pub fn get_synced_gas_price_updater(
mut gas_price_db: Database<GasPriceDatabase, RegularStage<GasPriceDatabase>>,
on_chain_db: Database<OnChain, RegularStage<OnChain>>,
block_stream: BoxStream<SharedImportResult>,
da_block_costs: DaBlockCostsProvider,
da_block_costs: DaBlockCostsSharedState,
) -> anyhow::Result<Updater> {
let mut first_run = false;
let latest_block_height: u32 = on_chain_db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub struct DaBlockCosts {
pub blob_cost_wei: u128,
}

pub trait GetDaBlockCosts: Send + Sync + Clone {
pub trait GetDaBlockCosts: Send + Sync {
fn get(&mut self) -> Result<Option<DaBlockCosts>>;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
use crate::fuel_gas_price_updater::{
da_source_adapter::service::{
new_service,
DaBlockCostsService,
DaBlockCostsSource,
},
DaBlockCosts,
GetDaBlockCosts,
Result as GasPriceUpdaterResult,
};
use std::sync::Arc;
use fuel_core_services::ServiceRunner;
use std::{
sync::Arc,
time::Duration,
};
use tokio::sync::{
mpsc,
Mutex,
Expand All @@ -16,21 +25,42 @@ pub mod service;
pub const POLLING_INTERVAL_MS: u64 = 10_000;

#[derive(Clone)]
pub struct DaBlockCostsProvider {
receiver: Arc<Mutex<mpsc::Receiver<DaBlockCosts>>>,
pub struct DaBlockCostsSharedState {
inner: Arc<Mutex<mpsc::Receiver<DaBlockCosts>>>,
}

impl DaBlockCostsProvider {
fn from_receiver(receiver: mpsc::Receiver<DaBlockCosts>) -> Self {
impl DaBlockCostsSharedState {
fn new(receiver: mpsc::Receiver<DaBlockCosts>) -> Self {
Self {
inner: Arc::new(Mutex::new(receiver)),
}
}
}

pub struct DaBlockCostsProvider<T: DaBlockCostsSource + 'static> {
pub service: ServiceRunner<DaBlockCostsService<T>>,
pub shared_state: DaBlockCostsSharedState,
}

const CHANNEL_BUFFER_SIZE: usize = 10;

impl<T> DaBlockCostsProvider<T>
where
T: DaBlockCostsSource,
{
pub fn new(source: T, polling_interval: Option<Duration>) -> Self {
let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let service = new_service(source, sender, polling_interval);
Self {
receiver: Arc::new(Mutex::new(receiver)),
shared_state: DaBlockCostsSharedState::new(receiver),
service,
}
}
}

impl GetDaBlockCosts for DaBlockCostsProvider {
impl GetDaBlockCosts for DaBlockCostsSharedState {
fn get(&mut self) -> GasPriceUpdaterResult<Option<DaBlockCosts>> {
if let Ok(mut guard) = self.receiver.try_lock() {
if let Ok(mut guard) = self.inner.try_lock() {
if let Ok(da_block_costs) = guard.try_recv() {
return Ok(Some(da_block_costs));
}
Expand All @@ -43,10 +73,7 @@ impl GetDaBlockCosts for DaBlockCostsProvider {
#[cfg(test)]
mod tests {
use super::*;
use crate::fuel_gas_price_updater::da_source_adapter::{
dummy_costs::DummyDaBlockCosts,
service::new_service,
};
use crate::fuel_gas_price_updater::da_source_adapter::dummy_costs::DummyDaBlockCosts;
use fuel_core_services::Service;
use std::time::Duration;
use tokio::time::sleep;
Expand All @@ -60,13 +87,16 @@ mod tests {
blob_cost_wei: 2,
};
let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone()));
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let mut shared_state = service.shared.clone();
let provider = DaBlockCostsProvider::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
);
let mut shared_state = provider.shared_state.clone();

// when
service.start_and_await().await.unwrap();
provider.service.start_and_await().await.unwrap();
sleep(Duration::from_millis(10)).await;
service.stop_and_await().await.unwrap();
provider.service.stop_and_await().await.unwrap();

// then
let da_block_costs_opt = shared_state.get().unwrap();
Expand All @@ -83,13 +113,16 @@ mod tests {
blob_cost_wei: 1,
};
let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone()));
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let mut shared_state = service.shared.clone();
let service = DaBlockCostsProvider::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
);
let mut shared_state = service.shared_state.clone();

// when
service.start_and_await().await.unwrap();
service.service.start_and_await().await.unwrap();
sleep(Duration::from_millis(10)).await;
service.stop_and_await().await.unwrap();
service.service.stop_and_await().await.unwrap();
let da_block_costs_opt = shared_state.get().unwrap();
assert!(da_block_costs_opt.is_some());
assert_eq!(da_block_costs_opt.unwrap(), expected_da_cost);
Expand All @@ -103,13 +136,16 @@ mod tests {
async fn run__when_da_block_cost_source_errors_shared_value_is_not_updated() {
// given
let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")));
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let mut shared_state = service.shared.clone();
let service = DaBlockCostsProvider::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
);
let mut shared_state = service.shared_state.clone();

// when
service.start_and_await().await.unwrap();
service.service.start_and_await().await.unwrap();
sleep(Duration::from_millis(10)).await;
service.stop_and_await().await.unwrap();
service.service.stop_and_await().await.unwrap();

// then
let da_block_costs_opt = shared_state.get().unwrap();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::fuel_gas_price_updater::{
da_source_adapter::{
DaBlockCostsProvider,
POLLING_INTERVAL_MS,
},
da_source_adapter::POLLING_INTERVAL_MS,
DaBlockCosts,
};
use fuel_core_services::{
Expand All @@ -25,15 +22,12 @@ use tokio::{

pub use anyhow::Result;

const CHANNEL_BUFFER_SIZE: usize = 10;

/// This struct houses the shared_state, polling interval
/// and a source, which does the actual fetching of the data
pub struct DaBlockCostsService<Source>
where
Source: DaBlockCostsSource,
{
block_cost_provider: DaBlockCostsProvider,
poll_interval: Interval,
source: Source,
sender: Sender<DaBlockCosts>,
Expand All @@ -44,13 +38,14 @@ impl<Source> DaBlockCostsService<Source>
where
Source: DaBlockCostsSource,
{
pub fn new(source: Source, poll_interval: Option<Duration>) -> Self {
pub fn new(
source: Source,
sender: Sender<DaBlockCosts>,
poll_interval: Option<Duration>,
) -> Self {
#[allow(clippy::arithmetic_side_effects)]
let (sender, receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFFER_SIZE);
let block_cost_provider = DaBlockCostsProvider::from_receiver(receiver);
Self {
sender,
block_cost_provider,
poll_interval: interval(
poll_interval.unwrap_or(Duration::from_millis(POLLING_INTERVAL_MS)),
),
Expand All @@ -74,15 +69,13 @@ where
{
const NAME: &'static str = "DaBlockCostsService";

type SharedData = DaBlockCostsProvider;
type SharedData = ();

type Task = Self;

type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
self.block_cost_provider.clone()
}
fn shared_data(&self) -> Self::SharedData {}

async fn into_task(
mut self,
Expand All @@ -100,7 +93,7 @@ where
Source: DaBlockCostsSource,
{
/// This function polls the source according to a polling interval
/// described by the DaSourceService
/// described by the DaBlockCostsService
async fn run(&mut self, state_watcher: &mut StateWatcher) -> Result<bool> {
let continue_running;

Expand Down Expand Up @@ -130,7 +123,8 @@ where

pub fn new_service<S: DaBlockCostsSource>(
da_source: S,
sender: Sender<DaBlockCosts>,
poll_interval: Option<Duration>,
) -> ServiceRunner<DaBlockCostsService<S>> {
ServiceRunner::new(DaBlockCostsService::new(da_source, poll_interval))
ServiceRunner::new(DaBlockCostsService::new(da_source, sender, poll_interval))
}
Loading

0 comments on commit 96af5d0

Please sign in to comment.