Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use websocket stream to retrieve runtime job results #33

Merged
merged 45 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
bff1445
retrieve job results w websockets
kt474 Nov 29, 2021
37a8b34
Merge branch 'main' into job-results-stream
kt474 Nov 30, 2021
352aa20
update stream results method
kt474 Nov 30, 2021
7a1bcab
Merge branch 'main' into job-results-stream
kt474 Dec 1, 2021
f4f21dc
Merge branch 'main' into job-results-stream
kt474 Dec 2, 2021
b6e007b
Merge branch 'main' into job-results-stream
kt474 Dec 5, 2021
9c8af9e
Merge branch 'main' into job-results-stream
kt474 Dec 9, 2021
3e3d442
Merge branch 'main' into job-results-stream
kt474 Jan 4, 2022
785bd40
Merge branch 'Qiskit:main' into job-results-stream
kt474 Jan 13, 2022
4ea4c67
Merge branch 'main' into job-results-stream
kt474 Jan 14, 2022
625efa6
Merge branch 'main' into job-results-stream
kt474 Jan 16, 2022
17c82d7
add mock for unit test
kt474 Jan 18, 2022
354d1e9
update unit tests
kt474 Jan 20, 2022
5576ba1
Merge branch 'main' into job-results-stream
kt474 Jan 20, 2022
3d55d55
temp disable test case
kt474 Jan 21, 2022
b76dc84
increase sleep time
kt474 Jan 21, 2022
99b1bce
increase wait for final result
kt474 Jan 21, 2022
5c0b56e
temp increase wait on test final result
kt474 Jan 21, 2022
751c6ab
temp disable final result test
kt474 Jan 21, 2022
baa71d9
temp disable get result twice
kt474 Jan 21, 2022
edbb4ad
adjust wait times
kt474 Jan 21, 2022
77a5ca1
add test case back
kt474 Jan 21, 2022
08c8223
fix lint
kt474 Jan 21, 2022
765d8aa
update test case
kt474 Jan 24, 2022
3f6ce72
Merge branch 'main' into job-results-stream
kt474 Jan 27, 2022
d82365e
Merge branch 'main' into job-results-stream
rathishcholarajan Jan 31, 2022
bb17ede
Merge branch 'main' into job-results-stream
kt474 Feb 3, 2022
75ba585
Merge branch 'main' into job-results-stream
kt474 Feb 7, 2022
13e2a9f
update comments
kt474 Feb 7, 2022
10b641e
fix lint
kt474 Feb 7, 2022
e157f54
added fake wait for final state
kt474 Feb 8, 2022
4ccf196
fix lint
kt474 Feb 8, 2022
f077a21
increase default timeout
kt474 Feb 8, 2022
96904db
remove timeout
kt474 Feb 8, 2022
eed2e97
remove import
kt474 Feb 8, 2022
ab2fc54
Merge branch 'main' into job-results-stream
kt474 Feb 9, 2022
aae233a
Merge branch 'main' into job-results-stream
kt474 Feb 15, 2022
cf44197
remove sleep
kt474 Feb 16, 2022
b3e800b
Merge branch 'main' into job-results-stream
rathishcholarajan Feb 16, 2022
26e4fcc
move fake method to utils
kt474 Feb 17, 2022
e1e3309
add mock function
kt474 Feb 17, 2022
07572b3
fix lint
kt474 Feb 17, 2022
c8a314d
fix lint
kt474 Feb 17, 2022
9aec1a3
add statement back
kt474 Feb 17, 2022
aa9599e
update test case
kt474 Feb 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 11 additions & 28 deletions qiskit_ibm_runtime/runtime_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
"""Qiskit runtime job."""

from typing import Any, Optional, Callable, Dict, Type
import time
import logging
from concurrent import futures
import traceback
import queue
from datetime import datetime

from qiskit.providers.exceptions import JobTimeoutError
from qiskit.providers.backend import Backend
from qiskit.providers.jobstatus import JobStatus, JOB_FINAL_STATES

