Skip to content

Commit

Permalink
check if item should be skipped before returning based on where condi…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
borngraced committed Feb 2, 2024
1 parent 4ec8fae commit 9390742
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 66 deletions.
140 changes: 74 additions & 66 deletions mm2src/mm2_db/src/indexed_db/drivers/cursor/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,81 +244,89 @@ impl CursorDriver {

pub(crate) async fn next(&mut self, where_: Option<CursorCondition>) -> CursorResult<Option<(ItemId, Json)>> {
loop {
// Check if we got `CursorAction::Stop` at the last iteration.
if self.stopped {
return Ok(None);
}

let event = match self.cursor_item_rx.next().await {
Some(event) => event,
None => {
self.stopped = true;
return Ok(None);
},
};
match self.process_cursor_item(where_.as_ref()).await? {
Some(result) => return Ok(Some(result)),
None => continue,
}
}
}

let _cursor_event = event.map_to_mm(|e| CursorError::ErrorOpeningCursor {
async fn continue_(&mut self, cursor: &IdbCursorWithValue, cursor_action: &CursorAction) -> CursorResult<()> {
match cursor_action {
CursorAction::Continue => cursor.continue_().map_to_mm(|e| CursorError::AdvanceError {
description: stringify_js_error(&e),
})?;

let cursor = match cursor_from_request(&self.cursor_request)? {
Some(cursor) => cursor,
// No more items.
None => {
self.stopped = true;
return Ok(None);
},
};

let (key, js_value) = match (cursor.key(), cursor.value()) {
(Ok(key), Ok(js_value)) => (key, js_value),
// No more items.
_ => {
self.stopped = true;
return Ok(None);
},
};

let item: InternalItem =
deserialize_from_js(js_value).map_to_mm(|e| CursorError::ErrorDeserializingItem(e.to_string()))?;

let (mut item_action, cursor_action) = self.inner.on_iteration(key)?;

let (id, val) = item.into_pair();
// Checks if the given `where_` condition, represented by an optional closure (`cursor_condition`),
// is satisfied for the provided `item`. If the condition is met, return the corresponding `(id, val)` or skip to the next item.
if let Some(cursor_condition) = &where_ {
if cursor_condition(val.clone())? {
return Ok(Some((id, val)));
} else {
item_action = CursorItemAction::Skip
}
};

match cursor_action {
CursorAction::Continue => cursor.continue_().map_to_mm(|e| CursorError::AdvanceError {
description: stringify_js_error(&e),
})?,
CursorAction::ContinueWithValue(next_value) => {
cursor
.continue_with_key(&next_value)
.map_to_mm(|e| CursorError::AdvanceError {
description: stringify_js_error(&e),
})?
},
// Don't advance the cursor.
// Here we set the `stopped` flag so we return `Ok(None)` at the next iteration immediately.
// This is required because `item_action` can be `CollectItemAction::Include`,
// and at this iteration we will return `Ok(Some)`.
CursorAction::Stop => self.stopped = true,
}
})?,
CursorAction::ContinueWithValue(next_value) => {
cursor
.continue_with_key(&next_value)
.map_to_mm(|e| CursorError::AdvanceError {
description: stringify_js_error(&e),
})?
},
// Don't advance the cursor.
// Here we set the `stopped` flag so we return `Ok(None)` at the next iteration immediately.
// This is required because `item_action` can be `CollectItemAction::Include`,
// and at this iteration we will return `Ok(Some)`.
CursorAction::Stop => self.stopped = true,
}

Ok(())
}

async fn process_cursor_item(&mut self, where_: Option<&CursorCondition>) -> CursorResult<Option<(ItemId, Json)>> {
let event = match self.cursor_item_rx.next().await {
Some(event) => event,
None => {
self.stopped = true;
return Ok(None);
},
};

let _cursor_event = event.map_to_mm(|e| CursorError::ErrorOpeningCursor {
description: stringify_js_error(&e),
})?;

match item_action {
CursorItemAction::Include => return Ok(Some((id, val))),
// Try to fetch the next item.
CursorItemAction::Skip => (),
let cursor = match cursor_from_request(&self.cursor_request)? {
Some(cursor) => cursor,
None => {
self.stopped = true;
return Ok(None);
},
};

let (key, js_value) = match (cursor.key(), cursor.value()) {
(Ok(key), Ok(js_value)) => (key, js_value),
_ => {
self.stopped = true;
return Ok(None);
},
};

let item: InternalItem =
deserialize_from_js(js_value).map_to_mm(|e| CursorError::ErrorDeserializingItem(e.to_string()))?;
let (item_action, cursor_action) = self.inner.on_iteration(key)?;

let (id, val) = item.into_pair();
// Checks if the given `where_` condition, represented by an optional closure (`cursor_condition`),
// is satisfied for the provided `item`. If the condition is met, return the corresponding `(id, val)` or skip to the next item.
if let Some(cursor_condition) = where_ {
if cursor_condition(val.clone())? && matches!(item_action, CursorItemAction::Include) {
return Ok(Some((id, val)));
}
self.continue_(&cursor, &cursor_action).await?;
return Ok(None);
}

self.continue_(&cursor, &cursor_action).await?;
let res = match item_action {
CursorItemAction::Include => Some((id, val)),
CursorItemAction::Skip => None,
};
Ok(res)
}
}

Expand Down
45 changes: 45 additions & 0 deletions mm2src/mm2_db/src/indexed_db/indexed_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,4 +1064,49 @@ mod tests {
// maybe_swap should return swap with uuid4 since it's the item with the highest rel_coin_value in the store.
assert_eq!(maybe_swap, Some(swap_item!("uuid3", "RICK", "XYZ", 7, u32::MAX, 1281)));
}

#[wasm_bindgen_test]
async fn test_cursor_where_first_condition_with_limit() {
const DB_NAME: &str = "TEST_REV_ITER_SINGLE_KEY_BOUND_CURSOR";
const DB_VERSION: u32 = 1;

register_wasm_log();

let items = vec![
swap_item!("uuid1", "RICK", "MORTY", 10, 3, 700),
swap_item!("uuid2", "MORTY", "KMD", 95000, 1, 721),
swap_item!("uuid3", "RICK", "XYZ", 7, u32::MAX, 1281), // +
swap_item!("uuid4", "RICK", "MORTY", 8, 6, 92), // +
swap_item!("uuid5", "QRC20", "RICK", 2, 4, 721),
swap_item!("uuid6", "KMD", "MORTY", 12, 3124, 214), // +
];

let db = IndexedDbBuilder::new(DbIdentifier::for_test(DB_NAME))
.with_version(DB_VERSION)
.with_table::<SwapTable>()
.build()
.await
.expect("!IndexedDb::init");
let transaction = db.transaction().await.expect("!IndexedDb::transaction");
let table = transaction
.table::<SwapTable>()
.await
.expect("!DbTransaction::open_table");
fill_table(&table, items).await;

let maybe_swap = table
.cursor_builder()
.bound("rel_coin_value", 5u32, u32::MAX)
.where_first()
.open_cursor("rel_coin_value")
.await
.expect("!CursorBuilder::open_cursor")
.next()
.await
.expect("!Cursor next result")
.map(|(_, swap)| swap);

// maybe_swap should return swap with uuid4 since it's the item with the lowest rel_coin_value in the store.
assert_eq!(maybe_swap, Some(swap_item!("uuid4", "RICK", "MORTY", 8, 6, 92)));
}
}

0 comments on commit 9390742

Please sign in to comment.