-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathmain.coffee
90 lines (71 loc) · 2.58 KB
/
main.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
{EventEmitter} = require 'events'
formats = require './formats'
getOplogStream = require './getOplogStream'
{walk, convertObjectID} = require './util'
# Fill in every setting MongoWatch reads, mutating and returning the options
# object that was passed in (a fresh object is created when none is given).
#
# options - caller supplied settings object, may be undefined.
#
# Returns the same object with all defaults applied.
applyDefaults = (options) ->
  options or= {}

  # Settings for which ANY falsy value (undefined, null, 0, '', false) is
  # replaced. The table is rebuilt on each call so every options object
  # receives its own `dbOpts` object.
  fallbacks = [
    ['port', 27017]
    ['host', 'localhost']
    ['authdb', 'admin']
    ['replicaSet', null]
    ['dbOpts', {w: 1}]
    ['format', 'raw']
    ['useMasterOplog', false]
  ]
  for [key, value] in fallbacks
    options[key] = value unless options[key]

  # Only a missing value (null/undefined) turns conversion on, so an explicit
  # `false` from the caller is kept.
  options.convertObjectIDs = true unless options.convertObjectIDs?

  # Default error handler prints the stack when there is one.
  unless options.onError
    options.onError = (error) ->
      console.log 'Error - MongoWatch:', (error?.stack or error)

  # Debug output is discarded unless the caller provides a handler.
  options.onDebug = (->) unless options.onDebug

  # username and password receive no default here.
  options
# Listens to an oplog stream obtained through `getOplogStream` and re-emits
# the entries, per watched collection, on an internal EventEmitter.
#
# Instance state:
#   options  - settings after `applyDefaults`.
#   channel  - EventEmitter carrying 'error', 'connected' and one event per
#              watched collection name (or 'all').
#   status   - 'connecting' until a stream is available, then 'connected'.
#   watching - map of collection name -> 'data' listener attached to the stream.
class MongoWatch
  status: 'connecting'

  # options - see `applyDefaults` for the recognised keys.
  constructor: (options) ->
    @options = applyDefaults options

    # The registry must live on the instance: a prototype-level container
    # would be shared by every MongoWatch object.
    @watching = {}

    @channel = new EventEmitter
    @channel.on 'error', @options.onError
    @channel.on 'connected', => @status = 'connected'

    @debug = @options.onDebug

    getOplogStream @options, (err, @stream, @oplogClient) =>
      @channel.emit 'error', err if err

      # Never announce 'connected' without a stream: `ready` callbacks would
      # run and `watch` would then fail on the missing stream.
      return unless @stream?

      @debug "Emiting 'connected'. Stream exists:", @stream?
      @channel.emit 'connected'

  # Run `done` once the oplog stream is available.
  #
  # done - zero-argument callback; invoked synchronously when already
  #        connected, otherwise on the next 'connected' event.
  ready: (done) ->
    isReady = @status is 'connected'
    @debug 'Ready:', isReady
    if isReady
      return done()
    else
      @channel.once 'connected', done

  # Subscribe `notify` to changes of one collection.
  #
  # collection - value compared against each entry's `ns`; defaults to 'all',
  #              which matches every entry.
  # notify     - callback receiving the formatted event; defaults to
  #              console.log.
  watch: (collection, notify) ->
    collection ||= 'all'
    notify ||= console.log

    @ready =>
      # One stream listener per collection, however many callbacks subscribe.
      unless @watching[collection]?
        watcher = (data) =>
          relevant = (collection is 'all') or (data.ns is collection)
          @debug 'Data changed:', {data: data, watching: collection, relevant: relevant}
          return unless relevant

          # Unknown format names fall back to the 'raw' formatter.
          formatter = formats[@options.format] or formats['raw']
          event = formatter data

          # convert ObjectIDs to strings
          if @options.convertObjectIDs is true
            event = walk event, convertObjectID

          # Events are published under the collection name itself.
          @debug 'Emitting event:', {channel: collection, event: event}
          @channel.emit collection, event

        @debug 'Adding emitter for:', {collection: collection}
        @stream.on 'data', watcher
        @watching[collection] = watcher

      @debug 'Adding listener on:', {collection: collection}
      @channel.on collection, notify

  # Remove every callback registered for `collection` and detach its stream
  # listener. Safe to call for a collection that is not being watched and
  # before the stream exists.
  #
  # collection - collection name; defaults to 'all'.
  stop: (collection) ->
    collection ||= 'all'
    @debug 'Removing listeners for:', collection
    @channel.removeAllListeners collection

    watcher = @watching[collection]
    # EventEmitter#removeListener throws when the listener is not a function,
    # so only detach a watcher that was actually registered.
    return unless watcher?
    @stream?.removeListener 'data', watcher
    delete @watching[collection]

  # Stop watching every collection currently registered.
  stopAll: ->
    @stop coll for own coll of @watching
# The module's export is the MongoWatch class itself.
module.exports = MongoWatch