Skip to content

Comments

Implement SSE-based periodic data subscriptions #223

Merged
bburda merged 10 commits intomainfrom
feat/cyclic-sub
Feb 19, 2026
Merged

Implement SSE-based periodic data subscriptions #223
bburda merged 10 commits intomainfrom
feat/cyclic-sub

Conversation

@bburda
Copy link
Collaborator

@bburda bburda commented Feb 17, 2026

Pull Request

Summary

Implement cyclic subscriptions - SSE-based periodic data delivery for entity data resources. Clients can create, list, update, and delete subscriptions, and connect to an SSE stream that pushes the latest topic value at a configurable interval (fast/normal/slow). Subscriptions are temporary and auto-expire after the requested duration.

New components:

  • SubscriptionManager - thread-safe CRUD with capacity limits, expiry tracking, and condition-variable-based stream synchronization
  • CyclicSubscriptionHandlers - HTTP handlers for all 6 endpoints (POST, GET list, GET single, PUT, DELETE, GET events)
  • Routes registered for both /apps/ and /components/ entity paths
  • SSE EventEnvelope format with payload (ReadValue) or error (GenericError)

Issue


Type

  • Bug fix
  • New feature or tests
  • Breaking change
  • Documentation only

Testing

Unit tests (test_subscription_manager.cpp, test_cyclic_subscription_handlers.cpp):

  • CRUD lifecycle, capacity enforcement (503 at max), expiry cleanup, interval parsing, resource URI validation, concurrent access

Integration tests (test_cyclic_subscriptions.test.py - 21 tests):

  • Full lifecycle: create → list → get → update → delete
  • SSE stream: correct headers, periodic EventEnvelope delivery, stream closes on subscription delete
  • Validation: invalid interval, zero duration, unsupported protocol, malformed resource URI, nonexistent entities/subscriptions
  • Cross-entity: subscriptions work on both /apps/ and /components/
  # Run all tests
  colcon test --packages-select ros2_medkit_gateway

  # Integration only
  colcon test --ctest-args -R test_cyclic_subscriptions

Requirements verified: REQ_INTEROP_025, REQ_INTEROP_026, REQ_INTEROP_027, REQ_INTEROP_028, REQ_INTEROP_089, REQ_INTEROP_090


Checklist

  • Breaking changes are clearly described (and announced in docs / changelog if needed)
  • Tests were added or updated if needed
  • Docs were updated if behavior or public API changed

Tests trace to REQ_INTEROP_025-028, REQ_INTEROP_089-090.
All tests expected to fail until implementation is complete.
Integration tests now pass for full subscription lifecycle and SSE streaming.
@bburda bburda self-assigned this Feb 17, 2026
Add test_subscription_manager (18 tests) covering interval parsing,
capacity enforcement, ID uniqueness, expiry, update semantics,
concurrent access, and stream synchronization.

Add test_cyclic_subscription_handlers (4 tests) covering JSON
serialization and error response format.

Fix notify() bug: set notified flag so wait_for_update() predicate
wakes correctly on data-available notifications.
…m topics in SSE test

The SSE stream handler set Content-Type explicitly before passing it to
set_chunked_content_provider, causing cpp-httplib to emit the header
twice. The integration test selected /parameter_events (a ROS 2 system
topic with no continuous data) as the subscription target, causing
sample failures.
@bburda bburda requested a review from mfaferek93 February 18, 2026 08:21
@bburda bburda changed the title Feat/cyclic sub Implement SSE-based periodic data subscriptions Feb 18, 2026
@bburda bburda marked this pull request as ready for review February 18, 2026 08:22
Copilot AI review requested due to automatic review settings February 18, 2026 08:22
@bburda
Copy link
Collaborator Author

bburda commented Feb 18, 2026

@eclipse0922 This feature might be interesting for you 🙂

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements SSE-based periodic data subscriptions (cyclic subscriptions) for ros2_medkit_gateway, enabling clients to receive periodic push-based data delivery at configurable intervals (fast/normal/slow). The implementation adds comprehensive CRUD operations for subscriptions and an SSE streaming endpoint that delivers EventEnvelope payloads containing topic data or errors.

