Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(gas_price_service_v1): strictly ensure last_recorded_height is set, to avoid initial poll of da source #2556

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2233](https://github.com/FuelLabs/fuel-core/pull/2233): Introduce a new column `modification_history_v2` for storing the modification history in the historical rocksDB. Keys in this column are stored in big endian order. Changed the behaviour of the historical rocksDB to write changes for new block heights to the new column, and to perform lookup of values from the `modification_history_v2` table first, and then from the `modification_history` table, performing a migration upon access if necessary.
- [2383](https://github.com/FuelLabs/fuel-core/pull/2383): The `balance` and `balances` GraphQL query handlers now use index to provide the response in a more performant way. As the index is not created retroactively, the client must be initialized with an empty database and synced from the genesis block to utilize it. Otherwise, the legacy way of retrieving data will be used.
- [2463](https://github.com/FuelLabs/fuel-core/pull/2463): The `coinsToSpend` GraphQL query handler now uses index to provide the response in a more performant way. As the index is not created retroactively, the client must be initialized with an empty database and synced from the genesis block to utilize it. Otherwise, the legacy way of retrieving data will be used.
- [2556](https://github.com/FuelLabs/fuel-core/pull/2556): Ensure that the `last_recorded_height` is set for the DA gas price source.

#### Breaking
- [2469](https://github.com/FuelLabs/fuel-core/pull/2469): Move from `GasPriceServicev0` to `GasPriceServiceV1`. Include new config values.
Expand Down
32 changes: 22 additions & 10 deletions crates/services/gas_price_service/src/v1/da_source_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone());
let recorded_height = BlockHeight::new(0);
let latest_l2_height = Arc::new(AtomicU32::new(10u32));

let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand All @@ -81,10 +84,13 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone());
let latest_l2_height = Arc::new(AtomicU32::new(0));
let recorded_height = BlockHeight::new(0);

let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand Down Expand Up @@ -114,10 +120,13 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));
let recorded_height = BlockHeight::new(0);

let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
recorded_height,
);
let mut shared_state = &mut service.shared.subscribe();

Expand Down Expand Up @@ -145,11 +154,13 @@ mod tests {
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));
let recorded_height = BlockHeight::new(0);

let mut service = DaSourceService::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
recorded_height,
);
let mut watcher = StateWatcher::started();

Expand All @@ -158,31 +169,33 @@ mod tests {

// then
let recorded_height = service.recorded_height();
assert!(recorded_height.is_none())
assert_eq!(*recorded_height, 0);
}

