Skip to content

Commit

Permalink
Remove reconnect logic from worker.
Browse files Browse the repository at this point in the history
introduces a new class, Client, which will handle Redis stuff from
now on. As such, it needs the reconnect logic.
  • Loading branch information
yaauie authored and steveklabnik committed Apr 17, 2013
1 parent b805082 commit 65616ca
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 103 deletions.
3 changes: 3 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ to know.
Resque provides `resque` bin file instead which you should use for running workers and other Resque-related stuff.

Old `$ QUEUE=high,failure rake resque:work` translates to `$ resque work -q high,failure`. Check all available tasks by running `resque help`

* Resque::Workers#initialize now takes a client as an option. This manages
its connection to Redis.
44 changes: 44 additions & 0 deletions lib/resque/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
##
# Resque::Client is a wrapper around all things Redis.
#
# This provides a level of indirection so that the rest of our code
# doesn't need to know anything about Redis, and allows us to someday
# maybe even move away from Redis to another backend if we need to.
#
# Also helps because we can mock this out in our tests. Only mock
# stuff you own.
#
# Also, we can theoretically have multiple Redis/Resques going on
# one project.
module Resque
class Client
attr_reader :backend

def initialize(backend)
@backend = backend

@reconnected = false
end

# Reconnect to Redis to avoid sharing a connection with the parent,
# retry up to 3 times with increasing delay before giving up.
def reconnect
return if @reconnected

tries = 0
begin
backend.client.reconnect
@reconnected = true
rescue Redis::BaseConnectionError
if (tries += 1) <= 3
Resque.logger.info "Error reconnecting to Redis; retrying"
sleep(tries)
retry
else
Resque.logger.info "Error reconnecting to Redis; quitting"
raise
end
end
end
end
end
41 changes: 10 additions & 31 deletions lib/resque/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'resque/core_ext/hash'
require 'resque/worker_registry'
require 'resque/errors'
require 'resque/client'

module Resque
# A Resque Worker processes jobs. On platforms that support fork(2),
Expand All @@ -16,14 +17,6 @@ module Resque
class Worker
include Resque::Logging

def redis
Resque.redis
end

def self.redis
Resque.redis
end

# Boolean indicating whether this worker can or can not fork.
# Automatically set if a fork(2) fails.
attr_accessor :cant_fork
Expand All @@ -33,6 +26,8 @@ def self.redis

attr_writer :to_s

attr_reader :client

# Workers should be initialized with an array of string queue
# names. The order is important: a Worker will check the first
# queue given for a job. If none is found, it will check the
Expand All @@ -58,15 +53,16 @@ def initialize(queues = [], options = {})
:fork_per_job => true,
# When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been
# registered in the application. Otherwise, forked workers exit with `exit!`
:run_at_exit_hooks => false
:run_at_exit_hooks => false,
}
@options.merge!(options.symbolize_keys)

@queues = (queues.is_a?(Array) ? queues : [queues]).map { |queue| queue.to_s.strip }
@shutdown = nil
@paused = nil
@cant_fork = false
@reconnected = false

@client = @options.fetch(:client) { Client.new(Resque.redis) }

validate_queues
end
Expand Down Expand Up @@ -304,6 +300,10 @@ def perform(job)
end
end

def reconnect
client.reconnect
end

protected
# Stop processing jobs after the current one has completed (if we're
# currently running one).
Expand Down Expand Up @@ -521,26 +521,5 @@ def reserve(interval = 5)
Resque.logger.debug "Found job on #{queue}"
Job.new(queue.name, job) if (queue && job)
end

# Reconnect to Redis to avoid sharing a connection with the parent,
# retry up to 3 times with increasing delay before giving up.
def reconnect
return if @reconnected

tries = 0
begin
redis.client.reconnect
@reconnected = true
rescue Redis::BaseConnectionError
if (tries += 1) <= 3
Resque.logger.info "Error reconnecting to Redis; retrying"
sleep(tries)
retry
else
Resque.logger.info "Error reconnecting to Redis; quitting"
raise
end
end
end
end
end
Loading

0 comments on commit 65616ca

Please sign in to comment.