diff --git a/src/aleph_client/asynchronous.py b/src/aleph_client/asynchronous.py index a21e4834..3bb3bb2d 100644 --- a/src/aleph_client/asynchronous.py +++ b/src/aleph_client/asynchronous.py @@ -9,7 +9,8 @@ import time from datetime import datetime from functools import lru_cache -from typing import Type, Mapping +from typing import Type, Mapping, Optional, Union, Any, Dict, List, Iterable, AsyncIterable +from pathlib import Path from aleph_message.models import ( ForgetContent, @@ -42,7 +43,6 @@ magic = None # type:ignore from .conf import settings -from typing import Optional, Iterable, Union, Any, Dict, List, AsyncIterable import aiohttp from aiohttp import ClientSession @@ -65,12 +65,11 @@ def get_fallback_session() -> ClientSession: async def ipfs_push( - content, - session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + content: Mapping, + session: ClientSession, + api_server: str, ) -> str: - session = session or get_fallback_session() - + """Push arbitrary content as JSON to the IPFS service.""" url = f"{api_server}/api/v0/ipfs/add_json" logger.debug(f"Pushing to IPFS on {url}") @@ -80,12 +79,11 @@ async def ipfs_push( async def storage_push( - content, - session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + content: Mapping, + session: ClientSession, + api_server: str, ) -> str: - session = session or get_fallback_session() - + """Push arbitrary content as JSON to the storage service.""" url = f"{api_server}/api/v0/storage/add_json" logger.debug(f"Pushing to storage on {url}") @@ -96,11 +94,10 @@ async def storage_push( async def ipfs_push_file( file_content, - session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + session: ClientSession, + api_server: str, ) -> str: - session = session or get_fallback_session() - + """Push a file to the IPFS service.""" data = aiohttp.FormData() data.add_field("file", file_content) @@ -114,11 +111,10 @@ async def ipfs_push_file( async def storage_push_file( file_content, - session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + session: ClientSession, + api_server: str, ) -> str: - session = session or get_fallback_session() - + """Push a file to the storage service.""" data = aiohttp.FormData() data.add_field("file", file_content) @@ -216,13 +212,11 @@ async def _handle_broadcast_response( async def broadcast( message, + session: ClientSession, + api_server: str, sync: bool = True, - session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, ) -> None: """Broadcast a message on the Aleph network via pubsub for nodes to pick it up.""" - session = session or get_fallback_session() - url = f"{api_server}/api/v0/messages" logger.debug(f"Posting message on {url}") @@ -247,14 +241,28 @@ async def create_post( post_content, post_type: str, ref: Optional[str] = None, - address: Optional[str] = settings.ADDRESS_TO_USE, - channel: str = settings.DEFAULT_CHANNEL, + address: Optional[str] = None, + channel: Optional[str] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, inline: bool = True, storage_engine: StorageEnum = StorageEnum.storage, ) -> PostMessage: - address = address or account.get_address() + """ + Create a POST message on the Aleph network. It is associated with a channel and owned by an account. + + :param account: The account that will sign and own the message + :param post_content: The content of the message + :param post_type: An arbitrary content type that helps to describe the post_content + :param ref: A reference to a previous message that it replaces + :param address: The address that will be displayed as the author of the message + :param channel: The channel that the message will be posted on + :param session: An optional aiohttp session to use for the request + :param api_server: An optional API server to use for the request (Default: "https://api2.aleph.im") + :param inline: An optional flag to indicate if the content should be inlined in the message or not + :param storage_engine: An optional storage engine to use for the message, if not inlined (Default: "storage") + """ + address = address or settings.ADDRESS_TO_USE or account.get_address() content = PostContent( type=post_type, @@ -280,13 +288,25 @@ async def create_aggregate( account: Account, key, content, - address: Optional[str] = settings.ADDRESS_TO_USE, - channel: str = settings.DEFAULT_CHANNEL, + address: Optional[str] = None, + channel: Optional[str] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, inline: bool = True, ) -> AggregateMessage: - address = address or account.get_address() + """ + Create an AGGREGATE message. It is meant to be used as a quick access storage associated with an account. + + :param account: Account to use to sign the message + :param key: Key to use to store the content + :param content: Content to store + :param address: Address to use to sign the message + :param channel: Channel to use (Default: "TEST") + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + :param inline: Whether to write content inside the message (Default: True) + """ + address = address or settings.ADDRESS_TO_USE or account.get_address() content_ = AggregateContent( key=key, @@ -308,23 +328,47 @@ async def create_aggregate( async def create_store( account: Account, - address=settings.ADDRESS_TO_USE, + address: Optional[str] = None, file_content: Optional[bytes] = None, + file_path: Optional[Union[str, Path]] = None, file_hash: Optional[str] = None, guess_mime_type: bool = False, ref: Optional[str] = None, - storage_engine=StorageEnum.storage, + storage_engine: StorageEnum = StorageEnum.storage, extra_fields: Optional[dict] = None, - channel: str = settings.DEFAULT_CHANNEL, + channel: Optional[str] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, ) -> StoreMessage: - address = address or account.get_address() + """ + Create a STORE message to store a file on the Aleph network. + + Can be passed either a file path, an IPFS hash or the file's content as raw bytes. + + :param account: Account to use to sign the message + :param address: Address to display as the author of the message (Default: account.get_address()) + :param file_content: Byte stream of the file to store (Default: None) + :param file_path: Path to the file to store (Default: None) + :param file_hash: Hash of the file to store (Default: None) + :param guess_mime_type: Guess the MIME type of the file (Default: False) + :param ref: Reference to a previous message (Default: None) + :param storage_engine: Storage engine to use (Default: "storage") + :param extra_fields: Extra fields to add to the STORE message (Default: None) + :param channel: Channel to post the message to (Default: "TEST") + :param session: aiohttp session to use (Default: get_fallback_session()) + :param api_server: Aleph API server to use (Default: "https://api2.aleph.im") + """ + address = address or settings.ADDRESS_TO_USE or account.get_address() extra_fields = extra_fields or {} + session = session or get_fallback_session() + api_server = api_server or settings.API_HOST if file_hash is None: if file_content is None: - raise ValueError("Please specify at least a file_content or a file_hash") + if file_path is None: + raise ValueError("Please specify at least a file_content, a file_hash or a file_path") + else: + file_content = open(file_path, "rb").read() if storage_engine == StorageEnum.storage: file_hash = await storage_push_file( @@ -374,26 +418,50 @@ async def create_program( program_ref: str, entrypoint: str, runtime: str, - environment_variables: Optional[Dict[str, str]] = None, + environment_variables: Optional[Mapping[str, str]] = None, storage_engine: StorageEnum = StorageEnum.storage, - channel: str = settings.DEFAULT_CHANNEL, - address: Optional[str] = settings.ADDRESS_TO_USE, + channel: Optional[str] = None, + address: Optional[str] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, - memory: int = settings.DEFAULT_VM_MEMORY, - vcpus: int = settings.DEFAULT_VM_VCPUS, - timeout_seconds: float = settings.DEFAULT_VM_TIMEOUT, + api_server: Optional[str] = None, + memory: Optional[int] = None, + vcpus: Optional[int] = None, + timeout_seconds: Optional[float] = None, persistent: bool = False, encoding: Encoding = Encoding.zip, - volumes: Optional[List[Dict]] = None, - subscriptions: Optional[List[Dict]] = None, + volumes: Optional[List[Mapping]] = None, + subscriptions: Optional[List[Mapping]] = None, ) -> ProgramMessage: + """ + Post a (create) PROGRAM message. + + :param account: Account to use to sign the message + :param program_ref: Reference to the program to run + :param entrypoint: Entrypoint to run + :param runtime: Runtime to use + :param environment_variables: Environment variables to pass to the program + :param storage_engine: Storage engine to use (Default: "storage") + :param channel: Channel to use (Default: "TEST") + :param address: Address to use (Default: account.get_address()) + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + :param memory: Memory in MB for the VM to be allocated (Default: 128) + :param vcpus: Number of vCPUs to allocate (Default: 1) + :param timeout_seconds: Timeout in seconds (Default: 30.0) + :param persistent: Whether the program should be persistent or not (Default: False) + :param encoding: Encoding to use (Default: Encoding.zip) + :param volumes: Volumes to mount + :param subscriptions: Patterns of Aleph messages to forward to the program's event receiver + """ + address = address or settings.ADDRESS_TO_USE or account.get_address() volumes = volumes if volumes is not None else [] - address = address or account.get_address() + memory = memory or settings.DEFAULT_VM_MEMORY + vcpus = vcpus or settings.DEFAULT_VM_VCPUS + timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT # TODO: Check that program_ref, runtime and data_ref exist - ## Register the different ways to trigger a VM + # Register the different ways to trigger a VM if subscriptions: # Trigger on HTTP calls and on Aleph message subscriptions. triggers = {"http": True, "persistent": persistent, "message": subscriptions} @@ -467,12 +535,27 @@ async def forget( hashes: List[str], reason: Optional[str], storage_engine: StorageEnum = StorageEnum.storage, - channel: str = settings.DEFAULT_CHANNEL, - address: Optional[str] = settings.ADDRESS_TO_USE, + channel: Optional[str] = None, + address: Optional[str] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, ) -> ForgetMessage: - address = address or account.get_address() + """ + Post a FORGET message to remove previous messages from the network. + + Targeted messages need to be signed by the same account that is attempting to forget them, + if the creating address did not delegate the access rights to the forgetting account. + + :param account: Account to use to sign the message + :param hashes: Hashes of the messages to forget + :param reason: Reason for forgetting the messages + :param storage_engine: Storage engine to use (Default: "storage") + :param channel: Channel to use (Default: "TEST") + :param address: Address to use (Default: account.get_address()) + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + """ + address = address or settings.ADDRESS_TO_USE or account.get_address() content = ForgetContent( hashes=hashes, @@ -488,21 +571,25 @@ async def forget( channel=channel, api_server=api_server, storage_engine=storage_engine, - session=session, - inline=True, + session=session ) async def submit( account: Account, - content: Dict, + content: Mapping, message_type: MessageType, - channel: str = settings.DEFAULT_CHANNEL, - api_server: str = settings.API_HOST, + channel: Optional[str] = None, + api_server: Optional[str] = None, storage_engine: StorageEnum = StorageEnum.storage, session: Optional[ClientSession] = None, inline: bool = True, ) -> AlephMessage: + """Main function to post a message to the network. Use the other functions in this module if you can.""" + channel = channel or settings.DEFAULT_CHANNEL + api_server = api_server or settings.API_HOST + session = session or get_fallback_session() + message: Dict[str, Any] = { # 'item_hash': ipfs_hash, "chain": account.CHAIN, @@ -545,11 +632,21 @@ async def submit( async def fetch_aggregate( address: str, key: str, - limit: Optional[int] = 100, + limit: int = 100, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, ) -> Dict[str, Dict]: + """ + Fetch a value from the aggregate store by owner address and item key. + + :param address: Address of the owner of the aggregate + :param key: Key of the aggregate + :param limit: Maximum number of items to fetch (Default: 100) + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + """ session = session or get_fallback_session() + api_server = api_server or settings.API_HOST params: Dict[str, Any] = {"keys": key} if limit: @@ -566,11 +663,21 @@ async def fetch_aggregate( async def fetch_aggregates( address: str, keys: Optional[Iterable[str]] = None, - limit: Optional[int] = 100, + limit: int = 100, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, ) -> Dict[str, Dict]: + """ + Fetch key-value pairs from the aggregate store by owner address. + + :param address: Address of the owner of the aggregate + :param keys: Keys of the aggregates to fetch (Default: all items) + :param limit: Maximum number of items to fetch (Default: 100) + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + """ session = session or get_fallback_session() + api_server = api_server or settings.API_HOST keys_str = ",".join(keys) if keys else "" params: Dict[str, Any] = {} @@ -601,9 +708,27 @@ async def get_posts( start_date: Optional[Union[datetime, float]] = None, end_date: Optional[Union[datetime, float]] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, -): + api_server: Optional[str] = None, +) -> Dict[str, Dict]: + """ + Fetch a list of posts from the network. + + :param pagination: Number of items to fetch (Default: 200) + :param page: Page to fetch, begins at 1 (Default: 1) + :param types: Types of posts to fetch (Default: all types) + :param refs: If set, only fetch posts that reference these hashes (in the "refs" field) + :param addresses: Addresses of the posts to fetch (Default: all addresses) + :param tags: Tags of the posts to fetch (Default: all tags) + :param hashes: Specific item_hashes to fetch + :param channels: Channels of the posts to fetch (Default: all channels) + :param chains: Chains of the posts to fetch (Default: all chains) + :param start_date: Earliest date to fetch messages from + :param end_date: Latest date to fetch messages from + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + """ session = session or get_fallback_session() + api_server = api_server or settings.API_HOST params: Dict[str, Any] = dict(pagination=pagination, page=page) @@ -647,8 +772,8 @@ async def download_file( Warning: Downloading large files can be slow and memory intensive. :param file_hash: The hash of the file to retrieve. - :param session: The aiohttp session to use. (DEFAULT: get_fallback_session()) - :param api_server: The API server to use. (DEFAULT: "https://api2.aleph.im") + :param session: The aiohttp session to use. (Default: get_fallback_session()) + :param api_server: The API server to use. (Default: "https://api2.aleph.im") """ session = session or get_fallback_session() api_server = api_server or settings.API_HOST @@ -673,11 +798,35 @@ async def get_messages( start_date: Optional[Union[datetime, float]] = None, end_date: Optional[Union[datetime, float]] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, ignore_invalid_messages: bool = True, invalid_messages_log_level: int = logging.NOTSET, ) -> MessagesResponse: + """ + Fetch a list of messages from the network. + + :param pagination: Number of items to fetch (Default: 200) + :param page: Page to fetch, begins at 1 (Default: 1) + :param message_type: Filter by message type, can be "AGGREGATE", "POST", "PROGRAM", "VM", "STORE" or "FORGET" + :param content_types: Filter by content type + :param content_keys: Filter by content key + :param refs: If set, only fetch posts that reference these hashes (in the "refs" field) + :param addresses: Addresses of the posts to fetch (Default: all addresses) + :param tags: Tags of the posts to fetch (Default: all tags) + :param hashes: Specific item_hashes to fetch + :param channels: Channels of the posts to fetch (Default: all channels) + :param chains: Filter by sender address chain + :param start_date: Earliest date to fetch messages from + :param end_date: Latest date to fetch messages from + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + :param ignore_invalid_messages: Ignore invalid messages (Default: False) + :param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET) + """ session = session or get_fallback_session() + api_server = api_server or settings.API_HOST + ignore_invalid_messages = True if ignore_invalid_messages is None else ignore_invalid_messages + invalid_messages_log_level = logging.NOTSET if invalid_messages_log_level is None else invalid_messages_log_level params: Dict[str, Any] = dict(pagination=pagination, page=page) @@ -748,9 +897,17 @@ async def get_message( message_type: Optional[Type[GenericMessage]] = None, channel: Optional[str] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, ) -> GenericMessage: - """Get a single message from its `item_hash`.""" + """ + Get a single message from its `item_hash` and perform some basic validation. + + :param item_hash: Hash of the message to fetch + :param message_type: Type of message to fetch + :param channel: Channel of the message to fetch + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + """ messages_response = await get_messages( hashes=[item_hash], session=session, @@ -786,13 +943,26 @@ async def watch_messages( start_date: Optional[Union[datetime, float]] = None, end_date: Optional[Union[datetime, float]] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, + api_server: Optional[str] = None, ) -> AsyncIterable[AlephMessage]: """ Iterate over current and future matching messages asynchronously. - """ + :param message_type: Type of message to watch + :param content_types: Content types to watch + :param refs: References to watch + :param addresses: Addresses to watch + :param tags: Tags to watch + :param hashes: Hashes to watch + :param channels: Channels to watch + :param chains: Chains to watch + :param start_date: Start date from when to watch + :param end_date: End date until when to watch + :param session: Session to use (Default: get_fallback_session()) + :param api_server: API server to use (Default: "https://api2.aleph.im") + """ session = session or get_fallback_session() + api_server = api_server or settings.API_HOST params: Dict[str, Any] = dict() diff --git a/src/aleph_client/synchronous.py b/src/aleph_client/synchronous.py index 8903eda8..793f87a5 100644 --- a/src/aleph_client/synchronous.py +++ b/src/aleph_client/synchronous.py @@ -68,19 +68,39 @@ def create_program( runtime: str, environment_variables: Optional[Dict[str, str]] = None, storage_engine: StorageEnum = StorageEnum.storage, - channel: str = settings.DEFAULT_CHANNEL, - address: Optional[str] = settings.ADDRESS_TO_USE, + channel: Optional[str] = None, + address: Optional[str] = None, session: Optional[ClientSession] = None, - api_server: str = settings.API_HOST, - memory: int = settings.DEFAULT_VM_MEMORY, - vcpus: int = settings.DEFAULT_VM_VCPUS, - timeout_seconds: float = settings.DEFAULT_VM_TIMEOUT, + api_server: Optional[str] = None, + memory: Optional[int] = None, + vcpus: Optional[int] = None, + timeout_seconds: Optional[float] = None, persistent: bool = False, encoding: Encoding = Encoding.zip, volumes: Optional[List[Dict]] = None, - subscriptions: Optional[List[Dict]] = None, ): + """ + Post a (create) PROGRAM message. + + :param account: Account to use to sign the message + :param program_ref: Reference to the program to run + :param entrypoint: Entrypoint to run + :param runtime: Runtime to use + :param environment_variables: Environment variables to pass to the program + :param storage_engine: Storage engine to use (DEFAULT: "storage") + :param channel: Channel to use (DEFAULT: "TEST") + :param address: Address to use (DEFAULT: account.get_address()) + :param session: Session to use (DEFAULT: get_fallback_session()) + :param api_server: API server to use (DEFAULT: "https://api2.aleph.im") + :param memory: Memory in MB for the VM to be allocated (DEFAULT: 128) + :param vcpus: Number of vCPUs to allocate (DEFAULT: 1) + :param timeout_seconds: Timeout in seconds (DEFAULT: 30.0) + :param persistent: Whether the program should be persistent or not (DEFAULT: False) + :param encoding: Encoding to use (DEFAULT: Encoding.zip) + :param volumes: Volumes to mount + :param subscriptions: Patterns of Aleph messages to forward to the program's event receiver + """ return wrap_async(asynchronous.create_program)( account=account, program_ref=program_ref, diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/test_vm_app.py b/tests/unit/test_vm_app.py index 2bff5740..eb96d7f2 100644 --- a/tests/unit/test_vm_app.py +++ b/tests/unit/test_vm_app.py @@ -1,9 +1,12 @@ import asyncio import pytest +from fastapi.testclient import TestClient from tests.unit.test_app.main import app -from fastapi.testclient import TestClient + +# Note: for some reason, the test client must be declared at the same level as the import. +client = TestClient(app) @pytest.mark.asyncio @@ -26,7 +29,6 @@ async def send(dico): def test_app_http(): - client = TestClient(app) response = client.get("/") assert response.status_code == 200 assert response.json() == {"index": "/"}