Skip to content

DOCSP-49981 Monitor data with change streams #535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/redirects
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ raw: ${prefix}/master -> ${base}/upcoming/

# CC Redirects
[v1.12-master]: ${prefix}/${version}/usage-examples/struct-tagging/ -> ${base}/${version}/data-formats/struct-tagging/
[v1.12-master]: ${prefix}/${version}/fundamentals/context/ -> ${base}/${version}/context/
[v1.12-master]: ${prefix}/${version}/fundamentals/context/ -> ${base}/${version}/context
[v1.12-master]: ${prefix}/${version}/usage-examples/changestream -> ${base}/${version}/monitoring-and-logging/change-streams/
[v1.12-master]: ${prefix}/${version}/fundamentals/crud/read-operations/changestream/ -> ${base}/${version}/monitoring-and-logging/change-streams/
2 changes: 1 addition & 1 deletion source/includes/usage-examples/code-snippets/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func main() {

var uri string
if uri = os.Getenv("MONGODB_URI"); uri == "" {
log.Fatal("You must set your 'MONGODB_URI' environment variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable")
log.Fatal("You must set your 'MONGODB_URI' environment variable.")
}

client, err := mongo.Connect(options.Client().ApplyURI(uri))
Expand Down
10 changes: 10 additions & 0 deletions source/includes/usage-examples/example-intro.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
.. note:: Example Setup

This example connects to an instance of MongoDB by using a
connection URI. To learn more about connecting to your MongoDB
instance, see the :ref:`golang-mongoclient` guide. This example
also uses the ``restaurants`` collection in the ``sample_restaurants`` database
included in the :atlas:`Atlas sample datasets </sample-data>`. You
can load them into your database on the free tier of MongoDB Atlas
by following the :atlas:`Get Started with Atlas Guide
</getting-started/#atlas-getting-started>`.
292 changes: 290 additions & 2 deletions source/monitoring-and-logging/change-streams.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,298 @@
.. _golang-watch-changes:
.. _golang-monitor-changes:
.. _golang-watch:
.. _golang-usageex-monitor-changes:

================================
Monitor Data with Change Streams
================================

.. TODO
.. facet::
:name: genre
:values: reference

.. meta::
:keywords: code example, delta
:description: Learn how to monitor document changes in MongoDB by using change streams, including opening streams and modifying output with pipelines and options.

.. contents:: On this page
:local:
:backlinks: none
:depth: 2
:class: singlecol

Overview
--------

In this guide, you can learn how to use a **change stream** to monitor real-time
changes to your database. A change stream is a MongoDB Server feature that
allows your application to subscribe to data changes on a collection, database,
or deployment.

A change stream outputs new change events, providing access to real-time data
changes. You can open a change stream on a collection, database, or client
object.

Sample Data
~~~~~~~~~~~

The examples in this guide use the following ``Course`` struct as a model for
documents in the ``courses`` collection:

.. literalinclude:: /includes/fundamentals/code-snippets/CRUD/changeStream.go
:start-after: begin struct
:end-before: end struct
:language: go
:dedent:

To run the examples in this guide, load these documents into the ``courses``
collection in the ``db`` database by using the following snippet:

.. literalinclude:: /includes/fundamentals/code-snippets/CRUD/changeStream.go
:language: go
:dedent:
:start-after: begin insertDocs
:end-before: end insertDocs

.. include:: /includes/fundamentals/automatic-db-coll-creation.rst

Each document contains a description of a university course that includes the
course title and maximum enrollment, corresponding to the ``title`` and
``enrollment`` fields in each document.

.. note::

Each example output shows truncated ``_data``, ``clusterTime``, and
``ObjectID`` values because the driver generates them uniquely.

Open a Change Stream
--------------------

You can watch for changes in MongoDB by using the ``Watch()`` method on the
following objects:

- **Collection**: Monitor changes to a specific collection
- **Database**: Monitor changes to all collections in a database
- **MongoClient**: Monitor changes across all databases

For each object, the ``Watch()`` method opens a change stream to emit change
event documents when they occur.

The ``Watch()`` method requires a context parameter and a pipeline parameter. To
return all changes, pass in an empty ``Pipeline`` object.

The ``Watch()`` method optionally takes an aggregation pipeline which consists
of an array of aggregation stages as the first parameter. The aggregation stages
filter and transform the change events.

Example
~~~~~~~

The following example opens a change stream on the ``courses`` collection and
prints the change stream events as they occur:

.. literalinclude:: /includes/fundamentals/code-snippets/CRUD/changeStream.go
:language: go
:dedent:
:start-after: begin open stream
:end-before: end open stream

If you modify the ``courses`` collection in a separate program or shell, this
code prints your changes as they occur. Inserting a document with a ``title``
value of ``"Advanced Screenwriting"`` and an ``enrollment`` value of ``20``
results in the following change event:

.. code-block:: none
:copyable: false

map[_id:map[_data:...] clusterTime: {...}
documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...")
enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db]
operationType:insert]

To view a fully runnable example, see :ref:`Open a Change Stream Example:
Full File <golang-change-streams-full-file>` section in this guide.

Filter Change Events
~~~~~~~~~~~~~~~~~~~~

