Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## [Unreleased]
### Added
- Parsing of Dictionary tables
- Add support for tables

## [7.5.2] - 2025-11-24
### Added
Expand Down
7 changes: 7 additions & 0 deletions docs/api/base_classes/table.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
======
Tables
======

.. automodule:: ingenialink.table
:members:
:member-order: groupwise
21 changes: 21 additions & 0 deletions ingenialink/servo.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
ILValueError,
)
from ingenialink.register import Register
from ingenialink.table import Table
from ingenialink.utils._utils import convert_bytes_to_dtype, convert_dtype_to_bytes
from ingenialink.virtual.dictionary import VirtualDictionaryV2, VirtualDictionaryV3

Expand Down Expand Up @@ -1765,6 +1766,26 @@ def dictionary(self, dictionary: Dictionary) -> None:
"""Sets the dictionary object."""
self._dictionary = dictionary

def get_table(self, uid: str, axis: Optional[int] = None) -> Table:
"""Get a table from the dictionary.

Args:
uid: Table uid.
axis: Axis. Should be specified if multiaxis, None otherwise.

Returns:
Table: The requested table object.

Raises:
KeyError: If the specified axis does not exist.
KeyError: If the table is not present in the specified axis.
ValueError: If the table is not found in any axis, if axis is not provided.
ValueError: If the table is found in multiple axes, if axis is not provided.

"""
dictionary_table = self.dictionary.get_table(uid, axis=axis)
return Table(self, dictionary_table, axis=axis)

@property
def full_name(self) -> str:
"""Drive full name."""
Expand Down
174 changes: 174 additions & 0 deletions ingenialink/table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from collections.abc import Iterator, Sequence
from typing import TYPE_CHECKING, Optional

from ingenialink.utils._utils import REG_VALUE

if TYPE_CHECKING:
from ingenialink import Servo
from ingenialink.dictionary import DictionaryTable


class Table:
"""Table.

Internal table that stores N values that are accessed by index register
and read/written via value register.
"""

def __init__(
self, servo: "Servo", table: "DictionaryTable", axis: Optional[int] = None
) -> None:
"""Initializes the Table.

Args:
servo: Servo instance.
table: Dictionary table instance.
axis: Axis number for multi-axis servos

Raises:
ValueError: If index register does not have integer range.
"""
self.__servo = servo
self.__table = table

self.__index_register = self.__servo.dictionary.get_register(
self.__table.id_index, axis=axis
)
self.__value_register = self.__servo.dictionary.get_register(
self.__table.id_value, axis=axis
)

min_index, max_index = self.__index_register.range
if not isinstance(min_index, int) or not isinstance(max_index, int):
raise ValueError("Index register must have integer range.")

if min_index < 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a possibility?
A non-checked max_index out of bounds that makes anything (or some things) to crash?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually min index is -1. This value is used to avoid write in the table.

# Negative indexes may be used to not request any particular index.
min_index = 0

self.__min_index = min_index
self.__max_index = max_index

def get_value(self, index: int) -> REG_VALUE:
"""Reads a value from the table.

Args:
index: Index of the value to read.

Returns:
Value at the specified index.
"""
self.__servo.write(self.__index_register, index)
return self.__servo.read(self.__value_register)

def set_value(self, index: int, value: REG_VALUE) -> None:
"""Writes a value to the table.

Args:
index: Index of the value to write.
value: Value to write at the specified index.
"""
self.__servo.write(self.__index_register, index)
self.__servo.write(self.__value_register, value)

def __len__(self) -> int:
"""Returns the number of elements in the table.

Returns:
Number of elements in the table
"""
return self.__max_index - self.__min_index + 1

def __iter__(self) -> Iterator[REG_VALUE]:
"""Iterate over all values in the table.

Yields:
Each value in the table from min_index to max_index.
"""
for i in range(self.__min_index, self.__max_index + 1):
yield self.get_value(i)

def __getitem__(self, index: int) -> REG_VALUE:
"""Read a value from the table using bracket notation.

Args:
index: Index of the value to read.

Returns:
Value at the specified index.

Raises:
IndexError: If index is out of range.
"""
if index < self.__min_index or index > self.__max_index:
raise IndexError(f"Index {index} out of range [{self.__min_index}, {self.__max_index}]")
return self.get_value(index)

def __setitem__(self, index: int, value: REG_VALUE) -> None:
"""Write a value to the table using bracket notation.

Args:
index: Index of the value to write.
value: Value to write at the specified index.

Raises:
IndexError: If index is out of range.
"""
if index < self.__min_index or index > self.__max_index:
raise IndexError(f"Index {index} out of range [{self.__min_index}, {self.__max_index}]")
self.set_value(index, value)