#[tokio::test]
async fn run__recorded_height_updated_by_da_costs() {
// given
let l2_height = 10;
let recorded_height = 9;
let unexpected_costs = DaBlockCosts {
let l2_block_range_start = 2;
let expected_recorded_height = 9;
let costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 2..=recorded_height,
l2_blocks: l2_block_range_start..=expected_recorded_height,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
DummyDaBlockCosts::new(Ok(costs.clone()), notifier.clone());
let latest_l2_height = Arc::new(AtomicU32::new(l2_height));

let (sender, mut receiver) =
tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
let mut service = DaSourceService::new_with_sender(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
(l2_block_range_start - 1).into(), /* we want to start polling from before the l2 block range */
sender,
);
let mut watcher = StateWatcher::started();
Expand All @@ -191,8 +204,7 @@ mod tests {
let next = service.run(&mut watcher).await;

// then
let actual = service.recorded_height().unwrap();
let expected = BlockHeight::from(recorded_height);
assert_eq!(expected, actual);
let actual = service.recorded_height();
assert_eq!(expected_recorded_height, *actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use std::ops::Deref;

#[async_trait::async_trait]
pub trait BlockCommitterApi: Send + Sync {
/// Used on first run to get the latest costs and seqno
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>>;
/// Used to get the costs for a specific seqno
async fn get_costs_by_l2_block_number(
&self,
Expand Down Expand Up @@ -90,17 +88,17 @@ where
{
async fn request_da_block_costs(
&mut self,
last_recorded_height: &Option<BlockHeight>,
last_recorded_height: &BlockHeight,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
let raw_da_block_costs: Vec<_> = match last_recorded_height.and_then(|x| x.succ())
{
Some(ref next_height) => {
self.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?
}
None => self.client.get_latest_costs().await?.into_iter().collect(),
};
let next_height = last_recorded_height.succ().ok_or(anyhow!(
"Failed to increment the last recorded height: {:?}",
last_recorded_height
))?;

let raw_da_block_costs: Vec<_> = self
.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?;

let da_block_costs: Vec<_> =
raw_da_block_costs.iter().map(DaBlockCosts::from).collect();
Expand Down Expand Up @@ -141,19 +139,6 @@ impl BlockCommitterApi for BlockCommitterHttpApi {
Ok(vec![])
}
}

async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
// Latest: http://localhost:8080/v1/costs?variant=latest&limit=5
if let Some(url) = &self.url {
let formatted_url = format!("{url}/v1/costs?variant=latest&limit=1");
let response = self.client.get(formatted_url).send().await?;
let raw_da_block_costs = response.json::<Vec<RawDaBlockCosts>>().await?;
// only take the first element, since we are only looking for the most recent
Ok(raw_da_block_costs.first().cloned())
} else {
Ok(None)
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -269,56 +254,6 @@ mod test_block_committer_http_api {
// then
assert_eq!(actual, expected);
}

#[test]
fn get_latest_costs__when_url_is_none__then_returns_none() {
let rt = tokio::runtime::Runtime::new().unwrap();

// given
let block_committer = BlockCommitterHttpApi::new(None);

// when
let actual =
rt.block_on(async { block_committer.get_latest_costs().await.unwrap() });

// then
assert_eq!(actual, None);
}

#[test]
fn get_latest_costs__when_url_is_some__then_returns_expected_costs() {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut mock = FakeServer::new();
let url = mock.url();

// given
let block_committer = BlockCommitterHttpApi::new(Some(url));
let not_expected = RawDaBlockCosts {
id: 1,
start_height: 1,
end_height: 10,
da_block_height: 1u64.into(),
cost: 1,
size: 1,
};
mock.add_response(not_expected);
let expected = RawDaBlockCosts {
id: 2,
start_height: 11,
end_height: 20,
da_block_height: 2u64.into(),
cost: 2,
size: 2,
};
mock.add_response(expected.clone());

// when
let actual =
rt.block_on(async { block_committer.get_latest_costs().await.unwrap() });

// then
assert_eq!(actual, Some(expected));
}
}
#[cfg(any(test, feature = "test-helpers"))]
pub mod fake_server {
Expand Down Expand Up @@ -450,9 +385,6 @@ mod tests {

#[async_trait::async_trait]
impl BlockCommitterApi for MockBlockCommitterApi {
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
Ok(self.value.clone())
}
async fn get_costs_by_l2_block_number(
&self,
l2_block_number: u32,
Expand Down Expand Up @@ -482,22 +414,6 @@ mod tests {
}
}

#[tokio::test]
async fn request_da_block_cost__when_last_value_is_none__then_get_latest_costs_is_called(
) {
// given
let da_block_costs = test_da_block_costs();
let expected = vec![(&da_block_costs).into()];
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs));
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api);

// when
let actual = block_committer.request_da_block_costs(&None).await.unwrap();

// then
assert_eq!(actual, expected);
}

#[tokio::test]
async fn request_da_block_cost__when_last_value_is_some__then_get_costs_by_l2_block_number_is_called(
) {
Expand All @@ -510,7 +426,7 @@ mod tests {

// when
let actual = block_committer
.request_da_block_costs(&Some(latest_height))
.request_da_block_costs(&latest_height)
.await
.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl DummyDaBlockCosts {
impl DaBlockCostsSource for DummyDaBlockCosts {
async fn request_da_block_costs(
&mut self,
_latest_recorded_height: &Option<BlockHeight>,
_latest_recorded_height: &BlockHeight,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
match &self.value {
Ok(da_block_costs) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,15 @@ pub struct DaSourceService<Source> {
poll_interval: Interval,
source: Source,
shared_state: SharedState,
// This is the latest L2 height that is shared between this service
// and the block importer
// This is done for filtering out da block costs which reference
// a height greater than the latest L2 height
// This is a situation that occurs during syncing of the node
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
// This is the last recorded height of the da block costs
// This is used to fetch the da block costs from the source
recorded_height: BlockHeight,
}

pub(crate) const DA_BLOCK_COSTS_CHANNEL_SIZE: usize = 16 * 1024;
Expand All @@ -62,7 +69,7 @@ where
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
recorded_height: BlockHeight,
) -> Self {
let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
#[allow(clippy::arithmetic_side_effects)]
Expand All @@ -82,7 +89,7 @@ where
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: Option<BlockHeight>,
recorded_height: BlockHeight,
sender: Sender<DaBlockCosts>,
) -> Self {
Self {
Expand Down Expand Up @@ -113,12 +120,8 @@ where
tracing::debug!("Sending block costs: {:?}", da_block_costs);
let end = BlockHeight::from(*da_block_costs.l2_blocks.end());
self.shared_state.0.send(da_block_costs)?;
if let Some(recorded_height) = self.recorded_height {
if end > recorded_height {
self.recorded_height = Some(end)
}
} else {
self.recorded_height = Some(end)
if end > self.recorded_height {
self.recorded_height = end
}
}
Ok(())
Expand All @@ -137,7 +140,7 @@ where
}

#[cfg(test)]
pub fn recorded_height(&self) -> Option<BlockHeight> {
pub fn recorded_height(&self) -> BlockHeight {
self.recorded_height
}
}
Expand All @@ -148,7 +151,7 @@ where
pub trait DaBlockCostsSource: Send + Sync {
async fn request_da_block_costs(
&mut self,
recorded_height: &Option<BlockHeight>,
recorded_height: &BlockHeight,
) -> Result<Vec<DaBlockCosts>>;
}

Expand Down Expand Up @@ -212,11 +215,12 @@ pub fn new_da_service<S: DaBlockCostsSource>(
da_source: S,
poll_interval: Option<Duration>,
latest_l2_height: Arc<AtomicU32>,
recorded_height: BlockHeight,
) -> ServiceRunner<DaSourceService<S>> {
ServiceRunner::new(DaSourceService::new(
da_source,
poll_interval,
latest_l2_height,
None,
recorded_height,
))
}
Loading
Loading