feat(docs): add docs with cursor (#222)
BobTheBuidler authored Sep 1, 2024
1 parent 57110ca commit af7b8d9
Showing 30 changed files with 1,591 additions and 78 deletions.
1 change: 1 addition & 0 deletions .cursorignore
@@ -0,0 +1 @@
# Add directories or file patterns to ignore during indexing (e.g. foo/ or *.csv)
38 changes: 38 additions & 0 deletions dank_mids/ENVIRONMENT_VARIABLES.py
@@ -36,28 +36,66 @@
# Are you calling a ganache fork? Can't use state override code
ganache_fork = _envs._deprecated_format.create_env("GANACHE_FORK", bool, default=False, verbose=False)
GANACHE_FORK = _envs.create_env("GANACHE_FORK", bool, default=ganache_fork)
"""Flag indicating whether the current environment is a Ganache fork."""

# We set the default to 20 minutes to account for potentially long event loop times if you're doing serious work.
AIOHTTP_TIMEOUT = _envs.create_env("AIOHTTP_TIMEOUT", int, default=20*60, string_converter=int)
"""Timeout value in seconds for aiohttp requests."""

# Brownie call Semaphore
# Used because I experienced some OOM errs due to web3 formatters when I was batching an absurd number of brownie calls.
# We need a separate semaphore here because the method-specific semaphores are too late in the code to prevent this OOM issue.
brownie_semaphore = _envs._deprecated_format.create_env("BROWNIE_CALL_SEMAPHORE", int, default=100_000, string_converter=int, verbose=False)
BROWNIE_CALL_SEMAPHORE = _envs.create_env("BROWNIE_CALL_SEMAPHORE", BlockSemaphore, default=brownie_semaphore, string_converter=int, verbose=not OPERATION_MODE.infura)
"""
Semaphore for limiting concurrent Brownie calls.
See Also:
:class:`dank_mids.semaphores.BlockSemaphore`: The semaphore class used for concurrency control.
"""

BROWNIE_ENCODER_SEMAPHORE = _envs.create_env("BROWNIE_ENCODER_SEMAPHORE", BlockSemaphore, default=BROWNIE_CALL_SEMAPHORE._default_value * 2, string_converter=int, verbose=not OPERATION_MODE.infura)
"""
Semaphore for limiting concurrent Brownie encoding operations. This limits memory consumption.
See Also:
:class:`dank_mids.semaphores.BlockSemaphore`: The semaphore class used for concurrency control.
"""

# Processes for decoding. This determines process pool size, not total subprocess count.
# There are 3 pools, each initialized with the same value.
# NOTE: Don't stress, these are good for you and will not hog your cpu. You can disable them by setting the var = 0. #TODO: lol u cant yet
BROWNIE_ENCODER_PROCESSES = _envs.create_env("BROWNIE_ENCODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
"""
Process pool for Brownie encoding operations.
See Also:
:class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes.
"""

BROWNIE_DECODER_PROCESSES = _envs.create_env("BROWNIE_DECODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
"""
Process pool for Brownie decoding operations.
See Also:
:class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes.
"""

MULTICALL_DECODER_PROCESSES = _envs.create_env("MULTICALL_DECODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
"""
Process pool for Multicall decoding operations.
See Also:
:class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes.
"""

COLLECTION_FACTOR = _envs.create_env("COLLECTION_FACTOR", int, default=10 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
"""Factor determining the size of data collection operations."""

# We use a modified version of the request spec that doesn't contain unnecessary fields, and switch to the full spec if necessary for your node.
# Set this env var to any value to force the full request spec always
USE_FULL_REQUEST = _envs.create_env("USE_FULL_REQUEST", bool, default=False, verbose=False)
"""Flag indicating whether to use the full request specification."""

DEBUG = _envs.create_env("DEBUG", bool, default=False, verbose=False)
# NOTE: EXPORT_STATS is not implemented
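
The settings above are read once, at import time, so overrides belong in the process environment before dank_mids is first imported. A minimal sketch, with illustrative values rather than recommendations:

    import os

    # Override a few of the documented settings; all values must be exported
    # before the first `import dank_mids`, since the module reads them on import.
    os.environ["AIOHTTP_TIMEOUT"] = str(30 * 60)     # allow 30-minute aiohttp requests
    os.environ["BROWNIE_CALL_SEMAPHORE"] = "50000"   # halve the default concurrent brownie calls
    os.environ["BROWNIE_DECODER_PROCESSES"] = "2"    # two decoder processes instead of one

    import dank_mids  # noqa: E402  (imported only after the overrides are in place)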
9 changes: 7 additions & 2 deletions dank_mids/__init__.py
@@ -1,4 +1,3 @@

from contextlib import suppress

from dank_mids.brownie_patch import DankContractCall, DankContractMethod, DankContractTx, DankOverloadedMethod
@@ -11,7 +10,6 @@

with suppress(ImportError):
from dank_mids.brownie_patch import Contract, dank_eth, dank_web3, patch_contract
from web3.eth import AsyncEth as _AsyncEth
# aliased for cleanliness and convenience
web3 = dank_web3
eth = dank_eth
@@ -20,6 +18,13 @@


def _configure_concurrent_future_work_queue_size():
"""
Configures the concurrent futures process pool to allow for a larger number of queued calls.
This function increases the EXTRA_QUEUED_CALLS value to 50,000, which allows for more
concurrent operations to be queued in the process pool. This can significantly improve
performance for applications that make heavy use of brownie.
"""
import concurrent.futures.process as _cfp
_cfp.EXTRA_QUEUED_CALLS = 50_000

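
For context on the tweak above: concurrent.futures.process.EXTRA_QUEUED_CALLS is a CPython module-level constant that pads the bounded call queue shared with worker processes (sized max_workers + EXTRA_QUEUED_CALLS). A standalone sketch of the same adjustment outside dank_mids:

    import concurrent.futures.process as _cfp
    from concurrent.futures import ProcessPoolExecutor

    # Must be raised before the first ProcessPoolExecutor is created so the
    # larger call queue takes effect; 50_000 mirrors the value dank_mids sets.
    _cfp.EXTRA_QUEUED_CALLS = 50_000

    with ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(pow, [2, 3, 4], [10, 10, 10])))  # [1024, 59049, 1048576]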
58 changes: 56 additions & 2 deletions dank_mids/_batch.py
@@ -1,4 +1,3 @@

import asyncio
import logging
from typing import TYPE_CHECKING, Any, Awaitable, Generator, List, Union
@@ -11,24 +10,68 @@
from dank_mids.controller import DankMiddlewareController

MIN_SIZE = 1 # TODO: Play with this
"""The minimum size for a batch operation."""

CHECK = MIN_SIZE - 1
"""A constant used for checking batch sizes."""

logger = logging.getLogger(__name__)

class DankBatch:
"""
A batch of JSON-RPC batches.
This class represents a collection of multicalls and RPC calls that can be executed as a batch.
It is used internally by the :class:`DankMiddlewareController` to manage and execute batches of calls.
Note:
This class is considered "pretty much deprecated" and needs refactoring in future versions.
See Also:
:class:`dank_mids._requests.Multicall`: The Multicall class used in this batch.
:class:`dank_mids._requests.RPCRequest`: The RPCRequest class used in this batch.
"""

__slots__ = 'controller', 'multicalls', 'rpc_calls', '_started'
""" A batch of jsonrpc batches. This is pretty much deprecated and needs to be refactored away."""

def __init__(self, controller: "DankMiddlewareController", multicalls: Multicalls, rpc_calls: List[Union[Multicall, RPCRequest]]):
self.controller = controller
"""The controller managing this batch."""

self.multicalls = multicalls
"""A collection of multicalls to be executed."""

self.rpc_calls = rpc_calls
"""A list of individual RPC calls or multicalls."""

self._started = False
"""A flag indicating whether the batch has been started."""

def __await__(self) -> Generator[Any, None, Any]:
"""
Makes the DankBatch awaitable.
This method allows the batch to be used with the `await` keyword,
starting the batch execution if it hasn't been started yet.
Returns:
A generator that can be awaited to execute the batch.
"""
self.start()
return self._await().__await__()

async def _await(self) -> None:
"""
Internal method to await the completion of all coroutines in the batch.
This method gathers all coroutines in the batch and awaits their completion,
logging any exceptions that may occur during execution. It's designed to
handle both successful completions and potential errors gracefully.
Raises:
Exception: If any of the coroutines in the batch raise an exception,
it will be re-raised after all coroutines have been processed.
"""
batches = tuple(self.coroutines)
for batch, result in zip(batches, await asyncio.gather(*batches, return_exceptions=True)):
if isinstance(result, Exception):
@@ -37,6 +80,17 @@ async def _await(self) -> None:
raise result

def start(self) -> None:
"""
Initiates the execution of all operations in the batch.
This method starts the processing of all multicalls and individual RPC calls
that have been added to the batch. It marks the batch as started to prevent
duplicate executions.
Note:
This method does not wait for the operations to complete. Use :meth:`~DankBatch._await()`
to wait for completion and handle results.
"""
for mcall in self.multicalls.values():
mcall.start(self, cleanup=False)
for call in self.rpc_calls:
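
The _await method above leans on asyncio.gather(..., return_exceptions=True) so a failing sub-batch is surfaced without silently dropping the others. A self-contained sketch of that pattern with stand-in coroutines (none of these names are dank_mids API):

    import asyncio
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("batch-sketch")

    async def ok(i: int) -> int:
        return i * 2

    async def boom() -> None:
        raise ValueError("simulated sub-batch failure")

    async def run_batch() -> None:
        batches = (ok(1), boom(), ok(3))
        # Exceptions come back as results instead of cancelling the gather,
        # so every sub-batch gets a chance to finish before errors are handled.
        results = await asyncio.gather(*batches, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                logger.error("sub-batch failed: %r", result)
            else:
                logger.info("sub-batch result: %s", result)

    asyncio.run(run_batch())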
13 changes: 12 additions & 1 deletion dank_mids/_debugging/__init__.py
@@ -1,4 +1,15 @@
"""This module is private for now but will eventually be made public when the api ossifies"""
"""
This module provides debugging tools and utilities for dank_mids operations.
While currently private, this module is intended to be made public in the future
when the API stabilizes. It offers various debugging features to help developers
understand and troubleshoot dank_mids behavior.
Submodules:
- failures: Contains tools for logging and analyzing failed requests.
Note: The API in this module is subject to change. Use with caution in production environments.
"""

from dank_mids._debugging import failures

22 changes: 22 additions & 0 deletions dank_mids/_debugging/_base.py
@@ -17,13 +17,25 @@ class _FileHelper(metaclass=abc.ABCMeta):
def __init__(self, chainid: int):
if not isinstance(chainid, int):
raise TypeError(f"`chainid` must be an integer. You passed {chainid}") from None

self.chainid = chainid
"""The ID of the blockchain network."""

self.path += f"/{self.chainid}"
self.ensure_dir()
@lru_cache(maxsize=1)
def ensure_dir(self) -> None:
"""
Ensure that the directory for the file exists, creating it if necessary.
"""
os.makedirs(self.path, exist_ok=True)
def open(self) -> "AiofilesContextManager[None, None, AsyncTextIOWrapper]":
"""
Open the file asynchronously.
Returns:
An asynchronous context manager for the file.
"""
logger.info("opening %s with mode %s", self.uri, self.mode)
return aiofiles.open(self.uri, self.mode)
@abc.abstractproperty
@@ -43,8 +55,18 @@ async def write_row(self, *values: Any) -> None:
await self._write_row(*values)
@alru_cache(maxsize=None)
async def _ensure_headers(self) -> None:
"""
Ensure that the CSV file has headers, writing them if necessary.
"""
await self._write_row(*self.column_names, new_line=False)
async def _write_row(self, *values: Any, new_line: bool = True) -> None:
"""
Write a row to the CSV file.
Args:
*values: The values to write as a row.
new_line: Whether to start a new line before writing. Defaults to True.
"""
row = ','.join(str(obj) for obj in values)
if new_line:
row = f'\n{row}'
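
The _FileHelper/_CSVWriter plumbing above reduces to appending comma-joined rows through aiofiles: a header row first, then newline-prefixed data rows. A minimal standalone sketch of that pattern (file names and columns here are illustrative, not the dank_mids layout):

    import asyncio
    import os

    import aiofiles

    async def append_row(path: str, *values: object, new_line: bool = True) -> None:
        # Mirror the writer above: comma-join the values and prefix a newline
        # for every row except the header.
        row = ",".join(str(v) for v in values)
        if new_line:
            row = f"\n{row}"
        async with aiofiles.open(path, "a") as f:
            await f.write(row)

    async def main() -> None:
        os.makedirs("debug", exist_ok=True)
        path = "debug/failures.csv"
        await append_row(path, "request_type", "request_uid", "error", new_line=False)  # header
        await append_row(path, "eth_call", 42, "TimeoutError")

    asyncio.run(main())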
56 changes: 54 additions & 2 deletions dank_mids/_debugging/failures.py
@@ -1,4 +1,3 @@

from datetime import datetime
from functools import cached_property, lru_cache
from typing import TYPE_CHECKING, List, Literal, Type, Union
@@ -12,17 +11,70 @@

@lru_cache(maxsize=None)
class FailedRequestWriter(_CSVWriter):
"""
A class for logging failed requests to a CSV file.
This class provides functionality to record details of failed requests,
including the request type, unique identifier, length, error, and request data.
Usage:
writer = FailedRequestWriter(1, Exception)
await writer.record_failure.put((error, "request_type", "request_uid", 10, request_data))
"""

column_names = "request_type", "request_uid", "request_length", "error", "request_data"
"""The names of the columns in the CSV file."""

def __init__(self, chainid: int, failure_type: Type[BaseException]):
"""
Initialize the FailedRequestWriter.
Args:
chainid: The ID of the blockchain network.
failure_type: The type of exception to record.
Raises:
TypeError: If chainid is not an integer or failure_type is not a subclass of BaseException.
"""
super().__init__(chainid)
if not issubclass(failure_type, BaseException):
raise TypeError(f"`failure_type` must be an Exception type. You passed {failure_type}")

self.failure_type = failure_type
"""The type of exception to record."""

self.record_failure = ProcessingQueue(self._record_failure, num_workers=1, return_data=False)
"""
A ProcessingQueue instance used to record failed requests asynchronously.
It processes failures using the :meth:`FailedRequestWriter._record_failure` method with a single worker.
"""

@cached_property
def filename(self) -> str:
"""Generate a unique filename for the CSV file, then cache it for subsequent use."""
return f"{int(datetime.now().timestamp())}_{self.failure_type.__name__}s.csv"
async def _record_failure(self, e: Exception, request_type: str, request_uid: Union[str, int], request_length: Union[int, Literal["unknown"]], request_data: Union[List["Request"], List["PartialRequest"], bytes]):

async def _record_failure(
self,
e: Exception,
request_type: str,
request_uid: Union[str, int],
request_length: Union[int, Literal["unknown"]],
request_data: Union[List["Request"], List["PartialRequest"], bytes],
):
"""
Record a failed request to the CSV file.
Args:
e: The exception that occurred.
request_type: The type of the request.
request_uid: The unique identifier of the request.
request_length: The length of the request.
request_data: The request data.
Raises:
TypeError: If the exception is not of the specified failure_type.
"""
if not isinstance(e, self.failure_type):
raise TypeError(e, self.failure_type)
await self.write_row(request_type, request_uid, request_length, e, request_data)
16 changes: 15 additions & 1 deletion dank_mids/_demo_mode.py
@@ -5,8 +5,22 @@


class DummyLogger:
""" Replace a `logging.Logger` object with a dummy to save precious time """
"""
A dummy logger class that mimics the interface of logging.Logger but does nothing.
This class is used to replace the actual logger when demo mode is not active,
saving processing time by avoiding unnecessary logging operations.
"""

def info(self, *args: Any, **kwargs: Any) -> None:
"""
A no-op method that mimics the API of :meth:`logging.Logger.info`.
Args:
*args: Positional arguments (ignored).
**kwargs: Keyword arguments (ignored).
"""
...

# Choose between a real logger and a dummy logger based on the demo mode setting
demo_logger: logging.Logger = logging.getLogger("dank_mids.demo") if ENVIRONMENT_VARIABLES.DEMO_MODE else DummyLogger() # type: ignore [attr-defined, assignment]
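
The swap above is a plain null-object pattern: when demo mode is off, every demo_logger.info(...) call lands on a no-op method instead of a real Logger. A standalone illustration, where DEMO_MODE is a stand-in flag for ENVIRONMENT_VARIABLES.DEMO_MODE:

    import logging
    from typing import Any

    class DummyLogger:
        """Matches the logging.Logger.info signature but does nothing."""
        def info(self, *args: Any, **kwargs: Any) -> None:
            ...

    DEMO_MODE = False  # stand-in for ENVIRONMENT_VARIABLES.DEMO_MODE

    demo_logger = logging.getLogger("demo") if DEMO_MODE else DummyLogger()
    demo_logger.info("processed batch %s", 123)  # a no-op unless DEMO_MODE is True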