Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to Sequence to estimate delay introduced before adding pulse #773

Merged
merged 6 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions pulser-core/pulser/sequence/_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,15 @@ def disable_eom(self, channel_id: str, _skip_buffer: bool = False) -> None:
else:
self.wait_for_fall(channel_id)

def add_pulse(
def get_next_time_slot(
a-corni marked this conversation as resolved.
Show resolved Hide resolved
self,
pulse: Pulse,
channel: str,
phase_barrier_ts: list[int],
protocol: str,
phase_drift_params: _PhaseDriftParams | None = None,
) -> None:
block_if_exceeds: bool = False,
a-corni marked this conversation as resolved.
Show resolved Hide resolved
) -> _TimeSlot:
def corrected_phase(tf: int) -> pm.AbstractArray:
phase_drift = pm.AbstractArray(
phase_drift_params.calc_phase_drift(tf)
Expand Down Expand Up @@ -447,11 +448,10 @@ def corrected_phase(tf: int) -> pm.AbstractArray:
delay_duration = max(current_max_t - t0, phase_jump_buffer)
if delay_duration > 0:
delay_duration = self[channel].adjust_duration(delay_duration)
self.add_delay(delay_duration, channel)

ti = t0 + delay_duration
tf = ti + pulse.duration
self._check_duration(tf)
self._check_duration(tf, block_if_exceeds)
# dataclasses.replace() does not work on Pulse (because init=False)
if phase_drift_params is not None:
pulse = Pulse(
Expand All @@ -460,7 +460,29 @@ def corrected_phase(tf: int) -> pm.AbstractArray:
phase=corrected_phase(ti),
post_phase_shift=pulse.post_phase_shift,
)
self[channel].slots.append(_TimeSlot(pulse, ti, tf, last.targets))
return _TimeSlot(pulse, ti, tf, last.targets)

def add_pulse(
self,
pulse: Pulse,
channel: str,
phase_barrier_ts: list[int],
protocol: str,
phase_drift_params: _PhaseDriftParams | None = None,
) -> None:
last = self[channel][-1]
time_slot = self.get_next_time_slot(
pulse,
channel,
phase_barrier_ts,
protocol,
phase_drift_params,
True,
)
delay_duration = time_slot.ti - last.tf
if delay_duration > 0:
self.add_delay(delay_duration, channel)
self[channel].slots.append(time_slot)

def add_delay(self, duration: int, channel: str) -> None:
last = self[channel][-1]
Expand Down Expand Up @@ -557,9 +579,12 @@ def _get_last_pulse_phase(self, channel: str) -> pm.AbstractArray:
phase = pm.AbstractArray(0.0)
return phase

def _check_duration(self, t: int) -> None:
def _check_duration(self, t: int, block_if_exceeds: bool = True) -> None:
if self.max_duration is not None and t > self.max_duration:
raise RuntimeError(
msg = (
"The sequence's duration exceeded the maximum duration allowed"
f" by the device ({self.max_duration} ns)."
)
if block_if_exceeds:
raise RuntimeError(msg)
warnings.warn(msg, UserWarning)
90 changes: 90 additions & 0 deletions pulser-core/pulser/sequence/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,96 @@ def delay(
"""
self._delay(duration, channel, at_rest)

def estimate_added_delay(
self,
pulse: Union[Pulse, Parametrized],
channel: str,
protocol: PROTOCOLS = "min-delay",
) -> int:
"""Delay that will be added before the pulse when added to a channel.

When adding a pulse to a channel of the Sequence, a delay can be added
to account for the modulation bandwidth of the channel or the protocol
chosen. This method estimates the delay that will be added before the
pulse if this pulse was added to this channel with this protocol. It
works even if the channel is in EOM mode, but to be appropriate, the
Pulse should be a ConstantPulse with amplitude and detuning
respectively the rabi_freq and detuning_on of the EOM block.

Args:
pulse: The pulse object to add to the channel.
channel: The channel's name provided when declared.
protocol: Stipulates how to deal with
eventual conflicts with other channels, specifically in terms
of having multiple channels act on the same target
simultaneously.

- ``'min-delay'``: Before adding the pulse, introduces the
smallest possible delay that avoids all exisiting conflicts.
- ``'no-delay'``: Adds the pulse to the channel, regardless of
existing conflicts.
- ``'wait-for-all'``: Before adding the pulse, adds a delay
that idles the channel until the end of the other channels'
latest pulse.

Returns:
The delay that would be added before the pulse.
"""
self._validate_channel(
channel,
block_if_slm=channel.startswith("dmm_"),
)
self._validate_add_protocol(protocol)
if self.is_parametrized() or isinstance(pulse, Parametrized):
raise ValueError(
"Can't compute the delay to add before a pulse if sequence or"
"pulse is parametrized."
)
if self.is_in_eom_mode(channel):
eom_settings = self._schedule[channel].eom_blocks[-1]
if np.any(pulse.amplitude.samples != eom_settings.rabi_freq):
warnings.warn(
f"Channel {channel} is in EOM mode, amplitude of the pulse"
f" must be constant, equal to {eom_settings.rabi_freq}.",
a-corni marked this conversation as resolved.
Show resolved Hide resolved
UserWarning,
)
if np.any(pulse.detuning.samples != eom_settings.detuning_on):
warnings.warn(
f"Channel {channel} is in EOM mode, detuning of the pulse "
f"must be constant, equal to {eom_settings.detuning_on}.",
UserWarning,
a-corni marked this conversation as resolved.
Show resolved Hide resolved
)
channel_obj = self._schedule[channel].channel_obj
last = self._last(channel)
basis = channel_obj.basis

ph_refs = {
self._basis_ref[basis][q].phase.last_phase for q in last.targets
}
if isinstance(channel_obj, DMM):
phase_ref = None
elif len(ph_refs) != 1:
raise ValueError(
"Cannot do a multiple-target pulse on qubits with different "
"phase references for the same basis."
)
else:
phase_ref = ph_refs.pop()

pulse = self._validate_and_adjust_pulse(pulse, channel, phase_ref)

phase_barriers = [
self._basis_ref[basis][q].phase.last_time for q in last.targets
]
next_time_slot = self._schedule.get_next_time_slot(
pulse,
channel,
phase_barriers,
protocol,
# phase_drift_params does not impact delay between pulses
)
return next_time_slot.ti - last.tf

@seq_decorators.store
@seq_decorators.block_if_measured
def measure(self, basis: str = "ground-rydberg") -> None:
Expand Down
89 changes: 89 additions & 0 deletions tests/test_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,95 @@ def test_sequence(reg, device, patch_plt_show):
assert str(seq) == str(seq_)


@pytest.mark.parametrize("eom", [False, True])
def test_estimate_added_delay(eom):
reg = Register.square(2, 5)
seq = Sequence(reg, AnalogDevice)
pulse_0 = Pulse.ConstantPulse(100, 1, 0, 0)
pulse_pi_2 = Pulse.ConstantPulse(100, 1, 0, np.pi / 2)

with pytest.raises(
ValueError, match="Use the name of a declared channel."
):
seq.estimate_added_delay(pulse_0, "ising", "min-delay")
seq.declare_channel("ising", "rydberg_global")
ising_obj = seq.declared_channels["ising"]
if eom:
seq.enable_eom_mode("ising", 1, 0)
with pytest.warns(
UserWarning,
match="Channel ising is in EOM mode, amplitude of the pulse",
):
assert (
seq.estimate_added_delay(
Pulse.ConstantPulse(100, 2, 0, 0), "ising"
)
== 0
)
with pytest.warns(
UserWarning,
match="Channel ising is in EOM mode, detuning of the pulse",
):
assert (
seq.estimate_added_delay(
Pulse.ConstantPulse(100, 1, 1, 0), "ising"
)
== 0
)
assert seq.estimate_added_delay(pulse_0, "ising", "min-delay") == 0
seq._add(pulse_0, "ising", "min-delay")
HGSilveri marked this conversation as resolved.
Show resolved Hide resolved
first_pulse = seq._last("ising")
assert first_pulse.ti == 0
delay = pulse_0.fall_time(ising_obj, eom) + ising_obj.phase_jump_time
assert seq.estimate_added_delay(pulse_pi_2, "ising") == delay
seq._add(pulse_pi_2, "ising", "min-delay")
second_pulse = seq._last("ising")
assert second_pulse.ti - first_pulse.tf == delay
assert seq.estimate_added_delay(pulse_0, "ising") == delay
seq.delay(100, "ising")
assert seq.estimate_added_delay(pulse_0, "ising") == delay - 100
with pytest.warns(
UserWarning,
match="The sequence's duration exceeded the maximum duration",
):
seq.estimate_added_delay(
pulser.Pulse.ConstantPulse(4000, 1, 0, np.pi), "ising"
)
var = seq.declare_variable("var", dtype=int)
with pytest.raises(
ValueError, match="Can't compute the delay to add before a pulse"
):
seq.estimate_added_delay(Pulse.ConstantPulse(var, 1, 0, 0), "ising")
# We shift the phase of just one qubit, which blocks addition
# of new pulses on this basis
seq.phase_shift(1.0, 0, basis="ground-rydberg")
with pytest.raises(
ValueError,
match="Cannot do a multiple-target pulse on qubits with different",
):
seq.estimate_added_delay(pulse_0, "ising")

# With DMM
det_pulse = Pulse.ConstantPulse(100, 0, -1, 0)
seq = Sequence(Register.square(2, 5), DigitalAnalogDevice)
seq.declare_channel("ising", "rydberg_global")
seq.config_slm_mask([0, 1])
with pytest.raises(
ValueError, match="You should add a Pulse to a Global Channel"
):
seq.estimate_added_delay(det_pulse, "dmm_0")
seq.add(pulse_0, "ising")
assert seq.estimate_added_delay(det_pulse, "dmm_0") == 0
with pytest.raises(
ValueError, match="The detuning in a DMM must not be positive."
):
seq.estimate_added_delay(Pulse.ConstantPulse(100, 0, 1, 0), "dmm_0")
with pytest.raises(
ValueError, match="The pulse's amplitude goes over the maximum"
):
seq.estimate_added_delay(pulse_0, "dmm_0")
a-corni marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize("qubit_ids", [["q0", "q1", "q2"], [0, 1, 2]])
def test_config_slm_mask(qubit_ids, device, det_map):
reg: Register | MappableRegister
Expand Down
Loading