Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(indexeddb): fix IDB cursor.continue_() call after drop #2028

Merged
merged 19 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::BlockHeaderStorageTable;
use async_trait::async_trait;
use chain::BlockHeader;
use mm2_core::mm_ctx::MmArc;
use mm2_db::indexed_db::cursor_prelude::CursorError;
use mm2_db::indexed_db::{BeBigUint, ConstructibleDb, DbIdentifier, DbInstance, DbLocked, IndexedDb, IndexedDbBuilder,
InitDbResult, MultiIndex, SharedDb};
use mm2_err_handle::prelude::*;
Expand Down Expand Up @@ -187,6 +188,7 @@ impl BlockHeaderStorageOps for IDBBlockHeadersStorage {
// Cursor returns values from the lowest to highest key indexes.
// But we need to get the most highest height, so reverse the cursor direction.
.reverse()
.where_first()
.open_cursor(BlockHeaderStorageTable::TICKER_HEIGHT_INDEX)
.await
.map_err(|err| BlockHeaderStorageError::get_err(&ticker, err.to_string()))?
Expand Down Expand Up @@ -222,7 +224,12 @@ impl BlockHeaderStorageOps for IDBBlockHeadersStorage {
.await
.map_err(|err| BlockHeaderStorageError::table_err(&ticker, err.to_string()))?;

let mut cursor = block_headers_db
let condition = move |block| {
serde_json::from_value::<BlockHeaderStorageTable>(block)
.map_to_mm(|err| CursorError::ErrorDeserializingItem(err.to_string()))
.map(|header| header.bits != max_bits)
};
let maybe_next = block_headers_db
.cursor_builder()
.only("ticker", ticker.clone())
.map_err(|err| BlockHeaderStorageError::get_err(&ticker, err.to_string()))?
Expand All @@ -232,19 +239,15 @@ impl BlockHeaderStorageOps for IDBBlockHeadersStorage {
// Cursor returns values from the lowest to highest key indexes.
// But we need to get the most highest height, so reverse the cursor direction.
.reverse()
.where_(condition)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if there are more than one object/record that satisfies the condition? I see that we return the first object, are we sure we won't get Uncaught Error: closure invoked recursively or after being dropped. This is part of the reason I wanted where and limit, so we can get more than one object that satisfies the condition, if we want this in other cases (other than the case here where we want only the first object).

P.S. If we don't get the error when there are more than one object that satisfies the condition, then the inclusion of limit and refactoring of where to return more than one object if they exist can be done in a different PR since this PR purpose is to fix the error only.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm this seems like another enhancement that's not related to this PR.

This PR only handle case for retrieving the first item that meets a condition(if specified) which solves js exception error: Uncaught Error: closure invoked recursively or after being But for sure this PR indeed fixed that and also optimizes calls to indexeddb.

I will work on the later issue during next print.

.open_cursor(BlockHeaderStorageTable::TICKER_HEIGHT_INDEX)
.await
.map_err(|err| BlockHeaderStorageError::get_err(&ticker, err.to_string()))?;

while let Some((_item_id, header)) = cursor
.map_err(|err| BlockHeaderStorageError::get_err(&ticker, err.to_string()))?
.next()
.await
.map_err(|err| BlockHeaderStorageError::get_err(&ticker, err.to_string()))?
{
if header.bits == max_bits {
continue;
}
.map_err(|err| BlockHeaderStorageError::get_err(&ticker, err.to_string()))?;

if let Some((_item_id, header)) = maybe_next {
let serialized = &hex::decode(header.raw_header).map_err(|e| BlockHeaderStorageError::DecodeError {
coin: ticker.clone(),
reason: e.to_string(),
Expand All @@ -254,7 +257,7 @@ impl BlockHeaderStorageOps for IDBBlockHeadersStorage {
reader
.read()
.map_err(|e: serialization::Error| BlockHeaderStorageError::DecodeError {
coin: ticker,
coin: ticker.clone(),
laruh marked this conversation as resolved.
Show resolved Hide resolved
reason: e.to_string(),
})?;

Expand Down
18 changes: 15 additions & 3 deletions mm2src/mm2_db/src/indexed_db/drivers/cursor/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod multi_key_cursor;
mod single_key_bound_cursor;
mod single_key_cursor;

use crate::indexed_db::indexed_cursor::CursorCondition;
use empty_cursor::IdbEmptyCursor;
use multi_key_bound_cursor::IdbMultiKeyBoundCursor;
use multi_key_cursor::IdbMultiKeyCursor;
Expand Down Expand Up @@ -241,7 +242,7 @@ impl CursorDriver {
})
}

pub(crate) async fn next(&mut self) -> CursorResult<Option<(ItemId, Json)>> {
pub(crate) async fn next(&mut self, where_: Option<CursorCondition>) -> CursorResult<Option<(ItemId, Json)>> {
loop {
// Check if we got `CursorAction::Stop` at the last iteration.
if self.stopped {
Expand Down Expand Up @@ -281,7 +282,18 @@ impl CursorDriver {
let item: InternalItem =
deserialize_from_js(js_value).map_to_mm(|e| CursorError::ErrorDeserializingItem(e.to_string()))?;

let (item_action, cursor_action) = self.inner.on_iteration(key)?;
let (mut item_action, cursor_action) = self.inner.on_iteration(key)?;

let (id, val) = item.into_pair();
// Checks if the given `where_` condition, represented by an optional closure (`cursor_condition`),
// is satisfied for the provided `item`. If the condition is met, return the corresponding `(id, val)` or skip to the next item.
if let Some(cursor_condition) = &where_ {
if cursor_condition(val.clone())? {
return Ok(Some((id, val)));
shamardy marked this conversation as resolved.
Show resolved Hide resolved
} else {
item_action = CursorItemAction::Skip
}
};

match cursor_action {
CursorAction::Continue => cursor.continue_().map_to_mm(|e| CursorError::AdvanceError {
Expand All @@ -302,7 +314,7 @@ impl CursorDriver {
}

match item_action {
CursorItemAction::Include => return Ok(Some(item.into_pair())),
CursorItemAction::Include => return Ok(Some((id, val))),
// Try to fetch the next item.
CursorItemAction::Skip => (),
}
Expand Down
186 changes: 183 additions & 3 deletions mm2src/mm2_db/src/indexed_db/indexed_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,19 @@ use std::marker::PhantomData;

pub(super) type DbCursorEventTx = mpsc::UnboundedSender<DbCursorEvent>;
pub(super) type DbCursorEventRx = mpsc::UnboundedReceiver<DbCursorEvent>;
pub(super) type CursorCondition = Box<dyn Fn(Json) -> CursorResult<bool> + Send + 'static>;

pub struct CursorBuilder<'transaction, 'reference, Table: TableSignature> {
db_table: &'reference DbTable<'transaction, Table>,
filters: CursorFilters,
where_: Option<CursorCondition>,
}

impl<'transaction, 'reference, Table: TableSignature> CursorBuilder<'transaction, 'reference, Table> {
pub(crate) fn new(db_table: &'reference DbTable<'transaction, Table>) -> Self {
CursorBuilder {
db_table,
where_: None,
filters: CursorFilters::default(),
}
}
Expand Down Expand Up @@ -112,6 +115,36 @@ impl<'transaction, 'reference, Table: TableSignature> CursorBuilder<'transaction
self
}

/// Sets a filtering condition for the cursor using the provided closure (`f`).
/// The closure should take a reference to a value and return a boolean indicating whether the
/// cursor should return this item or none if not found in the store.
/// ```rust
/// let cursor_builder = CursorBuilder::new();
///
/// // Define a closure to filter items based on a condition
/// let condition = |item: Json| -> CursorResult<bool> {
/// // Replace this with your actual condition logic
/// Ok(item.get("property").is_some())
/// };
///
/// // Apply the closure to the cursor builder using the where_ method
/// let updated_cursor_builder = cursor_builder.where_(condition);
/// ```
pub fn where_<F>(mut self, f: F) -> CursorBuilder<'transaction, 'reference, Table>
where
F: Fn(Json) -> CursorResult<bool> + Send + 'static,
{
self.where_ = Some(Box::new(f));
self
}

/// ```rust
/// let cursor_builder = CursorBuilder::new();
/// // Apply the default condition to the cursor builder to return the first item
/// let updated_cursor_builder = cursor_builder.where_first().open_cursor().next();
/// ```
pub fn where_first(self) -> CursorBuilder<'transaction, 'reference, Table> { self.where_(|_| Ok(true)) }

/// Opens a cursor by the specified `index`.
/// https://developer.mozilla.org/en-US/docs/Web/API/IDBObjectStore/openCursor
pub async fn open_cursor(self, index: &str) -> CursorResult<CursorIter<'transaction, Table>> {
Expand All @@ -124,13 +157,15 @@ impl<'transaction, 'reference, Table: TableSignature> CursorBuilder<'transaction
})?;
Ok(CursorIter {
event_tx,
where_: self.where_,
phantom: PhantomData::default(),
})
}
}

pub struct CursorIter<'transaction, Table> {
event_tx: DbCursorEventTx,
where_: Option<CursorCondition>,
phantom: PhantomData<&'transaction Table>,
}

Expand All @@ -140,7 +175,10 @@ impl<'transaction, Table: TableSignature> CursorIter<'transaction, Table> {
pub async fn next(&mut self) -> CursorResult<Option<(ItemId, Table)>> {
let (result_tx, result_rx) = oneshot::channel();
self.event_tx
.send(DbCursorEvent::NextItem { result_tx })
.send(DbCursorEvent::NextItem {
result_tx,
where_: self.where_.take(),
})
.await
.map_to_mm(|e| CursorError::UnexpectedState(format!("Error sending cursor event: {e}")))?;
let maybe_item = result_rx
Expand All @@ -166,14 +204,15 @@ impl<'transaction, Table: TableSignature> CursorIter<'transaction, Table> {
pub enum DbCursorEvent {
NextItem {
result_tx: oneshot::Sender<CursorResult<Option<(ItemId, Json)>>>,
where_: Option<CursorCondition>,
},
}

pub(crate) async fn cursor_event_loop(mut rx: DbCursorEventRx, mut cursor: CursorDriver) {
while let Some(event) = rx.next().await {
match event {
DbCursorEvent::NextItem { result_tx } => {
result_tx.send(cursor.next().await).ok();
DbCursorEvent::NextItem { result_tx, where_ } => {
result_tx.send(cursor.next(where_).await).ok();
},
}
}
Expand Down Expand Up @@ -884,4 +923,145 @@ mod tests {
// Try to poll one more time. This should not fail but return `None`.
assert!(next_item(&mut cursor_iter).await.is_none());
}

#[wasm_bindgen_test]
async fn test_cursor_where_condition() {
const DB_NAME: &str = "TEST_REV_ITER_SINGLE_KEY_BOUND_CURSOR";
const DB_VERSION: u32 = 1;

register_wasm_log();

let items = vec![
swap_item!("uuid1", "RICK", "MORTY", 10, 3, 700),
swap_item!("uuid2", "MORTY", "KMD", 95000, 1, 721),
swap_item!("uuid3", "RICK", "XYZ", 7, u32::MAX, 1281), // +
swap_item!("uuid4", "RICK", "MORTY", 8, 6, 92), // +
swap_item!("uuid5", "QRC20", "RICK", 2, 4, 721),
swap_item!("uuid6", "KMD", "MORTY", 12, 3124, 214), // +
];

let db = IndexedDbBuilder::new(DbIdentifier::for_test(DB_NAME))
.with_version(DB_VERSION)
.with_table::<SwapTable>()
.build()
.await
.expect("!IndexedDb::init");
let transaction = db.transaction().await.expect("!IndexedDb::transaction");
let table = transaction
.table::<SwapTable>()
.await
.expect("!DbTransaction::open_table");
fill_table(&table, items).await;

// check for first swap where started_at is 1281.
let condition = move |swap| {
let swap = serde_json::from_value::<SwapTable>(swap).unwrap();
Ok(swap.started_at == 1281)
};
let maybe_swap = table
.cursor_builder()
.bound("rel_coin_value", 5u32, u32::MAX)
.where_(condition)
.open_cursor("rel_coin_value")
.await
.expect("!CursorBuilder::open_cursor")
.next()
.await
.expect("!Cursor next result")
.map(|(_, swap)| swap);

// maybe_swap should return swap with uuid3 since it's swap uuid3 that has started_at to be 1281.
assert_eq!(maybe_swap, Some(swap_item!("uuid3", "RICK", "XYZ", 7, u32::MAX, 1281)));
}

#[wasm_bindgen_test]
async fn test_cursor_where_first_condition() {
const DB_NAME: &str = "TEST_REV_ITER_SINGLE_KEY_BOUND_CURSOR";
const DB_VERSION: u32 = 1;

register_wasm_log();

let items = vec![
swap_item!("uuid1", "RICK", "MORTY", 10, 3, 700),
swap_item!("uuid2", "MORTY", "KMD", 95000, 1, 721),
swap_item!("uuid3", "RICK", "XYZ", 7, u32::MAX, 1281), // +
swap_item!("uuid4", "RICK", "MORTY", 8, 6, 92), // +
swap_item!("uuid5", "QRC20", "RICK", 2, 4, 721),
swap_item!("uuid6", "KMD", "MORTY", 12, 3124, 214), // +
];

let db = IndexedDbBuilder::new(DbIdentifier::for_test(DB_NAME))
.with_version(DB_VERSION)
.with_table::<SwapTable>()
.build()
.await
.expect("!IndexedDb::init");
let transaction = db.transaction().await.expect("!IndexedDb::transaction");
let table = transaction
.table::<SwapTable>()
.await
.expect("!DbTransaction::open_table");
fill_table(&table, items).await;

let maybe_swap = table
.cursor_builder()
.bound("rel_coin_value", 5u32, u32::MAX)
.where_first()
.open_cursor("rel_coin_value")
.await
.expect("!CursorBuilder::open_cursor")
.next()
.await
.expect("!Cursor next result")
.map(|(_, swap)| swap);

// maybe_swap should return swap with uuid4 since it's the item with the lowest rel_coin_value in the store.
assert_eq!(maybe_swap, Some(swap_item!("uuid4", "RICK", "MORTY", 8, 6, 92)));
}

#[wasm_bindgen_test]
async fn test_cursor_where_first_but_reversed_condition() {
const DB_NAME: &str = "TEST_REV_ITER_SINGLE_KEY_BOUND_CURSOR";
const DB_VERSION: u32 = 1;

register_wasm_log();

let items = vec![
swap_item!("uuid1", "RICK", "MORTY", 10, 3, 700),
swap_item!("uuid2", "MORTY", "KMD", 95000, 1, 721),
swap_item!("uuid3", "RICK", "XYZ", 7, u32::MAX, 1281), // +
swap_item!("uuid4", "RICK", "MORTY", 8, 6, 92), // +
swap_item!("uuid5", "QRC20", "RICK", 2, 4, 721),
swap_item!("uuid6", "KMD", "MORTY", 12, 3124, 214), // +
];

let db = IndexedDbBuilder::new(DbIdentifier::for_test(DB_NAME))
.with_version(DB_VERSION)
.with_table::<SwapTable>()
.build()
.await
.expect("!IndexedDb::init");
let transaction = db.transaction().await.expect("!IndexedDb::transaction");
let table = transaction
.table::<SwapTable>()
.await
.expect("!DbTransaction::open_table");
fill_table(&table, items).await;

let maybe_swap = table
.cursor_builder()
.bound("rel_coin_value", 5u32, u32::MAX)
.where_first()
.reverse()
.open_cursor("rel_coin_value")
.await
.expect("!CursorBuilder::open_cursor")
.next()
.await
.expect("!Cursor next result")
.map(|(_, swap)| swap);

// maybe_swap should return swap with uuid4 since it's the item with the highest rel_coin_value in the store.
assert_eq!(maybe_swap, Some(swap_item!("uuid3", "RICK", "XYZ", 7, u32::MAX, 1281)));
}
}
Loading