Skip to content

Commit

Permalink
Add support for QHub
Browse files Browse the repository at this point in the history
  • Loading branch information
YakBizzarro committed Oct 9, 2024
1 parent 6219fe2 commit 13f716b
Show file tree
Hide file tree
Showing 8 changed files with 5,318 additions and 920 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# zhinst-toolkit Changelog

## Version 0.7.0
* Add QHub driver

## Version 0.6.4
* Function `wait_for_state_change` now works with zero timeout; the node value is always checked at least once.
* Improved the stability of device interface auto-detection when it is not supplied while connecting to a device.
Expand Down
7 changes: 6 additions & 1 deletion src/zhinst/toolkit/driver/devices/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
from zhinst.toolkit.driver.devices.base import BaseInstrument
from zhinst.toolkit.driver.devices.hdawg import HDAWG
from zhinst.toolkit.driver.devices.pqsc import PQSC
from zhinst.toolkit.driver.devices.qhub import QHub
from zhinst.toolkit.driver.devices.shfqa import SHFQA
from zhinst.toolkit.driver.devices.shfsg import SHFSG
from zhinst.toolkit.driver.devices.uhfli import UHFLI
from zhinst.toolkit.driver.devices.uhfqa import UHFQA

from zhinst.toolkit.driver.devices.shfqc import SHFQC # isort:skip

DeviceType = t.Union[BaseInstrument, HDAWG, PQSC, SHFQA, SHFSG, UHFLI, UHFQA, SHFQC]
DeviceType = t.Union[
BaseInstrument, HDAWG, PQSC, QHub, SHFQA, SHFSG, UHFLI, UHFQA, SHFQC
]

