Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vectorize emulator results handling #1171

Merged
merged 14 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
languages.python = {
enable = true;
libraries = with pkgs; [zlib];
version = "3.11";
poetry = {
enable = true;
install = {
Expand All @@ -63,7 +62,7 @@
extras = [
(lib.strings.concatStrings
(lib.strings.intersperse " -E "
["qblox" "qm" "zh" "rfsoc" "los"]))
["qblox" "qm" "zh" "rfsoc" "los" "emulator"]))
];
};
};
Expand Down
174 changes: 100 additions & 74 deletions src/qibolab/_core/instruments/emulator/emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from collections import defaultdict
from functools import reduce
from itertools import chain
from operator import or_
from typing import Any, Optional, cast

import numpy as np
from numpy.typing import NDArray
from qutip import mesolve

from qibolab import Readout
Expand All @@ -15,13 +16,15 @@
AveragingMode,
ExecutionParameters,
)
from qibolab._core.identifier import Result
from qibolab._core.instruments.abstract import Controller
from qibolab._core.pulses.pulse import PulseId
from qibolab._core.sequence import PulseSequence
from qibolab._core.sweeper import ParallelSweepers

from ...serialize import replace
from .hamiltonians import waveform
from .hamiltonians import HamiltonianConfig, waveform
from .operators import INITIAL_STATE, SIGMAZ
from .utils import merge_results
from .utils import shots


class EmulatorController(Controller):
Expand All @@ -46,99 +49,123 @@ def initial_state(self):
# initial state: qubit in ground state
return INITIAL_STATE

def _play_sequence(self, sequence, configs, options):
"""Play single sequence on emulator."""

hamiltonian = configs["hamiltonian"].hamiltonian
hamiltonian += self._pulse_sequence_to_hamiltonian(sequence, configs, options)
measurement, tlist = self._measurement(sequence, configs, options)
def _play_sequence(
self, sequence: PulseSequence, configs: dict[str, Config], updates: dict
) -> NDArray:
"""Play single sequence on emulator.

The array returned by this function has a single dimension, over
the various measurements included in the sequence.
"""
config = cast(HamiltonianConfig, configs["hamiltonian"])
hamiltonian = config.hamiltonian
hamiltonian += self._pulse_sequence_to_hamiltonian(sequence, configs, updates)
measurements, tlist = self._measurement(sequence, configs, updates)
results = mesolve(
hamiltonian,
self.initial_state,
tlist,
configs["hamiltonian"].decoherence,
e_ops=[SIGMAZ],
hamiltonian, self.initial_state, tlist, config.decoherence, e_ops=[SIGMAZ]
)
averaged_results = {
ro_pulse_id: (1 - results.expect[0][sample - 1]) / 2
for ro_pulse_id, sample in measurement.items()
}
if options.averaging_mode == AveragingMode.SINGLESHOT:
results = {
ro_pulse_id: np.random.choice(
[0, 1],
size=options.nshots,
p=[1 - prob, prob],
)
for ro_pulse_id, prob in averaged_results.items()
}
return results

# dropping probability of 0 to keep compatibility with qibolab
return {pulse: prob[1] for pulse, prob in averaged_results.items()}

def _sweep(self, sequence, configs, options, sweepers):
"""Sweep over sequence."""
results = {}
samples = np.array(list(measurements.values())) - 1
return (1 - results.expect[0][samples]) / 2

def _sweep(
self,
sequence: PulseSequence,
configs: dict[str, Config],
sweepers: list[ParallelSweepers],
updates: Optional[dict] = None,
) -> NDArray:
"""Sweep over sequence.

This function invokes itself recursively, adding an array
dimension at each call as the outermost one. The extra dimension
corresponds to the values in the first nested sweep (with the
lowest index, interpreted as the outermost as well).
"""
# use a default dictionary, merging existing values
updates = defaultdict(dict) | ({} if updates is None else updates)

if len(sweepers) == 0:
return self._play_sequence(sequence, configs, options)
assert len(sweepers[0]) == 1, "Parallel sweepers not supported."
sweeper = sweepers[0][0]
updates = dict(chain.from_iterable(d.items() for d in options.updates))
for value in sweeper.values:
if sweeper.pulses is not None:
for pulse in sweeper.pulses:
try:
return self._play_sequence(sequence, configs, updates)

parsweep = sweepers[0]
# collect slices of results, corresponding to the current iteration
results = []
# execute once for each parallel value
for values in zip(*(s.values for s in parsweep)):
# update all parallel sweepers with the respective values
for sweeper, value in zip(parsweep, values):
if sweeper.pulses is not None:
for pulse in sweeper.pulses:
updates[pulse.id].update({sweeper.parameter.name: value})
except KeyError:
updates[pulse.id] = {sweeper.parameter.name: value}
if sweeper.channels is not None:
for channel in sweeper.channels:
try:
if sweeper.channels is not None:
for channel in sweeper.channels:
updates[channel].update({sweeper.parameter.name: value})
except KeyError:
updates[channel] = {sweeper.parameter.name: value}
options = replace(options, updates=[{k: v} for k, v in updates.items()])
if len(sweepers) > 1:
temp = self._sweep(sequence, configs, options, sweepers[1:])
else:
temp = self._play_sequence(sequence, configs, options)

results = merge_results(
results,
temp,
)

