Skip to content

Commit 42f7838

Browse files
RUBY-2748 Retry reads/writes on another mongos (mongodb#2717)
- First implementation attempt
- Adjust spec requirement
- Improve specs
- Add write spec
- Add documenting comments
- Use proper error codes in prose tests
1 parent c303ab6 commit 42f7838

File tree

6 files changed

+363
-25
lines changed

6 files changed

+363
-25
lines changed

lib/mongo/retryable.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ module Retryable
4646
# @api private
4747
#
4848
# @return [ Mongo::Server ] A server matching the server preference.
49-
def select_server(cluster, server_selector, session)
50-
server_selector.select_server(cluster, nil, session)
49+
def select_server(cluster, server_selector, session, failed_server = nil)
50+
server_selector.select_server(cluster, nil, session, deprioritized: [failed_server].compact)
5151
end
5252

5353
# Returns the read worker for handling retryable reads.

lib/mongo/retryable/read_worker.rb

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,13 @@ def deprecated_legacy_read_with_retry(&block)
190190
#
191191
# @return [ Result ] The result of the operation.
192192
def modern_read_with_retry(session, server_selector, &block)
193-
yield select_server(cluster, server_selector, session)
193+
server = select_server(cluster, server_selector, session)
194+
yield server
194195
rescue *retryable_exceptions, Error::OperationFailure, Auth::Unauthorized, Error::PoolError => e
195196
e.add_notes('modern retry', 'attempt 1')
196197
raise e if session.in_transaction?
197198
raise e if !is_retryable_exception?(e) && !e.write_retryable?
198-
retry_read(e, session, server_selector, &block)
199+
retry_read(e, session, server_selector, failed_server: server, &block)
199200
end
200201

201202
# Attempts to do a "legacy" read with retry. The operation will be
@@ -257,12 +258,14 @@ def read_without_retry(session, server_selector, &block)
257258
# being run on.
258259
# @param [ Mongo::ServerSelector::Selectable ] server_selector Server
259260
# selector for the operation.
261+
# @param [ Mongo::Server ] failed_server The server on which the original
262+
# operation failed.
260263
# @param [ Proc ] block The block to execute.
261264
#
262265
# @return [ Result ] The result of the operation.
263-
def retry_read(original_error, session, server_selector, &block)
266+
def retry_read(original_error, session, server_selector, failed_server: nil, &block)
264267
begin
265-
server = select_server(cluster, server_selector, session)
268+
server = select_server(cluster, server_selector, session, failed_server)
266269
rescue Error, Error::AuthError => e
267270
original_error.add_note("later retry failed: #{e.class}: #{e}")
268271
raise original_error
@@ -289,8 +292,6 @@ def retry_read(original_error, session, server_selector, &block)
289292
raise original_error
290293
end
291294
end
292-
293295
end
294-
295296
end
296297
end

lib/mongo/retryable/write_worker.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ def modern_write_with_retry(session, server, context, &block)
240240

241241
# Context#with creates a new context, which is not necessary here
242242
# but the API is less prone to misuse this way.
243-
retry_write(e, txn_num, context: context.with(is_retry: true), &block)
243+
retry_write(e, txn_num, context: context.with(is_retry: true), failed_server: server, &block)
244244
end
245245

246246
# Called after a failed write, this will retry the write no more than
@@ -250,17 +250,19 @@ def modern_write_with_retry(session, server, context, &block)
250250
# retry.
251251
# @param [ Number ] txn_num The transaction number.
252252
# @param [ Operation::Context ] context The context for the operation.
253+
# @param [ Mongo::Server ] failed_server The server on which the original
254+
# operation failed.
253255
#
254256
# @return [ Result ] The result of the operation.
255-
def retry_write(original_error, txn_num, context:, &block)
257+
def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
256258
session = context.session
257259

258260
# We do not request a scan of the cluster here, because error handling
259261
# for the error which triggered the retry should have updated the
260262
# server description and/or topology as necessary (specifically,
261263
# a socket error or a not master error should have marked the respective
262264
# server unknown). Here we just need to wait for server selection.
263-
server = select_server(cluster, ServerSelector.primary, session)
265+
server = select_server(cluster, ServerSelector.primary, session, failed_server)
264266

265267
unless server.retry_writes?
266268
# Do not need to add "modern retry" here, it should already be on

lib/mongo/server_selector/base.rb

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ def ==(other)
164164
# for mongos pinning. Added in version 2.10.0.
165165
# @param [ true | false ] write_aggregation Whether we need a server that
166166
# supports writing aggregations (e.g. with $merge/$out) on secondaries.
167+
# @param [ Array<Server> ] deprioritized A list of servers that should
168+
# be selected from only if no other servers are available. This is
169+
# used to avoid selecting the same server twice in a row when
170+
# retrying a command.
167171
#
168172
# @return [ Mongo::Server ] A server matching the server preference.
169173
#
@@ -174,16 +178,16 @@ def ==(other)
174178
# lint mode is enabled.
175179
#
176180
# @since 2.0.0
177-
def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
178-
select_server_impl(cluster, ping, session, write_aggregation).tap do |server|
181+
def select_server(cluster, ping = nil, session = nil, write_aggregation: false, deprioritized: [])
182+
select_server_impl(cluster, ping, session, write_aggregation, deprioritized).tap do |server|
179183
if Lint.enabled? && !server.pool.ready?
180184
raise Error::LintError, 'Server selector returning a server with a pool which is not ready'
181185
end
182186
end
183187
end
184188

185189
# Parameters and return values are the same as for select_server.
186-
private def select_server_impl(cluster, ping, session, write_aggregation)
190+
private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized)
187191
if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
188192
return cluster.servers.first
189193
end
@@ -266,7 +270,7 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
266270
end
267271
end
268272

