Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel #1318

Merged
merged 5 commits into from
May 10, 2024
Merged

Cancel #1318

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions client/qiskit_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from ray.dashboard.modules.job.sdk import JobSubmissionClient

from opentelemetry import trace
from qiskit_ibm_runtime import QiskitRuntimeService

from qiskit_serverless.core.constants import (
OT_PROGRAM_NAME,
Expand Down Expand Up @@ -130,7 +131,7 @@ def status(self, job_id: str):
"""Check status."""
raise NotImplementedError

def stop(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
"""Stops job/program."""
raise NotImplementedError

Expand Down Expand Up @@ -166,7 +167,7 @@ def __init__(self, client: JobSubmissionClient):
def status(self, job_id: str):
return self._job_client.get_job_status(job_id).value

def stop(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
return self._job_client.stop_job(job_id)

def logs(self, job_id: str):
Expand Down Expand Up @@ -239,7 +240,7 @@ def __init__(self):
def status(self, job_id: str):
return self._jobs[job_id]["status"]

def stop(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
"""Stops job/program."""
return f"job:{job_id} has already stopped"

Expand Down Expand Up @@ -528,14 +529,23 @@ def status(self, job_id: str):

return response_data.get("status", default_status)

def stop(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.stop"):
if service:
data = {
"service": json.dumps(service, cls=QiskitObjectsEncoder),
}
else:
data = {
"service": None,
}
response_data = safe_json_request(
request=lambda: requests.post(
f"{self.host}/api/{self.version}/jobs/{job_id}/stop/",
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
json=data,
)
)

Expand Down Expand Up @@ -668,9 +678,9 @@ def status(self):
"""Returns status of the job."""
return _map_status_to_serverless(self._job_client.status(self.job_id))

def stop(self):
def stop(self, service: Optional[QiskitRuntimeService] = None):
"""Stops the job from running."""
return self._job_client.stop(self.job_id)
return self._job_client.stop(self.job_id, service=service)

def logs(self) -> str:
"""Returns logs of the job."""
Expand Down
25 changes: 25 additions & 0 deletions gateway/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from rest_framework.decorators import action
from rest_framework.generics import get_object_or_404
from rest_framework.response import Response

from qiskit_ibm_runtime import RuntimeInvalidStateError, QiskitRuntimeService
from utils import sanitize_file_path

from .models import VIEW_PROGRAM_PERMISSION, Program, Job, RuntimeJob
Expand Down Expand Up @@ -415,6 +417,10 @@ def logs(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen
logs = job.logs
return Response({"logs": logs})

def get_runtime_job(self, job):
"""get runtime job for job"""
return RuntimeJob.objects.filter(job=job)

@action(methods=["POST"], detail=True)
def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argument
"""Stops job"""
Expand All @@ -426,6 +432,25 @@ def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen
job.status = Job.STOPPED
job.save(update_fields=["status"])
message = "Job has been stopped."
runtime_jobs = self.get_runtime_job(job)
if runtime_jobs and len(runtime_jobs) != 0:
if request.data.get("service"):
service = QiskitRuntimeService(
**json.loads(request.data.get("service"), cls=json.JSONDecoder)[
"__value__"
]
)
for runtime_job_entry in runtime_jobs:
jobinstance = service.job(runtime_job_entry.runtime_job)
if jobinstance:
try:
logger.info(
"canceling [%s]", runtime_job_entry.runtime_job
)
jobinstance.cancel()
except RuntimeInvalidStateError:
logger.warning("cancel failed")

if job.compute_resource:
if job.compute_resource.active:
job_handler = get_job_handler(job.compute_resource.host)
Expand Down
1 change: 1 addition & 0 deletions gateway/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ drf-yasg>=1.21.7
cryptography>=41.0.1
# Django dependency, but we need a newer version (IBMQ#246)
sqlparse>=0.5.0
qiskit_ibm_runtime>=0.22.0

Loading