Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support backward iteration in the RocksDB #1492

Merged
merged 11 commits into from
Nov 20, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Added

- [#1492](https://github.com/FuelLabs/fuel-core/pull/1492): Support backward iteration in the RocksDB. It allows backward queries that were not allowed before.
- [#1490](https://github.com/FuelLabs/fuel-core/pull/1490): Add push and pop benchmarks.
- [#1485](https://github.com/FuelLabs/fuel-core/pull/1485): Prepare rc release of fuel core v0.21
- [#1476](https://github.com/FuelLabs/fuel-core/pull/1453): Add the majority of the "other" benchmarks for contract opcodes.
Expand Down
6 changes: 0 additions & 6 deletions crates/fuel-core/src/schema/coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::{
U64,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -167,11 +166,6 @@ impl CoinQuery {
last: Option<i32>,
before: Option<String>,
) -> async_graphql::Result<Connection<UtxoId, Coin, EmptyFields, EmptyFields>> {
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(anyhow!("reverse pagination isn't supported for this coins").into())
}

let query: &Database = ctx.data_unchecked();
crate::schema::query_pagination(after, before, first, last, |start, direction| {
let owner: fuel_tx::Address = filter.owner.into();
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::{
U64,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -138,13 +137,6 @@ impl ContractBalanceQuery {
> {
let query: &Database = ctx.data_unchecked();

// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(
anyhow!("reverse pagination isn't supported for this resource").into(),
)
}

crate::schema::query_pagination(after, before, first, last, |start, direction| {
let balances = query
.contract_balances(
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ impl MessageQuery {
};

let messages = if let Some(owner) = owner {
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(anyhow!(
"reverse pagination isn't supported for this resource"
)
.into())
}

query.owned_messages(&owner.0, start, direction)
} else {
query.all_messages(start, direction)
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
TxPointer,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -168,13 +167,6 @@ impl TxQuery {
before: Option<String>,
) -> async_graphql::Result<Connection<TxPointer, Transaction, EmptyFields, EmptyFields>>
{
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(
anyhow!("reverse pagination isn't supported for this resource").into(),
)
}

let query: &Database = ctx.data_unchecked();
let config = ctx.data_unchecked::<Config>();
let owner = fuel_types::Address::from(owner);
Expand Down
86 changes: 80 additions & 6 deletions crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,63 @@ impl RocksDb {
opts
}

/// RocksDB prefix iteration doesn't support reverse order,
/// but seeking the start key and iterating in reverse order works.
/// So we can create a workaround. We need to find the next available
/// element and use it as an anchor for reverse iteration,
/// but skip the first element to jump on the previous prefix.
/// If we can't find the next element, we are at the end of the list,
/// so we can use `IteratorMode::End` to start reverse iteration.
fn reverse_prefix_iter(
&self,
prefix: &[u8],
column: Column,
) -> impl Iterator<Item = KVItem> + '_ {
let maybe_next_item = next_prefix(prefix.to_vec())
.and_then(|next_prefix| {
self.iter_all(
column,
Some(next_prefix.as_slice()),
None,
IterDirection::Forward,
)
.next()
})
.and_then(|res| res.ok());

if let Some((next_start_key, _)) = maybe_next_item {
let iter_mode = IteratorMode::From(
next_start_key.as_slice(),
rocksdb::Direction::Reverse,
);
let prefix = prefix.to_vec();
self
._iter_all(column, ReadOptions::default(), iter_mode)
// Skip the element under the `next_start_key` key.
.skip(1)
.take_while(move |item| {
if let Ok((key, _)) = item {
key.starts_with(prefix.as_slice())
} else {
true
}
})
.into_boxed()
} else {
// No next item, so we can start backward iteration from the end.
let prefix = prefix.to_vec();
self._iter_all(column, ReadOptions::default(), IteratorMode::End)
.take_while(move |item| {
if let Ok((key, _)) = item {
key.starts_with(prefix.as_slice())
} else {
true
}
})
.into_boxed()
}
}

fn _iter_all(
&self,
column: Column,
Expand Down Expand Up @@ -321,13 +378,19 @@ impl KeyValueStore for RocksDb {
.into_boxed()
}
(Some(prefix), None) => {
// start iterating in a certain direction within the keyspace
let iter_mode =
IteratorMode::From(prefix, convert_to_rocksdb_direction(direction));
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);
if direction == IterDirection::Reverse {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this section could be a little easier to read.

Something like:

match direction {
    IterDirection::Reverse => self.reverse_prefix_iter(prefix, ...),
    _ => self.prefix_iter(prefix, ...),
}

Then inside the new reverse_prefix_iter, instead of returning early, just do something like:

let maybe_next_item = next_prefix(prefix.to_vec())
    .and_then(|next_prefix| self
                            .iter_all(
                                column,
                                Some(next_prefix.as_slice()),
                                None,
                                IterDirection::Forward,
                            )
                            .next())
    .and_then(|res| res.ok());
if let Some(next_item) = maybe_next_item {
    ...
} else {
    ...
}

Of course, breaking those down into into smaller functions wouldn't hurt either, but those are the changes that would improve readability the most.

self.reverse_prefix_iter(prefix, column).into_boxed()
} else {
// start iterating in a certain direction within the keyspace
let iter_mode = IteratorMode::From(
prefix,
convert_to_rocksdb_direction(direction),
);
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);

self._iter_all(column, opts, iter_mode).into_boxed()
self._iter_all(column, opts, iter_mode).into_boxed()
}
}
(None, Some(start)) => {
// start iterating in a certain direction from the start key
Expand Down Expand Up @@ -503,6 +566,17 @@ impl TransactableStorage for RocksDb {
}
}

/// The `None` means overflow, so there is not following prefix.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
for byte in prefix.iter_mut().rev() {
if let Some(new_byte) = byte.checked_add(1) {
*byte = new_byte;
return Some(prefix)
}
}
None
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
7 changes: 1 addition & 6 deletions tests/tests/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ async fn test_contract_balance(
#[rstest]
#[tokio::test]
async fn test_5_contract_balances(
#[values(PageDirection::Forward)] direction: PageDirection,
// #[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
// Rocksdb doesn't support reverse seeks using a prefix, we'd need to implement a custom
// comparator to support this usecase.
// > One common bug of using prefix iterating is to use prefix mode to iterate in reverse order. But it is not yet supported.
// https://github.com/facebook/rocksdb/wiki/Prefix-Seek#limitation
#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
) {
let mut test_builder = TestSetupBuilder::new(SEED);
let (_, contract_id) = test_builder.setup_contract(
Expand Down
4 changes: 1 addition & 3 deletions tests/tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,7 @@ async fn messages_by_owner_returns_messages_for_the_given_owner() {
#[rstest]
#[tokio::test]
async fn messages_empty_results_for_owner_with_no_messages(
#[values(PageDirection::Forward)] direction: PageDirection,
//#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
// reverse iteration with prefix not supported by rocksdb
#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
#[values(Address::new([16; 32]), Address::new([0; 32]))] owner: Address,
) {
let srv = FuelService::new_node(Config::local_node()).await.unwrap();
Expand Down
50 changes: 38 additions & 12 deletions tests/tests/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,12 @@ async fn get_transactions() {
assert!(response.has_previous_page);
}

#[test_case::test_case(PageDirection::Forward; "forward")]
#[test_case::test_case(PageDirection::Backward; "backward")]
#[tokio::test]
async fn get_transactions_by_owner_forward_and_backward_iterations() {
async fn get_transactions_by_owner_returns_correct_number_of_results(
direction: PageDirection,
) {
let alice = Address::from([1; 32]);
let bob = Address::from([2; 32]);

Expand All @@ -397,7 +401,7 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() {
let all_transactions_forward = PaginationRequest {
cursor: None,
results: 10,
direction: PageDirection::Forward,
direction,
};
let response = client
.transactions_by_owner(&bob, all_transactions_forward)
Expand All @@ -412,24 +416,46 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() {
})
.collect_vec();
assert_eq!(transactions_forward.len(), 5);
}

#[test_case::test_case(PageDirection::Forward; "forward")]
#[test_case::test_case(PageDirection::Backward; "backward")]
#[tokio::test]
async fn get_transactions_by_owner_supports_cursor(direction: PageDirection) {
let alice = Address::from([1; 32]);
let bob = Address::from([2; 32]);

let mut context = TestContext::new(100).await;
let _ = context.transfer(alice, bob, 1).await.unwrap();
let _ = context.transfer(alice, bob, 2).await.unwrap();
let _ = context.transfer(alice, bob, 3).await.unwrap();
let _ = context.transfer(alice, bob, 4).await.unwrap();
let _ = context.transfer(alice, bob, 5).await.unwrap();

let client = context.client;

let all_transactions_backward = PaginationRequest {
let all_transactions_forward = PaginationRequest {
cursor: None,
results: 10,
direction: PageDirection::Backward,
direction,
};
let response = client
.transactions_by_owner(&bob, all_transactions_backward)
.await;
// Backward request is not supported right now.
assert!(response.is_err());

///////////////// Iteration
.transactions_by_owner(&bob, all_transactions_forward)
.await
.unwrap();
let transactions_forward = response
.results
.into_iter()
.map(|tx| {
assert!(matches!(tx.status, TransactionStatus::Success { .. }));
tx.transaction
})
.collect_vec();

let forward_iter_three = PaginationRequest {
cursor: None,
results: 3,
direction: PageDirection::Forward,
direction,
};
let response_after_iter_three = client
.transactions_by_owner(&bob, forward_iter_three)
Expand All @@ -451,7 +477,7 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() {
let forward_iter_next_two = PaginationRequest {
cursor: response_after_iter_three.cursor.clone(),
results: 2,
direction: PageDirection::Forward,
direction,
};
let response = client
.transactions_by_owner(&bob, forward_iter_next_two)
Expand Down
Loading