# reshaping results
for key, value in results.items():
results[key] = results[key].reshape(options.results_shape(sweepers))
return results

# append new slice for the current parallel value
results.append(self._sweep(sequence, configs, sweepers[1:], updates))

# stack all slices in a single array, along the current outermost dimension
return np.stack(results)

def _single_sequence(
self,
sequence: PulseSequence,
configs: dict[str, Config],
options: ExecutionParameters,
sweepers: list[ParallelSweepers],
) -> dict[int, Result]:
"""Collect results for a single pulse sequence.

The dictionary returned is already compliant with the expected
result for the execution of this single sequence, thus suitable
to be returned as is.
"""
# probabilities for the |1> state, for each swept value
probabilities = self._sweep(sequence, configs, sweepers)

assert options.nshots is not None
# extract results from probabilities, according to the requested averaging mode
res = (
shots(probabilities, options.nshots)
if options.averaging_mode == AveragingMode.SINGLESHOT
else probabilities
)
# move measurements dimension to the front, getting ready for extraction
measurements = np.moveaxis(res, -1, 0)
# match measurements with their IDs, in order to already comply with the general
# format established by the `Controller` interface
measurement_ids = self._measurement(sequence, configs, {})[0].keys()
return dict(zip(measurement_ids, list(measurements)))

def play(
self,
configs: dict[str, Config],
sequences: list[PulseSequence],
options: ExecutionParameters,
sweepers: list = None,
):
sweepers: list[ParallelSweepers],
) -> dict[int, Result]:
assert options.acquisition_type == AcquisitionType.DISCRIMINATION, (
"Emulator only supports DISCRIMINATION acquisition type."
)
# just merge the results of multiple executions in a single dictionary
return reduce(
or_,
(
self._sweep(sequence, configs, options, sweepers)
self._single_sequence(sequence, configs, options, sweepers)
for sequence in sequences
),
{},
)

def _measurement(self, sequence, configs, options):
def _measurement(
self,
sequence: PulseSequence,
configs: dict[str, Config],
updates: dict,
) -> tuple[dict[PulseId, Any], NDArray]:
"""Given sequence creates a dictionary of readout pulses and their
sample index."""
duration = 0
pulses = {}
updates = dict(chain.from_iterable(d.items() for d in options.updates))
for channel, pulse in sequence:
if isinstance(configs[channel], AcquisitionConfig):
if isinstance(pulse, Readout):
Expand All @@ -156,11 +183,10 @@ def _measurement(self, sequence, configs, options):
return pulses, tlist

def _pulse_sequence_to_hamiltonian(
self, sequence: PulseSequence, configs, options
self, sequence: PulseSequence, configs: dict[str, Config], updates: dict
) -> dict[str, list]:
"""Construct Hamiltonian dependent term for qutip simulation."""

updates = dict(chain.from_iterable(d.items() for d in options.updates))
hamiltonians = defaultdict(list)
h_t = []
for channel, pulse in sequence:
Expand Down
45 changes: 38 additions & 7 deletions src/qibolab/_core/instruments/emulator/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,46 @@
import numpy as np
from numpy.typing import NDArray

GHZ_TO_HZ = 1e9
"""Converting GHz to Hz."""
HZ_TO_GHZ = 1e-9
"""Converting Hz to GHz."""


def merge_results(a: dict, b: dict) -> dict:
"""Merge dictionary together using np.column_stack."""
merged = a.copy()
for key, value in b.items():
x = (a[key],) if key in a else ()
merged[key] = np.column_stack(x + (value,))
return merged
def ndchoice(probabilities: NDArray, samples: int) -> NDArray:
"""Sample elements with n-dimensional probabilities.

This is the n-dimensional version of :func:`np.random.choice`, which instead of
vectorizing over the picked elements, it assumes them to be just a plain integer
range (which in turn could be used to index a suitable array, if relevant), while it
allows the probabilities to be higher dimensional.

The ``probabilities`` argument specifies the set of probabilities, which are
intended to be normalized arrays over the innermost dimension. Such that the whole
array describe a set of ``probabilities.shape[:-1]`` discrete distributions.

`samples` is instead the number of samples to extract from each distribution.

.. seealso::

Generalized from https://stackoverflow.com/a/47722393, which presents the
two-dimensional version.
"""
return (
probabilities.cumsum(-1).reshape(*probabilities.shape, -1)
> np.random.rand(*probabilities.shape[:-1], 1, samples)
).argmax(-2)


def shots(probabilities: NDArray, nshots: int) -> NDArray:
"""Extract shots from state |1> probabilities.

This function just wraps :func:`ndchoice`, taking care of creating the n-D array of
binomial distributions, and extracting shots as the outermost dimension.
"""
# discrete distributions of [|0>, |1>] states, over the innermost dimension
dists = np.stack([1 - probabilities, probabilities], axis=-1)
# shots, in the innermost dimension, replacing the distribution dimension
shots = ndchoice(dists, nshots)
# move shots from innermost to outermost dimension
return np.moveaxis(shots, -1, 0)
Empty file.
13 changes: 13 additions & 0 deletions tests/instruments/emulator/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pathlib import Path

import pytest

import qibolab

HERE = Path(__file__).parent


@pytest.fixture
def platform(monkeypatch) -> qibolab.Platform:
monkeypatch.setenv("QIBOLAB_PLATFORMS", HERE.parent)
return qibolab.create_platform(HERE.name)
Loading