Worker PID Cleanup on Boot #32
Thanks for this awesome module!

One of the things that ruby-resque does at boot is inspect the list of workers redis has, check whether any of them used to run on this server, and check whether those processes are still running. This is important for knowing the status of your ecosystem (and it allows the resque web interface to continue to make sense). In our projects, we use the following methods to clean up pids at boot:
I ended up with this, which not only removes dead workers but also fails stuck jobs so you can re-queue them from the web interface if you want.

```coffeescript
os    = require "os"
exec  = require("child_process").exec
async = require "async"

# Delete the redis keys of workers whose pids are no longer running on this
# host, and push any jobs those workers had in progress onto the failed list
# so they can be re-queued from the resque web interface.
pidCleanup = (hostname, redis, pids, callback) ->
  hostKey = "resque:worker:" + hostname + "*"
  redis.keys hostKey, (err, keys) ->
    #console.log err, keys, hostKey
    return callback(err) if err
    return callback() unless keys.length
    workersToDelete = []
    keysToDelete    = []
    jobsToFail      = []

    processKey = (key, cb) ->
      parts = key.split(":")
      [prefix, worker, hostname, pid, queues] = parts[..5]
      pid = parseInt(pid, 10)
      # skip known process ids
      return cb() if pid in pids
      # delete both the "in-process" job keys as well as the "started" keys
      keysToDelete.push key
      # "started" marks when the worker was created; a key that doesn't end
      # with :started holds the job that was in progress.
      isStarted = (parts.length == 6 and parts[5] == 'started')
      if isStarted
        workerKey = "#{hostname}:#{pid}:#{queues}"
        workersToDelete.push workerKey
        cb()
      else
        # in-process job that has to be retrieved so we can fail it
        redis.get key, (err, ipString) ->
          return cb(err) if err
          ip = JSON.parse(ipString)
          jobName = ip.payload['class']
          jobsToFail.push
            queue: ip.queue
            payload: ip.payload
            worker: "#{hostname}:#{pid}:#{queues}"
            exception: "Error"
            error: "Stuck Job: #{jobName}"
            backtrace: [] # is this necessary?
            failed_at: new Date()
          cb()

    async.map keys, processKey, (err) ->
      return callback(err) if err
      unless keysToDelete.length or workersToDelete.length or jobsToFail.length
        return callback()
      console.log "keysToDelete", keysToDelete
      console.log "workersToDelete", workersToDelete
      console.log "jobsToFail", jobsToFail
      multi = redis.multi()
      for key in keysToDelete
        multi.del key
      for member in workersToDelete
        multi.srem "resque:workers", member
      for job in jobsToFail
        multi.incr "resque:stat:failed"
        jobString = JSON.stringify(job)
        multi.lpush "resque:failed", jobString
      multi.exec (err, results) ->
        return callback(err) if err
        callback()
```
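As a side note on `jobsToFail`: once an entry lands on `resque:failed`, the web interface can retry it, but you can also re-queue it by hand. A minimal sketch of a manual retry, assuming the classic node_redis client and resque's standard key layout (illustrative code, not part of the snippet above):

```javascript
// Hypothetical manual retry of the most recently failed job; the snippet
// above LPUSHes failures onto resque:failed, so LPOP takes the newest one.
var redis = require("redis");
var client = redis.createClient();

client.lpop("resque:failed", function(err, failedString){
  if (err) throw err;
  if (!failedString) return client.quit();
  var failure = JSON.parse(failedString);
  // Re-register the queue and push the original payload back onto it.
  client.sadd("resque:queues", failure.queue, function(){
    client.rpush("resque:queue:" + failure.queue, JSON.stringify(failure.payload), function(err){
      if (err) throw err;
      client.quit();
    });
  });
});
```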
And the pid discovery plus the boot-time wiring:

```coffeescript
# Collect the pids of every process whose `ps` line matches `matcher`.
getPids = (matcher, callback) ->
  grepString = "ps awx | grep \"#{matcher}\" | grep -v grep"
  child = exec grepString, (err, stdout, stderr) ->
    pids = []
    for line in stdout.split("\n")
      line = line.trim()
      continue unless line
      pid = parseInt(line.split(" ")[0], 10)
      pids.push pid
    callback err, pids

cleanDeadWorkers = (callback) ->
  getPids "resque", (err, pids) ->
    return callback(err) if err
    # NOTE: pids might contain more than just job queue worker processes, but
    # that's OK because the list is only used as a whitelist.
    # jobq is our app-level queue handle; jobq.redis is a connected client.
    pidCleanup os.hostname(), jobq.redis, pids, callback

if require.main == module
  cleanDeadWorkers (err) ->
    throw err if err
    run() # run() boots the rest of the app (defined elsewhere)
```
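Parsing `ps` output works, but the liveness test can also be done in-process. A hedged sketch (hypothetical, not from this thread) that reads resque's worker set and probes each pid with signal 0:

```javascript
// Signal 0 sends nothing; process.kill just checks whether the pid can be
// signalled. An EPERM error also means the process exists.
var os = require("os");
var redis = require("redis");
var client = redis.createClient();

function isRunning(pid){
  try {
    process.kill(pid, 0);
    return true;
  } catch (e) {
    return e.code === "EPERM";
  }
}

client.smembers("resque:workers", function(err, workers){
  if (err) throw err;
  workers.forEach(function(w){
    var parts = w.split(":"); // "<host>:<pid>:<queues>"
    if (parts[0] === os.hostname() && !isRunning(parseInt(parts[1], 10))){
      console.log("dead worker:", w);
    }
  });
  client.quit();
});
```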
I guess I should update this: we've since moved over to our own resque package, and here's what we use for our pid-cleanup logic now:

```javascript
// Excerpted from the worker prototype; `os` and `exec` come from
// require("os") and require("child_process").exec at the top of the module.
worker.prototype.workerCleanup = function(callback){
  var self = this;
  self.getPids(function(err, pids){
    self.connection.redis.smembers(self.connection.key('workers'), function(err, workers){
      workers.forEach(function(w){
        var parts = w.split(":"); // "<host>:<pid>:<queues>"
        var host = parts[0];
        var pid = parseInt(parts[1], 10);
        if(host === os.hostname() && pids.indexOf(pid) < 0){
          (function(w){
            self.emit("cleaning_worker", w, pid);
            var parts = w.split(":");
            var queues = parts.splice(-1, 1);
            var pureName = parts.join(":");
            self.untrack(pureName, queues);
          })(w);
        }
      });
      if(typeof callback == "function"){ callback(); }
    });
  });
};

worker.prototype.getPids = function(callback){
  var child = exec('ps awx | grep -v grep', function(error, stdout, stderr){
    var pids = [];
    stdout.split("\n").forEach(function(line){
      line = line.trim();
      if(line.length > 0){
        var pid = parseInt(line.split(' ')[0], 10);
        pids.push(pid);
      }
    });
    callback(error, pids);
  });
};
```

and:

```javascript
worker.prototype.untrack = function(name, queues, callback){
  var self = this;
  self.connection.redis.srem(self.connection.key('workers'), (name + ":" + queues), function(){
    self.connection.redis.del([
      self.connection.key('worker', name, self.stringQueues()),
      self.connection.key('worker', name, self.stringQueues(), 'started'),
      self.connection.key('stat', 'failed', name),
      self.connection.key('stat', 'processed', name)
    ], function(err){
      if(typeof callback == "function"){ callback(err); }
    });
  });
};
```

More details here.
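At boot, you'd run `workerCleanup` before the worker starts polling. A hedged wiring sketch, assuming the package is node-resque and its early callback-style constructor (both assumptions; check the package README for the exact signature):

```javascript
// Hypothetical boot wiring; the package name and constructor signature are
// assumptions, but workerCleanup/start match the prototype methods above.
var NR = require("node-resque");

var connectionDetails = { host: "127.0.0.1", port: 6379 };
var jobs = {}; // job definitions go here

var worker = new NR.worker({ connection: connectionDetails, queues: ["default"] }, jobs, function(){
  worker.workerCleanup(); // prune workers whose pids are gone on this host
  worker.start();
});
```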