Skip to content

Commit

Permalink
Cancel (#1318)
Browse files Browse the repository at this point in the history
* cancel backend runtime jobs in qiskit_serverless job.stop()

Signed-off-by: Akihiko Kuroda <akihikokuroda2020@gmail.com>
  • Loading branch information
akihikokuroda authored May 10, 2024
1 parent 641da7b commit be8361a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
22 changes: 16 additions & 6 deletions client/qiskit_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from ray.dashboard.modules.job.sdk import JobSubmissionClient

from opentelemetry import trace
from qiskit_ibm_runtime import QiskitRuntimeService

from qiskit_serverless.core.constants import (
OT_PROGRAM_NAME,
Expand Down Expand Up @@ -122,7 +123,7 @@ def status(self, job_id: str):
"""Check status."""
raise NotImplementedError

def stop(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
"""Stops job/program."""
raise NotImplementedError

Expand Down Expand Up @@ -158,7 +159,7 @@ def __init__(self, client: JobSubmissionClient):
def status(self, job_id: str):
return self._job_client.get_job_status(job_id).value

def stop(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
return self._job_client.stop_job(job_id)

def logs(self, job_id: str):
Expand Down Expand Up @@ -230,7 +231,7 @@ def __init__(self):
def status(self, job_id: str):
return self._jobs[job_id]["status"]

def stop(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
"""Stops job/program."""
return f"job:{job_id} has already stopped"

Expand Down Expand Up @@ -418,14 +419,23 @@ def status(self, job_id: str):

return response_data.get("status", default_status)

def stop(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.stop"):
if service:
data = {
"service": json.dumps(service, cls=QiskitObjectsEncoder),
}
else:
data = {
"service": None,
}
response_data = safe_json_request(
request=lambda: requests.post(
f"{self.host}/api/{self.version}/jobs/{job_id}/stop/",
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
json=data,
)
)

Expand Down Expand Up @@ -556,9 +566,9 @@ def status(self):
"""Returns status of the job."""
return _map_status_to_serverless(self._job_client.status(self.job_id))

def stop(self):
def stop(self, service: Optional[QiskitRuntimeService] = None):
"""Stops the job from running."""
return self._job_client.stop(self.job_id)
return self._job_client.stop(self.job_id, service=service)

def logs(self) -> str:
"""Returns logs of the job."""
Expand Down
25 changes: 25 additions & 0 deletions gateway/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from rest_framework.decorators import action
from rest_framework.generics import get_object_or_404
from rest_framework.response import Response

from qiskit_ibm_runtime import RuntimeInvalidStateError, QiskitRuntimeService
from utils import sanitize_file_path

from .models import VIEW_PROGRAM_PERMISSION, Program, Job, RuntimeJob
Expand Down Expand Up @@ -318,6 +320,10 @@ def logs(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen
logs = job.logs
return Response({"logs": logs})

def get_runtime_job(self, job):
"""get runtime job for job"""
return RuntimeJob.objects.filter(job=job)

@action(methods=["POST"], detail=True)
def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argument
"""Stops job"""
Expand All @@ -329,6 +335,25 @@ def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen
job.status = Job.STOPPED
job.save(update_fields=["status"])
message = "Job has been stopped."
runtime_jobs = self.get_runtime_job(job)
if runtime_jobs and len(runtime_jobs) != 0:
if request.data.get("service"):
service = QiskitRuntimeService(
**json.loads(request.data.get("service"), cls=json.JSONDecoder)[
"__value__"
]
)
for runtime_job_entry in runtime_jobs:
jobinstance = service.job(runtime_job_entry.runtime_job)
if jobinstance:
try:
logger.info(
"canceling [%s]", runtime_job_entry.runtime_job
)
jobinstance.cancel()
except RuntimeInvalidStateError:
logger.warning("cancel failed")

if job.compute_resource:
if job.compute_resource.active:
job_handler = get_job_handler(job.compute_resource.host)
Expand Down
1 change: 1 addition & 0 deletions gateway/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ drf-yasg>=1.21.7
cryptography>=41.0.1
# Django dependency, but we need a newer version (IBMQ#246)
sqlparse>=0.5.0
qiskit_ibm_runtime>=0.22.0

0 comments on commit be8361a

Please sign in to comment.