diff --git a/ingenialink/canopen/network.py b/ingenialink/canopen/network.py index 20cfdf3d..598f8304 100644 --- a/ingenialink/canopen/network.py +++ b/ingenialink/canopen/network.py @@ -17,6 +17,7 @@ from can import CanError from can.interfaces.kvaser.canlib import __get_canlib_function as get_canlib_function from can.interfaces.pcan.pcan import PcanCanOperationError +from typing_extensions import override from ingenialink.canopen.register import CanopenRegister from ingenialink.canopen.servo import CANOPEN_SDO_RESPONSE_TIMEOUT, CanopenServo @@ -174,30 +175,50 @@ def __init__(self, network: "CanopenNetwork"): self.__network = network self.__stop = False + def process(self, timestamps: dict[int, float]) -> dict[int, float]: + """Process network status for all servos. + + This method checks the status of all servos in the network and notifies + subscribers of any state changes (connection/disconnection). + + Args: + timestamps: Dictionary mapping node IDs to their last known timestamps. + + Returns: + Updated timestamps dictionary. + """ + if self.__network._connection is None: + return timestamps + for node_id, node in list(self.__network._connection.nodes.items()): + sleep(1.5) + current_timestamp = node.nmt.timestamp + if node_id not in timestamps: + timestamps[node_id] = current_timestamp + continue + is_alive = current_timestamp != timestamps[node_id] + servo_state = self.__network.get_servo_state(node_id) + if is_alive: + if servo_state != NetState.CONNECTED: + self.__network._notify_status(node_id, NetDevEvt.ADDED) + self.__network._set_servo_state(node_id, NetState.CONNECTED) + timestamps[node_id] = node.nmt.timestamp + elif servo_state == NetState.DISCONNECTED: + self.__network.recover_from_disconnection() + else: + self.__network._notify_status(node_id, NetDevEvt.REMOVED) + self.__network._set_servo_state(node_id, NetState.DISCONNECTED) + return timestamps + def run(self) -> None: """Check the network status.""" - timestamps = {} if self.__network._connection is None: return + timestamps: dict[int, float] = {} while not self.__stop: - for node_id, node in list(self.__network._connection.nodes.items()): - sleep(1.5) - current_timestamp = node.nmt.timestamp - if node_id not in timestamps: - timestamps[node_id] = current_timestamp - continue - is_alive = current_timestamp != timestamps[node_id] - servo_state = self.__network.get_servo_state(node_id) - if is_alive: - if servo_state != NetState.CONNECTED: - self.__network._notify_status(node_id, NetDevEvt.ADDED) - self.__network._set_servo_state(node_id, NetState.CONNECTED) - timestamps[node_id] = node.nmt.timestamp - elif servo_state == NetState.DISCONNECTED: - self.__network._reset_connection() - else: - self.__network._notify_status(node_id, NetDevEvt.REMOVED) - self.__network._set_servo_state(node_id, NetState.DISCONNECTED) + try: + timestamps = self.process(timestamps) + except Exception as e: # noqa: PERF203 + logger.exception(f"Exception occurred while processing network status: {e}") def stop(self) -> None: """Stop the listener.""" @@ -218,6 +239,7 @@ class CanopenNetwork(Network): PRODUCT_CODE_SUB_IX = 2 REVISION_NUMBER_SUB_IX = 3 NODE_GUARDING_PERIOD_S = 1 + MAX_NUMBER_SERVO_ALIVE_ATTEMPTS = 5 def __init__( self, @@ -1092,6 +1114,37 @@ def protocol(self) -> NetProt: """Obtain network protocol.""" return NetProt.CAN + @override + def recover_from_disconnection(self, servo: Optional[Servo] = None) -> bool: + """Recover the CANopen communication with a servo after a disconnection. + + This method attempts to re-establish communication by resetting the entire + CANopen network connection and re-adding all servos. + + Args: + servo: not used in this implementation but kept for interface consistency. + + Returns: + True if communication is recovered, False otherwise. + """ + try: + self._reset_connection() + for attempt in range(self.MAX_NUMBER_SERVO_ALIVE_ATTEMPTS): + all_servos_alive = all(s.is_alive() for s in self.servos) + if all_servos_alive: + break + sleep(0.1) + if attempt == self.MAX_NUMBER_SERVO_ALIVE_ATTEMPTS - 1: + logger.warning( + "CANopen communication recovered, but some servos are still disconnected." + ) + return False + logger.info("CANopen communication recovered.") + return True + except Exception as e: + logger.warning(f"Failed to recover CANopen communication: {e}") + return False + def get_servo_state(self, servo_id: Union[int, str]) -> NetState: """Get the state of a servo that's a part of network. diff --git a/ingenialink/ethercat/network.py b/ingenialink/ethercat/network.py index d2372417..f0859655 100644 --- a/ingenialink/ethercat/network.py +++ b/ingenialink/ethercat/network.py @@ -9,6 +9,7 @@ from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast import ingenialogger +from typing_extensions import override from ingenialink.pdo_network_manager import PDONetworkManager from ingenialink.servo import Servo @@ -150,7 +151,7 @@ def process(self) -> None: if ( is_servo_alive and servo_state == NetState.DISCONNECTED - and self.__network._recover_from_disconnection() + and self.__network.recover_from_disconnection() ): self.__network._notify_status(slave_id, NetDevEvt.ADDED) self.__network._set_servo_state(slave_id, NetState.CONNECTED) @@ -971,11 +972,15 @@ def _notify_status(self, slave_id: int, status: NetDevEvt) -> None: for callback in self.__observers_net_state[slave_id]: callback(status) - def _recover_from_disconnection(self) -> bool: + @override + def recover_from_disconnection(self, servo: Optional[Servo] = None) -> bool: """Recover the CoE communication after a disconnection. All the connected slaves need to transitioned to the PreOp state. + Args: + servo: not used in this implementation but kept for interface consistency. + Returns: True if all the connected slaves reach the PreOp state. diff --git a/ingenialink/ethernet/network.py b/ingenialink/ethernet/network.py index ecc13fd7..573bc2e3 100644 --- a/ingenialink/ethernet/network.py +++ b/ingenialink/ethernet/network.py @@ -51,28 +51,35 @@ def __init__(self, network: "EthernetNetwork", refresh_time: float = 0.25) -> No self.__network = network self.__refresh_time = refresh_time self.__stop = False - self.__max_unsuccessful_pings = MAX_NUM_UNSUCCESSFUL_PINGS + + def process(self) -> None: + """Process network status for all servos. + + This method checks the status of all servos in the network and notifies + subscribers of any state changes (connection/disconnection). + """ + for servo in self.__network.servos: + servo_ip = servo.ip_address + servo_state = self.__network.get_servo_state(servo_ip) + is_servo_alive = servo.is_alive(attemps=MAX_NUM_UNSUCCESSFUL_PINGS) + if servo_state == NetState.CONNECTED and not is_servo_alive: + self.__network._notify_status(servo_ip, NetDevEvt.REMOVED) + self.__network._set_servo_state(servo_ip, NetState.DISCONNECTED) + if ( + servo_state == NetState.DISCONNECTED + and is_servo_alive + and self.__network.recover_from_disconnection(servo) + ): + self.__network._notify_status(servo_ip, NetDevEvt.ADDED) + self.__network._set_servo_state(servo_ip, NetState.CONNECTED) def run(self) -> None: """Check the network status.""" while not self.__stop: - for servo in self.__network.servos: - unsuccessful_pings = 0 - servo_ip = servo.ip_address - servo_state = self.__network.get_servo_state(servo_ip) - while unsuccessful_pings < self.__max_unsuccessful_pings: - response = servo.is_alive() - if not response: - unsuccessful_pings += 1 - else: - break - ping_response = unsuccessful_pings != self.__max_unsuccessful_pings - if servo_state == NetState.CONNECTED and not ping_response: - self.__network._notify_status(servo_ip, NetDevEvt.REMOVED) - self.__network._set_servo_state(servo_ip, NetState.DISCONNECTED) - if servo_state == NetState.DISCONNECTED and ping_response: - self.__network._notify_status(servo_ip, NetDevEvt.ADDED) - self.__network._set_servo_state(servo_ip, NetState.CONNECTED) + try: + self.process() + except Exception as e: + logger.exception(f"Exception occurred while processing network status: {e}") time.sleep(self.__refresh_time) def stop(self) -> None: @@ -338,6 +345,33 @@ def _notify_status(self, ip: str, status: NetDevEvt) -> None: for callback in self.__observers_net_state[ip]: callback(status) + @override + def recover_from_disconnection(self, servo: Optional[Servo] = None) -> bool: + """Recover the communication with a servo after a disconnection. + + This method attempts to re-establish communication with a servo + that has been previously disconnected. It checks if the servo + is responding again. + + Args: + servo: The servo to recover communication with. + + Raises: + ValueError: If the servo argument is None. + + Returns: + True if communication with the servo is recovered, False otherwise. + """ + if servo is None or not isinstance(servo, EthernetServo): + raise ValueError("Ethernet Servo instance must be provided for recovery.") + + if servo.is_alive(attemps=MAX_NUM_UNSUCCESSFUL_PINGS): + logger.info(f"Communication with servo at IP {servo.ip_address} recovered.") + return True + else: + logger.warning(f"Failed to recover communication with servo at IP {servo.ip_address}.") + return False + def subscribe_to_status(self, ip: str, callback: Callable[[NetDevEvt], Any]) -> None: # type: ignore [override] """Subscribe to network state changes. diff --git a/ingenialink/network.py b/ingenialink/network.py index b4cb3c43..0e96d24d 100644 --- a/ingenialink/network.py +++ b/ingenialink/network.py @@ -91,6 +91,20 @@ def disconnect_from_slave(self, servo: Servo) -> None: """ raise NotImplementedError + @abstractmethod + def recover_from_disconnection(self, servo: Optional[Servo] = None) -> bool: + """Recovers the connection to a previously disconnected drive. + + Args: + servo: Instance of the servo to recover. + For some protocols, this argument might be optional. + + Returns: + True if communication is recovered, False otherwise. + + """ + raise NotImplementedError + @abstractmethod def load_firmware(self, *args: Any, **kwargs: Any) -> None: """Loads a given firmware file to a target drive. diff --git a/ingenialink/servo.py b/ingenialink/servo.py index 13d4ce6a..5ceb1498 100644 --- a/ingenialink/servo.py +++ b/ingenialink/servo.py @@ -1104,20 +1104,32 @@ def unsubscribe_from_status(self, callback: Callable[[ServoState, int], Any]) -> return self.__observers_servo_state.remove(callback) - def is_alive(self) -> bool: + def is_alive(self, attemps: int = 1) -> bool: """Checks if the servo responds to a reading a register. + Args: + attemps: Number of attemps to check if the servo is alive. + Defaults to 1. + Returns: Return code with the result of the read. - """ - _is_alive = True - try: - self.read(self.STATUS_WORD_REGISTERS) - except ILError as e: - _is_alive = False - logger.error(e) - return _is_alive + + def _is_servo_alive() -> bool: + try: + self.read(self.STATUS_WORD_REGISTERS) + return True + except ILError: + return False + + unsuccessful_attemps = 0 + while unsuccessful_attemps < attemps: + if not _is_servo_alive(): + unsuccessful_attemps += 1 + else: + return True + logger.error("Servo is not alive after %d attempts", attemps) + return False def reload_errors(self, dictionary: str) -> None: """Force to reload all dictionary errors. diff --git a/poetry.lock b/poetry.lock index 669b9d81..5e0b2904 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3623,13 +3623,13 @@ test = ["pytest"] [[package]] name = "summit-testing-framework" -version = "0.1.5+pr54b2" +version = "0.1.5+pr55b1" description = "Testing framework for Novanta drives" optional = false python-versions = ">=3.9" groups = ["tests"] files = [ - {file = "summit_testing_framework-0.1.5+pr54b2-py3-none-any.whl", hash = "sha256:06f1eca1364a85326c4d35ea8b3b7292fc8f6b28525b98e7c7617033c6b91153"}, + {file = "summit_testing_framework-0.1.5+pr55b1-py3-none-any.whl", hash = "sha256:e9036698cce448e822feff15bde415ed39c1f1061e13c86ca5fcf1006884d47f"}, ] [package.dependencies] @@ -4311,4 +4311,4 @@ virtual-drive = ["virtual_drive"] [metadata] lock-version = "2.1" python-versions = ">=3.9" -content-hash = "e1a5f6ccdd1f9b98937957b66d3c9d08a801ebf5d73d51fd60db4168b3c614f9" +content-hash = "5de7e6bcdd01b057fdb39a99d4434d434f800198f8af38ca19b6dcdceee2b7b2" diff --git a/pyproject.toml b/pyproject.toml index 0737f968..f903d9ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -119,7 +119,7 @@ pytest-cov = "==2.12.1" pytest-mock = "==3.6.1" pytest-console-scripts = "==1.4.1" twisted = "==24.11.0" -summit-testing-framework = {extras = ["ingeniamotion"], version = "==0.1.5+pr54b2"} +summit-testing-framework = {extras = ["ingeniamotion"], version = "==0.1.5+pr55b1"} # ----------------------------------------------------------------------- # TASKS diff --git a/tests/canopen/test_canopen_network.py b/tests/canopen/test_canopen_network.py index 363695fd..8117898d 100644 --- a/tests/canopen/test_canopen_network.py +++ b/tests/canopen/test_canopen_network.py @@ -1,4 +1,5 @@ import platform +from typing import TYPE_CHECKING import pytest from summit_testing_framework.setups import ( @@ -9,6 +10,9 @@ from ingenialink.canopen.network import CanBaudrate, CanDevice, CanopenNetwork from ingenialink.exceptions import ILError +if TYPE_CHECKING: + from ingenialink.canopen.servo import CanopenServo + test_bus = "virtual" test_baudrate = 1000000 test_channel = 0 @@ -152,3 +156,35 @@ def test_load_firmware(servo, net, setup_descriptor): assert new_fw_version != fw_version net.disconnect_from_slave(servo) + + +@pytest.mark.canopen +def test_recover_from_disconnection(net: "CanopenNetwork", servo: "CanopenServo", caplog): + """Test that recover_from_disconnection properly resets the CANopen network. + + This test uses a real CANopen drive and verifies that the recover_from_disconnection + method successfully calls _reset_connection() to re-establish communication. + """ + assert servo is not None + assert len(net.servos) == 1 + assert net._connection is not None + + # Read the firmware version to ensure communication is working + fw_version = servo.read("DRV_ID_SOFTWARE_VERSION") + assert fw_version is not None and fw_version != "" + + # Call recover_from_disconnection and verify it succeeds + with caplog.at_level("INFO"): + result = net.recover_from_disconnection() + assert result is True, "recover_from_disconnection should successfully reset connection" + assert "CANopen communication recovered." in caplog.text, ( + "Should log recovery success message" + ) + + # Verify connection is still established after recovery + assert net._connection is not None + assert len(net.servos) == 1 + + # Verify we can still communicate with the servo after recovery + new_fw_version = servo.read("DRV_ID_SOFTWARE_VERSION") + assert new_fw_version == fw_version, "Firmware version should remain the same after recovery" diff --git a/tests/ethercat/test_ethercat_network.py b/tests/ethercat/test_ethercat_network.py index 5caed18b..68ee5fc5 100644 --- a/tests/ethercat/test_ethercat_network.py +++ b/tests/ethercat/test_ethercat_network.py @@ -770,8 +770,8 @@ def test_net_status_listener_detects_slave_reconnection(pysoem_mock_network, moc def status_callback(event: NetDevEvt): events_detected.append(event) - # Mock _recover_from_disconnection to return True - mocker.patch.object(EthercatNetwork, "_recover_from_disconnection", return_value=True) + # Mock recover_from_disconnection to return True + mocker.patch.object(EthercatNetwork, "recover_from_disconnection", return_value=True) net = EthercatNetwork("dummy_ifname") @@ -868,3 +868,32 @@ def mock_send_receive_processdata(*args, **kwargs) -> None: # type: ignore [no- # Net should restore servos to PREOP state assert servo.slave is not None assert servo.slave.state is pysoem.PREOP_STATE + + +@pytest.mark.ethercat +def test_recover_from_disconnection(net: "EthercatNetwork", servo: "EthercatServo", caplog) -> None: + """Test that recover_from_disconnection properly rediscovers slaves after disconnection. + + This test uses a real EtherCAT drive and simulates a disconnection scenario by setting + the slave reference to None. The recover_from_disconnection method should call + __init_nodes() to rediscover the physical drive and restore communication. + """ + # Verify initial state - servo is connected and in PREOP state + assert servo.slave_exists is True + assert servo.slave.state == pysoem.PREOP_STATE, "Servo should be in PREOP state initially" + + # Verify that recover_from_disconnection returns True when servo is properly connected + assert net.recover_from_disconnection() is True + + # Simulate slave disconnection by changing state to INIT + net._change_nodes_state(servo, pysoem.INIT_STATE) + assert servo.slave.state == pysoem.INIT_STATE, ( + "Servo should be in INIT state after disconnection" + ) + caplog.clear() + with caplog.at_level("WARNING"): + result = net.recover_from_disconnection() + assert result is True, "recover_from_disconnection should rediscover the physical slave" + assert "CoE communication recovered." in caplog.text, "Should log recovery success message" + + assert servo.slave.state == pysoem.PREOP_STATE, "Servo should be in PREOP state after recovery" diff --git a/tests/ethernet/test_ethernet_network.py b/tests/ethernet/test_ethernet_network.py index be4f1807..975ae3f2 100644 --- a/tests/ethernet/test_ethernet_network.py +++ b/tests/ethernet/test_ethernet_network.py @@ -3,6 +3,7 @@ import time from ftplib import error_temp from threading import Thread +from typing import TYPE_CHECKING import pytest from summit_testing_framework.setups import ( @@ -20,6 +21,9 @@ from ingenialink.ethernet.network import EthernetNetwork, NetDevEvt, NetProt, NetState from ingenialink.exceptions import ILError, ILFirmwareLoadError +if TYPE_CHECKING: + from ingenialink.ethernet.servo import EthernetServo + class FTPServer(Thread): """FTP Server. @@ -271,3 +275,26 @@ def test_unsubscribe_from_status(virtual_drive, mocker): # Assert that the net status callback is not notified of net status change event assert len(status_list) == 0 + + +@pytest.mark.ethernet +def test_recover_from_disconnection(net: "EthernetNetwork", servo: "EthernetServo", mocker) -> None: + """Test that recover_from_disconnection properly checks if a servo can communicate. + + This test uses a real Ethernet drive and simulates disconnection/reconnection + scenarios to verify the recovery behavior. + """ + assert len(net.servos) == 1 + assert servo in net.servos + assert servo.is_alive() is True + + # Verify that recover_from_disconnection returns True when servo is connected + assert net.recover_from_disconnection(servo) is True + + # Simulate servo disconnection by mocking is_alive to return False + mocker.patch.object(servo, "is_alive", return_value=False) + assert net.recover_from_disconnection(servo) is False + + # Simulate servo reconnection by mocking is_alive to return True again + mocker.patch.object(servo, "is_alive", return_value=True) + assert net.recover_from_disconnection(servo) is True diff --git a/tests/test_servo.py b/tests/test_servo.py index 3afc0b34..ad504205 100644 --- a/tests/test_servo.py +++ b/tests/test_servo.py @@ -3,12 +3,12 @@ import shutil import time from pathlib import Path +from typing import TYPE_CHECKING, Union from xml.etree import ElementTree import pytest from packaging import version from summit_testing_framework.rack_service_client import PartNumber -from summit_testing_framework.setups import DriveCanOpenSetup, DriveEthernetSetup from summit_testing_framework.setups.specifiers import ( RackServiceConfigSpecifier, ) @@ -30,6 +30,11 @@ from ingenialink.register import RegAddressType from ingenialink.servo import Servo, ServoState +if TYPE_CHECKING: + from ingenialink.canopen.network import CanopenNetwork + from ingenialink.ethercat.network import EthercatNetwork + from ingenialink.ethernet.network import EthernetNetwork + MONITORING_CH_DATA_SIZE = 4 MONITORING_NUM_SAMPLES = 100 DISTURBANCE_CH_DATA_SIZE = 4 @@ -54,17 +59,12 @@ def _clean(filename): os.remove(filename) -def _get_reg_address(register, descriptor): - if isinstance(descriptor, DriveEthernetSetup): - return register.address - elif isinstance(descriptor, DriveCanOpenSetup): - return register.idx - raise ValueError - - -def wait_until_alive(servo, timeout=None): +def wait_until_alive( + servo, net: Union["CanopenNetwork", "EthernetNetwork", "EthercatNetwork"], timeout: float = None +): init_time = time.time() - while not servo.is_alive(): + # https://novantamotion.atlassian.net/browse/CIT-526 + while not net.recover_from_disconnection(servo): if timeout is not None and (init_time + timeout) < time.time(): pytest.fail("The drive is unresponsive after the recovery timeout.") time.sleep(1) @@ -294,7 +294,9 @@ def test_load_configuration_to_subnode_zero(setup_descriptor, servo, net): @pytest.mark.canopen @pytest.mark.ethernet @pytest.mark.ethercat -def test_store_parameters(servo, environment): +def test_store_parameters( + servo, environment, net: Union["CanopenNetwork", "EthernetNetwork", "EthercatNetwork"] +) -> None: user_over_voltage_register = "DRV_PROT_USER_OVER_VOLT" initial_user_over_voltage_value = servo.read(user_over_voltage_register) @@ -310,13 +312,17 @@ def test_store_parameters(servo, environment): environment.power_cycle(wait_for_drives=False) - wait_until_alive(servo, timeout=20) + wait_until_alive(servo, net, timeout=20) assert servo.read(user_over_voltage_register) == new_user_over_voltage_value + servo.write(user_over_voltage_register, initial_user_over_voltage_value) + @pytest.mark.fsoe -def test_store_safe_parameters(servo, environment): +def test_store_safe_parameters( + servo, environment, net: Union["CanopenNetwork", "EthernetNetwork", "EthercatNetwork"] +) -> None: ss1_time_to_sto_register = "FSOE_SS1_TIME_TO_STO_1" # Change the value of a safe parameter initial_register_value = servo.read(ss1_time_to_sto_register) @@ -330,7 +336,7 @@ def test_store_safe_parameters(servo, environment): # Power cycle the drive environment.power_cycle(wait_for_drives=False) # Wait until the drive recovers from the power cycle - wait_until_alive(servo, timeout=20) + wait_until_alive(servo, net, timeout=20) # Verify that the value is retained after power cycling assert servo.read(ss1_time_to_sto_register) == new_register_value @@ -338,7 +344,9 @@ def test_store_safe_parameters(servo, environment): @pytest.mark.canopen @pytest.mark.ethernet @pytest.mark.ethercat -def test_restore_parameters(servo, environment): +def test_restore_parameters( + servo, environment, net: Union["CanopenNetwork", "EthernetNetwork", "EthercatNetwork"] +) -> None: user_over_voltage_register = "DRV_PROT_USER_OVER_VOLT" new_user_over_voltage_value = servo.read(user_over_voltage_register) + 5 @@ -353,13 +361,15 @@ def test_restore_parameters(servo, environment): environment.power_cycle(wait_for_drives=False) - wait_until_alive(servo, timeout=20) + wait_until_alive(servo, net, timeout=20) assert servo.read(user_over_voltage_register) != new_user_over_voltage_value @pytest.mark.fsoe -def test_restore_safe_parameters(servo, environment): +def test_restore_safe_parameters( + servo, environment, net: Union["CanopenNetwork", "EthernetNetwork", "EthercatNetwork"] +) -> None: ss1_time_to_sto_register = "FSOE_SS1_TIME_TO_STO_1" # Change the value of a safe parameter initial_register_value = servo.read(ss1_time_to_sto_register) @@ -373,7 +383,7 @@ def test_restore_safe_parameters(servo, environment): # Power cycle the drive environment.power_cycle(wait_for_drives=False) # Wait until the drive recovers from the power cycle - wait_until_alive(servo, timeout=20) + wait_until_alive(servo, net, timeout=20) # Verify that the value is restored to the default value after power cycling assert servo.read(ss1_time_to_sto_register) != new_register_value