Skip to content

Commit

Permalink
Merge pull request #8 from dClimate/hamt
Browse files Browse the repository at this point in the history
Hamt
  • Loading branch information
eschechter authored Mar 21, 2023
2 parents 673d674 + b9b147c commit 128df96
Show file tree
Hide file tree
Showing 4 changed files with 367 additions and 187 deletions.
122 changes: 10 additions & 112 deletions ipldstore/contentstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import asyncio

from multiformats import CID, multicodec, multibase, multihash, varint
import cbor2, dag_cbor
from cbor2 import CBORTag
import cbor2
from dag_cbor.encoding import EncodableType as DagCborEncodable
from typing_validation import validate

Expand All @@ -25,20 +24,17 @@
# Multicodec entries looked up by name; the store methods in this file compare
# `cid.codec` against these to decide how a block is decoded or encoded.
DagPbCodec = multicodec.get("dag-pb")
DagCborCodec = multicodec.get("dag-cbor")

def default_encoder(encoder, value):
    """Fallback hook for ``cbor2`` used for values it cannot encode natively.

    The value is emitted as CBOR tag 42 whose payload is a single ``0x00``
    byte followed by ``bytes(value)``.

    NOTE(review): presumably ``value`` is always a CID (tag 42 with a leading
    zero byte is how DAG-CBOR represents links) -- confirm against callers.
    No type check is performed, so any object that supports ``bytes()`` is
    wrapped the same way.

    :param encoder: the cbor2 encoder instance that invoked the hook
    :param value: the object cbor2 could not encode on its own
    """
    encoder.encode(CBORTag(42, b'\x00' + bytes(value)))

def grouper(seq, size):
    """Lazily split ``seq`` into consecutive slices of at most ``size`` items.

    The last slice is shorter when ``len(seq)`` is not a multiple of ``size``.
    ``seq`` must support ``len()`` and slicing.

    :param seq: the sequence to split
    :param size: maximum length of each slice; ``0`` raises ``ValueError``
        (from ``range``) as soon as the function is called
    :return: a generator yielding the slices in order
    """
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


def get_retry_session() -> requests.Session:
    """Create a ``requests`` session that retries ``http://`` requests.

    The retry policy allows up to 5 retries in total (and up to 5 for
    connection errors) with a backoff factor of 4. The adapter is mounted for
    the ``http://`` prefix only; no adapter is mounted here for ``https://``.

    :return: a new, configured session
    """
    # A single session is enough; a second, immediately discarded
    # ``requests.Session()`` would only allocate resources that are never used.
    session = requests.Session()
    retries = Retry(connect=5, total=5, backoff_factor=4)
    session.mount("http://", HTTPAdapter(max_retries=retries))
    return session


def default_encoder(encoder, value):
    """Fallback hook for ``cbor2`` used for values it cannot encode natively.

    The value is emitted as CBOR tag 42 whose payload is a single ``0x00``
    byte followed by ``bytes(value)``.

    NOTE(review): presumably ``value`` is always a CID (tag 42 with a leading
    zero byte is how DAG-CBOR represents links) -- confirm against callers.
    No type check is performed, so any object that supports ``bytes()`` is
    wrapped the same way.

    :param encoder: the cbor2 encoder instance that invoked the hook
    :param value: the object cbor2 could not encode on its own
    """
    encoder.encode(cbor2.CBORTag(42, b'\x00' + bytes(value)))

class ContentAddressableStore(ABC):
@abstractmethod
def get_raw(self, cid: CID) -> bytes:
Expand All @@ -49,7 +45,7 @@ def get(self, cid: CID) -> ValueType:
if cid.codec == RawCodec:
return value
elif cid.codec == DagCborCodec:
return dag_cbor.decode(value)
return cbor2.loads(value)
else:
raise ValueError(f"can't decode CID's codec '{cid.codec.name}'")

Expand All @@ -72,73 +68,11 @@ def put(self, value: ValueType) -> CID:
if isinstance(value, bytes):
return self.put_raw(value, RawCodec)
else:
return self.put_raw(dag_cbor.encode(value), DagCborCodec)
return self.put_raw(cbor2.dumps(value, default=default_encoder), DagCborCodec)

def normalize_cid(self, cid: CID) -> CID:  # pylint: disable=no-self-use
    """Return the form of ``cid`` this store uses internally.

    This implementation returns ``cid`` unchanged.
    """
    return cid

@overload
def to_car(self, root: CID, stream: BufferedIOBase) -> int:
    ...

@overload
def to_car(self, root: CID, stream: None = None) -> bytes:
    ...

