Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Add check_ref_clock-functions for SHF*, and HDAWG #281

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# zhinst-toolkit Changelog

## Version 0.7.0
* Added `check_ref_clock`-function for HDAWG, and SHF* devices

## Version 0.6.4
* Function `wait_for_state_change` now works with zero timeout; the node value is always checked at least once.
* Improved the stability of device interface auto-detection when it is not supplied while connecting to a device.
Expand Down
2 changes: 2 additions & 0 deletions examples/repeat_until_success.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ pqsc.check_ref_clock()
# SHFSG and SHFQA ZSync clock
shfsg.system.clocks.referenceclock.in_.source("zsync")
shfqa.system.clocks.referenceclock.in_.source("zsync")
shfsg.check_ref_clock()
shfqa.check_ref_clock()

# Verify if the ZSync connection is successful
pqsc.check_zsync_connection([shfqa, shfsg])
Comment on lines +94 to 98
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
shfsg.check_ref_clock()
shfqa.check_ref_clock()
# Verify if the ZSync connection is successful
pqsc.check_zsync_connection([shfqa, shfsg])
# Verify if the ZSync connection is successful
shfsg.check_ref_clock()
shfqa.check_ref_clock()
pqsc.check_zsync_connection([shfqa, shfsg])

Expand Down
41 changes: 41 additions & 0 deletions src/zhinst/toolkit/driver/devices/hdawg.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""HDAWG Instrument Driver."""

import logging
import typing as t

from zhinst.toolkit.driver.devices.base import BaseInstrument
Expand All @@ -10,6 +12,8 @@
from zhinst.toolkit.nodetree.node import NodeList
from zhinst.toolkit.exceptions import ToolkitError

logger = logging.getLogger(__name__)


class HDAWG(BaseInstrument):
"""High-level driver for the Zurich Instruments HDAWG."""
Expand Down Expand Up @@ -89,3 +93,40 @@ def awgs(self) -> t.Sequence[AWG]:
self._root,
self._tree + ("awgs",),
)

def check_ref_clock(
self, *, timeout: float = 30.0, sleep_time: float = 1.0
) -> bool:
"""Check if reference clock is locked successfully.

Args:
timeout: Maximum time in seconds the program waits
(default: 30.0).
sleep_time: Time in seconds to wait between
requesting the reference clock status (default: 1)

Raises:
TimeoutError: If the process of locking to the reference clock
exceeds the specified timeout.
"""
ref_clock_status = self.system.clocks.referenceclock.status
ref_clock = self.system.clocks.referenceclock.source
ref_clock_actual = self.system.clocks.referenceclock.sourceactual
try:
ref_clock_status.wait_for_state_change(
2, invert=True, timeout=timeout, sleep_time=sleep_time
)
except TimeoutError as error:
raise TimeoutError(
"Timeout during locking to reference clock signal"
) from error
if ref_clock_status() == 0:
return True
if ref_clock_status() == 1 and ref_clock_actual() != ref_clock():
ref_clock("internal", deep=True)
logger.error(
f"There was an error locking the device({self.serial}) "
f"onto reference clock signal. Automatically switching to internal "
f"reference clock. Please try again."
)
return False
Comment on lines +125 to +132
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Wouldn't it make more sense to return code and raise an error if its not or an error occurred?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Andrea and I agree that raising an error probably makes sense.

We don't understand what you mean by "return code" however.

In any case this was taken over from an existing pqsc function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to add a proposal. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like this. #281 (comment)

... The "return code" was just a mixup in my head ... I meant return None and rais an error

48 changes: 48 additions & 0 deletions src/zhinst/toolkit/driver/devices/shf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""SHF* Instrument Driver."""

import logging

from zhinst.toolkit.driver.devices.base import BaseInstrument

logger = logging.getLogger(__name__)


class SHF(BaseInstrument):
"""Class for SHF*-common functionality."""

