Skip to content
This repository was archived by the owner on Oct 24, 2024. It is now read-only.

Commit dc285af

Browse files
committed
Option to reset tries in ping()
The `ping()` method can be useful for setting up recurring jobs if we deliberately avoid acking the job. For example: 1. Submit job 2. Pull job from queue 3. Process job 4. `ping()` 5. Go to Step 2 A real-world example of this might be notifying for recurring appointments, or setting up long-running, cross-process, periodic jobs. The main advantage this has over using `ack()` and `add()` is that it effectively requeues a job in a single, atomic commit. If we tried the above with `ack()` and `add()`: 1. Submit job 2. Pull job from queue 3. Process job 4. `ack()` 5. `add()` 6. Go to Step 2 In this version, the process could crash or quit between Steps 4 & 5, and our recurring job would be lost. We could also try inverting Steps 4 & 5, but then we get the opposite issue: if the process crashes or quits, then we might accidentally duplicate our recurring job. It also prevents us from setting up any unique indexes on our `payload`. Using `ping()` perfectly solves this problem: there's only ever one version of the job, and it's never dropped (because it's never acked). If the process crashes before we `ping()`, we'll retry it, as with any other normal job. The one issue with this approach is that `tries` will steadily increase, and - if you have `maxRetries` set up - the job will eventually be moved to the dead queue, which isn't what we want. This change adds an option to the `ping()` method: `resetTries`, which will reset `tries` to zero, so that the job is treated like a "new" job when it's pinged, and is only moved to the dead queue if it's genuinely retried.
1 parent 4a10a75 commit dc285af

File tree

3 files changed

+71
-0
lines changed

3 files changed

+71
-0
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,17 @@ queue.get((err, msg) => {
345345
})
346346
```
347347

348+
You can also reset the job tries, effectively creating an atomic ack + add for the
349+
same job using `resetTries`:
350+
351+
```js
352+
queue.get((err, msg) => {
353+
queue.ping(msg.ack, { resetTries: true }, (err, id) => {
354+
// This message now has 0 tries
355+
})
356+
})
357+
```
358+
348359
### .total() ###
349360

350361
Returns the total number of messages that has ever been in the queue, including

mongodb-queue.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ Queue.prototype.ping = function(ack, opts, callback) {
175175
visible : nowPlusSecs(visibility)
176176
}
177177
}
178+
179+
if (opts.resetTries) {
180+
update.$set.tries = 0
181+
}
182+
178183
self.col.findOneAndUpdate(query, update, { returnOriginal : false }, function(err, msg, blah) {
179184
if (err) return callback(err)
180185
if ( !msg.value ) {

test/ping.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,61 @@ test("ping: check visibility option overrides the queue visibility", function(t)
174174
)
175175
})
176176

177+
test("ping: reset tries", function(t) {
178+
var queue = mongoDbQueue(db, 'ping', { visibility: 3 })
179+
var msg
180+
181+
async.series(
182+
[
183+
function(next) {
184+
queue.add('Hello, World!', function(err, id) {
185+
t.ok(!err, 'There is no error when adding a message.')
186+
t.ok(id, 'There is an id returned when adding a message.')
187+
next()
188+
})
189+
},
190+
function(next) {
191+
queue.get(function(err, thisMsg) {
192+
msg = thisMsg
193+
// message should reset in three seconds
194+
t.ok(msg.id, 'Got a msg.id (sanity check)')
195+
setTimeout(next, 2 * 1000)
196+
})
197+
},
198+
function(next) {
199+
queue.ping(msg.ack, { resetTries: true }, function(err, id) {
200+
t.ok(!err, 'No error when pinging a message')
201+
t.ok(id, 'Received an id when acking this message')
202+
// wait until the msg has returned to the queue
203+
setTimeout(next, 6 * 1000)
204+
})
205+
},
206+
function(next) {
207+
queue.get(function(err, msg) {
208+
t.equal(msg.tries, 1, 'Tries were reset')
209+
queue.ack(msg.ack, function(err) {
210+
t.ok(!err, 'No error when acking the message')
211+
next()
212+
})
213+
})
214+
},
215+
function(next) {
216+
queue.get(function(err, msg) {
217+
// no more messages
218+
t.ok(!err, 'No error when getting no messages')
219+
t.ok(!msg, 'No msg received')
220+
next()
221+
})
222+
}
223+
],
224+
function(err) {
225+
if (err) t.fail(err)
226+
t.pass('Finished test ok')
227+
t.end()
228+
}
229+
)
230+
})
231+
177232
test('client.close()', function(t) {
178233
t.pass('client.close()')
179234
client.close()

0 commit comments

Comments
 (0)