From 8c1c4ee40c556c3aab889e3f4121b12d90aebdbd Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Tue, 2 Jan 2024 17:21:17 +0100 Subject: [PATCH 01/18] Emit CommandProgress event --- yapapi/events.py | 3 +++ yapapi/log.py | 1 + yapapi/rest/activity.py | 15 +++++++++++++-- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/yapapi/events.py b/yapapi/events.py index 312d15866..ed19aee1a 100644 --- a/yapapi/events.py +++ b/yapapi/events.py @@ -521,6 +521,9 @@ class CommandStdOut(CommandEvent): class CommandStdErr(CommandEvent): output: str +@attr.s(auto_attribs=True, repr=False) +class CommandProgress(CommandEvent): + message: str @attr.s(auto_attribs=True, repr=False) class TaskAccepted(TaskEvent): diff --git a/yapapi/log.py b/yapapi/log.py index dc19c944a..179bfa084 100644 --- a/yapapi/log.py +++ b/yapapi/log.py @@ -165,6 +165,7 @@ def enable_default_logger( events.CommandStdOut: "Command stdout", events.CommandStdErr: "Command stderr", events.CommandExecuted: "Script command executed", + events.CommandProgress: "Script command progress update", events.GettingResults: "Getting script results", events.ScriptFinished: "Script finished", events.TaskAccepted: "Task accepted", diff --git a/yapapi/rest/activity.py b/yapapi/rest/activity.py index 8b13e605d..1a4541063 100644 --- a/yapapi/rest/activity.py +++ b/yapapi/rest/activity.py @@ -283,8 +283,7 @@ async def __aiter__(self) -> AsyncIterator[CommandEventData]: results = await self._get_results(timeout=min(timeout, 5)) any_new: bool = False - results = results[last_idx:] - for result in results: + for result in results[last_idx:]: any_new = True assert last_idx == result.index, f"Expected {last_idx}, got {result.index}" @@ -300,6 +299,18 @@ async def __aiter__(self) -> AsyncIterator[CommandEventData]: last_idx = result.index + 1 if result.is_batch_finished: break + + current_idx = last_idx - 1 + if current_idx >= 0: + current = results[current_idx] + + if current.message is not None: + kwargs = dict( + cmd_idx=current.index, + message=current.message, + ) + yield events.CommandProgress, kwargs + if not any_new: delay = min(3, max(0, self.seconds_left())) await asyncio.sleep(delay) From aecf2996ef84d740425b30a7c3d0e2a4d25448dd Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Tue, 2 Jan 2024 18:30:01 +0100 Subject: [PATCH 02/18] Fix formatting --- yapapi/events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/yapapi/events.py b/yapapi/events.py index ed19aee1a..e97a06127 100644 --- a/yapapi/events.py +++ b/yapapi/events.py @@ -521,10 +521,12 @@ class CommandStdOut(CommandEvent): class CommandStdErr(CommandEvent): output: str + @attr.s(auto_attribs=True, repr=False) class CommandProgress(CommandEvent): message: str + @attr.s(auto_attribs=True, repr=False) class TaskAccepted(TaskEvent): @property From 8688c90ed1a03e44d8d3514c0fd406fd8bf4c2f8 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Tue, 9 Jan 2024 18:32:58 +0100 Subject: [PATCH 03/18] Transfer progress impl based on streaming results --- examples/transfer-progress/progress.py | 128 +++++++++++++++++++++++++ yapapi/events.py | 5 +- yapapi/rest/activity.py | 18 ++-- yapapi/script/__init__.py | 11 ++- yapapi/script/command.py | 82 +++++++++++----- 5 files changed, 209 insertions(+), 35 deletions(-) create mode 100644 examples/transfer-progress/progress.py diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py new file mode 100644 index 000000000..0c2222e50 --- /dev/null +++ b/examples/transfer-progress/progress.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 + +import pathlib +import sys +import json +from dataclasses import dataclass + +from yapapi.payload import Payload, vm +from yapapi.props import inf +from yapapi.props.base import constraint, prop +from yapapi.script import ProgressArgs +from yapapi.services import Service + +import asyncio +from datetime import datetime + +import colorama # type: ignore + +from yapapi import Golem + +examples_dir = pathlib.Path(__file__).resolve().parent.parent +sys.path.append(str(examples_dir)) + +from utils import ( + TEXT_COLOR_CYAN, + TEXT_COLOR_DEFAULT, + TEXT_COLOR_MAGENTA, + TEXT_COLOR_YELLOW, + build_parser, + format_usage, + print_env_info, + run_golem_example, +) + +RUNTIME_NAME = "ai" +CAPABILITIES = "golem.runtime.capabilities" + + +def progress_event_handler(event: "yapapi.events.CommandProgress"): + if event.progress is not None: + progress = event.progress + + percent = 0.0 + if progress[1] is None: + percent = "unknown" + else: + percent = 100.0 * progress[0] / progress[1] + + print(f"Transfer progress: {percent}% ({progress[0]} {event['unit']} / {progress[1]} {event['unit']})") + + +@dataclass +class ExamplePayload(Payload): + image_url: str = prop("golem.!exp.ai.v1.srv.comp.ai.model") + image_fmt: str = prop("golem.!exp.ai.v1.srv.comp.ai.model-format", default="safetensors") + + runtime: str = constraint(inf.INF_RUNTIME_NAME, default=RUNTIME_NAME) + capabilities: str = constraint(CAPABILITIES, default="dummy") + + +class ExampleService(Service): + # @staticmethod + # async def get_payload(): + # return ExamplePayload(image_url="hash:sha3:0b682cf78786b04dc108ff0b254db1511ef820105129ad021d2e123a7b975e7c:https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true") + + @staticmethod + async def get_payload(): + return await vm.repo( + image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae", + min_mem_gib=0.5, + min_storage_gib=2.0, + ) + + async def start(self): + async for script in super().start(): + yield script + + async def run(self): + script = self._ctx.new_script(timeout=None) + + progress = ProgressArgs() + script.upload_from_url( + "hash:sha3:92180a67d096be309c5e6a7146d89aac4ef900e2bf48a52ea569df7d:https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0/resolve/main/sd_xl_base_1.0.safetensors?download=true", + "/golem/resource/model-big", progress_args=progress) + script.upload_from_url( + "hash:sha3:0b682cf78786b04dc108ff0b254db1511ef820105129ad021d2e123a7b975e7c:https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true", + "/golem/resource/model-small", progress_args=progress) + + yield script + + +async def main(subnet_tag, driver=None, network=None): + async with Golem( + budget=50.0, + subnet_tag=subnet_tag, + payment_driver=driver, + payment_network=network, + stream_output=True, + ) as golem: + cluster = await golem.run_service( + ExampleService, + num_instances=1, + ) + + golem.add_event_consumer(progress_event_handler, ["CommandProgress"]) + + def instances(): + return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances] + + while True: + await asyncio.sleep(3) + print(f"instances: {instances()}") + + +if __name__ == "__main__": + parser = build_parser("Run transfer progress example app") + now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S") + parser.set_defaults(log_file=f"progress-yapapi-{now}.log") + args = parser.parse_args() + + run_golem_example( + main( + subnet_tag=args.subnet_tag, + driver=args.payment_driver, + network=args.payment_network, + ), + log_file=args.log_file, + ) diff --git a/yapapi/events.py b/yapapi/events.py index e97a06127..96e40bb86 100644 --- a/yapapi/events.py +++ b/yapapi/events.py @@ -524,7 +524,10 @@ class CommandStdErr(CommandEvent): @attr.s(auto_attribs=True, repr=False) class CommandProgress(CommandEvent): - message: str + step: Tuple[int, int] + message: Optional[str] + progress: Tuple[int, Optional[int]] + unit: Optional[str] @attr.s(auto_attribs=True, repr=False) diff --git a/yapapi/rest/activity.py b/yapapi/rest/activity.py index 1a4541063..a382c75b6 100644 --- a/yapapi/rest/activity.py +++ b/yapapi/rest/activity.py @@ -300,17 +300,6 @@ async def __aiter__(self) -> AsyncIterator[CommandEventData]: if result.is_batch_finished: break - current_idx = last_idx - 1 - if current_idx >= 0: - current = results[current_idx] - - if current.message is not None: - kwargs = dict( - cmd_idx=current.index, - message=current.message, - ) - yield events.CommandProgress, kwargs - if not any_new: delay = min(3, max(0, self.seconds_left())) await asyncio.sleep(delay) @@ -394,6 +383,13 @@ def _message_event_to_event_data(msg_event: MessageEvent) -> CommandEventData: evt_cls = events.CommandStdErr kwargs["output"] = str(evt_data) or "" + elif evt_kind == "progress": + evt_cls = events.CommandProgress + kwargs["step"] = evt_data.get("step") + kwargs["message"] = evt_data.get("message") + kwargs["progress"] = evt_data.get("progress") + kwargs["unit"] = evt_data.get("unit") + else: raise RuntimeError(f"Unsupported runtime event: {evt_kind}") diff --git a/yapapi/script/__init__.py b/yapapi/script/__init__.py index 0f560a5b9..08e4f4453 100644 --- a/yapapi/script/__init__.py +++ b/yapapi/script/__init__.py @@ -19,7 +19,7 @@ SendFile, SendJson, Start, - Terminate, + Terminate, UploadFileFromInternet, ProgressArgs, ) from yapapi.storage import DOWNLOAD_BYTES_LIMIT_DEFAULT @@ -223,3 +223,12 @@ def upload_json(self, data: dict, dst_path: str) -> Awaitable[CommandExecuted]: :param dst_path: remote (provider) destination path """ return self.add(SendJson(data, dst_path)) + + def upload_from_url(self, src_url: str, dst_path: str, progress_args: Optional[ProgressArgs] = None) -> Awaitable[CommandExecuted]: + """Schedule sending a file to the provider. + + :param src_url: remote (internet) source url + :param dst_path: remote (provider) destination path + :param progress_args: Enables progress events + """ + return self.add(UploadFileFromInternet(src_url, dst_path, progress_args=progress_args)) diff --git a/yapapi/script/command.py b/yapapi/script/command.py index e4ff464e3..9c44b533c 100644 --- a/yapapi/script/command.py +++ b/yapapi/script/command.py @@ -13,7 +13,6 @@ if TYPE_CHECKING: from yapapi.script import Script - # For example: { "start": { "args": [] } } BatchCommand = Dict[str, Dict[str, Union[str, List[str]]]] @@ -92,11 +91,18 @@ def evaluate(self): return self._make_batch_command("terminate") +class ProgressArgs: + """Interval represented as human-readable duration string (examples: '5s' '10min')""" + update_interval: Optional[str] + update_step: Optional[int] + + class _SendContent(Command, abc.ABC): - def __init__(self, dst_path: str): + def __init__(self, dst_path: str, progress_args: Optional[ProgressArgs] = None): super().__init__() self._dst_path = dst_path self._src: Optional[Source] = None + self._progress = progress_args @abc.abstractmethod async def _do_upload(self, storage: StorageProvider) -> Source: @@ -105,7 +111,7 @@ async def _do_upload(self, storage: StorageProvider) -> Source: def evaluate(self): assert self._src return self._make_batch_command( - "transfer", _from=self._src.download_url, _to=f"container:{self._dst_path}" + "transfer", _from=self._src.download_url, _to=f"container:{self._dst_path}", _progress={} ) async def before(self): @@ -173,12 +179,12 @@ class Run(Command): """Command which schedules running a shell command on a provider.""" def __init__( - self, - cmd: str, - *args: str, - env: Optional[Dict[str, str]] = None, - stderr: CaptureContext = CaptureContext.build(mode="stream"), - stdout: CaptureContext = CaptureContext.build(mode="stream"), + self, + cmd: str, + *args: str, + env: Optional[Dict[str, str]] = None, + stderr: CaptureContext = CaptureContext.build(mode="stream"), + stdout: CaptureContext = CaptureContext.build(mode="stream"), ): """Create a new Run command. @@ -210,8 +216,8 @@ def __repr__(self): class _ReceiveContent(Command, abc.ABC): def __init__( - self, - src_path: str, + self, + src_path: str, ): super().__init__() self._src_path: str = src_path @@ -242,9 +248,9 @@ class DownloadFile(_ReceiveContent): """Command which schedules downloading a file from a provider.""" def __init__( - self, - src_path: str, - dst_path: str, + self, + src_path: str, + dst_path: str, ): """Create a new DownloadFile command. @@ -270,10 +276,10 @@ class DownloadBytes(_ReceiveContent): """Command which schedules downloading a file from a provider as bytes.""" def __init__( - self, - src_path: str, - on_download: Callable[[bytes], Awaitable], - limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, + self, + src_path: str, + on_download: Callable[[bytes], Awaitable], + limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, ): """Create a new DownloadBytes command. @@ -298,10 +304,10 @@ class DownloadJson(DownloadBytes): """Command which schedules downloading a file from a provider as JSON data.""" def __init__( - self, - src_path: str, - on_download: Callable[[Any], Awaitable], - limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, + self, + src_path: str, + on_download: Callable[[Any], Awaitable], + limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, ): """Create a new DownloadJson command. @@ -314,3 +320,35 @@ def __init__( @staticmethod async def __on_json_download(on_download: Callable[[bytes], Awaitable], content: bytes): await on_download(json.loads(content)) + + +class InternetSource(Source): + def __init__(self, url: str): + self._url = url + + @property + def download_url(self) -> str: + return self._url + + async def content_length(self) -> int: + return 0 + + +class UploadFileFromInternet(_SendContent): + def __init__(self, src_url: str, dst_path: str, progress_args: Optional[ProgressArgs] = None): + """Create a new UploadFileFromInternet command. + + :param src_url: remote (internet) source url + :param dst_path: remote (provider) destination path + """ + super().__init__(dst_path, progress_args=progress_args) + self._src_url = src_url + + async def _do_upload(self, storage: StorageProvider) -> Source: + return InternetSource(self._src_url) + + async def after(self) -> None: + pass + + def __repr__(self): + return f"{super().__repr__()} src={self._src_url}" From 2d313bea451664a7c592e188c401328df3bdf16c Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Thu, 11 Jan 2024 18:45:35 +0100 Subject: [PATCH 04/18] Progress bar --- examples/transfer-progress/progress.py | 50 +++++++++++++++++--------- pyproject.toml | 2 ++ 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 0c2222e50..30dea221b 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -4,6 +4,7 @@ import sys import json from dataclasses import dataclass +from alive_progress import alive_bar from yapapi.payload import Payload, vm from yapapi.props import inf @@ -36,17 +37,31 @@ CAPABILITIES = "golem.runtime.capabilities" -def progress_event_handler(event: "yapapi.events.CommandProgress"): - if event.progress is not None: - progress = event.progress +class ProgressDisplayer: + def __init__(self): + self._transfers_bars = {} + self._transfers_ctx = {} - percent = 0.0 - if progress[1] is None: - percent = "unknown" - else: - percent = 100.0 * progress[0] / progress[1] + def __enter__(self): + pass - print(f"Transfer progress: {percent}% ({progress[0]} {event['unit']} / {progress[1]} {event['unit']})") + def __exit__(self, exc_type, exc_val, exc_tb): + for key, bar in self._transfers_ctx: + bar.__exit__(exc_type, exc_val, exc_tb) + + def progress_bar(self, event: "yapapi.events.CommandProgress"): + if event.progress is not None: + progress = event.progress + + if progress[1] is not None: + if self._transfers_ctx.get(event.script_id) is None: + bar = alive_bar(total=progress[1], manual=True, title="Uploading file", unit=event.unit) + bar_ctx = bar.__enter__() + self._transfers_bars[event.script_id] = bar + self._transfers_ctx[event.script_id] = bar_ctx + + bar = self._transfers_ctx.get(event.script_id) + bar(progress[0] / progress[1]) @dataclass @@ -76,16 +91,17 @@ async def start(self): yield script async def run(self): - script = self._ctx.new_script(timeout=None) - progress = ProgressArgs() + script = self._ctx.new_script(timeout=None) script.upload_from_url( "hash:sha3:92180a67d096be309c5e6a7146d89aac4ef900e2bf48a52ea569df7d:https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0/resolve/main/sd_xl_base_1.0.safetensors?download=true", "/golem/resource/model-big", progress_args=progress) + yield script + + script = self._ctx.new_script(timeout=None) script.upload_from_url( "hash:sha3:0b682cf78786b04dc108ff0b254db1511ef820105129ad021d2e123a7b975e7c:https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true", "/golem/resource/model-small", progress_args=progress) - yield script @@ -97,19 +113,19 @@ async def main(subnet_tag, driver=None, network=None): payment_network=network, stream_output=True, ) as golem: + bar = ProgressDisplayer() cluster = await golem.run_service( ExampleService, num_instances=1, ) - golem.add_event_consumer(progress_event_handler, ["CommandProgress"]) + def progress_event_handler(event: "yapapi.events.CommandProgress"): + bar.progress_bar(event) - def instances(): - return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances] + golem.add_event_consumer(progress_event_handler, ["CommandProgress"]) while True: - await asyncio.sleep(3) - print(f"instances: {instances()}") + await asyncio.sleep(8) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index d1ef20e5f..aa76dac09 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,8 @@ python-statemachine = "^0.8.0" setuptools = "*" pip = "*" +alive_progress = "3.1" + # Docs sphinx = { version = "^4.0.1", optional = true } sphinx-autodoc-typehints = { version = "^1.12.0", optional = true } From fd8d01c585ecc99c40b6be9f12e0c14d59e3713b Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Fri, 12 Jan 2024 13:32:08 +0100 Subject: [PATCH 05/18] Progress example handles progress bar for 2 transfers --- examples/blender/blender.py | 2 +- examples/transfer-progress/progress.py | 40 +++++++++++++++++++------- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/examples/blender/blender.py b/examples/blender/blender.py index b445f9c52..c1aa84a66 100755 --- a/examples/blender/blender.py +++ b/examples/blender/blender.py @@ -125,7 +125,7 @@ async def worker(ctx: WorkContext, tasks): worker, [Task(data=frame) for frame in frames], payload=package, - max_workers=3, + max_workers=6, timeout=timeout, ) async for task in completed_tasks: diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 30dea221b..cf53db445 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -42,12 +42,9 @@ def __init__(self): self._transfers_bars = {} self._transfers_ctx = {} - def __enter__(self): - pass - - def __exit__(self, exc_type, exc_val, exc_tb): + def exit(self): for key, bar in self._transfers_ctx: - bar.__exit__(exc_type, exc_val, exc_tb) + bar.__exit__(None, None, None) def progress_bar(self, event: "yapapi.events.CommandProgress"): if event.progress is not None: @@ -55,14 +52,28 @@ def progress_bar(self, event: "yapapi.events.CommandProgress"): if progress[1] is not None: if self._transfers_ctx.get(event.script_id) is None: - bar = alive_bar(total=progress[1], manual=True, title="Uploading file", unit=event.unit) + bar = alive_bar(total=progress[1], manual=True, title="Progress", unit=event.unit, scale=True, + dual_line=True) bar_ctx = bar.__enter__() + bar_ctx.text = f"Uploading file: {event.command._src_url} -> {event.command._dst_path}" + self._transfers_bars[event.script_id] = bar self._transfers_ctx[event.script_id] = bar_ctx bar = self._transfers_ctx.get(event.script_id) bar(progress[0] / progress[1]) + def executed(self, event: "yapapi.events.CommandExecuted"): + if self._transfers_ctx.get(event.script_id) is not None: + bar_obj = self._transfers_bars.get(event.script_id) + bar = self._transfers_ctx.get(event.script_id) + + bar(1.0) + bar_obj.__exit__(None, None, None) + + self._transfers_bars.pop(event.script_id) + self._transfers_ctx.pop(event.script_id) + @dataclass class ExamplePayload(Payload): @@ -92,16 +103,17 @@ async def start(self): async def run(self): progress = ProgressArgs() + script = self._ctx.new_script(timeout=None) script.upload_from_url( - "hash:sha3:92180a67d096be309c5e6a7146d89aac4ef900e2bf48a52ea569df7d:https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0/resolve/main/sd_xl_base_1.0.safetensors?download=true", - "/golem/resource/model-big", progress_args=progress) + "https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true", + "/golem/resource/model-small", progress_args=progress) yield script script = self._ctx.new_script(timeout=None) script.upload_from_url( - "hash:sha3:0b682cf78786b04dc108ff0b254db1511ef820105129ad021d2e123a7b975e7c:https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true", - "/golem/resource/model-small", progress_args=progress) + "https://registry.golem.network/v1/image/download?tag=golem/ray-on-golem:0.6.1-py3.10.13-ray2.7.1&https=true", + "/golem/resource/model-big", progress_args=progress) yield script @@ -122,7 +134,15 @@ async def main(subnet_tag, driver=None, network=None): def progress_event_handler(event: "yapapi.events.CommandProgress"): bar.progress_bar(event) + def on_shutdown(_event: "yapapi.events.ServiceFinished"): + bar.exit() + + def on_command_executed(event: "yapapi.events.CommandExecuted"): + bar.executed(event) + golem.add_event_consumer(progress_event_handler, ["CommandProgress"]) + golem.add_event_consumer(on_shutdown, ["ServiceFinished"]) + golem.add_event_consumer(on_command_executed, ["CommandExecuted"]) while True: await asyncio.sleep(8) From 7a06babdb9124a792589683c451de22ca99d0133 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Fri, 12 Jan 2024 16:43:49 +0100 Subject: [PATCH 06/18] Progress of gftp download/upload --- examples/transfer-progress/progress.py | 66 +++++++++++++++----------- yapapi/script/__init__.py | 10 ++-- yapapi/script/command.py | 2 +- 3 files changed, 46 insertions(+), 32 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index cf53db445..882272603 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -2,12 +2,14 @@ import pathlib import sys +import os import json from dataclasses import dataclass from alive_progress import alive_bar +import yapapi.script.command from yapapi.payload import Payload, vm -from yapapi.props import inf +from yapapi.payload.vm import _VmPackage from yapapi.props.base import constraint, prop from yapapi.script import ProgressArgs from yapapi.services import Service @@ -23,19 +25,12 @@ sys.path.append(str(examples_dir)) from utils import ( - TEXT_COLOR_CYAN, - TEXT_COLOR_DEFAULT, - TEXT_COLOR_MAGENTA, - TEXT_COLOR_YELLOW, build_parser, format_usage, print_env_info, run_golem_example, ) -RUNTIME_NAME = "ai" -CAPABILITIES = "golem.runtime.capabilities" - class ProgressDisplayer: def __init__(self): @@ -47,6 +42,9 @@ def exit(self): bar.__exit__(None, None, None) def progress_bar(self, event: "yapapi.events.CommandProgress"): + if event.message is not None: + print(f"{event.message}") + if event.progress is not None: progress = event.progress @@ -55,7 +53,13 @@ def progress_bar(self, event: "yapapi.events.CommandProgress"): bar = alive_bar(total=progress[1], manual=True, title="Progress", unit=event.unit, scale=True, dual_line=True) bar_ctx = bar.__enter__() - bar_ctx.text = f"Uploading file: {event.command._src_url} -> {event.command._dst_path}" + + if isinstance(event.command, yapapi.script.command.Deploy): + bar_ctx.text = f"Deploying image" + elif isinstance(event.command, yapapi.script.command._SendContent): + bar_ctx.text = f"Uploading file: {event.command._src.download_url} -> {event.command._dst_path}" + elif isinstance(event.command, yapapi.script.command._ReceiveContent): + bar_ctx.text = f"Downloading file: {event.command._src_path} -> {event.command._dst_path}" self._transfers_bars[event.script_id] = bar self._transfers_ctx[event.script_id] = bar_ctx @@ -74,32 +78,27 @@ def executed(self, event: "yapapi.events.CommandExecuted"): self._transfers_bars.pop(event.script_id) self._transfers_ctx.pop(event.script_id) - @dataclass -class ExamplePayload(Payload): - image_url: str = prop("golem.!exp.ai.v1.srv.comp.ai.model") - image_fmt: str = prop("golem.!exp.ai.v1.srv.comp.ai.model-format", default="safetensors") - - runtime: str = constraint(inf.INF_RUNTIME_NAME, default=RUNTIME_NAME) - capabilities: str = constraint(CAPABILITIES, default="dummy") +class ExamplePayload(_VmPackage): + progress_capability: bool = constraint("golem.activity.caps.transfer.report-progress", operator="=", default=True) class ExampleService(Service): - # @staticmethod - # async def get_payload(): - # return ExamplePayload(image_url="hash:sha3:0b682cf78786b04dc108ff0b254db1511ef820105129ad021d2e123a7b975e7c:https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true") - @staticmethod async def get_payload(): - return await vm.repo( + package = await vm.repo( image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae", min_mem_gib=0.5, - min_storage_gib=2.0, + min_storage_gib=10.0, ) + return ExamplePayload(image_url=package.image_url, constraints=package.constraints, progress_capability=True) async def start(self): - async for script in super().start(): - yield script + script = self._ctx.new_script(timeout=None) + script.deploy(progress={}) + script.start() + + yield script async def run(self): progress = ProgressArgs() @@ -111,11 +110,24 @@ async def run(self): yield script script = self._ctx.new_script(timeout=None) - script.upload_from_url( - "https://registry.golem.network/v1/image/download?tag=golem/ray-on-golem:0.6.1-py3.10.13-ray2.7.1&https=true", - "/golem/resource/model-big", progress_args=progress) + script.upload_bytes( + os.urandom(40 * 1024 * 1024), + "/golem/resource/bytes.bin", progress_args=progress) yield script + script = self._ctx.new_script(timeout=None) + script.download_file( + "/golem/resource/bytes.bin", "download.bin", progress_args=progress) + yield script + + # script = self._ctx.new_script(timeout=None) + # script.upload_from_url( + # "https://registry.golem.network/v1/image/download?tag=golem/ray-on-golem:0.6.1-py3.10.13-ray2.7.1&https=true", + # "/golem/resource/model-big", progress_args=progress) + # yield script + + self.cluster.stop() + async def main(subnet_tag, driver=None, network=None): async with Golem( diff --git a/yapapi/script/__init__.py b/yapapi/script/__init__.py index 08e4f4453..8fc7e820f 100644 --- a/yapapi/script/__init__.py +++ b/yapapi/script/__init__.py @@ -169,6 +169,7 @@ def download_bytes( src_path: str, on_download: Callable[[bytes], Awaitable], limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, + **kwargs ) -> Awaitable[CommandExecuted]: """Schedule downloading a remote file from the provider as bytes. @@ -178,7 +179,7 @@ def download_bytes( """ return self.add(DownloadBytes(src_path, on_download, limit)) - def download_file(self, src_path: str, dst_path: str) -> Awaitable[CommandExecuted]: + def download_file(self, src_path: str, dst_path: str, **kwargs) -> Awaitable[CommandExecuted]: """Schedule downloading a remote file from the provider. :param src_path: remote (provider) source path @@ -191,6 +192,7 @@ def download_json( src_path: str, on_download: Callable[[Any], Awaitable], limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, + **kwargs ) -> Awaitable[CommandExecuted]: """Schedule downloading a remote file from the provider as JSON. @@ -200,7 +202,7 @@ def download_json( """ return self.add(DownloadJson(src_path, on_download, limit)) - def upload_bytes(self, data: bytes, dst_path: str) -> Awaitable[CommandExecuted]: + def upload_bytes(self, data: bytes, dst_path: str, **kwargs) -> Awaitable[CommandExecuted]: """Schedule sending bytes data to the provider. :param data: bytes to send @@ -208,7 +210,7 @@ def upload_bytes(self, data: bytes, dst_path: str) -> Awaitable[CommandExecuted] """ return self.add(SendBytes(data, dst_path)) - def upload_file(self, src_path: str, dst_path: str) -> Awaitable[CommandExecuted]: + def upload_file(self, src_path: str, dst_path: str, **kwargs) -> Awaitable[CommandExecuted]: """Schedule sending a file to the provider. :param src_path: local (requestor) source path @@ -216,7 +218,7 @@ def upload_file(self, src_path: str, dst_path: str) -> Awaitable[CommandExecuted """ return self.add(SendFile(src_path, dst_path)) - def upload_json(self, data: dict, dst_path: str) -> Awaitable[CommandExecuted]: + def upload_json(self, data: dict, dst_path: str, **kwargs) -> Awaitable[CommandExecuted]: """Schedule sending JSON data to the provider. :param data: dictionary representing JSON data to send diff --git a/yapapi/script/command.py b/yapapi/script/command.py index 9c44b533c..5b820f900 100644 --- a/yapapi/script/command.py +++ b/yapapi/script/command.py @@ -227,7 +227,7 @@ def __init__( def evaluate(self): assert self._dst_slot return self._make_batch_command( - "transfer", _from=f"container:{self._src_path}", to=self._dst_slot.upload_url + "transfer", _from=f"container:{self._src_path}", to=self._dst_slot.upload_url, _progress={} ) async def before(self): From f8bfd31b45668cc13213b89d5eb05b5c75d29af2 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Fri, 12 Jan 2024 17:18:23 +0100 Subject: [PATCH 07/18] Handle progress for commands in one script --- examples/transfer-progress/progress.py | 51 +++++++++++++++----------- yapapi/script/__init__.py | 8 ++-- yapapi/script/command.py | 6 ++- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 882272603..59e60e2f1 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -32,6 +32,10 @@ ) +def command_key(event: "yapapi.events.CommandProgress") -> str: + return f"{event.script_id}#{event.command._index}" + + class ProgressDisplayer: def __init__(self): self._transfers_bars = {} @@ -49,7 +53,8 @@ def progress_bar(self, event: "yapapi.events.CommandProgress"): progress = event.progress if progress[1] is not None: - if self._transfers_ctx.get(event.script_id) is None: + key = command_key(event) + if self._transfers_ctx.get(key) is None: bar = alive_bar(total=progress[1], manual=True, title="Progress", unit=event.unit, scale=True, dual_line=True) bar_ctx = bar.__enter__() @@ -61,22 +66,24 @@ def progress_bar(self, event: "yapapi.events.CommandProgress"): elif isinstance(event.command, yapapi.script.command._ReceiveContent): bar_ctx.text = f"Downloading file: {event.command._src_path} -> {event.command._dst_path}" - self._transfers_bars[event.script_id] = bar - self._transfers_ctx[event.script_id] = bar_ctx + self._transfers_bars[key] = bar + self._transfers_ctx[key] = bar_ctx - bar = self._transfers_ctx.get(event.script_id) + bar = self._transfers_ctx.get(key) bar(progress[0] / progress[1]) def executed(self, event: "yapapi.events.CommandExecuted"): - if self._transfers_ctx.get(event.script_id) is not None: - bar_obj = self._transfers_bars.get(event.script_id) - bar = self._transfers_ctx.get(event.script_id) + key = command_key(event) + if self._transfers_ctx.get(key) is not None: + bar_obj = self._transfers_bars.get(key) + bar = self._transfers_ctx.get(key) bar(1.0) bar_obj.__exit__(None, None, None) - self._transfers_bars.pop(event.script_id) - self._transfers_ctx.pop(event.script_id) + self._transfers_bars.pop(key) + self._transfers_ctx.pop(key) + @dataclass class ExamplePayload(_VmPackage): @@ -104,29 +111,27 @@ async def run(self): progress = ProgressArgs() script = self._ctx.new_script(timeout=None) - script.upload_from_url( + script.download_from_url( "https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true", "/golem/resource/model-small", progress_args=progress) - yield script - - script = self._ctx.new_script(timeout=None) script.upload_bytes( os.urandom(40 * 1024 * 1024), "/golem/resource/bytes.bin", progress_args=progress) - yield script - - script = self._ctx.new_script(timeout=None) script.download_file( "/golem/resource/bytes.bin", "download.bin", progress_args=progress) - yield script # script = self._ctx.new_script(timeout=None) # script.upload_from_url( # "https://registry.golem.network/v1/image/download?tag=golem/ray-on-golem:0.6.1-py3.10.13-ray2.7.1&https=true", # "/golem/resource/model-big", progress_args=progress) - # yield script - self.cluster.stop() + yield script + + os.remove("download.bin") + await self.cluster.terminate() + + +shutdown = False async def main(subnet_tag, driver=None, network=None): @@ -137,6 +142,8 @@ async def main(subnet_tag, driver=None, network=None): payment_network=network, stream_output=True, ) as golem: + global shutdown + bar = ProgressDisplayer() cluster = await golem.run_service( ExampleService, @@ -147,7 +154,9 @@ def progress_event_handler(event: "yapapi.events.CommandProgress"): bar.progress_bar(event) def on_shutdown(_event: "yapapi.events.ServiceFinished"): + global shutdown bar.exit() + shutdown = True def on_command_executed(event: "yapapi.events.CommandExecuted"): bar.executed(event) @@ -156,8 +165,8 @@ def on_command_executed(event: "yapapi.events.CommandExecuted"): golem.add_event_consumer(on_shutdown, ["ServiceFinished"]) golem.add_event_consumer(on_command_executed, ["CommandExecuted"]) - while True: - await asyncio.sleep(8) + while not shutdown: + await asyncio.sleep(1) if __name__ == "__main__": diff --git a/yapapi/script/__init__.py b/yapapi/script/__init__.py index 8fc7e820f..b7894a195 100644 --- a/yapapi/script/__init__.py +++ b/yapapi/script/__init__.py @@ -19,7 +19,7 @@ SendFile, SendJson, Start, - Terminate, UploadFileFromInternet, ProgressArgs, + Terminate, DownloadFileFromInternet, ProgressArgs, ) from yapapi.storage import DOWNLOAD_BYTES_LIMIT_DEFAULT @@ -125,7 +125,7 @@ async def _before(self): def add(self, cmd: Command) -> Awaitable[CommandExecuted]: """Add a :class:`yapapi.script.command.Command` to the :class:`Script`.""" self._commands.append(cmd) - cmd._set_script(self) + cmd._set_script(self, len(self._commands) - 1) return cmd._result def deploy(self, **kwargs: dict) -> Awaitable[CommandExecuted]: @@ -226,11 +226,11 @@ def upload_json(self, data: dict, dst_path: str, **kwargs) -> Awaitable[CommandE """ return self.add(SendJson(data, dst_path)) - def upload_from_url(self, src_url: str, dst_path: str, progress_args: Optional[ProgressArgs] = None) -> Awaitable[CommandExecuted]: + def download_from_url(self, src_url: str, dst_path: str, progress_args: Optional[ProgressArgs] = None) -> Awaitable[CommandExecuted]: """Schedule sending a file to the provider. :param src_url: remote (internet) source url :param dst_path: remote (provider) destination path :param progress_args: Enables progress events """ - return self.add(UploadFileFromInternet(src_url, dst_path, progress_args=progress_args)) + return self.add(DownloadFileFromInternet(src_url, dst_path, progress_args=progress_args)) diff --git a/yapapi/script/command.py b/yapapi/script/command.py index 5b820f900..ed9fdd12d 100644 --- a/yapapi/script/command.py +++ b/yapapi/script/command.py @@ -37,10 +37,12 @@ def _make_batch_command(cmd_name: str, **kwargs) -> BatchCommand: def __init__(self): self._result: asyncio.Future = asyncio.get_event_loop().create_future() self._script: Optional["Script"] = None + self._index: int = None - def _set_script(self, script: "Script") -> None: + def _set_script(self, script: "Script", index: int) -> None: assert self._script is None, f"Command {self} already belongs to a script {self._script}" self._script = script + self._index = index def emit(self, event_class: Type[CommandEventType], **kwargs) -> CommandEventType: if self._script is None: @@ -334,7 +336,7 @@ async def content_length(self) -> int: return 0 -class UploadFileFromInternet(_SendContent): +class DownloadFileFromInternet(_SendContent): def __init__(self, src_url: str, dst_path: str, progress_args: Optional[ProgressArgs] = None): """Create a new UploadFileFromInternet command. From a26cd27876d1550733fa6cdb34bc1d3da0034809 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Fri, 12 Jan 2024 17:20:59 +0100 Subject: [PATCH 08/18] Remove too big downloads --- examples/transfer-progress/progress.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 59e60e2f1..967ca17e3 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -119,12 +119,6 @@ async def run(self): "/golem/resource/bytes.bin", progress_args=progress) script.download_file( "/golem/resource/bytes.bin", "download.bin", progress_args=progress) - - # script = self._ctx.new_script(timeout=None) - # script.upload_from_url( - # "https://registry.golem.network/v1/image/download?tag=golem/ray-on-golem:0.6.1-py3.10.13-ray2.7.1&https=true", - # "/golem/resource/model-big", progress_args=progress) - yield script os.remove("download.bin") From 84513e65900c971d286aa68f3df0712bd6e3350a Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Mon, 15 Jan 2024 18:07:31 +0100 Subject: [PATCH 09/18] Pass progress args to script commands --- examples/transfer-progress/progress.py | 5 +-- yapapi/script/__init__.py | 12 +++--- yapapi/script/command.py | 51 ++++++++++++++++---------- 3 files changed, 40 insertions(+), 28 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 967ca17e3..7c2153dfc 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -3,14 +3,13 @@ import pathlib import sys import os -import json from dataclasses import dataclass from alive_progress import alive_bar import yapapi.script.command -from yapapi.payload import Payload, vm +from yapapi.payload import vm from yapapi.payload.vm import _VmPackage -from yapapi.props.base import constraint, prop +from yapapi.props.base import constraint from yapapi.script import ProgressArgs from yapapi.services import Service diff --git a/yapapi/script/__init__.py b/yapapi/script/__init__.py index b7894a195..f2d23ed30 100644 --- a/yapapi/script/__init__.py +++ b/yapapi/script/__init__.py @@ -177,7 +177,7 @@ def download_bytes( :param on_download: the callable to run on the received data :param limit: limit of bytes to be downloaded (expected size) """ - return self.add(DownloadBytes(src_path, on_download, limit)) + return self.add(DownloadBytes(src_path, on_download, limit, **kwargs)) def download_file(self, src_path: str, dst_path: str, **kwargs) -> Awaitable[CommandExecuted]: """Schedule downloading a remote file from the provider. @@ -185,7 +185,7 @@ def download_file(self, src_path: str, dst_path: str, **kwargs) -> Awaitable[Com :param src_path: remote (provider) source path :param dst_path: local (requestor) destination path """ - return self.add(DownloadFile(src_path, dst_path)) + return self.add(DownloadFile(src_path, dst_path, **kwargs)) def download_json( self, @@ -200,7 +200,7 @@ def download_json( :param on_download: the callable to run on the received data :param limit: limit of bytes to be downloaded (expected size) """ - return self.add(DownloadJson(src_path, on_download, limit)) + return self.add(DownloadJson(src_path, on_download, limit, **kwargs)) def upload_bytes(self, data: bytes, dst_path: str, **kwargs) -> Awaitable[CommandExecuted]: """Schedule sending bytes data to the provider. @@ -208,7 +208,7 @@ def upload_bytes(self, data: bytes, dst_path: str, **kwargs) -> Awaitable[Comman :param data: bytes to send :param dst_path: remote (provider) destination path """ - return self.add(SendBytes(data, dst_path)) + return self.add(SendBytes(data, dst_path, **kwargs)) def upload_file(self, src_path: str, dst_path: str, **kwargs) -> Awaitable[CommandExecuted]: """Schedule sending a file to the provider. @@ -216,7 +216,7 @@ def upload_file(self, src_path: str, dst_path: str, **kwargs) -> Awaitable[Comma :param src_path: local (requestor) source path :param dst_path: remote (provider) destination path """ - return self.add(SendFile(src_path, dst_path)) + return self.add(SendFile(src_path, dst_path, **kwargs)) def upload_json(self, data: dict, dst_path: str, **kwargs) -> Awaitable[CommandExecuted]: """Schedule sending JSON data to the provider. @@ -224,7 +224,7 @@ def upload_json(self, data: dict, dst_path: str, **kwargs) -> Awaitable[CommandE :param data: dictionary representing JSON data to send :param dst_path: remote (provider) destination path """ - return self.add(SendJson(data, dst_path)) + return self.add(SendJson(data, dst_path, **kwargs)) def download_from_url(self, src_url: str, dst_path: str, progress_args: Optional[ProgressArgs] = None) -> Awaitable[CommandExecuted]: """Schedule sending a file to the provider. diff --git a/yapapi/script/command.py b/yapapi/script/command.py index ed9fdd12d..a4a4842f6 100644 --- a/yapapi/script/command.py +++ b/yapapi/script/command.py @@ -5,6 +5,7 @@ from os import PathLike from pathlib import Path from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Type, Union +import attr from yapapi.events import CommandEventType, DownloadFinished, DownloadStarted from yapapi.script.capture import CaptureContext @@ -58,18 +59,27 @@ def __repr__(self): return f"{self.__class__.__name__}" +@attr.s(auto_attribs=True, repr=False) +class ProgressArgs: + """Interval represented as human-readable duration string (examples: '5s' '10min')""" + update_interval: Optional[str] = attr.field(init=False, default=None) + update_step: Optional[int] = attr.field(init=False, default=None) + + class Deploy(Command): """Command which deploys a given runtime on the provider.""" - def __init__(self, **kwargs: dict): + def __init__(self, progress_args: Optional[ProgressArgs] = None, **kwargs): super().__init__() self.kwargs = kwargs + self._progress = progress_args def __repr__(self): return f"{super().__repr__()} {self.kwargs}" def evaluate(self): - return self._make_batch_command("deploy", **self.kwargs) + kwargs = dict(self.kwargs, progress=attr.asdict(self._progress)) if self._progress else self.kwargs + return self._make_batch_command("deploy", **kwargs) class Start(Command): @@ -93,12 +103,6 @@ def evaluate(self): return self._make_batch_command("terminate") -class ProgressArgs: - """Interval represented as human-readable duration string (examples: '5s' '10min')""" - update_interval: Optional[str] - update_step: Optional[int] - - class _SendContent(Command, abc.ABC): def __init__(self, dst_path: str, progress_args: Optional[ProgressArgs] = None): super().__init__() @@ -112,8 +116,10 @@ async def _do_upload(self, storage: StorageProvider) -> Source: def evaluate(self): assert self._src + + kwargs = {"progress": attr.asdict(self._progress)} if self._progress else {} return self._make_batch_command( - "transfer", _from=self._src.download_url, _to=f"container:{self._dst_path}", _progress={} + "transfer", _from=self._src.download_url, _to=f"container:{self._dst_path}", **kwargs ) async def before(self): @@ -130,13 +136,13 @@ def __repr__(self): class SendBytes(_SendContent): """Command which schedules sending bytes data to a provider.""" - def __init__(self, data: bytes, dst_path: str): + def __init__(self, data: bytes, dst_path: str, progress_args: Optional[ProgressArgs] = None): """Create a new SendBytes command. :param data: bytes to send :param dst_path: remote (provider) destination path """ - super().__init__(dst_path) + super().__init__(dst_path, progress_args=progress_args) self._data: Optional[bytes] = data async def _do_upload(self, storage: StorageProvider) -> Source: @@ -149,25 +155,25 @@ async def _do_upload(self, storage: StorageProvider) -> Source: class SendJson(SendBytes): """Command which schedules sending JSON data to a provider.""" - def __init__(self, data: dict, dst_path: str): + def __init__(self, data: dict, dst_path: str, progress_args: Optional[ProgressArgs] = None): """Create a new SendJson command. :param data: dictionary representing JSON data to send :param dst_path: remote (provider) destination path """ - super().__init__(json.dumps(data).encode(encoding="utf-8"), dst_path) + super().__init__(json.dumps(data).encode(encoding="utf-8"), dst_path, progress_args=progress_args) class SendFile(_SendContent): """Command which schedules sending a file to a provider.""" - def __init__(self, src_path: str, dst_path: str): + def __init__(self, src_path: str, dst_path: str, progress_args: Optional[ProgressArgs] = None): """Create a new SendFile command. :param src_path: local (requestor) source path :param dst_path: remote (provider) destination path """ - super(SendFile, self).__init__(dst_path) + super(SendFile, self).__init__(dst_path, progress_args=progress_args) self._src_path = Path(src_path) async def _do_upload(self, storage: StorageProvider) -> Source: @@ -220,16 +226,20 @@ class _ReceiveContent(Command, abc.ABC): def __init__( self, src_path: str, + progress_args: Optional[ProgressArgs] = None ): super().__init__() self._src_path: str = src_path self._dst_slot: Optional[Destination] = None self._dst_path: Optional[PathLike] = None + self._progress = progress_args def evaluate(self): assert self._dst_slot + + kwargs = {"progress": attr.asdict(self._progress)} if self._progress else {} return self._make_batch_command( - "transfer", _from=f"container:{self._src_path}", to=self._dst_slot.upload_url, _progress={} + "transfer", _from=f"container:{self._src_path}", to=self._dst_slot.upload_url, **kwargs ) async def before(self): @@ -253,13 +263,14 @@ def __init__( self, src_path: str, dst_path: str, + progress_args: Optional[ProgressArgs] = None, ): """Create a new DownloadFile command. :param src_path: remote (provider) source path :param dst_path: local (requestor) destination path """ - super().__init__(src_path) + super().__init__(src_path, progress_args=progress_args) self._dst_path = Path(dst_path) async def after(self) -> None: @@ -282,6 +293,7 @@ def __init__( src_path: str, on_download: Callable[[bytes], Awaitable], limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, + progress_args: Optional[ProgressArgs] = None, ): """Create a new DownloadBytes command. @@ -289,7 +301,7 @@ def __init__( :param on_download: the callable to run on the received data :param limit: limit of bytes to be downloaded (expected size) """ - super().__init__(src_path) + super().__init__(src_path, progress_args=progress_args) self._on_download = on_download self._limit = limit @@ -310,6 +322,7 @@ def __init__( src_path: str, on_download: Callable[[Any], Awaitable], limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, + progress_args: Optional[ProgressArgs] = None, ): """Create a new DownloadJson command. @@ -317,7 +330,7 @@ def __init__( :param on_download: the callable to run on the received data :param limit: limit of bytes to be downloaded (expected size) """ - super().__init__(src_path, partial(self.__on_json_download, on_download), limit) + super().__init__(src_path, partial(self.__on_json_download, on_download), limit, progress_args=progress_args) @staticmethod async def __on_json_download(on_download: Callable[[bytes], Awaitable], content: bytes): From 557dccd6a0252e69cc9813893ccb027491fcbc8b Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Mon, 15 Jan 2024 19:06:10 +0100 Subject: [PATCH 10/18] Fix serialization of ProgressArgs --- examples/transfer-progress/progress.py | 2 +- yapapi/script/command.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 7c2153dfc..f96aaba5b 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -107,7 +107,7 @@ async def start(self): yield script async def run(self): - progress = ProgressArgs() + progress = ProgressArgs(updateInterval="5s") script = self._ctx.new_script(timeout=None) script.download_from_url( diff --git a/yapapi/script/command.py b/yapapi/script/command.py index a4a4842f6..c42fb11f0 100644 --- a/yapapi/script/command.py +++ b/yapapi/script/command.py @@ -62,8 +62,8 @@ def __repr__(self): @attr.s(auto_attribs=True, repr=False) class ProgressArgs: """Interval represented as human-readable duration string (examples: '5s' '10min')""" - update_interval: Optional[str] = attr.field(init=False, default=None) - update_step: Optional[int] = attr.field(init=False, default=None) + updateInterval: Optional[str] = attr.field(default=None) + updateStep: Optional[int] = attr.field(default=None) class Deploy(Command): From 72a46e37a34eee06fe273885b8780969bf702f71 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Mon, 15 Jan 2024 19:08:31 +0100 Subject: [PATCH 11/18] Set reasonable update interval --- examples/transfer-progress/progress.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index f96aaba5b..bea6b3787 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -107,7 +107,7 @@ async def start(self): yield script async def run(self): - progress = ProgressArgs(updateInterval="5s") + progress = ProgressArgs(updateInterval="300ms") script = self._ctx.new_script(timeout=None) script.download_from_url( From 9c91adb2a269629fef1dcee5d199eb280383fade Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Tue, 16 Jan 2024 15:08:04 +0100 Subject: [PATCH 12/18] Fix deploy arguments passing; Fix style checks --- examples/transfer-progress/progress.py | 101 +++++++++++++------------ pyproject.toml | 1 + yapapi/script/__init__.py | 18 +++-- yapapi/script/command.py | 69 +++++++++-------- yapapi/services/service_runner.py | 1 - 5 files changed, 106 insertions(+), 84 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index bea6b3787..4794d5403 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -1,34 +1,26 @@ #!/usr/bin/env python3 +import asyncio +import os import pathlib import sys -import os -from dataclasses import dataclass +from datetime import datetime + from alive_progress import alive_bar +from dataclasses import dataclass import yapapi.script.command +from yapapi import Golem from yapapi.payload import vm from yapapi.payload.vm import _VmPackage from yapapi.props.base import constraint from yapapi.script import ProgressArgs from yapapi.services import Service -import asyncio -from datetime import datetime - -import colorama # type: ignore - -from yapapi import Golem - examples_dir = pathlib.Path(__file__).resolve().parent.parent sys.path.append(str(examples_dir)) -from utils import ( - build_parser, - format_usage, - print_env_info, - run_golem_example, -) +from utils import build_parser, run_golem_example def command_key(event: "yapapi.events.CommandProgress") -> str: @@ -48,28 +40,38 @@ def progress_bar(self, event: "yapapi.events.CommandProgress"): if event.message is not None: print(f"{event.message}") - if event.progress is not None: + if event.progress is not None and event.progress[1] is not None: progress = event.progress + key = command_key(event) - if progress[1] is not None: - key = command_key(event) - if self._transfers_ctx.get(key) is None: - bar = alive_bar(total=progress[1], manual=True, title="Progress", unit=event.unit, scale=True, - dual_line=True) - bar_ctx = bar.__enter__() + if self._transfers_ctx.get(key) is None: + self.create_progress_bar(event) - if isinstance(event.command, yapapi.script.command.Deploy): - bar_ctx.text = f"Deploying image" - elif isinstance(event.command, yapapi.script.command._SendContent): - bar_ctx.text = f"Uploading file: {event.command._src.download_url} -> {event.command._dst_path}" - elif isinstance(event.command, yapapi.script.command._ReceiveContent): - bar_ctx.text = f"Downloading file: {event.command._src_path} -> {event.command._dst_path}" + bar = self._transfers_ctx.get(key) + bar(progress[0] / progress[1]) - self._transfers_bars[key] = bar - self._transfers_ctx[key] = bar_ctx + def create_progress_bar(self, event: "yapapi.events.CommandProgress"): + key = command_key(event) + bar = alive_bar( + total=event.progress[1], + manual=True, + title="Progress", + unit=event.unit, + scale=True, + dual_line=True, + ) + bar_ctx = bar.__enter__() - bar = self._transfers_ctx.get(key) - bar(progress[0] / progress[1]) + command = event.command + if isinstance(command, yapapi.script.command.Deploy): + bar_ctx.text = "Deploying image" + elif isinstance(command, yapapi.script.command._SendContent): + bar_ctx.text = f"Uploading file: {command._src.download_url} -> {command._dst_path}" + elif isinstance(command, yapapi.script.command._ReceiveContent): + bar_ctx.text = f"Downloading file: {command._src_path} -> {command._dst_path}" + + self._transfers_bars[key] = bar + self._transfers_ctx[key] = bar_ctx def executed(self, event: "yapapi.events.CommandExecuted"): key = command_key(event) @@ -86,7 +88,9 @@ def executed(self, event: "yapapi.events.CommandExecuted"): @dataclass class ExamplePayload(_VmPackage): - progress_capability: bool = constraint("golem.activity.caps.transfer.report-progress", operator="=", default=True) + progress_capability: bool = constraint( + "golem.activity.caps.transfer.report-progress", operator="=", default=True + ) class ExampleService(Service): @@ -97,11 +101,13 @@ async def get_payload(): min_mem_gib=0.5, min_storage_gib=10.0, ) - return ExamplePayload(image_url=package.image_url, constraints=package.constraints, progress_capability=True) + return ExamplePayload( + image_url=package.image_url, constraints=package.constraints, progress_capability=True + ) async def start(self): script = self._ctx.new_script(timeout=None) - script.deploy(progress={}) + script.deploy(progress_args=ProgressArgs(updateInterval="300ms")) script.start() yield script @@ -111,13 +117,14 @@ async def run(self): script = self._ctx.new_script(timeout=None) script.download_from_url( - "https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true", - "/golem/resource/model-small", progress_args=progress) + "https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors", + "/golem/resource/model-small", + progress_args=progress, + ) script.upload_bytes( - os.urandom(40 * 1024 * 1024), - "/golem/resource/bytes.bin", progress_args=progress) - script.download_file( - "/golem/resource/bytes.bin", "download.bin", progress_args=progress) + os.urandom(40 * 1024 * 1024), "/golem/resource/bytes.bin", progress_args=progress + ) + script.download_file("/golem/resource/bytes.bin", "download.bin", progress_args=progress) yield script os.remove("download.bin") @@ -129,16 +136,16 @@ async def run(self): async def main(subnet_tag, driver=None, network=None): async with Golem( - budget=50.0, - subnet_tag=subnet_tag, - payment_driver=driver, - payment_network=network, - stream_output=True, + budget=50.0, + subnet_tag=subnet_tag, + payment_driver=driver, + payment_network=network, + stream_output=True, ) as golem: global shutdown bar = ProgressDisplayer() - cluster = await golem.run_service( + await golem.run_service( ExampleService, num_instances=1, ) diff --git a/pyproject.toml b/pyproject.toml index aa76dac09..417662dc9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,7 @@ autoflake = "^1" flake8 = "^5" flake8-docstrings = "^1.6" Flake8-pyproject = "^1.2.2" +pyproject-autoflake = "^1.0.2" [tool.poe.tasks] diff --git a/yapapi/script/__init__.py b/yapapi/script/__init__.py index f2d23ed30..6f2605368 100644 --- a/yapapi/script/__init__.py +++ b/yapapi/script/__init__.py @@ -13,13 +13,15 @@ Deploy, DownloadBytes, DownloadFile, + DownloadFileFromInternet, DownloadJson, + ProgressArgs, Run, SendBytes, SendFile, SendJson, Start, - Terminate, DownloadFileFromInternet, ProgressArgs, + Terminate, ) from yapapi.storage import DOWNLOAD_BYTES_LIMIT_DEFAULT @@ -128,9 +130,11 @@ def add(self, cmd: Command) -> Awaitable[CommandExecuted]: cmd._set_script(self, len(self._commands) - 1) return cmd._result - def deploy(self, **kwargs: dict) -> Awaitable[CommandExecuted]: + def deploy( + self, progress_args: Optional[ProgressArgs] = None, **kwargs: dict + ) -> Awaitable[CommandExecuted]: """Schedule a :class:`Deploy` command on the provider.""" - return self.add(Deploy(**kwargs)) + return self.add(Deploy(progress_args=progress_args, **kwargs)) def start(self, *args: str) -> Awaitable[CommandExecuted]: """Schedule a :class:`Start` command on the provider.""" @@ -169,7 +173,7 @@ def download_bytes( src_path: str, on_download: Callable[[bytes], Awaitable], limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, - **kwargs + **kwargs, ) -> Awaitable[CommandExecuted]: """Schedule downloading a remote file from the provider as bytes. @@ -192,7 +196,7 @@ def download_json( src_path: str, on_download: Callable[[Any], Awaitable], limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, - **kwargs + **kwargs, ) -> Awaitable[CommandExecuted]: """Schedule downloading a remote file from the provider as JSON. @@ -226,7 +230,9 @@ def upload_json(self, data: dict, dst_path: str, **kwargs) -> Awaitable[CommandE """ return self.add(SendJson(data, dst_path, **kwargs)) - def download_from_url(self, src_url: str, dst_path: str, progress_args: Optional[ProgressArgs] = None) -> Awaitable[CommandExecuted]: + def download_from_url( + self, src_url: str, dst_path: str, progress_args: Optional[ProgressArgs] = None + ) -> Awaitable[CommandExecuted]: """Schedule sending a file to the provider. :param src_url: remote (internet) source url diff --git a/yapapi/script/command.py b/yapapi/script/command.py index c42fb11f0..ee690e3d2 100644 --- a/yapapi/script/command.py +++ b/yapapi/script/command.py @@ -5,6 +5,7 @@ from os import PathLike from pathlib import Path from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Type, Union + import attr from yapapi.events import CommandEventType, DownloadFinished, DownloadStarted @@ -38,7 +39,7 @@ def _make_batch_command(cmd_name: str, **kwargs) -> BatchCommand: def __init__(self): self._result: asyncio.Future = asyncio.get_event_loop().create_future() self._script: Optional["Script"] = None - self._index: int = None + self._index: int = 0 def _set_script(self, script: "Script", index: int) -> None: assert self._script is None, f"Command {self} already belongs to a script {self._script}" @@ -61,7 +62,8 @@ def __repr__(self): @attr.s(auto_attribs=True, repr=False) class ProgressArgs: - """Interval represented as human-readable duration string (examples: '5s' '10min')""" + """Interval represented as human-readable duration string (examples: '5s' '10min').""" + updateInterval: Optional[str] = attr.field(default=None) updateStep: Optional[int] = attr.field(default=None) @@ -78,7 +80,11 @@ def __repr__(self): return f"{super().__repr__()} {self.kwargs}" def evaluate(self): - kwargs = dict(self.kwargs, progress=attr.asdict(self._progress)) if self._progress else self.kwargs + kwargs = ( + dict(self.kwargs, progress=attr.asdict(self._progress)) + if self._progress + else self.kwargs + ) return self._make_batch_command("deploy", **kwargs) @@ -161,7 +167,9 @@ def __init__(self, data: dict, dst_path: str, progress_args: Optional[ProgressAr :param data: dictionary representing JSON data to send :param dst_path: remote (provider) destination path """ - super().__init__(json.dumps(data).encode(encoding="utf-8"), dst_path, progress_args=progress_args) + super().__init__( + json.dumps(data).encode(encoding="utf-8"), dst_path, progress_args=progress_args + ) class SendFile(_SendContent): @@ -187,12 +195,12 @@ class Run(Command): """Command which schedules running a shell command on a provider.""" def __init__( - self, - cmd: str, - *args: str, - env: Optional[Dict[str, str]] = None, - stderr: CaptureContext = CaptureContext.build(mode="stream"), - stdout: CaptureContext = CaptureContext.build(mode="stream"), + self, + cmd: str, + *args: str, + env: Optional[Dict[str, str]] = None, + stderr: CaptureContext = CaptureContext.build(mode="stream"), + stdout: CaptureContext = CaptureContext.build(mode="stream"), ): """Create a new Run command. @@ -223,11 +231,7 @@ def __repr__(self): class _ReceiveContent(Command, abc.ABC): - def __init__( - self, - src_path: str, - progress_args: Optional[ProgressArgs] = None - ): + def __init__(self, src_path: str, progress_args: Optional[ProgressArgs] = None): super().__init__() self._src_path: str = src_path self._dst_slot: Optional[Destination] = None @@ -260,10 +264,10 @@ class DownloadFile(_ReceiveContent): """Command which schedules downloading a file from a provider.""" def __init__( - self, - src_path: str, - dst_path: str, - progress_args: Optional[ProgressArgs] = None, + self, + src_path: str, + dst_path: str, + progress_args: Optional[ProgressArgs] = None, ): """Create a new DownloadFile command. @@ -289,11 +293,11 @@ class DownloadBytes(_ReceiveContent): """Command which schedules downloading a file from a provider as bytes.""" def __init__( - self, - src_path: str, - on_download: Callable[[bytes], Awaitable], - limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, - progress_args: Optional[ProgressArgs] = None, + self, + src_path: str, + on_download: Callable[[bytes], Awaitable], + limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, + progress_args: Optional[ProgressArgs] = None, ): """Create a new DownloadBytes command. @@ -318,11 +322,11 @@ class DownloadJson(DownloadBytes): """Command which schedules downloading a file from a provider as JSON data.""" def __init__( - self, - src_path: str, - on_download: Callable[[Any], Awaitable], - limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, - progress_args: Optional[ProgressArgs] = None, + self, + src_path: str, + on_download: Callable[[Any], Awaitable], + limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT, + progress_args: Optional[ProgressArgs] = None, ): """Create a new DownloadJson command. @@ -330,7 +334,12 @@ def __init__( :param on_download: the callable to run on the received data :param limit: limit of bytes to be downloaded (expected size) """ - super().__init__(src_path, partial(self.__on_json_download, on_download), limit, progress_args=progress_args) + super().__init__( + src_path, + partial(self.__on_json_download, on_download), + limit, + progress_args=progress_args, + ) @staticmethod async def __on_json_download(on_download: Callable[[bytes], Awaitable], content: bytes): diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index d481e77e6..0e48f0a0b 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -201,7 +201,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): self._state.stop() except statemachine.exceptions.TransitionNotAllowed: """The ServiceRunner is not running,""" - pass logger.debug("%s is shutting down... state: %s", self, self.state) From 2e1aa3afbdb1d7facd7c37753be44f431aed69ae Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Tue, 13 Feb 2024 16:13:40 +0100 Subject: [PATCH 13/18] Update events hierarchy in docs --- yapapi/events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/yapapi/events.py b/yapapi/events.py index 96e40bb86..53935f289 100644 --- a/yapapi/events.py +++ b/yapapi/events.py @@ -118,6 +118,7 @@ CommandStdOut CommandStdErr CommandExecuted + CommandProgress DownloadStarted DownloadFinished GettingResults From 1374b3f6b116c0f87d8647de487f222feda11a2d Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Tue, 13 Feb 2024 18:42:07 +0100 Subject: [PATCH 14/18] Update constraints --- examples/transfer-progress/progress.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 4794d5403..99ef6e059 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -88,7 +88,10 @@ def executed(self, event: "yapapi.events.CommandExecuted"): @dataclass class ExamplePayload(_VmPackage): - progress_capability: bool = constraint( + deploy_progress_capability: bool = constraint( + "golem.activity.caps.deploy.report-progress", operator="=", default=True + ) + transfer_progress_capability: bool = constraint( "golem.activity.caps.transfer.report-progress", operator="=", default=True ) @@ -102,7 +105,10 @@ async def get_payload(): min_storage_gib=10.0, ) return ExamplePayload( - image_url=package.image_url, constraints=package.constraints, progress_capability=True + image_url=package.image_url, + constraints=package.constraints, + deploy_progress_capability=True, + transfer_progress_capability=True, ) async def start(self): From b5689064347df7cb771b920c49533b5904731c77 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Fri, 16 Feb 2024 16:14:09 +0100 Subject: [PATCH 15/18] All constraints are included properly in the Demand --- examples/transfer-progress/progress.py | 56 ++++++++++++++++++-------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 99ef6e059..58b863916 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -5,15 +5,18 @@ import pathlib import sys from datetime import datetime +from typing import List from alive_progress import alive_bar from dataclasses import dataclass import yapapi.script.command from yapapi import Golem -from yapapi.payload import vm -from yapapi.payload.vm import _VmPackage +from yapapi.payload import vm, Payload +from yapapi.payload.vm import _VmPackage, VmRequest, VmPackageFormat +from yapapi.props import inf from yapapi.props.base import constraint +from yapapi.props.builder import DemandBuilder from yapapi.script import ProgressArgs from yapapi.services import Service @@ -87,28 +90,49 @@ def executed(self, event: "yapapi.events.CommandExecuted"): @dataclass -class ExamplePayload(_VmPackage): - deploy_progress_capability: bool = constraint( - "golem.activity.caps.deploy.report-progress", operator="=", default=True +class ExamplePayload(Payload): + image_url: str + min_mem_gib: float = constraint(inf.INF_MEM, operator=">=") + min_storage_gib: float = constraint(inf.INF_STORAGE, operator=">=") + min_cpu_threads: int = constraint(inf.INF_THREADS, operator=">=") + + capabilities: List[vm.VmCaps] = constraint( + "golem.runtime.capabilities", operator="=", default_factory=list ) - transfer_progress_capability: bool = constraint( - "golem.activity.caps.transfer.report-progress", operator="=", default=True + + runtime: str = constraint(inf.INF_RUNTIME_NAME, operator="=", default=vm.RUNTIME_VM) + + # Constraints can't be bool, because python serializes bool to `True` and market matcher + # expects `true`. + deploy_progress_capability: str = constraint( + "golem.activity.caps.deploy.report-progress", operator="=", default="true" + ) + transfer_progress_capability: str = constraint( + "golem.activity.caps.transfer.report-progress", operator="=", default="true" ) + async def decorate_demand(self, demand: DemandBuilder): + await super().decorate_demand(demand) + demand.add( + VmRequest(package_url=self.image_url, package_format=VmPackageFormat.GVMKIT_SQUASH) + ) + class ExampleService(Service): @staticmethod async def get_payload(): - package = await vm.repo( + package: _VmPackage = await vm.repo( image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae", min_mem_gib=0.5, min_storage_gib=10.0, ) return ExamplePayload( image_url=package.image_url, - constraints=package.constraints, - deploy_progress_capability=True, - transfer_progress_capability=True, + min_mem_gib=package.constraints.min_mem_gib, + min_storage_gib=package.constraints.min_storage_gib, + min_cpu_threads=package.constraints.min_cpu_threads, + capabilities=package.constraints.capabilities, + runtime=package.constraints.runtime, ) async def start(self): @@ -142,11 +166,11 @@ async def run(self): async def main(subnet_tag, driver=None, network=None): async with Golem( - budget=50.0, - subnet_tag=subnet_tag, - payment_driver=driver, - payment_network=network, - stream_output=True, + budget=50.0, + subnet_tag=subnet_tag, + payment_driver=driver, + payment_network=network, + stream_output=True, ) as golem: global shutdown From c743d3f0b0278adfb0e720745700f5e3af7e1b77 Mon Sep 17 00:00:00 2001 From: "nieznany.sprawiciel" Date: Fri, 29 Mar 2024 18:45:49 +0100 Subject: [PATCH 16/18] Allow dict properties in Demand --- yapapi/props/builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yapapi/props/builder.py b/yapapi/props/builder.py index 0cc543c55..25331f988 100644 --- a/yapapi/props/builder.py +++ b/yapapi/props/builder.py @@ -72,7 +72,7 @@ def add(self, m: Model): value = int(value.timestamp() * 1000) if isinstance(value, enum.Enum): value = value.value - assert isinstance(value, (str, int, list)) + assert isinstance(value, (str, int, list, dict)) self._properties[prop_id] = value def add_properties(self, props: dict): From cdcf38a3e6968a15e51c41fc6d9b3f07213228eb Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Thu, 12 Sep 2024 11:34:47 +0200 Subject: [PATCH 17/18] chore: code formatting --- examples/transfer-progress/progress.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 58b863916..42b4a2701 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from utils import build_parser, run_golem_example import asyncio import os import pathlib @@ -12,8 +13,8 @@ import yapapi.script.command from yapapi import Golem -from yapapi.payload import vm, Payload -from yapapi.payload.vm import _VmPackage, VmRequest, VmPackageFormat +from yapapi.payload import Payload, vm +from yapapi.payload.vm import VmPackageFormat, VmRequest, _VmPackage from yapapi.props import inf from yapapi.props.base import constraint from yapapi.props.builder import DemandBuilder @@ -23,8 +24,6 @@ examples_dir = pathlib.Path(__file__).resolve().parent.parent sys.path.append(str(examples_dir)) -from utils import build_parser, run_golem_example - def command_key(event: "yapapi.events.CommandProgress") -> str: return f"{event.script_id}#{event.command._index}" @@ -100,7 +99,8 @@ class ExamplePayload(Payload): "golem.runtime.capabilities", operator="=", default_factory=list ) - runtime: str = constraint(inf.INF_RUNTIME_NAME, operator="=", default=vm.RUNTIME_VM) + runtime: str = constraint(inf.INF_RUNTIME_NAME, + operator="=", default=vm.RUNTIME_VM) # Constraints can't be bool, because python serializes bool to `True` and market matcher # expects `true`. @@ -114,7 +114,8 @@ class ExamplePayload(Payload): async def decorate_demand(self, demand: DemandBuilder): await super().decorate_demand(demand) demand.add( - VmRequest(package_url=self.image_url, package_format=VmPackageFormat.GVMKIT_SQUASH) + VmRequest(package_url=self.image_url, + package_format=VmPackageFormat.GVMKIT_SQUASH) ) @@ -154,7 +155,8 @@ async def run(self): script.upload_bytes( os.urandom(40 * 1024 * 1024), "/golem/resource/bytes.bin", progress_args=progress ) - script.download_file("/golem/resource/bytes.bin", "download.bin", progress_args=progress) + script.download_file("/golem/resource/bytes.bin", + "download.bin", progress_args=progress) yield script os.remove("download.bin") From 9c8074240359bf3b8d40cdaf4c8aeb851d47da02 Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Thu, 12 Sep 2024 11:45:36 +0200 Subject: [PATCH 18/18] chore: code formatting 2 --- examples/transfer-progress/progress.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/examples/transfer-progress/progress.py b/examples/transfer-progress/progress.py index 42b4a2701..9c84aefcf 100644 --- a/examples/transfer-progress/progress.py +++ b/examples/transfer-progress/progress.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -from utils import build_parser, run_golem_example import asyncio import os import pathlib @@ -10,6 +9,7 @@ from alive_progress import alive_bar from dataclasses import dataclass +from utils import build_parser, run_golem_example import yapapi.script.command from yapapi import Golem @@ -99,8 +99,7 @@ class ExamplePayload(Payload): "golem.runtime.capabilities", operator="=", default_factory=list ) - runtime: str = constraint(inf.INF_RUNTIME_NAME, - operator="=", default=vm.RUNTIME_VM) + runtime: str = constraint(inf.INF_RUNTIME_NAME, operator="=", default=vm.RUNTIME_VM) # Constraints can't be bool, because python serializes bool to `True` and market matcher # expects `true`. @@ -114,8 +113,7 @@ class ExamplePayload(Payload): async def decorate_demand(self, demand: DemandBuilder): await super().decorate_demand(demand) demand.add( - VmRequest(package_url=self.image_url, - package_format=VmPackageFormat.GVMKIT_SQUASH) + VmRequest(package_url=self.image_url, package_format=VmPackageFormat.GVMKIT_SQUASH) ) @@ -155,8 +153,7 @@ async def run(self): script.upload_bytes( os.urandom(40 * 1024 * 1024), "/golem/resource/bytes.bin", progress_args=progress ) - script.download_file("/golem/resource/bytes.bin", - "download.bin", progress_args=progress) + script.download_file("/golem/resource/bytes.bin", "download.bin", progress_args=progress) yield script os.remove("download.bin") @@ -168,11 +165,11 @@ async def run(self): async def main(subnet_tag, driver=None, network=None): async with Golem( - budget=50.0, - subnet_tag=subnet_tag, - payment_driver=driver, - payment_network=network, - stream_output=True, + budget=50.0, + subnet_tag=subnet_tag, + payment_driver=driver, + payment_network=network, + stream_output=True, ) as golem: global shutdown