[Feature] Additional features for error mitigation protocols #56

Status: Open. Wants to merge 3 commits into base: main.
8 changes: 5 additions & 3 deletions docs/error_mitigation/incoherent_error_mitigation.md
@@ -18,6 +18,8 @@ print(f"noiseless_expectation = {model_noiseless.expectation()}") # markdown-exec: hide

```

In `analog_zne`, you can choose between two ZNE fitting methods: polynomial extrapolation and exponential decay extrapolation. The method is selected in the `mitigate` function by setting `zne_type` to `poly` or `exp`; the default is polynomial extrapolation. To use the `exp` method, you need a minimum of three data points, since the exponential model has three free parameters.

```python exec="on" source="material-block" session="zne" result="json"

from qadence import NoiseProtocol, NoiseHandler
@@ -29,13 +31,13 @@ model = QuantumModel(
    circuit=circuit, observable=observable, backend=BackendName.PULSER, diff_mode=DiffMode.GPSR
)

-for data_points in [2,5]:
-    options = {"stretches": torch.linspace(1, 3, data_points)}
+for data_points, zne_type in zip([2, 5], ["poly", "exp"]):
+    options = {"stretches": torch.linspace(1, 3, data_points), "zne_type": zne_type}
     mitigate = Mitigations(protocol=Mitigations.ANALOG_ZNE, options=options)

     mitigated_expectation = mitigate(model=model, noise=noise)

-    print(f"noiseless_expectation with {data_points} data points", mitigated_expectation) # markdown-exec: hide
+    print(f"mitigated_expectation with {zne_type} extrapolation and {data_points} data points", mitigated_expectation) # markdown-exec: hide

```
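For intuition, the `exp` option fits the three-parameter model `f(x) = a * exp(-b * x) + c` to the measured expectation values and reads off `f(0)`. Below is a minimal standalone sketch of that extrapolation with made-up data points (not part of the PR or the library):

```python
import numpy as np
from scipy.optimize import curve_fit

def exp_model(x: np.ndarray, a: float, b: float, c: float) -> np.ndarray:
    # Three free parameters, hence the minimum of three data points.
    return a * np.exp(-b * x) + c

noise_levels = np.array([1.0, 2.0, 3.0])     # stretch factors (illustrative)
noisy_values = np.array([0.80, 0.65, 0.55])  # made-up expectation values

popt, _ = curve_fit(
    exp_model, noise_levels, noisy_values,
    p0=(noisy_values[0], 1.0, noisy_values[-1]), maxfev=5000
)
print(exp_model(0.0, *popt))  # zero-noise estimate
```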

32 changes: 30 additions & 2 deletions docs/error_mitigation/readout_mitigation.md
@@ -147,11 +147,32 @@ p_corr_inv_mle = mle_solve(tensor_rank_mult(noise_matrices_inv, observed_prob))
distance = wasserstein_distance(p_corr_mthree_gmres_mle, p_corr_inv_mle)
print(f"Wasserstein distance between the 2 distributions: {distance}") # markdown-exec: hide


```

We have used `wasserstein_distance` instead of `kl_divergence` because many of the bitstrings have zero probabilities. If the expected solution lies outside the space of observed bitstrings, `MTHREE` will fail. We next look at majority voting to circumvent this problem when the expected output is a single bitstring.
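To see why, here is a minimal illustration (not part of the library; the distributions are made up): the KL divergence blows up on outcomes with zero reference probability, while the Wasserstein distance stays finite.

```python
import numpy as np
from scipy.stats import wasserstein_distance

# Two sparse distributions that disagree on a zero-probability bitstring.
p = np.array([0.5, 0.5, 0.0, 0.0])
q = np.array([0.5, 0.0, 0.5, 0.0])

# KL divergence contains p_i * log(p_i / q_i), which diverges whenever
# p_i > 0 while q_i == 0.
with np.errstate(divide="ignore", invalid="ignore"):
    kl = np.sum(np.where(p > 0, p * np.log(p / q), 0.0))
print(kl)  # inf

# The Wasserstein distance between the two distributions stays finite.
print(wasserstein_distance(np.arange(4), np.arange(4), u_weights=p, v_weights=q))
```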

Additionally, we can further exploit sparsity by integrating a Hamming-distance filter into the `MTHREE` method. This keeps only the noise-matrix entries that lie within the specified Hamming distance of each measured state and renormalizes the remaining columns. The functionality is enabled by setting the `ham_dist` option.

```python exec="on" source="material-block" session="m3" result="json"
from qadence_protocols.mitigations.readout import ham_dist_redistribution

confusion_matrix_subspace = normalized_subspace_kron(noise_matrices, observed_prob.nonzero()[0])

# We consider a small Hamming distance for this method and set it to 3
confusion_matrix_subspace_ham = ham_dist_redistribution(
    confusion_matrix_subspace, ham_dist=3
)
p_corr_mthree_gmres_ham = gmres(confusion_matrix_subspace_ham, input_csr.toarray())[0]
p_corr_mthree_gmres_mle_ham = mle_solve(p_corr_mthree_gmres_ham)

noise_matrices_inv = list(map(matrix_inv, noise_matrices))
p_corr_inv_mle = mle_solve(tensor_rank_mult(noise_matrices_inv, observed_prob))

distance_ham = wasserstein_distance(p_corr_mthree_gmres_mle_ham, p_corr_inv_mle)

print(f"Wasserstein distance between the 2 distributions: {distance_ham}") # markdown-exec: hide
```

> **Collaborator** (on the `gmres` line above): Hm, these variable names are a bit hard to understand. Could you please use more meaningful names or comment what you're doing here?

### Majority Voting

A mitigation protocol to be used only when the circuit output has a single expected bitstring as the solution [^4]. The method votes on the likelihood of each qubit being a 0 or a 1, assuming a tensor-product structure for the output, as sketched below. It is valid only when the readout errors are uncorrelated.
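As a rough sketch of the idea (illustrative only, not the qadence_protocols implementation, and with made-up sample counts), each qubit position is assigned the bit value that wins the per-qubit vote across all sampled bitstrings:

```python
from collections import Counter

# Made-up sample counts over 4-qubit bitstrings.
samples = Counter({"1011": 520, "1010": 180, "0011": 160, "1111": 140})
n_qubits = 4
total = sum(samples.values())

# For each qubit position, tally how often it reads "1" across all shots.
ones = [0] * n_qubits
for bitstring, count in samples.items():
    for i, bit in enumerate(bitstring):
        if bit == "1":
            ones[i] += count

# Keep the majority value per qubit; valid only for uncorrelated readout errors.
mitigated = "".join("1" if c > total / 2 else "0" for c in ones)
print(mitigated)  # -> "1011"
```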
@@ -235,7 +256,7 @@ print(f"Mitigates samples: {mitigated_samples_opt}") # markdown-exec: hide

### Twirl mitigation

-This protocol makes use of all possible so-called twirl operations to average out the effect of readout errors into an effective scaling. The twirl operation consists of using bit flip operators before the measurement and after the measurement is obtained[^5]. The number of twirl operations can be reduced through random sampling. The method is exact in that it requires no calibration which might be prone to errors of modelling.
+This protocol makes use of all possible so-called twirl operations to average out the effect of readout errors into an effective scaling. A twirl operation consists of applying bit-flip operators before and after the measurement [^5]. The number of twirl operations can be reduced through random sampling via the `twirl_samples` option. The method is exact in the sense that it requires no calibration, which might itself introduce modelling errors.

```python exec="on" source="material-block" session="mfm" result="json"
from qadence import NoiseHandler, NoiseProtocol
@@ -279,7 +300,14 @@ print(f"noisy expectation value {noisy_model.expectation(measurement=tomo_measur
mitigate = Mitigations(protocol=Mitigations.TWIRL)
expectation_mitigated = mitigate(noise=noise, model=noisy_model)


# We use the number of qubits as the sample count; it can be increased for higher accuracy.
options = {"twirl_samples": block.n_qubits}
mitigate_sample = Mitigations(protocol=Mitigations.TWIRL, options=options)
expectation_mitigated_sample = mitigate_sample(noise=noise, model=noisy_model)

print(f"expected mitigation value {expectation_mitigated}") # markdown-exec: hide
print(f"expected sampled mitigation value {expectation_mitigated_sample}") # markdown-exec: hide

```

56 changes: 54 additions & 2 deletions qadence_protocols/mitigations/analog_zne.py
@@ -1,5 +1,7 @@
from __future__ import annotations

from typing import Callable

import numpy as np
import torch
from qadence import BackendName, QuantumModel
@@ -13,6 +15,7 @@
from qadence.transpile import apply_fn_to_blocks
from qadence.types import NoiseProtocol
from qadence.utils import Endianness
from scipy.optimize import curve_fit
from torch import Tensor

supported_noise_models = [NoiseProtocol.ANALOG.DEPOLARIZING, NoiseProtocol.ANALOG.DEPHASING]
@@ -27,6 +30,42 @@ def zne(noise_levels: Tensor, zne_datasets: list[list]) -> Tensor:
    return torch.tensor(poly_fits)


def exp_func(x: np.ndarray, a: float, b: float, c: float) -> np.ndarray:
    return a * np.exp(-b * x) + c


def zne_exp(noise_levels: Tensor, zne_datasets: list[list[float]]) -> Tensor:
    exp_fits: list[float] = []
    noise_levels_np: np.ndarray = noise_levels.numpy()

    # The three-parameter exponential model needs at least three data points.
    if len(noise_levels_np) < 3:
        raise ValueError("At least 3 noise levels are required for exponential fitting.")

    for dataset in zne_datasets:
        dataset_np: np.ndarray = np.array(dataset)

        try:
            # Fit the exponential model to the noisy expectation values.
            popt, _ = curve_fit(
                exp_func,
                noise_levels_np,
                dataset_np,
                p0=(dataset_np[0], 1, dataset_np[-1]),
                maxfev=5000,
            )

            # Expected value when noise is zero.
            zero_noise_value: float = float(exp_func(0, *popt))
            exp_fits.append(zero_noise_value)

        except RuntimeError:
            print("Warning: Optimal parameters not found, using last known value.")
            exp_fits.append(dataset_np[-1])

    return torch.tensor(exp_fits)


def pulse_experiment(
    backend: Backend,
    circuit: QuantumCircuit,
@@ -35,6 +74,7 @@ def pulse_experiment(
    noise: NoiseHandler,
    stretches: Tensor,
    endianness: Endianness,
    zne_func: Callable,
    state: Tensor | None = None,
) -> Tensor:
    def mutate_params(block: AbstractBlock, stretch: float) -> AbstractBlock:
@@ -91,7 +131,7 @@ def mutate_params(block: AbstractBlock, stretch: float) -> AbstractBlock:
    res = res if len(res.shape) > 0 else res.reshape(1)

    # Zero-noise extrapolate.
-    extrapolated_exp_values = zne(
+    extrapolated_exp_values = zne_func(
        noise_levels=stretches,
        zne_datasets=res.real,
    )
@@ -105,6 +145,7 @@ def noise_level_experiment(
    param_values: dict[str, Tensor],
    noise: NoiseHandler,
    endianness: Endianness,
    zne_func: Callable,
    state: Tensor | None = None,
) -> Tensor:
    noise_probs = noise.options[-1].get("noise_probs")
@@ -118,7 +159,8 @@
        noise=noise,
        endianness=endianness,
    )
-    return zne(noise_levels=noise_probs, zne_datasets=zne_datasets)
+
+    return zne_func(noise_levels=noise_probs, zne_datasets=zne_datasets)


def analog_zne(
@@ -133,6 +175,14 @@
        raise ValueError("Only BackendName.PULSER supports analog simulations.")
    backend = backend_factory(backend=model._backend_name, diff_mode=None)
    stretches = options.get("stretches", None)

    # Select the extrapolation function; polynomial fitting is the default.
    zne_type = options.get("zne_type", None)
    if zne_type == "exp":
        zne_func = zne_exp
    elif zne_type == "poly" or zne_type is None:
        zne_func = zne
    else:
        raise ValueError("analog_zne supports only polynomial or exponential extrapolation.")

    if stretches is not None:
        extrapolated_exp_values = pulse_experiment(
            backend=backend,
@@ -142,6 +192,7 @@
            noise=noise,
            stretches=stretches,
            endianness=endianness,
            zne_func=zne_func,
            state=state,
        )
    else:
@@ -152,6 +203,7 @@
            param_values=param_values,
            noise=noise,
            endianness=endianness,
            zne_func=zne_func,
            state=state,
        )
    return extrapolated_exp_values
37 changes: 37 additions & 0 deletions qadence_protocols/mitigations/readout.py
@@ -52,6 +52,34 @@ def normalized_subspace_kron(noise_matrices: npt.NDArray, subspace: npt.NDArray
    return conf_matrix


def ham_dist_redistribution(conf_matrix: npt.NDArray, ham_dist: int) -> npt.NDArray:
    """Redistributes the confusion matrix based on a given Hamming distance."""

> **Collaborator** (on the signature): If you use `npt.NDArray` here, I'd suggest to keep it consistent elsewhere too.
>
> **Collaborator (author):** It seems that all variables referring to this type are currently using the specified type. Could you clarify which part might be insufficient?

    def get_ham_dist(row_index: int, col_index: int) -> int:
        """Compute the Hamming distance between two integers."""
        return bin(row_index ^ col_index).count("1")

    def get_valid_rows(num_rows: int, col_index: int, ham_dist: int) -> list:
        """Find valid rows within the given Hamming distance."""
        return [row for row in range(num_rows) if get_ham_dist(row, col_index) < ham_dist]

    def get_col_sum(conf_matrix: npt.NDArray, valid_rows: list, col_index: int) -> float:
        """Compute the sum of column values over the valid rows."""
        return float(sum(conf_matrix[row, col_index] for row in valid_rows))

    num_rows, num_cols = conf_matrix.shape
    redistributed_matrix = np.zeros((num_rows, num_cols))
    for col in range(num_cols):
        valid_rows = get_valid_rows(num_rows, col, ham_dist)
        partial_sum = get_col_sum(conf_matrix, valid_rows, col)

        if partial_sum > 0:  # Avoid division by zero
            for row in valid_rows:
                redistributed_matrix[row, col] = conf_matrix[row, col] / partial_sum
    return redistributed_matrix

> **Collaborator** (on the redistribution loop): Isn't it possible to just get the sub-matrix, `redistributed_matrix = conf_matrix[valid_rows, col]` or something?
>
> This one is M3 with an increased Hamming-distance subspace confusion matrix, correct? I was under the impression it would be a square matrix with both rows and columns matching and subsampled.
>
> I don't think the partial sum can add up to 0; entries of the confusion matrix are positive.
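Picking up the reviewer's suggestion, a vectorized variant could build a Hamming-distance mask once and normalize all columns in one shot. This is only a sketch, under the assumption that `conf_matrix` is a dense array whose row and column indices encode bitstrings as integers (the subspace matrix in the PR may index columns differently):

```python
import numpy as np
import numpy.typing as npt

def ham_dist_redistribution_vectorized(conf_matrix: npt.NDArray, ham_dist: int) -> npt.NDArray:
    # Hypothetical alternative, not the PR implementation.
    n_rows, n_cols = conf_matrix.shape
    xors = np.arange(n_rows)[:, None] ^ np.arange(n_cols)[None, :]
    # Popcount of the XOR gives the pairwise Hamming distance.
    dist = np.zeros_like(xors)
    while xors.any():
        dist += xors & 1
        xors >>= 1
    masked = np.where(dist < ham_dist, conf_matrix, 0.0)
    col_sums = masked.sum(axis=0, keepdims=True)
    # Columns whose masked sum is zero are left as all-zero columns.
    return np.divide(masked, col_sums, out=np.zeros_like(masked), where=col_sums > 0)
```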


def tensor_rank_mult(qubit_ops: npt.NDArray, prob_vect: npt.NDArray) -> npt.NDArray:
"""
Fast multiplication of single qubit operators on a probability vector.
Expand Down Expand Up @@ -237,6 +265,15 @@ def mitigation_minimization(

    elif optimization_type == ReadOutOptimization.MTHREE:
        confusion_matrix_subspace = normalized_subspace_kron(noise_matrices, p_raw.nonzero()[0])

        ham_dist = options.get("ham_dist")
        if ham_dist:
            if not isinstance(ham_dist, int):
                raise ValueError("ham_dist value must be of type int.")
            confusion_matrix_subspace = ham_dist_redistribution(
                confusion_matrix_subspace, ham_dist
            )

        # GMRES (Generalized minimal residual) for linear equations in higher dimension
        p_corr, exit_code = gmres(confusion_matrix_subspace, p_raw)

9 changes: 9 additions & 0 deletions qadence_protocols/mitigations/twirl.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import itertools
import random
from collections import Counter

import torch
@@ -55,6 +56,14 @@ def mitigate(
]
)
)

    # Randomly sample twirl strings for scalability.
    twirl_samples = options.get("twirl_samples")
    if isinstance(twirl_samples, int) and twirl_samples > 0:
        twirls = random.sample(twirls, twirl_samples)
    elif twirl_samples is not None:
        raise ValueError("twirl_samples must be a positive integer")

> **Collaborator** (on the `random.sample` line): It would be clever to not sample from the whole twirl space, but rather generate only as many as needed, as that would be scalable. Not sure if this is straightforward here, feel free to ignore.

    # Generate samples for all twirls of circuit
    samples_twirl_num_list = []
    samples_twirl_den_list = []
33 changes: 33 additions & 0 deletions tests/test_readout.py
@@ -29,6 +29,7 @@

from qadence_protocols import Mitigations
from qadence_protocols.mitigations.readout import (
    ham_dist_redistribution,
    majority_vote,
    matrix_inv,
    mle_solve,
@@ -275,6 +276,38 @@ def test_readout_mthree_sparse() -> None:
    assert wasserstein_distance(p_corr_mthree_gmres_mle, p_corr_inv_mle) < LOW_ACCEPTANCE


def test_readout_mthree_sparse_ham() -> None:
    n_qubits = 10
    exact_prob = np.random.rand(2 ** (n_qubits))
    exact_prob[2 ** (n_qubits // 2) :] = 0
    exact_prob = 0.90 * exact_prob + 0.1 * np.ones(2**n_qubits) / 2**n_qubits
    exact_prob = exact_prob / sum(exact_prob)
    np.random.shuffle(exact_prob)

    observed_prob = np.array(exact_prob, copy=True)
    observed_prob[exact_prob < 1 / 2 ** (n_qubits)] = 0

    noise_matrices = []
    for t in range(n_qubits):
        t_a, t_b = np.random.rand(2) / 8
        K = np.array([[1 - t_a, t_a], [t_b, 1 - t_b]]).transpose()  # column sums must be 1
        noise_matrices.append(K)

    confusion_matrix_subspace = normalized_subspace_kron(noise_matrices, observed_prob.nonzero()[0])

    confusion_matrix_subspace_ham = ham_dist_redistribution(confusion_matrix_subspace, 3)
> **Collaborator:** I think you should first get the rows surrounding the non-zero locations, expand by Hamming distance, and then call `normalized_subspace_kron` over these locations.

    input_csr = csr_matrix(observed_prob, shape=(1, 2**n_qubits)).T

    p_corr_mthree_gmres_ham = gmres(confusion_matrix_subspace_ham, input_csr.toarray())[0]
    p_corr_mthree_gmres_mle_ham = mle_solve(p_corr_mthree_gmres_ham)

    noise_matrices_inv = list(map(matrix_inv, noise_matrices))
    p_corr_inv_mle = mle_solve(tensor_rank_mult(noise_matrices_inv, observed_prob))

    assert wasserstein_distance(p_corr_mthree_gmres_mle_ham, p_corr_inv_mle) < LOW_ACCEPTANCE


@pytest.mark.flaky(max_runs=5)
@pytest.mark.parametrize(
"n_qubits,index",
Expand Down
59 changes: 59 additions & 0 deletions tests/test_twirl.py
@@ -73,3 +73,62 @@ def test_readout_twirl_mitigation(
    mitigate = Mitigations(protocol=Mitigations.TWIRL)
    expectation_mitigated = mitigate(noise=noise, model=noisy_model)
    assert torch.allclose(expectation_mitigated, expectation_noiseless, atol=1.0e-1, rtol=5.0e-2)


@pytest.mark.parametrize(
    "block, observables",
    [
        (
            chain(kron(RX(0, torch.pi / 3), RX(1, torch.pi / 3)), CNOT(0, 1)),
            [add(kron(Z(0), Z(1)) + Z(0))],
        ),
        (
            chain(kron(RX(0, torch.pi / 4), RX(1, torch.pi / 5)), CNOT(0, 1)),
            [2 * Z(1) + 3 * Z(0), 3 * kron(Z(0), Z(1)) - 1 * Z(0)],
        ),
        (
            chain(kron(RX(0, torch.pi / 3), RX(1, torch.pi / 6)), CNOT(0, 1)),
            [add(Z(1), -Z(0)), 3 * kron(Z(0), Z(1)) + 2 * Z(0)],
        ),
        (
            chain(kron(RX(0, torch.pi / 6), RX(1, torch.pi / 4)), CNOT(0, 1)),
            [add(Z(1), -2 * Z(0)), add(2 * kron(Z(0), Z(1)), 4 * Z(0))],
        ),
    ],
)
def test_readout_twirl_mitigation_sample(
    block: AbstractBlock,
    observables: list[AbstractBlock],
) -> None:
    circuit = QuantumCircuit(block.n_qubits, block)
    error_probability = 0.1
    n_shots = 10000
    backend = BackendName.PYQTORCH
    noise = NoiseHandler(
        protocol=NoiseProtocol.READOUT.INDEPENDENT, options={"error_probability": error_probability}
    )
    tomo_measurement = Measurements(
        protocol=Measurements.TOMOGRAPHY,
        options={"n_shots": n_shots},
    )

    model = QuantumModel(
        circuit=circuit, observable=observables, measurement=tomo_measurement, backend=backend
    )

    expectation_noiseless = model.expectation(
        measurement=tomo_measurement,
    )

    noisy_model = QuantumModel(
        circuit=circuit,
        observable=observables,
        measurement=tomo_measurement,
        noise=noise,
        backend=backend,
    )

    options = {"twirl_samples": block.n_qubits}
    mitigate = Mitigations(protocol=Mitigations.TWIRL, options=options)
    expectation_mitigated = mitigate(noise=noise, model=noisy_model)
    assert torch.allclose(expectation_mitigated, expectation_noiseless, atol=1.0e-1, rtol=5.0e-2)