Skip to content

Commit

Permalink
fix: Cursor not moving correctly after poll in get_filter_changes (m…
Browse files Browse the repository at this point in the history
…atter-labs#546)

## What ❔

When polling filter changes, add 1 to actual from_block value

## Why ❔

Otherwise, last block that was included in poll will be included to the
next one.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.

---------

Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
  • Loading branch information
Artemka374 and montekki authored Dec 6, 2023
1 parent c7d4315 commit ec5907b
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 73 deletions.
54 changes: 27 additions & 27 deletions core/lib/dal/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -1928,6 +1928,33 @@
},
"query": "SELECT * from prover_jobs where id=$1"
},
"2044947d6d29f29cda508b2160c39f74a8bfd524afa2ffc20a98ae039bc86ed7": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "hash",
"ordinal": 1,
"type_info": "Bytea"
}
],
"nullable": [
false,
false
],
"parameters": {
"Left": [
"Int8",
"Int8"
]
}
},
"query": "SELECT number, hash FROM miniblocks WHERE number >= $1 ORDER BY number ASC LIMIT $2"
},
"20b22fd457417e9a72f5941887448f9a11b97b449db4759da0b9d368ce93996b": {
"describe": {
"columns": [
Expand Down Expand Up @@ -9492,33 +9519,6 @@
},
"query": "\n SELECT id, circuit_input_blob_url FROM prover_jobs\n WHERE status='successful'\n AND circuit_input_blob_url is NOT NULL\n AND updated_at < NOW() - INTERVAL '30 days'\n LIMIT $1;\n "
},
"b479b7d3334f8d4566c294a44e2adb282fbc66a87be5c248c65211c2a8a07db0": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "hash",
"ordinal": 1,
"type_info": "Bytea"
}
],
"nullable": [
false,
false
],
"parameters": {
"Left": [
"Int8",
"Int8"
]
}
},
"query": "SELECT number, hash FROM miniblocks WHERE number > $1 ORDER BY number ASC LIMIT $2"
},
"b4a3c902646725188f7c79ebac992cdce5896fc6fcc9f485c0cba9d90c4c982c": {
"describe": {
"columns": [
Expand Down
6 changes: 3 additions & 3 deletions core/lib/dal/src/blocks_web3_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,15 @@ impl BlocksWeb3Dal<'_, '_> {
}))
}

