Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Handle batches with partial results #707

Merged
merged 10 commits into from
Sep 18, 2024
61 changes: 54 additions & 7 deletions pulser-core/pulser/backend/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import typing
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Any, TypedDict
from typing import Any, Mapping, TypedDict

from pulser.backend.abc import Backend
from pulser.devices import Device
Expand All @@ -44,6 +44,17 @@ class SubmissionStatus(Enum):
PAUSED = auto()


class JobStatus(Enum):
"""Status of a remote job."""

PENDING = auto()
RUNNING = auto()
DONE = auto()
CANCELED = auto()
ERROR = auto()
PAUSED = auto()


class RemoteResultsError(Exception):
"""Error raised when fetching remote results fails."""

Expand Down Expand Up @@ -103,20 +114,43 @@ def get_status(self) -> SubmissionStatus:
"""Gets the status of the remote submission."""
return self._connection._get_submission_status(self._submission_id)

def get_available_results(self, submission_id: str) -> dict[str, Result]:
"""Returns the available results of a submission.

Unlike the `results` property, this method does not raise an error if
some jobs associated to the submission do not have results.

Returns:
dict[str, Result]: A dictionary mapping the job ID to its results.
Jobs with no result are omitted.
"""
results = {
k: v[1]
for k, v in self._connection._query_job_progress(
submission_id
).items()
if v[1] is not None
}

if self._job_ids:
return {k: v for k, v in results.items() if k in self._job_ids}
return results

