Skip to content

Commit b285d38

Browse files
committed
add default backpressure via Async::Semaphore and handle client disconnects
1 parent 49e73e2 commit b285d38

File tree

1 file changed

+12
-3
lines changed
  • react_on_rails_pro/lib/react_on_rails_pro/concerns

1 file changed

+12
-3
lines changed

react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
4141
if ReactOnRailsPro.configuration.concurrent_stream_drain
4242
require "async"
4343
require "async/queue"
44+
require "async/semaphore"
4445

4546
Sync do |parent|
4647
queue = Async::Queue.new
48+
semaphore = Async::Semaphore.new(64)
4749
remaining = @rorp_rendering_fibers.size
4850

4951
unless remaining.zero?
@@ -52,10 +54,10 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
5254
tasks << parent.async do
5355
begin
5456
while (chunk = fiber.resume)
55-
queue.enqueue([idx, chunk])
57+
semaphore.acquire { queue.enqueue([idx, chunk]) }
5658
end
5759
rescue StandardError => e
58-
queue.enqueue([idx, "<!-- rorp stream error: #{e.class}: #{e.message} -->"]) # minimal signal
60+
semaphore.acquire { queue.enqueue([idx, "<!-- stream error: #{e.class}: #{e.message} -->"]) } # minimal signal
5961
ensure
6062
queue.enqueue([idx, :__done__])
6163
end
@@ -71,7 +73,14 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
7173
next
7274
end
7375
Rails.logger.info { "[ReactOnRailsPro] stream write (mode=concurrent) idx=#{_idx} bytes=#{item.bytesize}" } if ReactOnRailsPro.configuration.tracing
74-
response.stream.write(item)
76+
begin
77+
response.stream.write(item)
78+
rescue IOError, ActionController::Live::ClientDisconnected
79+
# Client disconnected: stop early.
80+
break
81+
ensure
82+
semaphore.release
83+
end
7584
end
7685
end
7786

0 commit comments

Comments
 (0)