def check_ref_clock(
self, *, timeout: float = 30.0, sleep_time: float = 1.0
) -> bool:
"""Check if reference clock is locked successfully.

Args:
timeout: Maximum time in seconds the program waits
(default: 30.0).
sleep_time: Time in seconds to wait between
requesting the reference clock status (default: 1)

Raises:
TimeoutError: If the process of locking to the reference clock
exceeds the specified timeout.
"""
ref_clock_status = self.system.clocks.referenceclock.in_.status
ref_clock = self.system.clocks.referenceclock.in_.source
ref_clock_actual = self.system.clocks.referenceclock.in_.sourceactual
try:
ref_clock_status.wait_for_state_change(
2, invert=True, timeout=timeout, sleep_time=sleep_time
)
except TimeoutError as error:
raise TimeoutError(
"Timeout during locking to reference clock signal"
) from error
if ref_clock_status() == 0:
return True
if ref_clock_status() == 1 and ref_clock_actual() != ref_clock():
ref_clock("internal", deep=True)
logger.error(
f"There was an error locking the device({self.serial}) "
f"onto reference clock signal. Automatically switching to internal "
f"reference clock. Please try again."
)
return False
Comment on lines +13 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def check_ref_clock(
self, *, timeout: float = 30.0, sleep_time: float = 1.0
) -> bool:
"""Check if reference clock is locked successfully.
Args:
timeout: Maximum time in seconds the program waits
(default: 30.0).
sleep_time: Time in seconds to wait between
requesting the reference clock status (default: 1)
Raises:
TimeoutError: If the process of locking to the reference clock
exceeds the specified timeout.
"""
ref_clock_status = self.system.clocks.referenceclock.in_.status
ref_clock = self.system.clocks.referenceclock.in_.source
ref_clock_actual = self.system.clocks.referenceclock.in_.sourceactual
try:
ref_clock_status.wait_for_state_change(
2, invert=True, timeout=timeout, sleep_time=sleep_time
)
except TimeoutError as error:
raise TimeoutError(
"Timeout during locking to reference clock signal"
) from error
if ref_clock_status() == 0:
return True
if ref_clock_status() == 1 and ref_clock_actual() != ref_clock():
ref_clock("internal", deep=True)
logger.error(
f"There was an error locking the device({self.serial}) "
f"onto reference clock signal. Automatically switching to internal "
f"reference clock. Please try again."
)
return False
def check_ref_clock(
self, *, timeout: float = 30.0, sleep_time: float = 1.0
) -> None:
"""Check if reference clock is locked successfully.
Args:
timeout: Maximum time in seconds the program waits
(default: 30.0).
sleep_time: Time in seconds to wait between
requesting the reference clock status (default: 1)
Raises:
ToolkitError: It the reference clock is not locked successfully or
there was an error.
TimeoutError: If the process of locking to the reference clock
exceeds the specified timeout.
"""
ref_clock_status = self.system.clocks.referenceclock.in_.status
ref_clock = self.system.clocks.referenceclock.in_.source
ref_clock_actual = self.system.clocks.referenceclock.in_.sourceactual
try:
ref_clock_status.wait_for_state_change(
2, invert=True, timeout=timeout, sleep_time=sleep_time
)
except TimeoutError as error:
raise TimeoutError(
"Timeout during locking to reference clock signal"
) from error
if ref_clock_status() == 0:
return
if ref_clock_status() == 1 and ref_clock_actual() != ref_clock():
ref_clock("internal", deep=True)
msg = (
f"There was an error locking the device({self.serial}) "
f"onto reference clock signal. Automatically switching to internal "
f"reference clock. Please try again."
)
raise ToolkitError(msg)
msg = "Reference clock is not locked sucessfully."
raise ToolkitError(msg)

4 changes: 2 additions & 2 deletions src/zhinst/toolkit/driver/devices/shfqa.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import zhinst.utils.shfqa as utils

from zhinst.toolkit.driver.devices.base import BaseInstrument
from zhinst.toolkit.driver.devices.shf import SHF
from zhinst.toolkit.driver.nodes.awg import AWG
from zhinst.toolkit.driver.nodes.readout import Readout
from zhinst.toolkit.driver.nodes.shfqa_scope import SHFScope
Expand Down Expand Up @@ -229,7 +229,7 @@ def spectroscopy(self) -> Spectroscopy:
)


class SHFQA(BaseInstrument):
class SHFQA(SHF):
"""High-level driver for the Zurich Instruments SHFQA."""

@not_callable_in_transactions
Expand Down
4 changes: 2 additions & 2 deletions src/zhinst/toolkit/driver/devices/shfsg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import zhinst.utils.shfsg as utils