def __getattr__(self, name: str) -> Any:
MatthieuMoreau0 marked this conversation as resolved.
Show resolved Hide resolved
if name == "_results":
status = self.get_status()
if status == SubmissionStatus.DONE:
try:
self._results = tuple(
self._connection._fetch_result(
self._submission_id, self._job_ids
)
)
return self._results
raise RemoteResultsError(
"The results are not available. The submission's status is "
f"{str(status)}."
)
except RemoteResultsError as e:
raise RemoteResultsError(
"Results are not available for all jobs. Use the "
"`get_available_results` method to retrieve partial "
"results."
) from e
raise AttributeError(
f"'RemoteResults' object has no attribute '{name}'."
)
Expand All @@ -139,6 +173,19 @@ def _fetch_result(
"""Fetches the results of a completed submission."""
pass

@abstractmethod
def _query_job_progress(
self, submission_id: str
) -> Mapping[str, tuple[JobStatus, Result | None]]:
"""Fetches the status and results of all the jobs in a submission.

Unlike `_fetch_result`, this method does not raise an error if some
jobs associated to the submission do not have results.

It returns a dictionnary mapping the job ID to its status and results.
"""
pass

@abstractmethod
def _get_submission_status(self, submission_id: str) -> SubmissionStatus:
"""Gets the status of a submission from its ID.
Expand Down
60 changes: 40 additions & 20 deletions pulser-pasqal/pulser_pasqal/pasqal_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import json
from dataclasses import fields
from typing import Any, Type, cast
from typing import Any, Mapping, Type, cast

import backoff
import numpy as np
Expand All @@ -32,8 +32,10 @@
from pulser.backend.qpu import QPUBackend
from pulser.backend.remote import (
JobParams,
JobStatus,
RemoteConnection,
RemoteResults,
RemoteResultsError,
SubmissionStatus,
)
from pulser.devices import Device
Expand Down Expand Up @@ -186,37 +188,55 @@ def _fetch_result(
self, submission_id: str, job_ids: list[str] | None
) -> tuple[Result, ...]:
# For now, the results are always sampled results
jobs = self._query_job_progress(submission_id)

if job_ids is None:
job_ids = list(jobs.keys())

results: list[Result] = []
for id in job_ids:
status, result = jobs[id]
if status in {JobStatus.PENDING, JobStatus.RUNNING}:
raise RemoteResultsError(
f"The results are not yet available, job {id} status is "
f"{status}."
)
if result is None:
raise RemoteResultsError(f"No results found for job {id}.")
results.append(result)

return tuple(results)

def _query_job_progress(
self, submission_id: str
) -> Mapping[str, tuple[JobStatus, Result | None]]:
get_batch_fn = backoff_decorator(self._sdk_connection.get_batch)
batch = get_batch_fn(id=submission_id)

seq_builder = Sequence.from_abstract_repr(batch.sequence_builder)
reg = seq_builder.get_register(include_mappable=True)
all_qubit_ids = reg.qubit_ids
meas_basis = seq_builder.get_measurement_basis()

results = []
sdk_jobs = batch.ordered_jobs
if job_ids is not None:
ind_job_pairs = [
(job_ids.index(job.id), job)
for job in sdk_jobs
if job.id in job_ids
]
ind_job_pairs.sort()
sdk_jobs = [job for _, job in ind_job_pairs]
for job in sdk_jobs:
results: dict[str, tuple[JobStatus, Result | None]] = {}

MatthieuMoreau0 marked this conversation as resolved.
Show resolved Hide resolved
for job in batch.ordered_jobs:
vars = job.variables
size: int | None = None
if vars and "qubits" in vars:
size = len(vars["qubits"])
assert job.result is not None, "Failed to fetch the results."
results.append(
SampledResult(
atom_order=all_qubit_ids[slice(size)],
meas_basis=meas_basis,
bitstring_counts=job.result,
if job.result is None:
results[job.id] = (JobStatus[job.status], None)
else:
results[job.id] = (
JobStatus[job.status],
SampledResult(
atom_order=all_qubit_ids[slice(size)],
meas_basis=meas_basis,
bitstring_counts=job.result,
),
)
)
return tuple(results)
return results

@backoff_decorator
def _get_submission_status(self, submission_id: str) -> SubmissionStatus:
Expand Down
37 changes: 25 additions & 12 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pulser.backend.config import EmulatorConfig
from pulser.backend.qpu import QPUBackend
from pulser.backend.remote import (
JobStatus,
RemoteConnection,
RemoteResults,
RemoteResultsError,
Expand Down Expand Up @@ -89,25 +90,31 @@ def test_emulator_config_type_errors(param, msg):
class _MockConnection(RemoteConnection):
def __init__(self):
self._status_calls = 0
self._progress_calls = 0
self.result = SampledResult(
("q0", "q1"),
meas_basis="ground-rydberg",
bitstring_counts={"00": 100},
)

def submit(self, sequence, wait: bool = False, **kwargs) -> RemoteResults:
return RemoteResults("abcd", self)

def _fetch_result(
self, submission_id: str, job_ids: list[str] | None = None
) -> typing.Sequence[Result]:
return (
SampledResult(
("q0", "q1"),
meas_basis="ground-rydberg",
bitstring_counts={"00": 100},
),
)
self._progress_calls += 1
if self._progress_calls == 1:
raise RemoteResultsError("Results not available")

return (self.result,)

def _query_job_progress(
self, submission_id: str
) -> typing.Mapping[str, tuple[JobStatus, Result | None]]:
return {"abcd": (JobStatus.DONE, self.result)}

def _get_submission_status(self, submission_id: str) -> SubmissionStatus:
self._status_calls += 1
if self._status_calls == 1:
return SubmissionStatus.RUNNING
return SubmissionStatus.DONE


Expand Down Expand Up @@ -176,10 +183,16 @@ def test_qpu_backend(sequence):

with pytest.raises(
RemoteResultsError,
match="The results are not available. The submission's status is"
" SubmissionStatus.RUNNING",
match=(
"Results are not available for all jobs. "
"Use the `get_available_results` method to retrieve partial "
"results."
),
):
remote_results.results

results = remote_results.results
assert results[0].sampling_dist == {"00": 1.0}

available_results = remote_results.get_available_results("id")
assert available_results == {"abcd": connection.result}
Loading