Skip to content

Feature: ipv4 and missing service #221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions src/aleph/sdk/client/authenticated_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Mapping, NoReturn, Optional, Tuple, Union
from typing import Any, Dict, Mapping, NoReturn, Optional, Tuple, Type, Union

import aiohttp
from aleph_message.models import (
Expand Down Expand Up @@ -38,6 +38,7 @@
from ..utils import extended_json_encoder, make_instance_content, make_program_content
from .abstract import AuthenticatedAlephClient
from .http import AlephHttpClient
from .service.port_forwarder import AuthenticatedPortForwarder

logger = logging.getLogger(__name__)

Expand All @@ -50,6 +51,21 @@

class AuthenticatedAlephHttpClient(AlephHttpClient, AuthenticatedAlephClient):
account: Account
_registered_authenticated_services: Dict[str, Tuple[Type, Dict[str, Any]]] = {}

@classmethod
def register_authenticated_service(
cls, name: str, service_class: Type, **kwargs
) -> None:
"""
Register an authenticated service to be instantiated when the authenticated client is entered.
This is used for services that require an account.

:param name: The attribute name to use for the service
:param service_class: The class to instantiate
:param kwargs: Additional kwargs to pass to the service constructor
"""
cls._registered_authenticated_services[name] = (service_class, kwargs)

BROADCAST_MESSAGE_FIELDS = {
"sender",
Expand Down Expand Up @@ -81,6 +97,20 @@ def __init__(
)
self.account = account

async def __aenter__(self):
await super().__aenter__()
# Override services with authenticated versions
self.port_forwarder = AuthenticatedPortForwarder(self)

# Initialize registered authenticated services
for name, (
service_class,
kwargs,
) in self.__class__._registered_authenticated_services.items():
setattr(self, name, service_class(self, **kwargs))

return self

async def ipfs_push(self, content: Mapping) -> str:
"""
Push arbitrary content as JSON to the IPFS service.
Expand Down Expand Up @@ -392,7 +422,7 @@ async def create_store(
if extra_fields is not None:
values.update(extra_fields)

content = StoreContent.parse_obj(values)
content = StoreContent.model_validate(values)

message, status, _ = await self.submit(
content=content.model_dump(exclude_none=True),
Expand Down
34 changes: 33 additions & 1 deletion src/aleph/sdk/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,30 @@
safe_getattr,
)
from .abstract import AlephClient
from .service.crn.http_crn import CrnService
from .service.dns.http_dns import DNSService
from .service.port_forwarder.http_port_forwarder import PortForwarder
from .service.scheduler.http_scheduler import SchedulerService
from .service.utils.http_utils import UtilsService

logger = logging.getLogger(__name__)


class AlephHttpClient(AlephClient):
api_server: str
_http_session: Optional[aiohttp.ClientSession]
_registered_services: Dict[str, Tuple[Type, Dict[str, Any]]] = {}

@classmethod
def register_service(cls, name: str, service_class: Type, **kwargs) -> None:
"""
Register a service to be instantiated when the client is entered.

:param name: The attribute name to use for the service
:param service_class: The class to instantiate
:param kwargs: Additional kwargs to pass to the service constructor
"""
cls._registered_services[name] = (service_class, kwargs)

def __init__(
self,
Expand Down Expand Up @@ -121,6 +138,20 @@ async def __aenter__(self):
)
)

# Initialize default services
self.dns = DNSService(self)
self.port_forwarder = PortForwarder(self)
self.crn = CrnService(self)
self.scheduler = SchedulerService(self)
self.utils = UtilsService(self)

# Initialize registered services
for name, (
service_class,
kwargs,
) in self.__class__._registered_services.items():
setattr(self, name, service_class(self, **kwargs))

return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
Expand All @@ -137,7 +168,8 @@ async def fetch_aggregate(self, address: str, key: str) -> Dict[str, Dict]:
resp.raise_for_status()
result = await resp.json()
data = result.get("data", dict())
return data.get(key)
final_result = data.get(key)
return final_result

async def fetch_aggregates(
self, address: str, keys: Optional[Iterable[str]] = None
Expand Down
Empty file.
47 changes: 47 additions & 0 deletions src/aleph/sdk/client/service/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from abc import ABC
from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar

from pydantic import BaseModel

from aleph.sdk.conf import settings

if TYPE_CHECKING:
from aleph.sdk.client.http import AlephHttpClient


T = TypeVar("T", bound=BaseModel)


class AggregateConfig(BaseModel, Generic[T]):
"""
A generic container for "aggregate" data of type T.
- `data` will be either None or a list of T-instances.
"""

data: Optional[List[T]] = None


class BaseService(ABC, Generic[T]):
aggregate_key: str
model_cls: Type[T]

def __init__(self, client: "AlephHttpClient"):
self._client = client
self.model_cls: Type[T]

def _build_crn_update_url(self, crn: str, vm_hash: str) -> str:
return settings.CRN_URL_UPDATE.format(crn_url=crn, vm_hash=vm_hash)

async def get_config(self, address: str):

aggregate_data = await self._client.fetch_aggregate(
address=address, key=self.aggregate_key
)

if aggregate_data:
model_instance = self.model_cls.model_validate(aggregate_data)
config = AggregateConfig[T](data=[model_instance])
else:
config = AggregateConfig[T](data=None)

return config
Empty file.
138 changes: 138 additions & 0 deletions src/aleph/sdk/client/service/crn/http_crn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from typing import TYPE_CHECKING, Dict, Optional, Union

import aiohttp
from aiohttp.client_exceptions import ClientResponseError
from aleph_message.models import ItemHash

from aleph.sdk.conf import settings
from aleph.sdk.exceptions import MethodNotAvailableOnCRN, VmNotFoundOnHost
from aleph.sdk.types import CrnExecutionV1, CrnExecutionV2, CrnV1List, CrnV2List
from aleph.sdk.utils import sanitize_url

if TYPE_CHECKING:
from aleph.sdk.client.http import AlephHttpClient


class CrnService:
"""
This service allow interact with CRNS API
TODO: ADD
/about/executions/details
/about/executions/records
/about/usage/system
/about/certificates
/about/capability
/about/config
/status/check/fastapi
/status/check/fastapi/legacy
/status/check/host
/status/check/version
/status/check/ipv6
/status/config
"""

def __init__(self, client: "AlephHttpClient"):
self._client = client

async def get_last_crn_version(self):
"""
Fetch Last version tag from aleph-vm github repo
"""
# Create a new session for external domain requests
async with aiohttp.ClientSession() as session:
async with session.get(settings.CRN_VERSION) as resp:
resp.raise_for_status()
data = await resp.json()
return data.get("tag_name")

async def get_crns_list(self, only_active: bool = True) -> dict:
"""
Query a persistent VM running on aleph.im to retrieve list of CRNs:
https://crns-list.aleph.sh/crns.json

Parameters
----------
only_active : bool
If True (the default), only return active CRNs (i.e. `filter_inactive=false`).
If False, return all CRNs (i.e. `filter_inactive=true`).

Returns
-------
dict
The parsed JSON response from /crns.json.
"""
# We want filter_inactive = (not only_active)
# Convert bool to string for the query parameter
filter_inactive_str = str(not only_active).lower()
params = {"filter_inactive": filter_inactive_str}

# Create a new session for external domain requests
async with aiohttp.ClientSession() as session:
async with session.get(
sanitize_url(settings.CRN_LIST_URL), params=params
) as resp:
resp.raise_for_status()
return await resp.json()

async def get_active_vms_v2(self, crn_address: str) -> CrnV2List:
endpoint = "/v2/about/executions/list"

full_url = sanitize_url(crn_address + endpoint)

async with aiohttp.ClientSession() as session:
async with session.get(full_url) as resp:
resp.raise_for_status()
raw = await resp.json()
vm_mmap = CrnV2List.model_validate(raw)
return vm_mmap

async def get_active_vms_v1(self, crn_address: str) -> CrnV1List:
endpoint = "/about/executions/list"

full_url = sanitize_url(crn_address + endpoint)

async with aiohttp.ClientSession() as session:
async with session.get(full_url) as resp:
resp.raise_for_status()
raw = await resp.json()
vm_map = CrnV1List.model_validate(raw)
return vm_map

async def get_active_vms(self, crn_address: str) -> Union[CrnV2List, CrnV1List]:
try:
return await self.get_active_vms_v2(crn_address)
except ClientResponseError as e:
if e.status == 404:
return await self.get_active_vms_v1(crn_address)
raise

async def get_vm(
self, crn_address: str, item_hash: ItemHash
) -> Optional[Union[CrnExecutionV1, CrnExecutionV2]]:
vms = await self.get_active_vms(crn_address)

vm_map: Dict[ItemHash, Union[CrnExecutionV1, CrnExecutionV2]] = vms.root

if item_hash not in vm_map:
return None

return vm_map[item_hash]

async def update_instance_config(self, crn_address: str, item_hash: ItemHash):
vm = await self.get_vm(crn_address, item_hash)

if not vm:
raise VmNotFoundOnHost(crn_url=crn_address, item_hash=item_hash)

# CRN have two week to upgrade their node,
# So if the CRN does not have the update
# We can't update config
if isinstance(vm, CrnExecutionV1):
raise MethodNotAvailableOnCRN()

full_url = sanitize_url(crn_address + f"/control/{item_hash}/update")

async with aiohttp.ClientSession() as session:
async with session.get(full_url) as resp:
resp.raise_for_status()
return await resp.json()
Empty file.
39 changes: 39 additions & 0 deletions src/aleph/sdk/client/service/dns/http_dns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import TYPE_CHECKING, List, Optional

import aiohttp
from aleph_message.models import ItemHash

from aleph.sdk.conf import settings
from aleph.sdk.types import Dns, DnsListAdapter
from aleph.sdk.utils import sanitize_url

if TYPE_CHECKING:
from aleph.sdk.client.http import AlephHttpClient


class DNSService:
"""
This Service mostly made to get active dns for instance:
`https://api.dns.public.aleph.sh/instances/list`
"""

def __init__(self, client: "AlephHttpClient"):
self._client = client

async def get_public_dns(self) -> List[Dns]:
"""
Get all the public dns ha
"""
async with aiohttp.ClientSession() as session:
async with session.get(sanitize_url(settings.DNS_API)) as resp:
resp.raise_for_status()
raw = await resp.json()

return DnsListAdapter.validate_json(raw)

async def get_dns_for_instance(self, vm_hash: ItemHash) -> Optional[Dns]:
dns_list: List[Dns] = await self.get_public_dns()
for dns in dns_list:
if dns.item_hash == vm_hash:
return dns
return None
12 changes: 12 additions & 0 deletions src/aleph/sdk/client/service/port_forwarder/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from aleph.sdk.types import AllForwarders, PortFlags, Ports

from .authenticated_port_forwarder import AuthenticatedPortForwarder
from .http_port_forwarder import PortForwarder

__all__ = [
"PortForwarder",
"AuthenticatedPortForwarder",
"PortFlags",
"Ports",
"AllForwarders",
]
Loading
Loading