diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1a07cca6..973eea61 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -169,6 +169,11 @@ jobs: lcov --extract coverage.raw.info \ '*/ros2_medkit/src/*/src/*' \ '*/ros2_medkit/src/*/include/*' \ + --output-file coverage.extracted.info \ + --ignore-errors unused,empty + + lcov --remove coverage.extracted.info \ + '*/vendored/*' \ --output-file coverage.info \ --ignore-errors unused,empty diff --git a/codecov.yml b/codecov.yml index cd1114a7..dd593056 100644 --- a/codecov.yml +++ b/codecov.yml @@ -23,6 +23,7 @@ ignore: - "test/**/*" # Test files - "**/test_*.cpp" # Test source files - "**/demo_nodes/**" # Demo nodes (test fixtures) + - "**/vendored/**" # Third-party vendored code - "build/**/*" # Build artifacts - "install/**/*" # Install artifacts - "log/**/*" # Log files diff --git a/docs/design/index.rst b/docs/design/index.rst index 48611c28..168ca97c 100644 --- a/docs/design/index.rst +++ b/docs/design/index.rst @@ -10,5 +10,5 @@ This section contains design documentation for the ros2_medkit project packages. ros2_medkit_fault_manager/index ros2_medkit_fault_reporter/index ros2_medkit_gateway/index + ros2_medkit_integration_tests/index ros2_medkit_serialization/index - diff --git a/docs/design/ros2_medkit_integration_tests b/docs/design/ros2_medkit_integration_tests new file mode 120000 index 00000000..1dc72c69 --- /dev/null +++ b/docs/design/ros2_medkit_integration_tests @@ -0,0 +1 @@ +../../src/ros2_medkit_integration_tests/design \ No newline at end of file diff --git a/docs/getting_started.rst b/docs/getting_started.rst index 1c944ca4..56ee6d38 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -53,9 +53,10 @@ You should see: .. code-block:: bash - ros2 launch ros2_medkit_gateway demo_nodes.launch.py + ros2 launch ros2_medkit_integration_tests demo_nodes.launch.py -This launches automotive demo nodes that we'll use to explore the API. +This launches automotive demo nodes from the integration tests package that we'll +use to explore the API. .. list-table:: Demo Nodes Created by demo_nodes.launch.py :header-rows: 1 diff --git a/docs/introduction.rst b/docs/introduction.rst index 9021ce10..b54383ca 100644 --- a/docs/introduction.rst +++ b/docs/introduction.rst @@ -62,6 +62,9 @@ ros2_medkit consists of several ROS 2 packages: **ros2_medkit_msgs** Message and service definitions for fault management. +**ros2_medkit_integration_tests** + Integration tests, automotive-themed demo nodes, and shared test utilities. + Next Steps ---------- @@ -76,4 +79,3 @@ Community - 💬 **Discord**: `Join our server `_ - 🐛 **Issues**: `Report bugs `_ - 💡 **Discussions**: `GitHub Discussions `_ - diff --git a/docs/tutorials/devcontainer.rst b/docs/tutorials/devcontainer.rst index 96c1a91b..964202a8 100644 --- a/docs/tutorials/devcontainer.rst +++ b/docs/tutorials/devcontainer.rst @@ -132,8 +132,8 @@ Running Tests # All tests colcon test && colcon test-result --verbose - # Unit tests only - colcon test --ctest-args -E test_integration + # Unit tests only (exclude integration tests package) + colcon test --packages-skip ros2_medkit_integration_tests # Linters only colcon test --ctest-args -L linters diff --git a/postman/README.md b/postman/README.md index 04fd9561..fa55023b 100644 --- a/postman/README.md +++ b/postman/README.md @@ -74,7 +74,7 @@ All endpoints are prefixed with `/api/v1` for API versioning. ```bash # Terminal 1 - Demo Nodes (sensors, actuators, services, actions) -ros2 launch ros2_medkit_gateway demo_nodes.launch.py +ros2 launch ros2_medkit_integration_tests demo_nodes.launch.py # Terminal 2 - Fault Manager (required for Faults API and LIDAR Fault Workflow) ros2 run ros2_medkit_fault_manager fault_manager_node diff --git a/src/ros2_medkit_gateway/CMakeLists.txt b/src/ros2_medkit_gateway/CMakeLists.txt index 6a73df8e..17b8476e 100644 --- a/src/ros2_medkit_gateway/CMakeLists.txt +++ b/src/ros2_medkit_gateway/CMakeLists.txt @@ -33,8 +33,6 @@ find_package(yaml_cpp_vendor REQUIRED) medkit_find_yaml_cpp() find_package(ament_index_cpp REQUIRED) find_package(action_msgs REQUIRED) -find_package(rclcpp_action REQUIRED) -find_package(example_interfaces REQUIRED) find_package(ros2_medkit_msgs REQUIRED) find_package(ros2_medkit_serialization REQUIRED) @@ -391,190 +389,6 @@ if(BUILD_TESTING) endforeach() endif() - # Integration testing - find_package(ament_cmake_pytest REQUIRED) - find_package(launch_testing_ament_cmake REQUIRED) - - # Install test directory - install(DIRECTORY test - DESTINATION share/${PROJECT_NAME} - ) - - # Add integration tests - add_launch_test( - test/test_integration.test.py - TARGET test_integration - TIMEOUT 600 - ) - - # Add CORS integration tests - add_launch_test( - test/test_cors.test.py - TARGET test_cors - TIMEOUT 60 - ) - - # Add rate limiting integration tests - add_launch_test( - test/test_rate_limiting.test.py - TARGET test_rate_limiting - TIMEOUT 90 - ) - - # Add authentication integration tests - add_launch_test( - test/test_auth.test.py - TARGET test_auth - TIMEOUT 120 - ) - - # Add TLS/HTTPS integration tests - add_launch_test( - test/test_tls.test.py - TARGET test_tls - TIMEOUT 120 - ) - - # Add manifest-only discovery integration tests - add_launch_test( - test/test_discovery_manifest.test.py - TARGET test_discovery_manifest - TIMEOUT 120 - ) - - # Add hybrid discovery integration tests - add_launch_test( - test/test_discovery_hybrid.test.py - TARGET test_discovery_hybrid - TIMEOUT 120 - ) - - # Add heuristic apps discovery integration tests - add_launch_test( - test/test_discovery_heuristic_apps.test.py - TARGET test_discovery_heuristic_apps - TIMEOUT 120 - ) - - # Add bulk data upload/delete integration tests - add_launch_test( - test/test_bulkdata_upload.test.py - TARGET test_bulkdata_upload - TIMEOUT 120 - ) - - # Add cyclic subscriptions integration tests - add_launch_test( - test/test_cyclic_subscriptions.test.py - TARGET test_cyclic_subscriptions - TIMEOUT 120 - ) - - # Demo automotive nodes - add_executable(demo_engine_temp_sensor - test/demo_nodes/engine_temp_sensor.cpp - ) - ament_target_dependencies(demo_engine_temp_sensor - rclcpp - rcl_interfaces - sensor_msgs - ) - install(TARGETS demo_engine_temp_sensor - DESTINATION lib/${PROJECT_NAME} - ) - - add_executable(demo_rpm_sensor - test/demo_nodes/rpm_sensor.cpp - ) - ament_target_dependencies(demo_rpm_sensor - rclcpp - std_msgs - ) - install(TARGETS demo_rpm_sensor - DESTINATION lib/${PROJECT_NAME} - ) - - add_executable(demo_brake_pressure_sensor - test/demo_nodes/brake_pressure_sensor.cpp - ) - ament_target_dependencies(demo_brake_pressure_sensor - rclcpp - std_msgs - ) - install(TARGETS demo_brake_pressure_sensor - DESTINATION lib/${PROJECT_NAME} - ) - - add_executable(demo_door_status_sensor - test/demo_nodes/door_status_sensor.cpp - ) - ament_target_dependencies(demo_door_status_sensor - rclcpp - std_msgs - ) - install(TARGETS demo_door_status_sensor - DESTINATION lib/${PROJECT_NAME} - ) - - # demo actuators - add_executable(demo_brake_actuator - test/demo_nodes/brake_actuator.cpp - ) - ament_target_dependencies(demo_brake_actuator - rclcpp - std_msgs - ) - install(TARGETS demo_brake_actuator - DESTINATION lib/${PROJECT_NAME} - ) - - add_executable(demo_light_controller - test/demo_nodes/light_controller.cpp - ) - ament_target_dependencies(demo_light_controller - rclcpp - std_msgs - ) - install(TARGETS demo_light_controller - DESTINATION lib/${PROJECT_NAME} - ) - - add_executable(demo_calibration_service - test/demo_nodes/calibration_service.cpp - ) - ament_target_dependencies(demo_calibration_service - rclcpp - std_srvs - ) - install(TARGETS demo_calibration_service - DESTINATION lib/${PROJECT_NAME} - ) - - add_executable(demo_long_calibration_action - test/demo_nodes/long_calibration_action.cpp - ) - ament_target_dependencies(demo_long_calibration_action - rclcpp - rclcpp_action - example_interfaces - ) - install(TARGETS demo_long_calibration_action - DESTINATION lib/${PROJECT_NAME} - ) - - add_executable(demo_lidar_sensor - test/demo_nodes/lidar_sensor.cpp - ) - ament_target_dependencies(demo_lidar_sensor - rclcpp - rcl_interfaces - sensor_msgs - std_srvs - ros2_medkit_msgs - ) - install(TARGETS demo_lidar_sensor - DESTINATION lib/${PROJECT_NAME} - ) endif() ament_package() diff --git a/src/ros2_medkit_gateway/README.md b/src/ros2_medkit_gateway/README.md index f16e466a..d9c61e27 100644 --- a/src/ros2_medkit_gateway/README.md +++ b/src/ros2_medkit_gateway/README.md @@ -997,7 +997,7 @@ ros2 launch ros2_medkit_gateway gateway_https.launch.py **Start demo nodes:** ```bash -ros2 launch ros2_medkit_gateway demo_nodes.launch.py +ros2 launch ros2_medkit_integration_tests demo_nodes.launch.py ``` **Test the API:** @@ -1340,7 +1340,7 @@ The package includes demo automotive nodes for testing: **Launch all demo nodes:** ```bash -ros2 launch ros2_medkit_gateway demo_nodes.launch.py +ros2 launch ros2_medkit_integration_tests demo_nodes.launch.py ``` ## URL Encoding @@ -1368,7 +1368,7 @@ We provide a Postman collection for easy API testing: 2. **Import environment:** `postman/environments/local.postman_environment.json` 3. **Activate environment:** Select "ROS 2 Medkit Gateway - Local" in Postman 4. **Start gateway:** `ros2 launch ros2_medkit_gateway gateway.launch.py` -5. **Start demo nodes:** `ros2 launch ros2_medkit_gateway demo_nodes.launch.py` +5. **Start demo nodes:** `ros2 launch ros2_medkit_integration_tests demo_nodes.launch.py` 6. **Test:** Send requests from Postman! See [postman/README.md](postman/README.md) for detailed instructions. diff --git a/src/ros2_medkit_gateway/config/examples/demo_nodes_manifest.yaml b/src/ros2_medkit_gateway/config/examples/demo_nodes_manifest.yaml index c9a6a0b2..32c2f3b0 100644 --- a/src/ros2_medkit_gateway/config/examples/demo_nodes_manifest.yaml +++ b/src/ros2_medkit_gateway/config/examples/demo_nodes_manifest.yaml @@ -3,7 +3,7 @@ # This manifest describes the demo nodes used for ros2_medkit gateway # integration testing. It models an automotive system with subsystems. # -# Use with: ros2 launch ros2_medkit_gateway demo_nodes.launch.py +# Use with: ros2 launch ros2_medkit_integration_tests demo_nodes.launch.py manifest_version: "1.0" diff --git a/src/ros2_medkit_gateway/package.xml b/src/ros2_medkit_gateway/package.xml index 8388faf0..a3ad907c 100644 --- a/src/ros2_medkit_gateway/package.xml +++ b/src/ros2_medkit_gateway/package.xml @@ -35,12 +35,6 @@ ament_cmake_clang_format ament_cmake_clang_tidy ament_cmake_gtest - ament_cmake_pytest - launch_testing_ament_cmake - python3-requests - rclcpp_action - example_interfaces - ros2_medkit_fault_manager ament_cmake diff --git a/src/ros2_medkit_gateway/test/test_discovery_hybrid.test.py b/src/ros2_medkit_gateway/test/test_discovery_hybrid.test.py deleted file mode 100644 index c127322e..00000000 --- a/src/ros2_medkit_gateway/test/test_discovery_hybrid.test.py +++ /dev/null @@ -1,716 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2026 bburda -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Integration tests for hybrid discovery mode. - -This test file validates discovery endpoints when the gateway is configured -with discovery_mode: hybrid, combining manifest definitions with runtime -ROS 2 graph discovery. - -Tests verify: -- Areas from manifest are present -- Components from manifest are present -- Apps from manifest are enriched with runtime data (topics, services) -- Runtime-discovered nodes are linked to manifest apps -- Functions aggregate data from their hosting apps -- Orphan nodes (not in manifest) are handled according to config -""" - -import os -import time -import unittest - -from ament_index_python.packages import get_package_share_directory -from launch import LaunchDescription -from launch.actions import TimerAction -import launch_ros.actions -import launch_testing.actions -import requests - - -def get_coverage_env(): - """Get environment variables for gcov coverage data collection.""" - try: - from ament_index_python.packages import get_package_prefix - pkg_prefix = get_package_prefix('ros2_medkit_gateway') - workspace = os.path.dirname(os.path.dirname(pkg_prefix)) - build_dir = os.path.join(workspace, 'build', 'ros2_medkit_gateway') - if os.path.exists(build_dir): - return { - 'GCOV_PREFIX': build_dir, - 'GCOV_PREFIX_STRIP': str(build_dir.count(os.sep)), - } - except Exception: - # Coverage env is optional - gracefully continue without coverage settings - pass - return {} - - -def generate_test_description(): - """Generate launch description with gateway in hybrid discovery mode.""" - pkg_share = get_package_share_directory('ros2_medkit_gateway') - manifest_path = os.path.join( - pkg_share, 'config', 'examples', 'demo_nodes_manifest.yaml' - ) - - coverage_env = get_coverage_env() - - # Gateway node with hybrid discovery mode - gateway_node = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='gateway_node', - name='ros2_medkit_gateway', - output='screen', - parameters=[{ - 'discovery_mode': 'hybrid', - 'manifest_path': manifest_path, - 'manifest_strict_validation': False, # Allow warnings about subarea references - # Allow orphan nodes to be discovered (warn, not fail) - 'unmanifested_nodes': 'warn', - }], - additional_env=coverage_env, - ) - - # Launch demo nodes matching the manifest - demo_nodes = [ - # Powertrain/Engine - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_engine_temp_sensor', - name='temp_sensor', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_rpm_sensor', - name='rpm_sensor', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_calibration_service', - name='calibration', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_long_calibration_action', - name='long_calibration', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ), - # Chassis/Brakes - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_brake_pressure_sensor', - name='pressure_sensor', - namespace='/chassis/brakes', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_brake_actuator', - name='actuator', - namespace='/chassis/brakes', - output='screen', - additional_env=coverage_env, - ), - # Body - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_door_status_sensor', - name='status_sensor', - namespace='/body/door/front_left', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_light_controller', - name='controller', - namespace='/body/lights', - output='screen', - additional_env=coverage_env, - ), - # Perception - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_lidar_sensor', - name='lidar_sensor', - namespace='/perception/lidar', - output='screen', - additional_env=coverage_env, - ), - ] - - delayed_nodes = TimerAction( - period=2.0, - actions=demo_nodes, - ) - - return ( - LaunchDescription([ - gateway_node, - delayed_nodes, - launch_testing.actions.ReadyToTest(), - ]), - {'gateway_node': gateway_node}, - ) - - -API_BASE_PATH = '/api/v1' - - -class TestDiscoveryHybridMode(unittest.TestCase): - """Integration tests for hybrid discovery mode.""" - - BASE_URL = f'http://localhost:8080{API_BASE_PATH}' - MAX_DISCOVERY_WAIT = 60.0 - MIN_EXPECTED_APPS_ONLINE = 5 - - @classmethod - def setUpClass(cls): - """Wait for gateway and discovery to complete.""" - # Wait for gateway health - for i in range(30): - try: - response = requests.get(f'{cls.BASE_URL}/health', timeout=2) - if response.status_code == 200: - break - except requests.exceptions.RequestException: - # Gateway not ready yet, retry after sleep - pass - time.sleep(1) - else: - raise unittest.SkipTest('Gateway not responding') - - # Wait for apps to come online (runtime linking) - start_time = time.time() - while time.time() - start_time < cls.MAX_DISCOVERY_WAIT: - try: - response = requests.get(f'{cls.BASE_URL}/apps', timeout=5) - if response.status_code == 200: - data = response.json() - online_count = sum( - 1 for a in data['items'] if a.get('is_online', False) - ) - if online_count >= cls.MIN_EXPECTED_APPS_ONLINE: - print(f'✓ Hybrid discovery: {online_count} apps online') - return - print(f' Waiting for apps: {online_count}/{cls.MIN_EXPECTED_APPS_ONLINE}...') - except requests.exceptions.RequestException: - # Apps endpoint not ready yet, retry after sleep - pass - time.sleep(2) - - print('⚠ Warning: Not all expected apps came online') - - # ========================================================================= - # Areas - Manifest + Runtime - # ========================================================================= - - def test_areas_from_manifest(self): - """ - Test areas are loaded from manifest in hybrid mode. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/areas', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - area_ids = [a['id'] for a in data['items']] - - # Manifest-defined areas should be present - self.assertIn('powertrain', area_ids) - self.assertIn('chassis', area_ids) - self.assertIn('body', area_ids) - self.assertIn('perception', area_ids) - self.assertIn('engine', area_ids) - - def test_area_with_description(self): - """ - Test area descriptions from manifest are preserved. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/areas/powertrain', timeout=5) - self.assertEqual(response.status_code, 200) - - area = response.json() - self.assertEqual(area['id'], 'powertrain') - # Description should come from manifest - if 'description' in area: - self.assertIn('Engine', area['description']) - - def test_area_subareas_hierarchy(self): - """ - Test subarea relationships from manifest. - - @verifies REQ_INTEROP_004 - """ - response = requests.get(f'{self.BASE_URL}/areas/body/subareas', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - subarea_ids = [s['id'] for s in data['items']] - - # Body has subareas: door, lights - self.assertIn('door', subarea_ids) - self.assertIn('lights', subarea_ids) - - def test_nested_subareas(self): - """ - Test deeply nested subareas (door -> front-left-door). - - @verifies REQ_INTEROP_004 - """ - response = requests.get(f'{self.BASE_URL}/areas/door/subareas', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - subarea_ids = [s['id'] for s in data['items']] - self.assertIn('front-left-door', subarea_ids) - - # ========================================================================= - # Components - Manifest Definitions - # ========================================================================= - - def test_components_from_manifest(self): - """ - Test components are loaded from manifest. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/components', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - component_ids = [c['id'] for c in data['items']] - - # Hardware components from manifest - self.assertIn('engine-ecu', component_ids) - self.assertIn('temp-sensor-hw', component_ids) - self.assertIn('brake-ecu', component_ids) - self.assertIn('lidar-unit', component_ids) - - def test_component_type_preserved(self): - """ - Test component type from manifest is preserved. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/components/engine-ecu', timeout=5) - self.assertEqual(response.status_code, 200) - - component = response.json() - self.assertEqual(component['x-medkit']['type'], 'controller') - - def test_component_area_relationship(self): - """ - Test component is associated with correct area. - - @verifies REQ_INTEROP_006 - """ - response = requests.get(f'{self.BASE_URL}/areas/engine/components', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - component_ids = [c['id'] for c in data['items']] - - # Engine area should have these components - self.assertIn('engine-ecu', component_ids) - self.assertIn('temp-sensor-hw', component_ids) - self.assertIn('rpm-sensor-hw', component_ids) - - def test_hybrid_component_subcomponents(self): - """ - Test GET /components/{id}/subcomponents returns subcomponents in hybrid mode. - - @verifies REQ_INTEROP_005 - """ - # Test subcomponents endpoint for a component - # Returns empty list if no subcomponents defined, but endpoint works - response = requests.get(f'{self.BASE_URL}/components/engine-ecu/subcomponents', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - # Subcomponents may be empty but format should be correct - self.assertIsInstance(data['items'], list) - - def test_hybrid_component_subcomponents_not_found(self): - """ - Test GET /components/{id}/subcomponents returns 404 for unknown component in hybrid mode. - - @verifies REQ_INTEROP_005 - """ - response = requests.get(f'{self.BASE_URL}/components/nonexistent/subcomponents', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_component_depends_on_returns_items(self): - """ - Test GET /components/{id}/depends-on returns dependency references. - - @verifies REQ_INTEROP_008 - """ - response = requests.get(f'{self.BASE_URL}/components/engine-ecu/depends-on', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - - # engine-ecu depends on temp-sensor-hw and rpm-sensor-hw - dep_ids = [d['id'] for d in data['items']] - self.assertIn('temp-sensor-hw', dep_ids) - self.assertIn('rpm-sensor-hw', dep_ids) - - # Each item should have href link - for item in data['items']: - self.assertIn('href', item) - self.assertTrue(item['href'].startswith('/api/v1/components/')) - - def test_component_depends_on_empty(self): - """ - Test GET /components/{id}/depends-on returns empty list for component without deps. - - @verifies REQ_INTEROP_008 - """ - # temp-sensor-hw has no dependencies - response = requests.get(f'{self.BASE_URL}/components/temp-sensor-hw/depends-on', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - self.assertEqual(len(data['items']), 0) - - def test_component_depends_on_not_found(self): - """ - Test GET /components/{id}/depends-on returns 404 for unknown component. - - @verifies REQ_INTEROP_008 - """ - response = requests.get(f'{self.BASE_URL}/components/nonexistent/depends-on', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_component_capabilities_includes_depends_on_link(self): - """ - Test component with dependencies has depends-on in capabilities. - - @verifies REQ_INTEROP_008 - """ - response = requests.get(f'{self.BASE_URL}/components/engine-ecu', timeout=5) - self.assertEqual(response.status_code, 200) - - component = response.json() - # capabilities is in x-medkit extension - self.assertIn('x-medkit', component) - self.assertIn('capabilities', component['x-medkit']) - - # Should have depends-on capability - cap_hrefs = [c.get('href', '') for c in component['x-medkit']['capabilities']] - self.assertTrue( - any('/depends-on' in href for href in cap_hrefs), - f'Expected depends-on capability in: {cap_hrefs}' - ) - - # ========================================================================= - # Apps - Manifest + Runtime Linking - # ========================================================================= - - def test_apps_from_manifest(self): - """ - Test apps are loaded from manifest. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/apps', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - app_ids = [a['id'] for a in data['items']] - - # All manifest apps should be present - expected_apps = [ - 'engine-temp-sensor', - 'engine-rpm-sensor', - 'engine-calibration-service', - 'engine-long-calibration', - 'brake-pressure-sensor', - 'brake-actuator', - 'door-status-sensor', - 'light-controller', - 'lidar-sensor', - ] - - for app_id in expected_apps: - self.assertIn(app_id, app_ids, f'Missing app: {app_id}') - - def test_app_online_with_runtime_node(self): - """ - Test apps linked to running nodes have is_online=true. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/apps', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - apps_by_id = {a['id']: a for a in data['items']} - - # is_online is in x-medkit extension - online_apps = [ - app_id for app_id, app in apps_by_id.items() - if app.get('x-medkit', {}).get('is_online', False) - ] - - self.assertGreater( - len(online_apps), 0, - 'No apps are online - runtime linking may have failed' - ) - - def test_app_has_runtime_topics(self): - """Test online app has topics from runtime discovery.""" - # Wait a bit for runtime linking - time.sleep(3) - - response = requests.get( - f'{self.BASE_URL}/apps/engine-temp-sensor/data', timeout=5 - ) - - # This test verifies runtime linking when it succeeds. - # Skip assertion if no topics found - runtime linking may take longer. - if response.status_code == 200: - data = response.json() - # Should have temperature topic if runtime linking worked - if 'items' in data and data['items']: - topic_names = [t.get('name', '') for t in data['items']] - # Demo node publishes 'temperature' - self.assertTrue( - any('temperature' in name for name in topic_names), - f'Expected temperature topic, got: {topic_names}' - ) - - def test_app_has_runtime_service(self): - """Test app with service has it discovered at runtime.""" - response = requests.get( - f'{self.BASE_URL}/apps/engine-calibration-service/operations', timeout=5 - ) - - # This test verifies runtime linking when it succeeds. - # Skip assertion if no operations found - runtime linking may take longer. - if response.status_code == 200: - data = response.json() - if 'items' in data and data['items']: - op_names = [o.get('name', '') for o in data['items']] - # Demo node provides 'calibrate' service - self.assertTrue( - any('calibrate' in name for name in op_names), - f'Expected calibrate service, got: {op_names}' - ) - - def test_app_component_relationship(self): - """Test app is_located_on links to correct component.""" - response = requests.get(f'{self.BASE_URL}/apps/engine-temp-sensor', timeout=5) - self.assertEqual(response.status_code, 200) - - app = response.json() - # Should be located on temp-sensor-hw via HATEOAS link - self.assertIn('_links', app, 'App response should contain _links') - self.assertIn( - 'is-located-on', app['_links'], - 'App should have is-located-on link when component is specified' - ) - self.assertEqual( - app['_links']['is-located-on'], - '/api/v1/components/temp-sensor-hw' - ) - - def test_app_depends_on_relationship(self): - """ - Test app depends_on creates dependency link. - - @verifies REQ_INTEROP_009 - """ - response = requests.get(f'{self.BASE_URL}/apps/engine-long-calibration', timeout=5) - self.assertEqual(response.status_code, 200) - - app = response.json() - # Should depend on engine-calibration-service - if 'depends_on' in app: - self.assertIn('engine-calibration-service', app['depends_on']) - - # ========================================================================= - # Functions - Aggregation from Hosts - # ========================================================================= - - def test_functions_from_manifest(self): - """ - Test functions are loaded from manifest. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/functions', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - function_ids = [f['id'] for f in data['items']] - - expected_functions = [ - 'engine-monitoring', - 'engine-calibration', - 'brake-system', - 'body-electronics', - 'perception-system', - ] - - for func_id in expected_functions: - self.assertIn(func_id, function_ids, f'Missing function: {func_id}') - - def test_function_hosts_relationship(self): - """ - Test function hosts are correctly linked. - - @verifies REQ_INTEROP_007 - """ - response = requests.get( - f'{self.BASE_URL}/functions/engine-monitoring/hosts', timeout=5 - ) - self.assertEqual(response.status_code, 200) - - data = response.json() - host_ids = [h['id'] for h in data['items']] - - # engine-monitoring hosted by temp-sensor and rpm-sensor - self.assertIn('engine-temp-sensor', host_ids) - self.assertIn('engine-rpm-sensor', host_ids) - - def test_function_aggregates_host_data(self): - """Test function /data aggregates topics from all hosts.""" - response = requests.get( - f'{self.BASE_URL}/functions/engine-monitoring/data', timeout=5 - ) - - if response.status_code == 200: - data = response.json() - if 'items' in data: - # Should have topics from both temp_sensor and rpm_sensor - topic_names = [t.get('name', '') for t in data['items']] - # At minimum should have some data - self.assertIsInstance(topic_names, list) - - def test_function_aggregates_host_operations(self): - """Test function /operations aggregates services from all hosts.""" - response = requests.get( - f'{self.BASE_URL}/functions/engine-calibration/operations', timeout=5 - ) - - if response.status_code == 200: - data = response.json() - if 'items' in data: - # Should have calibrate service and long_calibration action - op_names = [o.get('name', '') for o in data['items']] - self.assertIsInstance(op_names, list) - - def test_function_with_tags(self): - """ - Test function tags from manifest are preserved. - - @verifies REQ_INTEROP_011 - """ - response = requests.get(f'{self.BASE_URL}/functions/brake-system', timeout=5) - self.assertEqual(response.status_code, 200) - - func = response.json() - if 'tags' in func: - self.assertIn('safety-critical', func['tags']) - - # ========================================================================= - # Hybrid-Specific Behavior - # ========================================================================= - - def test_runtime_enriches_manifest_data(self): - """Test runtime discovery adds data to manifest entities.""" - # Get an app that's online - response = requests.get(f'{self.BASE_URL}/apps/lidar-sensor', timeout=5) - self.assertEqual(response.status_code, 200) - - app = response.json() - - # Manifest data should be present - self.assertEqual(app['name'], 'LiDAR Sensor') - - # Tags from manifest - if 'tags' in app: - self.assertIn('fault-reporter', app['tags']) - - def test_capabilities_include_runtime_resources(self): - """Test capabilities reflect runtime-discovered resources.""" - response = requests.get(f'{self.BASE_URL}/apps/engine-temp-sensor', timeout=5) - self.assertEqual(response.status_code, 200) - - app = response.json() - self.assertIn('capabilities', app) - - # Should have data capability (topics discovered at runtime) - cap_hrefs = [c.get('href', '') for c in app['capabilities']] - self.assertTrue( - any('/data' in href for href in cap_hrefs), - 'Expected data capability' - ) - - # ========================================================================= - # Error Handling - # ========================================================================= - - def test_nonexistent_area(self): - """Test 404 for non-existent area.""" - response = requests.get(f'{self.BASE_URL}/areas/nonexistent', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_nonexistent_component(self): - """Test 404 for non-existent component.""" - response = requests.get(f'{self.BASE_URL}/components/nonexistent', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_nonexistent_app(self): - """Test 404 for non-existent app.""" - response = requests.get(f'{self.BASE_URL}/apps/nonexistent', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_nonexistent_function(self): - """Test 404 for non-existent function.""" - response = requests.get(f'{self.BASE_URL}/functions/nonexistent', timeout=5) - self.assertEqual(response.status_code, 404) - - -@launch_testing.post_shutdown_test() -class TestDiscoveryHybridModeShutdown(unittest.TestCase): - """Post-shutdown tests.""" - - def test_exit_code(self, proc_info): - """Check gateway exited cleanly.""" - launch_testing.asserts.assertExitCodes(proc_info) diff --git a/src/ros2_medkit_gateway/test/test_discovery_manifest.test.py b/src/ros2_medkit_gateway/test/test_discovery_manifest.test.py deleted file mode 100644 index f1d5c3db..00000000 --- a/src/ros2_medkit_gateway/test/test_discovery_manifest.test.py +++ /dev/null @@ -1,559 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2026 bburda -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Integration tests for manifest-only discovery mode. - -This test file validates discovery endpoints when the gateway is configured -with discovery_mode: manifest_only using demo_nodes_manifest.yaml. - -Tests verify: -- Areas are loaded from manifest (not runtime discovery) -- Components are loaded from manifest -- Apps are loaded from manifest with correct ros_binding -- Functions are loaded from manifest with hosted_by relationships -- Subareas and related-components relationships work -- Entity details and capabilities are correct -""" - -import os -import time -import unittest - -from ament_index_python.packages import get_package_share_directory -from launch import LaunchDescription -from launch.actions import TimerAction -import launch_ros.actions -import launch_testing.actions -import requests - - -def get_coverage_env(): - """Get environment variables for gcov coverage data collection.""" - try: - from ament_index_python.packages import get_package_prefix - pkg_prefix = get_package_prefix('ros2_medkit_gateway') - workspace = os.path.dirname(os.path.dirname(pkg_prefix)) - build_dir = os.path.join(workspace, 'build', 'ros2_medkit_gateway') - if os.path.exists(build_dir): - return { - 'GCOV_PREFIX': build_dir, - 'GCOV_PREFIX_STRIP': str(build_dir.count(os.sep)), - } - except Exception: - # Coverage env is optional - gracefully continue without coverage settings - pass - return {} - - -def generate_test_description(): - """Generate launch description with gateway in manifest_only mode.""" - pkg_share = get_package_share_directory('ros2_medkit_gateway') - manifest_path = os.path.join( - pkg_share, 'config', 'examples', 'demo_nodes_manifest.yaml' - ) - - coverage_env = get_coverage_env() - - # Gateway node with manifest_only discovery mode - gateway_node = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='gateway_node', - name='ros2_medkit_gateway', - output='screen', - parameters=[{ - 'discovery_mode': 'manifest_only', - 'manifest_path': manifest_path, - 'manifest_strict_validation': False, # Allow warnings about subarea references - }], - additional_env=coverage_env, - ) - - # Launch demo nodes to verify apps become online - demo_nodes = [ - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_engine_temp_sensor', - name='temp_sensor', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_rpm_sensor', - name='rpm_sensor', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_brake_pressure_sensor', - name='pressure_sensor', - namespace='/chassis/brakes', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_calibration_service', - name='calibration', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ), - launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_lidar_sensor', - name='lidar_sensor', - namespace='/perception/lidar', - output='screen', - additional_env=coverage_env, - ), - ] - - delayed_nodes = TimerAction( - period=2.0, - actions=demo_nodes, - ) - - return ( - LaunchDescription([ - gateway_node, - delayed_nodes, - launch_testing.actions.ReadyToTest(), - ]), - {'gateway_node': gateway_node}, - ) - - -API_BASE_PATH = '/api/v1' - - -class TestDiscoveryManifestMode(unittest.TestCase): - """Integration tests for manifest-only discovery mode.""" - - BASE_URL = f'http://localhost:8080{API_BASE_PATH}' - MAX_WAIT = 30.0 - - @classmethod - def setUpClass(cls): - """Wait for gateway to be ready.""" - for i in range(30): - try: - response = requests.get(f'{cls.BASE_URL}/health', timeout=2) - if response.status_code == 200: - # Give time for manifest to be loaded - time.sleep(2) - return - except requests.exceptions.RequestException: - # Gateway not ready yet, retry after sleep - pass - time.sleep(1) - raise unittest.SkipTest('Gateway not responding') - - # ========================================================================= - # Areas Endpoints - # ========================================================================= - - def test_list_areas(self): - """ - Test GET /areas returns all manifest-defined areas. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/areas', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - self.assertIn('total_count', data.get('x-medkit', {})) - - # Manifest defines: powertrain, chassis, body, perception (top-level) - # Plus subareas: engine, brakes, door, front-left-door, lights, lidar - area_ids = [a['id'] for a in data['items']] - - # Check top-level areas - self.assertIn('powertrain', area_ids) - self.assertIn('chassis', area_ids) - self.assertIn('body', area_ids) - self.assertIn('perception', area_ids) - - # Check subareas - self.assertIn('engine', area_ids) - self.assertIn('brakes', area_ids) - self.assertIn('lidar', area_ids) - - def test_get_area_details(self): - """ - Test GET /areas/{id} returns area with capabilities. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/areas/powertrain', timeout=5) - self.assertEqual(response.status_code, 200) - - area = response.json() - self.assertEqual(area['id'], 'powertrain') - self.assertEqual(area['name'], 'Powertrain') - self.assertIn('capabilities', area) - self.assertIn('_links', area) - - def test_get_area_not_found(self): - """Test GET /areas/{id} returns 404 for unknown area.""" - response = requests.get(f'{self.BASE_URL}/areas/nonexistent', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_area_subareas(self): - """ - Test GET /areas/{id}/subareas returns nested areas. - - @verifies REQ_INTEROP_004 - """ - response = requests.get(f'{self.BASE_URL}/areas/powertrain/subareas', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - subarea_ids = [s['id'] for s in data['items']] - self.assertIn('engine', subarea_ids) - - def test_area_components(self): - """ - Test GET /areas/{id}/components returns components in area. - - @verifies REQ_INTEROP_006 - """ - response = requests.get(f'{self.BASE_URL}/areas/engine/components', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - - # ========================================================================= - # Components Endpoints - # ========================================================================= - - def test_list_components(self): - """ - Test GET /components returns all manifest-defined components. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/components', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - self.assertIn('total_count', data.get('x-medkit', {})) - - component_ids = [c['id'] for c in data['items']] - - # Check manifest-defined components - self.assertIn('engine-ecu', component_ids) - self.assertIn('temp-sensor-hw', component_ids) - self.assertIn('brake-ecu', component_ids) - self.assertIn('lidar-unit', component_ids) - - def test_get_component_details(self): - """ - Test GET /components/{id} returns component with capabilities. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/components/engine-ecu', timeout=5) - self.assertEqual(response.status_code, 200) - - component = response.json() - self.assertEqual(component['id'], 'engine-ecu') - self.assertEqual(component['name'], 'Engine ECU') - # Capabilities is in x-medkit extension - self.assertIn('x-medkit', component) - self.assertIn('capabilities', component['x-medkit']) - self.assertIn('_links', component) - - def test_get_component_not_found(self): - """Test GET /components/{id} returns 404 for unknown component.""" - response = requests.get(f'{self.BASE_URL}/components/nonexistent', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_component_subcomponents(self): - """ - Test GET /components/{id}/subcomponents returns subcomponents. - - @verifies REQ_INTEROP_005 - """ - # Test subcomponents endpoint for a component - # Returns empty list if no subcomponents defined, but endpoint works - response = requests.get(f'{self.BASE_URL}/components/engine-ecu/subcomponents', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - # Subcomponents may be empty but format should be correct - self.assertIsInstance(data['items'], list) - - def test_component_subcomponents_not_found(self): - """ - Test GET /components/{id}/subcomponents returns 404 for unknown component. - - @verifies REQ_INTEROP_005 - """ - response = requests.get(f'{self.BASE_URL}/components/nonexistent/subcomponents', timeout=5) - self.assertEqual(response.status_code, 404) - - # ========================================================================= - # Apps Endpoints - # ========================================================================= - - def test_list_apps(self): - """ - Test GET /apps returns all manifest-defined apps. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/apps', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - self.assertIn('total_count', data.get('x-medkit', {})) - - app_ids = [a['id'] for a in data['items']] - - # Check manifest-defined apps - self.assertIn('engine-temp-sensor', app_ids) - self.assertIn('engine-rpm-sensor', app_ids) - self.assertIn('brake-pressure-sensor', app_ids) - self.assertIn('lidar-sensor', app_ids) - - def test_get_app_details(self): - """ - Test GET /apps/{id} returns app with capabilities. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/apps/engine-temp-sensor', timeout=5) - self.assertEqual(response.status_code, 200) - - app = response.json() - self.assertEqual(app['id'], 'engine-temp-sensor') - self.assertEqual(app['name'], 'Engine Temperature Sensor') - self.assertIn('capabilities', app) - self.assertIn('_links', app) - - def test_get_app_not_found(self): - """Test GET /apps/{id} returns 404 for unknown app.""" - response = requests.get(f'{self.BASE_URL}/apps/nonexistent', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_app_online_status(self): - """Test that apps with running nodes have is_online=true.""" - # Wait for nodes to be discovered - time.sleep(5) - - response = requests.get(f'{self.BASE_URL}/apps', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - apps_by_id = {a['id']: a for a in data['items']} - - # engine-temp-sensor should be online (demo node running) - if 'engine-temp-sensor' in apps_by_id: - # May or may not be online depending on timing - pass # Just verify it exists - - def test_app_data_endpoint(self): - """Test GET /apps/{id}/data returns topic list.""" - response = requests.get(f'{self.BASE_URL}/apps/engine-temp-sensor/data', timeout=5) - # App is defined in manifest, should always be found - self.assertEqual(response.status_code, 200) - - def test_app_operations_endpoint(self): - """Test GET /apps/{id}/operations returns services/actions.""" - response = requests.get( - f'{self.BASE_URL}/apps/engine-calibration-service/operations', timeout=5 - ) - # App is defined in manifest, should always be found - self.assertEqual(response.status_code, 200) - - def test_app_configurations_endpoint(self): - """ - Test GET /apps/{id}/configurations in manifest-only mode. - - App is defined in manifest with bound_fqn. Configurations are retrieved - from the ROS 2 parameter service on the node. When the node is running, - returns 200 with the list of parameters. - - @verifies REQ_INTEROP_003 - """ - response = requests.get( - f'{self.BASE_URL}/apps/lidar-sensor/configurations', timeout=5 - ) - # App has bound_fqn and node is running - returns configurations - self.assertEqual(response.status_code, 200) - data = response.json() - self.assertIn('items', data) - # x-medkit extension should include aggregation info - self.assertIn('x-medkit', data) - - def test_app_data_item_endpoint(self): - """ - Test GET /apps/{id}/data/{data_id} returns sampled topic data. - - @verifies REQ_INTEROP_003 - """ - # First get the list of data items for the app - response = requests.get(f'{self.BASE_URL}/apps/engine-temp-sensor/data', timeout=5) - if response.status_code != 200: - self.skipTest('App data endpoint not available') - - data = response.json() - if not data.get('items'): - self.skipTest('No data items for app') - - # Get the first data item - data_id = data['items'][0]['id'] - response = requests.get( - f'{self.BASE_URL}/apps/engine-temp-sensor/data/{data_id}', timeout=5 - ) - # Data item exists since we just got it from the list - self.assertEqual(response.status_code, 200) - - if response.status_code == 200: - item = response.json() - self.assertIn('id', item) - self.assertIn('direction', item) - - # ========================================================================= - # Functions Endpoints - # ========================================================================= - - def test_list_functions(self): - """ - Test GET /functions returns all manifest-defined functions. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/functions', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - self.assertIn('total_count', data.get('x-medkit', {})) - - function_ids = [f['id'] for f in data['items']] - - # Check manifest-defined functions - self.assertIn('engine-monitoring', function_ids) - self.assertIn('engine-calibration', function_ids) - self.assertIn('brake-system', function_ids) - self.assertIn('body-electronics', function_ids) - self.assertIn('perception-system', function_ids) - - def test_get_function_details(self): - """ - Test GET /functions/{id} returns function with capabilities. - - @verifies REQ_INTEROP_003 - """ - response = requests.get(f'{self.BASE_URL}/functions/engine-monitoring', timeout=5) - self.assertEqual(response.status_code, 200) - - func = response.json() - self.assertEqual(func['id'], 'engine-monitoring') - self.assertEqual(func['name'], 'Engine Monitoring') - self.assertIn('capabilities', func) - self.assertIn('_links', func) - - def test_get_function_not_found(self): - """Test GET /functions/{id} returns 404 for unknown function.""" - response = requests.get(f'{self.BASE_URL}/functions/nonexistent', timeout=5) - self.assertEqual(response.status_code, 404) - - def test_function_hosts(self): - """ - Test GET /functions/{id}/hosts returns hosting apps. - - @verifies REQ_INTEROP_007 - """ - response = requests.get(f'{self.BASE_URL}/functions/engine-monitoring/hosts', timeout=5) - self.assertEqual(response.status_code, 200) - - data = response.json() - self.assertIn('items', data) - host_ids = [h['id'] for h in data['items']] - - # engine-monitoring is hosted by engine-temp-sensor and engine-rpm-sensor - self.assertIn('engine-temp-sensor', host_ids) - self.assertIn('engine-rpm-sensor', host_ids) - - def test_function_data(self): - """Test GET /functions/{id}/data aggregates data from hosts.""" - response = requests.get(f'{self.BASE_URL}/functions/engine-monitoring/data', timeout=5) - # Function is defined in manifest, should always be found - self.assertEqual(response.status_code, 200) - - def test_function_operations(self): - """Test GET /functions/{id}/operations aggregates operations from hosts.""" - response = requests.get( - f'{self.BASE_URL}/functions/engine-calibration/operations', timeout=5 - ) - # Function is defined in manifest, should always be found - self.assertEqual(response.status_code, 200) - - # ========================================================================= - # Discovery Statistics - # ========================================================================= - - def test_discovery_stats(self): - """Test GET /discovery/stats returns manifest mode info.""" - response = requests.get(f'{self.BASE_URL}/discovery/stats', timeout=5) - if response.status_code == 200: - stats = response.json() - # Should indicate manifest_only mode - if 'mode' in stats: - self.assertEqual(stats['mode'], 'manifest_only') - - # ========================================================================= - # Error Cases - # ========================================================================= - - def test_invalid_area_id(self): - """ - Test GET /areas/{id} with invalid ID format returns 400. - - Entity IDs only allow alphanumeric, underscore, and hyphen characters. - Dots are not allowed, so 'invalid..id' is rejected with 400 Bad Request. - """ - response = requests.get(f'{self.BASE_URL}/areas/invalid..id', timeout=5) - # Invalid ID format (contains '.' which is not allowed) returns 400 - self.assertEqual(response.status_code, 400) - - def test_invalid_component_id(self): - """Test GET /components/{id} with path traversal returns 404.""" - response = requests.get(f'{self.BASE_URL}/components/../etc/passwd', timeout=5) - # Path traversal attempt - component doesn't exist - self.assertEqual(response.status_code, 404) - - -@launch_testing.post_shutdown_test() -class TestDiscoveryManifestModeShutdown(unittest.TestCase): - """Post-shutdown tests.""" - - def test_exit_code(self, proc_info): - """Check gateway exited cleanly.""" - launch_testing.asserts.assertExitCodes(proc_info) diff --git a/src/ros2_medkit_gateway/test/test_fault_manager.cpp b/src/ros2_medkit_gateway/test/test_fault_manager.cpp index 2dd35834..240894a0 100644 --- a/src/ros2_medkit_gateway/test/test_fault_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_fault_manager.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -34,6 +35,10 @@ using ros2_medkit_msgs::srv::GetSnapshots; class FaultManagerTest : public ::testing::Test { protected: static void SetUpTestSuite() { + // Isolate from other packages' tests running in parallel (e.g. ros2_medkit_fault_manager + // launches a real fault_manager_node with /fault_manager/get_snapshots on the default domain). + // Without this, our mock services collide with real ones on Humble's slower DDS cleanup. + setenv("ROS_DOMAIN_ID", "99", 1); rclcpp::init(0, nullptr); } @@ -42,10 +47,13 @@ class FaultManagerTest : public ::testing::Test { } void SetUp() override { - // Create node with short timeout for faster tests - node_ = std::make_shared("test_fault_manager_node", rclcpp::NodeOptions().parameter_overrides({ - {"fault_service_timeout_sec", 1.0}, - })); + // Use unique node names to avoid DDS participant name collisions between tests. + // On Humble (CycloneDDS), reusing the same node name across sequential tests + // causes stale discovery state that can corrupt service responses. + std::string node_name = "test_fault_manager_node_" + std::to_string(test_counter_++); + node_ = std::make_shared(node_name, rclcpp::NodeOptions().parameter_overrides({ + {"fault_service_timeout_sec", 3.0}, + })); // Create executor for spinning executor_ = std::make_unique(); @@ -67,6 +75,7 @@ class FaultManagerTest : public ::testing::Test { }); } + static inline int test_counter_ = 0; std::shared_ptr node_; std::unique_ptr executor_; std::thread spin_thread_; @@ -80,7 +89,10 @@ TEST_F(FaultManagerTest, GetSnapshotsServiceNotAvailable) { auto result = fault_manager.get_snapshots("TEST_FAULT"); EXPECT_FALSE(result.success); - EXPECT_EQ(result.error_message, "GetSnapshots service not available"); + // On Humble, wait_for_service may report ready before DDS confirms absence, + // so the call proceeds to async_send_request which then times out. + EXPECT_TRUE(result.error_message == "GetSnapshots service not available" || + result.error_message == "GetSnapshots service call timed out"); } // @verifies REQ_INTEROP_088 @@ -199,7 +211,10 @@ TEST_F(FaultManagerTest, GetRosbagServiceNotAvailable) { auto result = fault_manager.get_rosbag("TEST_FAULT"); EXPECT_FALSE(result.success); - EXPECT_EQ(result.error_message, "GetRosbag service not available"); + // On Humble, wait_for_service may report ready before DDS confirms absence, + // so the call proceeds to async_send_request which then times out. + EXPECT_TRUE(result.error_message == "GetRosbag service not available" || + result.error_message == "GetRosbag service call timed out"); } // @verifies REQ_INTEROP_088 diff --git a/src/ros2_medkit_gateway/test/test_integration.test.py b/src/ros2_medkit_gateway/test/test_integration.test.py deleted file mode 100644 index e6cb0e66..00000000 --- a/src/ros2_medkit_gateway/test/test_integration.test.py +++ /dev/null @@ -1,5037 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2025 bburda, mfaferek93 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Launch file for ROS 2 Medkit Gateway integration tests. - -This launch file: -1. Starts the ROS 2 Medkit Gateway node -2. Launches demo nodes in different namespaces (powertrain, chassis, body) -3. Runs integration tests -4. Cleans up all processes -""" - -import os -import time -import unittest -from urllib.parse import quote - -from launch import LaunchDescription -from launch.actions import TimerAction -import launch_ros.actions -import launch_testing.actions -import requests - - -def get_coverage_env(): - """ - Get environment variables for gcov coverage data collection. - - When running with coverage enabled (ENABLE_COVERAGE=ON), subprocess nodes - need GCOV_PREFIX set to write coverage data to the correct build directory. - This allows integration test coverage to be captured alongside unit tests. - - Returns - ------- - dict - Environment variables dict with GCOV_PREFIX and GCOV_PREFIX_STRIP, - or empty dict if coverage path cannot be determined. - - """ - try: - from ament_index_python.packages import get_package_prefix - pkg_prefix = get_package_prefix('ros2_medkit_gateway') - # pkg_prefix is like /path/to/workspace/install/ros2_medkit_gateway - # workspace is 2 levels up from install/package_name - workspace = os.path.dirname(os.path.dirname(pkg_prefix)) - build_dir = os.path.join(workspace, 'build', 'ros2_medkit_gateway') - - if os.path.exists(build_dir): - # GCOV_PREFIX_STRIP removes leading path components from compiled-in paths - # GCOV_PREFIX prepends the new path for .gcda file output - return { - 'GCOV_PREFIX': build_dir, - 'GCOV_PREFIX_STRIP': str(build_dir.count(os.sep)), - } - except Exception: - # Ignore: if coverage environment cannot be determined, - # return empty dict so tests proceed without coverage data. - pass - return {} - - -def encode_topic_path(topic_path: str) -> str: - """ - Encode a ROS topic path for use in URLs. - - Slashes are encoded as %2F for proper URL routing. - Example: '/powertrain/engine/temperature' -> 'powertrain%2Fengine%2Ftemperature' - - Parameters - ---------- - topic_path : str - Full ROS topic path starting with '/' - - Returns - ------- - str - URL-encoded topic path without leading slash - - Raises - ------ - ValueError - If topic_path doesn't start with '/' - - """ - # Validate that the topic path starts with a leading slash - if not topic_path.startswith('/'): - raise ValueError(f"Topic path must start with '/': {topic_path}") - # Remove leading slash and encode the rest - topic_path = topic_path[1:] - return quote(topic_path, safe='') - - -def generate_test_description(): - """Generate launch description with gateway node, demo nodes, and tests.""" - # Launch the ROS 2 Medkit Gateway node - # additional_env sets GCOV_PREFIX for coverage data collection from subprocess - # Use fast refresh interval (1s) for tests to ensure cache is updated quickly - gateway_node = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='gateway_node', - name='ros2_medkit_gateway', - output='screen', - parameters=[{'refresh_interval_ms': 1000}], - additional_env=get_coverage_env(), - ) - - # Launch demo automotive sensor nodes - # All demo nodes also get coverage env for completeness - coverage_env = get_coverage_env() - - engine_temp_sensor = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_engine_temp_sensor', - name='temp_sensor', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ) - - rpm_sensor = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_rpm_sensor', - name='rpm_sensor', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ) - - brake_pressure_sensor = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_brake_pressure_sensor', - name='pressure_sensor', - namespace='/chassis/brakes', - output='screen', - additional_env=coverage_env, - ) - - door_status_sensor = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_door_status_sensor', - name='status_sensor', - namespace='/body/door/front_left', - output='screen', - additional_env=coverage_env, - ) - - brake_actuator = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_brake_actuator', - name='actuator', - namespace='/chassis/brakes', - output='screen', - additional_env=coverage_env, - ) - - light_controller = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_light_controller', - name='controller', - namespace='/body/lights', - output='screen', - additional_env=coverage_env, - ) - - calibration_service = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_calibration_service', - name='calibration', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ) - - long_calibration_action = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_long_calibration_action', - name='long_calibration', - namespace='/powertrain/engine', - output='screen', - additional_env=coverage_env, - ) - - # LIDAR sensor with intentionally invalid parameters to trigger faults - lidar_sensor = launch_ros.actions.Node( - package='ros2_medkit_gateway', - executable='demo_lidar_sensor', - name='lidar_sensor', - namespace='/perception/lidar', - output='screen', - additional_env=coverage_env, - parameters=[{ - 'min_range': 10.0, # Invalid: greater than max_range - 'max_range': 5.0, # Invalid: less than min_range - 'scan_frequency': 25.0, # Unsupported: exceeds 20.0 Hz - 'angular_resolution': 0.5, - }], - ) - - # Launch the fault_manager node for fault REST API tests - # Use in-memory storage to avoid filesystem permission issues in CI - # Enable rosbag capture for integration testing - capture lidar scan topic - # which is published by the lidar_sensor demo node - fault_manager_node = launch_ros.actions.Node( - package='ros2_medkit_fault_manager', - executable='fault_manager_node', - name='fault_manager', - output='screen', - additional_env=coverage_env, - parameters=[{ - 'storage_type': 'memory', - 'snapshots.rosbag.enabled': True, - 'snapshots.rosbag.duration_sec': 2.0, - 'snapshots.rosbag.duration_after_sec': 0.5, - 'snapshots.rosbag.topics': 'explicit', - 'snapshots.rosbag.include_topics': ['/perception/lidar/scan'], - }], - ) - - # Start demo nodes with a delay to ensure gateway starts first - delayed_sensors = TimerAction( - period=2.0, - actions=[ - engine_temp_sensor, - rpm_sensor, - brake_pressure_sensor, - door_status_sensor, - brake_actuator, - light_controller, - calibration_service, - long_calibration_action, - lidar_sensor, - fault_manager_node, - ], - ) - - return ( - LaunchDescription( - [ - # Launch gateway first - gateway_node, - # Launch demo nodes with delay - delayed_sensors, - # Start tests after nodes have time to initialize - launch_testing.actions.ReadyToTest(), - ] - ), - { - 'gateway_node': gateway_node, - }, - ) - - -# API version prefix - must match rest_server.cpp -API_BASE_PATH = '/api/v1' - -# Action timeout - 30s should be sufficient, if still flaky then code has performance issues -ACTION_TIMEOUT = 30.0 - - -class TestROS2MedkitGatewayIntegration(unittest.TestCase): - """Integration tests for ROS 2 Medkit Gateway REST API and discovery.""" - - BASE_URL = f'http://localhost:8080{API_BASE_PATH}' - - # Expected entities from demo_nodes.launch.py: - # - Apps: temp_sensor, rpm_sensor, pressure_sensor, status_sensor, actuator, - # controller, calibration, long_calibration, lidar_sensor (9 total) - # - Areas: powertrain, chassis, body, perception, root (5 total) - # - Components: Synthetic groupings by area (powertrain, chassis, body, perception) - # created from nodes in those namespaces - # - # Minimum expected components (synthetic, grouped by top-level namespace) - # With default config: powertrain, chassis, body, perception (root may not have synthetic) - MIN_EXPECTED_COMPONENTS = 4 - # Minimum expected apps (ROS 2 nodes from demo launch) - MIN_EXPECTED_APPS = 8 - # Required areas that must be discovered (not just count, but specific IDs) - REQUIRED_AREAS = {'powertrain', 'chassis', 'body'} - # Required apps that must be discovered for deterministic tests - REQUIRED_APPS = {'temp_sensor', 'long_calibration', 'lidar_sensor', 'actuator'} - - # Maximum time to wait for discovery (seconds) - MAX_DISCOVERY_WAIT = 60.0 - # Interval between discovery checks (seconds) - DISCOVERY_CHECK_INTERVAL = 1.0 - - @classmethod - def setUpClass(cls): - """Wait for gateway to be ready and apps/areas to be discovered.""" - # First, wait for gateway to respond - max_retries = 30 - for i in range(max_retries): - try: - response = requests.get(f'{cls.BASE_URL}/health', timeout=2) - if response.status_code == 200: - break - except requests.exceptions.RequestException: - if i == max_retries - 1: - raise unittest.SkipTest('Gateway not responding after 30 retries') - time.sleep(1) - - # Wait for required apps AND areas to be discovered (CI can be slow) - start_time = time.time() - while time.time() - start_time < cls.MAX_DISCOVERY_WAIT: - try: - apps_response = requests.get(f'{cls.BASE_URL}/apps', timeout=5) - areas_response = requests.get(f'{cls.BASE_URL}/areas', timeout=5) - if apps_response.status_code == 200 and areas_response.status_code == 200: - apps = apps_response.json().get('items', []) - areas = areas_response.json().get('items', []) - app_ids = {a.get('id', '') for a in apps} - area_ids = {a.get('id', '') for a in areas} - - # Check if all required areas and apps are discovered - missing_areas = cls.REQUIRED_AREAS - area_ids - missing_apps = cls.REQUIRED_APPS - app_ids - apps_ok = len(apps) >= cls.MIN_EXPECTED_APPS and not missing_apps - areas_ok = not missing_areas - - if apps_ok and areas_ok: - print(f'✓ Discovery complete: {len(apps)} apps, {len(areas)} areas') - return - - print(f' Waiting: {len(apps)}/{cls.MIN_EXPECTED_APPS} apps, ' - f'{len(areas)} areas. Missing areas: {missing_areas}, ' - f'Missing apps: {missing_apps}') - except requests.exceptions.RequestException: - # Ignore connection errors during discovery wait; will retry until timeout - pass - time.sleep(cls.DISCOVERY_CHECK_INTERVAL) - - # If we get here, not all entities were discovered but continue anyway - print('Warning: Discovery timeout, some tests may fail') - - def _get_json(self, endpoint: str, timeout: int = 10): - """Get JSON from an endpoint.""" - response = requests.get(f'{self.BASE_URL}{endpoint}', timeout=timeout) - response.raise_for_status() - return response.json() - - def _ensure_calibration_app_ready(self, timeout: float = 10.0, interval: float = 0.2): - """ - Wait for the calibration app REST resource to become available. - - This is a workaround for a discovery readiness race condition in CI: - Discovery may complete (setUpClass passes) but individual app resources - may not yet be accessible via REST endpoints. This helper polls the - calibration app endpoint and skips the test if it's not available within - the timeout, avoiding flaky CI failures. - - Parameters - ---------- - timeout : float - Maximum time to wait in seconds (default: 10.0). - interval : float - Time between polling attempts in seconds (default: 0.2). - - Raises - ------ - unittest.SkipTest - If the calibration app is not available within the timeout. - - """ - start_time = time.time() - last_error = None - while time.time() - start_time < timeout: - try: - response = requests.get( - f'{self.BASE_URL}/apps/calibration', - timeout=2 - ) - if response.status_code == 200: - return # Calibration app is ready - last_error = f'Status {response.status_code}' - except requests.exceptions.RequestException as e: - last_error = str(e) - time.sleep(interval) - - # Timeout reached - skip this test due to discovery readiness race in CI - raise unittest.SkipTest( - f'Calibration app not available after {timeout}s ' - f'(flaky discovery readiness race in CI). Last error: {last_error}' - ) - - def _wait_for_execution_status( - self, execution_id: str, target_statuses: list, max_wait: float = None - ) -> dict: - """ - Poll execution status until it reaches one of the target statuses. - - Parameters - ---------- - execution_id : str - The execution ID (goal_id) to check status for. - target_statuses : list - List of SOVD status strings to wait for (e.g., ['completed', 'failed']). - max_wait : float - Maximum time to wait in seconds. Defaults to ACTION_TIMEOUT (30s). - - Returns - ------- - dict - The status response data when target status is reached. - - Raises - ------ - AssertionError - If target status is not reached within max_wait. - - """ - if max_wait is None: - max_wait = ACTION_TIMEOUT - start_time = time.time() - last_status = None - while time.time() - start_time < max_wait: - try: - status_response = requests.get( - f'{self.BASE_URL}/apps/long_calibration/operations/' - f'long_calibration/executions/{execution_id}', - timeout=5 - ) - if status_response.status_code == 200: - data = status_response.json() - last_status = data.get('status') - if last_status in target_statuses: - return data - except requests.exceptions.RequestException: - pass # Retry on transient errors - time.sleep(0.5) - - raise AssertionError( - f'Execution did not reach status {target_statuses} within {max_wait}s. ' - f'Last status: {last_status}' - ) - - def test_01_root_endpoint(self): - """ - Test GET / returns server capabilities and entry points. - - @verifies REQ_INTEROP_010 - """ - data = self._get_json('/') - self.assertIn('name', data) - self.assertIn('version', data) - self.assertIn('endpoints', data) - self.assertIn('capabilities', data) - - self.assertEqual(data['name'], 'ROS 2 Medkit Gateway') - self.assertEqual(data['version'], '0.1.0') - - # Verify endpoints list - self.assertIsInstance(data['endpoints'], list) - self.assertIn('GET /api/v1/health', data['endpoints']) - self.assertIn('GET /api/v1/version-info', data['endpoints']) - self.assertIn('GET /api/v1/areas', data['endpoints']) - self.assertIn('GET /api/v1/components', data['endpoints']) - self.assertIn( - 'PUT /api/v1/components/{component_id}/data/{data_id}', data['endpoints'] - ) - - # Verify api_base field - self.assertIn('api_base', data) - self.assertEqual(data['api_base'], API_BASE_PATH) - - # Verify capabilities - self.assertIn('discovery', data['capabilities']) - self.assertIn('data_access', data['capabilities']) - self.assertTrue(data['capabilities']['discovery']) - self.assertTrue(data['capabilities']['data_access']) - print('✓ Root endpoint test passed') - - def test_01b_version_info_endpoint(self): - """ - Test GET /version-info returns valid format and data. - - @verifies REQ_INTEROP_001 - """ - data = self._get_json('/version-info') - # Check sovd_info array - self.assertIn('sovd_info', data) - self.assertIsInstance(data['sovd_info'], list) - self.assertGreaterEqual(len(data['sovd_info']), 1) - - # Check first sovd_info entry - info = data['sovd_info'][0] - self.assertIn('version', info) - self.assertIn('base_uri', info) - self.assertIn('vendor_info', info) - self.assertIn('version', info['vendor_info']) - self.assertIn('name', info['vendor_info']) - self.assertEqual(info['vendor_info']['name'], 'ros2_medkit') - print('✓ Version info endpoint test passed') - - def test_01c_endpoint_smoke_test(self): - """ - Smoke test: verify all advertised GET endpoints are implemented and don't return 5xx. - - This test ensures that: - 1. All endpoints listed in GET / are actually implemented - 2. No endpoint returns a server error (5xx) - 3. Documentation in handle_root matches actual implementation - - Only GET endpoints are tested (safe, read-only operations). - POST/PUT/DELETE endpoints are skipped as they modify state. - - @verifies REQ_INTEROP_010 - """ - # Get all advertised endpoints - data = self._get_json('/') - endpoints = data.get('endpoints', []) - self.assertIsInstance(endpoints, list) - self.assertGreater(len(endpoints), 0, 'No endpoints returned from /') - - # Test substitution values for path parameters - # These use known entities from demo_nodes.launch.py - substitutions = { - '{area_id}': 'powertrain', - '{component_id}': 'powertrain', - '{app_id}': 'temp_sensor', - '{function_id}': 'nonexistent_function', # Functions not used in demo - '{data_id}': 'temperature', - '{topic_name}': 'temperature', - '{operation_id}': 'nonexistent_op', # Will return 404, but not 5xx - '{execution_id}': 'nonexistent_exec', - '{param_name}': 'use_sim_time', - '{fault_code}': 'nonexistent_fault', - } - - # Filter for GET endpoints only (safe to call) - get_endpoints = [ep for ep in endpoints if ep.startswith('GET ')] - - tested_count = 0 - errors = [] - - for endpoint in get_endpoints: - # Parse method and path - parts = endpoint.split(' ', 1) - if len(parts) != 2: - continue - method, path = parts - - # Skip auth endpoints (require authentication) - if '/auth/' in path: - continue - - # Skip SSE stream endpoint (keeps connection open) - if path.endswith('/stream'): - continue - - # Substitute path parameters - test_path = path - for placeholder, value in substitutions.items(): - test_path = test_path.replace(placeholder, value) - - # Remove the /api/v1 prefix if present (BASE_URL already has it) - if test_path.startswith('/api/v1'): - test_path = test_path[7:] # len('/api/v1') = 7 - - try: - url = f'{self.BASE_URL}{test_path}' - response = requests.get(url, timeout=10) - - # Check for server errors (5xx) - # 501 Not Implemented is acceptable (intentional for unimplemented features) - if response.status_code >= 500: - # Skip known limitations - if response.status_code == 501: - pass # Intentional - feature not implemented for ROS 2 - else: - errors.append( - f'{endpoint} -> {test_path}: returned {response.status_code}' - ) - tested_count += 1 - - except requests.exceptions.RequestException as e: - errors.append(f'{endpoint} -> {test_path}: request failed - {e}') - - # Report results - print(f'✓ Smoke test: {tested_count} GET endpoints tested') - - if errors: - error_msg = '\n'.join(errors) - self.fail(f'Server errors detected:\n{error_msg}') - - def test_02_list_areas(self): - """ - Test GET /areas returns all discovered areas. - - @verifies REQ_INTEROP_003 - """ - data = self._get_json('/areas') - self.assertIn('items', data) - areas = data['items'] - self.assertIsInstance(areas, list) - self.assertGreaterEqual(len(areas), 1) - area_ids = [area['id'] for area in areas] - self.assertIn('root', area_ids) - print(f'✓ Areas test passed: {len(areas)} areas discovered') - - def test_03_list_components(self): - """ - Test GET /components returns all discovered synthetic components. - - With heuristic discovery (default), components are synthetic groups - created by namespace aggregation. ROS 2 nodes are exposed as Apps. - - @verifies REQ_INTEROP_003 - """ - data = self._get_json('/components') - self.assertIn('items', data) - components = data['items'] - self.assertIsInstance(components, list) - # With synthetic components, we have fewer components (grouped by namespace) - # Expected: powertrain, chassis, body, perception, root (at minimum) - self.assertGreaterEqual(len(components), 4) - - # Verify response structure - all components should have required fields - for component in components: - self.assertIn('id', component) - self.assertIn('name', component) - self.assertIn('href', component) - # x-medkit contains ROS2-specific fields - self.assertIn('x-medkit', component) - x_medkit = component['x-medkit'] - # namespace may be in x-medkit.ros2.namespace - self.assertTrue( - 'ros2' in x_medkit and 'namespace' in x_medkit.get('ros2', {}), - f"Component {component['id']} should have namespace in x-medkit.ros2" - ) - - # Verify expected synthetic component IDs are present - # With heuristic discovery, components are synthetic groups created - # by namespace aggregation. These IDs (powertrain, chassis, body) - # represent namespace-based component groups, not individual ROS 2 - # nodes. Individual nodes are exposed as Apps instead. - component_ids = [comp['id'] for comp in components] - self.assertIn('powertrain', component_ids) - self.assertIn('chassis', component_ids) - self.assertIn('body', component_ids) - - print(f'✓ Components test passed: {len(components)} synthetic components discovered') - - def test_04_automotive_areas_discovery(self): - """ - Test that automotive areas are properly discovered. - - @verifies REQ_INTEROP_003 - """ - data = self._get_json('/areas') - areas = data['items'] - area_ids = [area['id'] for area in areas] - - expected_areas = ['powertrain', 'chassis', 'body'] - for expected in expected_areas: - self.assertIn(expected, area_ids) - - print(f'✓ All automotive areas discovered: {area_ids}') - - def test_05_area_components_success(self): - """ - Test GET /areas/{area_id}/components returns components for valid area. - - With synthetic components, the powertrain area contains the 'powertrain' - synthetic component which aggregates all ROS 2 nodes in that namespace. - - @verifies REQ_INTEROP_006 - """ - # Test powertrain area - data = self._get_json('/areas/powertrain/components') - self.assertIn('items', data) - components = data['items'] - self.assertIsInstance(components, list) - self.assertGreater(len(components), 0) - - # All components should have EntityReference format with x-medkit - for component in components: - self.assertIn('id', component) - self.assertIn('name', component) - self.assertIn('href', component) - self.assertIn('x-medkit', component) - # Verify namespace is in x-medkit.ros2 - x_medkit = component['x-medkit'] - self.assertTrue( - 'ros2' in x_medkit and 'namespace' in x_medkit.get('ros2', {}), - 'Component should have namespace in x-medkit.ros2' - ) - - # Verify the synthetic 'powertrain' component exists - component_ids = [comp['id'] for comp in components] - self.assertIn('powertrain', component_ids) - - print( - f'✓ Area components test passed: {len(components)} components in powertrain' - ) - - def test_06_area_components_nonexistent_error(self): - """ - Test GET /areas/{area_id}/components returns 404 for nonexistent area. - - @verifies REQ_INTEROP_006 - """ - response = requests.get( - f'{self.BASE_URL}/areas/nonexistent/components', timeout=5 - ) - self.assertEqual(response.status_code, 404) - - data = response.json() - self.assertIn('error_code', data) - self.assertEqual(data['message'], 'Area not found') - self.assertIn('parameters', data) - self.assertIn('area_id', data['parameters']) - self.assertEqual(data['parameters'].get('area_id'), 'nonexistent') - - print('✓ Nonexistent area error test passed') - - def _ensure_app_data_ready(self, app_id: str, timeout: float = 10.0, interval: float = 0.2): - """ - Wait for an app's /data endpoint to become available. - - Workaround for discovery readiness race condition in CI. - Polls /apps/{app_id}/data directly since that's what tests depend on. - """ - start_time = time.time() - last_error = None - while time.time() - start_time < timeout: - try: - response = requests.get(f'{self.BASE_URL}/apps/{app_id}/data', timeout=2) - if response.status_code == 200: - return - last_error = f'Status {response.status_code}' - except requests.exceptions.RequestException as e: - last_error = str(e) - time.sleep(interval) - self.fail( - f'App {app_id} data not available after {timeout}s. ' - f'Last error: {last_error}' - ) - - def test_07_app_data_powertrain_engine(self): - """ - Test GET /apps/{app_id}/data for engine temperature sensor app. - - Apps are ROS 2 nodes. The temp_sensor app publishes temperature data. - - @verifies REQ_INTEROP_018 - """ - # Ensure app is ready (handles discovery race condition) - self._ensure_app_data_ready('temp_sensor') - # Get data from temp_sensor app (powertrain/engine) - data = self._get_json('/apps/temp_sensor/data') - self.assertIn('items', data) - items = data['items'] - self.assertIsInstance(items, list) - - # Should have at least one topic - if len(items) > 0: - for topic_data in items: - self.assertIn('id', topic_data) - self.assertIn('name', topic_data) - # direction is now in x-medkit.ros2 - self.assertIn('x-medkit', topic_data) - x_medkit = topic_data['x-medkit'] - self.assertIn('ros2', x_medkit) - self.assertIn('direction', x_medkit['ros2']) - direction = x_medkit['ros2']['direction'] - self.assertIn(direction, ['publish', 'subscribe', 'both']) - print( - f" - Topic: {topic_data['name']} ({direction})" - ) - - print(f'✓ Engine app data test passed: {len(items)} topics') - - def test_08_app_data_chassis_brakes(self): - """ - Test GET /apps/{app_id}/data for brakes pressure sensor app. - - @verifies REQ_INTEROP_018 - """ - # Ensure app is ready (handles discovery race condition) - self._ensure_app_data_ready('pressure_sensor') - # Get data from pressure_sensor app (chassis/brakes) - data = self._get_json('/apps/pressure_sensor/data') - self.assertIn('items', data) - items = data['items'] - self.assertIsInstance(items, list) - - # Check structure - if len(items) > 0: - for topic_data in items: - self.assertIn('id', topic_data) - self.assertIn('name', topic_data) - # direction is now in x-medkit.ros2 - self.assertIn('x-medkit', topic_data) - x_medkit = topic_data['x-medkit'] - self.assertIn('ros2', x_medkit) - self.assertIn('direction', x_medkit['ros2']) - - print(f'✓ Brakes app data test passed: {len(items)} topics') - - def test_09_app_data_body_door(self): - """ - Test GET /apps/{app_id}/data for door status sensor app. - - @verifies REQ_INTEROP_018 - """ - # Ensure app is ready (handles discovery race condition) - self._ensure_app_data_ready('status_sensor') - # Get data from status_sensor app (body/door/front_left) - data = self._get_json('/apps/status_sensor/data') - self.assertIn('items', data) - items = data['items'] - self.assertIsInstance(items, list) - - # Check structure - if len(items) > 0: - for topic_data in items: - self.assertIn('id', topic_data) - self.assertIn('name', topic_data) - # direction is now in x-medkit.ros2 - self.assertIn('x-medkit', topic_data) - x_medkit = topic_data['x-medkit'] - self.assertIn('ros2', x_medkit) - self.assertIn('direction', x_medkit['ros2']) - - print(f'✓ Door app data test passed: {len(items)} topics') - - def test_10_app_data_structure(self): - """ - Test GET /apps/{app_id}/data response structure. - - @verifies REQ_INTEROP_018 - """ - # Ensure app is ready (handles discovery race condition) - self._ensure_app_data_ready('temp_sensor') - data = self._get_json('/apps/temp_sensor/data') - self.assertIn('items', data) - items = data['items'] - self.assertIsInstance(items, list, 'Response should have items array') - - # If we have data, verify structure - if len(items) > 0: - first_item = items[0] - self.assertIn('id', first_item, "Each item should have 'id' field") - self.assertIn('name', first_item, "Each item should have 'name' field") - # direction and href moved to x-medkit for SOVD compliance - self.assertIn('x-medkit', first_item, "Each item should have 'x-medkit' field") - x_medkit = first_item['x-medkit'] - self.assertIn('ros2', x_medkit, 'x-medkit should have ros2 section') - self.assertIn('direction', x_medkit['ros2'], 'x-medkit.ros2 should have direction') - self.assertIsInstance( - first_item['name'], str, "'name' should be a string" - ) - self.assertIn(x_medkit['ros2']['direction'], ['publish', 'subscribe', 'both']) - - print('✓ App data structure test passed') - - def test_11_app_nonexistent_error(self): - """ - Test GET /apps/{app_id}/data returns 404 for nonexistent app. - - @verifies REQ_INTEROP_018 - """ - response = requests.get( - f'{self.BASE_URL}/apps/nonexistent_app/data', timeout=5 - ) - self.assertEqual(response.status_code, 404) - - data = response.json() - self.assertIn('error_code', data) - self.assertEqual(data['message'], 'Entity not found') - self.assertIn('parameters', data) - self.assertIn('entity_id', data['parameters']) - self.assertEqual(data['parameters'].get('entity_id'), 'nonexistent_app') - - print('✓ Nonexistent app error test passed') - - def test_12_app_no_topics(self): - """ - Test GET /apps/{app_id}/data returns empty array. - - Verifies that apps with no topics return an empty items array. - The calibration app typically has only services, no topics. - - @verifies REQ_INTEROP_018 - """ - # Ensure calibration app is available via REST (handles discovery race) - self._ensure_calibration_app_ready() - - # Test with calibration app that we know has no publishing topics - data = self._get_json('/apps/calibration/data') - self.assertIn('items', data) - self.assertIsInstance(data['items'], list, 'Response should have items array') - - print(f'✓ App with no topics test passed: {len(data["items"])} topics') - - def test_13_invalid_app_id_special_chars(self): - """ - Test GET /apps/{app_id}/data rejects special characters. - - @verifies REQ_INTEROP_018 - """ - # Test various invalid characters - invalid_ids = [ - 'app;drop', # SQL injection attempt - 'app