269-
server = try_select_server(cluster, write_aggregation: write_aggregation)
273+
server = try_select_server(cluster, write_aggregation: write_aggregation, deprioritized: deprioritized)
270274

271275
if server
272276
unless cluster.topology.compatible?
@@ -321,11 +325,15 @@ def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
321325
# an eligible server.
322326
# @param [ true | false ] write_aggregation Whether we need a server that
323327
# supports writing aggregations (e.g. with $merge/$out) on secondaries.
328+
# @param [ Array<Server> ] deprioritized A list of servers that should
329+
# be selected from only if no other servers are available. This is
330+
# used to avoid selecting the same server twice in a row when
331+
# retrying a command.
324332
#
325333
# @return [ Server | nil ] A suitable server, if one exists.
326334
#
327335
# @api private
328-
def try_select_server(cluster, write_aggregation: false)
336+
def try_select_server(cluster, write_aggregation: false, deprioritized: [])
329337
servers = if write_aggregation && cluster.replica_set?
330338
# 1. Check if ALL servers in cluster support secondary writes.
331339
is_write_supported = cluster.servers.reduce(true) do |res, server|
@@ -347,7 +355,7 @@ def try_select_server(cluster, write_aggregation: false)
347355
# by the selector (e.g. for secondary preferred, the first
348356
# server may be a secondary and the second server may be primary)
349357
# and we should take the first server here respecting the order
350-
server = servers.first
358+
server = suitable_server(servers, deprioritized)
351359

352360
if server
353361
if Lint.enabled?
@@ -418,6 +426,24 @@ def suitable_servers(cluster)
418426

419427
private
420428

429+
# Picks the server to run the operation on from an ordered list of
# candidates.
#
# Candidates that are not listed in +deprioritized+ are preferred; the
# relative order of +servers+ is preserved, so the first candidate that is
# not deprioritized is returned. If every candidate is deprioritized, the
# first candidate is returned anyway.
#
# @param [ Array<Server> ] servers The candidate servers.
# @param [ Array<Server> ] deprioritized A list of servers that should
#   be selected from only if no other servers are available.
#
# @return [ Server | nil ] The suitable server or nil if no suitable
#   server is available (i.e. +servers+ is empty).
def suitable_server(servers, deprioritized)
  preferred = servers - deprioritized
  # Fall back to the full candidate list when every candidate is
  # deprioritized.
  candidates = preferred.empty? ? servers : preferred
  candidates.first
end
446+
421447
# Convert this server preference definition into a format appropriate
422448
# for sending to a MongoDB server (i.e., as a command field).
423449
#

spec/integration/retryable_reads_errors_spec.rb

Lines changed: 161 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020

2121
let(:failpoint) do
2222
{
23-
configureFailPoint: "failCommand",
24-
mode: { times: 1 },
25-
data: {
26-
failCommands: [ "find" ],
27-
errorCode: 91,
28-
blockConnection: true,
29-
blockTimeMS: 1000
30-
}
23+
configureFailPoint: "failCommand",
24+
mode: { times: 1 },
25+
data: {
26+
failCommands: [ "find" ],
27+
errorCode: 91,
28+
blockConnection: true,
29+
blockTimeMS: 1000
30+
}
3131
}
3232
end
3333

