diff --git a/README.md b/README.md
index 4c662a7..17960c4 100644
--- a/README.md
+++ b/README.md
@@ -89,6 +89,25 @@
 the same on-disk cache. As with the other two caches, this also takes
 `SUBSTRATE_CACHE_METHOD_SIZE` and `SUBSTRATE_RUNTIME_CACHE_SIZE` env vars.
 
+### Environment Variables
+The following environment variables are used by async-substrate-interface (see the example below):
+ - `NO_CACHE` (default `0`): if set to `1`, `DiskCachedAsyncSubstrateInterface` keeps its cache in memory only rather than persisting it to disk.
+ - `CACHE_LOCATION` (default `~/.cache/async-substrate-interface`): the location of the cache file used by `DiskCachedAsyncSubstrateInterface`.
+ - `SUBSTRATE_CACHE_METHOD_SIZE` (default `512`): the cache size (in-memory or on-disk) for the methods with smaller return sizes (see the Caching section for more info).
+ - `SUBSTRATE_RUNTIME_CACHE_SIZE` (default `16`): the cache size (in-memory or on-disk) for the methods with larger return sizes (see the Caching section for more info).
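+
+For example, to relocate the on-disk cache (a sketch; the path shown is illustrative):
+
+```python
+import os
+
+# CACHE_LOCATION is read when the caching module is imported,
+# so set it before importing the library.
+os.environ["CACHE_LOCATION"] = "/tmp/substrate-cache"  # example path
+
+from async_substrate_interface.async_substrate import DiskCachedAsyncSubstrateInterface
+```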
+
 ## Contributing
 
 Contributions are welcome! Please open an issue or submit a pull request to the `staging` branch.
diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py
index 20af1bb..f4f401e 100644
--- a/async_substrate_interface/async_substrate.py
+++ b/async_substrate_interface/async_substrate.py
@@ -63,6 +63,7 @@
 from async_substrate_interface.utils.cache import (
     async_sql_lru_cache,
     cached_fetcher,
+    AsyncSqliteDB,
 )
 from async_substrate_interface.utils.decoding import (
     _determine_if_old_runtime_call,
@@ -4026,6 +4027,20 @@ class DiskCachedAsyncSubstrateInterface(AsyncSubstrateInterface):
     """
     Experimental new class that uses disk-caching in addition to memory-caching for the cached methods
     """
+    async def close(self):
+        """
+        Closes the websocket connection and, if one was opened, the on-disk cache's database connection.
+        """
+        try:
+            await self.ws.shutdown()
+        except AttributeError:
+            pass
+        db_conn = AsyncSqliteDB(self.url)
+        if db_conn._db is not None:
+            await db_conn._db.close()
+            # Allow a later call to reopen the cache connection.
+            db_conn._db = None
+
     @async_sql_lru_cache(maxsize=SUBSTRATE_CACHE_METHOD_SIZE)
     async def get_parent_block_hash(self, block_hash):
         return await self._get_parent_block_hash(block_hash)
diff --git a/async_substrate_interface/utils/cache.py b/async_substrate_interface/utils/cache.py
index 56af640..5cf1fe4 100644
--- a/async_substrate_interface/utils/cache.py
+++ b/async_substrate_interface/utils/cache.py
@@ -9,6 +9,9 @@
 from pathlib import Path
 from typing import Callable, Any, Awaitable, Hashable, Optional
 
+import aiosqlite
+
+
 USE_CACHE = True if os.getenv("NO_CACHE") != "1" else False
 CACHE_LOCATION = (
     os.path.expanduser(
@@ -21,6 +24,88 @@
 logger = logging.getLogger("async_substrate_interface")
 
 
+class AsyncSqliteDB:
+    """
+    One connection to the on-disk cache per chain endpoint, shared by all
+    DiskCachedAsyncSubstrateInterface instances pointed at that endpoint.
+    """
+
+    _instances: dict[str, "AsyncSqliteDB"] = {}
+    _db: Optional[aiosqlite.Connection] = None
+    _lock: Optional[asyncio.Lock] = None
+
+    def __new__(cls, chain_endpoint: str):
+        # Reuse the existing instance (and its lock) for this endpoint.
+        try:
+            return cls._instances[chain_endpoint]
+        except KeyError:
+            instance = super().__new__(cls)
+            instance._lock = asyncio.Lock()
+            cls._instances[chain_endpoint] = instance
+            return instance
+
+    async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]:
+        async with self._lock:
+            if not self._db:
+                _ensure_dir()
+                self._db = await aiosqlite.connect(CACHE_LOCATION)
+            table_name = _get_table_name(func)
+            local_chain = _check_if_local(chain)
+            use_disk_cache = not local_chain and USE_CACHE
+            if use_disk_cache:
+                await self._db.execute(
+                    f"""
+                    CREATE TABLE IF NOT EXISTS {table_name}
+                    (
+                        rowid INTEGER PRIMARY KEY AUTOINCREMENT,
+                        key BLOB,
+                        value BLOB,
+                        chain TEXT,
+                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                    );
+                    """
+                )
+                # Prune each table to its 500 most recent rows on insert.
+                await self._db.execute(
+                    f"""
+                    CREATE TRIGGER IF NOT EXISTS prune_rows_trigger_{table_name} AFTER INSERT ON {table_name}
+                    BEGIN
+                        DELETE FROM {table_name}
+                        WHERE rowid IN (
+                            SELECT rowid FROM {table_name}
+                            ORDER BY created_at DESC
+                            LIMIT -1 OFFSET 500
+                        );
+                    END;
+                    """
+                )
+                await self._db.commit()
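+                # Read-through: a hit returns the unpickled cached value;
+                # a miss falls through to await the wrapped call, and the
+                # result is persisted below.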
+                key = pickle.dumps((args, kwargs or None))
+                try:
+                    cursor: aiosqlite.Cursor = await self._db.execute(
+                        f"SELECT value FROM {table_name} WHERE key=? AND chain=?",
+                        (key, chain),
+                    )
+                    row = await cursor.fetchone()
+                    await cursor.close()
+                    if row is not None:
+                        return pickle.loads(row[0])
+                except (pickle.PickleError, sqlite3.Error) as e:
+                    logger.exception("Cache error", exc_info=e)
+            result = await func(other_self, *args, **kwargs)
+            if use_disk_cache:
+                # TODO use a task here
+                await self._db.execute(
+                    f"INSERT OR REPLACE INTO {table_name} (key, value, chain) VALUES (?,?,?)",
+                    (key, pickle.dumps(result), chain),
+                )
+                await self._db.commit()
+            return result
+
+
 def _ensure_dir():
     path = Path(CACHE_LOCATION).parent
     if not path.exists():
@@ -115,7 +200,8 @@ def inner(self, *args, **kwargs):
         )
 
         # If not in DB, call func and store in DB
-        result = func(self, *args, **kwargs)
+        if result is None:
+            result = func(self, *args, **kwargs)
 
         if not local_chain or not USE_CACHE:
             _insert_into_cache(c, conn, table_name, key, result, chain)
@@ -131,15 +217,8 @@ def async_sql_lru_cache(maxsize: Optional[int] = None):
     def decorator(func):
         @cached_fetcher(max_size=maxsize)
         async def inner(self, *args, **kwargs):
-            c, conn, table_name, key, result, chain, local_chain = (
-                _shared_inner_fn_logic(func, self, args, kwargs)
-            )
-
-            # If not in DB, call func and store in DB
-            result = await func(self, *args, **kwargs)
-            if not local_chain or not USE_CACHE:
-                _insert_into_cache(c, conn, table_name, key, result, chain)
-
+            async_sql_db = AsyncSqliteDB(self.url)
+            result = await async_sql_db(self.url, self, func, args, kwargs)
             return result
 
     return inner
diff --git a/pyproject.toml b/pyproject.toml
index 148b33a..02b7791 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,7 +11,8 @@
 dependencies = [
     "bt-decode==v0.6.0",
     "scalecodec~=1.2.11",
     "websockets>=14.1",
-    "xxhash"
+    "xxhash",
+    "aiosqlite>=0.21.0,<1.0.0"
 ]
 requires-python = ">=3.9,<3.14"
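
A minimal end-to-end sketch of the disk-cached interface and the new `close()` (not part of the diff; it assumes the existing `initialize()` and `get_chain_head()` methods of `AsyncSubstrateInterface`, and uses a placeholder endpoint URL):

```python
import asyncio

from async_substrate_interface.async_substrate import (
    DiskCachedAsyncSubstrateInterface,
)


async def main():
    # Placeholder endpoint; substitute a real substrate chain endpoint.
    substrate = DiskCachedAsyncSubstrateInterface("wss://chain.example.com:443")
    await substrate.initialize()
    try:
        block_hash = await substrate.get_chain_head()
        # The first call hits the chain and is written to the sqlite cache;
        # repeat calls with the same hash are served from the cache.
        parent = await substrate.get_parent_block_hash(block_hash)
        print(parent)
    finally:
        # New in this PR: also closes the shared aiosqlite connection.
        await substrate.close()


asyncio.run(main())
```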