Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Function and jobs refactor - resolve circular dependencies #1517

Merged
merged 33 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
76a3349
refactor client and fix tests
korgan00 Oct 10, 2024
d8595f6
remove client/arguments.serverless
korgan00 Oct 10, 2024
41e9de0
remove unused TIMEOUTs
korgan00 Oct 11, 2024
3df8944
functions deprecation warnings
korgan00 Oct 11, 2024
204510b
added get_job_by_id with deprecation warning
korgan00 Oct 11, 2024
ba6684c
added __init__.py
korgan00 Oct 11, 2024
a3c0809
deprecated list function correction
korgan00 Oct 11, 2024
fd77ab2
replace deleted _token with token
korgan00 Oct 11, 2024
5149fed
**kwargs
korgan00 Oct 11, 2024
ad24a13
add duplicate code exception
korgan00 Oct 11, 2024
f289a7e
fixed run function
korgan00 Oct 11, 2024
8dc49bd
fixed code quality
korgan00 Oct 11, 2024
c6bf152
fix linter
korgan00 Oct 14, 2024
7d7d7db
delete file management from client. Use it only in serverless
korgan00 Oct 14, 2024
1918dc3
fix docs
korgan00 Oct 14, 2024
3e55964
skip file download and manage data directory tests
korgan00 Oct 14, 2024
164ce73
change docker build
korgan00 Oct 14, 2024
47e33e8
change docker build
korgan00 Oct 14, 2024
9ae1121
fix from_dict
korgan00 Oct 15, 2024
cf749a1
refactor function and job and resolve circular dependency
korgan00 Oct 15, 2024
92ac23b
Merge branch 'main' into client-refactor
korgan00 Oct 17, 2024
706eead
move _upload_with_artifact changes
korgan00 Oct 17, 2024
8e083aa
rename get_XXXX methods as XXXX
korgan00 Oct 17, 2024
c3477e9
rename get_XXXX methods as XXXX on local client
korgan00 Oct 17, 2024
57e3334
fix upload return on serverless
korgan00 Oct 17, 2024
7402e53
fix upload return on local client
korgan00 Oct 17, 2024
ccfa237
Merge branch 'client-refactor' into function-and-jobs-refactor
korgan00 Oct 21, 2024
ec201e4
fix lint
korgan00 Oct 22, 2024
2d14e8a
revert tox file
korgan00 Oct 22, 2024
8ae9093
Merge branch 'main' into function-and-jobs-refactor
korgan00 Oct 22, 2024
1b7e308
fix functions return
korgan00 Oct 22, 2024
021efb8
JobClient -> JobService
korgan00 Oct 23, 2024
018e6d9
jobService variable camelCase to snake_case
korgan00 Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 12 additions & 62 deletions client/qiskit_serverless/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,19 @@
"""
import warnings
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any, Union
from typing import Optional, List

from qiskit_ibm_runtime import QiskitRuntimeService

from qiskit_serverless.core.job import (
Job,
Configuration,
from qiskit_serverless.core.job import Job, JobService
from qiskit_serverless.core.function import (
QiskitFunction,
RunnableQiskitFunction,
RunService,
)
from qiskit_serverless.core.function import QiskitFunction
from qiskit_serverless.utils import JsonSerializable
from qiskit_serverless.visualizaiton import Widget


class BaseClient(JsonSerializable, ABC):
class BaseClient(JobService, RunService, JsonSerializable, ABC):
"""
A client class for specifying custom compute resources.

Expand Down Expand Up @@ -140,76 +139,27 @@ def get_jobs(self, **kwargs) -> List[Job]:
)
return self.jobs(**kwargs)