Changes:

  • New SubscriptionManager class for thread-safe subscription lifecycle management with capacity enforcement and expiry tracking
  • New CyclicSubscriptionHandlers for 6 HTTP endpoints (POST create, GET list/single, PUT update, DELETE, GET events stream)
  • Integration with GatewayNode including periodic cleanup timer and parameter configuration
  • Complete test coverage with 231 lines of unit tests and 21 integration tests with requirement traceability

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
src/ros2_medkit_gateway/src/subscription_manager.cpp Core subscription CRUD logic with thread-safe state management and expiry handling
src/ros2_medkit_gateway/include/ros2_medkit_gateway/subscription_manager.hpp Public API for subscription management with condition variable-based stream synchronization
src/ros2_medkit_gateway/src/http/handlers/cyclic_subscription_handlers.cpp HTTP handlers implementing all 6 REST endpoints with validation and SSE streaming
src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/cyclic_subscription_handlers.hpp Handler interface declarations and helper methods
src/ros2_medkit_gateway/src/http/rest_server.cpp Route registration for apps and components entity paths
src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/rest_server.hpp Handler instance declaration
src/ros2_medkit_gateway/src/gateway_node.cpp Subscription manager initialization and cleanup timer setup
src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp Subscription manager member and accessor declaration
src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/handlers.hpp Include cyclic subscription handlers in handler collection
src/ros2_medkit_gateway/config/gateway_params.yaml SSE configuration section with max_clients and max_subscriptions parameters
src/ros2_medkit_gateway/CMakeLists.txt Build configuration for new source files and test targets
src/ros2_medkit_gateway/test/test_subscription_manager.cpp Unit tests for SubscriptionManager covering interval parsing, capacity, expiry, concurrency
src/ros2_medkit_gateway/test/test_cyclic_subscription_handlers.cpp Unit tests for JSON serialization and error response formats
src/ros2_medkit_gateway/test/test_cyclic_subscriptions.test.py Integration tests for complete CRUD lifecycle, SSE streaming, validation, and cross-entity support
docs/requirements/specs/subscriptions.rst Updated requirement status to verified for REQ_INTEROP_025-028, added REQ_INTEROP_089-090
docs/config/server.rst Documentation for sse.max_clients and sse.max_subscriptions parameters
docs/api/rest.rst Complete API documentation for all 6 cyclic subscription endpoints with examples

…client limit

Fix thread-safety issues in SubscriptionManager::get() and list() where
info was copied under map_mutex_ while update() modifies it under the
per-subscription lock, causing potential torn reads.

Add SSE keepalive mechanism to cyclic subscription streams so connections
are not dropped by proxies when sample_topic() blocks for extended periods.

Introduce shared SSEClientTracker so sse.max_clients is enforced as a
combined limit across fault and cyclic subscription streams, matching the
documented behavior in gateway_params.yaml.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 7 comments.

Copy link
Collaborator

@mfaferek93 mfaferek93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concurrency bug in timestamp formatting that will corrupt data with concurrent streams. One minor miss in the update→stream notification path.

Overall this is a really solid feature. Nice work.

…y validation

- Replace std::gmtime with gmtime_r for thread-safe timestamp formatting
- Set notified = true before cv.notify_all() in update() so SSE streams
  pick up interval changes immediately
- Lock per-subscription mutex in is_active() and cleanup_expired() before
  reading expires_at to prevent data races with concurrent update()
- Guard SSEClientTracker::disconnect() against size_t underflow with CAS loop
- Reject SSE connections for expired/inactive subscriptions (404)
- Validate entity_id in all subscription handlers to prevent cross-entity access
- Validate resource URI matches route entity in handle_create
@bburda bburda requested a review from mfaferek93 February 18, 2026 20:47
Copy link
Collaborator

@mfaferek93 mfaferek93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGMT!

@bburda bburda merged commit c652c64 into main Feb 19, 2026
6 checks passed
@bburda bburda deleted the feat/cyclic-sub branch February 19, 2026 08:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Cyclic Subscriptions - SSE-based periodic data delivery

2 participants