Skip to content

Commit 1ce6a57

Browse files
committed
Fix: avoid uninterruptible loops (with subscribtions)
1 parent 97c759c commit 1ce6a57

File tree

2 files changed

+28
-24
lines changed

2 files changed

+28
-24
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ def redis_params
129129
return connectionParams.merge(baseParams)
130130
end
131131

132+
TIMEOUT = 5 # Redis only supports Integer values
133+
private_constant :TIMEOUT
134+
132135
def new_redis_instance
133136
::Redis.new(redis_params)
134137
end
@@ -238,7 +241,7 @@ def list_batch_listener(redis, output_queue)
238241
end
239242

240243
def list_single_listener(redis, output_queue)
241-
item = redis.blpop(@key, 0, :timeout => 1)
244+
item = redis.blpop(@key, 0, :timeout => TIMEOUT)
242245
return unless item # from timeout or other conditions
243246

244247
# blpop returns the 'key' read from as well as the item result
@@ -268,62 +271,67 @@ def redis_runner
268271
begin
269272
@redis ||= connect
270273
yield
274+
rescue ::Redis::TimeoutError
275+
@logger.debug("Redis timeout, retrying")
276+
retry unless stop?
277+
rescue ::Redis::BaseConnectionError, ::Redis::ProtocolError => e
278+
@logger.warn("Redis connection error", message: e.message, exception: e.class)
279+
reset_and_sleep
280+
retry unless stop?
271281
rescue ::Redis::BaseError => e
272-
@logger.warn("Redis connection problem", :exception => e)
273-
# Reset the redis variable to trigger reconnect
274-
@redis = nil
275-
Stud.stoppable_sleep(1) { stop? }
276-
retry if !stop?
282+
@logger.warn("Redis error", message: e.message, exception: e.class, backtrace: e.backtrace)
283+
reset_and_sleep
284+
retry unless stop?
277285
end
278286
end
279287

288+
def reset_and_sleep
289+
# Reset the redis variable to trigger reconnect
290+
@redis = nil
291+
Stud.stoppable_sleep(1) { stop? }
292+
end
293+
280294
# private
281295
def channel_runner(output_queue)
282-
redis_runner do
283-
channel_listener(output_queue)
284-
end
296+
redis_runner { channel_listener(output_queue) }
285297
end
286298

287299
# private
288300
def channel_listener(output_queue)
289-
@redis.subscribe(@key) do |on|
301+
@redis.subscribe_with_timeout(TIMEOUT, @key) do |on|
290302
on.subscribe do |channel, count|
291-
@logger.info("Subscribed", :channel => channel, :count => count)
303+
@logger.debug("Subscribed", :channel => channel, :count => count)
292304
end
293305

294306
on.message do |channel, message|
295307
queue_event(message, output_queue, channel)
296308
end
297309

298310
on.unsubscribe do |channel, count|
299-
@logger.info("Unsubscribed", :channel => channel, :count => count)
311+
@logger.debug("Unsubscribed", :channel => channel, :count => count)
300312
end
301313
end
302314
end
303315

304316
def pattern_channel_runner(output_queue)
305-
redis_runner do
306-
pattern_channel_listener(output_queue)
307-
end
317+
redis_runner { pattern_channel_listener(output_queue) }
308318
end
309319

310320
# private
311321
def pattern_channel_listener(output_queue)
312-
@redis.psubscribe @key do |on|
322+
@redis.psubscribe_with_timeout(TIMEOUT, @key) do |on|
313323
on.psubscribe do |channel, count|
314-
@logger.info("Subscribed", :channel => channel, :count => count)
324+
@logger.debug("Subscribed", :channel => channel, :count => count)
315325
end
316326

317327
on.pmessage do |pattern, channel, message|
318328
queue_event(message, output_queue, channel)
319329
end
320330

321331
on.punsubscribe do |channel, count|
322-
@logger.info("Unsubscribed", :channel => channel, :count => count)
332+
@logger.debug("Unsubscribed", :channel => channel, :count => count)
323333
end
324334
end
325335
end
326336

327-
# end
328-
329337
end end end # Redis Inputs LogStash

spec/inputs/redis_spec.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,6 @@ def close_thread(inst, rt)
375375

376376
["list", "channel", "pattern_channel"].each do |data_type|
377377
context data_type do
378-
# TODO pending
379-
# redis-rb ends up in a read wait loop since we do not use subscribe_with_timeout
380-
next unless data_type == 'list'
381-
382378
it_behaves_like "an interruptible input plugin", :redis => true do
383379
let(:config) { { 'key' => 'foo', 'data_type' => data_type } }
384380
end

0 commit comments

Comments
 (0)