-
-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
24 changed files
with
688 additions
and
443 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
## Change Streams | ||
|
||
[Change streams](https://www.mongodb.com/developer/quickstart/nodejs-change-streams-triggers/) let you listen for updates to documents in a given model's collection, or even documents in an entire database. | ||
Unlike [middleware](/docs/middleware.html), change streams are a MongoDB server construct, which means they pick up changes from anywhere. | ||
Even if you update a document from a MongoDB GUI, your Mongoose change stream will be notified. | ||
|
||
The `watch()` function creates a change stream. | ||
Change streams emit a `'data'` event when a document is updated. | ||
|
||
```javascript | ||
const Person = mongoose.model('Person', new mongoose.Schema({ name: String })); | ||
|
||
// Create a change stream. The 'change' event gets emitted when there's a | ||
// change in the database. Print what the change stream emits. | ||
Person.watch(). | ||
on('change', data => console.log(data)); | ||
|
||
// Insert a doc, will trigger the change stream handler above | ||
await Person.create({ name: 'Axl Rose' }); | ||
``` | ||
|
||
The above script will print output that looks like: | ||
|
||
``` | ||
{ | ||
_id: { | ||
_data: '8262408DAC000000012B022C0100296E5A10042890851837DB4792BE6B235E8B85489F46645F6964006462408DAC6F5C42FF5EE087A20004' | ||
}, | ||
operationType: 'insert', | ||
clusterTime: new Timestamp({ t: 1648397740, i: 1 }), | ||
fullDocument: { | ||
_id: new ObjectId("62408dac6f5c42ff5ee087a2"), | ||
name: 'Axl Rose', | ||
__v: 0 | ||
}, | ||
ns: { db: 'test', coll: 'people' }, | ||
documentKey: { _id: new ObjectId("62408dac6f5c42ff5ee087a2") } | ||
} | ||
``` | ||
|
||
Note that you **must** be connected to a MongoDB replica set or sharded cluster to use change streams. | ||
If you try to call `watch()` when connected to a standalone MongoDB server, you'll get the below error. | ||
|
||
``` | ||
MongoServerError: The $changeStream stage is only supported on replica sets | ||
``` | ||
|
||
If you're using `watch()` in production, we recommend using [MongoDB Atlas](https://www.mongodb.com/atlas/database). | ||
For local development, we recommend [mongodb-memory-server](https://www.npmjs.com/package/mongodb-memory-server) or [run-rs](https://www.npmjs.com/package/run-rs) to start a replica set locally. | ||
|
||
### Iterating using `next()` | ||
|
||
If you want to iterate through a change stream in a [AWS Lambda function](/docs/lambda.html), do **not** use event emitters to listen to the change stream. | ||
You need to make sure you close your change stream when your Lambda function is done executing, because your change stream may end up in an inconsistent state if Lambda stops your container while the change stream is pulling data from MongoDB. | ||
|
||
Change streams also have a `next()` function that lets you explicitly wait for the next change to come in. | ||
Use `resumeAfter` to track where the last change stream left off, and add a timeout to make sure your handler doesn't wait forever if no changes come in. | ||
|
||
```javascript | ||
let resumeAfter = undefined; | ||
|
||
exports.handler = async (event, context) => { | ||
// add this so that we can re-use any static/global variables between function calls if Lambda | ||
// happens to re-use existing containers for the invocation. | ||
context.callbackWaitsForEmptyEventLoop = false; | ||
|
||
await connectToDatabase(); | ||
|
||
// Use MongoDB Node driver's `watch()` function, because Mongoose change streams | ||
// don't support `next()` yet. See https://github.com/Automattic/mongoose/issues/11527 | ||
const changeStream = await Country.watch([], { resumeAfter }); | ||
|
||
// Change stream `next()` will wait forever if there are no changes. So make sure to | ||
// stop listening to the change stream after a fixed period of time. | ||
let timeoutPromise = new Promise(resolve => setTimeout(() => resolve(false), 1000)); | ||
let doc = null; | ||
while (doc = await Promise.race([changeStream.next(), timeoutPromise])) { | ||
console.log('Got', doc); | ||
} | ||
|
||
// `resumeToken` tells you where the change stream left off, so next function instance | ||
// can pick up any changes that happened in the meantime. | ||
resumeAfter = changeStream.resumeToken; | ||
await changeStream.close(); | ||
}; | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.