From e4a552e7eb1d9effc7e7b904a2e1e559a4eb978b Mon Sep 17 00:00:00 2001 From: Justin Gordon Date: Sat, 15 Nov 2025 20:37:31 -1000 Subject: [PATCH 1/3] Add client disconnect handling and documentation to concurrent streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improves error handling and documentation for concurrent component streaming: 1. Client Disconnect Handling - Added IOError and Errno::EPIPE exception handling in producer tasks - Added response.stream.closed? check before expensive operations - Added exception handling in writer task to stop gracefully on disconnect - Prevents wasted resources when clients disconnect mid-stream 2. Documentation Enhancements - Added detailed comments explaining producer-consumer pattern - Documented ordering guarantees for concurrent streaming - Clarified that chunks from same component maintain order - Clarified that chunks from different components may interleave - Added memory management documentation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../lib/react_on_rails_pro/concerns/stream.rb | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb index db55ba2c1e..13f3edae0e 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb @@ -47,6 +47,20 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true private + # Drains all streaming fibers concurrently using a producer-consumer pattern. + # + # Producer tasks: Each fiber drains its stream and enqueues chunks to a shared queue. + # Consumer task: Single writer dequeues chunks and writes them to the response stream. + # + # Ordering guarantees: + # - Chunks from the same component maintain their order + # - Chunks from different components may interleave based on production timing + # - The first component to produce a chunk will have it written first + # + # Memory management: + # - Uses a limited queue (configured via concurrent_component_streaming_buffer_size) + # - Producers block when the queue is full, providing backpressure + # - This prevents unbounded memory growth from fast producers def drain_streams_concurrently require "async" require "async/limited_queue" @@ -58,7 +72,9 @@ def drain_streams_concurrently buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size queue = Async::LimitedQueue.new(buffer_size) + # Consumer task: Single writer dequeues and writes to response stream writer = build_writer_task(parent: parent, queue: queue) + # Producer tasks: Each fiber drains its stream and enqueues chunks tasks = build_producer_tasks(parent: parent, queue: queue) # This structure ensures that even if a producer task fails, we always @@ -78,12 +94,18 @@ def build_producer_tasks(parent:, queue:) @rorp_rendering_fibers.each_with_index.map do |fiber, idx| parent.async do loop do + # Check if client disconnected before expensive operations + break if response.stream.closed? + chunk = fiber.resume break unless chunk # Will be blocked if the queue is full until a chunk is dequeued queue.enqueue([idx, chunk]) end + rescue IOError, Errno::EPIPE + # Client disconnected - stop producing + break end end end @@ -97,6 +119,9 @@ def build_writer_task(parent:, queue:) _idx_from_queue, item = pair response.stream.write(item) end + rescue IOError, Errno::EPIPE + # Client disconnected - stop writing + nil end end end From 763a9ff957dad521137d4b88ee192cecf98041cb Mon Sep 17 00:00:00 2001 From: Justin Gordon Date: Sat, 15 Nov 2025 21:10:36 -1000 Subject: [PATCH 2/3] Add debug logging for client disconnects and improve error handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improvements: - Add debug logging when client disconnects during streaming (producer/consumer) - Remove unnecessary explicit nil return in rescue block - Add test support for closed? method on mocked stream - Only logs when logging_on_server is enabled These changes improve observability for production debugging without affecting normal operation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../lib/react_on_rails_pro/concerns/stream.rb | 15 ++++++++++++--- .../spec/react_on_rails_pro/stream_spec.rb | 1 + 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb index 13f3edae0e..d12666bdc8 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb @@ -103,8 +103,9 @@ def build_producer_tasks(parent:, queue:) # Will be blocked if the queue is full until a chunk is dequeued queue.enqueue([idx, chunk]) end - rescue IOError, Errno::EPIPE + rescue IOError, Errno::EPIPE => e # Client disconnected - stop producing + log_client_disconnect("producer", e) break end end @@ -119,9 +120,17 @@ def build_writer_task(parent:, queue:) _idx_from_queue, item = pair response.stream.write(item) end - rescue IOError, Errno::EPIPE + rescue IOError, Errno::EPIPE => e # Client disconnected - stop writing - nil + log_client_disconnect("consumer", e) + end + end + + def log_client_disconnect(context, exception) + return unless ReactOnRails.configuration.logging_on_server + + ReactOnRails.configuration.logger.debug do + "[React on Rails Pro] Client disconnected during streaming (#{context}): #{exception.class}" end end end diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb index a97f69ffde..12335496d0 100644 --- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb +++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb @@ -388,6 +388,7 @@ def setup_stream_test(component_count: 2) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(mocked_stream).to receive(:write) allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) allow(controller).to receive(:response).and_return(mocked_response) [component_queues, controller, mocked_stream] From decc75e072cd54a020d383b9bcc95f28a7cdc3ae Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Sun, 16 Nov 2025 17:35:29 +0000 Subject: [PATCH 3/3] Add closed? method stub to streaming test mocks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes 9 failing tests in react_on_rails_pro_helper_spec.rb by adding the closed? method stub to mocked streams. The production code now checks response.stream.closed? for client disconnect detection, so test mocks need to support this method. Changes: - Add closed? stub returning false to stream_view_containing_react_components tests - Add closed? stub returning false to cached_stream_react_component tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Abanoub Ghadban --- .../spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb index ac3debe934..50a544270c 100644 --- a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb +++ b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb @@ -443,6 +443,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) written_chunks << chunk end allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response) @@ -532,6 +533,7 @@ def execute_stream_view_containing_react_components written_chunks.clear allow(mocked_stream).to receive(:write) { |chunk| written_chunks << chunk } allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response)