Skip to content

Commit

Permalink
MGO-141 implement watch function
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Banfield committed Sep 14, 2017
1 parent 7940d20 commit 0d8351c
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions changestreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,37 @@ type ChangeStreamOptions struct {
Collation *Collation
}

// Watch constructs a new ChangeStream capable of receiving continuing data
// from the database.
func (coll *Collection) Watch(pipeline interface{},
options ChangeStreamOptions) (*ChangeStream, error) {

if pipeline == nil {
pipeline = []bson.M{}
}

pipe := constructChangeStreamPipeline(pipeline, options)

pIter := coll.Pipe(&pipe).Iter()

// check that there was no issue creating the iterator.
// this will fail immediately with an error from the server if running against
// a standalone.
if err := pIter.Err(); err != nil {
return nil, err
}

pIter.isChangeStream = true

return &ChangeStream{
iter: pIter,
collection: coll,
resumeToken: nil,
options: options,
pipeline: pipeline,
}, nil
}

// Next retrieves the next document from the change stream, blocking if necessary.
// Next returns true if a document was successfully unmarshalled into result,
// and false if an error occured. When Next returns false, the Err method should
Expand Down

0 comments on commit 0d8351c

Please sign in to comment.