@abstractmethod
def run(
self,
program: Union[QiskitFunction, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
"""Execute a program as a async job.

Example:
>>> serverless = QiskitServerless()
>>> program = QiskitFunction(
>>> "job.py",
>>> arguments={"arg1": "val1"},
>>> dependencies=["requests"]
>>> )
>>> job = serverless.run(program)
>>> # <Job | ...>

Args:
arguments: arguments to run program with
program: Program object

Returns:
Job
"""

@abstractmethod
def status(self, job_id: str) -> str:
"""Check status."""

@abstractmethod
def stop(
self, job_id: str, service: Optional[QiskitRuntimeService] = None
) -> Union[str, bool]:
"""Stops job/program."""

@abstractmethod
def result(self, job_id: str) -> Any:
"""Return results."""

@abstractmethod
def logs(self, job_id: str) -> str:
"""Return logs."""

@abstractmethod
def filtered_logs(self, job_id: str, **kwargs) -> str:
"""Return filtered logs."""

Comment on lines -143 to -191
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This methods are defined in new RunService and JobClient abstract classes.

#########################
####### Functions #######
#########################

@abstractmethod
def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
"""Uploads program."""

@abstractmethod
def functions(self, **kwargs) -> List[QiskitFunction]:
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""

@abstractmethod
def function(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
"""Returns program based on parameters."""

def get(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
"""Returns program based on parameters."""
warnings.warn(
"`get` method has been deprecated. "
Expand All @@ -219,7 +169,7 @@ def get(
)
return self.function(title, provider=provider)

def list(self, **kwargs) -> List[QiskitFunction]:
def list(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""
warnings.warn(
"`list` method has been deprecated. "
Expand Down
14 changes: 7 additions & 7 deletions client/qiskit_serverless/core/clients/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
Job,
Configuration,
)
from qiskit_serverless.core.function import QiskitFunction
from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction
from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.serializers.program_serializers import (
QiskitObjectsEncoder,
Expand Down Expand Up @@ -133,7 +133,7 @@ def run(
if results:
result = results.group(1)

job = Job(job_id=str(uuid4()), client=self)
job = Job(job_id=str(uuid4()), job_service=self)
self._jobs[job.job_id] = {
"status": status,
"logs": output,
Expand Down Expand Up @@ -163,7 +163,7 @@ def filtered_logs(self, job_id: str, **kwargs):
####### Functions #######
#########################

def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
# check if entrypoint exists
if not os.path.exists(os.path.join(program.working_dir, program.entrypoint)):
raise QiskitServerlessException(
Expand All @@ -182,14 +182,14 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
"client": self,
}
self._patterns.append(pattern)
return QiskitFunction.from_json(pattern)
return RunnableQiskitFunction.from_json(pattern)

def functions(self, **kwargs) -> List[QiskitFunction]:
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of programs."""
return [QiskitFunction.from_json(program) for program in self._patterns]
return [RunnableQiskitFunction.from_json(program) for program in self._patterns]

def function(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
functions = {function.title: function for function in self.functions()}
return functions.get(title)
15 changes: 8 additions & 7 deletions client/qiskit_serverless/core/clients/ray_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
Configuration,
Job,
)
from qiskit_serverless.core.function import QiskitFunction
from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction
from qiskit_serverless.serializers.program_serializers import (
QiskitObjectsEncoder,
)
Expand Down Expand Up @@ -80,7 +80,7 @@ def jobs(self, **kwargs) -> List[Job]:
list of jobs.
"""
return [
Job(job.job_id, client=self)
Job(job.job_id, job_service=self)
for job in self.job_submission_client.list_jobs()
]

Expand All @@ -94,7 +94,8 @@ def job(self, job_id: str) -> Optional[Job]:
Job instance
"""
return Job(
self.job_submission_client.get_job_info(job_id).submission_id, client=self
self.job_submission_client.get_job_info(job_id).submission_id,
job_service=self,
)

def run(
Expand Down Expand Up @@ -129,7 +130,7 @@ def run(
"env_vars": env_vars,
},
)
return Job(job_id=job_id, client=self)
return Job(job_id=job_id, job_service=self)

def status(self, job_id: str) -> str:
"""Check status."""
Expand Down Expand Up @@ -157,16 +158,16 @@ def filtered_logs(self, job_id: str, **kwargs) -> str:
####### Functions #######
#########################

def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
"""Uploads program."""
raise NotImplementedError("Upload is not available for RayClient.")

def functions(self, **kwargs) -> List[QiskitFunction]:
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""
raise NotImplementedError("get_programs is not available for RayClient.")

def function(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
"""Returns program based on parameters."""
raise NotImplementedError("get_program is not available for RayClient.")
55 changes: 30 additions & 25 deletions client/qiskit_serverless/core/clients/serverless_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@
Job,
Configuration,
)
from qiskit_serverless.core.function import QiskitFunction
from qiskit_serverless.core.function import (
QiskitFunction,
RunService,
RunnableQiskitFunction,
)

from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.utils.json import (
safe_json_request_as_dict,
Expand Down Expand Up @@ -159,7 +164,7 @@ def jobs(self, **kwargs) -> List[Job]:
)

return [
Job(job.get("id"), client=self, raw_data=job)
Job(job.get("id"), job_service=self, raw_data=job)
for job in response_data.get("results", [])
]

Expand All @@ -180,7 +185,7 @@ def job(self, job_id: str) -> Optional[Job]:
if job_id is not None:
job = Job(
job_id=job_id,
client=self,
job_service=self,
)

return job
Expand Down Expand Up @@ -227,7 +232,7 @@ def run(
job_id = response_data.get("id")
span.set_attribute("job.id", job_id)

return Job(job_id, client=self)
return Job(job_id, job_service=self)

def status(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
Expand Down Expand Up @@ -316,7 +321,7 @@ def filtered_logs(self, job_id: str, **kwargs):
####### Functions #######
#########################

def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
span.set_attribute("program", program.title)
Expand All @@ -325,12 +330,12 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
if program.image is not None:
# upload function with custom image
function_uploaded = _upload_with_docker_image(
program=program, url=url, token=self.token, span=span
program=program, url=url, token=self.token, span=span, client=self
)
elif program.entrypoint is not None:
# upload funciton with artifact
function_uploaded = _upload_with_artifact(
program=program, url=url, token=self.token, span=span
program=program, url=url, token=self.token, span=span, client=self
)
else:
raise QiskitServerlessException(
Expand All @@ -339,7 +344,7 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:

return function_uploaded

def functions(self, **kwargs) -> List[QiskitFunction]:
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("program.list"):
Expand All @@ -353,19 +358,19 @@ def functions(self, **kwargs) -> List[QiskitFunction]:
)

return [
QiskitFunction(
program.get("title"),
RunnableQiskitFunction(
client=self,
title=program.get("title"),
provider=program.get("provider", None),
raw_data=program,
client=self,
description=program.get("description"),
)
for program in response_data
]

def function(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
"""Returns program based on parameters."""
provider, title = format_provider_name_and_title(
request_provider=provider, title=title
Expand All @@ -381,11 +386,11 @@ def function(
timeout=REQUESTS_TIMEOUT,
)
)
return QiskitFunction(
response_data.get("title"),
return RunnableQiskitFunction(
client=self,
title=response_data.get("title"),
provider=response_data.get("provider", None),
raw_data=response_data,
client=self,
)

#####################
Expand Down Expand Up @@ -484,8 +489,8 @@ def save_account(


def _upload_with_docker_image(
program: QiskitFunction, url: str, token: str, span: Any
) -> QiskitFunction:
program: QiskitFunction, url: str, token: str, span: Any, client: RunService
) -> RunnableQiskitFunction:
"""Uploads function with custom docker image.

Args:
Expand Down Expand Up @@ -517,12 +522,13 @@ def _upload_with_docker_image(
program_provider = response_data.get("provider", "na")
span.set_attribute("program.title", program_title)
span.set_attribute("program.provider", program_provider)
return QiskitFunction.from_json(response_data)
response_data["client"] = client
return RunnableQiskitFunction.from_json(response_data)


def _upload_with_artifact(
program: QiskitFunction, url: str, token: str, span: Any
) -> QiskitFunction:
program: QiskitFunction, url: str, token: str, span: Any, client: RunService
) -> RunnableQiskitFunction:
"""Uploads function with artifact.

Args:
Expand Down Expand Up @@ -582,11 +588,10 @@ def _upload_with_artifact(
timeout=REQUESTS_TIMEOUT,
)
)
program_title = response_data.get("title", "na")
program_provider = response_data.get("provider", "na")
span.set_attribute("program.title", program_title)
span.set_attribute("program.provider", program_provider)
response_function = QiskitFunction.from_json(response_data)
span.set_attribute("program.title", response_data.get("title", "na"))
span.set_attribute("program.provider", response_data.get("provider", "na"))
response_data["client"] = client
response_function = RunnableQiskitFunction.from_json(response_data)
except Exception as error: # pylint: disable=broad-exception-caught
raise QiskitServerlessException from error
finally:
Expand Down
Loading