Expand Down Expand Up @@ -159,14 +157,12 @@ def interim_results(self, decoder: Optional[Type[ResultDecoder]] = None) -> Any:
def result(
self,
timeout: Optional[float] = None,
wait: float = 5,
decoder: Optional[Type[ResultDecoder]] = None,
) -> Any:
"""Return the results of the job.

Args:
timeout: Number of seconds to wait for job.
wait: Seconds between queries.
decoder: A :class:`ResultDecoder` subclass used to decode job results.

Returns:
Expand All @@ -177,7 +173,7 @@ def result(
"""
_decoder = decoder or self._result_decoder
if self._results is None or (_decoder != self._result_decoder):
self.wait_for_final_state(timeout=timeout, wait=wait)
self.wait_for_final_state(timeout=timeout)
if self._status == JobStatus.ERROR:
raise RuntimeJobFailureError(
f"Unable to retrieve job result. " f"{self.error_message()}"
Expand Down Expand Up @@ -222,29 +218,18 @@ def error_message(self) -> Optional[str]:
self._set_status_and_error_message()
return self._error_message

def wait_for_final_state(
self, timeout: Optional[float] = None, wait: float = 5
) -> None:
"""Poll the job status until it progresses to a final state such as ``DONE`` or ``ERROR``.
def wait_for_final_state(self, timeout: Optional[float] = None) -> None:
"""Use the websocket server to wait for the final the state of a job. The server
will remain open if the job is still running and the connection will be terminated
once the job completes. Then update and return the status of the job.

Args:
timeout: Seconds to wait for the job. If ``None``, wait indefinitely.
wait: Seconds between queries.

Raises:
JobTimeoutError: If the job does not reach a final state before the
specified timeout.
"""
start_time = time.time()
status = self.status()
while status not in JOB_FINAL_STATES:
elapsed_time = time.time() - start_time
if timeout is not None and elapsed_time >= timeout:
raise JobTimeoutError(
"Timeout while waiting for job {}.".format(self.job_id)
)
time.sleep(wait)
status = self.status()
if self._status not in JOB_FINAL_STATES:
self._ws_client_future = self._executor.submit(self._start_websocket_client)
self._ws_client_future.result(timeout)
self.status()

def stream_results(
self, callback: Callable, decoder: Optional[Type[ResultDecoder]] = None
Expand All @@ -264,14 +249,12 @@ def stream_results(
RuntimeInvalidStateError: If a callback function is already streaming results or
if the job already finished.
"""
if self._status in JOB_FINAL_STATES:
rathishcholarajan marked this conversation as resolved.
Show resolved Hide resolved
raise RuntimeInvalidStateError("Job already finished.")
if self._is_streaming():
raise RuntimeInvalidStateError(
"A callback function is already streaming results."
)

if self._status in JOB_FINAL_STATES:
raise RuntimeInvalidStateError("Job already finished.")

self._ws_client_future = self._executor.submit(self._start_websocket_client)
self._executor.submit(
self._stream_results,
Expand Down
8 changes: 4 additions & 4 deletions test/integration/test_interim_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ def test_stream_results_done(self, service):

def result_callback(job_id, interim_result):
# pylint: disable=unused-argument
nonlocal called_back
called_back = True
nonlocal called_back_count
called_back_count += 1

called_back = False
called_back_count = 0
job = self._run_program(service, interim_results="foobar")
job.wait_for_final_state()
job._status = JobStatus.RUNNING # Allow stream_results()
job.stream_results(result_callback)
time.sleep(2)
self.assertFalse(called_back)
self.assertEqual(1, called_back_count)
self.assertIsNotNone(job._ws_client._server_close_code)

@run_integration_test
Expand Down
11 changes: 11 additions & 0 deletions test/unit/mock/fake_runtime_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ def interim_results(self):
"""Return job interim results."""
return self._interim_results

def status(self):
"""Return job status."""
return self._status


class FailedRuntimeJob(BaseFakeRuntimeJob):
"""Class for faking a failed runtime job."""
Expand Down Expand Up @@ -451,6 +455,13 @@ def job_delete(self, job_id):
self._get_job(job_id)
del self._jobs[job_id]

def wait_for_final_state(self, job_id):
"""Wait for the final state of a program job."""
final_states = ["COMPLETED", "FAILED", "CANCELLED", "CANCELLED - RAN TOO LONG"]
status = self._get_job(job_id).status()
while status not in final_states:
status = self._get_job(job_id).status()

def _get_program(self, program_id):
"""Get program."""
if program_id not in self._programs:
Expand Down
22 changes: 19 additions & 3 deletions test/unit/test_job_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

"""Tests for runtime job retrieval."""

from unittest import mock
from qiskit_ibm_runtime.exceptions import IBMInputValueError
from qiskit_ibm_runtime import RuntimeJob
from .mock.fake_runtime_service import FakeRuntimeService
from ..ibm_test_case import IBMTestCase
from ..decorators import run_legacy_and_cloud_fake
Expand Down Expand Up @@ -182,8 +184,13 @@ def test_jobs_filter_by_program_id(self, service):

job = run_program(service=service, program_id=program_id)
job_1 = run_program(service=service, program_id=program_id_1)
job.wait_for_final_state()
job_1.wait_for_final_state()
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
job.wait_for_final_state()
job_1.wait_for_final_state()
rjobs = service.jobs(program_id=program_id)
self.assertEqual(program_id, rjobs[0].program_id)
self.assertEqual(1, len(rjobs))
Expand All @@ -195,7 +202,12 @@ def test_jobs_filter_by_instance(self):
instance = FakeRuntimeService.DEFAULT_HGPS[1]

job = run_program(service=service, program_id=program_id, instance=instance)
job.wait_for_final_state()
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
job.wait_for_final_state()
rjobs = service.jobs(program_id=program_id, instance=instance)
self.assertTrue(rjobs)
self.assertEqual(program_id, rjobs[0].program_id)
Expand Down Expand Up @@ -253,3 +265,7 @@ def _populate_jobs_with_all_statuses(self, service, program_id):
else:
returned_jobs_count += 1
return jobs, pending_jobs_count, returned_jobs_count

def _fake_wait_for_final_state(self, service, job):
"""Wait for the final state of a program job."""
service._api_client.wait_for_final_state(job.job_id)
99 changes: 69 additions & 30 deletions test/unit/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"""Tests for job related runtime functions."""

import random
from unittest import mock
import time

from qiskit.providers.exceptions import QiskitBackendNotFoundError
Expand Down Expand Up @@ -51,9 +52,13 @@ def test_run_program(self, service):
self.assertIsInstance(job, RuntimeJob)
self.assertIsInstance(job.status(), JobStatus)
self.assertEqual(job.inputs, params)
job.wait_for_final_state()
self.assertEqual(job.status(), JobStatus.DONE)
self.assertTrue(job.result())
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
self.assertEqual(job.status(), JobStatus.DONE)
self.assertTrue(job.result())

@run_legacy_and_cloud_fake
def test_run_phantom_program(self, service):
Expand Down Expand Up @@ -148,9 +153,14 @@ def test_run_program_with_custom_runtime_image(self, service):
self.assertIsInstance(job, RuntimeJob)
self.assertIsInstance(job.status(), JobStatus)
self.assertEqual(job.inputs, params)
job.wait_for_final_state()
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
job.wait_for_final_state()
self.assertTrue(job.result())
self.assertEqual(job.status(), JobStatus.DONE)
self.assertTrue(job.result())
self.assertEqual(job.image, image)

@run_legacy_and_cloud_fake
Expand All @@ -164,31 +174,41 @@ def test_run_program_with_custom_log_level(self, service):
def test_run_program_failed(self, service):
"""Test a failed program execution."""
job = run_program(service=service, job_classes=FailedRuntimeJob)
job.wait_for_final_state()
job_result_raw = service._api_client.job_results(job.job_id)
self.assertEqual(JobStatus.ERROR, job.status())
self.assertEqual(
API_TO_JOB_ERROR_MESSAGE["FAILED"].format(job.job_id, job_result_raw),
job.error_message(),
)
with self.assertRaises(RuntimeJobFailureError):
job.result()
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
job.wait_for_final_state()
job_result_raw = service._api_client.job_results(job.job_id)
self.assertEqual(JobStatus.ERROR, job.status())
self.assertEqual(
API_TO_JOB_ERROR_MESSAGE["FAILED"].format(job.job_id, job_result_raw),
job.error_message(),
)
with self.assertRaises(RuntimeJobFailureError):
job.result()

@run_legacy_and_cloud_fake
def test_run_program_failed_ran_too_long(self, service):
"""Test a program that failed since it ran longer than maximum execution time."""
job = run_program(service=service, job_classes=FailedRanTooLongRuntimeJob)
job.wait_for_final_state()
job_result_raw = service._api_client.job_results(job.job_id)
self.assertEqual(JobStatus.ERROR, job.status())
self.assertEqual(
API_TO_JOB_ERROR_MESSAGE["CANCELLED - RAN TOO LONG"].format(
job.job_id, job_result_raw
),
job.error_message(),
)
with self.assertRaises(RuntimeJobFailureError):
job.result()
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
job.wait_for_final_state()
job_result_raw = service._api_client.job_results(job.job_id)
self.assertEqual(JobStatus.ERROR, job.status())
self.assertEqual(
API_TO_JOB_ERROR_MESSAGE["CANCELLED - RAN TOO LONG"].format(
job.job_id, job_result_raw
),
job.error_message(),
)
with self.assertRaises(RuntimeJobFailureError):
job.result()

@run_legacy_and_cloud_fake
def test_program_params_namespace(self, service):
Expand All @@ -212,8 +232,13 @@ def test_cancel_job(self, service):
def test_final_result(self, service):
"""Test getting final result."""
job = run_program(service)
result = job.result()
self.assertTrue(result)
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
result = job.result()
self.assertTrue(result)

@run_legacy_and_cloud_fake
def test_interim_results(self, service):
Expand Down Expand Up @@ -248,7 +273,12 @@ def test_job_program_id(self, service):
def test_wait_for_final_state(self, service):
"""Test wait for final state."""
job = run_program(service)
job.wait_for_final_state()
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
job.wait_for_final_state()
self.assertEqual(JobStatus.DONE, job.status())

@run_legacy_and_cloud_fake
Expand All @@ -259,8 +289,13 @@ def test_get_result_twice(self, service):
job_cls.custom_result = custom_result

job = run_program(service=service, job_classes=job_cls)
_ = job.result()
_ = job.result()
with mock.patch.object(
RuntimeJob,
"wait_for_final_state",
side_effect=self._fake_wait_for_final_state(service, job),
):
rathishcholarajan marked this conversation as resolved.
Show resolved Hide resolved
_ = job.result()
_ = job.result()

@run_legacy_and_cloud_fake
def test_delete_job(self, service):
Expand All @@ -271,3 +306,7 @@ def test_delete_job(self, service):
service.delete_job(job.job_id)
with self.assertRaises(RuntimeJobNotFound):
service.job(job.job_id)

def _fake_wait_for_final_state(self, service, job):
rathishcholarajan marked this conversation as resolved.
Show resolved Hide resolved
"""Wait for the final state of a program job."""
service._api_client.wait_for_final_state(job.job_id)