Skip to content

Commit e0ae0f1

Browse files
committed
💥 Promisify and add mongodb@5 support
[`mongodb@5`][1] drops support for callbacks, which breaks this library, which is all written with callbacks. This is a **BREAKING** change which drops callback support from this library as well, and fully embraces promises through `async`/`await` syntax in both the library code and test code. This allows us to support both `mongodb@4` and `mongodb@5`. [1]: https://github.com/mongodb/node-mongodb-native/releases/tag/v5.0.0
1 parent 9c7e02f commit e0ae0f1

File tree

14 files changed

+513
-1297
lines changed

14 files changed

+513
-1297
lines changed

‎mongodb-queue.js‎

Lines changed: 61 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -51,35 +51,21 @@ function Queue(db, name, opts) {
5151
}
5252
}
5353

54-
Queue.prototype.createIndexes = function(callback) {
55-
var self = this
56-
57-
self.col.createIndex({ deleted : 1, visible : 1 }, function(err, indexname) {
58-
if (err) return callback(err)
59-
self.col.createIndex({ ack : 1 }, { unique : true, sparse : true }, function(err) {
60-
if (err) return callback(err)
61-
self.col.createIndex({ deleted : 1 }, { sparse : true }, function(err) {
62-
if (err) return callback(err)
63-
callback(null, indexname)
64-
})
65-
})
66-
})
54+
Queue.prototype.createIndexes = async function() {
55+
var indexname = await this.col.createIndex({ deleted : 1, visible : 1 })
56+
await this.col.createIndex({ack: 1}, {unique: true, sparse: true})
57+
await this.col.createIndex({deleted: 1}, {sparse: true})
58+
return indexname
6759
}
6860

69-
Queue.prototype.add = function(payload, opts, callback) {
70-
var self = this
71-
if ( !callback ) {
72-
callback = opts
73-
opts = {}
74-
}
75-
var delay = opts.delay || self.delay
61+
Queue.prototype.add = async function(payload, opts = {}) {
62+
var delay = opts.delay || this.delay
7663
var visible = delay ? nowPlusSecs(delay) : now()
7764

7865
var msgs = []
7966
if (payload instanceof Array) {
8067
if (payload.length === 0) {
81-
var errMsg = 'Queue.add(): Array payload length must be greater than 0'
82-
return callback(new Error(errMsg))
68+
throw new Error('Queue.add(): Array payload length must be greater than 0')
8369
}
8470
payload.forEach(function(payload) {
8571
msgs.push({
@@ -94,21 +80,13 @@ Queue.prototype.add = function(payload, opts, callback) {
9480
})
9581
}
9682

97-
self.col.insertMany(msgs, function(err, results) {
98-
if (err) return callback(err)
99-
if (payload instanceof Array) return callback(null, '' + results.insertedIds)
100-
callback(null, '' + results.insertedIds[0])
101-
})
83+
var results = await this.col.insertMany(msgs)
84+
if (payload instanceof Array) return '' + results.insertedIds
85+
return '' + results.insertedIds[0]
10286
}
10387

104-
Queue.prototype.get = function(opts, callback) {
105-
var self = this
106-
if ( !callback ) {
107-
callback = opts
108-
opts = {}
109-
}
110-
111-
var visibility = opts.visibility || self.visibility
88+
Queue.prototype.get = async function(opts = {}) {
89+
var visibility = opts.visibility || this.visibility
11290
var query = {
11391
deleted : null,
11492
visible : { $lte : now() },
@@ -128,50 +106,37 @@ Queue.prototype.get = function(opts, callback) {
128106
returnDocument: 'after'
129107
}
130108

131-
self.col.findOneAndUpdate(query, update, options, function(err, result) {
132-
if (err) return callback(err)
133-
var msg = result.value
134-
if (!msg) return callback()
135-
136-
// convert to an external representation
137-
msg = {
138-
// convert '_id' to an 'id' string
139-
id : '' + msg._id,
140-
ack : msg.ack,
141-
payload : msg.payload,
142-
tries : msg.tries,
143-
}
144-
// if we have a deadQueue, then check the tries, else don't
145-
if ( self.deadQueue ) {
146-
// check the tries
147-
if ( msg.tries > self.maxRetries ) {
148-
// So:
149-
// 1) add this message to the deadQueue
150-
// 2) ack this message from the regular queue
151-
// 3) call ourself to return a new message (if exists)
152-
self.deadQueue.add(msg, function(err) {
153-
if (err) return callback(err)
154-
self.ack(msg.ack, function(err) {
155-
if (err) return callback(err)
156-
self.get(callback)
157-
})
158-
})
159-
return
160-
}
109+
var result = await this.col.findOneAndUpdate(query, update, options)
110+
var msg = result.value
111+
if (!msg) return
112+
113+
// convert to an external representation
114+
msg = {
115+
// convert '_id' to an 'id' string
116+
id : '' + msg._id,
117+
ack : msg.ack,
118+
payload : msg.payload,
119+
tries : msg.tries,
120+
}
121+
// if we have a deadQueue, then check the tries, else don't
122+
if ( this.deadQueue ) {
123+
// check the tries
124+
if ( msg.tries > this.maxRetries ) {
125+
// So:
126+
// 1) add this message to the deadQueue
127+
// 2) ack this message from the regular queue
128+
// 3) call ourthis to return a new message (if exists)
129+
await this.deadQueue.add(msg)
130+
await this.ack(msg.ack)
131+
return this.get()
161132
}
133+
}
162134

163-
callback(null, msg)
164-
})
135+
return msg
165136
}
166137

167-
Queue.prototype.ping = function(ack, opts, callback) {
168-
var self = this
169-
if ( !callback ) {
170-
callback = opts
171-
opts = {}
172-
}
173-
174-
var visibility = opts.visibility || self.visibility
138+
Queue.prototype.ping = async function(ack, opts = {}) {
139+
var visibility = opts.visibility || this.visibility
175140
var query = {
176141
ack : ack,
177142
visible : { $gt : now() },
@@ -190,18 +155,14 @@ Queue.prototype.ping = function(ack, opts, callback) {
190155
update.$set.tries = 0
191156
}
192157

193-
self.col.findOneAndUpdate(query, update, options, function(err, msg, blah) {
194-
if (err) return callback(err)
195-
if ( !msg.value ) {
196-
return callback(new Error("Queue.ping(): Unidentified ack : " + ack))
197-
}
198-
callback(null, '' + msg.value._id)
199-
})
158+
var msg = await this.col.findOneAndUpdate(query, update, options)
159+
if ( !msg.value ) {
160+
throw new Error("Queue.ping(): Unidentified ack : " + ack)
161+
}
162+
return '' + msg.value._id
200163
}
201164

202-
Queue.prototype.ack = function(ack, callback) {
203-
var self = this
204-
165+
Queue.prototype.ack = async function(ack) {
205166
var query = {
206167
ack : ack,
207168
visible : { $gt : now() },
@@ -215,72 +176,48 @@ Queue.prototype.ack = function(ack, callback) {
215176
var options = {
216177
returnDocument: 'after'
217178
}
218-
self.col.findOneAndUpdate(query, update, options, function(err, msg) {
219-
if (err) return callback(err)
220-
if ( !msg.value ) {
221-
return callback(new Error("Queue.ack(): Unidentified ack : " + ack))
222-
}
223-
callback(null, '' + msg.value._id)
224-
})
179+
var msg = await this.col.findOneAndUpdate(query, update, options)
180+
if ( !msg.value ) {
181+
throw new Error("Queue.ack(): Unidentified ack : " + ack)
182+
}
183+
return '' + msg.value._id
225184
}
226185

227-
Queue.prototype.clean = function(callback) {
228-
var self = this
229-
186+
Queue.prototype.clean = function() {
230187
var query = {
231188
deleted : { $exists : true },
232189
}
233190

234-
self.col.deleteMany(query, callback)
191+
return this.col.deleteMany(query)
235192
}
236193

237-
Queue.prototype.total = function(callback) {
238-
var self = this
239-
240-
self.col.countDocuments(function(err, count) {
241-
if (err) return callback(err)
242-
callback(null, count)
243-
})
194+
Queue.prototype.total = function() {
195+
return this.col.countDocuments()
244196
}
245197

246-
Queue.prototype.size = function(callback) {
247-
var self = this
248-
198+
Queue.prototype.size = function() {
249199
var query = {
250200
deleted : null,
251201
visible : { $lte : now() },
252202
}
253203

254-
self.col.countDocuments(query, function(err, count) {
255-
if (err) return callback(err)
256-
callback(null, count)
257-
})
204+
return this.col.countDocuments(query)
258205
}
259206

260-
Queue.prototype.inFlight = function(callback) {
261-
var self = this
262-
207+
Queue.prototype.inFlight = function() {
263208
var query = {
264209
ack : { $exists : true },
265210
visible : { $gt : now() },
266211
deleted : null,
267212
}
268213

269-
self.col.countDocuments(query, function(err, count) {
270-
if (err) return callback(err)
271-
callback(null, count)
272-
})
214+
return this.col.countDocuments(query)
273215
}
274216

275-
Queue.prototype.done = function(callback) {
276-
var self = this
277-
217+
Queue.prototype.done = function() {
278218
var query = {
279219
deleted : { $exists : true },
280220
}
281221

282-
self.col.countDocuments(query, function(err, count) {
283-
if (err) return callback(err)
284-
callback(null, count)
285-
})
222+
return this.col.countDocuments(query)
286223
}

‎package.json‎

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
{
22
"name": "@reedsy/mongodb-queue",
3-
"version": "4.0.0-reedsy-2.0.1",
3+
"version": "4.0.0-reedsy-3.0.0",
44
"description": "Message queues which uses MongoDB.",
55
"main": "mongodb-queue.js",
66
"scripts": {
77
"test": "set -e; for FILE in test/*.js; do echo --- $FILE ---; node $FILE; done"
88
},
99
"dependencies": {},
1010
"devDependencies": {
11-
"async": "^2.6.2",
12-
"mongodb": "^4.0.0",
11+
"mongodb": "^5.0.0",
1312
"tape": "^4.10.1"
1413
},
1514
"peerDependencies": {
16-
"mongodb": "^4.0.0"
15+
"mongodb": "^4.0.0 || ^5.0.0"
1716
},
1817
"homepage": "https://github.com/chilts/mongodb-queue",
1918
"repository": {

‎test/_timeout.js‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
function timeout(millis) {
2+
return new Promise((resolve) => setTimeout(resolve, millis))
3+
}
4+
5+
module.exports = {
6+
timeout,
7+
}

0 commit comments

Comments
 (0)