def to_car(self, root: CID, stream: Optional[BufferedIOBase] = None) -> Union[bytes, int]:
    """Serialise the DAG reachable from ``root`` as a CAR (version 1) archive.

    A varint-length-prefixed dag-cbor header ``{"version": 1, "roots": [root]}``
    is written first, followed by the blocks emitted by ``_to_car``.

    :param root: CID of the root block
    :param stream: binary stream to write to; when ``None`` an in-memory
        buffer is used instead
    :return: the archive bytes when ``stream`` is ``None``, otherwise the
        number of bytes written to ``stream``
    """
    validate(root, CID)
    validate(stream, Optional[BufferedIOBase])

    # Keep our own handle to the in-memory buffer (if any) so the return
    # branch does not depend on a separately maintained flag.
    buffer = None
    if stream is None:
        buffer = BytesIO()
        stream = buffer

    header = dag_cbor.encode({"version": 1, "roots": [root]})
    bytes_written = stream.write(varint.encode(len(header)))
    bytes_written += stream.write(header)
    bytes_written += self._to_car(root, stream, set())

    if buffer is not None:
        return buffer.getvalue()
    return bytes_written

def _to_car(self,
            root: CID,
            stream: BufferedIOBase,
            already_written: MutableSet[CID]) -> int:
    """
    makes a CAR without the header

    Writes the block for ``root`` as ``varint(len(cid) + len(data))``, the CID
    bytes and the block data, then recurses depth-first into every link of a
    dag-cbor block. Blocks whose CID is already in ``already_written`` are
    skipped; the set is updated in place.

    :return: number of bytes written to ``stream`` by this call, including
        all recursive calls
    """
    if root in already_written:
        return 0

    data = self.get_raw(root)
    cid_bytes = bytes(root)
    bytes_written = stream.write(varint.encode(len(cid_bytes) + len(data)))
    bytes_written += stream.write(cid_bytes)
    bytes_written += stream.write(data)
    # Mark as written before descending so a CID reached again through another
    # path further down is not emitted a second time.
    already_written.add(root)

    if root.codec == DagCborCodec:
        value = dag_cbor.decode(data)
        for child in iter_links(value):
            bytes_written += self._to_car(child, stream, already_written)
    return bytes_written

def import_car(self, stream_or_bytes: StreamLike) -> List[CID]:
    """Load every block of a CAR archive into this store.

    The archive is parsed with ``read_car``; each block is stored through
    ``put_raw`` using the codec of the block's CID.

    :param stream_or_bytes: the CAR archive to read
    :return: the archive's root CIDs, each passed through ``normalize_cid``
    """
    roots, blocks = read_car(stream_or_bytes)
    roots = [self.normalize_cid(root) for root in roots]

    for cid, data, _ in blocks:
        self.put_raw(bytes(data), cid.codec)

    return roots