Use the pipeline parameter to modify the change stream output. This parameter
allows you to only watch for certain change events. Format the pipeline
parameter as an array of documents, with each document representing an
aggregation stage.

You can use the following pipeline stages in this parameter:

- ``$addFields``
- ``$match``
- ``$project``
- ``$replaceRoot``
- ``$replaceWith``
- ``$redact``
- ``$set``
- ``$unset``

The following example opens a change stream on the ``db`` database but only
watches for new delete operations:

.. literalinclude:: /includes/fundamentals/code-snippets/CRUD/changeStream.go
:language: go
:dedent:
:start-after: begin delete events
:end-before: end delete events

.. note::

The ``Watch()`` method was called on the ``db`` database, so the code outputs
new delete operations on any collection within this database.

.. _golang-usageex-monitor-changes:
.. _golang-change-streams-full-file:

Open a Change Stream Example: Full File
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. include:: /includes/usage-examples/example-intro.rst

The following example opens a change stream on the ``restaurants`` collection
and prints inserted documents:

.. literalinclude:: /includes/usage-examples/code-snippets/watch.go
:start-after: begin watch
:end-before: end watch
:emphasize-lines: 7
:language: go
:dedent:

View a `fully runnable example. <{+example+}/watch.go>`__

Expected Result
^^^^^^^^^^^^^^^

.. TODO - when porting over all usage examples, ensure this link is correct.

After you run the full example, run the :ref:`Insert a Document full file
example <golang-insert-one>` in a different shell. When you run the insert
operation, you see output similar to the following:

.. code-block:: json
:copyable: false

// results truncated {
"_id": ..., "name": "8282", "cuisine": "Korean"
}

.. important::

When you finish working with this usage example, make sure to shut it down by
closing your terminal.

Configure Change Stream Options
-------------------------------

Use the ``options`` parameter to modify the behavior of the ``Watch()`` method.

You can specify the following options for the ``Watch()`` method:

- ``ResumeAfter``
- ``StartAfter``
- ``FullDocument``
- ``FullDocumentBeforeChange``
- ``BatchSize``
- ``MaxAwaitTime``
- ``Collation``
- ``StartAtOperationTime``
- ``Comment``
- ``ShowExpandedEvents``
- ``Custom``
- ``CustomPipeline``

For more information on these options, see the :manual:`db.collection.watch() </reference/method/db.collection.watch/>`
entry in the Server manual.

Pre- and Post-Images
~~~~~~~~~~~~~~~~~~~~

When you perform any CRUD operation on a collection, by default, the
corresponding change event document contains only the delta of the fields
modified by the operation. You can see the full document before and after a
change, in addition to the delta, by specifying settings in the ``options``
parameter of the ``Watch()`` method.

If you want to see a document's **post-image**, the full version of the document
after a change, set the ``FullDocument`` field of the ``options`` parameter to
one of the following values:

- ``UpdateLookup``: The change event document includes a copy of the entire
changed document.
- ``WhenAvailable``: The change event document includes a post-image of the
modified document for change events if the post-image is available.
- ``Required``: The output is the same as for ``WhenAvailable``, but the driver
raises a server-side error if the post-image is not available.

If you want to see a document's **pre-image**, the full version of the document
before a change, set the ``FullDocumentBeforeChange`` field of the ``options``
parameter to one of the following values:

- ``WhenAvailable``: The change event document includes a pre-image of the
modified document for change events if the pre-image is available.
- ``Required``: The output is the same as for ``WhenAvailable``, but the driver
raises a server-side error if the pre-image is not available.

.. important::

To access document pre- and post-images, you must enable
``changeStreamPreAndPostImages`` for the collection. See the :manual:`Change
Streams
</reference/command/collMod/#change-streams-with-document-pre--and-post-images>`
section of the collMod Database Command guide in the MongoDB Server manual
for instructions and more information.

.. note::

There is no pre-image for an inserted document and no post-image for a deleted document.

Example
~~~~~~~~

The following example calls the ``Watch()`` method on the ``courses``
collection. It specifies a value for the ``FullDocument`` field of the
``options`` parameter to output a copy of the entire modified document, instead
of only the changed fields:

.. literalinclude:: /includes/fundamentals/code-snippets/CRUD/changeStream.go
:language: go
:dedent:
:start-after: begin full document
:end-before: end full document

Updating the ``enrollment`` value of the document with the ``title`` of ``"World
Fiction"`` from ``35`` to ``30`` results in the following change event:

.. code-block:: none
:copyable: false

{"_id": {"_data": "..."},"operationType": "update","clusterTime":
{"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id":
{"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}},
"ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}},
"updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}

Without specifying the ``FullDocument`` option, the same update operation no
longer outputs the ``"fullDocument"`` value in the change event document.

Additional Information
----------------------

For more information on change streams, see :manual:`Change Streams
</changeStreams/>` in the Server manual.

API Documentation
~~~~~~~~~~~~~~~~~

To learn more about the ``Watch()`` method, see the following API
documentation:

- `Watch() for collections <{+api+}/mongo#Collection.Watch>`__
- `Watch() for databases <{+api+}/mongo#Database.Watch>`__
- `Watch() for clients <{+api+}/mongo#Client.Watch>`__
Loading