Skip to content

Commit

Permalink
19 Nov update. make files added not all complete. chunk/soc.py, feed/…
Browse files Browse the repository at this point in the history
…identifier.py, modules/soc.py, chunk.py full added
  • Loading branch information
Aviksaikat committed Nov 20, 2023
1 parent 8226b53 commit 19a48a1
Show file tree
Hide file tree
Showing 21 changed files with 903 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

.vscode
# custom
dev.sh
__local__.json
Expand Down
44 changes: 44 additions & 0 deletions src/bee_py/bee.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Optional

from bee_py.chunk.signer import sign
from bee_py.utils.urls import assert_bee_url, strip_last_slash


class Bee:
"""
The main component that abstracts operations available on the main Bee API.
Not all methods are always available as it depends on what mode is Bee node launched in.
For example, gateway mode and light node mode has only a limited set of endpoints enabled.
Attributes:
url: URL on which is the main API of Bee node exposed.
signer: Default Signer object used for signing operations, mainly Feeds.
request_options: Ky instance that defines connection to Bee node.
"""

def __init__(self, url: str, options: Optional[dict] = None):
"""
Constructs a new Bee instance.
Args:
url: URL on which is the main API of Bee node exposed.
options: Additional options for the Bee instance.
"""
assert_bee_url(url)

# Remove last slash if present, as our endpoint strings starts with `/...`
# which could lead to double slash in URL to which Bee responds with
# unnecessary redirects.
self.url = strip_last_slash(url)

if options and "signer" in options:
self.signer = sign(options["signer"])

self.request_options = {
"baseURL": self.url,
"timeout": options.get("timeout", False),
"headers": options.get("headers", None),
"onRequest": options.get("onRequest", None),
"adapter": options.get("adapter", None),
}
6 changes: 6 additions & 0 deletions src/bee_py/chunk/cac.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ def __init__(self, data: bytes, span: bytes):
self.data = data
self.span = span

def data(self):
return self.data

def span(self):
return self.span

def payload(self) -> bytes:
return flex_bytes_at_offset(self.data, CAC_PAYLOAD_OFFSET, MIN_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE)