@@ -107,4 +107,157 @@
107107
})
108108
end
109109
end
110+
111+
context 'Retries in a sharded cluster' do
112+
require_topology :sharded
113+
min_server_version '4.2'
114+
require_no_auth
115+
116+
let(:subscriber) { Mrss::EventSubscriber.new }
117+
118+
let(:find_started_events) do
119+
subscriber.started_events.select { |e| e.command_name == "find" }
120+
end
121+
122+
let(:find_failed_events) do
123+
subscriber.failed_events.select { |e| e.command_name == "find" }
124+
end
125+
126+
let(:find_succeeded_events) do
127+
subscriber.succeeded_events.select { |e| e.command_name == "find" }
128+
end
129+
130+
context 'when another mongos is available' do
131+
132+
let(:first_mongos) do
133+
Mongo::Client.new(
134+
[SpecConfig.instance.addresses.first],
135+
direct_connection: true,
136+
database: 'admin'
137+
)
138+
end
139+
140+
let(:second_mongos) do
141+
Mongo::Client.new(
142+
[SpecConfig.instance.addresses.last],
143+
direct_connection: false,
144+
database: 'admin'
145+
)
146+
end
147+
148+
let(:client) do
149+
new_local_client(
150+
[
151+
SpecConfig.instance.addresses.first,
152+
SpecConfig.instance.addresses.last,
153+
],
154+
SpecConfig.instance.test_options.merge(retry_reads: true)
155+
)
156+
end
157+
158+
let(:expected_servers) do
159+
[
160+
SpecConfig.instance.addresses.first.to_s,
161+
SpecConfig.instance.addresses.last.to_s
162+
].sort
163+
end
164+
165+
before do
166+
skip 'This test requires at least two mongos' if SpecConfig.instance.addresses.length < 2
167+
168+
first_mongos.database.command(
169+
configureFailPoint: 'failCommand',
170+
mode: { times: 1 },
171+
data: {
172+
failCommands: %w(find),
173+
closeConnection: false,
174+
errorCode: 6
175+
}
176+
)
177+
178+
second_mongos.database.command(
179+
configureFailPoint: 'failCommand',
180+
mode: { times: 1 },
181+
data: {
182+
failCommands: %w(find),
183+
closeConnection: false,
184+
errorCode: 6
185+
}
186+
)
187+
end
188+
189+
after do
190+
[first_mongos, second_mongos].each do |admin_client|
191+
admin_client.database.command(
192+
configureFailPoint: 'failCommand',
193+
mode: 'off'
194+
)
195+
admin_client.close
196+
end
197+
client.close
198+
end
199+
200+
it 'retries on different mongos' do
201+
client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
202+
expect { collection.find.first }.to raise_error(Mongo::Error::OperationFailure)
203+
expect(find_started_events.map { |e| e.address.to_s }.sort).to eq(expected_servers)
204+
expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq(expected_servers)
205+
end
206+
end
207+
208+
context 'when no other mongos is available' do
209+
let(:mongos) do
210+
Mongo::Client.new(
211+
[SpecConfig.instance.addresses.first],
212+
direct_connection: true,
213+
database: 'admin'
214+
)
215+
end
216+
217+
let(:client) do
218+
new_local_client(
219+
[
220+
SpecConfig.instance.addresses.first
221+
],
222+
SpecConfig.instance.test_options.merge(retry_reads: true)
223+
)
224+
end
225+
226+
before do
227+
mongos.database.command(
228+
configureFailPoint: 'failCommand',
229+
mode: { times: 1 },
230+
data: {
231+
failCommands: %w(find),
232+
closeConnection: false,
233+
errorCode: 6
234+
}
235+
)
236+
end
237+
238+
after do
239+
mongos.database.command(
240+
configureFailPoint: 'failCommand',
241+
mode: 'off'
242+
)
243+
mongos.close
244+
client.close
245+
end
246+
247+
it 'retries on the same mongos' do
248+
client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
249+
expect { collection.find.first }.not_to raise_error
250+
expect(find_started_events.map { |e| e.address.to_s }.sort).to eq([
251+
SpecConfig.instance.addresses.first.to_s,
252+
SpecConfig.instance.addresses.first.to_s
253+
])
254+
expect(find_failed_events.map { |e| e.address.to_s }.sort).to eq([
255+
SpecConfig.instance.addresses.first.to_s
256+
])
257+
expect(find_succeeded_events.map { |e| e.address.to_s }.sort).to eq([
258+
SpecConfig.instance.addresses.first.to_s
259+
])
260+
end
261+
end
262+
end
110263
end

0 commit comments

Comments
 (0)