Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Handle batches with partial results #707

Merged
merged 10 commits into from
Sep 18, 2024
62 changes: 48 additions & 14 deletions pulser-core/pulser/backend/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ class SubmissionStatus(Enum):
PAUSED = auto()


class JobStatus(Enum):
"""Status of a remote job."""

PENDING = auto()
RUNNING = auto()
DONE = auto()
CANCELED = auto()
ERROR = auto()
PAUSED = auto()


class RemoteResultsError(Exception):
"""Error raised when fetching remote results fails."""

Expand Down Expand Up @@ -103,28 +114,42 @@ def get_status(self) -> SubmissionStatus:
"""Gets the status of the remote submission."""
return self._connection._get_submission_status(self._submission_id)

def _get_available_result(self, submission_id: str) -> dict[str, Result]:
"""Return available results and ignore jobs with no results."""
return {
k: v
for k, v in self._connection._query_result(submission_id).items()
if v is not None
def get_available_results(self, submission_id: str) -> dict[str, Result]:
"""Returns the available results of a submission.

Unlike the `results` property, this method does not raise an error if
some jobs associated to the submission do not have results.

It returns a dictionnary mapping the job ID to their results. Jobs with
no result are omitted.
MatthieuMoreau0 marked this conversation as resolved.
Show resolved Hide resolved
"""

results = {
k: v[1]
for k, v in self._connection._query_job_progress(
submission_id
).items()
if v[1] is not None
}

if self._job_ids:
return {k: v for k, v in results.items() if k in self._job_ids}
return results

def __getattr__(self, name: str) -> Any:
MatthieuMoreau0 marked this conversation as resolved.
Show resolved Hide resolved
if name == "_results":
status = self.get_status()
if status == SubmissionStatus.DONE:
try:
self._results = tuple(
self._connection._fetch_result(
self._submission_id, self._job_ids
)
)
return self._results
raise RemoteResultsError(
"The results are not available. The submission's status is "
f"{str(status)}."
)
except RemoteResultsError as e:
raise RemoteResultsError(
"Results are not available for all jobs. Use the "
"`get_available_results` method to retrieve partial "
"results."
) from e
raise AttributeError(
f"'RemoteResults' object has no attribute '{name}'."
)
Expand All @@ -148,7 +173,16 @@ def _fetch_result(
pass

@abstractmethod
def _query_result(self, submission_id: str) -> Mapping[str, Result | None]:
def _query_job_progress(
self, submission_id: str
) -> Mapping[str, tuple[JobStatus, Result | None]]:
"""Fetches the status and results of a submission.
MatthieuMoreau0 marked this conversation as resolved.
Show resolved Hide resolved

Unlike `_fetch_result`, this method does not raise an error if some
jobs associated to the submission do not have results.

It returns a dictionnary mapping the job ID to their status and results.
"""
pass

@abstractmethod
Expand Down
40 changes: 26 additions & 14 deletions pulser-pasqal/pulser_pasqal/pasqal_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
from pulser.backend.qpu import QPUBackend
from pulser.backend.remote import (
JobParams,
JobStatus,
RemoteConnection,
RemoteResults,
RemoteResultsError,
SubmissionStatus,
)
from pulser.devices import Device
Expand Down Expand Up @@ -183,19 +185,27 @@ def _fetch_result(
self, submission_id: str, job_ids: list[str] | None
) -> tuple[Result, ...]:
# For now, the results are always sampled results
full_results = self._query_result(submission_id)
jobs = self._query_job_progress(submission_id)

if job_ids is None:
job_ids = list(full_results.keys())
job_ids = list(jobs.keys())

results = []
results: list[Result] = []
for id in job_ids:
assert full_results[id] is not None, "Failed to fetch the results."
results.append(cast(Result, full_results[id]))
status, result = jobs[id]
if status != {JobStatus.PENDING, JobStatus.RUNNING}:
MatthieuMoreau0 marked this conversation as resolved.
Show resolved Hide resolved
raise RemoteResultsError(
f"The results are not yet available, job {id} status is {status}."
)
if result is None:
raise RemoteResultsError(f"No results found for job {id}.")
results.append(result)

return tuple(results)

def _query_result(self, submission_id: str) -> Mapping[str, Result | None]:
def _query_job_progress(
self, submission_id: str
) -> Mapping[str, tuple[JobStatus, Result | None]]:
get_batch_fn = backoff_decorator(self._sdk_connection.get_batch)
batch = get_batch_fn(id=submission_id)

Expand All @@ -204,21 +214,23 @@ def _query_result(self, submission_id: str) -> Mapping[str, Result | None]:
all_qubit_ids = reg.qubit_ids
meas_basis = seq_builder.get_measurement_basis()

results: dict[str, Result | None] = {}
sdk_jobs = batch.ordered_jobs
results: dict[str, tuple[JobStatus, Result | None]] = {}

MatthieuMoreau0 marked this conversation as resolved.
Show resolved Hide resolved
for job in sdk_jobs:
for job in batch.ordered_jobs:
vars = job.variables
size: int | None = None
if vars and "qubits" in vars:
size = len(vars["qubits"])
if job.result is None:
results[job.id] = None
results[job.id] = (JobStatus[job.status], None)
else:
results[job.id] = SampledResult(
atom_order=all_qubit_ids[slice(size)],
meas_basis=meas_basis,
bitstring_counts=job.result,
results[job.id] = (
JobStatus[job.status],
SampledResult(
atom_order=all_qubit_ids[slice(size)],
meas_basis=meas_basis,
bitstring_counts=job.result,
),
)
return results

Expand Down
19 changes: 12 additions & 7 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pulser.backend.config import EmulatorConfig
from pulser.backend.qpu import QPUBackend
from pulser.backend.remote import (
JobStatus,
RemoteConnection,
RemoteResults,
RemoteResultsError,
Expand Down Expand Up @@ -89,20 +90,24 @@ def test_emulator_config_type_errors(param, msg):
class _MockConnection(RemoteConnection):
def __init__(self):
self._status_calls = 0
self.result = SampledResult(
("q0", "q1"),
meas_basis="ground-rydberg",
bitstring_counts={"00": 100},
)

def submit(self, sequence, wait: bool = False, **kwargs) -> RemoteResults:
return RemoteResults("abcd", self)

def _fetch_result(
self, submission_id: str, job_ids: list[str] | None = None
) -> typing.Sequence[Result]:
return (
SampledResult(
("q0", "q1"),
meas_basis="ground-rydberg",
bitstring_counts={"00": 100},
),
)
return (self.result,)

def _query_job_progress(
self, submission_id: str
) -> typing.Mapping[str, tuple[JobStatus, Result | None]]:
return {"abcd": (JobStatus.DONE, self.result)}

def _get_submission_status(self, submission_id: str) -> SubmissionStatus:
self._status_calls += 1
Expand Down
Loading