diff --git a/.github/workflows/docker-build.yaml b/.github/workflows/docker-build.yaml index a008fd8ae..7ff654c94 100644 --- a/.github/workflows/docker-build.yaml +++ b/.github/workflows/docker-build.yaml @@ -10,8 +10,6 @@ jobs: timeout-minutes: 45 steps: - uses: actions/checkout@v4 - - name: Build the function - run: docker build -t test_function:latest --build-arg TARGETARCH="amd64" -f ./tests/basic/function/Sample-Docker ./tests/basic - name: Build the containers run: docker compose -f docker-compose-dev.yaml build - name: Run the jupyter profile @@ -29,6 +27,7 @@ jobs: shell: bash run: | cd tests/basic + rm ./06_function.py for f in *.py; do echo "$f" && python "$f" &>> basic.log; done done=$(cat basic.log | grep -c "DONE") if [[ $done == 4 ]] diff --git a/.github/workflows/notebook-local-verify.yaml b/.github/workflows/notebook-local-verify.yaml index d66ae10fe..8a280c3c0 100644 --- a/.github/workflows/notebook-local-verify.yaml +++ b/.github/workflows/notebook-local-verify.yaml @@ -22,6 +22,8 @@ jobs: for f in tests/basic/*.py; do sed -i "s/import ServerlessClient/import LocalClient/;s/= ServerlessClient(/= LocalClient(/;/token=os\.environ\.get/d;/host=os\.environ\.get/d" "$f"; done for f in tests/experimental/*.py; do sed -i "s/import ServerlessClient/import LocalClient/;s/= ServerlessClient(/= LocalClient(/;/token=os\.environ\.get/d;/host=os\.environ\.get/d" "$f"; done rm tests/basic/06_function.py + rm tests/experimental/file_download.py + rm tests/experimental/manage_data_directory.py - name: install dependencies shell: bash run: pip install client/ diff --git a/client/qiskit_serverless/__init__.py b/client/qiskit_serverless/__init__.py index df7d1bb88..5daefbeec 100644 --- a/client/qiskit_serverless/__init__.py +++ b/client/qiskit_serverless/__init__.py @@ -22,20 +22,15 @@ from importlib_metadata import version as metadata_version, PackageNotFoundError from .core import ( - BaseProvider, BaseClient, distribute_task, distribute_qiskit_function, get, put, get_refs_by_status, - ServerlessProvider, ServerlessClient, - IBMServerlessProvider, IBMServerlessClient, - RayProvider, RayClient, - LocalProvider, LocalClient, save_result, Configuration, diff --git a/client/qiskit_serverless/core/__init__.py b/client/qiskit_serverless/core/__init__.py index 84668942a..2852c01d7 100644 --- a/client/qiskit_serverless/core/__init__.py +++ b/client/qiskit_serverless/core/__init__.py @@ -31,11 +31,7 @@ BaseClient RayClient LocalClient - ComputeResource Job - GatewayJobClient - BaseJobClient - RayJobClient save_result QiskitPattern QiskitFunction @@ -51,25 +47,13 @@ """ -from .client import ( - BaseProvider, - BaseClient, - ComputeResource, - ServerlessProvider, - ServerlessClient, - IBMServerlessProvider, - IBMServerlessClient, - LocalProvider, - LocalClient, - RayProvider, - RayClient, -) +from .client import BaseClient + +from .clients.local_client import LocalClient +from .clients.ray_client import RayClient +from .clients.serverless_client import ServerlessClient, IBMServerlessClient from .job import ( - BaseJobClient, - RayJobClient, - GatewayJobClient, - LocalJobClient, Job, save_result, Configuration, diff --git a/client/qiskit_serverless/core/client.py b/client/qiskit_serverless/core/client.py index a8a9f1165..147ab9baf 100644 --- a/client/qiskit_serverless/core/client.py +++ b/client/qiskit_serverless/core/client.py @@ -24,134 +24,24 @@ :toctree: ../stubs/ ComputeResource - ServerlessClient + BaseClient """ -# pylint: disable=duplicate-code -import logging import warnings -import os.path -import os -from dataclasses import dataclass +from abc import ABC, abstractmethod from typing import Optional, List, Dict, Any, Union -import ray -import requests -from ray.dashboard.modules.job.sdk import JobSubmissionClient -from opentelemetry import trace from qiskit_ibm_runtime import QiskitRuntimeService -from qiskit_serverless.core.constants import ( - REQUESTS_TIMEOUT, - ENV_GATEWAY_PROVIDER_HOST, - ENV_GATEWAY_PROVIDER_VERSION, - ENV_GATEWAY_PROVIDER_TOKEN, - GATEWAY_PROVIDER_VERSION_DEFAULT, - IBM_SERVERLESS_HOST_URL, -) -from qiskit_serverless.core.files import GatewayFilesClient from qiskit_serverless.core.job import ( Job, - RayJobClient, - GatewayJobClient, - LocalJobClient, - BaseJobClient, Configuration, ) from qiskit_serverless.core.function import QiskitFunction -from qiskit_serverless.core.tracing import _trace_env_vars -from qiskit_serverless.exception import QiskitServerlessException from qiskit_serverless.utils import JsonSerializable -from qiskit_serverless.utils.json import safe_json_request from qiskit_serverless.visualizaiton import Widget -TIMEOUT = 30 - -@dataclass -class ComputeResource: - """ComputeResource class. - - Args: - name: name of compute_resource - host: host address of compute_resource - namespace: k8s namespace of compute_resource - port_interactive: port of compute_resource for interactive mode - port_job_server: port of compute resource for job server - resources: list of resources - """ - - name: str - host: Optional[str] = None - port_interactive: int = 10001 - port_job_server: int = 8265 - resources: Optional[Dict[str, float]] = None - - def job_client(self) -> Optional[BaseJobClient]: - """Return job client for given compute resource. - - Returns: - job client - """ - if self.host is not None: - connection_url = f"http://{self.host}:{self.port_job_server}" - client = None - try: - client = RayJobClient(JobSubmissionClient(connection_url)) - except ConnectionError: - logging.warning( - "Failed to establish connection with jobs server at %s. " - "You will not be able to run jobs on this provider.", - connection_url, - ) - - return client - return None - - def context(self, **kwargs): - """Return context allocated for this compute_resource.""" - _trace_env_vars({}, location="on context allocation") - - init_args = { - **kwargs, - **{ - "address": kwargs.get( - "address", - self.connection_string_interactive_mode(), - ), - "ignore_reinit_error": kwargs.get("ignore_reinit_error", True), - "logging_level": kwargs.get("logging_level", "warning"), - "resources": kwargs.get("resources", self.resources), - }, - } - - return ray.init(**init_args) - - def connection_string_interactive_mode(self) -> Optional[str]: - """Return connection string to compute_resource.""" - if self.host is not None: - return f"ray://{self.host}:{self.port_interactive}" - return None - - @classmethod - def from_dict(cls, data: dict): - """Create compute_resource object form dict.""" - return ComputeResource( - name=data.get("name"), - host=data.get("host"), - port_interactive=data.get("port_interactive"), - port_job_server=data.get("port_job_server"), - ) - - def __eq__(self, other: object): - if isinstance(other, ComputeResource): - return self.name == other.name and self.host == other.host - return False - - def __repr__(self): - return f"" - - -class BaseClient(JsonSerializable): +class BaseClient(JsonSerializable, ABC): """ A client class for specifying custom compute resources. @@ -168,12 +58,7 @@ class BaseClient(JsonSerializable): """ def __init__( # pylint: disable=too-many-positional-arguments - self, - name: str, - host: Optional[str] = None, - token: Optional[str] = None, - compute_resource: Optional[ComputeResource] = None, - available_compute_resources: Optional[List[ComputeResource]] = None, + self, name: str, host: Optional[str] = None, token: Optional[str] = None ): """ Initialize a BaseClient instance. @@ -182,42 +67,18 @@ def __init__( # pylint: disable=too-many-positional-arguments name: name of client host: host of client a.k.a managers host token: authentication token for manager - compute_resource: selected compute_resource from provider - available_compute_resources: available clusters in provider """ self.name = name self.host = host self.token = token - self.compute_resource = compute_resource - if available_compute_resources is None: - if compute_resource is not None: - available_compute_resources = [compute_resource] - else: - available_compute_resources = [] - self.available_compute_resources = available_compute_resources @classmethod + @abstractmethod def from_dict(cls, dictionary: dict): - return BaseProvider(**dictionary) - - def job_client(self): - """Return job client for configured compute resource of provider. - - Returns: - job client - """ - return self.compute_resource.job_client() - - def context(self, **kwargs): - """Allocated context for selected compute_resource for provider.""" - if self.compute_resource is None: - raise QiskitServerlessException( - f"ComputeResource was not selected for provider {self.name}" - ) - return self.compute_resource.context(**kwargs) + """Converts dict to object.""" def __eq__(self, other): - if isinstance(other, BaseProvider): + if isinstance(other, BaseClient): return self.name == other.name return False @@ -225,25 +86,27 @@ def __eq__(self, other): def __repr__(self): return f"<{self.name}>" - def get_compute_resources(self) -> List[ComputeResource]: - """Return compute resources for provider.""" - raise NotImplementedError + #################### + ####### JOBS ####### + #################### + @abstractmethod + def jobs(self, **kwargs) -> List[Job]: + """Return list of jobs. - def create_compute_resource(self, resource) -> int: - """Create compute resource for provider.""" - raise NotImplementedError + Returns: + list of jobs. + """ - def delete_compute_resource(self, resource) -> int: - """Delete compute resource for provider.""" - raise NotImplementedError + @abstractmethod + def job(self, job_id: str) -> Optional[Job]: + """Returns job by job id. - def get_jobs(self, **kwargs) -> List[Job]: - """Return list of jobs. + Args: + job_id: job id Returns: - list of jobs. + Job instance """ - raise NotImplementedError def get_job_by_id(self, job_id: str) -> Optional[Job]: """Returns job by job id. @@ -254,16 +117,30 @@ def get_job_by_id(self, job_id: str) -> Optional[Job]: Returns: Job instance """ - job_client = self.job_client() + warnings.warn( + "`get_job_by_id` method has been deprecated. " + "And will be removed in future releases. " + "Please, use `get_job` instead.", + DeprecationWarning, + ) + return self.job(job_id) - if job_client is None: - logging.warning( # pylint: disable=logging-fstring-interpolation - "Job has not been found as no provider " - "with remote host has been configured. " - ) - return None - return Job(job_id=job_id, job_client=job_client) + def get_jobs(self, **kwargs) -> List[Job]: + # pylint: disable=duplicate-code + """Return list of jobs. + + Returns: + list of jobs. + """ + warnings.warn( + "`get_jobs` method has been deprecated. " + "And will be removed in future releases. " + "Please, use `jobs` instead.", + DeprecationWarning, + ) + return self.jobs(**kwargs) + @abstractmethod def run( self, program: Union[QiskitFunction, str], @@ -289,471 +166,73 @@ def run( Returns: Job """ - job_client = self.job_client() - if job_client is None: - logging.warning( # pylint: disable=logging-fstring-interpolation - f"Job has not been submitted as no provider " - f"with remote host has been configured. " - f"Selected provider: {self}" - ) - return None + @abstractmethod + def status(self, job_id: str) -> str: + """Check status.""" - return job_client.run(program, None, arguments, config) + @abstractmethod + def stop( + self, job_id: str, service: Optional[QiskitRuntimeService] = None + ) -> Union[str, bool]: + """Stops job/program.""" - def upload(self, program: QiskitFunction): - """Uploads program.""" - raise NotImplementedError + @abstractmethod + def result(self, job_id: str) -> Any: + """Return results.""" - def files(self) -> List[str]: - """Returns list of available files produced by programs to download.""" - raise NotImplementedError + @abstractmethod + def logs(self, job_id: str) -> str: + """Return logs.""" - def download( - self, - file: str, - download_location: str = "./", - ): - """Download file.""" - warnings.warn( - "`download` method has been deprecated. " - "And will be removed in future releases. " - "Please, use `file_download` instead.", - DeprecationWarning, - ) - return self.file_download(file, download_location) + @abstractmethod + def filtered_logs(self, job_id: str, **kwargs) -> str: + """Return filtered logs.""" - def file_download( - self, - file: str, - target_name: Optional[str] = None, - download_location: str = "./", - ): - """Download file.""" - raise NotImplementedError - - def file_delete(self, file: str): - """Deletes file uploaded or produced by the programs,""" - raise NotImplementedError - - def file_upload(self, file: str): - """Upload file.""" - raise NotImplementedError - - def widget(self): - """Widget for information about provider and jobs.""" - return Widget(self).show() + ######################### + ####### Functions ####### + ######################### - def get_programs(self, **kwargs): - """[Deprecated] Returns list of available programs.""" - warnings.warn( - "`get_programs` method has been deprecated. " - "And will be removed in future releases. " - "Please, use `list` instead.", - DeprecationWarning, - ) - return self.list(**kwargs) + @abstractmethod + def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: + """Uploads program.""" - def list(self, **kwargs) -> List[QiskitFunction]: + @abstractmethod + def functions(self, **kwargs) -> List[QiskitFunction]: """Returns list of available programs.""" - raise NotImplementedError - def get( + @abstractmethod + def function( self, title: str, provider: Optional[str] = None ) -> Optional[QiskitFunction]: - """Returns qiskit function based on title provided.""" - raise NotImplementedError - - -class BaseProvider(BaseClient): - """ - [Deprecated since version 0.10.0] Use :class:`.BaseClient` instead. - - A provider for connecting to a specified host. This class has been - renamed to :class:`.BaseClient`. - """ - - -class ServerlessClient(BaseClient): - """ - A client for connecting to a specified host. - - Example: - >>> client = ServerlessClient( - >>> name="", - >>> host="", - >>> token="", - >>> ) - """ - - def __init__( # pylint: disable=too-many-positional-arguments - self, - name: Optional[str] = None, - host: Optional[str] = None, - version: Optional[str] = None, - token: Optional[str] = None, - verbose: bool = False, - ): - """ - Initializes the ServerlessClient instance. - - Args: - name: name of client - host: host of gateway - version: version of gateway - token: authorization token - """ - name = name or "gateway-client" - host = host or os.environ.get(ENV_GATEWAY_PROVIDER_HOST) - if host is None: - raise QiskitServerlessException("Please provide `host` of gateway.") - - version = version or os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) - if version is None: - version = GATEWAY_PROVIDER_VERSION_DEFAULT - - token = token or os.environ.get(ENV_GATEWAY_PROVIDER_TOKEN) - if token is None: - raise QiskitServerlessException( - "Authentication credentials must be provided in form of `token`." - ) - - super().__init__(name) - self.verbose = verbose - self.host = host - self.version = version - self._verify_token(token) - self._token = token - - self._job_client = GatewayJobClient(self.host, self._token, self.version) - self._files_client = GatewayFilesClient(self.host, self._token, self.version) - - def get_compute_resources(self) -> List[ComputeResource]: - raise NotImplementedError( - "ServerlessClient does not support resources api yet." - ) - - def create_compute_resource(self, resource) -> int: - raise NotImplementedError( - "ServerlessClient does not support resources api yet." - ) - - def delete_compute_resource(self, resource) -> int: - raise NotImplementedError( - "ServerlessClient does not support resources api yet." - ) - - def get_job_by_id(self, job_id: str) -> Optional[Job]: - return self._job_client.get(job_id) - - def run( - self, - program: Union[QiskitFunction, str], - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> Job: - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("Provider.run"): - warnings.warn( - "`run` method has been deprecated. " - "And will be removed in future releases. " - "Please, use `function.run` instead.", - DeprecationWarning, - ) - if isinstance(program, QiskitFunction) and program.entrypoint is not None: - job = self._job_client.run(program.title, None, arguments, config) - else: - job = self._job_client.run(program, None, arguments, config) - return job - - def upload(self, program: QiskitFunction): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("Provider.upload"): - response = self._job_client.upload(program) - return response - - def get_jobs(self, **kwargs) -> List[Job]: - return self._job_client.list(**kwargs) - - def files(self, provider: Optional[str] = None) -> List[str]: - return self._files_client.list(provider) - - def file_download( - self, - file: str, - target_name: Optional[str] = None, - download_location: str = "./", - provider: Optional[str] = None, - ): - return self._files_client.download( - file, download_location, target_name, provider - ) - - def file_delete(self, file: str, provider: Optional[str] = None): - return self._files_client.delete(file, provider) - - def file_upload(self, file: str, provider: Optional[str] = None): - return self._files_client.upload(file, provider) - - def list(self, **kwargs) -> List[QiskitFunction]: - """Returns list of available programs.""" - return self._job_client.get_programs(**kwargs) + """Returns program based on parameters.""" def get( self, title: str, provider: Optional[str] = None ) -> Optional[QiskitFunction]: - return self._job_client.get_program(title=title, provider=provider) - - def _verify_token(self, token: str): - """Verify token.""" - try: - safe_json_request( - request=lambda: requests.get( - url=f"{self.host}/api/v1/programs/", - headers={"Authorization": f"Bearer {token}"}, - timeout=REQUESTS_TIMEOUT, - ), - verbose=self.verbose, - ) - except QiskitServerlessException as reason: - raise QiskitServerlessException("Cannot verify token.") from reason - - -class ServerlessProvider(ServerlessClient): - """ - [Deprecated since version 0.10.0] Use :class:`.ServerlessClient` instead. - - A provider for connecting to a specified host. This class has been - renamed to :class:`.ServerlessClient`. - """ - - -class IBMServerlessClient(ServerlessClient): - """ - A client for connecting to the IBM serverless host. - - Credentials can be saved to disk by calling the `save_account()` method:: - - from qiskit_serverless import IBMServerlessClient - IBMServerlessClient.save_account(token=) - - Once the credentials are saved, you can simply instantiate the client with no - constructor args, as shown below. - - from qiskit_serverless import IBMServerlessClient - client = IBMServerlessClient() - - Instead of saving credentials to disk, you can also set the environment variable - ENV_GATEWAY_PROVIDER_TOKEN and then instantiate the client as below:: - - from qiskit_serverless import IBMServerlessClient - client = IBMServerlessClient() - - You can also enable an account just for the current session by instantiating the - provider with the API token:: - - from qiskit_serverless import IBMServerlessClient - client = IBMServerlessClient(token=) - """ - - def __init__(self, token: Optional[str] = None, name: Optional[str] = None): - """ - Initialize a client with access to an IBMQ-provided remote cluster. - - If a ``token`` is used to initialize an instance, the ``name`` argument - will be ignored. - - If only a ``name`` is provided, the token for the named account will - be retrieved from the user's local IBM Quantum account config file. - - If neither argument is provided, the token will be searched for in the - environment variables and also in the local IBM Quantum account config - file using the default account name. - - Args: - token: IBM quantum token - name: Name of the account to load - """ - token = token or QiskitRuntimeService(name=name).active_account().get("token") - super().__init__(token=token, host=IBM_SERVERLESS_HOST_URL) - - @staticmethod - def save_account( - token: Optional[str] = None, - name: Optional[str] = None, - overwrite: Optional[bool] = False, - ) -> None: - """ - Save the account to disk for future use. - - Args: - token: IBM Quantum API token - name: Name of the account to save - overwrite: ``True`` if the existing account is to be overwritten - """ - QiskitRuntimeService.save_account(token=token, name=name, overwrite=overwrite) - - def get_compute_resources(self) -> List[ComputeResource]: - raise NotImplementedError( - "IBMServerlessClient does not support resources api yet." - ) - - def create_compute_resource(self, resource) -> int: - raise NotImplementedError( - "IBMServerlessClient does not support resources api yet." - ) - - def delete_compute_resource(self, resource) -> int: - raise NotImplementedError( - "IBMServerlessClient does not support resources api yet." + """Returns program based on parameters.""" + warnings.warn( + "`get` method has been deprecated. " + "And will be removed in future releases. " + "Please, use `get_function` instead.", + DeprecationWarning, ) + return self.function(title, provider=provider) - -class IBMServerlessProvider(IBMServerlessClient): - """ - [Deprecated since version 0.10.0] Use :class:`.IBMServerlessClient` instead. - - A provider for connecting to IBM Serverless instance. This class has been - renamed to :class:`.IBMServerlessClient`. - """ - - -class RayClient(BaseClient): - """RayClient.""" - - def __init__(self, host: str): - """Ray client - - Args: - host: ray head node host - - Example: - >>> ray_provider = RayClient("http://localhost:8265") - """ - super().__init__("ray-client", host) - self.client = RayJobClient(JobSubmissionClient(host)) - - def run( - self, - program: Union[QiskitFunction, str], - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> Job: - if isinstance(program, str): - raise NotImplementedError("Ray client only supports full Programs.") - - return self.client.run(program, None, arguments, config) - - def get_job_by_id(self, job_id: str) -> Optional[Job]: - return self.client.get(job_id) - - def get_jobs(self, **kwargs) -> List[Job]: - return self.client.list() - - -class RayProvider(RayClient): - """ - [Deprecated since version 0.10.0] Use :class:`.RayClient` instead. - - A provider for connecting to a ray head node. This class has been - renamed to :class:`.RayClient`. - """ - - -class LocalClient(BaseClient): - """LocalClient.""" - - def __init__(self): - """Local client - - Args: - - Example: - >>> local = LocalClient()) - """ - super().__init__("local-client") - self.client = LocalJobClient() - self.in_test = os.getenv("IN_TEST") - - def run( - self, - program: Union[QiskitFunction, str], - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> Job: + def list(self, **kwargs) -> List[QiskitFunction]: + """Returns list of available programs.""" warnings.warn( - "`client.run` method has been deprecated. " + "`list` method has been deprecated. " "And will be removed in future releases. " - "Please, use `function.run` instead.", + "Please, use `get_functions` instead.", DeprecationWarning, ) - if isinstance(program, QiskitFunction) and program.entrypoint is not None: - job = self.client.run(program.title, None, arguments, config) - else: - job = self.client.run(program, None, arguments, config) - return job + return self.functions(**kwargs) - def get_job_by_id(self, job_id: str) -> Optional[Job]: - return self.client.get(job_id) - - def get_jobs(self, **kwargs) -> List[Job]: - return self.client.list() - - def upload(self, program: QiskitFunction): - return self.client.upload(program) + ###################### + ####### Widget ####### + ###################### def widget(self): """Widget for information about provider and jobs.""" return Widget(self).show() - - def get_programs(self, **kwargs) -> List[QiskitFunction]: - return self.client.get_programs(**kwargs) - - def files(self) -> List[str]: - if self.in_test: - logging.warning("files method is not implemented in LocalProvider.") - return [] - raise NotImplementedError("files method is not implemented in LocalProvider.") - - def file_upload(self, file: str): - if self.in_test: - logging.warning("file_upload method is not implemented in LocalProvider.") - return - raise NotImplementedError("files method is not implemented in LocalProvider.") - - def file_download( - self, - file: str, - target_name: Optional[str] = None, - download_location: str = "./", - ): - if self.in_test: - logging.warning("file_download method is not implemented in LocalProvider.") - return None - raise NotImplementedError("files method is not implemented in LocalProvider.") - - def file_delete(self, file: str): - if self.in_test: - logging.warning("file_delete method is not implemented in LocalProvider.") - return None - raise NotImplementedError("files method is not implemented in LocalProvider.") - - def list(self, **kwargs): - return self.client.get_programs(**kwargs) - - def get( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - functions = { - function.title: function for function in self.client.get_programs() - } - return functions.get(title) - - -class LocalProvider(LocalClient): - """ - [Deprecated since version 0.10.0] Use :class:`.LocalClient` instead. - - A provider for connecting to local job execution instance. This class has been - renamed to :class:`.LocalClient`. - """ diff --git a/client/qiskit_serverless/core/clients/__init__.py b/client/qiskit_serverless/core/clients/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/client/qiskit_serverless/core/clients/local_client.py b/client/qiskit_serverless/core/clients/local_client.py new file mode 100644 index 000000000..0dd03faba --- /dev/null +++ b/client/qiskit_serverless/core/clients/local_client.py @@ -0,0 +1,195 @@ +# This code is a Qiskit project. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +================================================ +Provider (:mod:`qiskit_serverless.core.client`) +================================================ + +.. currentmodule:: qiskit_serverless.core.client + +Qiskit Serverless provider +=========================== + +.. autosummary:: + :toctree: ../stubs/ + + LocalClient +""" +# pylint: disable=duplicate-code +import json +import os.path +import os +import re +import sys +from typing import Optional, List, Dict, Any, Union +from uuid import uuid4 + +import subprocess +from subprocess import Popen + +from qiskit_ibm_runtime import QiskitRuntimeService + +from qiskit_serverless.core.constants import ( + OT_PROGRAM_NAME, + ENV_JOB_ARGUMENTS, +) +from qiskit_serverless.core.client import BaseClient +from qiskit_serverless.core.job import ( + Job, + Configuration, +) +from qiskit_serverless.core.function import QiskitFunction +from qiskit_serverless.exception import QiskitServerlessException +from qiskit_serverless.serializers.program_serializers import ( + QiskitObjectsEncoder, +) + + +class LocalClient(BaseClient): + """LocalClient.""" + + def __init__(self): + """Local client + + Args: + + Example: + >>> local = LocalClient() + """ + super().__init__("local-client") + self.in_test = os.getenv("IN_TEST") + self._jobs = {} + self._patterns = [] + + @classmethod + def from_dict(cls, dictionary: dict): + return LocalClient(**dictionary) + + #################### + ####### JOBS ####### + #################### + + def job(self, job_id: str) -> Optional[Job]: + return self._jobs[job_id]["job"] + + def jobs(self, **kwargs) -> List[Job]: + return [job["job"] for job in list(self._jobs.values())] + + def run( + self, + program: Union[QiskitFunction, str], + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + ) -> Job: + # pylint: disable=too-many-locals + title = "" + if isinstance(program, QiskitFunction): + title = program.title + else: + title = str(program) + + for pattern in self._patterns: + if pattern["title"] == title: + saved_program = pattern + if saved_program[ # pylint: disable=possibly-used-before-assignment + "dependencies" + ]: + dept = json.loads(saved_program["dependencies"]) + for dependency in dept: + subprocess.check_call( + [sys.executable, "-m", "pip", "install", dependency] + ) + arguments = arguments or {} + env_vars = { + **(saved_program["env_vars"] or {}), + **{OT_PROGRAM_NAME: saved_program["title"]}, + **{"PATH": os.environ["PATH"]}, + **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, + } + + with Popen( + ["python", saved_program["working_dir"] + saved_program["entrypoint"]], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + env=env_vars, + ) as pipe: + status = "SUCCEEDED" + if pipe.wait(): + status = "FAILED" + output, _ = pipe.communicate() + results = re.search("\nSaved Result:(.+?):End Saved Result\n", output) + result = "" + if results: + result = results.group(1) + + job = Job(job_id=str(uuid4()), client=self) + self._jobs[job.job_id] = { + "status": status, + "logs": output, + "result": result, + "job": job, + } + return job + + def status(self, job_id: str): + return self._jobs[job_id]["status"] + + def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): + """Stops job/program.""" + return f"job:{job_id} has already stopped" + + def result(self, job_id: str): + return self._jobs[job_id]["result"] + + def logs(self, job_id: str): + return self._jobs[job_id]["logs"] + + def filtered_logs(self, job_id: str, **kwargs): + """Return filtered logs.""" + raise NotImplementedError + + ######################### + ####### Functions ####### + ######################### + + def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: + # check if entrypoint exists + if not os.path.exists(os.path.join(program.working_dir, program.entrypoint)): + raise QiskitServerlessException( + f"Entrypoint file [{program.entrypoint}] does not exist " + f"in [{program.working_dir}] working directory." + ) + + pattern = { + "title": program.title, + "provider": program.provider, + "entrypoint": program.entrypoint, + "working_dir": program.working_dir, + "env_vars": program.env_vars, + "arguments": json.dumps({}), + "dependencies": json.dumps(program.dependencies or []), + "client": self, + } + self._patterns.append(pattern) + return QiskitFunction.from_json(pattern) + + def functions(self, **kwargs) -> List[QiskitFunction]: + """Returns list of programs.""" + return [QiskitFunction.from_json(program) for program in self._patterns] + + def function( + self, title: str, provider: Optional[str] = None + ) -> Optional[QiskitFunction]: + functions = {function.title: function for function in self.functions()} + return functions.get(title) diff --git a/client/qiskit_serverless/core/clients/ray_client.py b/client/qiskit_serverless/core/clients/ray_client.py new file mode 100644 index 000000000..b0f2c9204 --- /dev/null +++ b/client/qiskit_serverless/core/clients/ray_client.py @@ -0,0 +1,172 @@ +# This code is a Qiskit project. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +================================================ +Provider (:mod:`qiskit_serverless.core.client`) +================================================ + +.. currentmodule:: qiskit_serverless.core.client + +Qiskit Serverless provider +=========================== + +.. autosummary:: + :toctree: ../stubs/ + + RayClient +""" +# pylint: disable=duplicate-code +import json +import warnings +from typing import Optional, List, Dict, Any, Union +from uuid import uuid4 + +from ray.dashboard.modules.job.sdk import JobSubmissionClient +from qiskit_ibm_runtime import QiskitRuntimeService + +from qiskit_serverless.core.constants import ( + OT_PROGRAM_NAME, + ENV_JOB_ARGUMENTS, +) +from qiskit_serverless.core.job import ( + Configuration, + Job, +) +from qiskit_serverless.core.function import QiskitFunction +from qiskit_serverless.serializers.program_serializers import ( + QiskitObjectsEncoder, +) + +from qiskit_serverless.core.client import BaseClient + + +class RayClient(BaseClient): + """RayClient.""" + + def __init__(self, host: str): + """Ray client + + Args: + host: ray head node host + + Example: + >>> ray_provider = RayClient("http://localhost:8265") + """ + super().__init__("ray-client", host) + self.job_submission_client = JobSubmissionClient(host) + + @classmethod + def from_dict(cls, dictionary: dict): + return RayClient(**dictionary) + + #################### + ####### JOBS ####### + #################### + + def jobs(self, **kwargs) -> List[Job]: + """Return list of jobs. + + Returns: + list of jobs. + """ + return [ + Job(job.job_id, client=self) + for job in self.job_submission_client.list_jobs() + ] + + def job(self, job_id: str) -> Optional[Job]: + """Returns job by job id. + + Args: + job_id: job id + + Returns: + Job instance + """ + return Job( + self.job_submission_client.get_job_info(job_id).submission_id, client=self + ) + + def run( + self, + program: Union[QiskitFunction, str], + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + ) -> Job: + if not isinstance(program, QiskitFunction): + warnings.warn( + "`run` doesn't support program str yet. " + "Send a QiskitFunction instead. " + ) + raise NotImplementedError + + arguments = arguments or {} + entrypoint = f"python {program.entrypoint}" + + # set program name so OT can use it as parent span name + env_vars = { + **(program.env_vars or {}), + **{OT_PROGRAM_NAME: program.title}, + **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, + } + + job_id = self.job_submission_client.submit_job( + entrypoint=entrypoint, + submission_id=f"qs_{uuid4()}", + runtime_env={ + "working_dir": program.working_dir, + "pip": program.dependencies, + "env_vars": env_vars, + }, + ) + return Job(job_id=job_id, client=self) + + def status(self, job_id: str) -> str: + """Check status.""" + return self.job_submission_client.get_job_status(job_id).value + + def stop( + self, job_id: str, service: Optional[QiskitRuntimeService] = None + ) -> Union[str, bool]: + """Stops job/program.""" + return self.job_submission_client.stop_job(job_id) + + def result(self, job_id: str) -> Any: + """Return results.""" + return self.logs(job_id) + + def logs(self, job_id: str) -> str: + """Return logs.""" + return self.job_submission_client.get_job_logs(job_id) + + def filtered_logs(self, job_id: str, **kwargs) -> str: + """Return filtered logs.""" + raise NotImplementedError + + ######################### + ####### Functions ####### + ######################### + + def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: + """Uploads program.""" + raise NotImplementedError("Upload is not available for RayClient.") + + def functions(self, **kwargs) -> List[QiskitFunction]: + """Returns list of available programs.""" + raise NotImplementedError("get_programs is not available for RayClient.") + + def function( + self, title: str, provider: Optional[str] = None + ) -> Optional[QiskitFunction]: + """Returns program based on parameters.""" + raise NotImplementedError("get_program is not available for RayClient.") diff --git a/client/qiskit_serverless/core/clients/serverless_client.py b/client/qiskit_serverless/core/clients/serverless_client.py new file mode 100644 index 000000000..1889f61e4 --- /dev/null +++ b/client/qiskit_serverless/core/clients/serverless_client.py @@ -0,0 +1,596 @@ +# This code is a Qiskit project. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +================================================ +Provider (:mod:`qiskit_serverless.core.client`) +================================================ + +.. currentmodule:: qiskit_serverless.core.client + +Qiskit Serverless provider +=========================== + +.. autosummary:: + :toctree: ../stubs/ + + ServerlessClient +""" +# pylint: disable=duplicate-code +import json +import os.path +import os +import re +import tarfile +from pathlib import Path +from dataclasses import asdict +from typing import Optional, List, Dict, Any, Union + +import requests +from opentelemetry import trace +from qiskit_ibm_runtime import QiskitRuntimeService + +from qiskit_serverless.core.constants import ( + REQUESTS_TIMEOUT, + ENV_GATEWAY_PROVIDER_HOST, + ENV_GATEWAY_PROVIDER_VERSION, + ENV_GATEWAY_PROVIDER_TOKEN, + GATEWAY_PROVIDER_VERSION_DEFAULT, + IBM_SERVERLESS_HOST_URL, + MAX_ARTIFACT_FILE_SIZE_MB, +) +from qiskit_serverless.core.client import BaseClient +from qiskit_serverless.core.files import GatewayFilesClient +from qiskit_serverless.core.job import ( + Job, + Configuration, +) +from qiskit_serverless.core.function import QiskitFunction +from qiskit_serverless.exception import QiskitServerlessException +from qiskit_serverless.utils.json import ( + safe_json_request_as_dict, + safe_json_request_as_list, + safe_json_request, +) +from qiskit_serverless.utils.formatting import format_provider_name_and_title +from qiskit_serverless.serializers.program_serializers import ( + QiskitObjectsEncoder, + QiskitObjectsDecoder, +) + + +class ServerlessClient(BaseClient): + """ + A client for connecting to a specified host. + + Example: + >>> client = ServerlessClient( + >>> name="", + >>> host="", + >>> token="", + >>> ) + """ + + def __init__( # pylint: disable=too-many-positional-arguments + self, + name: Optional[str] = None, + host: Optional[str] = None, + version: Optional[str] = None, + token: Optional[str] = None, + verbose: bool = False, + ): + """ + Initializes the ServerlessClient instance. + + Args: + name: name of client + host: host of gateway + version: version of gateway + token: authorization token + """ + name = name or "gateway-client" + host = host or os.environ.get(ENV_GATEWAY_PROVIDER_HOST) + if host is None: + raise QiskitServerlessException("Please provide `host` of gateway.") + + version = version or os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) + if version is None: + version = GATEWAY_PROVIDER_VERSION_DEFAULT + + token = token or os.environ.get(ENV_GATEWAY_PROVIDER_TOKEN) + if token is None: + raise QiskitServerlessException( + "Authentication credentials must be provided in form of `token`." + ) + + super().__init__(name, host, token) + self.verbose = verbose + self.version = version + self._verify_token(token) + + self._files_client = GatewayFilesClient(self.host, self.token, self.version) + + @classmethod + def from_dict(cls, dictionary: dict): + return ServerlessClient(**dictionary) + + def _verify_token(self, token: str): + """Verify token.""" + try: + safe_json_request( + request=lambda: requests.get( + url=f"{self.host}/api/v1/programs/", + headers={"Authorization": f"Bearer {token}"}, + timeout=REQUESTS_TIMEOUT, + ), + verbose=self.verbose, + ) + except QiskitServerlessException as reason: + raise QiskitServerlessException("Cannot verify token.") from reason + + #################### + ####### JOBS ####### + #################### + + def jobs(self, **kwargs) -> List[Job]: + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.list"): + limit = kwargs.get("limit", 10) + kwargs["limit"] = limit + offset = kwargs.get("offset", 0) + kwargs["offset"] = offset + + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs", + params=kwargs, + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + + return [ + Job(job.get("id"), client=self, raw_data=job) + for job in response_data.get("results", []) + ] + + def job(self, job_id: str) -> Optional[Job]: + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.get"): + url = f"{self.host}/api/{self.version}/jobs/{job_id}/" + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + url, + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + + job = None + job_id = response_data.get("id") + if job_id is not None: + job = Job( + job_id=job_id, + client=self, + ) + + return job + + def run( + self, + program: Union[QiskitFunction, str], + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + provider: Optional[str] = None, + ) -> Job: + if isinstance(program, QiskitFunction): + title = program.title + provider = program.provider + else: + title = str(program) + + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.run") as span: + span.set_attribute("program", title) + span.set_attribute("provider", provider) + span.set_attribute("arguments", str(arguments)) + + url = f"{self.host}/api/{self.version}/programs/run/" + + data = { + "title": title, + "provider": provider, + "arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder), + } # type: Dict[str, Any] + if config: + data["config"] = asdict(config) + else: + data["config"] = asdict(Configuration()) + + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + url=url, + json=data, + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + job_id = response_data.get("id") + span.set_attribute("job.id", job_id) + + return Job(job_id, client=self) + + def status(self, job_id: str): + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.status"): + default_status = "Unknown" + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + + return response_data.get("status", default_status) + + def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.stop"): + if service: + data = { + "service": json.dumps(service, cls=QiskitObjectsEncoder), + } + else: + data = { + "service": None, + } + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + f"{self.host}/api/{self.version}/jobs/{job_id}/stop/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + json=data, + ) + ) + + return response_data.get("message") + + def result(self, job_id: str): + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.result"): + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + return json.loads( + response_data.get("result", "{}") or "{}", cls=QiskitObjectsDecoder + ) + + def logs(self, job_id: str): + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.logs"): + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/logs/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + return response_data.get("logs") + + def filtered_logs(self, job_id: str, **kwargs): + all_logs = self.logs(job_id=job_id) + included = "" + include = kwargs.get("include") + if include is not None: + for line in all_logs.split("\n"): + if re.search(include, line) is not None: + included = included + line + "\n" + else: + included = all_logs + + excluded = "" + exclude = kwargs.get("exclude") + if exclude is not None: + for line in included.split("\n"): + if line != "" and re.search(exclude, line) is None: + excluded = excluded + line + "\n" + else: + excluded = included + return excluded + + ######################### + ####### Functions ####### + ######################### + + def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.run") as span: + span.set_attribute("program", program.title) + url = f"{self.host}/api/{self.version}/programs/upload/" + + if program.image is not None: + # upload function with custom image + function_uploaded = _upload_with_docker_image( + program=program, url=url, token=self.token, span=span + ) + elif program.entrypoint is not None: + # upload funciton with artifact + function_uploaded = _upload_with_artifact( + program=program, url=url, token=self.token, span=span + ) + else: + raise QiskitServerlessException( + "Function must either have `entrypoint` or `image` specified." + ) + + return function_uploaded + + def functions(self, **kwargs) -> List[QiskitFunction]: + """Returns list of available programs.""" + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("program.list"): + response_data = safe_json_request_as_list( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/programs", + headers={"Authorization": f"Bearer {self.token}"}, + params=kwargs, + timeout=REQUESTS_TIMEOUT, + ) + ) + + return [ + QiskitFunction( + program.get("title"), + provider=program.get("provider", None), + raw_data=program, + client=self, + description=program.get("description"), + ) + for program in response_data + ] + + def function( + self, title: str, provider: Optional[str] = None + ) -> Optional[QiskitFunction]: + """Returns program based on parameters.""" + provider, title = format_provider_name_and_title( + request_provider=provider, title=title + ) + + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("program.get_by_title"): + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/programs/get_by_title/{title}", + headers={"Authorization": f"Bearer {self.token}"}, + params={"provider": provider}, + timeout=REQUESTS_TIMEOUT, + ) + ) + return QiskitFunction( + response_data.get("title"), + provider=response_data.get("provider", None), + raw_data=response_data, + client=self, + ) + + ##################### + ####### FILES ####### + ##################### + + def files(self, provider: Optional[str] = None) -> List[str]: + """Returns list of available files produced by programs to download.""" + return self._files_client.list(provider) + + def file_download( + self, + file: str, + target_name: Optional[str] = None, + download_location: str = "./", + provider: Optional[str] = None, + ): + """Download file.""" + return self._files_client.download( + file, download_location, target_name, provider + ) + + def file_delete(self, file: str, provider: Optional[str] = None): + """Deletes file uploaded or produced by the programs,""" + return self._files_client.delete(file, provider) + + def file_upload(self, file: str, provider: Optional[str] = None): + """Upload file.""" + return self._files_client.upload(file, provider) + + +class IBMServerlessClient(ServerlessClient): + """ + A client for connecting to the IBM serverless host. + + Credentials can be saved to disk by calling the `save_account()` method:: + + from qiskit_serverless import IBMServerlessClient + IBMServerlessClient.save_account(token=) + + Once the credentials are saved, you can simply instantiate the client with no + constructor args, as shown below. + + from qiskit_serverless import IBMServerlessClient + client = IBMServerlessClient() + + Instead of saving credentials to disk, you can also set the environment variable + ENV_GATEWAY_PROVIDER_TOKEN and then instantiate the client as below:: + + from qiskit_serverless import IBMServerlessClient + client = IBMServerlessClient() + + You can also enable an account just for the current session by instantiating the + provider with the API token:: + + from qiskit_serverless import IBMServerlessClient + client = IBMServerlessClient(token=) + """ + + def __init__(self, token: Optional[str] = None, name: Optional[str] = None): + """ + Initialize a client with access to an IBMQ-provided remote cluster. + + If a ``token`` is used to initialize an instance, the ``name`` argument + will be ignored. + + If only a ``name`` is provided, the token for the named account will + be retrieved from the user's local IBM Quantum account config file. + + If neither argument is provided, the token will be searched for in the + environment variables and also in the local IBM Quantum account config + file using the default account name. + + Args: + token: IBM quantum token + name: Name of the account to load + """ + token = token or QiskitRuntimeService(name=name).active_account().get("token") + super().__init__(token=token, host=IBM_SERVERLESS_HOST_URL) + + @staticmethod + def save_account( + token: Optional[str] = None, + name: Optional[str] = None, + overwrite: Optional[bool] = False, + ) -> None: + """ + Save the account to disk for future use. + + Args: + token: IBM Quantum API token + name: Name of the account to save + overwrite: ``True`` if the existing account is to be overwritten + """ + QiskitRuntimeService.save_account(token=token, name=name, overwrite=overwrite) + + +def _upload_with_docker_image( + program: QiskitFunction, url: str, token: str, span: Any +) -> QiskitFunction: + """Uploads function with custom docker image. + + Args: + program (QiskitFunction): function instance + url (str): upload gateway url + token (str): auth token + span (Any): tracing span + + Returns: + str: uploaded function name + """ + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + url=url, + data={ + "title": program.title, + "provider": program.provider, + "image": program.image, + "arguments": json.dumps({}), + "dependencies": json.dumps(program.dependencies or []), + "env_vars": json.dumps(program.env_vars or {}), + "description": program.description, + }, + headers={"Authorization": f"Bearer {token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + program_title = response_data.get("title", "na") + program_provider = response_data.get("provider", "na") + span.set_attribute("program.title", program_title) + span.set_attribute("program.provider", program_provider) + return QiskitFunction.from_json(response_data) + + +def _upload_with_artifact( + program: QiskitFunction, url: str, token: str, span: Any +) -> QiskitFunction: + """Uploads function with artifact. + + Args: + program (QiskitFunction): function instance + url (str): endpoint for gateway upload + token (str): auth token + span (Any): tracing span + + Raises: + QiskitServerlessException: if no entrypoint or size of artifact is too large. + + Returns: + str: uploaded function name + """ + artifact_file_path = os.path.join(program.working_dir, "artifact.tar") + + # check if entrypoint exists + if ( + not os.path.exists(os.path.join(program.working_dir, program.entrypoint)) + or program.entrypoint[0] == "/" + ): + raise QiskitServerlessException( + f"Entrypoint file [{program.entrypoint}] does not exist " + f"in [{program.working_dir}] working directory." + ) + + try: + with tarfile.open(artifact_file_path, "w", dereference=True) as tar: + for filename in os.listdir(program.working_dir): + fpath = os.path.join(program.working_dir, filename) + tar.add(fpath, arcname=filename) + + # check file size + size_in_mb = Path(artifact_file_path).stat().st_size / 1024**2 + if size_in_mb > MAX_ARTIFACT_FILE_SIZE_MB: + raise QiskitServerlessException( + f"{artifact_file_path} is {int(size_in_mb)} Mb, " + f"which is greater than {MAX_ARTIFACT_FILE_SIZE_MB} allowed. " + f"Try to reduce size of `working_dir`." + ) + + with open(artifact_file_path, "rb") as file: + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + url=url, + data={ + "title": program.title, + "provider": program.provider, + "entrypoint": program.entrypoint, + "arguments": json.dumps({}), + "dependencies": json.dumps(program.dependencies or []), + "env_vars": json.dumps(program.env_vars or {}), + "description": program.description, + }, + files={"artifact": file}, + headers={"Authorization": f"Bearer {token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + program_title = response_data.get("title", "na") + program_provider = response_data.get("provider", "na") + span.set_attribute("program.title", program_title) + span.set_attribute("program.provider", program_provider) + response_function = QiskitFunction.from_json(response_data) + except Exception as error: # pylint: disable=broad-exception-caught + raise QiskitServerlessException from error + finally: + if os.path.exists(artifact_file_path): + os.remove(artifact_file_path) + + return response_function diff --git a/client/qiskit_serverless/core/decorators.py b/client/qiskit_serverless/core/decorators.py index 7fb4aa924..ffccc9305 100644 --- a/client/qiskit_serverless/core/decorators.py +++ b/client/qiskit_serverless/core/decorators.py @@ -358,13 +358,13 @@ def distribute_qiskit_function( # pylint: disable=import-outside-toplevel,cyclic-import from qiskit_serverless import QiskitServerlessException from qiskit_serverless.core.function import QiskitFunction - from qiskit_serverless.core.client import ServerlessProvider + from qiskit_serverless import ServerlessClient # create provider if provider is None: # try to create from env vars try: - provider = ServerlessProvider() + provider = ServerlessClient() except QiskitServerlessException as qs_error: raise QiskitServerlessException( "Set provider in arguments for `distribute_program` " diff --git a/client/qiskit_serverless/core/files.py b/client/qiskit_serverless/core/files.py index 8104fe64a..fbb799bcc 100644 --- a/client/qiskit_serverless/core/files.py +++ b/client/qiskit_serverless/core/files.py @@ -37,7 +37,7 @@ REQUESTS_STREAMING_TIMEOUT, REQUESTS_TIMEOUT, ) -from qiskit_serverless.utils.json import safe_json_request +from qiskit_serverless.utils.json import safe_json_request_as_dict class GatewayFilesClient: @@ -109,7 +109,7 @@ def list(self, provider: Optional[str] = None) -> List[str]: """Returns list of available files to download produced by programs,""" tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("files.list"): - response_data = safe_json_request( + response_data = safe_json_request_as_dict( request=lambda: requests.get( f"{self.host}/api/{self.version}/files/", params={"provider": provider}, @@ -123,7 +123,7 @@ def delete(self, file: str, provider: Optional[str] = None) -> Optional[str]: """Deletes file uploaded or produced by the programs,""" tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("files.delete"): - response_data = safe_json_request( + response_data = safe_json_request_as_dict( request=lambda: requests.delete( f"{self.host}/api/{self.version}/files/delete/", data={"file": file, "provider": provider}, diff --git a/client/qiskit_serverless/core/function.py b/client/qiskit_serverless/core/function.py index 6ee3cfa90..10ea2f0b3 100644 --- a/client/qiskit_serverless/core/function.py +++ b/client/qiskit_serverless/core/function.py @@ -58,7 +58,7 @@ class QiskitFunction: # pylint: disable=too-many-instance-attributes version: Optional[str] = None tags: Optional[List[str]] = None raw_data: Optional[Dict[str, Any]] = None - job_client: Optional[Any] = None + client: Optional[Any] = None image: Optional[str] = None validate: bool = True schema: Optional[str] = None @@ -100,7 +100,7 @@ def run(self, **kwargs): Returns: Job: job handler for function execution """ - if self.job_client is None: + if self.client is None: raise ValueError("No clients specified for a function.") if self.validate: @@ -112,14 +112,14 @@ def run(self, **kwargs): ) config = kwargs.pop("config", None) - return self.job_client.run( - program=self.title, - provider=self.provider, + return self.client.run( + program=self, arguments=kwargs, config=config, ) def get_jobs(self): + # pylint: disable=duplicate-code """List of jobs created in this function. Raises: @@ -149,7 +149,7 @@ def jobs(self): Job, ) - if self.job_client is None: + if self.client is None: raise ValueError("No clients specified for a function.") if self.validate: @@ -160,12 +160,12 @@ def jobs(self): f"Function validation failed. Validation errors:\n {error_string}", ) - response = self.job_client.get_jobs( + response = self.client.get_jobs( title=self.title, provider=self.provider, ) jobs = [ - Job(job_id=job.get("id"), job_client=self.job_client, raw_data=job) + Job(job_id=job.get("id"), client=self.client, raw_data=job) for job in response ] return jobs diff --git a/client/qiskit_serverless/core/job.py b/client/qiskit_serverless/core/job.py index d38b53765..03e6f6d42 100644 --- a/client/qiskit_serverless/core/job.py +++ b/client/qiskit_serverless/core/job.py @@ -31,46 +31,30 @@ import json import logging import os -import re -import tarfile import time -import sys import warnings -from pathlib import Path -from typing import Dict, Any, Optional, List, Union -from uuid import uuid4 -from dataclasses import asdict, dataclass - -import subprocess -from subprocess import Popen +from typing import Dict, Any, Optional +from dataclasses import dataclass import ray.runtime_env import requests -from ray.dashboard.modules.job.sdk import JobSubmissionClient -from opentelemetry import trace from qiskit_ibm_runtime import QiskitRuntimeService from qiskit_serverless.core.constants import ( - OT_PROGRAM_NAME, REQUESTS_TIMEOUT, ENV_JOB_GATEWAY_TOKEN, ENV_JOB_GATEWAY_HOST, ENV_JOB_ID_GATEWAY, ENV_GATEWAY_PROVIDER_VERSION, GATEWAY_PROVIDER_VERSION_DEFAULT, - MAX_ARTIFACT_FILE_SIZE_MB, - ENV_JOB_ARGUMENTS, ) -from qiskit_serverless.core.function import QiskitFunction -from qiskit_serverless.exception import QiskitServerlessException from qiskit_serverless.serializers.program_serializers import ( QiskitObjectsEncoder, QiskitObjectsDecoder, ) -from qiskit_serverless.utils.json import is_jsonable, safe_json_request -from qiskit_serverless.utils.formatting import format_provider_name_and_title +from qiskit_serverless.utils.json import is_jsonable RuntimeEnv = ray.runtime_env.RuntimeEnv @@ -92,578 +76,28 @@ class Configuration: # pylint: disable=too-many-instance-attributes auto_scaling: Optional[bool] = False -class BaseJobClient: - """Base class for Job clients.""" - - def run( - self, - program: Union[str, QiskitFunction], - provider: Optional[str] = None, - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> "Job": - """Runs program.""" - raise NotImplementedError - - def upload(self, program: QiskitFunction): - """Uploads program.""" - raise NotImplementedError - - def get(self, job_id) -> Optional["Job"]: - """Returns job by job id""" - raise NotImplementedError - - def list(self, **kwargs) -> List["Job"]: - """Returns list of jobs.""" - raise NotImplementedError - - def status(self, job_id: str): - """Check status.""" - raise NotImplementedError - - def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): - """Stops job/program.""" - raise NotImplementedError - - def logs(self, job_id: str): - """Return logs.""" - raise NotImplementedError - - def filtered_logs(self, job_id: str, **kwargs): - """Return filtered logs.""" - raise NotImplementedError - - def result(self, job_id: str): - """Return results.""" - raise NotImplementedError - - def get_programs(self, **kwargs): - """Returns list of programs.""" - raise NotImplementedError - - def get_program( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - """Returns program based on parameters.""" - raise NotImplementedError - - def get_jobs(self, title: str, provider: Optional[str] = None): - """Returns job ids of executed program based on parameters.""" - raise NotImplementedError - - -class RayJobClient(BaseJobClient): - """RayJobClient.""" - - def __init__(self, client: JobSubmissionClient): - """Ray job client. - Wrapper around JobSubmissionClient - - Args: - client: JobSubmissionClient - """ - self._job_client = client - - def status(self, job_id: str): - return self._job_client.get_job_status(job_id).value - - def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): - return self._job_client.stop_job(job_id) - - def logs(self, job_id: str): - return self._job_client.get_job_logs(job_id) - - def filtered_logs(self, job_id: str, **kwargs): - raise NotImplementedError - - def result(self, job_id: str): - return self.logs(job_id) - - def get(self, job_id) -> Optional["Job"]: - return Job(self._job_client.get_job_info(job_id).job_id, job_client=self) - - def list(self, **kwargs) -> List["Job"]: - return [ - Job(job.job_id, job_client=self) for job in self._job_client.list_jobs() - ] - - def run( - self, - program: Union[str, QiskitFunction], - provider: Optional[str] = None, - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ): - if not isinstance(program, QiskitFunction): - warnings.warn( - "`run` doesn't support program str yet. " - "Send a QiskitFunction instead. " - ) - return NotImplementedError - - arguments = arguments or {} - entrypoint = f"python {program.entrypoint}" - - # set program name so OT can use it as parent span name - env_vars = { - **(program.env_vars or {}), - **{OT_PROGRAM_NAME: program.title}, - **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, - } - - job_id = self._job_client.submit_job( - entrypoint=entrypoint, - submission_id=f"qs_{uuid4()}", - runtime_env={ - "working_dir": program.working_dir, - "pip": program.dependencies, - "env_vars": env_vars, - }, - ) - return Job(job_id=job_id, job_client=self) - - def upload(self, program: QiskitFunction): - raise NotImplementedError("Upload is not available for RayJobClient.") - - -class LocalJobClient(BaseJobClient): - """LocalJobClient.""" - - def __init__(self): - """Local job client. - - Args: - """ - self._jobs = {} - self._patterns = [] - - def status(self, job_id: str): - return self._jobs[job_id]["status"] - - def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): - """Stops job/program.""" - return f"job:{job_id} has already stopped" - - def logs(self, job_id: str): - return self._jobs[job_id]["logs"] - - def result(self, job_id: str): - return self._jobs[job_id]["result"] - - def get(self, job_id) -> Optional["Job"]: - return self._jobs[job_id]["job"] - - def list(self, **kwargs) -> List["Job"]: - return [job["job"] for job in list(self._jobs.values())] - - def filtered_logs(self, job_id: str, **kwargs): - """Return filtered logs.""" - raise NotImplementedError - - def run( # pylint: disable=too-many-locals - self, - program: Union[str, QiskitFunction], - provider: Optional[str] = None, - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ): - if isinstance(program, QiskitFunction): - title = program.title - else: - title = str(program) - - for pattern in self._patterns: - if pattern["title"] == title: - saved_program = pattern - if saved_program[ # pylint: disable=possibly-used-before-assignment - "dependencies" - ]: - dept = json.loads(saved_program["dependencies"]) - for dependency in dept: - subprocess.check_call( - [sys.executable, "-m", "pip", "install", dependency] - ) - arguments = arguments or {} - env_vars = { - **(saved_program["env_vars"] or {}), - **{OT_PROGRAM_NAME: saved_program["title"]}, - **{"PATH": os.environ["PATH"]}, - **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, - } - - with Popen( - ["python", saved_program["working_dir"] + saved_program["entrypoint"]], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - env=env_vars, - ) as pipe: - status = "SUCCEEDED" - if pipe.wait(): - status = "FAILED" - output, _ = pipe.communicate() - results = re.search("\nSaved Result:(.+?):End Saved Result\n", output) - result = "" - if results: - result = results.group(1) - - job = Job(job_id=str(uuid4()), job_client=self) - entry = {"status": status, "logs": output, "result": result, "job": job} - self._jobs[job.job_id] = entry - return job - - def upload(self, program: QiskitFunction): - # check if entrypoint exists - if not os.path.exists(os.path.join(program.working_dir, program.entrypoint)): - raise QiskitServerlessException( - f"Entrypoint file [{program.entrypoint}] does not exist " - f"in [{program.working_dir}] working directory." - ) - self._patterns.append( - { - "title": program.title, - "provider": program.provider, - "entrypoint": program.entrypoint, - "working_dir": program.working_dir, - "env_vars": program.env_vars, - "arguments": json.dumps({}), - "dependencies": json.dumps(program.dependencies or []), - } - ) - return program.title - - def get_programs(self, **kwargs): - """Returns list of programs.""" - return [ - QiskitFunction( - program.get("title"), - provider=program.get("provider", None), - raw_data=program, - job_client=self, - ) - for program in self._patterns - ] - - def get_program( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - """Returns program based on parameters.""" - all_programs = { - program.get("title"): QiskitFunction( - program.get("title"), - provider=program.get("provider", None), - raw_data=program, - job_client=self, - ) - for program in self._patterns - } - return all_programs.get("title") - - -class GatewayJobClient(BaseJobClient): - """GatewayJobClient.""" - - def __init__(self, host: str, token: str, version: str): - """Job client for Gateway service. - - Args: - host: gateway host - version: gateway version - token: authorization token - """ - self.host = host - self.version = version - self._token = token - - def run( # pylint: disable=too-many-locals - self, - program: Union[str, QiskitFunction], - provider: Optional[str] = None, - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> "Job": - if isinstance(program, QiskitFunction): - title = program.title - else: - title = str(program) - - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.run") as span: - span.set_attribute("program", title) - span.set_attribute("provider", provider) - span.set_attribute("arguments", str(arguments)) - - url = f"{self.host}/api/{self.version}/programs/run/" - - data = { - "title": title, - "provider": provider, - "arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder), - } # type: Dict[str, Any] - if config: - data["config"] = asdict(config) - else: - data["config"] = asdict(Configuration()) - - response_data = safe_json_request( - request=lambda: requests.post( - url=url, - json=data, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - job_id = response_data.get("id") - span.set_attribute("job.id", job_id) - - return Job(job_id, job_client=self) - - def upload(self, program: QiskitFunction): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.run") as span: - span.set_attribute("program", program.title) - url = f"{self.host}/api/{self.version}/programs/upload/" - - if program.image is not None: - # upload function with custom image - program_title = _upload_with_docker_image( - program=program, url=url, token=self._token, span=span - ) - elif program.entrypoint is not None: - # upload funciton with artifact - program_title = _upload_with_artifact( - program=program, url=url, token=self._token, span=span - ) - else: - raise QiskitServerlessException( - "Function must either have `entrypoint` or `image` specified." - ) - - return program_title - - def status(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.status"): - default_status = "Unknown" - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - - return response_data.get("status", default_status) - - def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.stop"): - if service: - data = { - "service": json.dumps(service, cls=QiskitObjectsEncoder), - } - else: - data = { - "service": None, - } - response_data = safe_json_request( - request=lambda: requests.post( - f"{self.host}/api/{self.version}/jobs/{job_id}/stop/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - json=data, - ) - ) - - return response_data.get("message") - - def logs(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.logs"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/logs/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return response_data.get("logs") - - def filtered_logs(self, job_id: str, **kwargs): - all_logs = self.logs(job_id=job_id) - included = "" - include = kwargs.get("include") - if include is not None: - for line in all_logs.split("\n"): - if re.search(include, line) is not None: - included = included + line + "\n" - else: - included = all_logs - - excluded = "" - exclude = kwargs.get("exclude") - if exclude is not None: - for line in included.split("\n"): - if line != "" and re.search(exclude, line) is None: - excluded = excluded + line + "\n" - else: - excluded = included - return excluded - - def result(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.result"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return json.loads( - response_data.get("result", "{}") or "{}", cls=QiskitObjectsDecoder - ) - - def get(self, job_id) -> Optional["Job"]: - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.get"): - url = f"{self.host}/api/{self.version}/jobs/{job_id}/" - response_data = safe_json_request( - request=lambda: requests.get( - url, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - - job = None - job_id = response_data.get("id") - if job_id is not None: - job = Job( - job_id=response_data.get("id"), - job_client=self, - ) - - return job - - def list(self, **kwargs) -> List["Job"]: - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.list"): - limit = kwargs.get("limit", 10) - kwargs["limit"] = limit - offset = kwargs.get("offset", 0) - kwargs["offset"] = offset - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs", - params=kwargs, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return [ - Job(job.get("id"), job_client=self, raw_data=job) - for job in response_data.get("results", []) - ] - - def get_programs(self, **kwargs): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("program.list"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs", - headers={"Authorization": f"Bearer {self._token}"}, - params=kwargs, - timeout=REQUESTS_TIMEOUT, - ) - ) - return [ - QiskitFunction( - program.get("title"), - provider=program.get("provider", None), - raw_data=program, - job_client=self, - description=program.get("description"), - ) - for program in response_data - ] - - def get_program( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - """Returns program based on parameters.""" - provider, title = format_provider_name_and_title( - request_provider=provider, title=title - ) - - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("program.get_by_title"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs/get_by_title/{title}", - headers={"Authorization": f"Bearer {self._token}"}, - params={"provider": provider}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return QiskitFunction( - response_data.get("title"), - provider=response_data.get("provider", None), - raw_data=response_data, - job_client=self, - ) - - def get_jobs(self, title: str, provider: Optional[str] = None): - """Returns job ids executed the program based on parameters.""" - provider, title = format_provider_name_and_title( - request_provider=provider, title=title - ) - - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("program.get_by_title"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs/get_by_title/{title}", - headers={"Authorization": f"Bearer {self._token}"}, - params={"provider": provider}, - timeout=REQUESTS_TIMEOUT, - ) - ) - program_id = response_data.get("id", None) - if not program_id: - return None - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs/{program_id}/get_jobs/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return response_data - - class Job: """Job.""" def __init__( self, job_id: str, - job_client: BaseJobClient, + client: Any, raw_data: Optional[Dict[str, Any]] = None, ): """Job class for async script execution. Args: job_id: if of the job - job_client: job client + client: client """ self.job_id = job_id - self._job_client = job_client + self._client = client self.raw_data = raw_data or {} def status(self): """Returns status of the job.""" - return _map_status_to_serverless(self._job_client.status(self.job_id)) + return _map_status_to_serverless(self._client.status(self.job_id)) def stop(self, service: Optional[QiskitRuntimeService] = None): """Stops the job from running.""" @@ -677,11 +111,11 @@ def stop(self, service: Optional[QiskitRuntimeService] = None): def cancel(self, service: Optional[QiskitRuntimeService] = None): """Cancels the job.""" - return self._job_client.stop(self.job_id, service=service) + return self._client.stop(self.job_id, service=service) def logs(self) -> str: """Returns logs of the job.""" - return self._job_client.logs(self.job_id) + return self._client.logs(self.job_id) def filtered_logs(self, **kwargs) -> str: """Returns logs of the job. @@ -689,7 +123,7 @@ def filtered_logs(self, **kwargs) -> str: include: rex expression finds match in the log line to be included exclude: rex expression finds match in the log line to be excluded """ - return self._job_client.filtered_logs(job_id=self.job_id, **kwargs) + return self._client.filtered_logs(job_id=self.job_id, **kwargs) def result(self, wait=True, cadence=5, verbose=False, maxwait=0): """Return results of the job. @@ -712,7 +146,7 @@ def result(self, wait=True, cadence=5, verbose=False, maxwait=0): logging.info(count) # Retrieve the results. If they're string format, try to decode to a dictionary. - results = self._job_client.result(self.job_id) + results = self._client.result(self.job_id) if isinstance(results, str): try: results = json.loads(results, cls=QiskitObjectsDecoder) @@ -813,115 +247,3 @@ def _map_status_to_serverless(status: str) -> str: return status_map[status] except KeyError: return status - - -def _upload_with_docker_image( - program: QiskitFunction, url: str, token: str, span: Any -) -> str: - """Uploads function with custom docker image. - - Args: - program (QiskitFunction): function instance - url (str): upload gateway url - token (str): auth token - span (Any): tracing span - - Returns: - str: uploaded function name - """ - response_data = safe_json_request( - request=lambda: requests.post( - url=url, - data={ - "title": program.title, - "provider": program.provider, - "image": program.image, - "arguments": json.dumps({}), - "dependencies": json.dumps(program.dependencies or []), - "env_vars": json.dumps(program.env_vars or {}), - "description": program.description, - }, - headers={"Authorization": f"Bearer {token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - program_title = response_data.get("title", "na") - program_provider = response_data.get("provider", "na") - span.set_attribute("program.title", program_title) - span.set_attribute("program.provider", program_provider) - return program_title - - -def _upload_with_artifact( - program: QiskitFunction, url: str, token: str, span: Any -) -> str: - """Uploads function with artifact. - - Args: - program (QiskitFunction): function instance - url (str): endpoint for gateway upload - token (str): auth token - span (Any): tracing span - - Raises: - QiskitServerlessException: if no entrypoint or size of artifact is too large. - - Returns: - str: uploaded function name - """ - artifact_file_path = os.path.join(program.working_dir, "artifact.tar") - - # check if entrypoint exists - if ( - not os.path.exists(os.path.join(program.working_dir, program.entrypoint)) - or program.entrypoint[0] == "/" - ): - raise QiskitServerlessException( - f"Entrypoint file [{program.entrypoint}] does not exist " - f"in [{program.working_dir}] working directory." - ) - - try: - with tarfile.open(artifact_file_path, "w", dereference=True) as tar: - for filename in os.listdir(program.working_dir): - fpath = os.path.join(program.working_dir, filename) - tar.add(fpath, arcname=filename) - - # check file size - size_in_mb = Path(artifact_file_path).stat().st_size / 1024**2 - if size_in_mb > MAX_ARTIFACT_FILE_SIZE_MB: - raise QiskitServerlessException( - f"{artifact_file_path} is {int(size_in_mb)} Mb, " - f"which is greater than {MAX_ARTIFACT_FILE_SIZE_MB} allowed. " - f"Try to reduce size of `working_dir`." - ) - - with open(artifact_file_path, "rb") as file: - response_data = safe_json_request( - request=lambda: requests.post( - url=url, - data={ - "title": program.title, - "provider": program.provider, - "entrypoint": program.entrypoint, - "arguments": json.dumps({}), - "dependencies": json.dumps(program.dependencies or []), - "env_vars": json.dumps(program.env_vars or {}), - "description": program.description, - }, - files={"artifact": file}, - headers={"Authorization": f"Bearer {token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - program_title = response_data.get("title", "na") - program_provider = response_data.get("provider", "na") - span.set_attribute("program.title", program_title) - span.set_attribute("program.provider", program_provider) - except Exception as error: # pylint: disable=broad-exception-caught - raise QiskitServerlessException from error - finally: - if os.path.exists(artifact_file_path): - os.remove(artifact_file_path) - - return program_title diff --git a/client/qiskit_serverless/utils/json.py b/client/qiskit_serverless/utils/json.py index f611b9f6a..61ab5baeb 100644 --- a/client/qiskit_serverless/utils/json.py +++ b/client/qiskit_serverless/utils/json.py @@ -26,9 +26,9 @@ JsonSerializable """ import json -from abc import ABC +from abc import ABC, abstractmethod from json import JSONEncoder -from typing import Optional, Type, Callable, Dict, Any +from typing import List, Optional, Type, Callable, Dict, Any, Union import requests @@ -40,9 +40,9 @@ class JsonSerializable(ABC): """Classes that can be serialized as json.""" @classmethod + @abstractmethod def from_dict(cls, dictionary: dict): """Converts dict to object.""" - raise NotImplementedError def to_dict(self) -> dict: """Converts class to dict.""" @@ -74,7 +74,49 @@ def is_jsonable(data, cls: Optional[Type[JSONEncoder]] = None): return False -def safe_json_request(request: Callable, verbose: bool = False) -> Dict[str, Any]: +def safe_json_request_as_list(request: Callable, verbose: bool = False) -> List[Any]: + """Returns parsed json data from request. + + Args: + request: callable for request. + verbose: post reason in error message + + Example: + >>> safe_json_request(request=lambda: requests.get("https://ibm.com")) + + Returns: + parsed json response as list structure + """ + response = safe_json_request(request, verbose) + if isinstance(response, List): + return response + raise TypeError("JSON is not a List") + + +def safe_json_request_as_dict( + request: Callable, verbose: bool = False +) -> Dict[str, Any]: + """Returns parsed json data from request. + + Args: + request: callable for request. + verbose: post reason in error message + + Example: + >>> safe_json_request(request=lambda: requests.get("https://ibm.com")) + + Returns: + parsed json response as dict structure + """ + response = safe_json_request(request, verbose) + if isinstance(response, Dict): + return response + raise TypeError("JSON is not a Dict") + + +def safe_json_request( + request: Callable, verbose: bool = False +) -> Union[Dict[str, Any], List[Any]]: """Returns parsed json data from request. Args: diff --git a/client/tests/core/test_job.py b/client/tests/core/test_job.py index 08485f226..f0edcce0d 100644 --- a/client/tests/core/test_job.py +++ b/client/tests/core/test_job.py @@ -1,19 +1,29 @@ """Tests job.""" + +# pylint: disable=too-few-public-methods import os from unittest import TestCase -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock, patch import numpy as np import requests_mock from qiskit.circuit.random import random_circuit +from qiskit_serverless import ServerlessClient from qiskit_serverless.core.constants import ( ENV_JOB_GATEWAY_HOST, ENV_JOB_ID_GATEWAY, ENV_JOB_GATEWAY_TOKEN, ) -from qiskit_serverless.core.job import save_result, GatewayJobClient +from qiskit_serverless.core.job import save_result + + +class ResponseMock: + """Utility class to mock request.get response with a json""" + + ok = True + text = "{}" class TestJob(TestCase): @@ -40,9 +50,10 @@ def test_save_result(self): ) self.assertTrue(result) + @patch("requests.get", Mock(return_value=ResponseMock())) def test_filtered_logs(self): """Tests job filtered log.""" - client = GatewayJobClient("host", "token", "version") + client = ServerlessClient(host="host", token="token", version="version") client.logs = MagicMock( return_value="This is the line 1\nThis is the second line\nOK. This is the last line.\n", # pylint: disable=line-too-long ) diff --git a/client/tests/core/test_pattern.py b/client/tests/core/test_pattern.py index 79909448f..99cbd2dd0 100644 --- a/client/tests/core/test_pattern.py +++ b/client/tests/core/test_pattern.py @@ -3,11 +3,9 @@ from testcontainers.compose import DockerCompose -from qiskit_serverless import BaseClient -from qiskit_serverless.core import ComputeResource +from qiskit_serverless import RayClient, QiskitFunction from qiskit_serverless.core.job import Job -from qiskit_serverless.core.function import QiskitFunction -from tests.utils import wait_for_job_client, wait_for_job_completion +from tests.utils import wait_for_ray_ready, wait_for_job_completion resources_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), "../resources" @@ -22,14 +20,11 @@ def test_program(): ) as compose: host = compose.get_service_host("testrayhead", 8265) port = compose.get_service_port("testrayhead", 8265) + connection_url = f"http://{host}:{port}" - serverless = BaseClient( - name="docker", - compute_resource=ComputeResource( - name="docker", host=host, port_job_server=port - ), - ) - wait_for_job_client(serverless) + wait_for_ray_ready(connection_url) + + serverless = RayClient(host=connection_url) program = QiskitFunction( title="simple_job", @@ -49,7 +44,7 @@ def test_program(): assert job.in_terminal_state() assert job.status() == "DONE" - recovered_job = serverless.get_job_by_id(job.job_id) + recovered_job = serverless.job(job.job_id) assert recovered_job.job_id == job.job_id assert "42" in recovered_job.logs() assert recovered_job.in_terminal_state() diff --git a/client/tests/utils.py b/client/tests/utils.py index 90ae4b16a..1fe95cc89 100644 --- a/client/tests/utils.py +++ b/client/tests/utils.py @@ -1,17 +1,20 @@ """Test utils.""" import time -from qiskit_serverless import BaseClient +from ray.dashboard.modules.job.sdk import JobSubmissionClient + from qiskit_serverless.core.job import Job -def wait_for_job_client(serverless: BaseClient, timeout: int = 60): - """Utility function that wait for job client to awake.""" +def wait_for_ray_ready(connection_url: str, timeout: int = 60): + """Utility function that waits for ray to be up.""" + client = None must_finish = time.time() + timeout - while time.time() < must_finish: - if serverless.job_client() is not None: - break - time.sleep(1) + while time.time() < must_finish and not client: + try: + client = JobSubmissionClient(connection_url) + except ConnectionError: + time.sleep(1) def wait_for_job_completion(job: Job, timeout: int = 60):