Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 72 additions & 19 deletions ingenialink/canopen/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from can import CanError
from can.interfaces.kvaser.canlib import __get_canlib_function as get_canlib_function
from can.interfaces.pcan.pcan import PcanCanOperationError
from typing_extensions import override

from ingenialink.canopen.register import CanopenRegister
from ingenialink.canopen.servo import CANOPEN_SDO_RESPONSE_TIMEOUT, CanopenServo
Expand Down Expand Up @@ -174,30 +175,50 @@ def __init__(self, network: "CanopenNetwork"):
self.__network = network
self.__stop = False

def process(self, timestamps: dict[int, float]) -> dict[int, float]:
"""Process network status for all servos.

This method checks the status of all servos in the network and notifies
subscribers of any state changes (connection/disconnection).

Args:
timestamps: Dictionary mapping node IDs to their last known timestamps.

Returns:
Updated timestamps dictionary.
"""
if self.__network._connection is None:
return timestamps
for node_id, node in list(self.__network._connection.nodes.items()):
sleep(1.5)
current_timestamp = node.nmt.timestamp
if node_id not in timestamps:
timestamps[node_id] = current_timestamp
continue
is_alive = current_timestamp != timestamps[node_id]
servo_state = self.__network.get_servo_state(node_id)
if is_alive:
if servo_state != NetState.CONNECTED:
self.__network._notify_status(node_id, NetDevEvt.ADDED)
self.__network._set_servo_state(node_id, NetState.CONNECTED)
timestamps[node_id] = node.nmt.timestamp
elif servo_state == NetState.DISCONNECTED:
self.__network.recover_from_disconnection()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something I wonder....
If you manually call recover_from_disconnection, why shouldn't the _notify_status be called?
Maybe it does not belong to this issue, but it makes no sense to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of the issue was to be able to do this stuff without the status listener.
So, if we want to notify the status when we use this method, that implies that there is some net observer.

The only reliable way of having a "observation of the net" is through the status listener.
With the new method you will be able to know when you're connected back again, but not when you're disconnected, that is only possible with the net status listener.

So I do not really see the point of having to notify the status here. That is done by the net status listener already.

else:
self.__network._notify_status(node_id, NetDevEvt.REMOVED)
self.__network._set_servo_state(node_id, NetState.DISCONNECTED)
return timestamps

def run(self) -> None:
"""Check the network status."""
timestamps = {}
if self.__network._connection is None:
return
timestamps: dict[int, float] = {}
while not self.__stop:
for node_id, node in list(self.__network._connection.nodes.items()):
sleep(1.5)
current_timestamp = node.nmt.timestamp
if node_id not in timestamps:
timestamps[node_id] = current_timestamp
continue
is_alive = current_timestamp != timestamps[node_id]
servo_state = self.__network.get_servo_state(node_id)
if is_alive:
if servo_state != NetState.CONNECTED:
self.__network._notify_status(node_id, NetDevEvt.ADDED)
self.__network._set_servo_state(node_id, NetState.CONNECTED)
timestamps[node_id] = node.nmt.timestamp
elif servo_state == NetState.DISCONNECTED:
self.__network._reset_connection()
else:
self.__network._notify_status(node_id, NetDevEvt.REMOVED)
self.__network._set_servo_state(node_id, NetState.DISCONNECTED)
try:
timestamps = self.process(timestamps)
except Exception as e: # noqa: PERF203
logger.exception(f"Exception occurred while processing network status: {e}")

def stop(self) -> None:
"""Stop the listener."""
Expand All @@ -218,6 +239,7 @@ class CanopenNetwork(Network):
PRODUCT_CODE_SUB_IX = 2
REVISION_NUMBER_SUB_IX = 3
NODE_GUARDING_PERIOD_S = 1
MAX_NUMBER_SERVO_ALIVE_ATTEMPTS = 5