/// Returns hashes of blocks with numbers greater than `from_block` and the number of the last block.
pub async fn get_block_hashes_after(
/// Returns hashes of blocks with numbers starting from `from_block` and the number of the last block.
pub async fn get_block_hashes_since(
&mut self,
from_block: MiniblockNumber,
limit: usize,
) -> sqlx::Result<(Vec<H256>, Option<MiniblockNumber>)> {
let rows = sqlx::query!(
"SELECT number, hash FROM miniblocks \
WHERE number > $1 \
WHERE number >= $1 \
ORDER BY number ASC \
LIMIT $2",
from_block.0 as i64,
Expand Down
13 changes: 4 additions & 9 deletions core/lib/dal/src/events_web3_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use zksync_types::{
};

use crate::{
instrument::InstrumentExt,
models::{storage_block::web3_block_number_to_sql, storage_event::StorageWeb3Log},
SqlxError, StorageProcessor,
instrument::InstrumentExt, models::storage_event::StorageWeb3Log, SqlxError, StorageProcessor,
};

#[derive(Debug)]
Expand Down Expand Up @@ -119,10 +117,8 @@ impl EventsWeb3Dal<'_, '_> {

let mut where_sql = format!("(miniblock_number >= {})", filter.from_block.0 as i64);

if let Some(to_block) = filter.to_block {
let block_sql = web3_block_number_to_sql(to_block);
where_sql += &format!(" AND (miniblock_number <= {})", block_sql);
}
where_sql += &format!(" AND (miniblock_number <= {})", filter.to_block.0 as i64);

if !filter.addresses.is_empty() {
where_sql += &format!(" AND (address = ANY(${}))", arg_index);
arg_index += 1;
Expand Down Expand Up @@ -172,7 +168,6 @@ impl EventsWeb3Dal<'_, '_> {

#[cfg(test)]
mod tests {
use zksync_types::api::BlockNumber;
use zksync_types::{Address, H256};

use super::*;
Expand All @@ -185,7 +180,7 @@ mod tests {
let events_web3_dal = EventsWeb3Dal { storage };
let filter = GetLogsFilter {
from_block: MiniblockNumber(100),
to_block: Some(BlockNumber::Number(200.into())),
to_block: MiniblockNumber(200),
addresses: vec![Address::from_low_u64_be(123)],
topics: vec![(0, vec![H256::from_low_u64_be(456)])],
};
Expand Down
2 changes: 1 addition & 1 deletion core/lib/types/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ pub struct TransactionDetails {
#[derive(Debug, Clone)]
pub struct GetLogsFilter {
pub from_block: MiniblockNumber,
pub to_block: Option<BlockNumber>,
pub to_block: MiniblockNumber,
pub addresses: Vec<Address>,
pub topics: Vec<(u32, Vec<H256>)>,
}
Expand Down
45 changes: 29 additions & 16 deletions core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
.installed_filters
.lock()
.await
.add(TypedFilter::Blocks(last_block_number));
.add(TypedFilter::Blocks(last_block_number + 1));
method_latency.observe();
Ok(idx)
}
Expand Down Expand Up @@ -773,14 +773,19 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
.map_err(|err| internal_error(METHOD_NAME, err))?;
let (block_hashes, last_block_number) = conn
.blocks_web3_dal()
.get_block_hashes_after(*from_block, self.state.api_config.req_entities_limit)
.get_block_hashes_since(*from_block, self.state.api_config.req_entities_limit)
.await
.map_err(|err| internal_error(METHOD_NAME, err))?;
*from_block = last_block_number.unwrap_or(*from_block);

*from_block = match last_block_number {
Some(last_block_number) => last_block_number + 1,
None => *from_block,
};

FilterChanges::Hashes(block_hashes)
}

TypedFilter::PendingTransactions(from_timestamp) => {
TypedFilter::PendingTransactions(from_timestamp_excluded) => {
let mut conn = self
.state
.connection_pool
Expand All @@ -790,12 +795,14 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
let (tx_hashes, last_timestamp) = conn
.transactions_web3_dal()
.get_pending_txs_hashes_after(
*from_timestamp,
*from_timestamp_excluded,
Some(self.state.api_config.req_entities_limit),
)
.await
.map_err(|err| internal_error(METHOD_NAME, err))?;
*from_timestamp = last_timestamp.unwrap_or(*from_timestamp);

*from_timestamp_excluded = last_timestamp.unwrap_or(*from_timestamp_excluded);

FilterChanges::Hashes(tx_hashes)
}

Expand All @@ -816,16 +823,26 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
} else {
vec![]
};

let mut to_block = self
.state
.resolve_filter_block_number(filter.to_block)
.await?;

if matches!(filter.to_block, Some(BlockNumber::Number(_))) {
to_block = to_block.min(
self.state
.resolve_filter_block_number(Some(BlockNumber::Latest))
.await?,
);
}

let get_logs_filter = GetLogsFilter {
from_block: *from_block,
to_block: filter.to_block,
to_block,
addresses,
topics,
};
let to_block = self
.state
.resolve_filter_block_number(filter.to_block)
.await?;

let mut storage = self
.state
Expand Down Expand Up @@ -859,11 +876,7 @@ impl<G: L1GasPriceProvider> EthNamespace<G> {
.get_logs(get_logs_filter, i32::MAX as usize)
.await
.map_err(|err| internal_error(METHOD_NAME, err))?;
*from_block = logs
.last()
.map(|log| MiniblockNumber(log.block_number.unwrap().as_u32()))
.unwrap_or(*from_block);
// FIXME: why is `from_block` not updated?
*from_block = to_block + 1;
FilterChanges::Logs(logs)
}
};
Expand Down
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl<G: L1GasPriceProvider> ZksNamespace<G> {
.get_logs(
GetLogsFilter {
from_block: first_miniblock_of_l1_batch,
to_block: Some(block_number.0.into()),
to_block: block_number,
addresses: vec![L1_MESSENGER_ADDRESS],
topics: vec![(2, vec![address_to_h256(&sender)]), (3, vec![msg])],
},
Expand Down
3 changes: 2 additions & 1 deletion core/lib/zksync_core/src/api_server/web3/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,10 @@ impl<E> RpcState<E> {
.enumerate()
.filter_map(|(idx, topics)| topics.map(|topics| (idx as u32 + 1, topics.0)))
.collect();

let get_logs_filter = GetLogsFilter {
from_block,
to_block: filter.to_block,
to_block,
addresses,
topics,
};
Expand Down
24 changes: 9 additions & 15 deletions core/lib/zksync_core/src/api_server/web3/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,10 @@ impl HttpTest for LogFilterChanges {
assert_logs_match(&topics_logs, &[events[1], events[3]]);

let new_all_logs = client.get_filter_changes(all_logs_filter_id).await?;
let FilterChanges::Logs(new_all_logs) = new_all_logs else {
let FilterChanges::Hashes(new_all_logs) = new_all_logs else {
panic!("Unexpected getFilterChanges output: {:?}", new_all_logs);
};
assert_eq!(new_all_logs, all_logs); // FIXME(#546): update test after behavior is fixed
assert!(new_all_logs.is_empty());
Ok(())
}
}
Expand Down Expand Up @@ -458,11 +458,10 @@ impl HttpTest for LogFilterChangesWithBlockBoundaries {
};
assert_logs_match(&lower_bound_logs, &new_events);

// FIXME(#546): update test after behavior is fixed
let new_upper_bound_logs = client.get_filter_changes(upper_bound_filter_id).await?;
assert_eq!(new_upper_bound_logs, FilterChanges::Logs(upper_bound_logs));
assert_matches!(new_upper_bound_logs, FilterChanges::Hashes(hashes) if hashes.is_empty());
let new_bounded_logs = client.get_filter_changes(bounded_filter_id).await?;
assert_eq!(new_bounded_logs, FilterChanges::Logs(bounded_logs));
assert_matches!(new_bounded_logs, FilterChanges::Hashes(hashes) if hashes.is_empty());

// Add miniblock #3. It should not be picked up by the bounded and upper bound filters,
// and should be picked up by the lower bound filter.
Expand All @@ -472,27 +471,22 @@ impl HttpTest for LogFilterChangesWithBlockBoundaries {
let new_events: Vec<_> = new_events.iter().collect();

let bounded_logs = client.get_filter_changes(bounded_filter_id).await?;
let FilterChanges::Logs(bounded_logs) = bounded_logs else {
let FilterChanges::Hashes(bounded_logs) = bounded_logs else {
panic!("Unexpected getFilterChanges output: {:?}", bounded_logs);
};
assert!(bounded_logs
.iter()
.all(|log| log.block_number.unwrap() < 3.into()));
assert!(bounded_logs.is_empty());

let upper_bound_logs = client.get_filter_changes(upper_bound_filter_id).await?;
let FilterChanges::Logs(upper_bound_logs) = upper_bound_logs else {
let FilterChanges::Hashes(upper_bound_logs) = upper_bound_logs else {
panic!("Unexpected getFilterChanges output: {:?}", upper_bound_logs);
};
assert!(upper_bound_logs
.iter()
.all(|log| log.block_number.unwrap() < 3.into()));
assert!(upper_bound_logs.is_empty());

let lower_bound_logs = client.get_filter_changes(lower_bound_filter_id).await?;
let FilterChanges::Logs(lower_bound_logs) = lower_bound_logs else {
panic!("Unexpected getFilterChanges output: {:?}", lower_bound_logs);
};
let start_idx = lower_bound_logs.len() - 4;
assert_logs_match(&lower_bound_logs[start_idx..], &new_events);
assert_logs_match(&lower_bound_logs, &new_events);
Ok(())
}
}
Expand Down

0 comments on commit ec5907b

Please sign in to comment.