Skip to content

Commit

Permalink
Lecroy configurability (#182)
Browse files Browse the repository at this point in the history
* Allow arbitrary configuration for the LeCroy scope
  • Loading branch information
wsxrdv authored Dec 8, 2023
1 parent 472159a commit 4b985ad
Show file tree
Hide file tree
Showing 3 changed files with 386 additions and 17 deletions.
172 changes: 156 additions & 16 deletions scaaml/capture/scope/lecroy/lecroy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
"""Context manager for the scope."""

import base64
from copy import deepcopy
from pathlib import Path
import time
from typing import Dict, Optional
from typing import Any, Dict, List, Optional, Tuple
import xml.etree.ElementTree as ET

from chipwhisperer.common.utils import util
Expand All @@ -25,19 +26,28 @@
from scaaml.capture.scope.scope_base import AbstractSScope
from scaaml.capture.scope.lecroy.lecroy_communication import LeCroyCommunicationError
from scaaml.capture.scope.lecroy.lecroy_communication import LeCroyCommunication
from scaaml.capture.scope.lecroy.lecroy_communication import LeCroyCommunicationSocket
from scaaml.capture.scope.lecroy.lecroy_communication import LeCroyCommunicationVisa
from scaaml.capture.scope.lecroy.types import LECROY_CAPTURE_AREA, LECROY_CHANNEL_NAME_T
from scaaml.capture.scope.lecroy.types import LECROY_CAPTURE_AREA, LECROY_CHANNEL_NAME_T, LECROY_COMMUNICATION_CLASS_NAME
from scaaml.capture.scope.scope_template import ScopeTemplate
from scaaml.io import Dataset


class LeCroy(AbstractSScope):
"""Scope context manager."""

def __init__(self, samples: int, offset: int, ip_address: str,
def __init__(self,
samples: int,
offset: int,
ip_address: str,
trace_channel: LECROY_CHANNEL_NAME_T,
trigger_channel: LECROY_CHANNEL_NAME_T,
communication_timeout: float, trigger_timeout: float, **_):
communication_timeout: float,
trigger_timeout: float,
scope_setup_commands: List[Dict[str, Any]],
communication_class_name:
LECROY_COMMUNICATION_CLASS_NAME = "LeCroyCommunicationVisa",
**_) -> None:
"""Create scope context.
Args:
Expand All @@ -50,6 +60,45 @@ def __init__(self, samples: int, offset: int, ip_address: str,
`communication_timeout` seconds.
trigger_timeout (float): Number of seconds before the trigger times
out (in seconds).
scope_setup_commands (List[Dict[str, Any]]): List of commands used
to set up the scope. There are three possible actions taken in
the order (command, method, query):
- { "command": "command string" } The "command string" is sent to
the scope after being formatted with
`trace_channel=trace_channel, trigger_channel=trigger_channel`.
- { "method": "method_name", "kwargs": {} } The method is called
with given kwargs. The supported methods are: `set_trig_delay`.
This allows to do settings which are dependent on other settings
(e.g., setting TRIG_DELAY which depends on TIME_DIV) without
having to change two parameters.
- { "query": "query string" } The "query string" is sent to
the scope after being formatted with
`trace_channel=trace_channel, trigger_channel=trigger_channel`.
A dictionary of {"query string": "answer"} for all of these can
be obtained using `get_scope_answers`.
For convenience the following commands are prepended:
{ "command": "COMM_HEADER OFF", },
{ # Use full precision of measurements
"command": "COMM_FORMAT DEF9,WORD,BIN",
"query": "COMM_FORMAT?",
},
{ "command": "TRMD SINGLE", }, # Trigger mode
{ "command": "AUTO_CALIBRATE OFF", },
{ "command": "OFFSET 0", }, # Center the trace vertically
and the following is appended:
{"command": "STOP"} # Stop any signal acquisition
For a description of possible commands and queries see
https://cdn.teledynelecroy.com/files/manuals/maui-remote-control-and-automation-manual.pdf
communication_class_name (LECROY_COMMUNICATION_CLASS_NAME): Which
class to use for communication with the scope. Defaults to
LeCroyCommunicationVisa, the other possibility is
LeCroyCommunicationSocket.
_: LeCroy is expected to be initialized using capture_info
dictionary, this parameter allows to have additional information
there and initialize as LeCroy(**capture_info).
Expand All @@ -61,6 +110,9 @@ def __init__(self, samples: int, offset: int, ip_address: str,
self._trigger_channel = trigger_channel
self._communication_timeout = communication_timeout
self._trigger_timeout = trigger_timeout
self._communication_class_name = communication_class_name

self._scope_setup_commands = deepcopy(scope_setup_commands)

# Check that the trace_channel is analog
if not self._trace_channel.startswith("C"):
Expand All @@ -85,6 +137,8 @@ def __enter__(self):
trigger_channel=self._trigger_channel,
communication_timeout=self._communication_timeout,
trigger_timeout=self._trigger_timeout,
communication_class_name=self._communication_class_name,
scope_setup_commands=self._scope_setup_commands,
)
assert self._scope is not None
self._scope.con()
Expand Down Expand Up @@ -115,6 +169,7 @@ def post_init(self, dataset: Dataset) -> None:

# Update capture info with oscilloscope details.
dataset.capture_info.update(self._scope.get_identity_info())
dataset.capture_info["scope_answers"] = self._scope.get_scope_answers()
dataset.write_config()

def print_screen(
Expand All @@ -140,10 +195,13 @@ def print_screen(
class LeCroyScope(ScopeTemplate):
"""Scope."""

def __init__(self, samples: int, offset: int, ip_address: str,
trace_channel: LECROY_CHANNEL_NAME_T,
trigger_channel: LECROY_CHANNEL_NAME_T,
communication_timeout: float, trigger_timeout: float):
def __init__(
self, samples: int, offset: int, ip_address: str,
trace_channel: LECROY_CHANNEL_NAME_T,
trigger_channel: LECROY_CHANNEL_NAME_T,
communication_timeout: float, trigger_timeout: float,
scope_setup_commands: List[Dict[str, Any]],
communication_class_name: LECROY_COMMUNICATION_CLASS_NAME) -> None:
"""Create scope context.
Args:
Expand All @@ -156,6 +214,10 @@ def __init__(self, samples: int, offset: int, ip_address: str,
`communication_timeout` seconds.
trigger_timeout (float): Number of seconds before the trigger times
out (in seconds).
scope_setup_commands (List[Dict[str, Any]]): See docstring of
`LeCroy`.
communication_class_name (LECROY_COMMUNICATION_CLASS_NAME): Which
class to use for communication with the scope.
"""
self._samples = samples
self._offset = offset
Expand All @@ -164,32 +226,56 @@ def __init__(self, samples: int, offset: int, ip_address: str,
self._trigger_channel = trigger_channel
self._communication_timeout = communication_timeout
self._trigger_timeout = trigger_timeout
self._communication_class_name = communication_class_name

# Trace and trigger
self._last_trace = None

# Scope object
self._scope_communication: Optional[LeCroyCommunication] = None

# Desired settings of the scope
# Wrap default commands around the custom ones
commands = [
{ "command": "COMM_HEADER OFF", },
{ # Use full precision of measurements
"command": "COMM_FORMAT DEF9,WORD,BIN",
"query": "COMM_FORMAT?",
},
{ "command": "TRMD SINGLE", }, # Trigger mode
{ "command": "AUTO_CALIBRATE OFF", },
{ "command": "OFFSET 0", }, # Center the trace vertically
]
commands.extend(deepcopy(scope_setup_commands)) # Custom commands
commands.append({"command": "STOP"}) # Stop any signal acquisition
self._scope_setup_commands: Tuple[Dict[str, Any], ...] = tuple(commands)

# Actual settings of the scope
self._scope_answers: Dict[str, str] = {}

def con(self, sn=None) -> bool:
"""Set the scope for capture."""
communication_cls = {
"LeCroyCommunicationVisa": LeCroyCommunicationVisa,
"LeCroyCommunicationSocket": LeCroyCommunicationSocket,
}[self._communication_class_name]
# Connect to the oscilloscope.
self._scope_communication = LeCroyCommunicationVisa(
self._scope_communication = communication_cls(
ip_address=self._ip_address,
timeout=self._communication_timeout,
)

assert self._scope_communication is not None

# Scope settings
# Connect to the physical scope
self._scope_communication.connect()
self._scope_communication.write("COMM_HEADER OFF")
# Use full precision of measurements
self._scope_communication.write("COMM_FORMAT DEF9,WORD,BIN")
self._scope_communication.write("TRMD SINGLE")

self._scope_communication.write("STOP")
return True # Success
# Run all setup commands
for command in self._scope_setup_commands:
self._run_command(command)

# Success
return True

def dis(self) -> bool:
"""Disconnect from the scope."""
Expand Down Expand Up @@ -329,6 +415,60 @@ def get_identity_info(self) -> Dict[str, str]:
"lecroy_firmware_level": firmware_level,
}

def get_scope_answers(self) -> Dict[str, str]:
"""Return actual scope settings. When adding a scope setting the closes
valid value is used or the setting is ignored. These are results of
"query" in `scope_setup_commands` (see the init of `LeCroy`).
"""
return deepcopy(self._scope_answers)

def set_trig_delay(self, divs_left: float = -4.0) -> None:
"""Position the trigger point `divs_left` divisions left from the
center of the screen. Defaults to showing one division before (10% of
trace is pre-trigger).
divs_left (float): How much to move the trigger point (in divisions to
the left). Defaults to -4, which is 10% of trace is before the
trigger.
"""
assert self._scope_communication

timebase = float(self._scope_communication.query("TIME_DIV?").strip())
trig_position = divs_left * timebase
self._scope_communication.write(f"TRIG_DELAY {trig_position}")

def _run_command(self, setup_command: Dict[str, Any]) -> None:
"""For description see init of `LeCroy`.
"""
assert self._scope_communication

# Wildcards to fill in for commands
wildcards = {
"trace_channel": self._trace_channel,
"trigger_channel": self._trigger_channel,
}

# First run a command
if "command" in setup_command:
command_string = setup_command["command"]
# Fill in wildcards
command_string = command_string.format(**wildcards)
# Run the command
self._scope_communication.write(command_string)

# Run a method if there is one
if "method" in setup_command:
method_name = setup_command["method"]
kwargs = setup_command.get("kwargs", {})
if method_name == "set_trig_delay":
self.set_trig_delay(**kwargs)

# Update the actual setting
if "query" in setup_command:
query = setup_command["query"]
query = query.format(**wildcards)
self._scope_answers[query] = self._scope_communication.query(query)

def retrieve_file(self, source_file_path: str,
destination_file_path: Path) -> int:
r"""Transfer a file and return how many bytes were transfered.
Expand Down
3 changes: 3 additions & 0 deletions scaaml/capture/scope/lecroy/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from typing import Literal
from typing_extensions import TypeAlias

LECROY_COMMUNICATION_CLASS_NAME: TypeAlias = Literal[
"LeCroyCommunicationVisa", "LeCroyCommunicationSocket"]

LECROY_CAPTURE_AREA: TypeAlias = Literal["FULLSCREEN", "GRIDAREAONLY",
"DSOWINDOW"]

Expand Down
Loading

0 comments on commit 4b985ad

Please sign in to comment.