def __init__(
self,
Expand Down Expand Up @@ -1092,6 +1114,37 @@ def protocol(self) -> NetProt:
"""Obtain network protocol."""
return NetProt.CAN

@override
def recover_from_disconnection(self, servo: Optional[Servo] = None) -> bool:
"""Recover the CANopen communication with a servo after a disconnection.

This method attempts to re-establish communication by resetting the entire
CANopen network connection and re-adding all servos.

Args:
servo: not used in this implementation but kept for interface consistency.

Returns:
True if communication is recovered, False otherwise.
"""
try:
self._reset_connection()
for attempt in range(self.MAX_NUMBER_SERVO_ALIVE_ATTEMPTS):
all_servos_alive = all(s.is_alive() for s in self.servos)
if all_servos_alive:
break
sleep(0.1)
if attempt == self.MAX_NUMBER_SERVO_ALIVE_ATTEMPTS - 1:
logger.warning(
"CANopen communication recovered, but some servos are still disconnected."
)
return False
logger.info("CANopen communication recovered.")
return True
except Exception as e:
logger.warning(f"Failed to recover CANopen communication: {e}")
return False

def get_servo_state(self, servo_id: Union[int, str]) -> NetState:
"""Get the state of a servo that's a part of network.

Expand Down
9 changes: 7 additions & 2 deletions ingenialink/ethercat/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast

import ingenialogger
from typing_extensions import override

from ingenialink.pdo_network_manager import PDONetworkManager
from ingenialink.servo import Servo
Expand Down Expand Up @@ -150,7 +151,7 @@ def process(self) -> None:
if (
is_servo_alive
and servo_state == NetState.DISCONNECTED
and self.__network._recover_from_disconnection()
and self.__network.recover_from_disconnection()
):
self.__network._notify_status(slave_id, NetDevEvt.ADDED)
self.__network._set_servo_state(slave_id, NetState.CONNECTED)
Expand Down Expand Up @@ -971,11 +972,15 @@ def _notify_status(self, slave_id: int, status: NetDevEvt) -> None:
for callback in self.__observers_net_state[slave_id]:
callback(status)

def _recover_from_disconnection(self) -> bool:
@override
def recover_from_disconnection(self, servo: Optional[Servo] = None) -> bool:
"""Recover the CoE communication after a disconnection.

All the connected slaves need to transitioned to the PreOp state.

Args:
servo: not used in this implementation but kept for interface consistency.

Returns:
True if all the connected slaves reach the PreOp state.

Expand Down
70 changes: 52 additions & 18 deletions ingenialink/ethernet/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,35 @@ def __init__(self, network: "EthernetNetwork", refresh_time: float = 0.25) -> No
self.__network = network
self.__refresh_time = refresh_time
self.__stop = False
self.__max_unsuccessful_pings = MAX_NUM_UNSUCCESSFUL_PINGS

def process(self) -> None:
"""Process network status for all servos.

This method checks the status of all servos in the network and notifies
subscribers of any state changes (connection/disconnection).
"""
for servo in self.__network.servos:
servo_ip = servo.ip_address
servo_state = self.__network.get_servo_state(servo_ip)
is_servo_alive = servo.is_alive(attemps=MAX_NUM_UNSUCCESSFUL_PINGS)
if servo_state == NetState.CONNECTED and not is_servo_alive:
self.__network._notify_status(servo_ip, NetDevEvt.REMOVED)
self.__network._set_servo_state(servo_ip, NetState.DISCONNECTED)
if (
servo_state == NetState.DISCONNECTED
and is_servo_alive
and self.__network.recover_from_disconnection(servo)
):
self.__network._notify_status(servo_ip, NetDevEvt.ADDED)
self.__network._set_servo_state(servo_ip, NetState.CONNECTED)

def run(self) -> None:
"""Check the network status."""
while not self.__stop:
for servo in self.__network.servos:
unsuccessful_pings = 0
servo_ip = servo.ip_address
servo_state = self.__network.get_servo_state(servo_ip)
while unsuccessful_pings < self.__max_unsuccessful_pings:
response = servo.is_alive()
if not response:
unsuccessful_pings += 1
else:
break
ping_response = unsuccessful_pings != self.__max_unsuccessful_pings
if servo_state == NetState.CONNECTED and not ping_response:
self.__network._notify_status(servo_ip, NetDevEvt.REMOVED)
self.__network._set_servo_state(servo_ip, NetState.DISCONNECTED)
if servo_state == NetState.DISCONNECTED and ping_response:
self.__network._notify_status(servo_ip, NetDevEvt.ADDED)
self.__network._set_servo_state(servo_ip, NetState.CONNECTED)
try:
self.process()
except Exception as e:
logger.exception(f"Exception occurred while processing network status: {e}")
time.sleep(self.__refresh_time)

def stop(self) -> None:
Expand Down Expand Up @@ -338,6 +345,33 @@ def _notify_status(self, ip: str, status: NetDevEvt) -> None:
for callback in self.__observers_net_state[ip]:
callback(status)

@override
def recover_from_disconnection(self, servo: Optional[Servo] = None) -> bool:
"""Recover the communication with a servo after a disconnection.

This method attempts to re-establish communication with a servo
that has been previously disconnected. It checks if the servo
is responding again.

Args:
servo: The servo to recover communication with.

Raises:
ValueError: If the servo argument is None.

Returns:
True if communication with the servo is recovered, False otherwise.
"""
if servo is None or not isinstance(servo, EthernetServo):
raise ValueError("Ethernet Servo instance must be provided for recovery.")

if servo.is_alive(attemps=MAX_NUM_UNSUCCESSFUL_PINGS):
logger.info(f"Communication with servo at IP {servo.ip_address} recovered.")
return True
else:
logger.warning(f"Failed to recover communication with servo at IP {servo.ip_address}.")
return False

def subscribe_to_status(self, ip: str, callback: Callable[[NetDevEvt], Any]) -> None: # type: ignore [override]
"""Subscribe to network state changes.

Expand Down
14 changes: 14 additions & 0 deletions ingenialink/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ def disconnect_from_slave(self, servo: Servo) -> None:
"""
raise NotImplementedError

@abstractmethod
def recover_from_disconnection(self, servo: Optional[Servo] = None) -> bool:
"""Recovers the connection to a previously disconnected drive.

Args:
servo: Instance of the servo to recover.
For some protocols, this argument might be optional.

Returns:
True if communication is recovered, False otherwise.

"""
raise NotImplementedError

@abstractmethod
def load_firmware(self, *args: Any, **kwargs: Any) -> None:
"""Loads a given firmware file to a target drive.
Expand Down
30 changes: 21 additions & 9 deletions ingenialink/servo.py
Original file line number Diff line number Diff line change
Expand Up @@ -1104,20 +1104,32 @@ def unsubscribe_from_status(self, callback: Callable[[ServoState, int], Any]) ->
return
self.__observers_servo_state.remove(callback)

def is_alive(self) -> bool:
def is_alive(self, attemps: int = 1) -> bool:
"""Checks if the servo responds to a reading a register.

Args:
attemps: Number of attemps to check if the servo is alive.
Defaults to 1.

Returns:
Return code with the result of the read.

"""
_is_alive = True
try:
self.read(self.STATUS_WORD_REGISTERS)
except ILError as e:
_is_alive = False
logger.error(e)
return _is_alive

def _is_servo_alive() -> bool:
try:
self.read(self.STATUS_WORD_REGISTERS)
return True
except ILError:
return False

unsuccessful_attemps = 0
while unsuccessful_attemps < attemps:
if not _is_servo_alive():
unsuccessful_attemps += 1
else:
return True
logger.error("Servo is not alive after %d attempts", attemps)
return False

def reload_errors(self, dictionary: str) -> None:
"""Force to reload all dictionary errors.
Expand Down
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pytest-cov = "==2.12.1"
pytest-mock = "==3.6.1"
pytest-console-scripts = "==1.4.1"
twisted = "==24.11.0"
summit-testing-framework = {extras = ["ingeniamotion"], version = "==0.1.5+pr54b2"}
summit-testing-framework = {extras = ["ingeniamotion"], version = "==0.1.5+pr55b1"}

# -----------------------------------------------------------------------
# TASKS
Expand Down
36 changes: 36 additions & 0 deletions tests/canopen/test_canopen_network.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import platform
from typing import TYPE_CHECKING

import pytest
from summit_testing_framework.setups import (
Expand All @@ -9,6 +10,9 @@
from ingenialink.canopen.network import CanBaudrate, CanDevice, CanopenNetwork
from ingenialink.exceptions import ILError

if TYPE_CHECKING:
from ingenialink.canopen.servo import CanopenServo

test_bus = "virtual"
test_baudrate = 1000000
test_channel = 0
Expand Down Expand Up @@ -152,3 +156,35 @@ def test_load_firmware(servo, net, setup_descriptor):

assert new_fw_version != fw_version
net.disconnect_from_slave(servo)


@pytest.mark.canopen
def test_recover_from_disconnection(net: "CanopenNetwork", servo: "CanopenServo", caplog):
"""Test that recover_from_disconnection properly resets the CANopen network.

This test uses a real CANopen drive and verifies that the recover_from_disconnection
method successfully calls _reset_connection() to re-establish communication.
"""
assert servo is not None
assert len(net.servos) == 1
assert net._connection is not None

# Read the firmware version to ensure communication is working
fw_version = servo.read("DRV_ID_SOFTWARE_VERSION")
assert fw_version is not None and fw_version != ""

# Call recover_from_disconnection and verify it succeeds
with caplog.at_level("INFO"):
result = net.recover_from_disconnection()
assert result is True, "recover_from_disconnection should successfully reset connection"
assert "CANopen communication recovered." in caplog.text, (
"Should log recovery success message"
)

# Verify connection is still established after recovery
assert net._connection is not None
assert len(net.servos) == 1

# Verify we can still communicate with the servo after recovery
new_fw_version = servo.read("DRV_ID_SOFTWARE_VERSION")
assert new_fw_version == fw_version, "Firmware version should remain the same after recovery"
Loading