diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 8363a22f..86d7a10e 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -854,12 +854,15 @@ impl DatabaseReadOperations for T { .one(self.get_connection()) .await? { - // Yield n messages starting from the found queue index + 1. + // Yield n messages starting from the found queue index + 1. Only return + // messages that have not been skipped (skipped = false) to handle the edge + // case where the last message in a batch is skipped. + let condition = Condition::all() + // We add 1 to the queue index to constrain across block boundaries + .add(models::l1_message::Column::QueueIndex.gte(record.queue_index + 1)) + .add(models::l1_message::Column::Skipped.eq(false)); Ok(models::l1_message::Entity::find() - .filter( - // We add 1 to the queue index to constrain across block boundaries - models::l1_message::Column::QueueIndex.gte(record.queue_index + 1), - ) + .filter(condition) .order_by_asc(models::l1_message::Column::QueueIndex) .limit(Some(n as u64)) .all(self.get_connection()) @@ -871,6 +874,7 @@ impl DatabaseReadOperations for T { // index starting from the beginning. else { Ok(models::l1_message::Entity::find() + .filter(models::l1_message::Column::Skipped.eq(false)) .order_by_asc(models::l1_message::Column::QueueIndex) .limit(Some(n as u64)) .all(self.get_connection())