DEVICE_CLASS_BY_MODEL = {
"SHFQC": SHFQC,
Expand All @@ -22,6 +25,7 @@
"HDAWG4": HDAWG,
"HDAWG8": HDAWG,
"PQSC": PQSC,
"QHUB": QHub,
"UHFQA": UHFQA,
"UHFLI": UHFLI,
"UHFAWG": UHFLI,
Expand All @@ -33,6 +37,7 @@
"BaseInstrument",
"HDAWG",
"PQSC",
"QHub",
"SHFQA",
"SHFSG",
"UHFLI",
Expand Down
273 changes: 2 additions & 271 deletions src/zhinst/toolkit/driver/devices/pqsc.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
"""PQSC Instrument Driver."""
from zhinst.toolkit.driver.devices.quantum_system_hub import QuantumSystemHub

import logging
import time
from typing import List, Union

from zhinst.toolkit.driver.devices.base import BaseInstrument
from zhinst.toolkit.exceptions import ToolkitError


logger = logging.getLogger(__name__)


class PQSC(BaseInstrument):
class PQSC(QuantumSystemHub):
"""High-level driver for the Zurich Instruments PQSC."""

def arm(self, *, deep=True, repetitions: int = None, holdoff: float = None) -> None:
Expand Down Expand Up @@ -48,262 +38,3 @@ def arm(self, *, deep=True, repetitions: int = None, holdoff: float = None) -> N
self.execution.holdoff(holdoff)
# Clear register bank
self.feedback.registerbank.reset(1, deep=deep)

def run(self, *, deep: bool = True) -> None:
"""Start sending out triggers.
This method activates the trigger generation to trigger all
connected instruments over ZSync ports.
Args:
deep: A flag that specifies if a synchronization
should be performed between the device and the data
server after enabling the PQSC (default: True).
"""
self.execution.enable(True, deep=deep)

def arm_and_run(self, *, repetitions: int = None, holdoff: float = None) -> None:
"""Arm the PQSC and start sending out triggers.
Simply combines the methods arm and run. A synchronization
is performed between the device and the data server after
arming and running the PQSC.
Args:
repetitions: If specified, the number of triggers sent
over ZSync ports will be set (default: None).
holdoff: If specified, the time between repeated
triggers sent over ZSync ports will be set. It has a
minimum value and a granularity of 100 ns
(default: None).
"""
self.arm(deep=True, repetitions=repetitions, holdoff=holdoff)
self.run(deep=True)

def stop(self, *, deep: bool = True) -> None:
"""Stop the trigger generation.
Args:
deep: A flag that specifies if a synchronization
should be performed between the device and the data
server after disabling the PQSC (default: True).
"""
self.execution.enable(False, deep=deep)

def wait_done(self, *, timeout: float = 10.0, sleep_time: float = 0.005) -> None:
"""Wait until trigger generation and feedback processing is done.
Args:
timeout: The maximum waiting time in seconds for the
PQSC (default: 10.0).
sleep_time: Time in seconds to wait between
requesting PQSC state
Raises:
TimeoutError: If the PQSC is not done sending out all
triggers and processing feedback before the timeout.
"""
try:
self.execution.enable.wait_for_state_change(
0, timeout=timeout, sleep_time=sleep_time
)
except TimeoutError as error:
raise TimeoutError("PQSC timed out.") from error

def check_ref_clock(
self, *, timeout: float = 30.0, sleep_time: float = 1.0
) -> bool:
"""Check if reference clock is locked successfully.
Args:
timeout: Maximum time in seconds the program waits
(default: 30.0).
sleep_time: Time in seconds to wait between
requesting the reference clock status (default: 1)
Raises:
TimeoutError: If the process of locking to the reference clock
exceeds the specified timeout.
"""
ref_clock_status = self.system.clocks.referenceclock.in_.status
ref_clock = self.system.clocks.referenceclock.in_.source
ref_clock_actual = self.system.clocks.referenceclock.in_.sourceactual
try:
ref_clock_status.wait_for_state_change(
2, invert=True, timeout=timeout, sleep_time=sleep_time
)
except TimeoutError as error:
raise TimeoutError(
"Timeout during locking to reference clock signal"
) from error
if ref_clock_status() == 0:
return True
if ref_clock_status() == 1 and ref_clock_actual() != ref_clock():
ref_clock("internal", deep=True)
logger.error(
f"There was an error locking the device({self.serial}) "
f"onto reference clock signal. Automatically switching to internal "
f"reference clock. Please try again."
)
return False

def check_zsync_connection(
self,
inputs: Union[List[int], int, List[BaseInstrument], BaseInstrument],
*,
timeout: float = 10.0,
sleep_time: float = 0.1,
) -> Union[List[bool], bool]:
"""Check if a ZSync connection is established.
Checks the current status of the instrument connected to the given ports.
If a instrument(s) is given instead of a port number, first finds the correct
port number(s).
Args:
inputs: The port numbers to check the ZSync connection for.
It can either be a single port number given as integer, a list
of several port numbers an instrument or a list of instruments.
timeout: Maximum time in seconds the program waits (default: 10.0).
sleep_time: Time in seconds to wait between requesting the reference
clock status (default: 0.1)
.. versionchanged:: 0.6.1: Reduce default timeout and sleep_time.
.. versionchanged:: 0.6.1: Raise an error if the port is in a faulty
state, instead of return False.
Raises:
TimeoutError: If the process of establishing a ZSync connection on
one of the specified ports exceeds the specified timeout.
"""
inputs_list = inputs if isinstance(inputs, list) else [inputs]

start_time = time.time()

# Check the status of all ports
status = []
for input in inputs_list:
# Convert the instrument into a port, if needed
if isinstance(input, BaseInstrument):
port = self.find_zsync_worker_port(
input,
timeout=max(0, timeout - (time.time() - start_time)),
sleep_time=sleep_time,
)
else:
port = input

# Check or wait until the connection is ready
status.append(
self._check_zsync_connection(
port,
timeout=max(0, timeout - (time.time() - start_time)),
sleep_time=sleep_time,
)
)
return status if isinstance(inputs, list) else status[0]

def _check_zsync_connection(
self, port: int, timeout: float, sleep_time: float
) -> bool:
"""Check if the ZSync connection on the given port is successful.
This function checks the current status of the instrument
connected to the given port.
Args:
ports: Port number to check the ZSync connection for.
timeout: Maximum time in seconds the program waits.
sleep_time: Time in seconds to wait between requesting the status
Raises:
TimeoutError: If the process of establishing a ZSync connection the
specified port exceeds the specified timeout.
"""
status_node = self.zsyncs[port].connection.status
try:
# Waits until the status node is "connected" (2)
status_node.wait_for_state_change(2, timeout=timeout, sleep_time=sleep_time)
except TimeoutError as error:
status = status_node()
err_msg = (
"Timeout while establishing ZSync connection to the instrument "
f"on the port {port}."
)

if status == 0:
# No connection
err_msg += "No instrument detected."
elif status == 1:
# In progress
err_msg += (
"Connection still in progress. Consider increasing the timeout."
)
elif status == 3:
# Error
err_msg += (
"Impossible to establish a connect. Check cabling and FW version"
)

raise TimeoutError(err_msg) from error
return True

def find_zsync_worker_port(
self,
device: BaseInstrument,
timeout: float = 10.0,
sleep_time: float = 0.1,
) -> int:
"""Find the ID of the PQSC ZSync port connected to a given device.
The function checks until the given timeout for the specified device to
show up in the connection list.
Args:
device: device for which the connected ZSync port shall be found.
timeout: Maximum time in seconds the program waits (default: 10.0).
sleep_time: Time in seconds to wait between requesting the port
serials list (default: 0.1)
.. versionchanged:: 0.6.1: Added timeout and sleep_time parameters.
Returns:
Index of the searched PQSC ZSync port.
Raises:
ToolkitError: If the given device doesn't appear to be connected
to the PQSC via ZSync.
.. versionadded:: 0.5.1
"""
device_serial = device.serial[3:]

start = time.time()
while time.time() - start < timeout:
node_to_serial_dict = self.zsyncs["*"].connection.serial()

if device_serial in node_to_serial_dict.values():
break

time.sleep(sleep_time)
else:
raise ToolkitError(
"No ZSync connection found between the PQSC "
f"{self.serial} and the device {device.serial}."
)

# Get the node of the ZSync connected to the device
# (will have the form "/devXXXX/zsyncs/N/connection/serial")
serial_to_node_dict = {
serial: node for node, serial in node_to_serial_dict.items()
}
device_zsync_node = serial_to_node_dict[device_serial]

# Just interested in knowing N: split in
# ['', 'devXXXX', 'zsyncs', 'N', 'connection', 'serial']
# and take fourth value
return int(device_zsync_node.split("/")[3])
35 changes: 35 additions & 0 deletions src/zhinst/toolkit/driver/devices/qhub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from zhinst.toolkit.driver.devices.quantum_system_hub import QuantumSystemHub


class QHub(QuantumSystemHub):
"""High-level driver for the Zurich Instruments QHub."""

def arm(self, *, deep=True, repetitions: int = None, holdoff: float = None) -> None:
"""Prepare QHub for triggering the instruments.
This method configures the execution engine of QHub.
Optionally, the *number of triggers*
and *hold-off time* can be set when specified as keyword
arguments. If they are not specified, they are not changed.
Note that the QHub is disabled at the end of the hold-off time
after sending out the last trigger.
Args:
deep: A flag that specifies if a synchronization
should be performed between the device and the data
server after stopping QHub (default: True).
repetitions: If specified, the number of triggers sent
over ZSync ports will be set (default: None).
holdoff: If specified, the time between repeated
triggers sent over ZSync ports will be set. It has a
minimum value and a granularity of 100 ns
(default: None).
"""
# Stop QHub if it is already running
self.stop(deep=deep)
if repetitions is not None:
self.execution.repetitions(repetitions)
if holdoff is not None:
self.execution.holdoff(holdoff)
Loading

0 comments on commit 13f716b

Please sign in to comment.