def read(
self, start_index: Optional[int] = None, count: Optional[int] = None
) -> list[REG_VALUE]:
"""Read multiple values from the table.

Args:
start_index: Starting index. Defaults to min_index.
count: Number of values to read. Defaults to all remaining.

Returns:
List of values read from the table.

Raises:
IndexError: If the range is out of bounds.
"""
if start_index is None:
start_index = self.__min_index

if count is None:
count = self.__max_index - start_index + 1

end_index = start_index + count - 1

if start_index < self.__min_index or end_index > self.__max_index:
raise IndexError(
f"Range [{start_index}, {end_index}] out of bounds "
f"[{self.__min_index}, {self.__max_index}]"
)

return [self.get_value(i) for i in range(start_index, end_index + 1)]

def write(self, values: Sequence[REG_VALUE], start_index: Optional[int] = None) -> None:
"""Write multiple values to the table.

Args:
values: Sequence of values to write to the table.
start_index: Starting index. Defaults to min_index.

Raises:
IndexError: If the range is out of bounds.
"""
if start_index is None:
start_index = self.__min_index

end_index = start_index + len(values) - 1

if start_index < self.__min_index or end_index > self.__max_index:
raise IndexError(
f"Range [{start_index}, {end_index}] out of bounds "
f"[{self.__min_index}, {self.__max_index}]"
)

for i, value in enumerate(values):
self.set_value(start_index + i, value)
7 changes: 5 additions & 2 deletions ingenialink/utils/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ def convert_int_to_ip(int_ip: int) -> str:
return f"{drive_ip1}.{drive_ip2}.{drive_ip3}.{drive_ip4}"


def convert_bytes_to_dtype(data: bytes, dtype: RegDtype) -> Union[float, int, str, bytes]:
REG_VALUE = Union[float, int, str, bytes]


def convert_bytes_to_dtype(data: bytes, dtype: RegDtype) -> REG_VALUE:
"""Convert data in bytes to corresponding dtype.

Bytes have to be ordered in LSB.
Expand Down Expand Up @@ -253,7 +256,7 @@ def convert_bytes_to_dtype(data: bytes, dtype: RegDtype) -> Union[float, int, st
return value


def convert_dtype_to_bytes(data: Union[int, float, str, bytes], dtype: RegDtype) -> bytes:
def convert_dtype_to_bytes(data: REG_VALUE, dtype: RegDtype) -> bytes:
"""Convert data in dtype to bytes.

Bytes will be ordered in LSB.
Expand Down
9 changes: 5 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ dependencies = [
"ingenialogger>=0.2.1",
"ping3==4.0.3",
"pysoem>=1.1.11, <1.2.0",
"numpy>=1.26.0",
"bitarray==2.9.2",
"multiping==1.1.2",
]
Expand All @@ -43,7 +42,7 @@ Source = "https://github.com/ingeniamc/ingenialink-python"
version = "7.5.2" # base version

[project.optional-dependencies]
virtual_drive = ["virtual_drive==0.0.0+gced50b1fe"]
virtual_drive = ["virtual_drive==0.0.0+pr4b9"]

[[tool.poetry.source]]
name = "PyPI"
Expand Down
7 changes: 6 additions & 1 deletion tests/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@
TEST_DICT_ECAT_EOE_SAFE_v3 = (resources_path / "test_dict_ecat_eoe_safe_v3.0.xdf").as_posix()
TEST_DICT_ECAT_EOE_v3 = (resources_path / "test_dict_ecat_eoe_v3.0.xdf").as_posix()
TEST_CONFIG_FILE = (resources_path / "test_config_file.xcf").as_posix()
TEST_DICTIONARY_WITH_TABLES = (resources_path / "dictionary_with_tables_minimal.xdf3").as_posix()
TEST_DICTIONARY_WITH_TABLES_FOR_ALL_COM_TYPES = (
resources_path / "dictionary_with_tables_minimal.xdf3"
).as_posix()
DEN_NET_E_WITH_TABLES = (resources_path / "den-net-e_dev4f4c26.xdf3").as_posix()

__all__ = [
"DEN_NET_E_2_8_0_xdf_v3",
"TEST_DICT_ECAT_EOE_SAFE_v3",
"TEST_DICT_ECAT_EOE_v3",
"TEST_CONFIG_FILE",
"TEST_DICTIONARY_WITH_TABLES_FOR_ALL_COM_TYPES",
"DEN_NET_E_WITH_TABLES",
"canopen",
"comkit",
"ethercat",
Expand Down
Loading