diff --git a/test/ibm/runtime/test_runtime.py b/test/ibm/runtime/test_runtime.py index 2449ec296..18461491f 100644 --- a/test/ibm/runtime/test_runtime.py +++ b/test/ibm/runtime/test_runtime.py @@ -50,7 +50,7 @@ from qiskit.providers.jobstatus import JobStatus from qiskit_ibm_runtime.exceptions import IBMInputValueError -from qiskit_ibm_runtime import IBMRuntimeService, RuntimeJob +from qiskit_ibm_runtime import IBMRuntimeService, RuntimeJob, IBMBackend from qiskit_ibm_runtime.credentials import Credentials from qiskit_ibm_runtime.hub_group_project import HubGroupProject from qiskit_ibm_runtime.utils import RuntimeEncoder, RuntimeDecoder @@ -62,6 +62,7 @@ from .fake_runtime_client import (BaseFakeRuntimeClient, FailedRanTooLongRuntimeJob, FailedRuntimeJob, CancelableRuntimeJob, CustomResultRuntimeJob) from .utils import SerializableClass, SerializableClassDecoder, get_complex_types +from ...contextmanagers import mock_ibm_provider class TestRuntime(IBMTestCase): @@ -121,12 +122,19 @@ class TestRuntime(IBMTestCase): def setUp(self): """Initial test setup.""" super().setUp() - service = mock.MagicMock(spec=IBMRuntimeService) - hgp = mock.MagicMock(spec=HubGroupProject) - hgp.credentials = Credentials( + with mock_ibm_provider(): + self.service = IBMRuntimeService("abc") + self.service._programs = {} + self.service._default_hgp = mock.MagicMock(spec=HubGroupProject) + self.service._default_hgp.credentials = Credentials( token="", url="", services={"runtime": "https://quantum-computing.ibm.com"}) - self.runtime = IBMRuntimeService(service, hgp) - self.runtime._api_client = BaseFakeRuntimeClient() + + def get_backend(backend_name, hub=None, group=None, project=None): + # pylint: disable=unused-argument + return mock.MagicMock(spec=IBMBackend) + + self.service.get_backend = get_backend + self.service._api_client = BaseFakeRuntimeClient() def test_coder(self): """Test runtime encoder and decoder.""" @@ -299,7 +307,7 @@ def test_decoder_import(self): def test_list_programs(self): """Test listing programs.""" program_id = self._upload_program() - programs = self.runtime.programs() + programs = self.service.programs() all_ids = [prog.program_id for prog in programs] self.assertIn(program_id, all_ids) @@ -308,19 +316,19 @@ def test_list_programs_with_limit_skip(self): program_1 = self._upload_program() program_2 = self._upload_program() program_3 = self._upload_program() - programs = self.runtime.programs(limit=2, skip=1) + programs = self.service.programs(limit=2, skip=1) all_ids = [prog.program_id for prog in programs] self.assertNotIn(program_1, all_ids) self.assertIn(program_2, all_ids) self.assertIn(program_3, all_ids) - programs = self.runtime.programs(limit=3) + programs = self.service.programs(limit=3) all_ids = [prog.program_id for prog in programs] self.assertIn(program_1, all_ids) def test_list_program(self): """Test listing a single program.""" program_id = self._upload_program() - program = self.runtime.program(program_id) + program = self.service.program(program_id) self.assertEqual(program_id, program.program_id) def test_print_programs(self): @@ -329,15 +337,15 @@ def test_print_programs(self): for idx in range(3): ids.append(self._upload_program(name=f"name_{idx}")) - programs = self.runtime.programs() + programs = self.service.programs() with patch('sys.stdout', new=StringIO()) as mock_stdout: - self.runtime.pprint_programs() + self.service.pprint_programs() stdout = mock_stdout.getvalue() for prog in programs: self.assertIn(prog.program_id, stdout) self.assertIn(prog.name, stdout) self.assertNotIn(str(prog.max_execution_time), stdout) - self.runtime.pprint_programs(detailed=True) + self.service.pprint_programs(detailed=True) stdout_detailed = mock_stdout.getvalue() for prog in programs: self.assertIn(prog.program_id, stdout_detailed) @@ -351,7 +359,7 @@ def test_upload_program(self): program_id = self._upload_program(max_execution_time=max_execution_time, is_public=is_public) self.assertTrue(program_id) - program = self.runtime.program(program_id) + program = self.service.program(program_id) self.assertTrue(program) self.assertEqual(max_execution_time, program.max_execution_time) self.assertEqual(program.is_public, is_public) @@ -379,10 +387,10 @@ def test_update_program(self): for new_vals in sub_tests: with self.subTest(new_vals=new_vals.keys()): program_id = self._upload_program() - self.runtime.update_program(program_id=program_id, **new_vals) - updated = self.runtime.program(program_id, refresh=True) + self.service.update_program(program_id=program_id, **new_vals) + updated = self.service.program(program_id, refresh=True) if "data" in new_vals: - raw_program = self.runtime._api_client.program_get(program_id) + raw_program = self.service._api_client.program_get(program_id) self.assertEqual(new_data, raw_program["data"]) if "metadata" in new_vals and "name" not in new_vals: self.assertEqual(new_metadata["name"], updated.name) @@ -393,29 +401,29 @@ def test_update_program(self): if "max_execution_time" in new_vals: self.assertEqual(new_cost, updated.max_execution_time) if "spec" in new_vals: - raw_program = self.runtime._api_client.program_get(program_id) + raw_program = self.service._api_client.program_get(program_id) self.assertEqual(new_spec, raw_program["spec"]) def test_update_program_no_new_fields(self): """Test updating a program without any new data.""" program_id = self._upload_program() with warnings.catch_warnings(record=True) as warn_cm: - self.runtime.update_program(program_id=program_id) + self.service.update_program(program_id=program_id) self.assertEqual(len(warn_cm), 1) def test_delete_program(self): """Test deleting program.""" program_id = self._upload_program() - self.runtime.delete_program(program_id) + self.service.delete_program(program_id) with self.assertRaises(RuntimeProgramNotFound): - self.runtime.program(program_id, refresh=True) + self.service.program(program_id, refresh=True) def test_double_delete_program(self): """Test deleting a deleted program.""" program_id = self._upload_program() - self.runtime.delete_program(program_id) + self.service.delete_program(program_id) with self.assertRaises(RuntimeProgramNotFound): - self.runtime.delete_program(program_id) + self.service.delete_program(program_id) def test_run_program(self): """Test running program.""" @@ -446,16 +454,16 @@ def test_run_program_with_custom_runtime_image(self): def test_retrieve_program_data(self): """Test retrieving program data""" program_id = self._upload_program(name="qiskit-test") - self.runtime.programs() - program = self.runtime.program(program_id) + self.service.programs() + program = self.service.program(program_id) self.assertEqual(program.data, self.DEFAULT_DATA) self._validate_program(program) def test_program_params_validation(self): """Test program parameters validation process""" - program_id = self.runtime.upload_program( + program_id = self.service.upload_program( data=self.DEFAULT_DATA, metadata=self.DEFAULT_METADATA) - program = self.runtime.program(program_id) + program = self.service.program(program_id) params: ParameterNamespace = program.parameters() params.param1 = 'Hello, World' # Check OK params @@ -471,9 +479,9 @@ def test_program_params_validation(self): def test_program_params_namespace(self): """Test running a program using parameter namespace.""" - program_id = self.runtime.upload_program( + program_id = self.service.upload_program( data=self.DEFAULT_DATA, metadata=self.DEFAULT_METADATA) - params = self.runtime.program(program_id).parameters() + params = self.service.program(program_id).parameters() params.param1 = "Hello World" self._run_program(program_id, inputs=params) @@ -481,7 +489,7 @@ def test_run_program_failed(self): """Test a failed program execution.""" job = self._run_program(job_classes=FailedRuntimeJob) job.wait_for_final_state() - job_result_raw = self.runtime._api_client.job_results(job.job_id) + job_result_raw = self.service._api_client.job_results(job.job_id) self.assertEqual(JobStatus.ERROR, job.status()) self.assertEqual(API_TO_JOB_ERROR_MESSAGE['FAILED'].format( job.job_id, job_result_raw), job.error_message()) @@ -492,7 +500,7 @@ def test_run_program_failed_ran_too_long(self): """Test a program that failed since it ran longer than maxiumum execution time.""" job = self._run_program(job_classes=FailedRanTooLongRuntimeJob) job.wait_for_final_state() - job_result_raw = self.runtime._api_client.job_results(job.job_id) + job_result_raw = self.service._api_client.job_results(job.job_id) self.assertEqual(JobStatus.ERROR, job.status()) self.assertEqual(API_TO_JOB_ERROR_MESSAGE['CANCELLED - RAN TOO LONG'].format( job.job_id, job_result_raw), job.error_message()) @@ -504,7 +512,7 @@ def test_retrieve_job(self): program_id = self._upload_program() params = {'param1': 'foo'} job = self._run_program(program_id, inputs=params) - rjob = self.runtime.job(job.job_id) + rjob = self.service.job(job.job_id) self.assertEqual(job.job_id, rjob.job_id) self.assertEqual(program_id, rjob.program_id) @@ -514,7 +522,7 @@ def test_jobs_no_limit(self): program_id = self._upload_program() for _ in range(25): jobs.append(self._run_program(program_id)) - rjobs = self.runtime.jobs(limit=None) + rjobs = self.service.jobs(limit=None) self.assertEqual(25, len(rjobs)) def test_jobs_limit(self): @@ -528,7 +536,7 @@ def test_jobs_limit(self): limits = [21, 30] for limit in limits: with self.subTest(limit=limit): - rjobs = self.runtime.jobs(limit=limit) + rjobs = self.service.jobs(limit=limit) self.assertEqual(min(limit, job_count), len(rjobs)) def test_jobs_skip(self): @@ -537,7 +545,7 @@ def test_jobs_skip(self): program_id = self._upload_program() for _ in range(5): jobs.append(self._run_program(program_id)) - rjobs = self.runtime.jobs(skip=4) + rjobs = self.service.jobs(skip=4) self.assertEqual(1, len(rjobs)) def test_jobs_skip_limit(self): @@ -546,7 +554,7 @@ def test_jobs_skip_limit(self): program_id = self._upload_program() for _ in range(10): jobs.append(self._run_program(program_id)) - rjobs = self.runtime.jobs(skip=4, limit=2) + rjobs = self.service.jobs(skip=4, limit=2) self.assertEqual(2, len(rjobs)) def test_jobs_pending(self): @@ -555,7 +563,7 @@ def test_jobs_pending(self): program_id = self._upload_program() (jobs, pending_jobs_count, _) = self._populate_jobs_with_all_statuses( jobs=jobs, program_id=program_id) - rjobs = self.runtime.jobs(pending=True) + rjobs = self.service.jobs(pending=True) self.assertEqual(pending_jobs_count, len(rjobs)) def test_jobs_limit_pending(self): @@ -564,7 +572,7 @@ def test_jobs_limit_pending(self): program_id = self._upload_program() (jobs, *_) = self._populate_jobs_with_all_statuses(jobs=jobs, program_id=program_id) limit = 4 - rjobs = self.runtime.jobs(limit=limit, pending=True) + rjobs = self.service.jobs(limit=limit, pending=True) self.assertEqual(limit, len(rjobs)) def test_jobs_skip_pending(self): @@ -574,7 +582,7 @@ def test_jobs_skip_pending(self): (jobs, pending_jobs_count, _) = self._populate_jobs_with_all_statuses( jobs=jobs, program_id=program_id) skip = 4 - rjobs = self.runtime.jobs(skip=skip, pending=True) + rjobs = self.service.jobs(skip=skip, pending=True) self.assertEqual(pending_jobs_count - skip, len(rjobs)) def test_jobs_limit_skip_pending(self): @@ -584,7 +592,7 @@ def test_jobs_limit_skip_pending(self): (jobs, *_) = self._populate_jobs_with_all_statuses(jobs=jobs, program_id=program_id) limit = 2 skip = 3 - rjobs = self.runtime.jobs(limit=limit, skip=skip, pending=True) + rjobs = self.service.jobs(limit=limit, skip=skip, pending=True) self.assertEqual(limit, len(rjobs)) def test_jobs_returned(self): @@ -593,7 +601,7 @@ def test_jobs_returned(self): program_id = self._upload_program() (jobs, _, returned_jobs_count) = self._populate_jobs_with_all_statuses( jobs=jobs, program_id=program_id) - rjobs = self.runtime.jobs(pending=False) + rjobs = self.service.jobs(pending=False) self.assertEqual(returned_jobs_count, len(rjobs)) def test_jobs_limit_returned(self): @@ -602,7 +610,7 @@ def test_jobs_limit_returned(self): program_id = self._upload_program() (jobs, *_) = self._populate_jobs_with_all_statuses(jobs=jobs, program_id=program_id) limit = 6 - rjobs = self.runtime.jobs(limit=limit, pending=False) + rjobs = self.service.jobs(limit=limit, pending=False) self.assertEqual(limit, len(rjobs)) def test_jobs_skip_returned(self): @@ -612,7 +620,7 @@ def test_jobs_skip_returned(self): (jobs, _, returned_jobs_count) = self._populate_jobs_with_all_statuses( jobs=jobs, program_id=program_id) skip = 4 - rjobs = self.runtime.jobs(skip=skip, pending=False) + rjobs = self.service.jobs(skip=skip, pending=False) self.assertEqual(returned_jobs_count - skip, len(rjobs)) def test_jobs_limit_skip_returned(self): @@ -622,7 +630,7 @@ def test_jobs_limit_skip_returned(self): (jobs, *_) = self._populate_jobs_with_all_statuses(jobs=jobs, program_id=program_id) limit = 6 skip = 2 - rjobs = self.runtime.jobs(limit=limit, skip=skip, pending=False) + rjobs = self.service.jobs(limit=limit, skip=skip, pending=False) self.assertEqual(limit, len(rjobs)) def test_jobs_filter_by_program_id(self): @@ -633,7 +641,7 @@ def test_jobs_filter_by_program_id(self): job_1 = self._run_program(program_id=program_id_1) job.wait_for_final_state() job_1.wait_for_final_state() - rjobs = self.runtime.jobs(program_id=program_id) + rjobs = self.service.jobs(program_id=program_id) self.assertEqual(program_id, rjobs[0].program_id) self.assertEqual(1, len(rjobs)) @@ -643,15 +651,15 @@ def test_jobs_filter_by_provider(self): job = self._run_program(program_id=program_id, hub="defaultHub", group="defaultGroup", project="defaultProject") job.wait_for_final_state() - rjobs = self.runtime.jobs(program_id=program_id, + rjobs = self.service.jobs(program_id=program_id, hub="defaultHub", group="defaultGroup", project="defaultProject") self.assertEqual(program_id, rjobs[0].program_id) self.assertEqual(1, len(rjobs)) - rjobs = self.runtime.jobs(program_id=program_id, + rjobs = self.service.jobs(program_id=program_id, hub="test", group="test", project="test") self.assertFalse(rjobs) with self.assertRaises(IBMInputValueError): - self.runtime.jobs(hub="defaultHub") + self.service.jobs(hub="defaultHub") def test_cancel_job(self): """Test canceling a job.""" @@ -659,7 +667,7 @@ def test_cancel_job(self): time.sleep(1) job.cancel() self.assertEqual(job.status(), JobStatus.CANCELLED) - rjob = self.runtime.job(job.job_id) + rjob = self.service.job(job.job_id) self.assertEqual(rjob.status(), JobStatus.CANCELLED) def test_final_result(self): @@ -726,9 +734,9 @@ def test_program_metadata(self): for metadata in sub_tests: with self.subTest(metadata_type=type(metadata)): - program_id = self.runtime.upload_program(data=self.DEFAULT_DATA, metadata=metadata) - program = self.runtime.program(program_id) - self.runtime.delete_program(program_id) + program_id = self.service.upload_program(data=self.DEFAULT_DATA, metadata=metadata) + program = self.service.program(program_id) + self.service.delete_program(program_id) self._validate_program(program) def test_different_providers(self): @@ -737,8 +745,8 @@ def test_different_providers(self): job = self._run_program(program_id) cred = Credentials(token="", url="", hub="hub2", group="group2", project="project2", services={"runtime": "https://quantum-computing.ibm.com"}) - self.runtime._default_hgp.credentials = cred - rjob = self.runtime.job(job.job_id) + self.service._default_hgp.credentials = cred + rjob = self.service.job(job.job_id) self.assertIsNotNone(rjob.backend) def _upload_program(self, name=None, max_execution_time=300, @@ -750,7 +758,7 @@ def _upload_program(self, name=None, max_execution_time=300, metadata.update(name=name) metadata.update(is_public=is_public) metadata.update(max_execution_time=max_execution_time) - program_id = self.runtime.upload_program( + program_id = self.service.upload_program( data=data, metadata=metadata) return program_id @@ -760,16 +768,16 @@ def _run_program(self, program_id=None, inputs=None, job_classes=None, final_sta """Run a program.""" options = {'backend_name': "some_backend"} if final_status is not None: - self.runtime._api_client.set_final_status(final_status) + self.service._api_client.set_final_status(final_status) elif job_classes: - self.runtime._api_client.set_job_classes(job_classes) + self.service._api_client.set_job_classes(job_classes) elif all([hub, group, project]): - self.runtime._api_client.set_hgp(hub, group, project) + self.service._api_client.set_hgp(hub, group, project) if program_id is None: program_id = self._upload_program() with patch('qiskit_ibm_runtime.ibm_runtime_service.RuntimeClient', - return_value=self.runtime._api_client): - job = self.runtime.run(program_id=program_id, options=options, + return_value=self.service._api_client): + job = self.service.run(program_id=program_id, options=options, inputs=inputs, result_decoder=decoder, image=image) return job diff --git a/test/ibm/test_account_client.py b/test/ibm/test_account_client.py index bce2e6353..2b68218fb 100644 --- a/test/ibm/test_account_client.py +++ b/test/ibm/test_account_client.py @@ -13,22 +13,15 @@ """Tests for the AccountClient class.""" import re -import traceback -from unittest import mock -from urllib3.connectionpool import HTTPConnectionPool -from urllib3.exceptions import MaxRetryError from qiskit.circuit import ClassicalRegister, QuantumCircuit, QuantumRegister -from qiskit.compiler import assemble, transpile -from qiskit_ibm_runtime.apiconstants import ApiJobStatus from qiskit_ibm_runtime.api.clients import AccountClient, AuthClient from qiskit_ibm_runtime.api.exceptions import ApiError, RequestsApiError -from qiskit_ibm_runtime.utils.utils import RefreshQueue from ..ibm_test_case import IBMTestCase from ..decorators import requires_qe_access, requires_provider from ..contextmanagers import custom_envs, no_envs -from ..http_server import SimpleServer, ServerErrorOnceHandler, ClientErrorHandler +from ..http_server import SimpleServer, ClientErrorHandler class TestAccountClient(IBMTestCase): @@ -44,7 +37,7 @@ def setUpClass(cls, service, hub, group, project): cls.hub = hub cls.group = group cls.project = project - default_hgp = cls.service.backend._default_hgp + default_hgp = cls.service._default_hgp cls.access_token = default_hgp._api_client.account_api.session._access_token def setUp(self): @@ -74,7 +67,7 @@ def tearDown(self) -> None: def _get_client(self): """Helper for instantiating an AccountClient.""" # pylint: disable=no-value-for-parameter - return AccountClient(self.service.backend._default_hgp.credentials) + return AccountClient(self.service._default_hgp.credentials) def test_custom_client_app_header(self): """Check custom client application header.""" @@ -90,47 +83,6 @@ def test_custom_client_app_header(self): self.assertNotIn(custom_header, client._session.headers['X-Qx-Client-Application']) - def test_access_token_not_in_exception_traceback(self): - """Check that access token is replaced within chained request exceptions.""" - backend_name = 'ibmq_qasm_simulator' - backend = self.service.get_backend(backend_name, hub=self.hub, - group=self.group, project=self.project) - circuit = transpile(self.qc1, backend, seed_transpiler=self.seed) - qobj = assemble(circuit, backend, shots=1) - client = backend._api_client - - exception_message = 'The access token in this exception ' \ - 'message should be replaced: {}'.format(self.access_token) - exception_traceback_str = '' - try: - with mock.patch.object( - HTTPConnectionPool, - 'urlopen', - side_effect=MaxRetryError( - HTTPConnectionPool('host'), 'url', reason=exception_message)): - _ = client.job_submit(backend.name(), qobj.to_dict()) - except RequestsApiError: - exception_traceback_str = traceback.format_exc() - - self.assertTrue(exception_traceback_str) - if self.access_token in exception_traceback_str: - self.fail('Access token not replaced in request exception traceback.') - - def test_job_submit_retry(self): - """Test job submit requests get retried.""" - client = self._get_client() - - # Send request to local server. - valid_data = {'id': 'fake_id', - 'objectStorageInfo': {'uploadUrl': SimpleServer.URL}, - 'job': {'id': 'fake_id'}} - self.fake_server = SimpleServer(handler_class=ServerErrorOnceHandler) - self.fake_server.set_good_response(valid_data) - self.fake_server.start() - client.account_api.session.base_url = SimpleServer.URL - - client.job_submit('ibmq_qasm_simulator', {}) - def test_client_error(self): """Test client error.""" client = self._get_client() @@ -152,71 +104,6 @@ def test_client_error(self): self.assertIn('Bad client input', str(err_cm.exception)) -class TestAccountClientJobs(IBMTestCase): - """Tests for AccountClient methods related to jobs. - - This TestCase submits a Job during class invocation, available at - ``cls.job``. Tests should inspect that job according to their needs. - """ - - @classmethod - @requires_provider - def setUpClass(cls, service, hub, group, project): - # pylint: disable=arguments-differ - super().setUpClass() - cls.service = service - cls.hub = hub - cls.group = group - cls.project = project - default_hgp = cls.service.backend._default_hgp - cls.access_token = default_hgp._api_client.account_api.session._access_token - - backend_name = 'ibmq_qasm_simulator' - backend = cls.service.get_backend(backend_name, hub=cls.hub, - group=cls.group, project=cls.project) - cls.client = backend._api_client - cls.job = cls.client.job_submit( - backend_name, cls._get_qobj(backend).to_dict()) - cls.job_id = cls.job['job_id'] - - @staticmethod - def _get_qobj(backend): - """Return a Qobj.""" - # Create a circuit. - qr = QuantumRegister(2) - cr = ClassicalRegister(2) - qc1 = QuantumCircuit(qr, cr, name='qc1') - seed = 73846087 - - # Assemble the Qobj. - qobj = assemble(transpile([qc1], backend=backend, - seed_transpiler=seed), - backend=backend, shots=1) - - return qobj - - def test_job_get(self): - """Test job_get.""" - response = self.client.job_get(self.job_id) - self.assertIn('status', response) - - def test_job_final_status_polling(self): - """Test getting a job's final status via polling.""" - status_queue = RefreshQueue(maxsize=1) - response = self.client._job_final_status_polling(self.job_id, status_queue=status_queue) - self.assertEqual(response.pop('status', None), ApiJobStatus.COMPLETED.value) - self.assertNotEqual(status_queue.qsize(), 0) - - def test_list_jobs_statuses_skip(self): - """Test listing job statuses with an offset.""" - jobs_raw = self.client.list_jobs_statuses(limit=1, skip=1, extra_filter={ - 'creationDate': {'lte': self.job['creation_date']}}) - - # Ensure our job is skipped - for job in jobs_raw: - self.assertNotEqual(job['job_id'], self.job_id) - - class TestAuthClient(IBMTestCase): """Tests for the AuthClient.""" diff --git a/test/ibm/test_ibm_backend.py b/test/ibm/test_ibm_backend.py index 7a336306f..3b9492fee 100644 --- a/test/ibm/test_ibm_backend.py +++ b/test/ibm/test_ibm_backend.py @@ -13,16 +13,13 @@ """IBMBackend Test.""" from datetime import timedelta, datetime -from unittest import SkipTest from unittest.mock import patch from qiskit import QuantumCircuit from qiskit.providers.models import QasmBackendConfiguration -from qiskit.test.reference_circuits import ReferenceCircuits from ..ibm_test_case import IBMTestCase from ..decorators import requires_device, requires_provider -from ..utils import get_pulse_schedule, cancel_job class TestIBMBackend(IBMTestCase): @@ -107,42 +104,6 @@ def test_backend_reservations(self): "Reservation {} found={}, used start datetime {}, end datetime {}".format( reserv, found, start_dt, end_dt)) - def test_backend_options(self): - """Test backend options.""" - service = self.backend.provider() - backends = service.backends(open_pulse=True, operational=True, hub=self.backend.hub, - group=self.backend.group, project=self.backend.project) - if not backends: - raise SkipTest('Skipping pulse test since no pulse backend found.') - - backend = backends[0] - backend.options.shots = 2048 - backend.set_options(qubit_lo_freq=[4.9e9, 5.0e9], - meas_lo_freq=[6.5e9, 6.6e9], - meas_level=2) - job = backend.run(get_pulse_schedule(backend), meas_level=1, foo='foo') - backend_options = service.backend.job(job.job_id()).backend_options() - self.assertEqual(backend_options['shots'], 2048) - # Qobj config freq is in GHz. - self.assertAlmostEqual(backend_options['qubit_lo_freq'], [4.9e9, 5.0e9]) - self.assertEqual(backend_options['meas_lo_freq'], [6.5e9, 6.6e9]) - self.assertEqual(backend_options['meas_level'], 1) - self.assertEqual(backend_options['foo'], 'foo') - cancel_job(job) - - def test_sim_backend_options(self): - """Test simulator backend options.""" - service = self.backend.provider() - backend = service.get_backend('ibmq_qasm_simulator', hub=self.backend.hub, - group=self.backend.group, project=self.backend.project) - backend.options.shots = 2048 - backend.set_options(memory=True) - job = backend.run(ReferenceCircuits.bell(), shots=1024, foo='foo') - backend_options = service.backend.job(job.job_id()).backend_options() - self.assertEqual(backend_options['shots'], 1024) - self.assertTrue(backend_options['memory']) - self.assertEqual(backend_options['foo'], 'foo') - def test_deprecate_id_instruction(self): """Test replacement of 'id' Instructions with 'Delay' instructions.""" @@ -192,7 +153,7 @@ def setUpClass(cls, service, hub, group, project): def test_my_reservations(self): """Test my_reservations method""" - reservations = self.service.backend.my_reservations() + reservations = self.service.my_reservations() for reserv in reservations: for attr in reserv.__dict__: self.assertIsNotNone( diff --git a/test/ibm/test_ibm_provider.py b/test/ibm/test_ibm_provider.py index db7ff09c7..5a07cea9a 100644 --- a/test/ibm/test_ibm_provider.py +++ b/test/ibm/test_ibm_provider.py @@ -17,8 +17,7 @@ from unittest import skipIf, mock from configparser import ConfigParser from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister -from qiskit.test import providers, slow_test -from qiskit.compiler import transpile +from qiskit.test import providers from qiskit.providers.exceptions import QiskitBackendNotFoundError from qiskit.providers.models.backendproperties import BackendProperties @@ -35,7 +34,7 @@ from qiskit_ibm_runtime.apiconstants import QISKIT_IBM_RUNTIME_API_URL from ..ibm_test_case import IBMTestCase -from ..decorators import requires_device, requires_qe_access, requires_provider +from ..decorators import requires_qe_access, requires_provider from ..contextmanagers import custom_qiskitrc, no_envs, CREDENTIAL_ENV_VARS from ..utils import get_hgp @@ -383,42 +382,9 @@ def test_remote_backend_properties(self): if backend.configuration().simulator: self.assertEqual(properties, None) - def test_headers_in_result_sims(self): - """Test that the qobj headers are passed onto the results for sims.""" - backend = self.service.get_backend('ibmq_qasm_simulator', hub=self.hub, group=self.group, - project=self.project) - - custom_header = {'x': 1, 'y': [1, 2, 3], 'z': {'a': 4}} - circuits = transpile(self.qc1, backend=backend) - - # TODO Use circuit metadata for individual header when terra PR-5270 is released. - # qobj.experiments[0].header.some_field = 'extra info' - - job = backend.run(circuits, header=custom_header) - result = job.result() - self.assertTrue(custom_header.items() <= job.header().items()) - self.assertTrue(custom_header.items() <= result.header.to_dict().items()) - # self.assertEqual(result.results[0].header.some_field, 'extra info') - - @slow_test - @requires_device - def test_headers_in_result_devices(self, backend): - """Test that the qobj headers are passed onto the results for devices.""" - custom_header = {'x': 1, 'y': [1, 2, 3], 'z': {'a': 4}} - - # TODO Use circuit metadata for individual header when terra PR-5270 is released. - # qobj.experiments[0].header.some_field = 'extra info' - - job = backend.run(transpile(self.qc1, backend=backend), header=custom_header) - job.wait_for_final_state(wait=300, callback=self.simple_job_callback) - result = job.result() - self.assertTrue(custom_header.items() <= job.header().items()) - self.assertTrue(custom_header.items() <= result.header.to_dict().items()) - # self.assertEqual(result.results[0].header.some_field, 'extra info') - def test_aliases(self): """Test that display names of devices map the regular names.""" - aliased_names = self.service.backend._aliased_backend_names() + aliased_names = self.service._aliased_backend_names() for display_name, backend_name in aliased_names.items(): with self.subTest(display_name=display_name, @@ -456,5 +422,5 @@ def test_provider_backends(self): """Test provider_backends have correct attributes.""" provider_backends = {back for back in dir(self.service.backend) if isinstance(getattr(self.service.backend, back), IBMBackend)} - backends = {back.name().lower() for back in self.service.backend._backends.values()} + backends = {back.name().lower() for back in self.service._backends.values()} self.assertEqual(provider_backends, backends) diff --git a/test/ibm/test_ibm_qasm_simulator.py b/test/ibm/test_ibm_qasm_simulator.py deleted file mode 100644 index f483374ea..000000000 --- a/test/ibm/test_ibm_qasm_simulator.py +++ /dev/null @@ -1,172 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Test IBM Quantum online QASM simulator.""" - -from unittest import mock - -from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister -from qiskit.compiler import transpile -from qiskit.test.reference_circuits import ReferenceCircuits -from qiskit.providers.aer.noise import NoiseModel - -from ..ibm_test_case import IBMTestCase -from ..decorators import requires_provider, requires_device - - -class TestIBMQasmSimulator(IBMTestCase): - """Test IBM Quantum QASM Simulator.""" - - @requires_provider - def setUp(self, service, hub, group, project): - """Initial test setup.""" - # pylint: disable=arguments-differ - super().setUp() - self.service = service - self.hub = hub - self.group = group - self.project = project - self.sim_backend = self.service.get_backend('ibmq_qasm_simulator', hub=self.hub, - group=self.group, project=self.project) - - def test_execute_one_circuit_simulator_online(self): - """Test execute_one_circuit_simulator_online.""" - qr = QuantumRegister(1) - cr = ClassicalRegister(1) - qc = QuantumCircuit(qr, cr, name='qc') - qc.h(qr[0]) - qc.measure(qr[0], cr[0]) - circs = transpile(qc, backend=self.sim_backend, seed_transpiler=73846087) - shots = 1024 - job = self.sim_backend.run(circs, shots=shots) - result = job.result() - counts = result.get_counts(qc) - target = {'0': shots / 2, '1': shots / 2} - threshold = 0.1 * shots - self.assertDictAlmostEqual(counts, target, threshold) - - def test_execute_several_circuits_simulator_online(self): - """Test execute_several_circuits_simulator_online.""" - qr = QuantumRegister(2) - cr = ClassicalRegister(2) - qcr1 = QuantumCircuit(qr, cr, name='qc1') - qcr2 = QuantumCircuit(qr, cr, name='qc2') - qcr1.h(qr) - qcr2.h(qr[0]) - qcr2.cx(qr[0], qr[1]) - qcr1.measure(qr[0], cr[0]) - qcr1.measure(qr[1], cr[1]) - qcr2.measure(qr[0], cr[0]) - qcr2.measure(qr[1], cr[1]) - shots = 1024 - circs = transpile([qcr1, qcr2], backend=self.sim_backend, seed_transpiler=73846087) - job = self.sim_backend.run(circs, shots=shots) - result = job.result() - counts1 = result.get_counts(qcr1) - counts2 = result.get_counts(qcr2) - target1 = {'00': shots / 4, '01': shots / 4, - '10': shots / 4, '11': shots / 4} - target2 = {'00': shots / 2, '11': shots / 2} - threshold = 0.1 * shots - self.assertDictAlmostEqual(counts1, target1, threshold) - self.assertDictAlmostEqual(counts2, target2, threshold) - - def test_online_qasm_simulator_two_registers(self): - """Test online_qasm_simulator_two_registers.""" - qr1 = QuantumRegister(2) - cr1 = ClassicalRegister(2) - qr2 = QuantumRegister(2) - cr2 = ClassicalRegister(2) - qcr1 = QuantumCircuit(qr1, qr2, cr1, cr2, name="circuit1") - qcr2 = QuantumCircuit(qr1, qr2, cr1, cr2, name="circuit2") - qcr1.x(qr1[0]) - qcr2.x(qr2[1]) - qcr1.measure(qr1[0], cr1[0]) - qcr1.measure(qr1[1], cr1[1]) - qcr1.measure(qr2[0], cr2[0]) - qcr1.measure(qr2[1], cr2[1]) - qcr2.measure(qr1[0], cr1[0]) - qcr2.measure(qr1[1], cr1[1]) - qcr2.measure(qr2[0], cr2[0]) - qcr2.measure(qr2[1], cr2[1]) - circs = transpile([qcr1, qcr2], self.sim_backend, seed_transpiler=8458) - job = self.sim_backend.run(circs, shots=1024) - result = job.result() - result1 = result.get_counts(qcr1) - result2 = result.get_counts(qcr2) - self.assertEqual(result1, {'00 01': 1024}) - self.assertEqual(result2, {'10 00': 1024}) - - def test_conditional_operation(self): - """Test conditional operation.""" - qr = QuantumRegister(4) - cr = ClassicalRegister(4) - circuit = QuantumCircuit(qr, cr) - circuit.x(qr[0]) - circuit.x(qr[2]) - circuit.measure(qr[0], cr[0]) - circuit.x(qr[0]).c_if(cr, 1) - - result = self.sim_backend.run(transpile(circuit, backend=self.sim_backend)).result() - self.assertEqual(result.get_counts(circuit), {'0001': 4000}) - - def test_new_sim_method(self): - """Test new simulator methods.""" - def _new_submit(qobj, *args, **kwargs): - # pylint: disable=unused-argument - self.assertEqual(qobj.config.method, 'extended_stabilizer', - f"qobj header={qobj.header}") - return mock.MagicMock() - - backend = self.sim_backend - - sim_method = backend._configuration._data.get('simulation_method', None) - submit_fn = backend._submit_job - - try: - backend._configuration._data['simulation_method'] = 'extended_stabilizer' - backend._submit_job = _new_submit - circ = transpile(ReferenceCircuits.bell(), backend=backend) - backend.run(circ, header={'test': 'circuits'}) - finally: - backend._configuration._data['simulation_method'] = sim_method - backend._submit_job = submit_fn - - def test_new_sim_method_no_overwrite(self): - """Test custom method option is not overwritten.""" - def _new_submit(qobj, *args, **kwargs): - # pylint: disable=unused-argument - self.assertEqual(qobj.config.method, 'my_method', f"qobj header={qobj.header}") - return mock.MagicMock() - - backend = self.sim_backend - - sim_method = backend._configuration._data.get('simulation_method', None) - submit_fn = backend._submit_job - - try: - backend._configuration._data['simulation_method'] = 'extended_stabilizer' - backend._submit_job = _new_submit - circ = transpile(ReferenceCircuits.bell(), backend=backend) - backend.run(circ, method='my_method', header={'test': 'circuits'}) - finally: - backend._configuration._data['simulation_method'] = sim_method - backend._submit_job = submit_fn - - @requires_device - def test_simulator_with_noise_model(self, backend): - """Test using simulator with a noise model.""" - noise_model = NoiseModel.from_backend(backend) - result = self.sim_backend.run( - transpile(ReferenceCircuits.bell(), backend=self.sim_backend), - noise_model=noise_model).result() - self.assertTrue(result) diff --git a/test/ibm/test_serialization.py b/test/ibm/test_serialization.py index 12f524fe7..d31f396c6 100644 --- a/test/ibm/test_serialization.py +++ b/test/ibm/test_serialization.py @@ -12,21 +12,16 @@ """Test serializing and deserializing data sent to the server.""" -from unittest import SkipTest, skipIf +from unittest import skipIf from typing import Any, Dict, Optional import dateutil.parser -from qiskit.test.reference_circuits import ReferenceCircuits -from qiskit.test import slow_test -from qiskit import transpile, schedule, QuantumCircuit from qiskit.circuit import Parameter from qiskit.version import VERSION as terra_version -from qiskit_ibm_runtime import least_busy from qiskit_ibm_runtime.utils.json_encoder import IBMJsonEncoder from ..decorators import requires_provider -from ..utils import cancel_job from ..ibm_test_case import IBMTestCase @@ -45,38 +40,6 @@ def setUpClass(cls, service, hub, group, project): cls.project = project cls.sim_backend = service.get_backend('ibmq_qasm_simulator', hub=cls.hub, group=cls.group, project=cls.project) - cls.bell = transpile(ReferenceCircuits.bell(), backend=cls.sim_backend) - - def test_qasm_qobj(self): - """Test serializing qasm qobj data.""" - job = self.sim_backend.run(self.bell) - rqobj = self.service.backend.job(job.job_id())._get_qobj() - - self.assertEqual(_array_to_list(job._get_qobj().to_dict()), rqobj.to_dict()) - - def test_pulse_qobj(self): - """Test serializing pulse qobj data.""" - backends = self.service.backends(operational=True, open_pulse=True, hub=self.hub, - group=self.group, project=self.project) - if not backends: - self.skipTest('Need pulse backends.') - - backend = backends[0] - config = backend.configuration() - defaults = backend.defaults() - inst_map = defaults.instruction_schedule_map - - x = inst_map.get('x', 0) - measure = inst_map.get('measure', range(config.n_qubits)) << x.duration - schedules = x | measure - - job = backend.run(schedules, meas_level=1, shots=256) - rqobj = self.service.backend.job(job.job_id())._get_qobj() - # Convert numpy arrays to lists since they now get converted right - # before being sent to the server. - self.assertEqual(_array_to_list(job._get_qobj().to_dict()), rqobj.to_dict()) - - cancel_job(job) def test_backend_configuration(self): """Test deserializing backend configuration.""" @@ -121,35 +84,6 @@ def test_backend_properties(self): properties = backend.properties() self._verify_data(properties.to_dict(), good_keys) - def test_qasm_job_result(self): - """Test deserializing a QASM job result.""" - result = self.sim_backend.run(self.bell).result() - - # Known keys that look like a serialized complex number. - good_keys = ('results.metadata.input_qubit_map', 'results.metadata.active_input_qubits') - - self._verify_data(result.to_dict(), good_keys=good_keys) - - @slow_test - def test_pulse_job_result(self): - """Test deserializing a pulse job result.""" - backends = self.service.backends(open_pulse=True, operational=True, hub=self.hub, - group=self.group, project=self.project) - if not backends: - raise SkipTest('Skipping pulse test since no pulse backend found.') - - backend = least_busy(backends) - qc = QuantumCircuit(1, 1) - qc.x(0) - qc.measure([0], [0]) - sched = schedule(transpile(qc, backend=backend), backend=backend) - job = backend.run(sched) - result = job.result() - - # Known keys that look like a serialized object. - good_keys = ('header.backend_version', 'backend_version') - self._verify_data(result.to_dict(), good_keys) - def _verify_data( self, data: Dict, diff --git a/test/ibm_test_case.py b/test/ibm_test_case.py index d87149010..fd311a6c7 100644 --- a/test/ibm_test_case.py +++ b/test/ibm_test_case.py @@ -15,21 +15,16 @@ import os import logging import inspect -import time -from functools import partialmethod from qiskit.test.base import BaseQiskitTestCase from qiskit_ibm_runtime import QISKIT_IBM_RUNTIME_LOGGER_NAME -from qiskit_ibm_runtime.api.clients.account import AccountClient -from qiskit_ibm_runtime.apiconstants import ApiJobStatus, API_JOB_FINAL_STATES -from qiskit_ibm_runtime.job.exceptions import IBMJobNotFoundError from .utils import setup_test_logging class IBMTestCase(BaseQiskitTestCase): - """Custom TestCase for use with the IBM Provider.""" + """Custom TestCase for use with the Qiskit IBM Runtime.""" @classmethod def setUpClass(cls): @@ -39,13 +34,6 @@ def setUpClass(cls): setup_test_logging(cls.log, filename) cls._set_logging_level(logging.getLogger(QISKIT_IBM_RUNTIME_LOGGER_NAME)) - @classmethod - def simple_job_callback(cls, job_id, job_status, job, **kwargs): - """A callback function that logs current job status.""" - # pylint: disable=unused-argument - queue_info = kwargs.get('queue_info', 'unknown') - cls.log.info("Job %s status is %s, queue_info is %s", job_id, job_status, queue_info) - @classmethod def _set_logging_level(cls, logger: logging.Logger) -> None: """Set logging level for the input logger. @@ -63,45 +51,3 @@ def _set_logging_level(cls, logger: logging.Logger) -> None: if not any(isinstance(handler, logging.StreamHandler) for handler in logger.handlers): logger.addHandler(logging.StreamHandler()) logger.propagate = False - - def setUp(self) -> None: - """Test level setup.""" - super().setUp() - # Record submitted jobs. - self._jobs = [] - self._saved_submit = AccountClient.job_submit - AccountClient.job_submit = partialmethod(self._recorded_submit) - - def tearDown(self) -> None: - """Test level tear down.""" - super().tearDown() - failed = False - # It's surprisingly difficult to find out whether the test failed. - # Using a private attribute is not ideal but it'll have to do. - for _, exc_info in self._outcome.errors: - if exc_info is not None: - failed = True - - if not failed: - for client, job_id in self._jobs: - try: - job_status = client.job_get(job_id)['status'] - if ApiJobStatus(job_status) not in API_JOB_FINAL_STATES: - client.job_cancel(job_id) - time.sleep(1) - retry = 3 - while retry > 0: - try: - client.job_delete(job_id) - time.sleep(1) - retry -= 1 - except IBMJobNotFoundError: - retry = 0 - except Exception: # pylint: disable=broad-except - pass - - def _recorded_submit(self, client, *args, **kwargs): - """Record submitted jobs.""" - submit_info = self._saved_submit(client, *args, **kwargs) - self._jobs.append((client, submit_info['job_id'])) - return submit_info