Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove backend.run() #1962

Merged
merged 12 commits into from
Jan 7, 2025
349 changes: 16 additions & 333 deletions qiskit_ibm_runtime/ibm_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
"""Module for interfacing with an IBM Quantum Backend."""

import logging
from typing import Iterable, Union, Optional, Any, List, Dict
from typing import Iterable, Union, Optional, Any, List
from datetime import datetime as python_datetime
from copy import deepcopy
from dataclasses import asdict
import warnings

from qiskit import QuantumCircuit
from qiskit.qobj.utils import MeasLevel, MeasReturnType
Expand All @@ -41,30 +39,22 @@
PulseBackendConfiguration,
)

# temporary until we unite the 2 Session classes
from .provider_session import (
Session as ProviderSession,
)

from . import qiskit_runtime_service # pylint: disable=unused-import,cyclic-import
from .runtime_job import RuntimeJob

from .api.clients import RuntimeClient
from .exceptions import IBMBackendApiProtocolError, IBMBackendValueError, IBMBackendApiError
from .exceptions import (
IBMBackendApiProtocolError,
IBMBackendValueError,
IBMBackendError,
)
from .utils.backend_converter import convert_to_target
from .utils.default_session import get_cm_session as get_cm_primitive_session

from .utils.backend_decoder import (
defaults_from_server_data,
properties_from_server_data,
configuration_from_server_data,
)
from .utils.deprecation import issue_deprecation_msg
from .utils.options import QASM2Options, QASM3Options
from .api.exceptions import RequestsApiError
from .utils import local_to_utc, are_circuits_dynamic, validate_job_tags

from .utils.pubsub import Publisher

