Skip to content

Commit be9f242

Browse files
Ignore responses to timed out calls.
1 parent 3e56a16 commit be9f242

File tree

3 files changed

+153
-2
lines changed

3 files changed

+153
-2
lines changed

lib/async/container/supervisor/connection.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,12 @@ def run(target)
185185
if call = @calls[id]
186186
# Response to a call:
187187
call.push(**message)
188-
else
188+
elsif message.key?(:do)
189189
# Incoming call:
190190
Call.dispatch(self, target, id, message)
191+
else
192+
# Likely a response to a timed-out call, ignore it:
193+
Console.debug(self, "Ignoring message:", message)
191194
end
192195
else
193196
Console.error(self, "Unknown message:", message)

lib/async/container/supervisor/dispatchable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def dispatch(call)
1414
method_name = "do_#{call.message[:do]}"
1515
self.public_send(method_name, call)
1616
rescue => error
17-
Console.error(self, "Error while dispatching call.", exception: error, call: call)
17+
Console.error(self, "Error while dispatching call!", exception: error, call: call)
1818

1919
call.fail(error: {
2020
class: error.class,

test/async/container/server.rb

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "async/container/supervisor/a_server"
7+
require "sus/fixtures/console/null_logger"
8+
9+
describe Async::Container::Supervisor::Server do
10+
include Sus::Fixtures::Console::NullLogger
11+
include Async::Container::Supervisor::AServer
12+
13+
it "can handle unexpected failures" do
14+
# First, send invalid JSON to trigger the error:
15+
endpoint.connect do |stream|
16+
# Send malformed JSON that will cause parsing errors:
17+
stream.write("not valid json\n")
18+
stream.flush
19+
end
20+
21+
# Now send a valid message to confirm the server is still working:
22+
stream = endpoint.connect
23+
24+
# Send a valid register message:
25+
message = {id: 1, do: :register, state: {process_id: ::Process.pid}}
26+
stream.puts(JSON.dump(message))
27+
stream.flush
28+
29+
# Read the response:
30+
response = JSON.parse(stream.gets, symbolize_names: true)
31+
32+
# The server should respond with a finished message:
33+
expect(response).to have_keys(
34+
id: be == 1,
35+
finished: be == true
36+
)
37+
38+
stream.close
39+
end
40+
41+
with "failing monitor" do
42+
let(:failing_monitor) do
43+
Class.new do
44+
def run
45+
end
46+
47+
def register(connection)
48+
end
49+
50+
def remove(connection)
51+
end
52+
53+
def status(call)
54+
raise "Monitor failed to get status!"
55+
end
56+
end.new
57+
end
58+
59+
let(:monitors) {[failing_monitor]}
60+
61+
it "can handle monitor status failures" do
62+
# Send a status request:
63+
stream = endpoint.connect
64+
65+
message = {id: 1, do: :status}
66+
stream.puts(JSON.dump(message))
67+
stream.flush
68+
69+
# Read the response:
70+
response = JSON.parse(stream.gets, symbolize_names: true)
71+
72+
# The server should still respond with a finished message despite the monitor error:
73+
expect(response).to have_keys(
74+
id: be == 1,
75+
finished: be == true,
76+
error: have_keys(
77+
class: be == "RuntimeError",
78+
message: be == "Monitor failed to get status!",
79+
backtrace: be_a(Array)
80+
)
81+
)
82+
83+
stream.close
84+
end
85+
end
86+
87+
it "handles responses arriving after timeout" do
88+
# This reproduces the production bug:
89+
# 1. Client makes a call with timeout.
90+
# 2. Timeout expires, call ID is deleted from tracking.
91+
# 3. Response arrives late.
92+
# 4. System should recognize it's a response (no 'do' key) and ignore it.
93+
stream = endpoint.connect
94+
95+
# Simulate what happens when a timed-out response arrives:
96+
# The response only has id and finished (no 'do' key) because it's a response, not a request
97+
message = {id: 1, finished: true}
98+
stream.puts(JSON.dump(message))
99+
stream.flush
100+
101+
# Send a valid message to confirm the server is still working:
102+
valid_message = {id: 3, do: :register, state: {process_id: ::Process.pid}}
103+
stream.puts(JSON.dump(valid_message))
104+
stream.flush
105+
106+
# Read the response to the valid message:
107+
response = JSON.parse(stream.gets, symbolize_names: true)
108+
109+
# The server should have ignored the stale response and processed the valid one:
110+
expect(response).to have_keys(
111+
id: be == 3,
112+
finished: be == true
113+
)
114+
115+
stream.close
116+
end
117+
118+
it "does not send error response for stale messages" do
119+
# Verify that stale messages are silently ignored, not treated as errors.
120+
# Before the fix, this would cause NoMethodError: undefined method 'do_'
121+
stream = endpoint.connect
122+
123+
# Send a stale response:
124+
stale_message = {id: 5, finished: true}
125+
stream.puts(JSON.dump(stale_message))
126+
stream.flush
127+
128+
# Send a valid message:
129+
valid_message = {id: 7, do: :register, state: {process_id: ::Process.pid}}
130+
stream.puts(JSON.dump(valid_message))
131+
stream.flush
132+
133+
# We should only get ONE response - for the valid message.
134+
# Not an error response for the stale message.
135+
response = JSON.parse(stream.gets, symbolize_names: true)
136+
137+
expect(response).to have_keys(
138+
id: be == 7,
139+
finished: be == true
140+
)
141+
142+
# Verify the response is successful, not an error:
143+
expect(response[:failed]).to be_nil
144+
expect(response[:error]).to be_nil
145+
146+
stream.close
147+
end
148+
end

0 commit comments

Comments
 (0)