From c3435b9668294cd2223d1b066fd2bdc4fd99cbc8 Mon Sep 17 00:00:00 2001 From: eschechter Date: Thu, 15 Dec 2022 16:51:14 -0500 Subject: [PATCH 01/21] use hamt to build mapping of keys -> cids --- .gitignore | 2 + ipldstore/contentstore.py | 52 ++--------------- ipldstore/hamt_wrapper.py | 117 ++++++++++++++++++++++++++++++++++++++ ipldstore/ipldstore.py | 72 +++++++---------------- setup.cfg | 3 + 5 files changed, 148 insertions(+), 98 deletions(-) create mode 100644 ipldstore/hamt_wrapper.py diff --git a/.gitignore b/.gitignore index ece86af..a6db6d5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ __pycache__ .dmypy.json .tox *.egg-info +build +.pytest_cache \ No newline at end of file diff --git a/ipldstore/contentstore.py b/ipldstore/contentstore.py index 3794032..7aac5bc 100644 --- a/ipldstore/contentstore.py +++ b/ipldstore/contentstore.py @@ -7,7 +7,7 @@ import asyncio from multiformats import CID, multicodec, multibase, multihash, varint -import cbor2, dag_cbor +import dag_cbor from cbor2 import CBORTag from dag_cbor.encoding import EncodableType as DagCborEncodable from typing_validation import validate @@ -25,15 +25,9 @@ DagPbCodec = multicodec.get("dag-pb") DagCborCodec = multicodec.get("dag-cbor") -def default_encoder(encoder, value): - encoder.encode(CBORTag(42, b'\x00' + bytes(value))) - -def grouper(seq, size): - return (seq[pos:pos + size] for pos in range(0, len(seq), size)) - def get_retry_session() -> requests.Session: - session = requests.Session() + session = requests.Session() retries = Retry(connect=5, total=5, backoff_factor=4) session.mount("http://", HTTPAdapter(max_retries=retries)) return session @@ -192,6 +186,7 @@ async def _async_get(host: str, session: aiohttp.ClientSession, cid: CID): async with session.post(host + api_method, params={"arg": str(cid)}) as resp: return await resp.read() + async def _main_async(keys: List[CID], host: str, d: Dict[CID, bytes]): async with aiohttp.ClientSession() as session: tasks = 
[_async_get(host, session, key) for key in keys] @@ -219,30 +214,12 @@ def __init__(self, else: self._default_hash = multihash.Multihash(codec=default_hash) - def recover_tree(self, broken_struct): - if not isinstance(broken_struct, dict): - return broken_struct - all_recovered = [] - ret_tree = {} - for k in broken_struct: - if len(k) > 1 and k.startswith("/") and k[2:].isnumeric(): - cid_to_recover = CID.decode(broken_struct[k].value[1:]) - recovered = self.recover_tree(cbor2.loads(self.get_raw(cid_to_recover))) - all_recovered.append(recovered) - else: - ret_tree[k] = self.recover_tree(broken_struct[k]) - for recovered in all_recovered: - for k in recovered: - ret_tree[k] = self.recover_tree(recovered[k]) - - return ret_tree - def get(self, cid: CID) -> ValueType: value = self.get_raw(cid) if cid.codec == DagPbCodec: return value elif cid.codec == DagCborCodec: - return self.recover_tree(cbor2.loads(value)) + return dag_cbor.decode(value) else: raise ValueError(f"can't decode CID's codec '{cid.codec.name}'") @@ -263,31 +240,12 @@ def get_raw(self, cid: CID) -> bytes: res.raise_for_status() return res.content - def make_tree_structure(self, node): - if not isinstance(node, dict): - return node - new_tree = {} - if len(node) <= self._max_nodes_per_level: - for key in node: - new_tree[key] = self.make_tree_structure(node[key]) - return new_tree - for group_of_keys in grouper(list(node.keys()), self._max_nodes_per_level): - key_for_group = f"/{hash(frozenset(group_of_keys))}" - sub_tree = {} - for key in group_of_keys: - sub_tree[key] = node[key] - new_tree[key_for_group] = self.put_sub_tree(self.make_tree_structure(sub_tree)) - return self.make_tree_structure(new_tree) - - def put_sub_tree(self, d): - return self.put_raw(cbor2.dumps(d, default=default_encoder), DagCborCodec, should_pin=False) - def put(self, value: ValueType) -> CID: validate(value, ValueType) if isinstance(value, bytes): return self.put_raw(value, DagPbCodec) else: - return 
self.put_raw(cbor2.dumps(self.make_tree_structure(value), default=default_encoder), DagCborCodec) + return self.put_raw(dag_cbor.encode(value), DagCborCodec) def put_raw(self, raw_value: bytes, diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py new file mode 100644 index 0000000..981f2be --- /dev/null +++ b/ipldstore/hamt_wrapper.py @@ -0,0 +1,117 @@ +import cbor2 +import dag_cbor +import mmh3 +import requests +from dask.distributed import Lock +from multiformats import CID +from py_hamt.hamt import Hamt, create, load + +inline_objects = [ + ".zarray", + ".zgroup", + ".zmetadata", + ".zattrs", +] + + +class HamtIPFSStore: + def save(self, obj): + obj = dag_cbor.encode(obj) + res = requests.post( + "http://localhost:5001/api/v0/dag/put", + params={"store-codec": "dag-cbor", "input-codec": "dag-cbor", "pin": False}, + files={"dummy": obj}, + ) + res.raise_for_status() + return CID.decode(res.json()["Cid"]["/"]) + + def load(self, id): + if isinstance(id, cbor2.CBORTag): + id = CID.decode(id.value[1:]) + res = requests.post( + "http://localhost:5001/api/v0/block/get", params={"arg": str(id)} + ) + res.raise_for_status() + return dag_cbor.decode(res.content) + + def is_equal(self, id1: CID, id2: CID): + return str(id1) == str(id2) + + def is_link(self, obj: CID): + return isinstance(obj, CID) and obj.codec.name == "dag-cbor" + + +class HamtWrapper: + SEP = "/" + + def __init__(self, store=HamtIPFSStore(), starting_id=None, others_dict=None): + Hamt.register_hasher( + 0x23, 4, lambda x: mmh3.hash(x, signed=False).to_bytes(4, byteorder="big") + ) + if starting_id: + self.hamt = load(store, starting_id) + else: + self.hamt = create(store) + + self.others_dict = others_dict if others_dict is not None else {} + + def get(self, key_path): + try: + return self.hamt.get(self.SEP.join(key_path)) + except KeyError: + return get_recursive(self.others_dict, key_path) + + def set(self, key_path, value): + if isinstance(value, CID) and value.codec.name == "dag-pb": + 
try: + lock = Lock("x") + lock.acquire() + except ValueError: + lock = None + self.hamt = self.hamt.set(self.SEP.join(key_path), value) + if lock is not None: + lock.release() + else: + set_recursive(self.others_dict, key_path, value) + + def iter_all(self): + yield from self._iter_nested("", self.others_dict) + yield from self.hamt.keys() + + def _iter_nested(self, prefix: str, mapping): + for key, value in mapping.items(): + key_parts = key.split(self.SEP) + if key_parts[-1] in inline_objects: + yield prefix + key + elif isinstance(value, dict): + yield from self._iter_nested(prefix + key + self.SEP, value) + else: + yield prefix + key + + def to_dict(self): + return {**self.others_dict, "hamt": self.hamt.id} + + @staticmethod + def from_dict(d): + others_dict = d + hamt_id = others_dict["hamt"] + del others_dict["hamt"] + return HamtWrapper( + HamtIPFSStore(), starting_id=hamt_id, others_dict=others_dict + ) + + +def set_recursive(obj, path, value) -> None: + assert len(path) >= 1 + if len(path) == 1: + obj[path[0]] = value + else: + set_recursive(obj.setdefault(path[0], {}), path[1:], value) # type: ignore + + +def get_recursive(obj, path): + assert len(path) >= 1 + if len(path) == 1: + return obj[path[0]] + else: + return get_recursive(obj[path[0]], path[1:]) # type: ignore diff --git a/ipldstore/ipldstore.py b/ipldstore/ipldstore.py index 1729df5..8fd8be6 100644 --- a/ipldstore/ipldstore.py +++ b/ipldstore/ipldstore.py @@ -2,8 +2,6 @@ Implementation of a MutableMapping based on IPLD data structures. 
""" - - from io import BufferedIOBase from collections.abc import MutableMapping import sys @@ -18,14 +16,17 @@ from .contentstore import ContentAddressableStore, IPFSStore, MappingCAStore from .utils import StreamLike +from .hamt_wrapper import HamtWrapper if sys.version_info >= (3, 9): MutableMappingT = MutableMapping MutableMappingSB = MutableMapping[str, bytes] else: from typing import MutableMapping as MutableMappingT + MutableMappingSB = MutableMapping + @dataclass class InlineCodec: decoder: Callable[[bytes], Any] @@ -48,13 +49,15 @@ def json_dumps_bytes(obj: Any) -> bytes: class IPLDStore(MutableMappingSB): def __init__(self, castore: Optional[ContentAddressableStore] = None, sep: str = "/", should_async_get: bool = True): - self._mapping: Dict[str, Union[bytes, dag_cbor.encoding.EncodableType]] = {} + self._mapping = HamtWrapper() self._store = castore or MappingCAStore() if isinstance(self._store, IPFSStore) and should_async_get: # Monkey patch zarr to use the async get of multiple chunks def storage_getitems(kv_self, keys, on_error="omit"): return kv_self._mutable_mapping.getitems(keys) + import zarr + zarr.KVStore.getitems = storage_getitems self.sep = sep self.root_cid: Optional[CID] = None @@ -67,7 +70,7 @@ def getitems(self, keys: List[str]) -> Dict[str, bytes]: to_async_get = [] for key in keys: key_parts = key.split(self.sep) - get_value = get_recursive(self._mapping, key_parts) + get_value = self._mapping.get(key_parts) try: # First see if this is a special key that doesn't need to be handled by the store inline_codec = inline_objects[key_parts[-1]] @@ -87,13 +90,14 @@ def getitems(self, keys: List[str]) -> Dict[str, bytes]: def __getitem__(self, key: str) -> bytes: key_parts = key.split(self.sep) - get_value = get_recursive(self._mapping, key_parts) + get_value = self._mapping.get(key_parts) try: inline_codec = inline_objects[key_parts[-1]] except KeyError: if isinstance(get_value, CBORTag): get_value = CID.decode(get_value.value[1:]) assert 
isinstance(get_value, CID) + res = self._store.get(get_value) assert isinstance(res, bytes) return res @@ -111,41 +115,32 @@ def __setitem__(self, key: str, value: bytes) -> None: set_value = cid else: set_value = inline_codec.decoder(value) - + self._mapping.set(key_parts, set_value) self.root_cid = None - set_recursive(self._mapping, key_parts, set_value) def __delitem__(self, key: str) -> None: - key_parts = key.split(self.sep) - del_recursive(self._mapping, key_parts) + # key_parts = key.split(self.sep) + # del_recursive(self._mapping, key_parts) + raise NotImplementedError def __iter__(self) -> Iterator[str]: - return self._iter_nested("", self._mapping) - - def _iter_nested(self, prefix: str, mapping: Dict[str, Union[bytes, dag_cbor.encoding.EncodableType]]) -> Iterator[str]: - for key, value in mapping.items(): - key_parts = key.split(self.sep) - if key_parts[-1] in inline_objects: - yield prefix + key - elif isinstance(value, dict): - yield from self._iter_nested(prefix + key + self.sep, value) - else: - yield prefix + key + # return self._iter_nested("", self._mapping) + return self._mapping.iter_all() def __len__(self) -> int: - return len(list(iter(self))) + return len(list(self._mapping.iter_all())) def freeze(self) -> CID: """ - Store current version and return the corresponding root cid. + Store current version and return the corresponding root cid. 
""" if self.root_cid is None: - self.root_cid = self._store.put(self._mapping) + self.root_cid = self._store.put(self._mapping.to_dict()) return self.root_cid def clear(self) -> None: self.root_cid = None - self._mapping = {} + self._mapping = HamtWrapper() @overload def to_car(self, stream: BufferedIOBase) -> int: @@ -175,36 +170,11 @@ def set_root(self, cid: CID) -> None: cid = CID.decode(cid) assert cid in self._store self.root_cid = cid - self._mapping = self._store.get(cid) # type: ignore + whole_mapping = self._store.get(cid) + self._mapping = HamtWrapper.from_dict(whole_mapping) _T = TypeVar("_T") _V = TypeVar("_V") RecursiveMapping = MutableMappingT[_T, Union[_V, "RecursiveMapping"]] # type: ignore - - -def set_recursive(obj: RecursiveMapping[_T, _V], path: List[_T], value: _V) -> None: - assert len(path) >= 1 - if len(path) == 1: - obj[path[0]] = value - else: - set_recursive(obj.setdefault(path[0], {}), path[1:], value) # type: ignore - - -def get_recursive(obj: RecursiveMapping[_T, _V], path: List[_T]) -> _V: - assert len(path) >= 1 - if len(path) == 1: - return obj[path[0]] - else: - return get_recursive(obj[path[0]], path[1:]) # type: ignore - - -def del_recursive(obj: MutableMappingT[_T, Any], path: List[_T]) -> None: - assert len(path) >= 1 - if len(path) == 1: - del obj[path[0]] - else: - del_recursive(obj[path[0]], path[1:]) - if len(obj[path[0]]) == 0: - del obj[path[0]] diff --git a/setup.cfg b/setup.cfg index 5a93598..c1352cd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,6 +36,9 @@ install_requires = requests cbor2 zarr + dask + py-hamt @ git+ssh://git@github.com/dClimate/python-iamap@evan-dev + mmh3 [options.package_data] * = py.typed, *.json From 6b3f90663718788a2f02b435af2a6372984a190a Mon Sep 17 00:00:00 2001 From: eschechter Date: Thu, 5 Jan 2023 13:29:04 -0800 Subject: [PATCH 02/21] use sha256 instead of mmh3, refactor --- ipldstore/hamt_wrapper.py | 38 ++++++++++++++++++++++++++------------ ipldstore/ipldstore.py | 24 
++---------------------- setup.cfg | 1 - 3 files changed, 28 insertions(+), 35 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 981f2be..cfb18e5 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -1,17 +1,31 @@ import cbor2 import dag_cbor -import mmh3 +from dataclasses import dataclass +import hashlib +import json import requests +import typing from dask.distributed import Lock from multiformats import CID from py_hamt.hamt import Hamt, create, load -inline_objects = [ - ".zarray", - ".zgroup", - ".zmetadata", - ".zattrs", -] +@dataclass +class InlineCodec: + decoder: typing.Callable[[bytes], typing.Any] + encoder: typing.Callable[[typing.Any], bytes] + +def json_dumps_bytes(obj: typing.Any) -> bytes: + return json.dumps(obj).encode("utf-8") + +json_inline_codec = InlineCodec(json.loads, json_dumps_bytes) + + +inline_objects = { + ".zarray": json_inline_codec, + ".zgroup": json_inline_codec, + ".zmetadata": json_inline_codec, + ".zattrs": json_inline_codec, +} class HamtIPFSStore: @@ -46,12 +60,14 @@ class HamtWrapper: def __init__(self, store=HamtIPFSStore(), starting_id=None, others_dict=None): Hamt.register_hasher( - 0x23, 4, lambda x: mmh3.hash(x, signed=False).to_bytes(4, byteorder="big") + 0x12, 32, lambda x: hashlib.sha256(x.encode("utf-8")).digest() ) if starting_id: self.hamt = load(store, starting_id) else: - self.hamt = create(store) + self.hamt = create( + store, options={"bit_width": 5, "bucket_size": 3, "hash_alg": 0x12} + ) self.others_dict = others_dict if others_dict is not None else {} @@ -96,9 +112,7 @@ def from_dict(d): others_dict = d hamt_id = others_dict["hamt"] del others_dict["hamt"] - return HamtWrapper( - HamtIPFSStore(), starting_id=hamt_id, others_dict=others_dict - ) + return HamtWrapper(starting_id=hamt_id, others_dict=others_dict) def set_recursive(obj, path, value) -> None: diff --git a/ipldstore/ipldstore.py b/ipldstore/ipldstore.py index 8fd8be6..50dc6e6 100644 --- 
a/ipldstore/ipldstore.py +++ b/ipldstore/ipldstore.py @@ -5,7 +5,6 @@ from io import BufferedIOBase from collections.abc import MutableMapping import sys -from dataclasses import dataclass from typing import Optional, Callable, Any, TypeVar, Union, Iterator, overload, List, Dict import json @@ -16,7 +15,8 @@ from .contentstore import ContentAddressableStore, IPFSStore, MappingCAStore from .utils import StreamLike -from .hamt_wrapper import HamtWrapper +from .hamt_wrapper import HamtWrapper, inline_objects + if sys.version_info >= (3, 9): MutableMappingT = MutableMapping @@ -27,26 +27,6 @@ MutableMappingSB = MutableMapping -@dataclass -class InlineCodec: - decoder: Callable[[bytes], Any] - encoder: Callable[[Any], bytes] - - -def json_dumps_bytes(obj: Any) -> bytes: - return json.dumps(obj).encode("utf-8") - - -json_inline_codec = InlineCodec(json.loads, json_dumps_bytes) - -inline_objects = { - ".zarray": json_inline_codec, - ".zgroup": json_inline_codec, - ".zmetadata": json_inline_codec, - ".zattrs": json_inline_codec, -} - - class IPLDStore(MutableMappingSB): def __init__(self, castore: Optional[ContentAddressableStore] = None, sep: str = "/", should_async_get: bool = True): self._mapping = HamtWrapper() diff --git a/setup.cfg b/setup.cfg index c1352cd..a31b77c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,7 +38,6 @@ install_requires = zarr dask py-hamt @ git+ssh://git@github.com/dClimate/python-iamap@evan-dev - mmh3 [options.package_data] * = py.typed, *.json From 08a5fde08071b79a11e7d699804210d6f57d1081 Mon Sep 17 00:00:00 2001 From: eschechter Date: Tue, 17 Jan 2023 14:10:41 -0800 Subject: [PATCH 03/21] don't write every hamt update to ipfs (for performance) --- ipldstore/hamt_wrapper.py | 51 +++++++++++++++++++++++++++------------ ipldstore/ipldstore.py | 2 -- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index cfb18e5..92a2889 100644 --- a/ipldstore/hamt_wrapper.py +++ 
b/ipldstore/hamt_wrapper.py @@ -1,4 +1,3 @@ -import cbor2 import dag_cbor from dataclasses import dataclass import hashlib @@ -7,16 +6,20 @@ import typing from dask.distributed import Lock from multiformats import CID +from multiformats import multihash from py_hamt.hamt import Hamt, create, load + @dataclass class InlineCodec: decoder: typing.Callable[[bytes], typing.Any] encoder: typing.Callable[[typing.Any], bytes] + def json_dumps_bytes(obj: typing.Any) -> bytes: return json.dumps(obj).encode("utf-8") + json_inline_codec = InlineCodec(json.loads, json_dumps_bytes) @@ -28,25 +31,32 @@ def json_dumps_bytes(obj: typing.Any) -> bytes: } +def get_cbor_dag_hash(obj): + ob_cbor = dag_cbor.encode(obj) + ob_cbor_hash = multihash.get("sha2-256").digest(ob_cbor) + return CID("base32", 1, "dag-cbor", ob_cbor_hash) + + class HamtIPFSStore: + def __init__(self): + self.mapping = {} + def save(self, obj): - obj = dag_cbor.encode(obj) - res = requests.post( - "http://localhost:5001/api/v0/dag/put", - params={"store-codec": "dag-cbor", "input-codec": "dag-cbor", "pin": False}, - files={"dummy": obj}, - ) - res.raise_for_status() - return CID.decode(res.json()["Cid"]["/"]) + cid = get_cbor_dag_hash(obj) + self.mapping[cid] = obj + return cid def load(self, id): - if isinstance(id, cbor2.CBORTag): - id = CID.decode(id.value[1:]) - res = requests.post( - "http://localhost:5001/api/v0/block/get", params={"arg": str(id)} - ) - res.raise_for_status() - return dag_cbor.decode(res.content) + try: + return self.mapping[id] + except KeyError: + res = requests.post( + "http://localhost:5001/api/v0/block/get", params={"arg": str(id)} + ) + res.raise_for_status() + obj = dag_cbor.decode(res.content) + self.mapping[id] = obj + return obj def is_equal(self, id1: CID, id2: CID): return str(id1) == str(id2) @@ -62,6 +72,7 @@ def __init__(self, store=HamtIPFSStore(), starting_id=None, others_dict=None): Hamt.register_hasher( 0x12, 32, lambda x: hashlib.sha256(x.encode("utf-8")).digest() ) + if 
starting_id: self.hamt = load(store, starting_id) else: @@ -105,6 +116,14 @@ def _iter_nested(self, prefix: str, mapping): yield prefix + key def to_dict(self): + for id in self.hamt.ids(): + obj = self.hamt.store.mapping[id] + obj = dag_cbor.encode(obj) + res = requests.post( + "http://localhost:5001/api/v0/block/put?cid-codec=dag-cbor", + files={"dummy": obj}, + ) + res.raise_for_status() return {**self.others_dict, "hamt": self.hamt.id} @staticmethod diff --git a/ipldstore/ipldstore.py b/ipldstore/ipldstore.py index 50dc6e6..ebf8f75 100644 --- a/ipldstore/ipldstore.py +++ b/ipldstore/ipldstore.py @@ -6,10 +6,8 @@ from collections.abc import MutableMapping import sys from typing import Optional, Callable, Any, TypeVar, Union, Iterator, overload, List, Dict -import json from multiformats import CID -import dag_cbor from cbor2 import CBORTag from numcodecs.compat import ensure_bytes # type: ignore From 5c6a919f30aa68d5fa6952a1e529b6d246277bea Mon Sep 17 00:00:00 2001 From: eschechter Date: Wed, 18 Jan 2023 09:37:48 -0800 Subject: [PATCH 04/21] increase bit_width to avoid index errors in hamt --- ipldstore/hamt_wrapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 92a2889..5769ad0 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -77,7 +77,7 @@ def __init__(self, store=HamtIPFSStore(), starting_id=None, others_dict=None): self.hamt = load(store, starting_id) else: self.hamt = create( - store, options={"bit_width": 5, "bucket_size": 3, "hash_alg": 0x12} + store, options={"bit_width": 8, "bucket_size": 5, "hash_alg": 0x12} ) self.others_dict = others_dict if others_dict is not None else {} From 2970e6d611649dbb63cd85a7c89f2b0a4e6f903b Mon Sep 17 00:00:00 2001 From: eschechter Date: Wed, 18 Jan 2023 15:40:59 -0800 Subject: [PATCH 05/21] documentation for HamtWrapper --- ipldstore/hamt_wrapper.py | 98 +++++++++++++++++++++++++++++++++++---- 1 file changed, 89 
insertions(+), 9 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 5769ad0..5c616e9 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -31,13 +31,27 @@ def json_dumps_bytes(obj: typing.Any) -> bytes: } -def get_cbor_dag_hash(obj): +def get_cbor_dag_hash(obj) -> CID: + """Generates the IPFS hash an object would have if it were put to IPFS as dag-cbor, + without actually making an IPFS call (much faster) + + Args: + obj: object to generate hash for + + Returns: + CID: cid for object in dag-cbor + """ ob_cbor = dag_cbor.encode(obj) ob_cbor_hash = multihash.get("sha2-256").digest(ob_cbor) return CID("base32", 1, "dag-cbor", ob_cbor_hash) class HamtIPFSStore: + """Class implementing methods necessary for a HAMT store. Unlike other stores, does not save objects + permanently -- instead, stores them in memory and generates an ID without actually persisting the object. + It is the responsibility of the user to save all the HAMTs keys on their own if they + wish the HAMT to persist. + """ def __init__(self): self.mapping = {} @@ -68,30 +82,67 @@ def is_link(self, obj: CID): class HamtWrapper: SEP = "/" - def __init__(self, store=HamtIPFSStore(), starting_id=None, others_dict=None): + def __init__( + self, + store=HamtIPFSStore(), + starting_id: typing.Optional[CID] = None, + others_dict: dict = None, + ) -> None: + """ + Args: + store (optional): Backing store for underyling HAMT. Defaults to HamtIPFSStore(). + starting_id (typing.Optional[CID], optional): + CID with which to initialize HAMT. In this implementation, the HAMT will only include + keys corresponding to zarr chunks, represented by dag-pb CIDs. Defaults to None. + others_dict (dict, optional): + Starting dict for keys that aren't going to be put in the HAMT. Defaults to None. 
+ """ + + # Register the sha2-256 hash with HAMT Hamt.register_hasher( 0x12, 32, lambda x: hashlib.sha256(x.encode("utf-8")).digest() ) if starting_id: + # Load HAMT from store using given id self.hamt = load(store, starting_id) else: + # Create HAMT from scratch with sensible default options self.hamt = create( store, options={"bit_width": 8, "bucket_size": 5, "hash_alg": 0x12} ) self.others_dict = others_dict if others_dict is not None else {} - def get(self, key_path): + def get(self, key_path: typing.List[str]): + """Get the value located at a `key_path`. First checks the HAMT, and if unable to find the key, + checks `others_dict`. If not in either, raises `KeyError` + + Args: + key_path (typing.List[str]): decomposed key for which to find value + + Returns: + value located at this `key_path` + """ try: return self.hamt.get(self.SEP.join(key_path)) except KeyError: return get_recursive(self.others_dict, key_path) - def set(self, key_path, value): + def set(self, key_path: typing.List[str], value) -> None: + """Sets a `key_path` to a given `value`. If the value is a dag-pb CID, then sets the HAMT. + Otherwise, sets `others_dict`. 
+ + Args: + key_path (typing.List[str]): decomposed key for which to set value + value: value to set + """ if isinstance(value, CID) and value.codec.name == "dag-pb": + # We need to lock the HAMT to prevent multiple threads writing to it at once, + # as it is not thread safe try: - lock = Lock("x") + # lock needs name so dask knows to recognize it across threads + lock = Lock("hamt-write") lock.acquire() except ValueError: lock = None @@ -101,11 +152,26 @@ def set(self, key_path, value): else: set_recursive(self.others_dict, key_path, value) - def iter_all(self): + def iter_all(self) -> typing.Iterator[str]: + """Iterates over all keys in both `others_dict` and the HAMT + + Yields: + str: key + """ yield from self._iter_nested("", self.others_dict) yield from self.hamt.keys() - def _iter_nested(self, prefix: str, mapping): + def _iter_nested(self, prefix: str, mapping: dict) -> typing.Iterator[str]: + """Iterates over all keys in `mapping`, reconstructing the key from decomposed + version when necessary + + Args: + prefix (str): parts of key that have been used up intil this point in the recursion + mapping (dict): dict to pull keys from + + Yields: + str: key + """ for key, value in mapping.items(): key_parts = key.split(self.SEP) if key_parts[-1] in inline_objects: @@ -115,7 +181,13 @@ def _iter_nested(self, prefix: str, mapping): else: yield prefix + key - def to_dict(self): + def to_dict(self) -> dict: + """Stores all keys in the HAMT permanently in IPFS, then returns a dict representation + of the whole data structure + + Returns: + dict: dict representation of `self`, including both `others_dict` and `hamt` + """ for id in self.hamt.ids(): obj = self.hamt.store.mapping[id] obj = dag_cbor.encode(obj) @@ -127,7 +199,15 @@ def to_dict(self): return {**self.others_dict, "hamt": self.hamt.id} @staticmethod - def from_dict(d): + def from_dict(d: dict) -> "HamtWrapper": + """Takes a dict generated by `to_dict` and turns it back into a HAMT. 
+ + Args: + d (dict): generated by `to_dict` + + Returns: + HamtWrapper: corresponds to this dict `d` + """ others_dict = d hamt_id = others_dict["hamt"] del others_dict["hamt"] From 7dda51351e77379f10992d84dbd5a034f0a9ae79 Mon Sep 17 00:00:00 2001 From: eschechter Date: Thu, 19 Jan 2023 10:35:22 -0800 Subject: [PATCH 06/21] use cbor2 and store cbor of hamts objs, not actual dicts --- ipldstore/hamt_wrapper.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 5c616e9..052b6b3 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -1,4 +1,4 @@ -import dag_cbor +import cbor2 from dataclasses import dataclass import hashlib import json @@ -30,20 +30,22 @@ def json_dumps_bytes(obj: typing.Any) -> bytes: ".zattrs": json_inline_codec, } +def default_encoder(encoder, value): + encoder.encode(cbor2.CBORTag(42, b'\x00' + bytes(value))) -def get_cbor_dag_hash(obj) -> CID: - """Generates the IPFS hash an object would have if it were put to IPFS as dag-cbor, + +def get_cbor_dag_hash(obj) -> typing.Tuple[CID, bytes]: + """Generates the IPFS hash and bytes an object would have if it were put to IPFS as dag-cbor, without actually making an IPFS call (much faster) Args: - obj: object to generate hash for - + obj: object to generate hash and dag-cbor bytes for Returns: - CID: cid for object in dag-cbor + typing.Tuple[CID, bytes]: IPFS hash and dag-cbor bytes """ - ob_cbor = dag_cbor.encode(obj) - ob_cbor_hash = multihash.get("sha2-256").digest(ob_cbor) - return CID("base32", 1, "dag-cbor", ob_cbor_hash) + obj_cbor = cbor2.dumps(obj, default=default_encoder) + obj_cbor_hash = multihash.get("sha2-256").digest(obj_cbor) + return CID("base32", 1, "dag-cbor", obj_cbor_hash), obj_cbor class HamtIPFSStore: @@ -56,26 +58,30 @@ def __init__(self): self.mapping = {} def save(self, obj): - cid = get_cbor_dag_hash(obj) - self.mapping[cid] = obj + cid, 
obj_cbor = get_cbor_dag_hash(obj) + self.mapping[cid] = obj_cbor return cid def load(self, id): + if isinstance(id, cbor2.CBORTag): + id = CID.decode(id.value[1:]) try: - return self.mapping[id] + return cbor2.loads(self.mapping[id]) except KeyError: res = requests.post( "http://localhost:5001/api/v0/block/get", params={"arg": str(id)} ) res.raise_for_status() - obj = dag_cbor.decode(res.content) - self.mapping[id] = obj + obj = cbor2.loads(res.content) + self.mapping[id] = res.content return obj def is_equal(self, id1: CID, id2: CID): return str(id1) == str(id2) def is_link(self, obj: CID): + if isinstance(obj, cbor2.CBORTag): + obj = CID.decode(obj.value[1:]) return isinstance(obj, CID) and obj.codec.name == "dag-cbor" @@ -189,11 +195,10 @@ def to_dict(self) -> dict: dict: dict representation of `self`, including both `others_dict` and `hamt` """ for id in self.hamt.ids(): - obj = self.hamt.store.mapping[id] - obj = dag_cbor.encode(obj) + obj_cbor = self.hamt.store.mapping[id] res = requests.post( "http://localhost:5001/api/v0/block/put?cid-codec=dag-cbor", - files={"dummy": obj}, + files={"dummy": obj_cbor}, ) res.raise_for_status() return {**self.others_dict, "hamt": self.hamt.id} From eb2782898ce2543442eba317bbbe270cf881cbe1 Mon Sep 17 00:00:00 2001 From: eschechter Date: Thu, 19 Jan 2023 15:21:24 -0800 Subject: [PATCH 07/21] back to dag_cbor from cbor2 --- ipldstore/hamt_wrapper.py | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 052b6b3..4d08b50 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -1,4 +1,4 @@ -import cbor2 +import dag_cbor from dataclasses import dataclass import hashlib import json @@ -30,9 +30,6 @@ def json_dumps_bytes(obj: typing.Any) -> bytes: ".zattrs": json_inline_codec, } -def default_encoder(encoder, value): - encoder.encode(cbor2.CBORTag(42, b'\x00' + bytes(value))) - def get_cbor_dag_hash(obj) -> 
typing.Tuple[CID, bytes]: """Generates the IPFS hash and bytes an object would have if it were put to IPFS as dag-cbor, @@ -43,17 +40,18 @@ def get_cbor_dag_hash(obj) -> typing.Tuple[CID, bytes]: Returns: typing.Tuple[CID, bytes]: IPFS hash and dag-cbor bytes """ - obj_cbor = cbor2.dumps(obj, default=default_encoder) + obj_cbor = dag_cbor.encode(obj) obj_cbor_hash = multihash.get("sha2-256").digest(obj_cbor) return CID("base32", 1, "dag-cbor", obj_cbor_hash), obj_cbor class HamtIPFSStore: - """Class implementing methods necessary for a HAMT store. Unlike other stores, does not save objects - permanently -- instead, stores them in memory and generates an ID without actually persisting the object. - It is the responsibility of the user to save all the HAMTs keys on their own if they - wish the HAMT to persist. + """Class implementing methods necessary for a HAMT store. Unlike other stores, does not save objects + permanently -- instead, stores them in memory and generates an ID without actually persisting the object. + It is the responsibility of the user to save all the HAMTs keys on their own if they + wish the HAMT to persist. 
""" + def __init__(self): self.mapping = {} @@ -63,16 +61,14 @@ def save(self, obj): return cid def load(self, id): - if isinstance(id, cbor2.CBORTag): - id = CID.decode(id.value[1:]) try: - return cbor2.loads(self.mapping[id]) + return dag_cbor.decode(self.mapping[id]) except KeyError: res = requests.post( "http://localhost:5001/api/v0/block/get", params={"arg": str(id)} ) res.raise_for_status() - obj = cbor2.loads(res.content) + obj = dag_cbor.decode(res.content) self.mapping[id] = res.content return obj @@ -80,8 +76,6 @@ def is_equal(self, id1: CID, id2: CID): return str(id1) == str(id2) def is_link(self, obj: CID): - if isinstance(obj, cbor2.CBORTag): - obj = CID.decode(obj.value[1:]) return isinstance(obj, CID) and obj.codec.name == "dag-cbor" From f03391142bd87254eb3a1a6bb71bd126ff84d074 Mon Sep 17 00:00:00 2001 From: eschechter Date: Fri, 20 Jan 2023 13:41:46 -0800 Subject: [PATCH 08/21] conistently use cbor2 and base32 CIDs --- ipldstore/contentstore.py | 76 +++++---------------------------------- ipldstore/hamt_wrapper.py | 20 +++++++---- ipldstore/ipldstore.py | 4 +-- 3 files changed, 23 insertions(+), 77 deletions(-) diff --git a/ipldstore/contentstore.py b/ipldstore/contentstore.py index 7aac5bc..e7a1a56 100644 --- a/ipldstore/contentstore.py +++ b/ipldstore/contentstore.py @@ -7,8 +7,7 @@ import asyncio from multiformats import CID, multicodec, multibase, multihash, varint -import dag_cbor -from cbor2 import CBORTag +import cbor2 from dag_cbor.encoding import EncodableType as DagCborEncodable from typing_validation import validate @@ -33,6 +32,9 @@ def get_retry_session() -> requests.Session: return session +def default_encoder(encoder, value): + encoder.encode(cbor2.CBORTag(42, b'\x00' + bytes(value))) + class ContentAddressableStore(ABC): @abstractmethod def get_raw(self, cid: CID) -> bytes: @@ -43,7 +45,7 @@ def get(self, cid: CID) -> ValueType: if cid.codec == RawCodec: return value elif cid.codec == DagCborCodec: - return dag_cbor.decode(value) + 
return cbor2.loads(value) else: raise ValueError(f"can't decode CID's codec '{cid.codec.name}'") @@ -66,73 +68,11 @@ def put(self, value: ValueType) -> CID: if isinstance(value, bytes): return self.put_raw(value, RawCodec) else: - return self.put_raw(dag_cbor.encode(value), DagCborCodec) + return self.put_raw(cbor2.dumps(value, default=default_encoder), DagCborCodec) def normalize_cid(self, cid: CID) -> CID: # pylint: disable=no-self-use return cid - @overload - def to_car(self, root: CID, stream: BufferedIOBase) -> int: - ... - - @overload - def to_car(self, root: CID, stream: None = None) -> bytes: - ... - - def to_car(self, root: CID, stream: Optional[BufferedIOBase] = None) -> Union[bytes, int]: - validate(root, CID) - validate(stream, Optional[BufferedIOBase]) - - if stream is None: - buffer = BytesIO() - stream = buffer - return_bytes = True - else: - return_bytes = False - - bytes_written = 0 - header = dag_cbor.encode({"version": 1, "roots": [root]}) - bytes_written += stream.write(varint.encode(len(header))) - bytes_written += stream.write(header) - bytes_written += self._to_car(root, stream, set()) - - if return_bytes: - return buffer.getvalue() - else: - return bytes_written - - def _to_car(self, - root: CID, - stream: BufferedIOBase, - already_written: MutableSet[CID]) -> int: - """ - makes a CAR without the header - """ - bytes_written = 0 - - if root not in already_written: - data = self.get_raw(root) - cid_bytes = bytes(root) - bytes_written += stream.write(varint.encode(len(cid_bytes) + len(data))) - bytes_written += stream.write(cid_bytes) - bytes_written += stream.write(data) - already_written.add(root) - - if root.codec == DagCborCodec: - value = dag_cbor.decode(data) - for child in iter_links(value): - bytes_written += self._to_car(child, stream, already_written) - return bytes_written - - def import_car(self, stream_or_bytes: StreamLike) -> List[CID]: - roots, blocks = read_car(stream_or_bytes) - roots = [self.normalize_cid(root) for root in 
roots] - - for cid, data, _ in blocks: - self.put_raw(bytes(data), cid.codec) - - return roots - class MappingCAStore(ContentAddressableStore): def __init__(self, @@ -219,7 +159,7 @@ def get(self, cid: CID) -> ValueType: if cid.codec == DagPbCodec: return value elif cid.codec == DagCborCodec: - return dag_cbor.decode(value) + return cbor2.loads(value) else: raise ValueError(f"can't decode CID's codec '{cid.codec.name}'") @@ -245,7 +185,7 @@ def put(self, value: ValueType) -> CID: if isinstance(value, bytes): return self.put_raw(value, DagPbCodec) else: - return self.put_raw(dag_cbor.encode(value), DagCborCodec) + return self.put_raw(cbor2.dumps(value, default=default_encoder), DagCborCodec) def put_raw(self, raw_value: bytes, diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 4d08b50..d5c542f 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -1,4 +1,4 @@ -import dag_cbor +import cbor2 from dataclasses import dataclass import hashlib import json @@ -30,6 +30,8 @@ def json_dumps_bytes(obj: typing.Any) -> bytes: ".zattrs": json_inline_codec, } +def default_encoder(encoder, value): + encoder.encode(cbor2.CBORTag(42, b'\x00' + bytes(value))) def get_cbor_dag_hash(obj) -> typing.Tuple[CID, bytes]: """Generates the IPFS hash and bytes an object would have if it were put to IPFS as dag-cbor, @@ -40,7 +42,7 @@ def get_cbor_dag_hash(obj) -> typing.Tuple[CID, bytes]: Returns: typing.Tuple[CID, bytes]: IPFS hash and dag-cbor bytes """ - obj_cbor = dag_cbor.encode(obj) + obj_cbor = cbor2.dumps(obj, default=default_encoder) obj_cbor_hash = multihash.get("sha2-256").digest(obj_cbor) return CID("base32", 1, "dag-cbor", obj_cbor_hash), obj_cbor @@ -60,22 +62,26 @@ def save(self, obj): self.mapping[cid] = obj_cbor return cid - def load(self, id): + def load(self, cid): + if isinstance(cid, cbor2.CBORTag): + cid = CID.decode(cid.value[1:]).set(base="base32") try: - return dag_cbor.decode(self.mapping[id]) + return 
cbor2.loads(self.mapping[cid]) except KeyError: res = requests.post( - "http://localhost:5001/api/v0/block/get", params={"arg": str(id)} + "http://localhost:5001/api/v0/block/get", params={"arg": str(cid)} ) res.raise_for_status() - obj = dag_cbor.decode(res.content) - self.mapping[id] = res.content + obj = cbor2.loads(res.content) + self.mapping[cid] = res.content return obj def is_equal(self, id1: CID, id2: CID): return str(id1) == str(id2) def is_link(self, obj: CID): + if isinstance(obj, cbor2.CBORTag): + obj = CID.decode(obj.value[1:]).set(base="base32") return isinstance(obj, CID) and obj.codec.name == "dag-cbor" diff --git a/ipldstore/ipldstore.py b/ipldstore/ipldstore.py index ebf8f75..5ca0a07 100644 --- a/ipldstore/ipldstore.py +++ b/ipldstore/ipldstore.py @@ -56,7 +56,7 @@ def getitems(self, keys: List[str]) -> Dict[str, bytes]: except KeyError: # If it isn't, the key is an IPFS CID and needs to be passed to the store to be handled asynchronously if isinstance(get_value, CBORTag): - get_value = CID.decode(get_value.value[1:]) + get_value = CID.decode(get_value.value[1:]).set(base="base32") assert isinstance(get_value, CID) cid_to_key_map[get_value] = key to_async_get.append(get_value) @@ -73,7 +73,7 @@ def __getitem__(self, key: str) -> bytes: inline_codec = inline_objects[key_parts[-1]] except KeyError: if isinstance(get_value, CBORTag): - get_value = CID.decode(get_value.value[1:]) + get_value = CID.decode(get_value.value[1:]).set(base="base32") assert isinstance(get_value, CID) res = self._store.get(get_value) From c321e157b577bbeec291ae838e35241fc0727b92 Mon Sep 17 00:00:00 2001 From: eschechter Date: Mon, 23 Jan 2023 10:32:15 -0800 Subject: [PATCH 09/21] log timeout exceptions for block get in --- ipldstore/hamt_wrapper.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index d5c542f..533e0df 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py 
@@ -30,8 +30,10 @@ def json_dumps_bytes(obj: typing.Any) -> bytes: ".zattrs": json_inline_codec, } + def default_encoder(encoder, value): - encoder.encode(cbor2.CBORTag(42, b'\x00' + bytes(value))) + encoder.encode(cbor2.CBORTag(42, b"\x00" + bytes(value))) + def get_cbor_dag_hash(obj) -> typing.Tuple[CID, bytes]: """Generates the IPFS hash and bytes an object would have if it were put to IPFS as dag-cbor, @@ -68,9 +70,15 @@ def load(self, cid): try: return cbor2.loads(self.mapping[cid]) except KeyError: - res = requests.post( - "http://localhost:5001/api/v0/block/get", params={"arg": str(cid)} - ) + try: + res = requests.post( + "http://localhost:5001/api/v0/block/get", + params={"arg": str(cid)}, + timeout=30, + ) + except requests.exceptions.ReadTimeout: + print(f"timed out on {str(cid)}") + raise Exception(f"timed out on {str(cid)}") res.raise_for_status() obj = cbor2.loads(res.content) self.mapping[cid] = res.content From 8f91a9fe87c2b601643fd99687601e25b5d0c778 Mon Sep 17 00:00:00 2001 From: eschechter Date: Mon, 23 Jan 2023 12:00:26 -0800 Subject: [PATCH 10/21] refactor dask lock around hamt.set --- ipldstore/hamt_wrapper.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 533e0df..4f68738 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -154,15 +154,9 @@ def set(self, key_path: typing.List[str], value) -> None: if isinstance(value, CID) and value.codec.name == "dag-pb": # We need to lock the HAMT to prevent multiple threads writing to it at once, # as it is not thread safe - try: - # lock needs name so dask knows to recognize it across threads - lock = Lock("hamt-write") - lock.acquire() - except ValueError: - lock = None - self.hamt = self.hamt.set(self.SEP.join(key_path), value) - if lock is not None: - lock.release() + # lock needs name so dask knows to recognize it across threads + with Lock("hamt-write"): + self.hamt = 
self.hamt.set(self.SEP.join(key_path), value) else: set_recursive(self.others_dict, key_path, value) From bfa8d25b208c6a4dc494ce30d833030fb3c31d09 Mon Sep 17 00:00:00 2001 From: eschechter Date: Tue, 24 Jan 2023 11:40:34 -0800 Subject: [PATCH 11/21] improve hamt_wrapper efficiency by relying less on multiformats library --- ipldstore/hamt_wrapper.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 4f68738..10d150f 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -6,7 +6,7 @@ import typing from dask.distributed import Lock from multiformats import CID -from multiformats import multihash +from hashlib import sha256 from py_hamt.hamt import Hamt, create, load @@ -45,8 +45,11 @@ def get_cbor_dag_hash(obj) -> typing.Tuple[CID, bytes]: typing.Tuple[CID, bytes]: IPFS hash and dag-cbor bytes """ obj_cbor = cbor2.dumps(obj, default=default_encoder) - obj_cbor_hash = multihash.get("sha2-256").digest(obj_cbor) - return CID("base32", 1, "dag-cbor", obj_cbor_hash), obj_cbor + + # the multihash format prefixes the raw sha256 digest with two bytes: + # 18 (the multicodec code for sha256) and 32 (the length of the digest in bytes) + obj_cbor_multi_hash = bytes([18, 32]) + sha256(obj_cbor).digest() + return CID("base32", 1, "dag-cbor", obj_cbor_multi_hash), obj_cbor class HamtIPFSStore: @@ -89,7 +92,9 @@ def is_equal(self, id1: CID, id2: CID): def is_link(self, obj: CID): if isinstance(obj, cbor2.CBORTag): - obj = CID.decode(obj.value[1:]).set(base="base32") + # dag-cbor tags have 37 bytes in their value: + # 1 from CBORTag prefix, 1 from version, 34 from multihash digest, 1 from codec + return obj.tag == 42 and len(obj.value) == 37 return isinstance(obj, CID) and obj.codec.name == "dag-cbor" From a11c9eb3f65053612423073644788edac6f484bf Mon Sep 17 00:00:00 2001 From: eschechter Date: Wed, 1 Feb 2023 15:10:44 -0800 Subject: [PATCH 12/21] garbage collect mapping 
holding hamt ids when it's too big --- ipldstore/hamt_wrapper.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 10d150f..b83b446 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -2,6 +2,7 @@ from dataclasses import dataclass import hashlib import json +import psutil import requests import typing from dask.distributed import Lock @@ -61,12 +62,21 @@ class HamtIPFSStore: def __init__(self): self.mapping = {} + self.num_bytes_in_mapping = 0 def save(self, obj): cid, obj_cbor = get_cbor_dag_hash(obj) self.mapping[cid] = obj_cbor + self.num_bytes_in_mapping += len(obj_cbor) return cid + def garbage_collect_mapping(self, hamt: Hamt): + hamt_keys = set(hamt.ids()) + to_delete = [key for key in self.mapping if key not in hamt_keys] + for key in to_delete: + self.num_bytes_in_mapping -= len(self.mapping[key]) + del self.mapping[key] + def load(self, cid): if isinstance(cid, cbor2.CBORTag): cid = CID.decode(cid.value[1:]).set(base="base32") @@ -101,6 +111,9 @@ def is_link(self, obj: CID): class HamtWrapper: SEP = "/" + MAX_PERCENT_OF_RAM_FOR_MAPPING = 0.1 + MIN_GC_RATIO_BEFORE_FAILURE = 1.1 + def __init__( self, store=HamtIPFSStore(), @@ -132,6 +145,7 @@ def __init__( ) self.others_dict = others_dict if others_dict is not None else {} + self._system_ram = psutil.virtual_memory().total def get(self, key_path: typing.List[str]): """Get the value located at a `key_path`. 
First checks the HAMT, and if unable to find the key, @@ -162,6 +176,20 @@ def set(self, key_path: typing.List[str], value) -> None: # lock needs name so dask knows to recognize it across threads with Lock("hamt-write"): self.hamt = self.hamt.set(self.SEP.join(key_path), value) + percent_ram_used_by_mapping = ( + self.hamt.store.num_bytes_in_mapping / self._system_ram + ) + if percent_ram_used_by_mapping > self.MAX_PERCENT_OF_RAM_FOR_MAPPING: + num_ids_in_hamt = sum(1 for _ in self.hamt.ids()) + if ( + len(self.hamt.store.mapping) + > self.MIN_GC_RATIO_BEFORE_FAILURE * num_ids_in_hamt + ): + self.hamt.store.garbage_collect_mapping(self.hamt) + else: + raise RuntimeError( + "HAMT mapping is taking up more than 0.1 of system RAM and gc won't help much, abort" + ) else: set_recursive(self.others_dict, key_path, value) From d20a2dc1be848c3ef36bf14ffb4d5a54d4d30b61 Mon Sep 17 00:00:00 2001 From: eschechter Date: Thu, 2 Feb 2023 08:24:47 -0800 Subject: [PATCH 13/21] handle case where hamt_id is cbor2tag in gc --- ipldstore/hamt_wrapper.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index b83b446..f5f941a 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -71,8 +71,13 @@ def save(self, obj): return cid def garbage_collect_mapping(self, hamt: Hamt): - hamt_keys = set(hamt.ids()) - to_delete = [key for key in self.mapping if key not in hamt_keys] + hamt_ids = set() + for hamt_id in hamt.ids(): + if isinstance(hamt_id, cbor2.CBORTag): + hamt_ids.add(CID.decode(hamt_id.value[1:]).set(base="base32")) + else: + hamt_ids.add(hamt_id) + to_delete = [key for key in self.mapping if key not in hamt_ids] for key in to_delete: self.num_bytes_in_mapping -= len(self.mapping[key]) del self.mapping[key] @@ -188,7 +193,7 @@ def set(self, key_path: typing.List[str], value) -> None: self.hamt.store.garbage_collect_mapping(self.hamt) else: raise RuntimeError( - "HAMT mapping is 
taking up more than 0.1 of system RAM and gc won't help much, abort" + f"HAMT mapping is taking up more than {self.MAX_PERCENT_OF_RAM_FOR_MAPPING} of system RAM and gc won't help much, abort" ) else: set_recursive(self.others_dict, key_path, value) From 2993eafbf88bded029f89d0e0ff09858c2f87282 Mon Sep 17 00:00:00 2001 From: eschechter Date: Tue, 7 Feb 2023 16:59:46 -0500 Subject: [PATCH 14/21] check for cbortag properly in to_dict --- ipldstore/hamt_wrapper.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index f5f941a..f124183 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -235,6 +235,8 @@ def to_dict(self) -> dict: dict: dict representation of `self`, including both `others_dict` and `hamt` """ for id in self.hamt.ids(): + if isinstance(id, cbor2.CBORTag): + id = CID.decode(id.value[1:]).set(base="base32") obj_cbor = self.hamt.store.mapping[id] res = requests.post( "http://localhost:5001/api/v0/block/put?cid-codec=dag-cbor", From 75306fa88a321460d034ed21296905139d36a04d Mon Sep 17 00:00:00 2001 From: eschechter Date: Mon, 27 Feb 2023 08:43:41 -0800 Subject: [PATCH 15/21] documentation cleanup --- .gitignore | 3 ++- ipldstore/hamt_wrapper.py | 52 +++++++++++++++++++++++++++++++++------ ipldstore/ipldstore.py | 1 + 3 files changed, 48 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index a6db6d5..387db63 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ __pycache__ .tox *.egg-info build -.pytest_cache \ No newline at end of file +.pytest_cache +venv diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index f124183..cc588d2 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -53,7 +53,7 @@ def get_cbor_dag_hash(obj) -> typing.Tuple[CID, bytes]: return CID("base32", 1, "dag-cbor", obj_cbor_multi_hash), obj_cbor -class HamtIPFSStore: +class HamtMemoryStore: """Class implementing methods necessary for a HAMT store. 
Unlike other stores, does not save objects permanently -- instead, stores them in memory and generates an ID without actually persisting the object. It is the responsibility of the user to save all the HAMTs keys on their own if they @@ -61,16 +61,35 @@ class HamtIPFSStore: """ def __init__(self): - self.mapping = {} + self.mapping: typing.Dict[CID, bytes] = {} self.num_bytes_in_mapping = 0 - def save(self, obj): + def save(self, obj) -> CID: + """Generate an ID for object and store the CID and a dag_cbor representation of the object + in the store's mapping. Also tracks how large mapping is by adding the length of the dag_cbor + representation. + + Args: + obj: object being stored in HAMT + + Returns: + CID: dag-cbor hash for object + """ cid, obj_cbor = get_cbor_dag_hash(obj) self.mapping[cid] = obj_cbor self.num_bytes_in_mapping += len(obj_cbor) return cid def garbage_collect_mapping(self, hamt: Hamt): + """Removes all CIDs in the store's mapping not present in the provided HAMT. Useful because, in general, + the intermediate states stored in the mapping are not necessary to persist and can consume 100s of GBs + of RAM when generating large HAMTs. + + + Args: + hamt (Hamt): HAMT whose CIDs will be persisted -- ids that are in the store's mapping but not + in `hamt` will be deleted from the mapping. + """ hamt_ids = set() for hamt_id in hamt.ids(): if isinstance(hamt_id, cbor2.CBORTag): @@ -82,7 +101,20 @@ def garbage_collect_mapping(self, hamt: Hamt): self.num_bytes_in_mapping -= len(self.mapping[key]) del self.mapping[key] - def load(self, cid): + def load(self, cid: typing.Union[cbor2.CBORTag, CID]): + """Given a dag-cbor CID, returns the corresponding object. First checks to see if + CID is present in store's mapping. If not, then checks IPFS, and finally fails if + the CID can't be found in 30s. + + Args: + cid (typing.Union[cbor2.CBORTag, CID]): CID to load. Might initially be in CBORTag form + due to the fact that we use cbor2 to encode objects for speed.
+ + Raises: + Exception: Raised when CID cannot be located in store's mapping or IPFS. + + Returns: object whose dag-cbor hash is the CID + """ if isinstance(cid, cbor2.CBORTag): cid = CID.decode(cid.value[1:]).set(base="base32") try: @@ -95,7 +127,6 @@ def load(self, cid): timeout=30, ) except requests.exceptions.ReadTimeout: - print(f"timed out on {str(cid)}") raise Exception(f"timed out on {str(cid)}") res.raise_for_status() obj = cbor2.loads(res.content) @@ -109,6 +140,7 @@ def is_link(self, obj: CID): if isinstance(obj, cbor2.CBORTag): # dag-cbor tags have 37 bytes in their value: # 1 from CBORTag prefix, 1 from version, 34 from multihash digest, 1 from codec + # dag-pb v0 hashes have fewer. return obj.tag == 42 and len(obj.value) == 37 return isinstance(obj, CID) and obj.codec.name == "dag-cbor" @@ -121,13 +153,11 @@ class HamtWrapper: def __init__( self, - store=HamtIPFSStore(), starting_id: typing.Optional[CID] = None, others_dict: dict = None, ) -> None: """ Args: - store (optional): Backing store for underyling HAMT. Defaults to HamtIPFSStore(). starting_id (typing.Optional[CID], optional): CID with which to initialize HAMT. In this implementation, the HAMT will only include keys corresponding to zarr chunks, represented by dag-pb CIDs. Defaults to None. 
@@ -139,6 +169,7 @@ def __init__( Hamt.register_hasher( 0x12, 32, lambda x: hashlib.sha256(x.encode("utf-8")).digest() ) + store = HamtMemoryStore() if starting_id: # Load HAMT from store using given id @@ -181,17 +212,24 @@ def set(self, key_path: typing.List[str], value) -> None: # lock needs name so dask knows to recognize it across threads with Lock("hamt-write"): self.hamt = self.hamt.set(self.SEP.join(key_path), value) + # Now check if the percent of system RAM used by the store's mapping exceeds a 10% threshold percent_ram_used_by_mapping = ( self.hamt.store.num_bytes_in_mapping / self._system_ram ) if percent_ram_used_by_mapping > self.MAX_PERCENT_OF_RAM_FOR_MAPPING: + # If it does exceed the threshold, next determine if the store's mapping is larger + # than the HAMT by a 10% margin. + # Here, we use number of IDs in the HAMT as a proxy for the HAMT's size num_ids_in_hamt = sum(1 for _ in self.hamt.ids()) if ( len(self.hamt.store.mapping) > self.MIN_GC_RATIO_BEFORE_FAILURE * num_ids_in_hamt ): + # If both criteria are met, we can save significant RAM by GCing, so do so. self.hamt.store.garbage_collect_mapping(self.hamt) else: + # If the first criterion is met but not the second, then the HAMT itself is too large to comfortably + # fit in memory, and we abort the parse.
raise RuntimeError( f"HAMT mapping is taking up more than {self.MAX_PERCENT_OF_RAM_FOR_MAPPING} of system RAM and gc won't help much, abort" ) diff --git a/ipldstore/ipldstore.py b/ipldstore/ipldstore.py index 5ca0a07..3322193 100644 --- a/ipldstore/ipldstore.py +++ b/ipldstore/ipldstore.py @@ -27,6 +27,7 @@ class IPLDStore(MutableMappingSB): def __init__(self, castore: Optional[ContentAddressableStore] = None, sep: str = "/", should_async_get: bool = True): + # In this iteration of IPLDStore, we use a HAMT to store zarr chunks instead of a dict self._mapping = HamtWrapper() self._store = castore or MappingCAStore() if isinstance(self._store, IPFSStore) and should_async_get: From 830f15914c9a18298652757e3799614bfb07dcbb Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Thu, 2 Mar 2023 11:29:25 -0500 Subject: [PATCH 16/21] fix: sort out dependencies Pin dag-cbor to < 0.3, and add psutil and the distributed option for dask. --- setup.cfg | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/setup.cfg b/setup.cfg index a31b77c..1530940 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,17 +27,18 @@ packages = find: python_requires = >=3.8 install_requires = aiohttp - typing-extensions - typing-validation + cbor2 + dag-cbor >= 0.2.2, < 0.3 + dask[distributed] multiformats - dag-cbor >= 0.2.2 - pure-protobuf >= 2.1.0 numcodecs + psutil + pure-protobuf >= 2.1.0 + py-hamt @ git+ssh://git@github.com/dClimate/python-iamap@evan-dev requests - cbor2 + typing-validation + typing-extensions zarr - dask - py-hamt @ git+ssh://git@github.com/dClimate/python-iamap@evan-dev [options.package_data] * = py.typed, *.json From 11642dadb751589f6100e49a88e5644732524c2a Mon Sep 17 00:00:00 2001 From: eschechter Date: Tue, 7 Mar 2023 10:21:46 -0800 Subject: [PATCH 17/21] switch to using Hamt.create instead of plain create --- ipldstore/hamt_wrapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipldstore/hamt_wrapper.py 
b/ipldstore/hamt_wrapper.py index cc588d2..bfe7a6f 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -8,7 +8,7 @@ from dask.distributed import Lock from multiformats import CID from hashlib import sha256 -from py_hamt.hamt import Hamt, create, load +from py_hamt.hamt import Hamt, load @dataclass @@ -176,7 +176,7 @@ def __init__( self.hamt = load(store, starting_id) else: # Create HAMT from scratch with sensible default options - self.hamt = create( + self.hamt = Hamt.create( store, options={"bit_width": 8, "bucket_size": 5, "hash_alg": 0x12} ) From 3129e53c16d2027d8c197f9c095fb22460355c9d Mon Sep 17 00:00:00 2001 From: eschechter Date: Thu, 16 Mar 2023 07:52:25 -0700 Subject: [PATCH 18/21] only enter lock if in dask cluster, make reqs more in line with main --- ipldstore/hamt_wrapper.py | 19 +++++++++++++++++-- setup.cfg | 16 ++++++++-------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index bfe7a6f..e3f4bd5 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -1,11 +1,12 @@ import cbor2 from dataclasses import dataclass +from contextlib import nullcontext import hashlib import json import psutil import requests import typing -from dask.distributed import Lock +from dask.distributed import Lock, get_client from multiformats import CID from hashlib import sha256 from py_hamt.hamt import Hamt, load @@ -53,6 +54,19 @@ def get_cbor_dag_hash(obj) -> typing.Tuple[CID, bytes]: return CID("base32", 1, "dag-cbor", obj_cbor_multi_hash), obj_cbor +def _in_dask_context() -> bool: + """Determine whether code is running in a dask context + + Returns: + bool: whether code is in a dask client context + """ + try: + get_client() + return True + except ValueError: + return False + + class HamtMemoryStore: """Class implementing methods necessary for a HAMT store. 
Unlike other stores, does not save objects permanently -- instead, stores them in memory and generates an ID without actually persisting the object. @@ -182,6 +196,7 @@ def __init__( self.others_dict = others_dict if others_dict is not None else {} self._system_ram = psutil.virtual_memory().total + self._in_dask_context = _in_dask_context() def get(self, key_path: typing.List[str]): """Get the value located at a `key_path`. First checks the HAMT, and if unable to find the key, @@ -210,7 +225,7 @@ def set(self, key_path: typing.List[str], value) -> None: # We need to lock the HAMT to prevent multiple threads writing to it at once, # as it is not thread safe # lock needs name so dask knows to recognize it across threads - with Lock("hamt-write"): + with Lock("hamt-write") if self._in_dask_context else nullcontext(): self.hamt = self.hamt.set(self.SEP.join(key_path), value) # Now check if the percent of system RAM used by the store's mapping exceeds a 10% threshold percent_ram_used_by_mapping = ( diff --git a/setup.cfg b/setup.cfg index 1530940..b3823ce 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,18 +27,18 @@ packages = find: python_requires = >=3.8 install_requires = aiohttp - cbor2 - dag-cbor >= 0.2.2, < 0.3 - dask[distributed] + typing-extensions + typing-validation == 0.0.2 multiformats - numcodecs - psutil + dag-cbor == 0.2.2 pure-protobuf >= 2.1.0 - py-hamt @ git+ssh://git@github.com/dClimate/python-iamap@evan-dev + numcodecs requests - typing-validation - typing-extensions + cbor2 zarr + psutil + dask[distributed] + py-hamt @ git+ssh://git@github.com/dClimate/py-hamt@v1.0.0 [options.package_data] * = py.typed, *.json From c5427562ea98c174bb04f5ce76264d2ada4323ac Mon Sep 17 00:00:00 2001 From: TheGreatAlgo <37487508+TheGreatAlgo@users.noreply.github.com> Date: Thu, 16 Mar 2023 15:59:38 +0100 Subject: [PATCH 19/21] fix: camelCase, bytes --- ipldstore/hamt_wrapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipldstore/hamt_wrapper.py 
b/ipldstore/hamt_wrapper.py index e3f4bd5..328c232 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -181,7 +181,7 @@ def __init__( # Register the sha2-256 hash with HAMT Hamt.register_hasher( - 0x12, 32, lambda x: hashlib.sha256(x.encode("utf-8")).digest() + 0x12, 32, lambda x: hashlib.sha256(x).digest() ) store = HamtMemoryStore() @@ -191,7 +191,7 @@ def __init__( else: # Create HAMT from scratch with sensible default options self.hamt = Hamt.create( - store, options={"bit_width": 8, "bucket_size": 5, "hash_alg": 0x12} + store, options={"bitWidth": 8, "bucketSize": 5, "hashAlg": 0x12} ) self.others_dict = others_dict if others_dict is not None else {} From cde54920725b5de6a15c9cb3ca7e2ce6cd76afb3 Mon Sep 17 00:00:00 2001 From: eschechter Date: Thu, 16 Mar 2023 12:29:16 -0700 Subject: [PATCH 20/21] bump py-hamt version --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index b3823ce..34cfa3a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,7 +38,7 @@ install_requires = zarr psutil dask[distributed] - py-hamt @ git+ssh://git@github.com/dClimate/py-hamt@v1.0.0 + py-hamt @ git+ssh://git@github.com/dClimate/py-hamt@v1.1.0 [options.package_data] * = py.typed, *.json From d2330cddac02aae28adceed220728d82ce362595 Mon Sep 17 00:00:00 2001 From: eschechter Date: Thu, 16 Mar 2023 12:41:34 -0700 Subject: [PATCH 21/21] decode hamt keys from bytes to str when iterating --- ipldstore/hamt_wrapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipldstore/hamt_wrapper.py b/ipldstore/hamt_wrapper.py index 328c232..5e4bade 100644 --- a/ipldstore/hamt_wrapper.py +++ b/ipldstore/hamt_wrapper.py @@ -258,7 +258,7 @@ def iter_all(self) -> typing.Iterator[str]: str: key """ yield from self._iter_nested("", self.others_dict) - yield from self.hamt.keys() + yield from (key.decode("utf-8") for key in self.hamt.keys()) def _iter_nested(self, prefix: str, mapping: dict) -> typing.Iterator[str]: 
"""Iterates over all keys in `mapping`, reconstructing the key from decomposed