from .utils import local_to_utc

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -194,7 +184,6 @@ def __init__(
self._defaults: Any = None
self._target: Any = None
self._max_circuits = configuration.max_experiments
self._session: ProviderSession = None
if (
not self._configuration.simulator
and hasattr(self.options, "noise_model")
Expand Down Expand Up @@ -591,323 +580,17 @@ def __deepcopy__(self, _memo: dict = None) -> "IBMBackend":
cpy._options = deepcopy(self._options, _memo)
return cpy

def run(
self,
circuits: Union[QuantumCircuit, str, List[Union[QuantumCircuit, str]]],
dynamic: bool = None,
job_tags: Optional[List[str]] = None,
init_circuit: Optional[QuantumCircuit] = None,
init_num_resets: Optional[int] = None,
header: Optional[Dict] = None,
shots: Optional[Union[int, float]] = None,
memory: Optional[bool] = None,
meas_level: Optional[Union[int, MeasLevel]] = None,
meas_return: Optional[Union[str, MeasReturnType]] = None,
rep_delay: Optional[float] = None,
init_qubits: Optional[bool] = None,
use_measure_esp: Optional[bool] = None,
noise_model: Optional[Any] = None,
seed_simulator: Optional[int] = None,
**run_config: Dict,
) -> RuntimeJob:
"""Run on the backend.
If a keyword specified here is also present in the ``options`` attribute/object,
the value specified here will be used for this run.

Args:
circuits: An individual or a
list of :class:`~qiskit.circuits.QuantumCircuit`.
dynamic: Whether the circuit is dynamic (uses in-circuit conditionals)
job_tags: Tags to be assigned to the job. The tags can subsequently be used
as a filter in the :meth:`jobs()` function call.
init_circuit: A quantum circuit to execute for initializing qubits before each circuit.
If specified, ``init_num_resets`` is ignored. Applicable only if ``dynamic=True``
is specified.
init_num_resets: The number of qubit resets to insert before each circuit execution.
header: User input that will be attached to the job and will be
copied to the corresponding result header. Headers do not affect the run.
This replaces the old ``Qobj`` header. This parameter is applicable only
if ``dynamic=False`` is specified or defaulted to.
shots: Number of repetitions of each circuit, for sampling. Default: 4000
or ``max_shots`` from the backend configuration, whichever is smaller.
This parameter is applicable only if ``dynamic=False`` is specified or defaulted to.
memory: If ``True``, per-shot measurement bitstrings are returned as well
(provided the backend supports it). For OpenPulse jobs, only
measurement level 2 supports this option. This parameter is applicable only if
``dynamic=False`` is specified or defaulted to.
meas_level: Level of the measurement output for pulse experiments. See
`OpenPulse specification <https://arxiv.org/pdf/1809.03452.pdf>`_ for details:

* ``0``, measurements of the raw signal (the measurement output pulse envelope)
* ``1``, measurement kernel is selected (a complex number obtained after applying the
measurement kernel to the measurement output signal)
* ``2`` (default), a discriminator is selected and the qubit state is stored (0 or 1)

This parameter is applicable only if ``dynamic=False`` is specified or defaulted to.
meas_return: Level of measurement data for the backend to return. For ``meas_level`` 0 and 1:

* ``single`` returns information from every shot.
* ``avg`` returns average measurement output (averaged over number of shots).

This parameter is applicable only if ``dynamic=False`` is specified or defaulted to.
rep_delay: Delay between primitives in seconds. Only supported on certain
backends (if ``backend.configuration().dynamic_reprate_enabled=True``).
If supported, ``rep_delay`` must be from the range supplied
by the backend (``backend.configuration().rep_delay_range``). Default is given by
``backend.configuration().default_rep_delay``. This parameter is applicable only if
``dynamic=False`` is specified or defaulted to.
init_qubits: Whether to reset the qubits to the ground state for each shot.
Default: ``True``. This parameter is applicable only if ``dynamic=False`` is specified
or defaulted to.
use_measure_esp: Whether to use excited state promoted (ESP) readout for measurements
which are the terminal instruction to a qubit. ESP readout can offer higher fidelity
than standard measurement sequences. See
`here <https://arxiv.org/pdf/2008.08571.pdf>`_.
Default: ``True`` if backend supports ESP readout, else ``False``. Backend support
for ESP readout is determined by the flag ``measure_esp_enabled`` in
``backend.configuration()``. This parameter is applicable only if ``dynamic=False`` is
specified or defaulted to.
noise_model: Noise model (Simulators only). This parameter is applicable
only if ``dynamic=False`` is specified or defaulted to.
seed_simulator: Random seed to control sampling (Simulators only). This parameter
is applicable only if ``dynamic=False`` is specified or defaulted to.
**run_config: Extra arguments used to configure the run. This parameter is applicable
only if ``dynamic=False`` is specified or defaulted to.

Returns:
The job to be executed.

Raises:
IBMBackendApiError: If an unexpected error occurred while submitting
the job.
IBMBackendApiProtocolError: If an unexpected value received from
the server.
IBMBackendValueError:
- If an input parameter value is not valid.
- If ESP readout is used and the backend does not support this.
def run(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
"""
# pylint: disable=arguments-differ
issue_deprecation_msg(
msg="backend.run() and related sessions methods are deprecated ",
version="0.23",
remedy="More details can be found in the primitives migration "
"guide https://docs.quantum.ibm.com/migration-guides/qiskit-runtime.",
period="6 months",
)
validate_job_tags(job_tags)
if not isinstance(circuits, List):
circuits = [circuits]
self._check_circuits_attributes(circuits)

if use_measure_esp and getattr(self.configuration(), "measure_esp_enabled", False) is False:
raise IBMBackendValueError(
"ESP readout not supported on this device. Please make sure the flag "
"'use_measure_esp' is unset or set to 'False'."
)
actually_dynamic = are_circuits_dynamic(circuits)
if dynamic is False and actually_dynamic:
warnings.warn(
"Parameter 'dynamic' is False, but the circuit contains dynamic constructs."
)
dynamic = dynamic or actually_dynamic

if dynamic and "qasm3" not in getattr(self.configuration(), "supported_features", []):
warnings.warn(f"The backend {self.name} does not support dynamic circuits.")

status = self.status()
if status.operational is True and status.status_msg != "active":
warnings.warn(f"The backend {self.name} is currently paused.")

program_id = str(run_config.get("program_id", ""))
if program_id:
run_config.pop("program_id", None)
else:
program_id = QASM3RUNNERPROGRAMID if dynamic else QOBJRUNNERPROGRAMID

image: Optional[str] = run_config.get("image", None) # type: ignore
if image is not None:
image = str(image)

if isinstance(init_circuit, bool):
raise IBMBackendApiError(
"init_circuit does not accept boolean values. "
"A quantum circuit should be passed in instead."
)

if isinstance(shots, float):
shots = int(shots)

run_config_dict = self._get_run_config(
program_id=program_id,
init_circuit=init_circuit,
init_num_resets=init_num_resets,
header=header,
shots=shots,
memory=memory,
meas_level=meas_level,
meas_return=meas_return,
rep_delay=rep_delay,
init_qubits=init_qubits,
use_measure_esp=use_measure_esp,
noise_model=noise_model,
seed_simulator=seed_simulator,
**run_config,
)

run_config_dict["circuits"] = circuits

return self._runtime_run(
program_id=program_id,
inputs=run_config_dict,
backend_name=self.name,
job_tags=job_tags,
image=image,
)

def _runtime_run(
self,
program_id: str,
inputs: Dict,
backend_name: str,
job_tags: Optional[List[str]] = None,
image: Optional[str] = None,
) -> RuntimeJob:
"""Runs the runtime program and returns the corresponding job object"""
hgp_name = None
if self._service._channel == "ibm_quantum":
hgp_name = self._instance or self._service._get_hgp().name

# Check if initialized within a Primitive session. If so, issue a warning.
if get_cm_primitive_session():
warnings.warn(
"A Primitive session is open but Backend.run() jobs will not be run within this session"
)
session_id = None
if self._session:
if not self._session.active:
raise RuntimeError(f"The session {self._session.session_id} is closed.")
session_id = self._session.session_id

log_level = getattr(self.options, "log_level", None) # temporary
try:
response = self._api_client.program_run(
program_id=program_id,
backend_name=backend_name,
params=inputs,
hgp=hgp_name,
log_level=log_level,
job_tags=job_tags,
session_id=session_id,
start_session=False,
image=image,
)
except RequestsApiError as ex:
raise IBMBackendApiError("Error submitting job: {}".format(str(ex))) from ex
try:
job = RuntimeJob(
backend=self,
api_client=self._api_client,
client_params=self._service._client_params,
job_id=response["id"],
program_id=program_id,
session_id=session_id,
service=self.service,
tags=job_tags,
)
logger.debug("Job %s was successfully submitted.", job.job_id())
except TypeError as err:
logger.debug("Invalid job data received: %s", response)
raise IBMBackendApiProtocolError(
"Unexpected return value received from the server "
"when submitting job: {}".format(str(err))
) from err
Publisher().publish("ibm.job.start", job)
return job

def _get_run_config(self, program_id: str, **kwargs: Any) -> Dict:
"""Return the consolidated runtime configuration."""
# Check if is a QASM3 like program id.
if program_id.startswith(QASM3RUNNERPROGRAMID):
fields = asdict(QASM3Options()).keys()
run_config_dict = QASM3Options().to_transport_dict()
else:
fields = asdict(QASM2Options()).keys()
run_config_dict = QASM2Options().to_transport_dict()
backend_options = self._options.__dict__
for key, val in kwargs.items():
if val is not None:
run_config_dict[key] = val
if key not in fields and not self.configuration().simulator:
warnings.warn( # type: ignore[unreachable]
f"{key} is not a recognized runtime option and may be ignored by the backend.",
stacklevel=4,
)
elif backend_options.get(key) is not None and key in fields:
run_config_dict[key] = backend_options[key]
return run_config_dict

def open_session(self, max_time: Optional[Union[int, str]] = None) -> ProviderSession:
"""Open session"""
issue_deprecation_msg(
msg="backend.run() and related sessions methods are deprecated ",
version="0.23",
remedy="More details can be found in the primitives migration guide "
"https://docs.quantum.ibm.com/migration-guides/qiskit-runtime.",
period="6 months",
)
if not self._configuration.simulator:
new_session = self._service._api_client.create_session(
self.name, self._instance, max_time, self._service.channel
)
self._session = ProviderSession(max_time=max_time, session_id=new_session.get("id"))
else:
self._session = ProviderSession()
return self._session
Raises:
IBMBackendError: The run() method is no longer supported.

@property
def session(self) -> ProviderSession:
"""Return session"""
issue_deprecation_msg(
msg="backend.run() and related sessions methods are deprecated ",
version="0.23",
remedy="More details can be found in the primitives migration "
"guide https://docs.quantum.ibm.com/migration-guides/qiskit-runtime.",
period="6 months",
)
return self._session

def cancel_session(self) -> None:
"""Cancel session. All pending jobs will be cancelled."""
issue_deprecation_msg(
msg="backend.run() and related sessions methods are deprecated ",
version="0.23",
remedy="More details can be found in the primitives migration "
"guide https://docs.quantum.ibm.com/migration-guides/qiskit-runtime.",
period="6 months",
)
if self._session:
self._session.cancel()
if self._session.session_id:
self._api_client.close_session(self._session.session_id)

self._session = None

def close_session(self) -> None:
"""Close the session so new jobs will no longer be accepted, but existing
queued or running jobs will run to completion. The session will be terminated once there
are no more pending jobs."""
issue_deprecation_msg(
msg="backend.run() and related sessions methods are deprecated ",
version="0.23",
remedy="More details can be found in the primitives migration "
"guide https://docs.quantum.ibm.com/migration-guides/qiskit-runtime.",
period="6 months",
"""
raise IBMBackendError(
"Support for backend.run() has been removed. Please see our migration guide "
"https://docs.quantum.ibm.com/migration-guides/qiskit-runtime for instructions "
"on how to migrate to the primitives interface."
)
if self._session:
self._session.cancel()
if self._session.session_id:
self._api_client.close_session(self._session.session_id)
self._session = None

def get_translation_stage_plugin(self) -> str:
"""Return the default translation stage plugin name for IBM backends."""
Expand Down
Loading
Loading