Skip to content

Commit

Permalink
Improve fetching event batches from node (#1071)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jul 20, 2024
1 parent e6a0968 commit 62c4ac2
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic

### Fixed

- evm.events: Improved fetching of event batches from node.
- models: Fixed `CachedModel` preloading.

## [8.0.0b3] - 2024-07-04
Expand Down
1 change: 1 addition & 0 deletions docs/9.release-notes/_8.0_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- config: Fixed (de)serialization of hex strings in config.
- config: Fixed setting logging levels according to the config.
- evm.events: Fixed matching logs when filtering by topic0.
- evm.events: Improved fetching of event batches from node.
- evm.subsquid: Fixed typo in `iter_events` method name.
- models: Fixed `CachedModel` preloading.
- models: Fixed setting default value for `Meta.maxsize`.
Expand Down
1 change: 1 addition & 0 deletions src/dipdup/datasources/abi_etherscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class AbiEtherscanDatasource(AbiDatasource[AbiEtherscanDatasourceConfig], EvmAbi
ratelimit_rate=1,
ratelimit_period=5,
ratelimit_sleep=15,
retry_count=5,
)

async def run(self) -> None:
Expand Down
17 changes: 14 additions & 3 deletions src/dipdup/indexes/evm_events/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[EvmEventData, .
class EvmNodeEventFetcher(EvmNodeFetcher[EvmEventData]):
_datasource: EvmNodeDatasource

def __init__(
    self,
    datasources: tuple[EvmNodeDatasource, ...],
    first_level: int,
    last_level: int,
    addresses: set[str],
) -> None:
    """Initialize a node-based event fetcher for a fixed level range.

    :param datasources: `evm.node` datasources to pull logs from.
    :param first_level: First block level to fetch (inclusive).
    :param last_level: Last block level to fetch (inclusive).
    :param addresses: Contract addresses to filter logs by; an empty set
        means no address filter is applied when requesting events.
    """
    self._addresses = addresses
    super().__init__(datasources, first_level, last_level)

async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[EvmEventData, ...]]]:
event_iter = self._fetch_by_level()
async for level, batch in readahead_by_level(event_iter, limit=EVM_NODE_READAHEAD_LIMIT):
Expand All @@ -58,9 +68,10 @@ async def _fetch_by_level(self) -> AsyncIterator[tuple[EvmEventData, ...]]:
self._last_level,
)
event_batch = await self.get_events_batch(
batch_first_level,
batch_last_level,
node,
first_level=batch_first_level,
last_level=batch_last_level,
addresses=self._addresses,
node=node,
)

finished = time.time()
Expand Down
9 changes: 9 additions & 0 deletions src/dipdup/indexes/evm_events/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,19 @@ def _create_node_fetcher(self, first_level: int, last_level: int) -> EvmNodeEven
if not self.node_datasources:
raise FrameworkException('Creating EvmNodeEventFetcher, but no `evm.node` datasources available')

addresses = set()
for handler_config in self._config.handlers:
if handler_config.contract.address:
addresses.add(handler_config.contract.address)
else:
addresses.clear()
break

return EvmNodeEventFetcher(
datasources=self.node_datasources,
first_level=first_level,
last_level=last_level,
addresses=addresses,
)

def _match_level_data(
Expand Down
25 changes: 17 additions & 8 deletions src/dipdup/indexes/evm_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
import random
from abc import ABC
from collections import defaultdict
Expand All @@ -15,19 +16,25 @@
MIN_BATCH_SIZE = 10
MAX_BATCH_SIZE = 10000
BATCH_SIZE_UP = 1.1
BATCH_SIZE_DOWN = 0.5
BATCH_SIZE_DOWN = 0.65


_logger = logging.getLogger(__name__)


class EvmNodeFetcher(Generic[BufferT], DataFetcher[BufferT, EvmNodeDatasource], ABC):

def get_next_batch_size(self, batch_size: int, ratelimited: bool) -> int:
    """Adaptively resize the node request batch.

    Shrinks the batch after a ratelimit hit and grows it slowly otherwise,
    clamping the result to ``[MIN_BATCH_SIZE, MAX_BATCH_SIZE]``.

    :param batch_size: Current batch size in levels.
    :param ratelimited: Whether the previous request was ratelimited.
    :return: New batch size to use for the next request.
    """
    old_batch_size = batch_size
    if ratelimited:
        # Back off after being ratelimited (BATCH_SIZE_DOWN < 1).
        batch_size = int(batch_size * BATCH_SIZE_DOWN)
    else:
        # Grow cautiously while requests succeed (BATCH_SIZE_UP > 1).
        batch_size = int(batch_size * BATCH_SIZE_UP)

    # Clamp to the allowed window; values are already ints, so no extra
    # int() conversions are needed (the original wrapped max() and the
    # return value in redundant int() calls).
    batch_size = max(MIN_BATCH_SIZE, min(MAX_BATCH_SIZE, batch_size))
    if batch_size != old_batch_size:
        _logger.debug('Batch size updated: %s -> %s', old_batch_size, batch_size)
    return batch_size

def get_random_node(self) -> EvmNodeDatasource:
Expand Down Expand Up @@ -66,16 +73,18 @@ async def get_events_batch(
self,
first_level: int,
last_level: int,
addresses: set[str] | None = None,
node: EvmNodeDatasource | None = None,
) -> dict[int, list[dict[str, Any]]]:
grouped_events: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
node = node or self.get_random_node()
logs = await node.get_events(
{
'fromBlock': hex(first_level),
'toBlock': hex(last_level),
},
)
params: dict[str, Any] = {
'fromBlock': hex(first_level),
'toBlock': hex(last_level),
}
if addresses:
params['address'] = list(addresses)
logs = await node.get_events(params)
for log in logs:
grouped_events[int(log['blockNumber'], 16)].append(log)
return grouped_events

0 comments on commit 62c4ac2

Please sign in to comment.