Skip to content

Commit f15ffb9

Browse files
committed
add possibility to filter by processed to get_batch_by_index
1 parent 937b0e0 commit f15ffb9

File tree

4 files changed

+34
-20
lines changed

4 files changed

+34
-20
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -722,12 +722,14 @@ impl<
722722

723723
// Perform a consistency check to ensure the previous commit batch exists in the
724724
// database.
725-
if tx.get_batch_by_index(prev_batch_index).await?.is_none() {
725+
if tx.get_batch_by_index(prev_batch_index, None).await?.is_none() {
726726
return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index));
727727
}
728728

729729
// Check if batch already exists in DB.
730-
if let Some(existing_batch) = tx.get_batch_by_index(batch_clone.index).await? {
730+
if let Some(existing_batch) =
731+
tx.get_batch_by_index(batch_clone.index, Some(true)).await?
732+
{
731733
if existing_batch.hash == batch_clone.hash {
732734
// This means we have already processed this batch commit, we will skip
733735
// it.

crates/database/db/src/db.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -413,11 +413,12 @@ impl DatabaseReadOperations for Database {
413413
async fn get_batch_by_index(
414414
&self,
415415
batch_index: u64,
416+
processed: Option<bool>,
416417
) -> Result<Option<BatchCommitData>, DatabaseError> {
417418
metered!(
418419
DatabaseOperation::GetBatchByIndex,
419420
self,
420-
tx(move |tx| async move { tx.get_batch_by_index(batch_index).await })
421+
tx(move |tx| async move { tx.get_batch_by_index(batch_index, processed).await })
421422
)
422423
}
423424

@@ -735,7 +736,7 @@ mod test {
735736
// Round trip the BatchCommitData through the database.
736737
db.insert_batch(batch_commit.clone()).await.unwrap();
737738
let batch_commit_from_db =
738-
db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap();
739+
db.get_batch_by_index(batch_commit.index, None).await.unwrap().unwrap();
739740

740741
assert_eq!(batch_commit, batch_commit_from_db);
741742
}
@@ -1249,7 +1250,7 @@ mod test {
12491250

12501251
// Insert L2 blocks with different batch indices
12511252
for i in 100..110 {
1252-
let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
1253+
let batch_data = db.get_batch_by_index(i, None).await.unwrap().unwrap();
12531254
let batch_info: BatchInfo = batch_data.into();
12541255
let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };
12551256

@@ -1418,9 +1419,9 @@ mod test {
14181419
db.set_finalized_l1_block_number(21).await.unwrap();
14191420

14201421
// Verify the batches and blocks were inserted correctly
1421-
let retrieved_batch_1 = db.get_batch_by_index(1).await.unwrap().unwrap();
1422-
let retrieved_batch_2 = db.get_batch_by_index(2).await.unwrap().unwrap();
1423-
let retrieved_batch_3 = db.get_batch_by_index(3).await.unwrap().unwrap();
1422+
let retrieved_batch_1 = db.get_batch_by_index(1, None).await.unwrap().unwrap();
1423+
let retrieved_batch_2 = db.get_batch_by_index(2, None).await.unwrap().unwrap();
1424+
let retrieved_batch_3 = db.get_batch_by_index(3, None).await.unwrap().unwrap();
14241425
let retried_block_1 = db.get_l2_block_info_by_number(1).await.unwrap().unwrap();
14251426
let retried_block_2 = db.get_l2_block_info_by_number(2).await.unwrap().unwrap();
14261427
let retried_block_3 = db.get_l2_block_info_by_number(3).await.unwrap().unwrap();
@@ -1441,9 +1442,9 @@ mod test {
14411442
assert_eq!(result, (Some(block_2), Some(11)));
14421443

14431444
// Verify that batches 2 and 3 are deleted
1444-
let batch_1 = db.get_batch_by_index(1).await.unwrap();
1445-
let batch_2 = db.get_batch_by_index(2).await.unwrap();
1446-
let batch_3 = db.get_batch_by_index(3).await.unwrap();
1445+
let batch_1 = db.get_batch_by_index(1, None).await.unwrap();
1446+
let batch_2 = db.get_batch_by_index(2, None).await.unwrap();
1447+
let batch_3 = db.get_batch_by_index(3, None).await.unwrap();
14471448
assert!(batch_1.is_some());
14481449
assert!(batch_2.is_none());
14491450
assert!(batch_3.is_none());

crates/database/db/src/operations.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,8 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
374374
.map(|(_, batch_info)| batch_info)
375375
.filter(|b| b.index > 1)
376376
{
377-
let batch = self.get_batch_by_index(batch_info.index).await?.expect("batch must exist");
377+
let batch =
378+
self.get_batch_by_index(batch_info.index, None).await?.expect("batch must exist");
378379
self.delete_batches_gt_block_number(batch.block_number.saturating_sub(1)).await?;
379380
};
380381

@@ -383,7 +384,8 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
383384
else {
384385
return Ok((None, None));
385386
};
386-
let batch = self.get_batch_by_index(batch_info.index).await?.expect("batch must exist");
387+
let batch =
388+
self.get_batch_by_index(batch_info.index, None).await?.expect("batch must exist");
387389
Ok((Some(block_info), Some(batch.block_number.saturating_add(1))))
388390
}
389391

@@ -649,6 +651,7 @@ pub trait DatabaseReadOperations {
649651
async fn get_batch_by_index(
650652
&self,
651653
batch_index: u64,
654+
processed: Option<bool>,
652655
) -> Result<Option<BatchCommitData>, DatabaseError>;
653656

654657
/// Get the latest L1 block number from the database.
@@ -727,13 +730,21 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
727730
async fn get_batch_by_index(
728731
&self,
729732
batch_index: u64,
733+
processed: Option<bool>,
730734
) -> Result<Option<BatchCommitData>, DatabaseError> {
731-
Ok(models::batch_commit::Entity::find_by_id(
732-
TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"),
733-
)
734-
.one(self.get_connection())
735-
.await
736-
.map(|x| x.map(Into::into))?)
735+
let query = if let Some(p) = processed {
736+
models::batch_commit::Entity::find().filter(
737+
models::batch_commit::Column::Index
738+
.eq(TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"))
739+
.and(models::batch_commit::Column::Processed.eq(p)),
740+
)
741+
} else {
742+
models::batch_commit::Entity::find_by_id(
743+
TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"),
744+
)
745+
};
746+
747+
Ok(query.one(self.get_connection()).await.map(|x| x.map(Into::into))?)
737748
}
738749

739750
async fn get_latest_l1_block_number(&self) -> Result<u64, DatabaseError> {

crates/derivation-pipeline/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ where
215215

216216
// get the batch commit data.
217217
let batch = db
218-
.get_batch_by_index(batch_info.index)
218+
.get_batch_by_index(batch_info.index, None)
219219
.await
220220
.map_err(|err| (batch_info.clone(), err.into()))?
221221
.ok_or((

0 commit comments

Comments
 (0)