Expand Down
2 changes: 1 addition & 1 deletion src/bee_py/chunk/signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def public_key_to_address(pub_key: Union[str, bytes, eth_keys.datatypes.PublicKe
return HexBytes(address)


def recover_addres(message: SignableMessage, signature: MessageSignature) -> AddressType:
def recover_address(message: SignableMessage, signature: MessageSignature) -> AddressType:
"""
Recovers the Ethereum address from a given signature and message digest.
Expand Down
250 changes: 250 additions & 0 deletions src/bee_py/chunk/soc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
from typing import NewType, Optional

from eth_typing import ChecksumAddress as AddressType
from pydantic import BaseModel

from bee_py.chunk.bmt import bmt_hash
from bee_py.chunk.cac import (
MAX_PAYLOAD_SIZE,
MIN_PAYLOAD_SIZE,
Chunk,
assert_valid_chunk_data,
make_content_addressed_chunk,
)
from bee_py.chunk.serialize import serialize_bytes
from bee_py.chunk.signer import recover_address, sign
from bee_py.chunk.span import SPAN_SIZE
from bee_py.modules.chunk import download
from bee_py.modules.soc import upload
from bee_py.types.type import BatchId, BeeRequestOptions, Reference, UploadOptions, assert_address
from bee_py.utils.bytes import bytes_at_offset, bytes_equal, flex_bytes_at_offset
from bee_py.utils.error import BeeError
from bee_py.utils.hash import keccak256_hash
from bee_py.utils.hex import bytes_to_hex

# * Global variables
IDENTIFIER_SIZE = 32
SIGNATURE_SIZE = 65

SOC_IDENTIFIER_OFFSET = 0
SOC_SIGNATURE_OFFSET = SOC_IDENTIFIER_OFFSET + IDENTIFIER_SIZE
SOC_SPAN_OFFSET = SOC_SIGNATURE_OFFSET + SIGNATURE_SIZE
SOC_PAYLOAD_OFFSET = SOC_SPAN_OFFSET + SPAN_SIZE

# Define a new type for Identifier
Identifier = NewType("Identifier", bytes)


class SingleOwnerChunkBase(BaseModel):
"""Abstract base class for Single Owner Chunks (SOCs).
Represents a SOC, a type of chunk that allows a user to assign arbitrary data to an address
and attest to the chunk's integrity with their digital signature. It defines the basic properties
common to all SOCs.
Attributes:
identifier: The identifier of the SOC.
signature: The signature of the SOC.
owner: The owner of the SOC.
"""

identifier: Identifier
signature: bytes
owner: AddressType


class SingleOwnerChunk(SingleOwnerChunkBase, Chunk):
"""Represents a Single Owner Chunk (SOC).
A concrete implementation of the SingleOwnerChunkBase class. It represents a SOC
with all its properties and behaviors defined.
Attributes:
identifier: The identifier of the SOC.
signature: The signature of the SOC.
owner: The owner of the SOC.
"""

pass


def recover_chunk_owner(data: bytes) -> AddressType:
"""Recovers the owner's Ethereum address from a single owner chunk (SOC).
Args:
data: The byte array representing the SOC data.
Returns:
The Ethereum address of the SOC's owner.
"""
cac_data = data[SOC_SPAN_OFFSET:]
chunk_address = bmt_hash(cac_data)
signature = bytes_at_offset(data, SOC_SIGNATURE_OFFSET, SIGNATURE_SIZE)
identifier = bytes_at_offset(data, SOC_IDENTIFIER_OFFSET, IDENTIFIER_SIZE)
digest = keccak256_hash(identifier, chunk_address)
owner_address = recover_address(signature, digest)

return owner_address


def make_single_owner_chunk_from_data(data: bytes, address: AddressType) -> dict:
"""
Verifies if the data is a valid single owner chunk.
Args:
data: The chunk data.
address: The address of the single owner chunk.
Returns:
dict: A dictionary representing a single owner chunk.
"""
owner_address = recover_chunk_owner(data)
identifier = bytes_at_offset(data, SOC_IDENTIFIER_OFFSET, IDENTIFIER_SIZE)
soc_address = keccak256_hash(identifier, owner_address)

if bytes_equal(address, soc_address):
msg = "SOC Data does not match given address!"
raise BeeError(msg)

def signature():
return bytes_at_offset(data, SOC_SIGNATURE_OFFSET, SIGNATURE_SIZE)

def span():
return bytes_at_offset(data, SOC_SPAN_OFFSET, SPAN_SIZE)

def payload():
return flex_bytes_at_offset(data, SOC_PAYLOAD_OFFSET, MIN_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE)

return {
"data": data,
"identifier": identifier,
"signature": signature(),
"span": span(),
"payload": payload(),
"address": soc_address,
"owner": owner_address,
}


def make_soc_address(identifier: Identifier, address: AddressType) -> bytes:
return keccak256_hash(identifier, address)


def make_single_owner_chunk(
chunk: Chunk,
identifier: Identifier,
signer: dict,
) -> dict:
"""
Creates a single owner chunk object.
Args:
chunk: A chunk object used for the span and payload.
identifier: The identifier of the chunk.
signer: The singer interface for signing the chunk.
Returns:
dict: A dictionary representing a single owner chunk.
"""
chunk_address = chunk.address()
assert_valid_chunk_data(chunk.data(), chunk_address)

digest = keccak256_hash(identifier, chunk_address)
signature = sign(signer, digest)
data = serialize_bytes(identifier, signature, chunk.span(), chunk.payload())
address = make_soc_address(identifier, signer.get("address", ""))

return {
"data": data,
"identifier": identifier,
"signature": signature,
"span": chunk.span(),
"payload": chunk.payload(),
"address": address,
"owner": signer.get("address", ""),
}


def upload_single_owner_chunk(
request_options: BeeRequestOptions,
chunk: SingleOwnerChunk,
postage_batch_id: BatchId,
options: Optional[UploadOptions] = None,
) -> str:
"""Uploads a Single Owner Chunk (SOC) to the Bee network.
Args:
request_options: BeeRequestOptions for making requests.
chunk: The SOC object to be uploaded.
postage_batch_id: The Postage BatchId to be assigned to the uploaded data.
options: Upload options for controlling the upload process.
Returns:
A Reference object representing the uploaded chunk.
"""

# Convert the owner, identifier, and signature to hexadecimal strings
owner = bytes_to_hex(chunk.owner)
identifier = bytes_to_hex(chunk.identifier)
signature = bytes_to_hex(chunk.signature)

# Serialize the chunk data, including the span and payload
data = serialize_bytes(chunk.span(), chunk.payload())

# Upload the SOC data using the SOC API's upload method
return upload(request_options, owner, identifier, signature, data, postage_batch_id, options)


def upload_single_owner_chunk_data(
request_options: BeeRequestOptions,
signer: dict,
postage_batch_id: BatchId,
identifier: Identifier,
data: bytes,
options: Optional[dict] = None,
) -> Reference:
"""
Helper function to create and upload SOC.
Args:
request_options: Ky Options for making requests.
signer: The singer interface for signing the chunk.
postage_batch_id: Postage BatchId that will be assigned to uploaded data.
identifier: The identifier of the chunk.
data: The chunk data.
options: Upload options.
Returns:
str: The reference of the uploaded chunk.
"""
assert_address(postage_batch_id)
cac = make_content_addressed_chunk(data)
soc = make_single_owner_chunk(cac, identifier, signer)

return upload_single_owner_chunk(request_options, soc, postage_batch_id, options)


def download_single_owner_chunk(
request_options: BeeRequestOptions,
owner_address: AddressType,
identifier: Identifier,
) -> SingleOwnerChunk:
"""Downloads a Single Owner Chunk (SOC) from the Bee network.
Args:
request_options: BeeRequestOptions for making requests.
owner_address: The Ethereum address of the SOC's owner.
identifier: The identifier of the SOC.
Returns:
The downloaded SOC object.
"""

# Compute the SOC address using the identifier and owner's address
soc_address = make_soc_address(identifier, owner_address)

# Download the SOC data from the Bee network using the provided URL
data = download(request_options, bytes_to_hex(soc_address))

# Construct the SOC object from the downloaded data and address
return make_single_owner_chunk_from_data(data, soc_address)
38 changes: 38 additions & 0 deletions src/bee_py/feed/feed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import NewType, Union

from bee_py.chunk.serialize import serialize_bytes
from bee_py.chunk.soc import make

TIMESTAMP_PAYLOAD_SIZE = 8
TIMESTAMP_PAYLOAD_SIZE_HEX = 16


IndexBytes = NewType("IndexBytes", bytes)


class Epoch:
def __init__(self, time: int, level: int):
self.time = time
self.level = level


class Index:
def __init__(self, value: Union[int, Epoch, bytes, str]):
self._validate(value)
self._value = value

def _validate(self, value: Union[int, Epoch, bytes, str]):
if not (
isinstance(value, (int, Epoch))
or (isinstance(value, bytes) and len(value) == TIMESTAMP_PAYLOAD_SIZE)
or (
isinstance(value, str)
and len(value) == TIMESTAMP_PAYLOAD_SIZE_HEX
and all(c in "0123456789abcdefABCDEF" for c in value)
)
):
msg = "Index must be an int, Epoch, 8-byte bytes, or a hex string of length 16"
raise ValueError(msg)

def __str__(self):
return str(self._value)
Loading

0 comments on commit 19a48a1

Please sign in to comment.