Queue PubSub
Thanks to the change feed technology built into RethinkDB, rethinkdb-job-queue can be a distributed system. Depending on how you configure the Queue object, it acts as a Publisher node or a Subscriber node, and these nodes can run on the same host or be spread across multiple hosts.
The configuration for a Publisher or Subscriber Queue object is very similar. The only difference is that the Subscriber calls the Queue.process method, passing in your handler function.
This document describes the configuration for both a Publisher and Subscriber Queue object.
Note: It is important to be aware that there is nothing wrong with using the same Queue object for both Publisher and Subscriber. This document is purely to highlight the configuration options for each.
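As a minimal sketch of that combined case, the following wires one Queue object up as both Publisher and Subscriber. It only uses the calls shown elsewhere on this page (the constructor, Queue.process, Queue.createJob, and Queue.addJob). The function names createPubSubQueue and register are hypothetical, and passing the Queue class in as a parameter is an assumption of this example so the wiring can be tried without a live RethinkDB server.

```javascript
// Sketch only: QueueClass is expected to be require('rethinkdb-job-queue').
// It is passed in as a parameter (an assumption of this example, not a
// library requirement) so the wiring can be tried without a RethinkDB server.
function createPubSubQueue (QueueClass, cxnOptions) {
  const q = new QueueClass(cxnOptions, { name: 'Registration', changeFeed: true })

  // Subscriber side: register the process handler.
  q.process((job, next) => {
    console.dir(job.registrationData)
    return next(null, 'Registration Completed')
  })

  return q
}

// Publisher side: the same Queue object also adds jobs.
// Returns whatever q.addJob returns (a Promise in the examples on this page).
function register (q, registrationData) {
  const job = q.createJob({ registrationData })
  return q.addJob(job)
}
```

In an application you would presumably call createPubSubQueue(require('rethinkdb-job-queue'), cxnOptions) once, then call register(q, 'Important Job Info').catch(err => console.error(err)) wherever jobs need to be added.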
To create a Publisher you need to create the Queue object using the Queue Constructor. Two things define the Queue object as a Publisher: it adds jobs to the queue, and it does not have a process handler.
This example creates a Queue object named pub used for adding jobs to the queue. It will not process jobs.
const Queue = require('rethinkdb-job-queue')
const cxnOptions = {
  host: 'db01.domain.com',
  port: 12345,
  db: 'JobQueue'
}
const qOptions = {
  name: 'Registration',
  changeFeed: false
}
const pub = new Queue(cxnOptions, qOptions)
// Elsewhere in your code
const job = pub.createJob({ registrationData: 'Important Job Info' })
  .setTimeout(10000)
pub.addJob(job).catch(err => console.error(err))
Here are some points of interest for the Publisher example above.
- The RethinkDB instance is located on a server called db01.domain.com at port 12345.
- The database name is JobQueue.
- The queue name is Registration.
- The change feed is disabled for the Publisher. It can be enabled if needed.
- Jobs have a registrationData property.
- Jobs are being added to the queue.
As with a Publisher, you create a Subscriber by creating the Queue object using the Queue Constructor. Two things define the Queue object as a Subscriber: it does not add jobs to the queue, and it has a process handler.
This example creates a Queue object named sub used for processing jobs in the queue. It will not add jobs to the queue.
const Queue = require('rethinkdb-job-queue')
const cxnOptions = {
  host: 'db02.domain.com',
  port: 12345,
  db: 'JobQueue'
}
const qOptions = {
  name: 'Registration',
  changeFeed: true
}
const sub = new Queue(cxnOptions, qOptions)
sub.process((job, next) => {
  // Process your jobs here
  // Use the job.registrationData for job details
  console.dir(job.registrationData)
  // Call next() when complete.
  return next(null, 'Registration Completed')
})
Here are some points of interest for the Subscriber example above.
- The RethinkDB instance is located on a server called db02.domain.com at port 12345. For this example RethinkDB is configured with a cluster. The Subscriber does not need to point to a different host.
- The database name is JobQueue. This must be the same as the Publisher database name.
- The queue name is Registration. This must be the same as the Publisher queue name.
- The change feed is enabled for the Subscriber. If the change feed is not enabled, the Subscriber will not process jobs added to the queue by other Queue objects unless a job is added locally.
- Jobs use the registrationData property to carry out the work of the job.
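Because the database name and queue name must match on both sides, one way to keep them from drifting apart is to derive both option objects from a single shared definition. The sketch below is illustrative only: queueConfig and SHARED are hypothetical names, not part of rethinkdb-job-queue, and the hosts and values are simply the ones used in the examples above.

```javascript
// Hypothetical helper (not part of rethinkdb-job-queue): builds the two option
// objects for a given role from one shared definition, so the Publisher and
// Subscriber always agree on the database name and queue name.
const SHARED = Object.freeze({ db: 'JobQueue', name: 'Registration', port: 12345 })

function queueConfig (role, host) {
  if (role !== 'publisher' && role !== 'subscriber') {
    throw new Error(`Unknown role: ${role}`)
  }
  return {
    cxnOptions: { host, port: SHARED.port, db: SHARED.db },
    // Mirror the examples above: change feed off for the Publisher,
    // on for the Subscriber.
    qOptions: { name: SHARED.name, changeFeed: role === 'subscriber' }
  }
}

const pubConfig = queueConfig('publisher', 'db01.domain.com')
const subConfig = queueConfig('subscriber', 'db02.domain.com')
```

Each result can then be passed to the constructor, for example new Queue(subConfig.cxnOptions, subConfig.qOptions). A Publisher that also needs the change feed would have to override changeFeed itself, since this helper hard-codes the choice made in the examples.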
In a distributed queue processing environment with rethinkdb-job-queue, global state changes are achieved by using a State Document. This enables globally pausing, resuming, and restarting of Queue objects. See the State Document wiki page for more detail.