Skip to content

Commit 7a806cf

Browse files
1yamhohphilogicaenesitor
authored
Feature: Allow PAYG using base (#258)
* Feature: handle superfuild flow * feat: control of message / flow before starting the instance * fix: mypy issue * fix: add setuptools for ci * Add unit test in pair programming * fixup! Add unit test in pair programming * Refactor: handle_flow_reduction and handle_flow into update_flow * Fix: add type annotations * fixup! Merge branch 'master' into 1yam-payg * fixup! Add unit test in pair programming * fixup! Add unit test in pair programming * Fix raised error when no flow to reduce + dumby fix echo * Add qemu_support, confidential_support and stream_reward_address checks * Fix flow outputs * Fixes after tests * Feature: Allow PAYG on bae * Fix instance confidential adding payment_chain arg * fix: mypy issue * Merge from master * Fix PAYG + balance checks * Remove default rootfs to allow prompting * Fix: Update to latest Aleph-SDK version * Fix name / crn_name * Fix: Remove BASE chain for now and ceil flow to up number to ensure to have enough tokens. * Fix: Solve code quality issues. * Fix: Solve mypy issues. --------- Co-authored-by: Hugo Herter <git@hugoherter.com> Co-authored-by: philogicae <philogicae+github@gmail.com> Co-authored-by: Andres D. Molins <amolinsdiaz@yahoo.es>
1 parent 740b156 commit 7a806cf

File tree

7 files changed

+250
-19
lines changed

7 files changed

+250
-19
lines changed

pyproject.toml

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ classifiers = [
2424
]
2525

2626
dependencies = [
27-
"aleph-sdk-python>=1.0.0rc2",
28-
"aleph-message>=0.4.8",
27+
"aleph-sdk-python>=1.0.0",
28+
"setuptools>=65.5.0",
29+
"aleph-message>=0.4.9",
2930
"aiohttp==3.9.5",
3031
"typer==0.12.3",
3132
"python-magic==0.4.27",
@@ -69,6 +70,7 @@ source = "vcs"
6970
[tool.hatch.envs.default]
7071
platforms = ["linux", "macos"]
7172
dependencies = [
73+
"setuptools>=65.5.0",
7274
"pytest==8.2.2",
7375
"pytest-asyncio==0.23.7",
7476
"pytest-cov==5.0.0",

src/aleph_client/commands/help_strings.py

+1
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,4 @@
4848
CRN_PENDING = "Pending..."
4949
ALLOCATION_AUTO = "Auto - Scheduler"
5050
ALLOCATION_MANUAL = "Manual - Selection"
51+
PAYMENT_CHAIN = "Chain you want to use to pay for your instance"

src/aleph_client/commands/instance/__init__.py

+88-16
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44
import json
55
import logging
66
import shutil
7+
from decimal import Decimal
78
from ipaddress import IPv6Interface
9+
from math import ceil
810
from pathlib import Path
911
from typing import Dict, List, Optional, Tuple, Union, cast
1012

13+
import aiohttp
1114
import typer
1215
from aiohttp import ClientConnectorError, ClientResponseError, ClientSession
1316
from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient
1417
from aleph.sdk.account import _load_account
18+
from aleph.sdk.chains.ethereum import ETHAccount
1519
from aleph.sdk.client.vm_client import VmClient
1620
from aleph.sdk.client.vm_confidential_client import VmConfidentialClient
1721
from aleph.sdk.conf import settings as sdk_settings
@@ -21,6 +25,7 @@
2125
MessageNotFoundError,
2226
)
2327
from aleph.sdk.query.filters import MessageFilter
28+
from aleph.sdk.query.responses import PriceResponse
2429
from aleph.sdk.types import AccountFromPrivateKey, StorageEnum
2530
from aleph.sdk.utils import calculate_firmware_hash
2631
from aleph_message.models import InstanceMessage, StoreMessage
@@ -49,12 +54,15 @@
4954
setup_logging,
5055
validated_int_prompt,
5156
validated_prompt,
57+
wait_for_confirmed_flow,
58+
wait_for_processed_instance,
5259
)
5360
from aleph_client.conf import settings
5461
from aleph_client.models import CRNInfo
5562
from aleph_client.utils import AsyncTyper, fetch_json
5663

5764
from ..utils import has_nested_attr
65+
from .superfluid import FlowUpdate, update_flow
5866

5967
logger = logging.getLogger(__name__)
6068
app = AsyncTyper(no_args_is_help=True)
@@ -63,9 +71,10 @@
6371
@app.command()
6472
async def create(
6573
payment_type: PaymentType = typer.Option(None, help=help_strings.PAYMENT_TYPE),
74+
payment_chain: Chain = typer.Option(None, help=help_strings.PAYMENT_CHAIN),
6675
hypervisor: HypervisorType = typer.Option(None, help=help_strings.HYPERVISOR),
6776
name: Optional[str] = typer.Option(None, help=help_strings.INSTANCE_NAME),
68-
rootfs: str = typer.Option("ubuntu22", help=help_strings.ROOTFS),
77+
rootfs: str = typer.Option(None, help=help_strings.ROOTFS),
6978
rootfs_size: int = typer.Option(None, help=help_strings.ROOTFS_SIZE),
7079
vcpus: int = typer.Option(None, help=help_strings.VCPUS),
7180
memory: int = typer.Option(None, help=help_strings.MEMORY),
@@ -133,6 +142,28 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
133142
)
134143
is_stream = payment_type != PaymentType.hold
135144

145+
# super_token_chains = get_chains_with_super_token()
146+
super_token_chains = [Chain.AVAX.value]
147+
if is_stream:
148+
if payment_chain is None or payment_chain not in super_token_chains:
149+
payment_chain = Chain(
150+
Prompt.ask(
151+
"Which chain do you want to use for Pay-As-You-Go?",
152+
choices=super_token_chains,
153+
default=Chain.AVAX.value,
154+
)
155+
)
156+
if isinstance(account, ETHAccount):
157+
account.switch_chain(payment_chain)
158+
if account.superfluid_connector: # Quick check with theoretical min price
159+
try:
160+
account.superfluid_connector.can_start_flow(Decimal(0.000031)) # 0.11/h
161+
except Exception as e:
162+
echo(e)
163+
raise typer.Exit(code=1)
164+
else:
165+
payment_chain = Chain.ETH # Hold chain for all balances
166+
136167
if confidential:
137168
if hypervisor and hypervisor != HypervisorType.qemu:
138169
echo("Only QEMU is supported as an hypervisor for confidential")
@@ -171,7 +202,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
171202
if confidential:
172203
# Confidential only support custom rootfs
173204
rootfs = "custom"
174-
elif rootfs not in os_choices:
205+
elif not rootfs or rootfs not in os_choices:
175206
rootfs = Prompt.ask(
176207
"Use a custom rootfs or one of the following prebuilt ones:",
177208
default=rootfs,
@@ -206,6 +237,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
206237
if not firmware_message:
207238
echo("Confidential Firmware hash does not exist on aleph.im")
208239
raise typer.Exit(code=1)
240+
209241
name = name or validated_prompt("Instance name", lambda x: len(x) < 65)
210242
rootfs_size = rootfs_size or validated_int_prompt(
211243
"Disk size in MiB", default=settings.DEFAULT_ROOTFS_SIZE, min_value=10_240, max_value=102_400
@@ -228,27 +260,24 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
228260
immutable_volume=immutable_volume,
229261
)
230262

231-
# For PAYG or confidential, the user select directly the node on which to run on
232-
# For PAYG User have to make the payment stream separately
233-
# For now, we allow hold for confidential, but the user still has to choose on which CRN to run.
234263
stream_reward_address = None
235264
crn = None
236265
if crn_url and crn_hash:
237266
crn_url = sanitize_url(crn_url)
238267
try:
239-
name, score, reward_addr = "?", 0, ""
268+
crn_name, score, reward_addr = "?", 0, ""
240269
nodes: NodeInfo = await _fetch_nodes()
241270
for node in nodes.nodes:
242271
if node["address"].rstrip("/") == crn_url:
243-
name = node["name"]
272+
crn_name = node["name"]
244273
score = node["score"]
245274
reward_addr = node["stream_reward"]
246275
break
247276
crn_info = await fetch_crn_info(crn_url)
248277
if crn_info:
249278
crn = CRNInfo(
250279
hash=ItemHash(crn_hash),
251-
name=name or "?",
280+
name=crn_name or "?",
252281
url=crn_url,
253282
version=crn_info.get("version", ""),
254283
score=score,
@@ -293,13 +322,11 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
293322
raise typer.Exit(1)
294323

295324
async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client:
296-
payment: Optional[Payment] = None
297-
if stream_reward_address:
298-
payment = Payment(
299-
chain=Chain.AVAX,
300-
receiver=stream_reward_address,
301-
type=payment_type,
302-
)
325+
payment = Payment(
326+
chain=payment_chain,
327+
receiver=stream_reward_address if stream_reward_address else None,
328+
type=payment_type,
329+
)
303330
try:
304331
message, status = await client.create_instance(
305332
sync=True,
@@ -341,7 +368,36 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
341368
# Not the ideal solution
342369
logger.debug(f"Cannot allocate {item_hash}: no CRN url")
343370
return item_hash, crn_url
344-
account = _load_account(private_key, private_key_file)
371+
372+
# Wait for the instance message to be processed
373+
async with aiohttp.ClientSession() as session:
374+
await wait_for_processed_instance(session, item_hash)
375+
376+
# Pay-As-You-Go
377+
if payment_type == PaymentType.superfluid:
378+
price: PriceResponse = await client.get_program_price(item_hash)
379+
ceil_factor = 10**18
380+
required_tokens = ceil(Decimal(price.required_tokens) * ceil_factor) / ceil_factor
381+
if isinstance(account, ETHAccount) and account.superfluid_connector:
382+
try: # Double check with effective price
383+
account.superfluid_connector.can_start_flow(Decimal(0.000031)) # Min for 0.11/h
384+
except Exception as e:
385+
echo(e)
386+
raise typer.Exit(code=1)
387+
flow_hash = await update_flow(
388+
account=account,
389+
receiver=crn.stream_reward_address,
390+
flow=Decimal(required_tokens),
391+
update_type=FlowUpdate.INCREASE,
392+
)
393+
# Wait for the flow transaction to be confirmed
394+
await wait_for_confirmed_flow(account, message.content.payment.receiver)
395+
if flow_hash:
396+
echo(
397+
f"Flow {flow_hash} has been created:\n\t- price/sec: {price.required_tokens:.7f} ALEPH\n\t- receiver: {crn.stream_reward_address}"
398+
)
399+
400+
# Notify CRN
345401
async with VmClient(account, crn.url) as crn_client:
346402
status, result = await crn_client.start_instance(vm_id=item_hash)
347403
logger.debug(status, result)
@@ -437,6 +493,20 @@ async def delete(
437493
echo("You are not the owner of this instance")
438494
raise typer.Exit(code=1)
439495

496+
# Check for streaming payment and eventually stop it
497+
payment: Optional[Payment] = existing_message.content.payment
498+
if payment is not None and payment.type == PaymentType.superfluid:
499+
price: PriceResponse = await client.get_program_price(item_hash)
500+
if payment.receiver is not None:
501+
if isinstance(account, ETHAccount):
502+
account.switch_chain(payment.chain)
503+
if account.superfluid_connector:
504+
flow_hash = await update_flow(
505+
account, payment.receiver, Decimal(price.required_tokens), FlowUpdate.REDUCE
506+
)
507+
if flow_hash:
508+
echo(f"Flow {flow_hash} has been deleted.")
509+
440510
# Check status of the instance and eventually erase associated VM
441511
node_list: NodeInfo = await _fetch_nodes()
442512
_, details = await _get_instance_details(existing_message, node_list)
@@ -962,6 +1032,7 @@ async def confidential(
9621032
keep_session: bool = typer.Option(None, help=help_strings.KEEP_SESSION),
9631033
vm_secret: str = typer.Option(None, help=help_strings.VM_SECRET),
9641034
payment_type: PaymentType = typer.Option(None, help=help_strings.PAYMENT_TYPE),
1035+
payment_chain: Optional[Chain] = typer.Option(None, help=help_strings.PAYMENT_CHAIN),
9651036
name: Optional[str] = typer.Option(None, help=help_strings.INSTANCE_NAME),
9661037
rootfs: str = typer.Option("ubuntu22", help=help_strings.ROOTFS),
9671038
rootfs_size: int = typer.Option(None, help=help_strings.ROOTFS_SIZE),
@@ -1002,6 +1073,7 @@ async def confidential(
10021073
if not vm_id or len(vm_id) != 64:
10031074
vm_id, crn_url = await create(
10041075
payment_type,
1076+
payment_chain,
10051077
None,
10061078
name,
10071079
rootfs,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import logging
2+
from decimal import Decimal
3+
from enum import Enum
4+
5+
from aleph.sdk.chains.ethereum import ETHAccount
6+
from aleph.sdk.conf import settings
7+
from aleph_message.models import Chain
8+
from click import echo
9+
from eth_utils.currency import to_wei
10+
from superfluid import Web3FlowInfo
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
def from_wei(wei_value: Decimal) -> Decimal:
16+
"""Converts the given wei value to ether."""
17+
return wei_value / Decimal(10**settings.TOKEN_DECIMALS)
18+
19+
20+
class FlowUpdate(str, Enum):
21+
REDUCE = "reduce"
22+
INCREASE = "increase"
23+
24+
25+
async def update_flow(account: ETHAccount, receiver: str, flow: Decimal, update_type: FlowUpdate):
26+
"""
27+
Update the flow of a Superfluid stream between a sender and receiver.
28+
This function either increases or decreases the flow rate between the sender and receiver,
29+
based on the update_type. If no flow exists and the update type is augmentation, it creates a new flow
30+
with the specified rate. If the update type is reduction and the reduction amount brings the flow to zero
31+
or below, the flow is deleted.
32+
33+
:param account: The SuperFluid account instance used to interact with the blockchain.
34+
:param chain: The blockchain chain to interact with.
35+
:param receiver: Address of the receiver in hexadecimal format.
36+
:param flow: The flow rate to be added or removed (in ether).
37+
:param update_type: The type of update to perform (augmentation or reduction).
38+
:return: The transaction hash of the executed operation (create, update, or delete flow).
39+
"""
40+
41+
# Retrieve current flow info
42+
flow_info: Web3FlowInfo = await account.get_flow(receiver)
43+
44+
current_flow_rate_wei: Decimal = Decimal(flow_info["flowRate"] or "0")
45+
flow_rate_wei: int = to_wei(flow, "ether")
46+
47+
if update_type == FlowUpdate.INCREASE:
48+
if current_flow_rate_wei > 0:
49+
# Update existing flow by augmenting the rate
50+
new_flow_rate_wei = current_flow_rate_wei + flow_rate_wei
51+
new_flow_rate_ether = from_wei(new_flow_rate_wei)
52+
return await account.update_flow(receiver, new_flow_rate_ether)
53+
else:
54+
# Create a new flow if none exists
55+
return await account.create_flow(receiver, flow)
56+
elif update_type == FlowUpdate.REDUCE:
57+
if current_flow_rate_wei > 0:
58+
# Reduce the existing flow
59+
new_flow_rate_wei = current_flow_rate_wei - flow_rate_wei
60+
if new_flow_rate_wei > 0:
61+
new_flow_rate_ether = from_wei(new_flow_rate_wei)
62+
return await account.update_flow(receiver, new_flow_rate_ether)
63+
else:
64+
# Delete the flow if the new flow rate is zero or negative
65+
return await account.delete_flow(receiver)
66+
else:
67+
echo("No existing flow to stop. Skipping...")

src/aleph_client/commands/message.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ async def post(
128128
file_size = os.path.getsize(path)
129129
storage_engine = StorageEnum.ipfs if file_size > 4 * 1024 * 1024 else StorageEnum.storage
130130

131-
with open(path, "r") as fd:
131+
with open(path) as fd:
132132
content = json.load(fd)
133133

134134
else:

src/aleph_client/commands/utils.py

+31
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import logging
45
import os
56
import sys
67
from datetime import datetime
78
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union
89

910
import typer
11+
from aiohttp import ClientSession
12+
from aleph.sdk.chains.ethereum import ETHAccount
13+
from aleph.sdk.conf import settings as sdk_settings
1014
from aleph.sdk.types import GenericMessage
15+
from aleph_message.models import ItemHash
1116
from pygments import highlight
1217
from pygments.formatters.terminal256 import Terminal256Formatter
1318
from pygments.lexers import JsonLexer
1419
from rich.prompt import IntPrompt, Prompt, PromptError
1520
from typer import echo
1621

22+
from aleph_client.utils import fetch_json
23+
1724
logger = logging.getLogger(__name__)
1825

1926

@@ -208,3 +215,27 @@ def has_nested_attr(obj, *attr_chain) -> bool:
208215
return False
209216
obj = getattr(obj, attr)
210217
return True
218+
219+
220+
async def wait_for_processed_instance(session: ClientSession, item_hash: ItemHash):
221+
"""Wait for a message to be processed by CCN"""
222+
while True:
223+
url = f"{sdk_settings.API_HOST.rstrip('/')}/api/v0/messages/{item_hash}"
224+
message = await fetch_json(session, url)
225+
if message["status"] == "processed":
226+
return
227+
elif message["status"] == "pending":
228+
typer.echo(f"Message {item_hash} is still pending, waiting 10sec...")
229+
await asyncio.sleep(10)
230+
elif message["status"] == "rejected":
231+
raise Exception(f"Message {item_hash} has been rejected")
232+
233+
234+
async def wait_for_confirmed_flow(account: ETHAccount, receiver: str):
235+
"""Wait for a flow to be confirmed on-chain"""
236+
while True:
237+
flow = await account.get_flow(receiver)
238+
if flow:
239+
return
240+
typer.echo("Flow transaction is still pending, waiting 10sec...")
241+
await asyncio.sleep(10)

0 commit comments

Comments
 (0)