From e4925e3cc76b6d4499cea9d63fec62fe8c50f4d7 Mon Sep 17 00:00:00 2001 From: El De-dog-lo <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 9 Oct 2024 12:17:00 -0400 Subject: [PATCH] feat(platform): add payment cli for streaming payments w/ ApePay (#129) * refactor(cluster): add stream_id to cluster record * refactor(client): add streaming payment methods to platform client * style(client): make mypy happy * refactor(cli): add some helpful methods * feat(cluster): add commands to pay for, add funds to, and cancel streams * feat(cli): add note on how long it takes to deploy, and how to check * fix(cli): bad variable reference * feat(cli): add more helpful notes to CLI for payments * chore(deps): ApePay didn't officially support Py 3.10 * docs(platform): add docs guide for payments * fix(typing): switch order of computed_field and property * fix(client): revert change ordering of property/computed_filed * docs(cli): add better note to CLI for cluster pay funding * fix: black and isort --------- Co-authored-by: johnson2427 <37009091+johnson2427@users.noreply.github.com> Co-authored-by: johnson2427 --- docs/userguides/platform.md | 20 +++ setup.py | 1 + silverback/_cli.py | 317 ++++++++++++++++++++++++++++++++--- silverback/_click_ext.py | 64 +++++++ silverback/cluster/client.py | 33 ++++ silverback/cluster/types.py | 3 +- 6 files changed, 410 insertions(+), 28 deletions(-) diff --git a/docs/userguides/platform.md b/docs/userguides/platform.md index f4e405fa..922fb5c8 100644 --- a/docs/userguides/platform.md +++ b/docs/userguides/platform.md @@ -20,6 +20,23 @@ The Platform UI will let you create and manage Clusters using a graphical experi The CLI experience is for those working locally who don't want to visit the website, or are locally developing their applications. ``` +Once you have created your Cluster, you have to fund it so it is made available for your use. +To do that, use the [`silverback cluster pay create`][silverback-cluster-pay-create] command to fund your newly created cluster. +Please note that provisioning your cluster will take time, and it may take up to an hour for it to be ready. +Check back after 10-15 minutes using the [`silverback cluster info`][silverback-cluster-info] command to see when it's ready. + +At any point after the Cluster is funded, you can fund it with more funds via [`silverback cluster pay add-time`][silverback-cluster-pay-add-time] +command to extend the timeline that the Cluster is kept around for. +Note that it is possible for anyone to add more time to the Cluster, at any time and for any amount. + +If that timeline expires, the Platform will automatically de-provision your infrastructure, and it is not possible to reverse this! +The Platform may send you notifications when your Stream is close to expiring, but it is up to you to remember to fill it so it doesn't. +Note that your data collection will stay available for up to 30 days allowing you the ability to download any data you need. + +Lastly, if you ever feel like you no longer need your Cluster, you can cancel the funding for it and get a refund of the remaining funds. +If you are the owner of the Stream, you can do this via the [`silverback cluster pay cancel`][silverback-cluster-pay-cancel] command. +Only the owner may do this, so if you are not the owner you should contact them to have them do that action for you. + ## Connecting to your Cluster To connect to a cluster, you can use commands from the [`silverback cluster`][silverback-cluster] subcommand group. @@ -222,6 +239,9 @@ TODO: Downloading metrics from your Bot [silverback-cluster-health]: ../commands/cluster.html#silverback-cluster-health [silverback-cluster-info]: ../commands/cluster.html#silverback-cluster-info [silverback-cluster-new]: ../commands/cluster.html#silverback-cluster-new +[silverback-cluster-pay-add-time]: ../commands/cluster.html#silverback-cluster-pay-add-time +[silverback-cluster-pay-cancel]: ../commands/cluster.html#silverback-cluster-pay-cancel +[silverback-cluster-pay-create]: ../commands/cluster.html#silverback-cluster-pay-create [silverback-cluster-registry-auth-new]: ../commands/cluster.html#silverback-cluster-registry-auth-new [silverback-cluster-vars]: ../commands/cluster.html#silverback-cluster-vars [silverback-cluster-vars-info]: ../commands/cluster.html#silverback-cluster-vars-info diff --git a/setup.py b/setup.py index 27035f80..39b665bb 100644 --- a/setup.py +++ b/setup.py @@ -61,6 +61,7 @@ url="https://github.com/ApeWorX/silverback", include_package_data=True, install_requires=[ + "apepay>=0.3.1,<1", "click", # Use same version as eth-ape "eth-ape>=0.7,<1.0", "ethpm-types>=0.6.10", # lower pin only, `eth-ape` governs upper pin diff --git a/silverback/_cli.py b/silverback/_cli.py index 67a5ace7..fe193e47 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -2,17 +2,21 @@ import os import shlex import subprocess +from datetime import timedelta from pathlib import Path import click import yaml # type: ignore[import-untyped] +from ape.api import AccountAPI, NetworkAPI from ape.cli import ( AccountAliasPromptChoice, ConnectedProviderCommand, + account_option, ape_cli_context, network_option, ) -from ape.exceptions import Abort +from ape.contracts import ContractInstance +from ape.exceptions import Abort, ApeException from fief_client.integrations.cli import FiefAuth from silverback._click_ext import ( @@ -23,9 +27,11 @@ cluster_client, display_login_message, platform_client, + timedelta_callback, + token_amount_callback, ) from silverback.cluster.client import ClusterClient, PlatformClient -from silverback.cluster.types import ClusterTier +from silverback.cluster.types import ClusterTier, ResourceStatus from silverback.runner import PollingRunner, WebsocketRunner from silverback.worker import run_worker @@ -259,35 +265,121 @@ def list_clusters(platform: PlatformClient, workspace: str): "cluster_slug", help="Slug for new cluster (Defaults to `name.lower()`)", ) +@click.argument("workspace") +@platform_client +def new_cluster( + platform: PlatformClient, + workspace: str, + cluster_name: str | None, + cluster_slug: str | None, +): + """Create a new cluster in WORKSPACE""" + + if not (workspace_client := platform.workspaces.get(workspace)): + raise click.BadOptionUsage("workspace", f"Unknown workspace '{workspace}'") + + if cluster_name: + click.echo(f"name: {cluster_name}") + click.echo(f"slug: {cluster_slug or cluster_name.lower().replace(' ', '-')}") + + elif cluster_slug: + click.echo(f"slug: {cluster_slug}") + + cluster = workspace_client.create_cluster( + cluster_name=cluster_name, + cluster_slug=cluster_slug, + ) + click.echo(f"{click.style('SUCCESS', fg='green')}: Created '{cluster.name}'") + + if cluster.status == ResourceStatus.CREATED: + click.echo( + f"{click.style('WARNING', fg='yellow')}: To use this cluster, " + f"please pay via `silverback cluster pay create {workspace}/{cluster_slug}`" + ) + + +@cluster.group(cls=SectionedHelpGroup, section="Platform Commands (https://silverback.apeworx.io)") +def pay(): + """Pay for CLUSTER with Crypto using ApePay streaming payments""" + + +@pay.command(name="create", cls=ConnectedProviderCommand) +@account_option() +@click.argument("cluster_path") @click.option( "-t", "--tier", - default=ClusterTier.PERSONAL.name, + default=ClusterTier.PERSONAL.name.capitalize(), metavar="NAME", + type=click.Choice( + [ + ClusterTier.PERSONAL.name.capitalize(), + ClusterTier.PROFESSIONAL.name.capitalize(), + ] + ), help="Named set of options to use for cluster as a base (Defaults to Personal)", ) @click.option( "-c", "--config", "config_updates", + metavar="KEY VALUE", type=(str, str), multiple=True, - help="Config options to set for cluster (overrides value of -t/--tier)", + help="Config options to set for cluster (overrides values from -t/--tier selection)", +) +@click.option("--token", metavar="ADDRESS", help="Token Symbol or Address to use to fund stream") +@click.option( + "--amount", + "token_amount", + metavar="VALUE", + callback=token_amount_callback, + default=None, + help="Token amount to use to fund stream", +) +@click.option( + "--time", + "stream_time", + metavar="TIMESTAMP or TIMEDELTA", + callback=timedelta_callback, + default=None, + help="Time to fund stream for", ) -@click.argument("workspace") @platform_client -def new_cluster( +def create_payment_stream( platform: PlatformClient, - workspace: str, - cluster_name: str | None, - cluster_slug: str | None, + network: NetworkAPI, + account: AccountAPI, + cluster_path: str, tier: str, config_updates: list[tuple[str, str]], + token: ContractInstance | None, + token_amount: int | None, + stream_time: timedelta | None, ): - """Create a new cluster in WORKSPACE""" + """ + Create a new streaming payment for a given CLUSTER - if not (workspace_client := platform.workspaces.get(workspace)): - raise click.BadOptionUsage("workspace", f"Unknown workspace '{workspace}'") + NOTE: This action cannot be cancelled! Streams must exist for at least 1 hour before cancelling. + """ + + if "/" not in cluster_path or len(cluster_path.split("/")) > 2: + raise click.BadArgumentUsage(f"Invalid cluster path: '{cluster_path}'") + + workspace_name, cluster_name = cluster_path.split("/") + if not (workspace_client := platform.workspaces.get(workspace_name)): + raise click.BadArgumentUsage(f"Unknown workspace: '{workspace_name}'") + + elif not (cluster := workspace_client.clusters.get(cluster_name)): + raise click.BadArgumentUsage( + f"Unknown cluster in workspace '{workspace_name}': '{cluster_name}'" + ) + + elif cluster.status != ResourceStatus.CREATED: + raise click.UsageError(f"Cannot fund '{cluster_path}': cluster has existing streams.") + + elif token_amount is None and stream_time is None: + raise click.UsageError("Must specify one of '--amount' or '--time'.") if not hasattr(ClusterTier, tier.upper()): raise click.BadOptionUsage("tier", f"Invalid choice: {tier}") @@ -297,30 +389,201 @@ def new_cluster( for k, v in config_updates: setattr(configuration, k, int(v) if v.isnumeric() else v) - if cluster_name: - click.echo(f"name: {cluster_name}") - click.echo(f"slug: {cluster_slug or cluster_name.lower().replace(' ', '-')}") + sm = platform.get_stream_manager(network.chain_id) + product = configuration.get_product_code(account.address, cluster.id) - elif cluster_slug: - click.echo(f"slug: {cluster_slug}") + if not token: + accepted_tokens = platform.get_accepted_tokens(network.chain_id) + token = accepted_tokens.get( + click.prompt( + "Select one of the following tokens to fund your stream with", + type=click.Choice(list(accepted_tokens)), + ) + ) + assert token # mypy happy + + if not token_amount: + assert stream_time # mypy happy + one_token = 10 ** token.decimals() + token_amount = int( + one_token + * ( + stream_time.total_seconds() + / sm.compute_stream_life( + account.address, token, one_token, [product] + ).total_seconds() + ) + ) + else: + stream_time = sm.compute_stream_life(account.address, token, token_amount, [product]) + + assert token_amount # mypy happy click.echo(yaml.safe_dump(dict(configuration=configuration.settings_display_dict()))) + click.echo(f"duration: {stream_time}\n") - if not click.confirm("Do you want to make a new cluster with this configuration?"): + if not click.confirm( + f"Do you want to use this configuration to fund Cluster '{cluster_path}'?" + ): return - cluster = workspace_client.create_cluster( - cluster_name=cluster_name, - cluster_slug=cluster_slug, + if not token.balanceOf(account) >= token_amount: + raise click.UsageError( + f"Do not have sufficient balance of '{token.symbol()}' to fund stream." + ) + + elif not token.allowance(account, sm.address) >= token_amount: + click.echo(f"Approve StreamManager({sm.address}) for '{token.symbol()}'") + token.approve( + sm.address, + 2**256 - 1 if click.confirm("Unlimited Approval?") else token_amount, + sender=account, + ) + + # NOTE: will ask for approvals and do additional checks + try: + stream = sm.create( + token, token_amount, [product], min_stream_life=stream_time, sender=account + ) + except ApeException as e: + raise click.UsageError(str(e)) from e + + click.echo(f"{click.style('SUCCESS', fg='green')}: Cluster funded for {stream.time_left}.") + + click.echo( + f"{click.style('WARNING', fg='yellow')}: Cluster may take up to 1 hour to deploy." + " Check back in 10-15 minutes using `silverback cluster info` to start using your cluster." ) - click.echo(f"{click.style('SUCCESS', fg='green')}: Created '{cluster.name}'") - # TODO: Pay for cluster via new stream -# `silverback cluster pay WORKSPACE/NAME --account ALIAS --time "10 days"` -# TODO: Create a signature scheme for ClusterInfo -# (ClusterInfo configuration as plaintext, .id as nonce?) -# TODO: Test payment w/ Signature validation of extra data +@pay.command(name="add-time", cls=ConnectedProviderCommand) +@account_option() +@click.argument("cluster_path", metavar="CLUSTER") +@click.option( + "--amount", + "token_amount", + metavar="VALUE", + callback=token_amount_callback, + default=None, + help="Token amount to use to fund stream", +) +@click.option( + "--time", + "stream_time", + metavar="TIMESTAMP or TIMEDELTA", + callback=timedelta_callback, + default=None, + help="Time to fund stream for", +) +@platform_client +def fund_payment_stream( + platform: PlatformClient, + network: NetworkAPI, + account: AccountAPI, + cluster_path: str, + token_amount: int | None, + stream_time: timedelta | None, +): + """ + Fund an existing streaming payment for the given CLUSTER + + NOTE: You can fund anyone else's Stream! + """ + + if "/" not in cluster_path or len(cluster_path.split("/")) > 2: + raise click.BadArgumentUsage(f"Invalid cluster path: '{cluster_path}'") + + workspace_name, cluster_name = cluster_path.split("/") + if not (workspace_client := platform.workspaces.get(workspace_name)): + raise click.BadArgumentUsage(f"Unknown workspace: '{workspace_name}'") + + elif not (cluster := workspace_client.clusters.get(cluster_name)): + raise click.BadArgumentUsage( + f"Unknown cluster in workspace '{workspace_name}': '{cluster_name}'" + ) + + elif cluster.status != ResourceStatus.RUNNING: + raise click.UsageError(f"Cannot fund '{cluster_info.name}': cluster is not running.") + + elif not (stream := workspace_client.get_payment_stream(cluster, network.chain_id)): + raise click.UsageError("Cluster is not funded via ApePay Stream") + + elif token_amount is None and stream_time is None: + raise click.UsageError("Must specify one of '--amount' or '--time'.") + + if not token_amount: + assert stream_time # mypy happy + one_token = 10 ** stream.token.decimals() + token_amount = int( + one_token + * ( + stream_time.total_seconds() + / stream.manager.compute_stream_life( + account.address, stream.token, one_token, stream.products + ).total_seconds() + ) + ) + + if not stream.token.balanceOf(account) >= token_amount: + raise click.UsageError("Do not have sufficient funding") + + elif not stream.token.allowance(account, stream.manager.address) >= token_amount: + click.echo(f"Approving StreamManager({stream.manager.address})") + stream.token.approve( + stream.manager.address, + 2**256 - 1 if click.confirm("Unlimited Approval?") else token_amount, + sender=account, + ) + + click.echo( + f"Funding Stream for Cluster '{cluster_path}' with " + f"{token_amount / 10**stream.token.decimals():0.4f} {stream.token.symbol()}" + ) + stream.add_funds(token_amount, sender=account) + + click.echo(f"{click.style('SUCCESS', fg='green')}: Cluster funded for {stream.time_left}.") + + +@pay.command(name="cancel", cls=ConnectedProviderCommand) +@account_option() +@click.argument("cluster_path", metavar="CLUSTER") +@platform_client +def cancel_payment_stream( + platform: PlatformClient, + network: NetworkAPI, + account: AccountAPI, + cluster_path: str, +): + """ + Shutdown CLUSTER and refund all funds to Stream owner + + NOTE: Only the Stream owner can perform this action! + """ + + if "/" not in cluster_path or len(cluster_path.split("/")) > 2: + raise click.BadArgumentUsage(f"Invalid cluster path: '{cluster_path}'") + + workspace_name, cluster_name = cluster_path.split("/") + if not (workspace_client := platform.workspaces.get(workspace_name)): + raise click.BadArgumentUsage(f"Unknown workspace: '{workspace_name}'") + + elif not (cluster := workspace_client.clusters.get(cluster_name)): + raise click.BadArgumentUsage( + f"Unknown cluster in workspace '{workspace_name}': '{cluster_name}'" + ) + + elif cluster.status != ResourceStatus.RUNNING: + raise click.UsageError(f"Cannot fund '{cluster_info.name}': cluster is not running.") + + elif not (stream := workspace_client.get_payment_stream(cluster, network.chain_id)): + raise click.UsageError("Cluster is not funded via ApePay Stream") + + if click.confirm( + click.style("This action is irreversible, are you sure?", bold=True, bg="red") + ): + stream.cancel(sender=account) + + click.echo(f"{click.style('WARNING', fg='yellow')}: Cluster cannot be used anymore.") @cluster.command(name="info") diff --git a/silverback/_click_ext.py b/silverback/_click_ext.py index e849f3dc..0435bbcf 100644 --- a/silverback/_click_ext.py +++ b/silverback/_click_ext.py @@ -1,7 +1,11 @@ +from datetime import datetime, timedelta from functools import update_wrapper from pathlib import Path import click +from ape import Contract, convert +from ape.contracts import ContractInstance +from ape.types import AddressType from fief_client import Fief from fief_client.integrations.cli import FiefAuth, FiefAuthNotAuthenticatedError @@ -32,6 +36,66 @@ def cls_import_callback(ctx, param, cls_name): raise click.BadParameter(message=f"Failed to import {param} class: '{cls_name}'.") +def contract_callback( + ctx: click.Context, param: click.Parameter, contract_address: str +) -> ContractInstance: + return Contract(convert(contract_address, AddressType)) + + +def token_amount_callback( + ctx: click.Context, + param: click.Parameter, + token_amount: str | None, +) -> int | None: + if token_amount is None: + return None + + return convert(token_amount, int) + + +def timedelta_callback( + ctx: click.Context, param: click.Parameter, timestamp_or_str: str | None +) -> timedelta | None: + if timestamp_or_str is None: + return None + + try: + timestamp = datetime.fromisoformat(timestamp_or_str) + except ValueError: + timestamp = None + + if timestamp: + if timestamp <= (now := datetime.now()): + raise click.BadParameter("Must be a time in the future.", ctx=ctx, param=param) + return timestamp - now + + elif " " in timestamp_or_str: + units_value = {} + for time_units in map(lambda s: s.strip(), timestamp_or_str.split(",")): + + time, units = time_units.split(" ") + if not units.endswith("s"): + units += "s" + + if units not in {"seconds", "minutes", "hours", "days", "weeks"}: + raise click.BadParameter( + f"Not spelled properly: '{time_units}'.", ctx=ctx, param=param + ) + + units_value[units] = int(time) + + return timedelta(**units_value) # type: ignore[arg-type] + + elif timestamp_or_str.isnumeric(): + return timedelta(seconds=int(timestamp_or_str)) + + raise click.BadParameter( + "Must be an ISO timestamp (in the future), or a timedelta like '1 week'.", + ctx=ctx, + param=param, + ) + + class OrderedCommands(click.Group): # NOTE: Override so we get the list ordered by definition order def list_commands(self, ctx: click.Context) -> list[str]: diff --git a/silverback/cluster/client.py b/silverback/cluster/client.py index 294331b2..d612cee3 100644 --- a/silverback/cluster/client.py +++ b/silverback/cluster/client.py @@ -2,6 +2,9 @@ from typing import ClassVar, Literal import httpx +from ape import Contract +from ape.contracts import ContractInstance +from apepay import Stream, StreamManager from pydantic import computed_field from silverback.version import version @@ -13,6 +16,7 @@ ClusterInfo, ClusterState, RegistryCredentialsInfo, + StreamInfo, VariableGroupInfo, WorkspaceInfo, ) @@ -362,6 +366,23 @@ def create_cluster( self.clusters.update({new_cluster.slug: new_cluster}) # NOTE: Update cache return new_cluster + def get_payment_stream(self, cluster: ClusterInfo, chain_id: int) -> Stream | None: + response = self.client.get( + f"/clusters/{cluster.id}/stream", + params=dict(workspace=str(self.id)), + ) + handle_error_with_response(response) + + if not (raw_stream_info := response.json()): + return None + + stream_info = StreamInfo.model_validate(raw_stream_info) + + if not stream_info.chain_id == chain_id: + return None + + return Stream(manager=StreamManager(stream_info.manager), id=stream_info.stream_id) + class PlatformClient(httpx.Client): def __init__(self, *args, **kwargs): @@ -411,3 +432,15 @@ def create_workspace( new_workspace = Workspace.model_validate_json(response.text) self.workspaces.update({new_workspace.slug: new_workspace}) # NOTE: Update cache return new_workspace + + def get_stream_manager(self, chain_id: int) -> StreamManager: + response = self.get(f"/streams/manager/{chain_id}") + handle_error_with_response(response) + return StreamManager(response.json()) + + def get_accepted_tokens(self, chain_id: int) -> dict[str, ContractInstance]: + response = self.get(f"/streams/tokens/{chain_id}") + handle_error_with_response(response) + return { + token_info["symbol"]: Contract(token_info["address"]) for token_info in response.json() + } diff --git a/silverback/cluster/types.py b/silverback/cluster/types.py index 0a35ee51..4f054d52 100644 --- a/silverback/cluster/types.py +++ b/silverback/cluster/types.py @@ -284,7 +284,8 @@ class ClusterInfo(BaseModel): name: str # User-friendly display name slug: str # Shorthand name, for CLI and URI usage - expiration: datetime | None = None # NOTE: self-hosted clusters have no expiration + expiration: datetime | None = None # NOTE: Self-hosted clusters have no expiration + stream_id: uuid.UUID | None = None # NOTE: If there is an ApePay payment stream for this created: datetime # When the resource was first created status: ResourceStatus