Skip to content

Commit

Permalink
[FEAT] Handle batches with partial results (#707)
Browse files Browse the repository at this point in the history
* Introduce new method to return available results

* mypy

* review

* Apply suggestions from code review

Co-authored-by: Henrique Silvério <29920212+HGSilveri@users.noreply.github.com>

* [TEST] Add test for get_available_result and update existing tests

* style

* Add more assertions to tests

* Move sequence declaration to dedicated function

* style

---------

Co-authored-by: Henrique Silvério <29920212+HGSilveri@users.noreply.github.com>
  • Loading branch information
MatthieuMoreau0 and HGSilveri authored Sep 18, 2024
1 parent 02122e2 commit 7af1d2d
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 67 deletions.
61 changes: 54 additions & 7 deletions pulser-core/pulser/backend/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import typing
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Any, TypedDict
from typing import Any, Mapping, TypedDict

from pulser.backend.abc import Backend
from pulser.devices import Device
Expand All @@ -44,6 +44,17 @@ class SubmissionStatus(Enum):
PAUSED = auto()


class JobStatus(Enum):
"""Status of a remote job."""

PENDING = auto()
RUNNING = auto()
DONE = auto()
CANCELED = auto()
ERROR = auto()
PAUSED = auto()


class RemoteResultsError(Exception):
"""Error raised when fetching remote results fails."""

Expand Down Expand Up @@ -103,20 +114,43 @@ def get_status(self) -> SubmissionStatus:
"""Gets the status of the remote submission."""
return self._connection._get_submission_status(self._submission_id)

def get_available_results(self, submission_id: str) -> dict[str, Result]:
"""Returns the available results of a submission.
Unlike the `results` property, this method does not raise an error if
some jobs associated to the submission do not have results.
Returns:
dict[str, Result]: A dictionary mapping the job ID to its results.
Jobs with no result are omitted.
"""
results = {
k: v[1]
for k, v in self._connection._query_job_progress(
submission_id
).items()
if v[1] is not None
}

if self._job_ids:
return {k: v for k, v in results.items() if k in self._job_ids}
return results

def __getattr__(self, name: str) -> Any:
if name == "_results":
status = self.get_status()
if status == SubmissionStatus.DONE:
try:
self._results = tuple(
self._connection._fetch_result(
self._submission_id, self._job_ids
)
)
return self._results
raise RemoteResultsError(
"The results are not available. The submission's status is "
f"{str(status)}."
)
except RemoteResultsError as e:
raise RemoteResultsError(
"Results are not available for all jobs. Use the "
"`get_available_results` method to retrieve partial "
"results."
) from e
raise AttributeError(
f"'RemoteResults' object has no attribute '{name}'."
)
Expand All @@ -139,6 +173,19 @@ def _fetch_result(
"""Fetches the results of a completed submission."""
pass

@abstractmethod
def _query_job_progress(
self, submission_id: str
) -> Mapping[str, tuple[JobStatus, Result | None]]:
"""Fetches the status and results of all the jobs in a submission.
Unlike `_fetch_result`, this method does not raise an error if some
jobs associated to the submission do not have results.
It returns a dictionnary mapping the job ID to its status and results.
"""
pass

@abstractmethod
def _get_submission_status(self, submission_id: str) -> SubmissionStatus:
"""Gets the status of a submission from its ID.
Expand Down
60 changes: 40 additions & 20 deletions pulser-pasqal/pulser_pasqal/pasqal_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import json
from dataclasses import fields
from typing import Any, Type, cast
from typing import Any, Mapping, Type, cast

import backoff
import numpy as np
Expand All @@ -32,8 +32,10 @@
from pulser.backend.qpu import QPUBackend
from pulser.backend.remote import (
JobParams,
JobStatus,
RemoteConnection,
RemoteResults,
RemoteResultsError,
SubmissionStatus,
)
from pulser.devices import Device
Expand Down Expand Up @@ -186,37 +188,55 @@ def _fetch_result(
self, submission_id: str, job_ids: list[str] | None
) -> tuple[Result, ...]:
# For now, the results are always sampled results
jobs = self._query_job_progress(submission_id)

if job_ids is None:
job_ids = list(jobs.keys())

results: list[Result] = []
for id in job_ids:
status, result = jobs[id]
if status in {JobStatus.PENDING, JobStatus.RUNNING}:
raise RemoteResultsError(
f"The results are not yet available, job {id} status is "
f"{status}."
)
if result is None:
raise RemoteResultsError(f"No results found for job {id}.")
results.append(result)

return tuple(results)

def _query_job_progress(
self, submission_id: str
) -> Mapping[str, tuple[JobStatus, Result | None]]:
get_batch_fn = backoff_decorator(self._sdk_connection.get_batch)
batch = get_batch_fn(id=submission_id)

seq_builder = Sequence.from_abstract_repr(batch.sequence_builder)
reg = seq_builder.get_register(include_mappable=True)
all_qubit_ids = reg.qubit_ids
meas_basis = seq_builder.get_measurement_basis()

results = []
sdk_jobs = batch.ordered_jobs
if job_ids is not None:
ind_job_pairs = [
(job_ids.index(job.id), job)
for job in sdk_jobs
if job.id in job_ids
]
ind_job_pairs.sort()
sdk_jobs = [job for _, job in ind_job_pairs]
for job in sdk_jobs:
results: dict[str, tuple[JobStatus, Result | None]] = {}

for job in batch.ordered_jobs:
vars = job.variables
size: int | None = None
if vars and "qubits" in vars:
size = len(vars["qubits"])
assert job.result is not None, "Failed to fetch the results."
results.append(
SampledResult(
atom_order=all_qubit_ids[slice(size)],
meas_basis=meas_basis,
bitstring_counts=job.result,
if job.result is None:
results[job.id] = (JobStatus[job.status], None)
else:
results[job.id] = (
JobStatus[job.status],
SampledResult(
atom_order=all_qubit_ids[slice(size)],
meas_basis=meas_basis,
bitstring_counts=job.result,
),
)
)
return tuple(results)
return results

@backoff_decorator
def _get_submission_status(self, submission_id: str) -> SubmissionStatus:
Expand Down
37 changes: 25 additions & 12 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pulser.backend.config import EmulatorConfig
from pulser.backend.qpu import QPUBackend
from pulser.backend.remote import (
JobStatus,
RemoteConnection,
RemoteResults,
RemoteResultsError,
Expand Down Expand Up @@ -89,25 +90,31 @@ def test_emulator_config_type_errors(param, msg):
class _MockConnection(RemoteConnection):
def __init__(self):
self._status_calls = 0
self._progress_calls = 0
self.result = SampledResult(
("q0", "q1"),
meas_basis="ground-rydberg",
bitstring_counts={"00": 100},
)

def submit(self, sequence, wait: bool = False, **kwargs) -> RemoteResults:
return RemoteResults("abcd", self)

def _fetch_result(
self, submission_id: str, job_ids: list[str] | None = None
) -> typing.Sequence[Result]:
return (
SampledResult(
("q0", "q1"),
meas_basis="ground-rydberg",
bitstring_counts={"00": 100},
),
)
self._progress_calls += 1
if self._progress_calls == 1:
raise RemoteResultsError("Results not available")

return (self.result,)

def _query_job_progress(
self, submission_id: str
) -> typing.Mapping[str, tuple[JobStatus, Result | None]]:
return {"abcd": (JobStatus.DONE, self.result)}

def _get_submission_status(self, submission_id: str) -> SubmissionStatus:
self._status_calls += 1
if self._status_calls == 1:
return SubmissionStatus.RUNNING
return SubmissionStatus.DONE


Expand Down Expand Up @@ -176,10 +183,16 @@ def test_qpu_backend(sequence):

with pytest.raises(
RemoteResultsError,
match="The results are not available. The submission's status is"
" SubmissionStatus.RUNNING",
match=(
"Results are not available for all jobs. "
"Use the `get_available_results` method to retrieve partial "
"results."
),
):
remote_results.results

results = remote_results.results
assert results[0].sampling_dist == {"00": 1.0}

available_results = remote_results.get_available_results("id")
assert available_results == {"abcd": connection.result}
Loading

0 comments on commit 7af1d2d

Please sign in to comment.