from zhinst.toolkit.driver.devices.base import BaseInstrument
from zhinst.toolkit.driver.devices.shf import SHF
from zhinst.toolkit.driver.nodes.awg import AWG
from zhinst.toolkit.nodetree import Node
from zhinst.toolkit.nodetree.helper import lazy_property, not_callable_in_transactions
Expand Down Expand Up @@ -235,7 +235,7 @@ def awg(self) -> AWGCore:
)


class SHFSG(BaseInstrument):
class SHFSG(SHF):
"""High-level driver for the Zurich Instruments SHFSG."""

@lazy_property
Expand Down
5 changes: 5 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright (C) 2024 Zurich Instruments
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE file for details.
"""Pytest tests directory."""
5 changes: 5 additions & 0 deletions tests/test_shfqa.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from zhinst.toolkit import SHFQAChannelMode
from zhinst.toolkit.driver.devices.shfqa import Generator, QAChannel, Readout, SHFScope
from tests.utils import shf_test_ref_clock


def test_repr(shfqa):
Expand Down Expand Up @@ -81,3 +82,7 @@ def test_qa_readout(shfqa):
"0",
"readout",
)


def test_ref_clock(mock_connection, shfqa):
shf_test_ref_clock(mock_connection, shfqa)
5 changes: 5 additions & 0 deletions tests/test_shfqc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from zhinst.toolkit import SHFQAChannelMode
from zhinst.toolkit.driver.devices.shfqa import Generator, QAChannel, Readout, SHFScope
from zhinst.toolkit.driver.devices.shfsg import AWG, Node, SGChannel
from tests.utils import shf_test_ref_clock


def test_repr(shfqc):
Expand Down Expand Up @@ -250,3 +251,7 @@ def test_configure_sine_generation(mock_connection, shfqc):
gains=(3.0, -1.0, 5.0, 1.0),
sine_generator_index=8,
)


def test_ref_clock(mock_connection, shfqc):
shf_test_ref_clock(mock_connection, shfqc)
5 changes: 5 additions & 0 deletions tests/test_shfsg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from zhinst.toolkit.driver.devices.shfsg import AWG, Node, SGChannel
from tests.utils import shf_test_ref_clock


def test_repr(shfsg):
Expand Down Expand Up @@ -171,3 +172,7 @@ def test_configure_sine_generation(mock_connection, shfsg):
gains=(3.0, -1.0, 5.0, 1.0),
sine_generator_index=8,
)


def test_ref_clock(mock_connection, shfsg):
shf_test_ref_clock(mock_connection, shfsg)
46 changes: 46 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from itertools import cycle

import pytest


def shf_test_ref_clock(mock_connection, shf):
"""Test reference clock logic shared between all SHF devices."""
status = cycle([0])
source = 0
source_actual = 0

def getInt_side_effect(path):
if path == "/dev1234/system/clocks/referenceclock/in/status":
return next(status)
if path == "/dev1234/system/clocks/referenceclock/in/source":
return source
if path == "/dev1234/system/clocks/referenceclock/in/sourceactual":
return source_actual
raise RuntimeError("Invalid Node")

def get_side_effect(path, **kwargs):
value = getInt_side_effect(path)
return {path: {"timestamp": [0], "value": [value]}}

mock_connection.return_value.getInt.side_effect = getInt_side_effect
mock_connection.return_value.get.side_effect = get_side_effect

assert shf.check_ref_clock(sleep_time=0.001)
# Locked within time
status = iter([2] * 2 + [0] * 10)
assert shf.check_ref_clock(sleep_time=0.001)
# Locking error but actual_clock == clock
status = cycle([1])
assert not shf.check_ref_clock(sleep_time=0.001)
# Locking error and actual_clock != clock => reset clock to internal
source = 1
mock_connection.return_value.syncSetString.assert_not_called()
assert not shf.check_ref_clock(sleep_time=0.001)
mock_connection.return_value.syncSetString.assert_called_with(
"/dev1234/system/clocks/referenceclock/in/source", "internal"
)

# timeout
status = cycle([2])
with pytest.raises(TimeoutError) as e_info:
shf.check_ref_clock(timeout=0.01, sleep_time=0.001)
Loading