class MappingCAStore(ContentAddressableStore):
def __init__(self,
Expand Down Expand Up @@ -192,6 +126,7 @@ async def _async_get(host: str, session: aiohttp.ClientSession, cid: CID):
async with session.post(host + api_method, params={"arg": str(cid)}) as resp:
return await resp.read()


async def _main_async(keys: List[CID], host: str, d: Dict[CID, bytes]):
async with aiohttp.ClientSession() as session:
tasks = [_async_get(host, session, key) for key in keys]
Expand Down Expand Up @@ -219,30 +154,12 @@ def __init__(self,
else:
self._default_hash = multihash.Multihash(codec=default_hash)

def recover_tree(self, broken_struct):
    """Rebuild a mapping that was split into linked sub-blocks.

    Keys of the form ``"/<integer>"`` (the shape ``make_tree_structure``
    generates via ``f"/{hash(...)}"``) are treated as links: their value is
    expected to be a CBOR tag whose payload, minus its first byte, is an
    encoded CID. The linked block is fetched with ``get_raw``, decoded with
    ``cbor2`` and recovered recursively; its entries are then merged into the
    result in place of the link key. All other keys are kept, with their
    values recovered recursively.

    :param broken_struct: a decoded value; anything that is not a ``dict`` is
        returned unchanged
    :return: the reassembled mapping (a new ``dict``), or ``broken_struct``
        itself when it is not a ``dict``
    """
    if not isinstance(broken_struct, dict):
        return broken_struct

    def is_group_key(key):
        # Exactly "/" + optional "-" + ASCII digits. Checking k[2:] alone would
        # miss single-digit keys such as "/7" and accept keys such as "/x1";
        # non-string keys are never group keys.
        if not isinstance(key, str) or not key.startswith("/"):
            return False
        digits = key[2:] if key.startswith("/-") else key[1:]
        return digits.isascii() and digits.isdigit()

    all_recovered = []
    ret_tree = {}
    for k in broken_struct:
        if is_group_key(k):
            # The first payload byte is the prefix added when the link was
            # encoded; the remainder is the CID itself.
            cid_to_recover = CID.decode(broken_struct[k].value[1:])
            recovered = self.recover_tree(cbor2.loads(self.get_raw(cid_to_recover)))
            all_recovered.append(recovered)
        else:
            ret_tree[k] = self.recover_tree(broken_struct[k])

    # Entries from linked sub-blocks are merged after the ordinary keys, so
    # they take precedence on a key clash.
    for recovered in all_recovered:
        for k in recovered:
            ret_tree[k] = self.recover_tree(recovered[k])

    return ret_tree

# Fetch the block for `cid` via get_raw and decode it according to the CID's codec:
# dag-pb blocks are returned as the raw bytes, dag-cbor blocks are decoded with
# cbor2, and any other codec raises ValueError.
def get(self, cid: CID) -> ValueType:
value = self.get_raw(cid)
if cid.codec == DagPbCodec:
return value
elif cid.codec == DagCborCodec:
# NOTE(review): two consecutive return statements follow; as written the second
# can never execute. This looks like the removed and the added line of a diff
# shown together -- confirm which one is intended.
return self.recover_tree(cbor2.loads(value))
return cbor2.loads(value)
else:
raise ValueError(f"can't decode CID's codec '{cid.codec.name}'")

Expand All @@ -263,31 +180,12 @@ def get_raw(self, cid: CID) -> bytes:
res.raise_for_status()
return res.content

def make_tree_structure(self, node):
    """Limit how many keys any single mapping level holds.

    A mapping with at most ``self._max_nodes_per_level`` keys is copied with
    its values processed recursively. A larger mapping is cut into groups of
    at most that many keys; each group is processed, stored as its own block
    via ``put_sub_tree``, and replaced by one entry keyed
    ``"/<hash of the group's keys>"`` whose value is what ``put_sub_tree``
    returned. The reduced mapping is processed again, so it may be split
    further.

    :param node: the value to process; anything that is not a ``dict`` is
        returned unchanged
    :return: a new ``dict`` respecting the per-level limit, or ``node`` itself
        when it is not a ``dict``
    :raises ValueError: if splitting is needed but
        ``self._max_nodes_per_level`` is smaller than 2
    """
    if not isinstance(node, dict):
        return node

    limit = self._max_nodes_per_level
    if len(node) <= limit:
        return {key: self.make_tree_structure(node[key]) for key in node}

    # With a limit of 1 the grouped mapping has as many keys as the input and
    # the recursion never ends; with a negative limit no groups are produced
    # and every key would be dropped silently.
    if limit < 2:
        raise ValueError(
            f"_max_nodes_per_level must be at least 2 to split a mapping, got {limit}"
        )

    new_tree = {}
    for group_of_keys in grouper(list(node.keys()), limit):
        key_for_group = f"/{hash(frozenset(group_of_keys))}"
        sub_tree = {key: node[key] for key in group_of_keys}
        new_tree[key_for_group] = self.put_sub_tree(self.make_tree_structure(sub_tree))
    return self.make_tree_structure(new_tree)

def put_sub_tree(self, d):
    """Store ``d`` as a dag-cbor block without pinning it.

    ``d`` is serialised with ``cbor2`` (using ``default_encoder`` for values
    cbor2 cannot encode itself) and handed to ``put_raw`` with
    ``should_pin=False``.

    :param d: the value to store
    :return: whatever ``put_raw`` returns for the stored block
    """
    return self.put_raw(cbor2.dumps(d, default=default_encoder), DagCborCodec, should_pin=False)

# Store `value` and return the result of put_raw: bytes are stored as-is under the
# dag-pb codec; any other value is serialised with cbor2 (default_encoder handles
# values cbor2 cannot encode itself) and stored under the dag-cbor codec.
def put(self, value: ValueType) -> CID:
validate(value, ValueType)
if isinstance(value, bytes):
return self.put_raw(value, DagPbCodec)
else:
# NOTE(review): two consecutive return statements follow; as written the second
# can never execute. This looks like the removed and the added line of a diff
# shown together -- confirm which one is intended.
return self.put_raw(cbor2.dumps(self.make_tree_structure(value), default=default_encoder), DagCborCodec)
return self.put_raw(cbor2.dumps(value, default=default_encoder), DagCborCodec)

def put_raw(self,
raw_value: bytes,
Expand Down
Loading

0 comments on commit 128df96

Please sign in to comment.