-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmongo_queue.rb
141 lines (125 loc) · 4.98 KB
/
mongo_queue.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# A priority queue stored in a MongoDB collection. Documents are handed out to
# workers by atomically stamping them with a lock owner (+locked_by+) and a lock
# time (+locked_at+) through the +findandmodify+ database command.
class Mongo::Queue
  attr_reader :connection, :config

  # Settings used for any key the caller does not override in +initialize+:
  # :database::   name of the database that holds the queue collection
  # :collection:: name of the collection that holds the queue documents
  # :timeout::    number subtracted from the current time in +cleanup!+ to
  #               decide which locks are stale (seconds)
  # :attempts::   attempt count at which +lock_next+ stops handing a document out
  DEFAULT_CONFIG = {
    :database   => 'mongo_queue',
    :collection => 'mongo_queue',
    :timeout    => 300,
    :attempts   => 3
  }.freeze

  # Fields merged underneath every document passed to +insert+. Keys supplied
  # by the caller win over these defaults.
  DEFAULT_INSERT = {
    :priority   => 0,
    :attempts   => 0,
    :locked_by  => nil,
    :locked_at  => nil,
    :last_error => nil
  }.freeze

  # Create a new instance of MongoQueue with the provided mongodb connection and optional configuration.
  # See +DEFAULT_CONFIG+ for default configuration and possible configuration options.
  #
  # Example:
  #    db = Mongo::Connection.new('localhost')
  #    config = {:timeout => 90, :attempts => 2}
  #    queue = Mongo::Queue.new(db, config)
  #
  def initialize(connection, opts={})
    @connection = connection
    @config = DEFAULT_CONFIG.merge(opts)
  end

  # Remove all items from the queue by dropping the whole collection. Use with caution!
  def flush!
    collection.drop
  end

  # Insert a new item in to the queue with required queue message parameters.
  # The given hash is merged over +DEFAULT_INSERT+, and the stored document is
  # read back from the collection and returned.
  #
  # Example:
  #    queue.insert(:name => 'Billy', :email => 'billy@example.com', :message => 'Here is the thing you asked for')
  def insert(hash)
    id = collection.insert DEFAULT_INSERT.merge(hash)
    collection.find_one(:_id => Mongo::ObjectID.from_string(id.to_s))
  end

  # Lock and return the next queue message if one is available. Returns nil if none are available. Be sure to
  # review the README.rdoc regarding proper usage of the locking process identifier (locked_by).
  #
  # Only unlocked documents whose attempt count is below the configured
  # :attempts limit are candidates; the highest +priority+ is taken first.
  #
  # Example:
  #    locked_doc = queue.lock_next(Thread.current.object_id)
  def lock_next(locked_by)
    cmd = OrderedHash.new
    # The command name has to be the first key, hence the ordered hash.
    cmd['findandmodify'] = @config[:collection]
    cmd['update']        = {'$set' => {:locked_by => locked_by, :locked_at => Time.now.utc}}
    cmd['query']         = {:locked_by => nil, :attempts => {'$lt' => @config[:attempts]}}
    cmd['sort']          = sort_hash
    cmd['limit']         = 1
    cmd['new']           = true # ask for the document as it looks after the update
    value_of collection.db.command(cmd)
  end

  # Removes stale locks that have exceeded the timeout and places them back in the queue.
  #
  # A lock is stale when +locked_at+ is older than now minus the configured
  # :timeout. Any non-nil +locked_by+ counts as a lock, so identifiers that are
  # not strings (for example an Integer object_id) are released as well.
  def cleanup!
    stale_before = Time.now.utc - config[:timeout]
    cursor = collection.find({:locked_by => {'$ne' => nil}, :locked_at => {'$lt' => stale_before}})
    while (doc = cursor.next_document)
      release(doc, doc['locked_by'])
    end
  end

  # Release a lock on the specified document and allow it to become available again.
  # The lock is only cleared when the document is still locked by +locked_by+.
  # Returns the +value+ entry of the command reply (see +value_of+).
  def release(doc, locked_by)
    cmd = OrderedHash.new
    cmd['findandmodify'] = @config[:collection]
    cmd['update']        = {'$set' => {:locked_by => nil, :locked_at => nil}}
    cmd['query']         = {:locked_by => locked_by, :_id => Mongo::ObjectID.from_string(doc['_id'].to_s)}
    cmd['limit']         = 1
    cmd['new']           = true
    value_of collection.db.command(cmd)
  end

  # Remove the document from the queue. This should be called when the work is done and the document is no longer needed.
  # You must provide the process identifier that the document was locked with to complete it.
  # Returns the +value+ entry of the command reply (see +value_of+).
  def complete(doc, locked_by)
    cmd = OrderedHash.new
    cmd['findandmodify'] = @config[:collection]
    cmd['query']         = {:locked_by => locked_by, :_id => Mongo::ObjectID.from_string(doc['_id'].to_s)}
    cmd['remove']        = true
    cmd['limit']         = 1
    value_of collection.db.command(cmd)
  end

  # Increase the error count on the locked document and release. Optionally provide an error message.
  #
  # The 'attempts' counter of the passed-in +doc+ is incremented in place; the
  # document is then saved back with the lock fields cleared and 'last_error'
  # set to +error_message+. The lock owner is not checked here.
  def error(doc, error_message=nil)
    doc['attempts'] += 1
    collection.save doc.merge({
      'last_error' => error_message,
      'locked_by'  => nil,
      'locked_at'  => nil
    })
  end

  # Provides some information about what is in the queue. We are using an eval to ensure that a
  # lock is obtained during the execution of this query so that the results are not skewed.
  # Please be aware that it will lock the database during the execution, so avoid using it too
  # often, even though it is very tiny and should be relatively fast.
  #
  # Returns a hash with integer counts under :locked, :errors, :available and :total.
  # A document counts as locked when its 'locked_by' is any non-null value.
  def stats
    js = "function queue_stat(){
              return db.eval(
                function(){
                  var a = db.#{config[:collection]}.count({'locked_by': null, 'attempts': {$lt: #{config[:attempts]}}});
                  var l = db.#{config[:collection]}.count({'locked_by': {$ne: null}});
                  var e = db.#{config[:collection]}.count({'attempts': {$gte: #{config[:attempts]}}});
                  var t = db.#{config[:collection]}.count();
                  return [a, l, e, t];
                }
              );
            }"
    available, locked, errors, total = collection.db.eval(js)
    { :locked    => locked.to_i,
      :errors    => errors.to_i,
      :available => available.to_i,
      :total     => total.to_i }
  end

  protected

  # Sort specification for +lock_next+: highest priority first.
  def sort_hash #:nodoc:
    sh = OrderedHash.new
    sh['priority'] = -1
    sh
  end

  # Extract the document from a findandmodify reply. Command replies report
  # their status under the 'ok' key; a reply with 'ok' equal to 0 yields nil,
  # anything else yields the reply's 'value' entry.
  def value_of(result) #:nodoc:
    result['ok'] == 0 ? nil : result['value']
  end

  # The queue collection, looked up from the configured database and collection names.
  def collection #:nodoc:
    @connection.db(@config[:database]).collection(@config[:collection])
  end
end