Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement socket communication with LeCroy #177

Merged
merged 14 commits into from
Dec 4, 2023
182 changes: 173 additions & 9 deletions scaaml/capture/scope/lecroy/lecroy_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Based on by code kindly shared by Victor Lomné - NinjaLab
"""Communication with the LeCroy oscilloscope. Either using pyvisa or socket.
"""

from abc import ABC, abstractmethod
import hashlib
import logging
import socket
from struct import pack, unpack
from typing import Optional

import pyvisa
Expand Down Expand Up @@ -71,7 +75,9 @@ def query_binary_values(self,
message: str,
datatype="B",
container=bytearray) -> bytearray:
"""Query binary data."""
"""Query binary data. Beware that the results from socket version might
contain headers which are stripped by pyvisa.
"""

def _check_response_template(self):
""" Check if the hash of the waveform template matches the supported
Expand Down Expand Up @@ -119,7 +125,7 @@ def __init__(self, ip_address: str, timeout: float = 5.0):
def connect(self):
# For portability and ease of setup we enforce the pure Python backend
self._resource_manager = pyvisa.ResourceManager("@py")
assert self._resource_manager is not None
assert self._resource_manager

scope_resource = self._resource_manager.open_resource(
f"TCPIP::{self._ip_address}::INSTR",
Expand All @@ -128,7 +134,7 @@ def connect(self):
assert isinstance(scope_resource, pyvisa.resources.MessageBasedResource)
self._scope = scope_resource

assert self._scope is not None
assert self._scope
self._scope.timeout = self._timeout * 1_000 # Convert second to ms
self._scope.clear()

Expand All @@ -137,17 +143,17 @@ def connect(self):

@make_custom_exception
def close(self) -> None:
assert self._scope is not None
assert self._scope
self._scope.before_close()
self._scope.close()
assert self._resource_manager is not None
assert self._resource_manager
self._resource_manager.close()

@make_custom_exception
def write(self, message: str) -> None:
"""Write a message to the oscilloscope.
"""
assert self._scope is not None
assert self._scope
self._logger.debug("write(message=\"%s\")", message)
self._scope.write(message)

Expand All @@ -156,15 +162,15 @@ def query(self, message: str) -> str:
"""Query the oscilloscope (write, read, and decode the answer as a
string).
"""
assert self._scope is not None
assert self._scope
self._logger.debug("query(message=\"%s\")", message)
return self._scope.query(message).strip()

@make_custom_exception
def get_waveform(self, channel: LECROY_CHANNEL_NAME_T) -> LecroyWaveform:
"""Get a LecroyWaveform object representing a single waveform.
"""
assert self._scope is not None
assert self._scope

return self._scope.query_binary_values(
f"{channel}:WAVEFORM?",
Expand All @@ -178,10 +184,168 @@ def query_binary_values(self,
datatype="B",
container=bytearray):
"""Query binary data."""
assert self._scope is not None
assert self._scope
self._logger.debug("query_binary_values(message=\"%s\")", message)
return self._scope.query_binary_values(
message,
datatype=datatype,
container=container,
)


class LeCroyCommunicationSocket(LeCroyCommunication):
"""Use Python socket to communicate using the TCP/IP (VICP).
("Utilities > Utilities Setup > Remote" and choose TCPIP)."""

def __init__(self, ip_address: str, timeout: float = 5.0):
super().__init__(
ip_address=ip_address,
timeout=timeout,
)
# Header format (see section "VICP Headers"):
# operation: byte
# header_version: byte
# sequence_number: byte
# spare: byte = 0 (reserved for future)
# block_length: long = length of the command (block to be sent)
self._lecroy_command_header = ">4BL"
self._socket: Optional[socket.socket] = None

@make_custom_exception
def connect(self):
assert self._socket is None

self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(self._timeout)

# establish connection
self._socket.connect((self._ip_address, 1861))

# Check if the response template is what LecroyWaveform expects
self._check_response_template()

@make_custom_exception
def close(self) -> None:
assert self._socket
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
self._socket = None

@make_custom_exception
def write(self, message: str) -> None:
"""Write a message to the oscilloscope.
"""
assert self._socket
self._socket.send(self._format_command(message))

@make_custom_exception
def query(self, message: str) -> str:
"""Query the oscilloscope (write, read, and decode the answer as a
string).
"""
return self.query_binary_values(message).decode()

@make_custom_exception
def get_waveform(self, channel: LECROY_CHANNEL_NAME_T) -> LecroyWaveform:
"""Get a LecroyWaveform object representing a single waveform.

Args:
channel (LECROY_CHANNEL_NAME_T): The name of queried channel.
"""
raw_data = self.query_binary_values(f"{channel}:WAVEFORM?")
assert raw_data[:6] == b"ALL,#9" # followed by 9 digits for size
len_raw_data = int(raw_data[6:15]) # length without the header
raw_data = raw_data[15:]
jmichelp marked this conversation as resolved.
Show resolved Hide resolved
if len_raw_data + 1 == len(raw_data):
raw_data = raw_data[:-1] # last is linefeed
assert len(raw_data) == len_raw_data
return LecroyWaveform(raw_data)

@make_custom_exception
def query_binary_values(self,
message: str,
datatype="B",
container=None) -> bytes:
"""Query binary data.

Args:
message (str): Query message.
datatype (str): Ignored.
container: A bytearray is always used.

Returns: a bytes representation of the response.
"""
assert self._socket

del datatype # ignored
del container # ignored

self._logger.debug("\"%s\"", message)

# Send message
self.write(message)

# Receive and decode answer
return self._get_raw_response()

def _format_command(self, command: str) -> bytes:
"""Method formatting leCroy command.

Args:
command (str): The command to be formatted for sending over a
socket.

Returns: bytes representation to be directly sent over a socket.
"""
# Compute header for the current command, header:
# operation = DATA | EOI
command_header = pack(self._lecroy_command_header, 129, 1, 1, 0,
len(command))

formatted_command = command_header + command.encode("ascii")
return formatted_command

def _get_raw_response(self) -> bytes:
"""Get raw response from the socket.

Returns: bytes representation of the response.
"""
assert self._socket
response = bytearray()

while True:
header = bytearray()

# Loop until we get a full header (8 bytes)
while len(header) < 8:
header.extend(self._socket.recv(8 - len(header)))

# Parse formated response
(
operation,
header_version, # unused
sequence_number, # unused
spare, # unused
total_bytes) = unpack(self._lecroy_command_header, header)

# Delete unused values
del header_version
del sequence_number
del spare

# Buffer for the current portion of data
buffer = bytearray()

# Loop until we get all data
while len(buffer) < total_bytes:
buffer.extend(
self._socket.recv(min(total_bytes - len(buffer), 8_192)))

# Accumulate final response
response.extend(buffer)

# Leave the loop when the EOI bit is set
if operation & 1:
break

return bytes(response)