diff --git a/qiskit_ibm_runtime/accounts/__init__.py b/qiskit_ibm_runtime/accounts/__init__.py new file mode 100644 index 000000000..ea424ef45 --- /dev/null +++ b/qiskit_ibm_runtime/accounts/__init__.py @@ -0,0 +1,18 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +Account management functionality related to the IBM Runtime Services. +""" + +from .account import Account, AccountType +from .management import AccountManager diff --git a/qiskit_ibm_runtime/accounts/account.py b/qiskit_ibm_runtime/accounts/account.py new file mode 100644 index 000000000..021b0702b --- /dev/null +++ b/qiskit_ibm_runtime/accounts/account.py @@ -0,0 +1,104 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Account related classes and functions.""" + + +from typing import Optional +from urllib.parse import urlparse + +from typing_extensions import Literal + +AccountType = Optional[Literal["cloud", "legacy"]] + +LEGACY_API_URL = "https://auth.quantum-computing.ibm.com/api" +CLOUD_API_URL = "https://cloud.ibm.com" + + +def _assert_valid_auth(auth: AccountType) -> None: + """Assert that the auth parameter is valid.""" + if not (auth in ["cloud", "legacy"]): + raise ValueError( + f"Inappropriate `auth` value. Expected one of ['cloud', 'legacy'], got '{auth}'." + ) + + +def _assert_valid_token(token: str) -> None: + """Assert that the token is valid.""" + if not (isinstance(token, str) and len(token) > 0): + raise ValueError( + f"Inappropriate `token` value. Expected a non-empty string, got '{token}'." + ) + + +def _assert_valid_url(url: str) -> None: + """Assert that the URL is valid.""" + try: + urlparse(url) + except: + raise ValueError(f"Inappropriate `url` value. Failed to parse '{url}' as URL.") + + +def _assert_valid_instance(auth: AccountType, instance: str) -> None: + """Assert that the instance name is valid for the given account type.""" + if auth == "cloud": + if not (isinstance(instance, str) and len(instance) > 0): + raise ValueError( + f"Inappropriate `instance` value. Expected a non-empty string." + ) + # TODO: add validation for legacy instance when adding test coverage + + +class Account: + """Class that represents an account.""" + + def __init__( + self, + auth: AccountType, + token: str, + url: Optional[str], + instance: Optional[str] = None, + # TODO: add validation for proxies input format + proxies: Optional[dict] = None, + verify: Optional[bool] = True, + ): + """Account constructor.""" + _assert_valid_auth(auth) + self.auth = auth + + _assert_valid_token(token) + self.token = token + + resolved_url = url or LEGACY_API_URL if auth == "legacy" else CLOUD_API_URL + _assert_valid_url(resolved_url) + self.url = resolved_url + + _assert_valid_instance(auth, instance) + self.instance = instance + self.proxies = proxies + self.verify = verify + + def to_saved_format(self) -> dict: + """Returns a dictionary that represents how the account is saved on disk.""" + return {k: v for k, v in self.__dict__.items() if v is not None} + + @classmethod + def from_saved_format(cls, data: dict) -> "Account": + """Creates an account instance from data saved on disk.""" + return cls( + auth=data.get("auth"), + url=data.get("url"), + token=data.get("token"), + instance=data.get("instance"), + proxies=data.get("proxies"), + verify=data.get("verify"), + ) diff --git a/qiskit_ibm_runtime/accounts/exceptions.py b/qiskit_ibm_runtime/accounts/exceptions.py new file mode 100644 index 000000000..bd20386f9 --- /dev/null +++ b/qiskit_ibm_runtime/accounts/exceptions.py @@ -0,0 +1,19 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Exceptions for the ``Accounts`` module.""" + +from ..exceptions import IBMError + + +class AccountsError(IBMError): + """Base class for errors raised during account management.""" diff --git a/qiskit_ibm_runtime/accounts/management.py b/qiskit_ibm_runtime/accounts/management.py new file mode 100644 index 000000000..ca7cdd63c --- /dev/null +++ b/qiskit_ibm_runtime/accounts/management.py @@ -0,0 +1,71 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Account management related classes and functions.""" + +import os +from typing import Optional, Union + +from .account import Account, AccountType +from .storage import save_config, read_config, delete_config + +_DEFAULT_ACCOUNG_CONFIG_JSON_FILE = os.path.join( + os.path.expanduser("~"), ".qiskit", "qiskit-ibm.json" +) +_DEFAULT_ACCOUNT_NAME = "default" + + +class AccountManager: + """Class that bundles account management related functionality.""" + + @staticmethod + def save( + token: Optional[str] = None, + url: Optional[str] = None, + instance: Optional[str] = None, + auth: Optional[AccountType] = None, + name: Optional[str] = _DEFAULT_ACCOUNT_NAME, + proxies: Optional[dict] = None, + verify: Optional[bool] = None, + ) -> None: + """Save account on disk.""" + + return save_config( + filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE, + name=name, + config=Account( + token=token, + url=url, + instance=instance, + auth=auth, + proxies=proxies, + verify=verify, + ).to_saved_format(), + ) + + @staticmethod + def list() -> Union[dict, None]: + """List all accounts saved on disk.""" + + return read_config(filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE) + + @staticmethod + def get(name: Optional[str] = _DEFAULT_ACCOUNT_NAME) -> Account: + """Read account from disk.""" + return Account.from_saved_format( + read_config(filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE, name=name) + ) + + @staticmethod + def delete(name: Optional[str] = _DEFAULT_ACCOUNT_NAME) -> bool: + """Delete account from disk.""" + return delete_config(name=name, filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE) diff --git a/qiskit_ibm_runtime/accounts/storage.py b/qiskit_ibm_runtime/accounts/storage.py new file mode 100644 index 000000000..aa827b723 --- /dev/null +++ b/qiskit_ibm_runtime/accounts/storage.py @@ -0,0 +1,82 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Utility functions related to storing account configuration on disk.""" + +import json +import os +from typing import Optional, Union + + +def save_config( + filename: str, + name: str, + config: dict, +) -> None: + """Save configuration data in a JSON file under the given name.""" + + _ensure_file_exists(filename) + + with open(filename, mode="r") as json_in: + data = json.load(json_in) + + with open(filename, mode="w") as json_out: + data[name] = config + json.dump(data, json_out, sort_keys=True, indent=4) + + +def read_config( + filename: str, + name: Optional[str] = None, +) -> Union[dict, None]: + """Save configuration data from a JSON file.""" + + _ensure_file_exists(filename) + + with open(filename) as json_file: + data = json.load(json_file) + if name is None: + return data + if name in data: + return data[name] + + return None + + +def delete_config( + filename: str, + name: str, +) -> bool: + """Delete configuration data from a JSON file.""" + + _ensure_file_exists(filename) + with open(filename, mode="r") as json_in: + data = json.load(json_in) + + if name in data: + with open(filename, mode="w") as json_out: + del data[name] + json.dump(data, json_out, sort_keys=True, indent=4) + return True + + return False + + +def _ensure_file_exists(filename: str, initial_content: str = "{}") -> None: + if not os.path.isfile(filename): + + # create parent directories + os.makedirs(os.path.dirname(filename), exist_ok=True) + + # initialize file + with open(filename, mode="w") as json_file: + json_file.write(initial_content) diff --git a/qiskit_ibm_runtime/credentials/__init__.py b/qiskit_ibm_runtime/credentials/__init__.py index bf742b746..10b22854e 100644 --- a/qiskit_ibm_runtime/credentials/__init__.py +++ b/qiskit_ibm_runtime/credentials/__init__.py @@ -32,47 +32,31 @@ .. autosummary:: :toctree: ../stubs/ - CredentialsError - InvalidCredentialsFormatError - CredentialsNotFoundError + """ -from collections import OrderedDict -from typing import Dict, Optional, Tuple, Any import logging +from collections import OrderedDict +from typing import Dict, Tuple, Any from .credentials import Credentials -from .hub_group_project_id import HubGroupProjectID +from .environ import read_credentials_from_environ from .exceptions import ( CredentialsError, - InvalidCredentialsFormatError, - CredentialsNotFoundError, HubGroupProjectIDInvalidStateError, ) -from .configrc import ( - read_credentials_from_qiskitrc, - store_credentials, - store_preferences, -) -from .environ import read_credentials_from_environ +from .hub_group_project_id import HubGroupProjectID logger = logging.getLogger(__name__) -def discover_credentials( - qiskitrc_filename: Optional[str] = None, -) -> Tuple[Dict[HubGroupProjectID, Credentials], Dict]: +def discover_credentials() -> Tuple[Dict[HubGroupProjectID, Credentials], Dict]: """Automatically discover credentials for IBM Quantum. This method looks for credentials in the following places in order and returns the first ones found: 1. The environment variables. - 2. The ``qiskitrc`` configuration file - - Args: - qiskitrc_filename: Full path to the ``qiskitrc`` configuration - file. If ``None``, ``$HOME/.qiskitrc/qiskitrc`` is used. Raises: HubGroupProjectIDInvalidStateError: If the default provider stored on @@ -92,10 +76,6 @@ def discover_credentials( readers = OrderedDict( [ ("environment variables", (read_credentials_from_environ, {})), - ( - "qiskitrc", - (read_credentials_from_qiskitrc, {"filename": qiskitrc_filename}), - ), ] ) # type: OrderedDict[str, Any] diff --git a/qiskit_ibm_runtime/credentials/configrc.py b/qiskit_ibm_runtime/credentials/configrc.py deleted file mode 100644 index fd10ec309..000000000 --- a/qiskit_ibm_runtime/credentials/configrc.py +++ /dev/null @@ -1,293 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Utilities for reading and writing credentials from and to configuration files.""" - -import logging -import os -from ast import literal_eval -from collections import defaultdict -from configparser import ConfigParser, ParsingError -from typing import Dict, Tuple, Optional, Any, Union - -from .credentials import Credentials -from .hub_group_project_id import HubGroupProjectID -from .exceptions import InvalidCredentialsFormatError, CredentialsNotFoundError - -logger = logging.getLogger(__name__) - -DEFAULT_QISKITRC_FILE = os.path.join(os.path.expanduser("~"), ".qiskit", "qiskitrc") -"""Default location of the configuration file.""" - -_ACTIVE_PREFERENCES = {"experiment": {"auto_save": lambda val: val.lower() == "true"}} -_PREFERENCES_SECTION_NAME = "qiskit_ibm_runtime.preferences" - - -def read_credentials_from_qiskitrc( - filename: Optional[str] = None, -) -> Tuple[Dict[HubGroupProjectID, Credentials], Dict]: - """Read a configuration file and return a dictionary with its contents. - - Args: - filename: Full path to the configuration file. If ``None``, the default - location is used (``$HOME/.qiskit/qiskitrc``). - - Returns: - A tuple containing the found credentials and the stored - preferences, if any, in the configuration file. The format - for the found credentials is ``{credentials_unique_id: Credentials}``, - whereas the preferences is ``{credentials_unique_id: {category: {key: val}}}``. - - Raises: - InvalidCredentialsFormatError: If the file cannot be parsed. Note - that this exception is not raised if the input file - does not exist, and an empty dictionary is returned instead. - HubGroupProjectIDInvalidStateError: If the default provider stored on - disk could not be parsed. - """ - filename = filename or DEFAULT_QISKITRC_FILE - config_parser = ConfigParser() - # Preserve case. - config_parser.optionxform = str # type: ignore - try: - config_parser.read(filename) - except ParsingError as ex: - raise InvalidCredentialsFormatError( - "Error parsing file {}: {}".format(filename, str(ex)) - ) from ex - - # Build the credentials dictionary. - credentials_dict: Dict[HubGroupProjectID, Credentials] = {} - preferences: Dict[HubGroupProjectID, Dict] = {} - - for name in config_parser.sections(): - if not name.startswith("qiskit_ibm_runtime"): - continue - - single_section = dict(config_parser.items(name)) - configs: Dict[str, Union[str, bool, HubGroupProjectID, Dict]] = {} - - if name == _PREFERENCES_SECTION_NAME: - preferences = _parse_preferences(single_section) - continue - - # Individually convert keys to their right types. - # TODO: consider generalizing, moving to json configuration or a more - # robust alternative. - for key, val in single_section.items(): - if key == "proxies": - configs[key] = literal_eval(val) - elif key == "verify": - configs[key] = config_parser[name].getboolean("verify") - elif key == "default_provider": - configs[key] = HubGroupProjectID.from_stored_format(val) - elif key == "url": - configs[key] = val - configs["auth_url"] = val - else: - configs[key] = val - - new_credentials = Credentials(**configs) # type: ignore[arg-type] - credentials_dict[new_credentials.unique_id()] = new_credentials - - return credentials_dict, preferences - - -def _parse_preferences(pref_section: Dict) -> Dict[HubGroupProjectID, Dict]: - """Parse the preferences section. - - Args: - pref_section: Preferences section. - - Returns: - Parsed preferences, indexed by hub/group/project. - """ - preferences: Dict[HubGroupProjectID, Dict] = defaultdict(dict) - for key, val in pref_section.items(): - # Preferences section format is hgp,category,item=value - elems = key.split(",") - if len(elems) != 3: - continue - hgp_id, pref_cat, pref_key = elems - try: - val_type = _ACTIVE_PREFERENCES[pref_cat][pref_key] - except KeyError: - continue - hgp_id = HubGroupProjectID.from_stored_format(hgp_id) - cur_val = preferences[hgp_id].get(pref_cat, {}) - cur_val.update({pref_key: val_type(val)}) # type: ignore[no-untyped-call] - preferences[hgp_id].update({pref_cat: cur_val}) - - return preferences - - -def write_qiskit_rc( - credentials: Dict[HubGroupProjectID, Credentials], - preferences: Optional[Dict] = None, - filename: Optional[str] = None, -) -> None: - """Write credentials to the configuration file. - - Args: - credentials: Dictionary with the credentials, in the - ``{credentials_unique_id: Credentials}`` format. - preferences: Preferences to store. - filename: Full path to the configuration file. If ``None``, the default - location is used (``$HOME/.qiskit/qiskitrc``). - """ - - def _credentials_object_to_dict( - credentials_obj: Credentials, - ) -> Dict[str, Any]: - """Convert a ``Credential`` object to a dictionary.""" - credentials_dict = { - key: getattr(credentials_obj, key) - for key in ["token", "url", "proxies", "verify"] - if getattr(credentials_obj, key) - } - - # Save the default provider to disk, if specified. - if credentials_obj.default_provider: - credentials_dict[ - "default_provider" - ] = credentials_obj.default_provider.to_stored_format() - - return credentials_dict - - def _section_name(credentials_: Credentials) -> str: - """Return a string suitable for use as a unique section name.""" - base_name = "qiskit_ibm_runtime" - if credentials_.is_ibm_quantum(): - base_name = "{}_{}_{}_{}".format( - base_name, *credentials_.unique_id().to_tuple() - ) - return base_name - - filename = filename or DEFAULT_QISKITRC_FILE - # Create the directories and the file if not found. - os.makedirs(os.path.dirname(filename), exist_ok=True) - - unrolled_credentials = { - _section_name(credentials_object): _credentials_object_to_dict( - credentials_object - ) - for _, credentials_object in credentials.items() - } - - if preferences: - unrolled_pref = {} - # Preferences section format is hgp,category,key=value. - # Input preferences format is {hgp: {cat: {pref_key: pref_val}}}. - for hgp, hgp_val in preferences.items(): - hgp = hgp.to_stored_format() - for cat, cat_val in hgp_val.items(): - for pref, pref_val in cat_val.items(): - unrolled_pref[f"{hgp},{cat},{pref}"] = pref_val - unrolled_credentials[_PREFERENCES_SECTION_NAME] = unrolled_pref - - # Write the configuration file. - with open(filename, "w") as config_file: - config_parser = ConfigParser() - config_parser.optionxform = str # type: ignore - config_parser.read_dict(unrolled_credentials) - config_parser.write(config_file) - - -def store_credentials( - credentials: Credentials, overwrite: bool = False, filename: Optional[str] = None -) -> None: - """Store the credentials for a single account in the configuration file. - - Args: - credentials: Credentials to save. - overwrite: ``True`` if any existing credentials are to be overwritten. - filename: Full path to the configuration file. If ``None``, the default - location is used (``$HOME/.qiskit/qiskitrc``). - """ - # Read the current providers stored in the configuration file. - filename = filename or DEFAULT_QISKITRC_FILE - stored_credentials, stored_preferences = read_credentials_from_qiskitrc(filename) - - # Check if duplicated credentials are already stored. By convention, - # we assume (hub, group, project) is always unique. - if credentials.unique_id() in stored_credentials and not overwrite: - logger.warning( - "Credentials already present. " "Set overwrite=True to overwrite." - ) - return - - # Append and write the credentials to file. - stored_credentials[credentials.unique_id()] = credentials - write_qiskit_rc( - credentials=stored_credentials, - preferences=stored_preferences, - filename=filename, - ) - - -def remove_credentials( - credentials: Credentials, filename: Optional[str] = None -) -> None: - """Remove credentials from the configuration file. - - Args: - credentials: Credentials to remove. - filename: Full path to the configuration file. If ``None``, the default - location is used (``$HOME/.qiskit/qiskitrc``). - - Raises: - CredentialsNotFoundError: If there is no account with that name on the - configuration file. - """ - # Set the name of the Provider from the class. - stored_credentials, stored_preferences = read_credentials_from_qiskitrc(filename) - - try: - del stored_credentials[credentials.unique_id()] - except KeyError: - raise CredentialsNotFoundError( - "The account {} does not exist in the configuration file.".format( - credentials.unique_id() - ) - ) from None - write_qiskit_rc( - credentials=stored_credentials, - preferences=stored_preferences, - filename=filename, - ) - - -def store_preferences( - preferences: Dict[HubGroupProjectID, Dict], filename: Optional[str] = None -) -> None: - """Store the preferences in the configuration file. - - Args: - preferences: Preferences to save. - filename: Full path to the configuration file. If ``None``, the default - location is used (``$HOME/.qiskit/qiskitrc``). - """ - # Read the current providers stored in the configuration file. - filename = filename or DEFAULT_QISKITRC_FILE - stored_credentials, stored_preferences = read_credentials_from_qiskitrc(filename) - - # Merge with existing preferences at category level. - for hgp, hgp_val in preferences.items(): - merged_hgp = stored_preferences.get(hgp, {}) - merged_hgp.update(hgp_val) - stored_preferences[hgp] = merged_hgp - - write_qiskit_rc( - credentials=stored_credentials, - preferences=stored_preferences, - filename=filename, - ) diff --git a/qiskit_ibm_runtime/credentials/credentials.py b/qiskit_ibm_runtime/credentials/credentials.py index 983116623..5c009b05e 100644 --- a/qiskit_ibm_runtime/credentials/credentials.py +++ b/qiskit_ibm_runtime/credentials/credentials.py @@ -19,8 +19,9 @@ from requests_ntlm import HttpNtlmAuth from .hub_group_project_id import HubGroupProjectID +from ..accounts import AccountType from ..api.auth import LegacyAuth, CloudAuth -from ..utils import is_crn, crn_to_api_host +from ..utils import crn_to_api_host REGEX_IBM_HUBS = ( "(?Phttp[s]://.+/api)" @@ -43,7 +44,9 @@ class Credentials: def __init__( self, token: str, - url: str, + url: str = None, + auth: Optional[AccountType] = None, + instance: Optional[str] = None, auth_url: Optional[str] = None, websockets_url: Optional[str] = None, hub: Optional[str] = None, @@ -74,7 +77,9 @@ def __init__( action in services like the `ExperimentService`. default_provider: Default provider to use. """ + self.auth = auth self.token = token + self.instance = instance self.access_token = access_token ( self.url, @@ -82,7 +87,7 @@ def __init__( self.hub, self.group, self.project, - ) = _unify_ibm_quantum_url(url, hub, group, project) + ) = _unify_ibm_quantum_url(auth, url, instance, hub, group, project) self.auth_url = auth_url or url self.websockets_url = websockets_url self.proxies = proxies or {} @@ -98,8 +103,8 @@ def __init__( def get_auth_handler(self) -> AuthBase: """Returns the respective authentication handler.""" - if is_crn(self.url): - return CloudAuth(api_key=self.token, crn=self.url) + if self.auth == "cloud": + return CloudAuth(api_key=self.token, crn=self.instance) return LegacyAuth(access_token=self.access_token) @@ -146,7 +151,9 @@ def connection_parameters(self) -> Dict[str, Any]: def _unify_ibm_quantum_url( - url: str, + auth: AccountType, + url: Optional[str] = None, + instance: Optional[str] = None, hub: Optional[str] = None, group: Optional[str] = None, project: Optional[str] = None, @@ -175,8 +182,8 @@ def _unify_ibm_quantum_url( regex_match = re.match(REGEX_IBM_HUBS, url, re.IGNORECASE) base_url = url - if is_crn(url): - base_url = crn_to_api_host(url) + if auth == "cloud": + base_url = crn_to_api_host(instance) elif regex_match: base_url, hub, group, project = regex_match.groups() else: diff --git a/qiskit_ibm_runtime/credentials/exceptions.py b/qiskit_ibm_runtime/credentials/exceptions.py index 48241050c..a5102620b 100644 --- a/qiskit_ibm_runtime/credentials/exceptions.py +++ b/qiskit_ibm_runtime/credentials/exceptions.py @@ -21,18 +21,6 @@ class CredentialsError(IBMError): pass -class InvalidCredentialsFormatError(CredentialsError): - """Errors raised when the credentials are in an invalid format.""" - - pass - - -class CredentialsNotFoundError(CredentialsError): - """Errors raised when the credentials are not found.""" - - pass - - class HubGroupProjectIDError(IBMError): """Base class for errors raised by the hub_group_project_id module.""" diff --git a/qiskit_ibm_runtime/ibm_runtime_service.py b/qiskit_ibm_runtime/ibm_runtime_service.py index c52e371fe..16c24336e 100644 --- a/qiskit_ibm_runtime/ibm_runtime_service.py +++ b/qiskit_ibm_runtime/ibm_runtime_service.py @@ -15,46 +15,34 @@ import copy import json import logging -import os import re import traceback import warnings from collections import OrderedDict -from typing import Dict, Callable, Optional, Union, List, Any, Type, Tuple +from typing import Dict, Callable, Optional, Union, List, Any, Type from qiskit.circuit import QuantumCircuit from qiskit.providers.backend import BackendV1 as Backend from qiskit.providers.exceptions import QiskitBackendNotFoundError -from qiskit.providers.providerutils import filter_backends from qiskit.providers.models import PulseBackendConfiguration, QasmBackendConfiguration +from qiskit.providers.providerutils import filter_backends from qiskit.transpiler import Layout -from typing_extensions import Literal from qiskit_ibm_runtime import runtime_job, ibm_backend # pylint: disable=unused-import +from .accounts import AccountManager, Account, AccountType from .api.clients import AuthClient, VersionClient from .api.clients.runtime import RuntimeClient from .api.exceptions import RequestsApiError from .backendreservation import BackendReservation -from .credentials import Credentials, HubGroupProjectID, discover_credentials -from .credentials.configrc import ( - remove_credentials, - read_credentials_from_qiskitrc, - store_credentials, -) -from .credentials.exceptions import HubGroupProjectIDInvalidStateError from .constants import QISKIT_IBM_RUNTIME_API_URL +from .credentials import Credentials, HubGroupProjectID from .exceptions import IBMNotAuthorizedError, IBMInputValueError, IBMProviderError from .exceptions import ( QiskitRuntimeError, RuntimeDuplicateProgramError, RuntimeProgramNotFound, RuntimeJobNotFound, - IBMProviderCredentialsInvalidToken, - IBMProviderCredentialsInvalidFormat, - IBMProviderCredentialsNotFound, - IBMProviderMultipleCredentialsFound, IBMProviderCredentialsInvalidUrl, - IBMProviderValueError, ) from .hub_group_project import HubGroupProject # pylint: disable=cyclic-import from .program.result_decoder import ResultDecoder @@ -129,54 +117,69 @@ class IBMRuntimeService: def __init__( self, - auth: Optional[Literal["cloud", "legacy"]] = None, token: Optional[str] = None, - locator: Optional[str] = None, - **kwargs: Any, + url: Optional[str] = None, + instance: Optional[str] = None, + auth: Optional[AccountType] = None, + name: Optional[str] = None, + proxies: Optional[dict] = None, + verify: Optional[bool] = None, ) -> None: """IBMRuntimeService constructor Args: - auth: Authentication type. `cloud` or `legacy`. If not specified, the saved default is used. - If there is no default value, and both accounts were saved on disk, the cloud type is - used. - token: Token used for authentication. If not specified, the saved token is used. - locator: The authentication url, if `auth=legacy`. Otherwise the CRN. - **kwargs: Additional settings for the connection: - - * proxies (dict): proxy configuration. - * verify (bool): verify the server's TLS certificate. + token: IBM Cloud API key or IBM Quantum API token. + url: The API URL. + Defaults to https://cloud.ibm.com (cloud) or + https://auth.quantum-computing.ibm.com/api (legacy). + instance: The CRN (cloud) or hub/group/project (legacy). + auth: Authentication type. `cloud` or `legacy`. + name: Name of the account to load. + proxies: Proxy configuration for the server. + verify: Verify the server's TLS certificate. Returns: An instance of IBMRuntimeService. - - Raises: - IBMProviderCredentialsInvalidFormat: If the default hub/group/project saved on - disk could not be parsed. - IBMProviderCredentialsNotFound: If no IBM Quantum credentials can be found. - IBMProviderCredentialsInvalidUrl: If the URL specified is not - a valid IBM Quantum authentication URL. - IBMProviderCredentialsInvalidToken: If the `token` is not a valid IBM Quantum token. """ super().__init__() - self.account_credentials, account_preferences = self._resolve_credentials( - token=token, locator=locator, **kwargs + + # TODO: add support for loading default account when optional parameters are not set + # i.e. fallback to environment variables + # i.e. fallback to default account saved on disk + self.account = ( + AccountManager.get(name=name) + if name + else Account( + auth=auth, + token=token, + url=url, + instance=instance, + proxies=proxies, + verify=verify, + ) + ) + self.account_credentials = Credentials( + auth=self.account.auth, + token=self.account.token, + url=self.account.url, + instance=self.account.instance, + proxies=self.account.proxies, + verify=self.account.verify, ) self._programs: Dict[str, RuntimeProgram] = {} self._backends: Dict[str, "ibm_backend.IBMBackend"] = {} + if auth == "cloud": self._api_client = RuntimeClient(credentials=self.account_credentials) self._backends = self._discover_remote_backends() else: - self._initialize_hgps( - credentials=self.account_credentials, preferences=account_preferences - ) + self._initialize_hgps(credentials=self.account_credentials) self._api_client = None hgps = self._get_hgps() for hgp in hgps: - for name, backend in hgp.backends.items(): - if name not in self._backends: - self._backends[name] = backend + for backend_name, backend in hgp.backends.items(): + if backend_name not in self._backends: + self._backends[backend_name] = backend if not self._api_client and hgp.has_service("runtime"): self._default_hgp = hgp self._api_client = RuntimeClient(self._default_hgp.credentials) @@ -185,68 +188,8 @@ def __init__( "https", "wss" ) self._programs = {} - self._discover_backends() - - def _resolve_credentials( - self, - token: Optional[str] = None, - locator: Optional[str] = None, - **kwargs: Any, - ) -> Tuple[Credentials, Dict]: - """Resolve credentials after looking up env variables and credentials saved on disk - - Args: - token: IBM Quantum token. - url: URL for the IBM Quantum authentication server. - **kwargs: Additional settings for the connection: - - * proxies (dict): proxy configuration. - * verify (bool): verify the server's TLS certificate. - - Returns: - Tuple of account_credentials, preferences - Raises: - IBMProviderCredentialsInvalidFormat: If the default hub/group/project saved on - disk could not be parsed. - IBMProviderCredentialsNotFound: If no IBM Quantum credentials can be found. - IBMProviderCredentialsInvalidToken: If the `token` is not a valid IBM Quantum token. - IBMProviderMultipleCredentialsFound: If multiple IBM Quantum credentials are found. - """ - if token: - if not isinstance(token, str): - raise IBMProviderCredentialsInvalidToken( - "Invalid IBM Quantum token " - 'found: "{}" of type {}.'.format(token, type(token)) - ) - url = ( - locator - or os.getenv("QISKIT_IBM_RUNTIME_API_URL") - or QISKIT_IBM_RUNTIME_API_URL - ) - account_credentials = Credentials( - token=token, url=url, auth_url=url, **kwargs - ) - preferences: Optional[Dict] = {} - else: - # Check for valid credentials in env variables or qiskitrc file. - try: - saved_credentials, preferences = discover_credentials() - except HubGroupProjectIDInvalidStateError as ex: - raise IBMProviderCredentialsInvalidFormat( - "Invalid hub/group/project data found {}".format(str(ex)) - ) from ex - credentials_list = list(saved_credentials.values()) - if not credentials_list: - raise IBMProviderCredentialsNotFound( - "No IBM Quantum credentials found." - ) - if len(credentials_list) > 1: - raise IBMProviderMultipleCredentialsFound( - "Multiple IBM Quantum Experience credentials found." - ) - account_credentials = credentials_list[0] - return account_credentials, preferences + self._discover_backends() def _discover_remote_backends(self) -> Dict[str, "ibm_backend.IBMBackend"]: """Return the remote backends available for this service instance. @@ -330,8 +273,10 @@ def _initialize_hgps( for hub_info in user_hubs: # Build credentials. hgp_credentials = Credentials( - credentials.token, + auth=credentials.auth, + token=credentials.token, access_token=auth_client.current_access_token(), + instance=credentials.instance, url=service_urls["http"], auth_url=credentials.auth_url, websockets_url=service_urls["ws"], @@ -582,127 +527,74 @@ def _aliased_backend_names() -> Dict[str, str]: "ibmq_20_austin": "QS1_1", } - def active_account(self) -> Optional[Dict[str, str]]: + def active_account(self) -> dict: """Return the IBM Quantum account currently in use for the session. Returns: - A dictionary with information about the account currently in the session, - None if there is no active account in session + A dictionary with information about the account currently in the session. """ - if not self._hgps: - return None - first_hgp = self._get_hgp() - return { - "token": first_hgp.credentials.token, - "url": first_hgp.credentials.auth_url, - } + return self.account.to_saved_format() @staticmethod - def delete_account() -> None: - """Delete the saved account from disk. + def delete_account(name: Optional[str]) -> bool: + """Delete a saved account from disk. - Raises: - IBMProviderCredentialsNotFound: If no valid IBM Quantum - credentials can be found on disk. - IBMProviderCredentialsInvalidUrl: If invalid IBM Quantum - credentials are found on disk. + Args: + name: Custom name of the saved account. Defaults to "default". + + Returns: + True if the account with the given name was deleted. + False if no account was found for the given name. """ - stored_credentials, _ = read_credentials_from_qiskitrc() - if not stored_credentials: - raise IBMProviderCredentialsNotFound( - "No IBM Quantum credentials found on disk." - ) - credentials = list(stored_credentials.values())[0] - if credentials.url != QISKIT_IBM_RUNTIME_API_URL: - raise IBMProviderCredentialsInvalidUrl( - "Invalid IBM Quantum credentials found on disk. " - ) - remove_credentials(credentials) + + return AccountManager.delete(name=name) @staticmethod def save_account( - token: str, - url: str = QISKIT_IBM_RUNTIME_API_URL, - hub: Optional[str] = None, - group: Optional[str] = None, - project: Optional[str] = None, - overwrite: bool = False, - **kwargs: Any, + token: Optional[str] = None, + url: Optional[str] = None, + instance: Optional[str] = None, + auth: Optional[AccountType] = None, + name: Optional[str] = None, + proxies: Optional[dict] = None, + verify: Optional[bool] = None, ) -> None: """Save the account to disk for future use. - Note: - If storing a default hub/group/project to disk, all three parameters - `hub`, `group`, `project` must be specified. - Args: - token: IBM Quantum token. - url: URL for the IBM Quantum authentication server. - hub: Name of the hub. - group: Name of the group. - project: Name of the project. - overwrite: Overwrite existing credentials. - **kwargs: - * proxies (dict): Proxy configuration for the server. - * verify (bool): If False, ignores SSL certificates errors - - Raises: - IBMProviderCredentialsInvalidUrl: If the `url` is not a valid - IBM Quantum authentication URL. - IBMProviderCredentialsInvalidToken: If the `token` is not a valid - IBM Quantum token. - IBMProviderValueError: If only one or two parameters from `hub`, `group`, - `project` are specified. + token: IBM Cloud API key or IBM Quantum API token. + url: The API URL. + Defaults to https://cloud.ibm.com (cloud) or + https://auth.quantum-computing.ibm.com/api (legacy). + instance: The CRN (cloud) or hub/group/project (legacy). + auth: Authentication type. `cloud` or `legacy`. + name: Name of the account to save. + proxies: Proxy configuration for the server. + verify: Verify the server's TLS certificate. """ - if url != QISKIT_IBM_RUNTIME_API_URL: - raise IBMProviderCredentialsInvalidUrl( - "Invalid IBM Quantum credentials found." - ) - if not token or not isinstance(token, str): - raise IBMProviderCredentialsInvalidToken( - "Invalid IBM Quantum token " - 'found: "{}" of type {}.'.format(token, type(token)) - ) - # If any `hub`, `group`, or `project` is specified, make sure all parameters are set. - if any([hub, group, project]) and not all([hub, group, project]): - raise IBMProviderValueError( - "The hub, group, and project parameters must all be " - "specified when storing a default hub/group/project to " - 'disk: hub = "{}", group = "{}", project = "{}"'.format( - hub, group, project - ) - ) - # If specified, get the hub/group/project to store. - default_hgp_id = ( - HubGroupProjectID(hub, group, project) - if all([hub, group, project]) - else None - ) - credentials = Credentials( - token=token, url=url, default_provider=default_hgp_id, **kwargs + + return AccountManager.save( + token=token, + url=url, + instance=instance, + auth=auth, + name=name, + proxies=proxies, + verify=verify, ) - store_credentials(credentials, overwrite=overwrite) @staticmethod - def saved_account() -> Dict[str, str]: - """List the account saved on disk. + def saved_accounts() -> dict: + """List the accounts saved on disk. Returns: - A dictionary with information about the account saved on disk. + A dictionary with information about the accounts saved on disk. Raises: IBMProviderCredentialsInvalidUrl: If invalid IBM Quantum credentials are found on disk. """ - stored_credentials, _ = read_credentials_from_qiskitrc() - if not stored_credentials: - return {} - credentials = list(stored_credentials.values())[0] - if credentials.url != QISKIT_IBM_RUNTIME_API_URL: - raise IBMProviderCredentialsInvalidUrl( - "Invalid IBM Quantum credentials found on disk." - ) - return {"token": credentials.token, "url": credentials.url} + return AccountManager.list() def get_backend( self, @@ -1460,7 +1352,12 @@ def _decode_job(self, raw_data: Dict) -> RuntimeJob: backend_name=raw_data["backend"], service=self, credentials=Credentials( - token="", url="", hub=hub, group=group, project=project + auth="legacy", + token="", + url="", + hub=hub, + group=group, + project=project, ), api=None, ) diff --git a/test/contextmanagers.py b/test/contextmanagers.py index 61bc090ff..c58f7bb54 100644 --- a/test/contextmanagers.py +++ b/test/contextmanagers.py @@ -13,14 +13,13 @@ """Context managers for using with IBM Provider unit tests.""" import os -from typing import Optional, Dict from contextlib import ContextDecorator, contextmanager -from tempfile import NamedTemporaryFile +from typing import Optional, Dict from unittest.mock import patch -from qiskit_ibm_runtime.credentials import configrc, Credentials -from qiskit_ibm_runtime.credentials.environ import VARIABLES_MAP from qiskit_ibm_runtime import IBMRuntimeService +from qiskit_ibm_runtime.credentials import Credentials +from qiskit_ibm_runtime.credentials.environ import VARIABLES_MAP CREDENTIAL_ENV_VARS = VARIABLES_MAP.keys() @@ -76,29 +75,6 @@ def __exit__(self, *exc): os.environ = self.os_environ_original -class custom_qiskitrc(ContextDecorator): - """Context manager that uses a temporary qiskitrc.""" - - # pylint: disable=invalid-name - - def __init__(self, contents=b""): - # Create a temporary file with the contents. - self.tmp_file = NamedTemporaryFile() - self.tmp_file.write(contents) - self.tmp_file.flush() - self.default_qiskitrc_file_original = configrc.DEFAULT_QISKITRC_FILE - - def __enter__(self): - # Temporarily modify the default location of the qiskitrc file. - configrc.DEFAULT_QISKITRC_FILE = self.tmp_file.name - return self - - def __exit__(self, *exc): - # Delete the temporary file and restore the default location. - self.tmp_file.close() - configrc.DEFAULT_QISKITRC_FILE = self.default_qiskitrc_file_original - - class no_file(ContextDecorator): """Context manager that disallows access to a file.""" diff --git a/test/decorators.py b/test/decorators.py index ef5aeda5b..ba9f1ef28 100644 --- a/test/decorators.py +++ b/test/decorators.py @@ -88,7 +88,7 @@ def requires_providers(func): def _wrapper(*args, **kwargs): qe_token = kwargs.pop("qe_token") qe_url = kwargs.pop("qe_url") - service = IBMRuntimeService(auth="legacy", token=qe_token, locator=qe_url) + service = IBMRuntimeService(auth="legacy", token=qe_token, url=qe_url) # Get open access hgp open_hgp = _get_open_hgp(service) if not open_hgp: @@ -140,7 +140,7 @@ def requires_provider(func): def _wrapper(*args, **kwargs): token = kwargs.pop("qe_token") url = kwargs.pop("qe_url") - service = IBMRuntimeService(auth="legacy", token=token, locator=url) + service = IBMRuntimeService(auth="legacy", token=token, url=url) hub, group, project = _get_custom_hgp() kwargs.update( {"service": service, "hub": hub, "group": group, "project": project} @@ -168,7 +168,7 @@ def requires_private_provider(func): def _wrapper(*args, **kwargs): token = kwargs.pop("qe_token") url = kwargs.pop("qe_url") - service = IBMRuntimeService(auth="legacy", token=token, locator=url) + service = IBMRuntimeService(auth="legacy", token=token, url=url) hub, group, project = _get_private_hgp() kwargs.update( {"service": service, "hub": hub, "group": group, "project": project} @@ -250,7 +250,7 @@ def _wrapper(obj, *args, **kwargs): def _get_backend(qe_token, qe_url, backend_name): """Get the specified backend.""" - service = IBMRuntimeService(auth="legacy", token=qe_token, locator=qe_url) + service = IBMRuntimeService(auth="legacy", token=qe_token, url=qe_url) _backend = None hub, group, project = _get_custom_hgp() if backend_name: diff --git a/test/ibm/test_ibm_provider.py b/test/ibm/test_ibm_provider.py index a5ce15fa0..4ecf25c45 100644 --- a/test/ibm/test_ibm_provider.py +++ b/test/ibm/test_ibm_provider.py @@ -13,32 +13,22 @@ """Tests for the IBMRuntimeService class.""" from datetime import datetime -import os -from unittest import skipIf, mock -from configparser import ConfigParser +from unittest import mock + from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister from qiskit.providers.exceptions import QiskitBackendNotFoundError from qiskit.providers.models.backendproperties import BackendProperties -from qiskit_ibm_runtime.ibm_backend import IBMSimulator, IBMBackend from qiskit_ibm_runtime import IBMRuntimeService from qiskit_ibm_runtime import hub_group_project -from qiskit_ibm_runtime.api.exceptions import RequestsApiError from qiskit_ibm_runtime.api.clients import AccountClient +from qiskit_ibm_runtime.api.exceptions import RequestsApiError from qiskit_ibm_runtime.exceptions import ( - IBMProviderError, - IBMProviderValueError, IBMProviderCredentialsInvalidUrl, - IBMProviderCredentialsInvalidToken, - IBMProviderCredentialsNotFound, ) -from qiskit_ibm_runtime.credentials.hub_group_project_id import HubGroupProjectID -from qiskit_ibm_runtime.constants import QISKIT_IBM_RUNTIME_API_URL - -from ..ibm_test_case import IBMTestCase +from qiskit_ibm_runtime.ibm_backend import IBMSimulator, IBMBackend from ..decorators import requires_qe_access, requires_provider -from ..contextmanagers import custom_qiskitrc, no_envs, CREDENTIAL_ENV_VARS -from ..utils import get_hgp +from ..ibm_test_case import IBMTestCase API_URL = "https://api.quantum-computing.ibm.com/api" AUTH_URL = "https://auth.quantum-computing.ibm.com/api" @@ -68,7 +58,7 @@ def test_pass_unreachable_proxy(self, qe_token, qe_url): } with self.assertRaises(RequestsApiError) as context_manager: IBMRuntimeService( - auth="legacy", token=qe_token, locator=qe_url, proxies=proxies + auth="legacy", token=qe_token, url=qe_url, proxies=proxies ) self.assertIn("ProxyError", str(context_manager.exception)) @@ -78,7 +68,7 @@ def test_provider_init_non_auth_url(self): qe_url = API_URL with self.assertRaises(IBMProviderCredentialsInvalidUrl) as context_manager: - IBMRuntimeService(auth="legacy", token=qe_token, locator=qe_url) + IBMRuntimeService(auth="legacy", token=qe_token, url=qe_url) self.assertIn("authentication URL", str(context_manager.exception)) @@ -88,21 +78,10 @@ def test_provider_init_non_auth_url_with_hub(self): qe_url = API_URL + "/Hubs/X/Groups/Y/Projects/Z" with self.assertRaises(IBMProviderCredentialsInvalidUrl) as context_manager: - IBMRuntimeService(auth="legacy", token=qe_token, locator=qe_url) + IBMRuntimeService(auth="legacy", token=qe_token, url=qe_url) self.assertIn("authentication URL", str(context_manager.exception)) - def test_provider_init_no_credentials(self): - """Test initializing IBMRuntimeService with no credentials.""" - with custom_qiskitrc(), self.assertRaises( - IBMProviderCredentialsNotFound - ) as context_manager, no_envs(CREDENTIAL_ENV_VARS): - IBMRuntimeService(auth="legacy") - - self.assertIn( - "No IBM Quantum credentials found.", str(context_manager.exception) - ) - @requires_qe_access def test_discover_backend_failed(self, qe_token, qe_url): """Test discovering backends failed.""" @@ -114,225 +93,17 @@ def test_discover_backend_failed(self, qe_token, qe_url): with self.assertLogs( hub_group_project.logger, level="WARNING" ) as context_manager: - IBMRuntimeService(auth="legacy", token=qe_token, locator=qe_url) + IBMRuntimeService(auth="legacy", token=qe_token, url=qe_url) self.assertIn("bad_backend", str(context_manager.output)) -@skipIf(os.name == "nt", "Test not supported in Windows") -class TestIBMProviderAccounts(IBMTestCase): - """Tests for account handling.""" - - @classmethod - def setUpClass(cls): - """Initial class setup.""" - super().setUpClass() - cls.token = "API_TOKEN" - - def test_save_account(self): - """Test saving an account.""" - with custom_qiskitrc(): - IBMRuntimeService.save_account(self.token, url=QISKIT_IBM_RUNTIME_API_URL) - stored_cred = IBMRuntimeService.saved_account() - - self.assertEqual(stored_cred["token"], self.token) - self.assertEqual(stored_cred["url"], QISKIT_IBM_RUNTIME_API_URL) - - @requires_qe_access - def test_provider_init_saved_account(self, qe_token, qe_url): - """Test initializing IBMRuntimeService with credentials from qiskitrc file.""" - if qe_url != QISKIT_IBM_RUNTIME_API_URL: - # save expects an auth production URL. - self.skipTest("Test requires production auth URL") - - with custom_qiskitrc(), no_envs(CREDENTIAL_ENV_VARS): - IBMRuntimeService.save_account(qe_token, url=qe_url) - service = IBMRuntimeService(auth="legacy") - - self.assertIsInstance(service, IBMRuntimeService) - self.assertEqual(service._default_hgp.credentials.token, qe_token) - self.assertEqual(service._default_hgp.credentials.auth_url, qe_url) - - def test_save_account_specified_provider(self): - """Test saving an account with a specified hub/group/project.""" - default_hgp_to_save = "ibm-q/open/main" - - with custom_qiskitrc() as custom_qiskitrc_cm: - hgp_id = HubGroupProjectID.from_stored_format(default_hgp_to_save) - IBMRuntimeService.save_account( - token=self.token, - url=QISKIT_IBM_RUNTIME_API_URL, - hub=hgp_id.hub, - group=hgp_id.group, - project=hgp_id.project, - ) - - # Ensure the `default_provider` name was written to the config file. - config_parser = ConfigParser() - config_parser.read(custom_qiskitrc_cm.tmp_file.name) - - for name in config_parser.sections(): - single_credentials = dict(config_parser.items(name)) - self.assertIn("default_provider", single_credentials) - self.assertEqual( - single_credentials["default_provider"], default_hgp_to_save - ) - - def test_save_account_specified_provider_invalid(self): - """Test saving an account without specifying all the hub/group/project fields.""" - invalid_hgp_ids_to_save = [ - HubGroupProjectID("", "default_group", ""), - HubGroupProjectID("default_hub", None, "default_project"), - ] - for invalid_hgp_id in invalid_hgp_ids_to_save: - with self.subTest(invalid_hgp_id=invalid_hgp_id), custom_qiskitrc(): - with self.assertRaises(IBMProviderValueError) as context_manager: - IBMRuntimeService.save_account( - token=self.token, - url=QISKIT_IBM_RUNTIME_API_URL, - hub=invalid_hgp_id.hub, - group=invalid_hgp_id.group, - project=invalid_hgp_id.project, - ) - self.assertIn( - "The hub, group, and project parameters must all be specified", - str(context_manager.exception), - ) - - def test_delete_account(self): - """Test deleting an account.""" - with custom_qiskitrc(): - IBMRuntimeService.save_account(self.token, url=QISKIT_IBM_RUNTIME_API_URL) - IBMRuntimeService.delete_account() - stored_cred = IBMRuntimeService.saved_account() - - self.assertEqual(len(stored_cred), 0) - - @requires_qe_access - def test_load_account_saved_provider(self, qe_token, qe_url): - """Test loading an account that contains a saved hub/group/project.""" - if qe_url != QISKIT_IBM_RUNTIME_API_URL: - # .save_account() expects an auth production URL. - self.skipTest("Test requires production auth URL") - - # Get a non default hub/group/project. - non_default_hgp = get_hgp(qe_token, qe_url, default=False) - - with custom_qiskitrc(), no_envs(CREDENTIAL_ENV_VARS): - IBMRuntimeService.save_account( - token=qe_token, - url=qe_url, - hub=non_default_hgp.credentials.hub, - group=non_default_hgp.credentials.group, - project=non_default_hgp.credentials.project, - ) - saved_provider = IBMRuntimeService(auth="legacy") - if saved_provider._default_hgp != non_default_hgp: - # Prevent tokens from being logged. - saved_provider._default_hgp.credentials.token = None - non_default_hgp.credentials.token = None - self.fail( - "loaded default hgp ({}) != expected ({})".format( - saved_provider._default_hgp.credentials.__dict__, - non_default_hgp.credentials.__dict__, - ) - ) - - self.assertEqual(saved_provider._default_hgp.credentials.token, qe_token) - self.assertEqual(saved_provider._default_hgp.credentials.auth_url, qe_url) - self.assertEqual( - saved_provider._default_hgp.credentials.hub, non_default_hgp.credentials.hub - ) - self.assertEqual( - saved_provider._default_hgp.credentials.group, - non_default_hgp.credentials.group, - ) - self.assertEqual( - saved_provider._default_hgp.credentials.project, - non_default_hgp.credentials.project, - ) - - @requires_qe_access - def test_load_saved_account_invalid_hgp(self, qe_token, qe_url): - """Test loading an account that contains a saved hub/group/project that does not exist.""" - if qe_url != QISKIT_IBM_RUNTIME_API_URL: - # .save_account() expects an auth production URL. - self.skipTest("Test requires production auth URL") - - # Hub, group, project in correct format but does not exists. - invalid_hgp_to_store = "invalid_hub/invalid_group/invalid_project" - with custom_qiskitrc(), no_envs(CREDENTIAL_ENV_VARS): - hgp_id = HubGroupProjectID.from_stored_format(invalid_hgp_to_store) - with self.assertRaises(IBMProviderError) as context_manager: - IBMRuntimeService.save_account( - token=qe_token, - url=qe_url, - hub=hgp_id.hub, - group=hgp_id.group, - project=hgp_id.project, - ) - IBMRuntimeService(auth="legacy") - - self.assertIn( - "No hub/group/project matches the specified criteria", - str(context_manager.exception), - ) - - def test_load_saved_account_invalid_hgp_format(self): - """Test loading an account that contains a saved provider in an invalid format.""" - # Format {'test_case_input': 'error message from raised exception'} - invalid_hgps = { - "hub_group_project": 'Use the "//" format', - "default_hub//default_project": "Every field must be specified", - "default_hub/default_group/": "Every field must be specified", - } - - for invalid_hgp, error_message in invalid_hgps.items(): - with self.subTest(invalid_hgp=invalid_hgp): - with custom_qiskitrc() as temp_qiskitrc, no_envs(CREDENTIAL_ENV_VARS): - # Save the account. - IBMRuntimeService.save_account( - token=self.token, url=QISKIT_IBM_RUNTIME_API_URL - ) - # Add an invalid provider field to the account stored. - with open(temp_qiskitrc.tmp_file.name, "a") as _file: - _file.write("default_provider = {}".format(invalid_hgp)) - # Ensure an error is raised if the stored provider is in an invalid format. - with self.assertRaises(IBMProviderError) as context_manager: - IBMRuntimeService(auth="legacy") - self.assertIn(error_message, str(context_manager.exception)) - - @requires_qe_access - def test_active_account(self, qe_token, qe_url): - """Test get active account""" - service = IBMRuntimeService(auth="legacy", token=qe_token, locator=qe_url) - active_account = service.active_account() - self.assertIsNotNone(active_account) - self.assertEqual(active_account["token"], qe_token) - self.assertEqual(active_account["url"], qe_url) - - def test_save_token_invalid(self): - """Test saving an account with invalid tokens. See #391.""" - invalid_tokens = [None, "", 0] - for invalid_token in invalid_tokens: - with self.subTest(invalid_token=invalid_token): - with self.assertRaises( - IBMProviderCredentialsInvalidToken - ) as context_manager: - IBMRuntimeService.save_account( - token=invalid_token, url=QISKIT_IBM_RUNTIME_API_URL - ) - self.assertIn( - "Invalid IBM Quantum token found", str(context_manager.exception) - ) - - class TestIBMProviderHubGroupProject(IBMTestCase): """Tests for IBMRuntimeService HubGroupProject related methods.""" @requires_qe_access def _initialize_provider(self, qe_token=None, qe_url=None): """Initialize and return provider.""" - return IBMRuntimeService(auth="legacy", token=qe_token, locator=qe_url) + return IBMRuntimeService(auth="legacy", token=qe_token, url=qe_url) def setUp(self): """Initial test setup.""" diff --git a/test/ibm/test_proxies.py b/test/ibm/test_proxies.py index 5ae1942e1..36bd72d33 100644 --- a/test/ibm/test_proxies.py +++ b/test/ibm/test_proxies.py @@ -59,7 +59,7 @@ def test_proxies_ibm_account(self, qe_token, qe_url): service = IBMRuntimeService( auth="legacy", token=qe_token, - locator=qe_url, + url=qe_url, proxies={"urls": VALID_PROXIES}, ) diff --git a/test/ibm/test_registration.py b/test/ibm/test_registration.py index 60d101e09..294dcc613 100644 --- a/test/ibm/test_registration.py +++ b/test/ibm/test_registration.py @@ -12,36 +12,12 @@ """Test the registration and credentials modules.""" -import logging -import os -import warnings -from unittest import skipIf -from typing import Dict, Any -import copy - from requests_ntlm import HttpNtlmAuth -from qiskit_ibm_runtime import IBMRuntimeService -from qiskit_ibm_runtime.constants import QISKIT_IBM_RUNTIME_API_URL + from qiskit_ibm_runtime.credentials import ( Credentials, - discover_credentials, - read_credentials_from_qiskitrc, - store_credentials, - store_preferences, - HubGroupProjectID, ) -from qiskit_ibm_runtime.credentials import configrc -from qiskit_ibm_runtime.exceptions import IBMProviderError - from ..ibm_test_case import IBMTestCase -from ..contextmanagers import ( - custom_envs, - no_envs, - custom_qiskitrc, - CREDENTIAL_ENV_VARS, - mock_ibm_provider, -) - IBM_TEMPLATE = "https://localhost/api/Hubs/{}/Groups/{}/Projects/{}" @@ -53,70 +29,6 @@ } -# TODO: NamedTemporaryFiles do not support name in Windows -@skipIf(os.name == "nt", "Test not supported in Windows") -class TestCredentials(IBMTestCase): - """Tests for the credential modules.""" - - def test_load_account_no_credentials(self) -> None: - """Test load account with no credentials available.""" - - with custom_qiskitrc(), no_envs(CREDENTIAL_ENV_VARS): - with self.assertRaises(IBMProviderError) as context_manager: - IBMRuntimeService(auth="legacy") - - self.assertIn( - "No IBM Quantum credentials found", str(context_manager.exception) - ) - - def test_store_credentials_overwrite(self) -> None: - """Test overwriting qiskitrc credentials.""" - credentials = Credentials("QISKITRC_TOKEN", url=QISKIT_IBM_RUNTIME_API_URL) - credentials2 = Credentials("QISKITRC_TOKEN_2", url=QISKIT_IBM_RUNTIME_API_URL) - - with custom_qiskitrc(): - store_credentials(credentials) - # Cause all warnings to always be triggered. - warnings.simplefilter("always") - - # Get the logger for `store_credentials`. - config_rc_logger = logging.getLogger(store_credentials.__module__) - - # Attempt overwriting. - with self.assertLogs( - logger=config_rc_logger, level="WARNING" - ) as log_records: - store_credentials(credentials) - self.assertIn("already present", log_records.output[0]) - - with no_envs(CREDENTIAL_ENV_VARS), mock_ibm_provider(): - # Attempt overwriting. - store_credentials(credentials2, overwrite=True) - service = IBMRuntimeService(auth="legacy") - - # Ensure that the credentials are the overwritten ones. - # pylint: disable=no-member - self.assertEqual(service._hgp["credentials"].token, credentials2.token) - - def test_environ_over_qiskitrc(self) -> None: - """Test credential discovery order.""" - credentials = Credentials("QISKITRC_TOKEN", url=QISKIT_IBM_RUNTIME_API_URL) - - with custom_qiskitrc(): - # Prepare the credentials: both env and qiskitrc present - store_credentials(credentials) - with custom_envs( - { - "QISKIT_IBM_RUNTIME_API_TOKEN": "ENVIRON_TOKEN", - "QISKIT_IBM_RUNTIME_API_URL": "ENVIRON_URL", - } - ): - credentials, _ = discover_credentials() - - self.assertEqual(len(credentials), 1) - self.assertEqual(list(credentials.values())[0].token, "ENVIRON_TOKEN") - - class TestCredentialsKwargs(IBMTestCase): """Test for ``Credentials.connection_parameters()``.""" @@ -201,151 +113,3 @@ def test_malformed_ntlm_params(self) -> None: # in NTLM credentials due to int not facilitating 'split'. with self.assertRaises(AttributeError): _ = malformed_ntlm_credentials.connection_parameters() - - -@skipIf(os.name == "nt", "Test not supported in Windows") -class TestPreferences(IBMTestCase): - """Tests for the preferences.""" - - def test_save_preferences(self): - """Test saving preferences.""" - preferences = self._get_pref_dict() - with custom_qiskitrc(): - store_preferences(preferences) - _, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual(preferences, stored_pref) - - def test_update_preferences(self): - """Test updating preferences.""" - pref1 = self._get_pref_dict() - with custom_qiskitrc(): - store_preferences(pref1) - pref2 = self._get_pref_dict(pref_val=False) - store_preferences(pref2) - _, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual(pref2, stored_pref) - - def test_new_provider_pref(self): - """Test adding preference for another provider.""" - pref1 = self._get_pref_dict() - with custom_qiskitrc(): - store_preferences(pref1) - pref2 = self._get_pref_dict("hub2/group2/project2", pref_val=False) - store_preferences(pref2) - _, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual({**pref1, **pref2}, stored_pref) - - def test_update_one_of_many_providers(self): - """Test updating one of many provider preferences.""" - pref1 = self._get_pref_dict(pref_val=False) - pref2 = self._get_pref_dict("hub2/group2/project2", pref_val=False) - with custom_qiskitrc(): - store_preferences(pref1) - store_preferences(pref2) - pref1 = self._get_pref_dict(pref_val=True) - store_preferences(pref1) - - _, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual({**pref1, **pref2}, stored_pref) - - def test_save_same_value_twice(self): - """Test saving same value twice.""" - pref = self._get_pref_dict(pref_val=True) - with custom_qiskitrc(): - store_preferences(pref) - store_preferences(pref) - - _, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual(pref, stored_pref) - - def test_new_pref_cat(self): - """Test adding a new preference category.""" - pref1 = self._get_pref_dict() - orig_active_pref = copy.deepcopy(configrc._ACTIVE_PREFERENCES) - try: - configrc._ACTIVE_PREFERENCES.update({"foo": {"bar": str}}) - with custom_qiskitrc(): - store_preferences(pref1) - new_cat = self._get_pref_dict( - cat="foo", pref_key="bar", pref_val="foobar" - ) - store_preferences(new_cat) - _, stored_pref = read_credentials_from_qiskitrc() - - key = list(pref1.keys())[0] - pref1[key].update(new_cat[key]) - self.assertEqual(pref1, stored_pref) - finally: - configrc._ACTIVE_PREFERENCES = orig_active_pref - - def test_overwrite_category_keys(self): - """Test overwriting preference keys in a category.""" - pref1 = self._get_pref_dict() - orig_active_pref = copy.deepcopy(configrc._ACTIVE_PREFERENCES) - try: - configrc._ACTIVE_PREFERENCES["experiment"].update({"foo": str}) - with custom_qiskitrc(): - store_preferences(pref1) - new_cat = self._get_pref_dict(pref_key="foo", pref_val="bar") - store_preferences(new_cat) - _, stored_pref = read_credentials_from_qiskitrc() - - key = list(pref1.keys())[0] - pref1[key]["experiment"] = {"foo": "bar"} - self.assertEqual(pref1, stored_pref) - finally: - configrc._ACTIVE_PREFERENCES = orig_active_pref - - def test_save_preferences_credentials(self): - """Test saving both preferences and credentials.""" - preferences = self._get_pref_dict() - credentials = Credentials("QISKITRC_TOKEN", url=QISKIT_IBM_RUNTIME_API_URL) - with custom_qiskitrc(): - store_preferences(preferences) - store_credentials(credentials) - stored_cred, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual(preferences, stored_pref) - self.assertEqual(credentials, stored_cred[credentials.unique_id()]) - - def test_update_preferences_with_credentials(self): - """Test updating preferences with credentials.""" - preferences = self._get_pref_dict() - pref2 = self._get_pref_dict(pref_val=False) - credentials = Credentials("QISKITRC_TOKEN", url=QISKIT_IBM_RUNTIME_API_URL) - credentials2 = Credentials("QISKITRC_TOKEN_2", url=QISKIT_IBM_RUNTIME_API_URL) - with custom_qiskitrc(): - store_preferences(preferences) - store_credentials(credentials) - # Update preferences. - store_preferences(pref2) - stored_cred, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual(pref2, stored_pref) - self.assertEqual(credentials, stored_cred[credentials.unique_id()]) - # Update credentials. - store_credentials(credentials2, overwrite=True) - stored_cred, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual(pref2, stored_pref) - self.assertEqual(credentials2, stored_cred[credentials2.unique_id()]) - - def test_remove_credentials(self): - """Test removing credentials when preferences are set.""" - preferences = self._get_pref_dict() - credentials = Credentials("QISKITRC_TOKEN", url=QISKIT_IBM_RUNTIME_API_URL) - with custom_qiskitrc(): - store_credentials(credentials) - store_preferences(preferences) - configrc.remove_credentials(credentials) - stored_cred, stored_pref = read_credentials_from_qiskitrc() - self.assertEqual(preferences, stored_pref) - self.assertFalse(stored_cred) - - def _get_pref_dict( - self, - hgp_id: str = "my-hub/my-group/my-project", - cat: str = "experiment", - pref_key: str = "auto_save", - pref_val: Any = True, - ) -> Dict: - """Generate a new preference dictionary.""" - hub, group, project = hgp_id.split("/") - return {HubGroupProjectID(hub, group, project): {cat: {pref_key: pref_val}}} diff --git a/test/utils.py b/test/utils.py index eaec580ac..8a818619c 100644 --- a/test/utils.py +++ b/test/utils.py @@ -147,7 +147,7 @@ def get_hgp(qe_token: str, qe_url: str, default: bool = True) -> HubGroupProject A HubGroupProject, as specified by `default`. """ service = IBMRuntimeService( - auth="legacy", token=qe_token, locator=qe_url + auth="legacy", token=qe_token, url=qe_url ) # Default hub/group/project. open_hgp = service._get_hgp() # Open access hgp hgp_to_return = open_hgp