diff --git a/Gruntfile.coffee b/Gruntfile.coffee index 7718ef9..af9a833 100644 --- a/Gruntfile.coffee +++ b/Gruntfile.coffee @@ -2,21 +2,12 @@ module.exports = (grunt) -> # Project configuration. grunt.initConfig pkg: grunt.file.readJSON('package.json') - regarde: + watch: module: files: ["_src/**/*.coffee"] - tasks: [ "coffee:changed" ] + tasks: [ "coffee:base" ] coffee: - changed: - expand: true - cwd: '_src' - src: [ '<% print( _.first( ((typeof grunt !== "undefined" && grunt !== null ? (_ref = grunt.regarde) != null ? _ref.changed : void 0 : void 0) || ["_src/nothing"]) ).slice( "_src/".length ) ) %>' ] - # template to cut off `_src/` and throw on error on non-regrade call - # CF: `_.first( grunt?.regarde?.changed or [ "_src/nothing" ] ).slice( "_src/".length ) - dest: '' - ext: '.js' - base: expand: true cwd: '_src', @@ -63,7 +54,8 @@ module.exports = (grunt) -> options: require: [ "should" ] reporter: "spec" - bail: false + bail: process.env.BAIL or false + grep: process.env.GREP timeout: 3000 slow: 3 @@ -72,45 +64,25 @@ module.exports = (grunt) -> options: env: #NSQERR: true - severity_nsq_logger: "error" - - - docker: - serverdocs: - expand: true - src: ["_src/**/*.coffee", "README.md"] - dest: "_docs/" - options: - onlyUpdated: false - colourScheme: "autumn" - ignoreHidden: false - sidebarState: true - exclude: false - lineNums: true - js: [] - css: [] - extras: [] - + severity_nsq_logger: process.env.SEVERITY or "error" # Load npm modules - grunt.loadNpmTasks "grunt-regarde" + grunt.loadNpmTasks "grunt-contrib-watch" grunt.loadNpmTasks "grunt-contrib-coffee" grunt.loadNpmTasks "grunt-contrib-clean" grunt.loadNpmTasks "grunt-mocha-cli" grunt.loadNpmTasks "grunt-include-replace" - grunt.loadNpmTasks "grunt-docker" grunt.loadNpmTasks "grunt-banner" - # just a hack until this issue has been fixed: https://github.com/yeoman/grunt-regarde/issues/3 - grunt.option('force', not grunt.option('force')) - # ALIAS TASKS - grunt.registerTask "watch", "regarde" grunt.registerTask "default", "build" - grunt.registerTask "docs", "docker" - grunt.registerTask "clear", [ "clean:base" ] + grunt.registerTask "clear", [ "clean:base", "clean:nsq" ] grunt.registerTask "test", [ "mochacli:main", "clean:nsq" ] + grunt.registerTask "w", "watch" + grunt.registerTask "b", "build" + grunt.registerTask "t", "test" + # build the project grunt.registerTask "build", [ "clear", "coffee:base", "includereplace", "usebanner:js" ] - grunt.registerTask "build-dev", [ "clear", "coffee:base", "docs", "test" ] + grunt.registerTask "build-dev", [ "clear", "coffee:base", "test" ] diff --git a/_docs/README.md.html b/_docs/README.md.html deleted file mode 100644 index 3a71e0e..0000000 --- a/_docs/README.md.html +++ /dev/null @@ -1,1153 +0,0 @@ - - -
-Nsq service to read messages from all topics listed within a list of nsqlookupd services.|
- - - -INFO: all examples are written in coffee-script
- - - npm install nsq-logger
-
var logger = new NsqLogger( config );
-
Example:
- - -var NsqLogger = require( "nsq-logger" );
-
-var config = {
- clientId: "myFooClient"
-};
-
-// create the logger
-var logger = new NsqLogger( config );
-
-// create a writer instance
-/*
-var NsqWriter = require( "nsq-logger/writer" );
-var writer = new NsqLogger( config );
-*/
-// or just grab the one used inside the logger
-var writer = logger.Writer;
-
-logger.on( "message", function( topic, data, done ){
- // process your topic
- // Example response: -> topic="topic23" data ="Do some Stuff!"
-
- // mark message as done
- done();
-});
-
-writer.connect();
-writer.publish( "topic23", "Do some Stuff!" );
-
Config
- - - -String|Null
required ) An identifier used to disambiguate this client.Boolean
default=true ) Configuration to (en/dis)abel the nsq recorderString|Null
default=null ) Internally prefix the nsq topics. This will be handled transparent, but with this it's possible to separate different environments from each other. E.g. you can run a "staging" and "live" environment on one nsq cluster.String
default="nsqlogger" ) The channel name for the logger to each topicString
default="_exceeded" ) A topic name, that will store exceeded messages.String[]|Function
default=null ) A list of topics that should be ignored or a function that will called to check the ignored topics manuallyNumber
default=60 ) Time in seconds to poll the nsqlookupd servers to sync the available topicsNumber
default=1 ) The maximum number of messages to process at once. This value is shared between nsqd connections. It's highly recommended that this value is greater than the number of nsqd connections.Number
default=30 ) The frequency in seconds at which the nsqd will send heartbeats to this Reader.String[]
default=[ "127.0.0.1:4160", "127.0.0.1:4162" ] ) A list of nsq lookup serversString[]
default=[ "127.0.0.1:4161", "127.0.0.1:4163" ] ) A list of nsq lookup serversNumber
default=10 ) The number of times to a message can be requeued before it will be handed to the DISCARD handler and then automatically finished. 0 means that there is no limit. If not DISCARD handler is specified and maxAttempts > 0, then the message will be finished automatically when the number attempts has been exhausted.Number|Null
default=null ) Message timeout in ms or null
for no timeoutNumber|Null
default=null ) Deliver a percentage of all messages received to this connection. 1 <= sampleRate <= 99Number|Null
default=5 ) The delay is in seconds. This is how long nsqd will hold on the message before attempting it again.String
default="127.0.0.1" ) Host of a nsqdNumber
default=4150 ) Port of a nsqdBoolean
default=false ) Use zlib Deflate compression.Number
default=6 ) Use zlib Deflate compression level..activate()
- Activate the module
- -Return
- -( Boolean ): true
if it is now activated. false
if it was already active
.deactivate()
- Deactivate the module
- -Return
- -( Boolean ): true
if it is now deactivated. false
if it was already inactive
.active()
- Test if the module is currently active
- -Return
- -( Boolean ): Is active?
- - -.destroy( cb )
- This will stop all readers and disconnect every connection from nsq
- -Arguments
- -Function
) Called after destroymessage
- The main event to catch and process messages from all topics.
- -Arguments
- -String
) The topic of this messageString|Object|Array
) The message content. It tries to JSON.parse the message if possible. Otherwise it will be just a string.String
) You have to call this function until the message was processed. This will remove the message from the queue. Otherwise it will be requeued. If you add a argument cb( new Error("Dooh!") )
it will interpreted as an error and this message be requeued immediately Example:
- - -logger.on( "message", function( topic, data, done ){
- // process your message.
- // E.g: writing the data to a db with the topic as tablename
- myDB.write( "INSERT INTO " + topic + " VALUES ( " + data + " )", done );
-});
-
ready
- Emitted once the list of topics was received and the readers are created and connected.
-This is just an internal helper. The Method list
will also wait for the first response. The events add
, remove
and change
are active after this first response.
-Example:
topics.on( "ready", function( err ){
- // handle the error
-});
-
logger.config
- Type: ( Config )
- -This is the internaly used configuration.
- -Attributes
- -See logger config
- - -logger.Writer
- Type: ( NsqWriter )
- -To write messages you can use the internal writer instance.
- -Details see Writer
- - -logger.Topics
- Type: ( NsqTopics )
- -The logger uses a module called nsq-topics
to sync the existing topics and generate the readers for each topic.
-You can grab the internal used insatnce with logger.Topics
Details see nsq-topics
logger.ready
- Type: ( Boolean )
- -The logger is ready to use
- -var NsqWriter = require( "nsq-logger/writer" )
-
Example:
- - -var NsqWriter = require( "nsq-logger/writer" );
-
-var config = {
- clientId: "myFooClient",
- host: "127.0.0.1",
- port: 4150
-};
-
-// create the writer
-var writer = new NsqWriter( config );
-
-writer.connect();
-writer.publish( "topic23", "Do some Stuff!" );
-
Config
- - - -String|Null
required ) An identifier used to disambiguate this client.String|Null
default=null ) Internally prefix the nsq topics. This will be handled transparent, but with this it's possible to separate different environments from each other. E.g. you can run a "staging" and "live" environment on one nsq cluster.Boolean
default=true ) Configuration to (en/dis)abel the nsq recorderString
default="127.0.0.1" ) Host of a nsqdNumber
default=4150 ) Port of a nsqdBoolean
default=false ) Use zlib Deflate compression.Number
default=6 ) Use zlib Deflate compression level..connect()
- You have to connect the writer before publishing data
- -Return
- -( Writer ): retuns itself for chaining
- - -.disconnect()
- disconnect the client
- -Return
- -( Writer ): retuns itself for chaining
- - -.publish()
- You have to connect the writer before publishing data
- -Arguments
- -String
) Topic nameString|Object|Array
) Data to publish. If it's not a string it will be JSON stringifiedFunction
) Called after a successful publishReturn
- -( Writer ): retuns itself for chaining
- -Example:
- - -writer
- .connect()
- .publish(
- "hello", // the topic
- JSON.strinigify( { to: [ "nsq-logger" ] } ) // the data to send
- );
-
.activate()
- Activate the module
- -Return
- -( Boolean ): true
if it is now activated. false
if it was already active
.deactivate()
- Deactivate the module
- -Return
- -( Boolean ): true
if it is now deactivated. false
if it was already inactive
.active()
- Test if the module is currently active
- -Return
- -( Boolean ): Is active?
- - -.destroy( cb )
- Disconnect and remove all event listeners
- -Arguments
- -Function
) Called after destroymessage
- The main event to catch and process messages from all topics.
- -Arguments
- -String
) The topic of this messageString|Object|Array
) The message content. A String or parsed JSON data.String
) You have to call this function until the message was processed. This will remove the message from the queue. Otherwise it will be requeued. If you add a argument cb( new Error("Dooh!") )
it will interpreted as an error and this message be requeued immediately Message
) The raw nsqjs message.Example:
- - -logger.on( "message", function( topic, data, done ){
- // process your message.
- // E.g: writing the data to a db with the topic as tablename
- myDB.write( "INSERT INTO " + topic + " VALUES ( " + data + " )", done );
-});
-
ready
- Emitted once the list of topics was received and the readers are created and connected.
-This is just an internal helper. The Method list
will also wait for the first response. The events add
, remove
and change
are active after this first response.
-Example:
topics.on( "ready", function( err ){
- // handle the error
-});
-
writer.ready
- Type: ( Boolean )
- -The writer is ready to use
- - -writer.connected
- Type: ( Boolean )
- -The writer is connected to nsqd
var NsqReader = require( "nsq-logger/reader" )
-var reader = NsqReader( topic, channel, config )
-
Example:
- - -var NsqReader = require( "nsq-logger/reader" );
-
-var config = {
- clientId: "myFooClient",
- lookupdTCPAddresses: "127.0.0.1:4160",
- lookupdHTTPAddresses: "127.0.0.1: 4161",
-};
-
-// create the reader
-var reader = new NsqReader( "topic23", "channel42", config );
-
-
-reader.on( "message", function( topic, data, done ){
- // process your topic
- // Example response: -> data ="Do some Stuff!"
-
- // mark message as done
- done();
-});
-reader.connect();
-
Paramater
- -NsqReader( topic, channel, config )
String
required ) The topic to listen toString
required ) The nsq channel to use or createObject|Config
) Configuration object or a config objectConfig
- - - -String|Null
required ) An identifier used to disambiguate this client.String|Null
default=null ) Internally prefix the nsq topics. This will be handled transparent, but with this it's possible to separate different environments from each other. E.g. you can run a "staging" and "live" environment on one nsq cluster.Boolean
default=true ) Configuration to (en/dis)abel the nsq recorderNumber
default=1 ) The maximum number of messages to process at once. This value is shared between nsqd connections. It's highly recommended that this value is greater than the number of nsqd connections.Number
default=30 ) The frequency in seconds at which the nsqd will send heartbeats to this Reader.String[]
default=[ "127.0.0.1:4160", "127.0.0.1:4162" ] ) A list of nsq lookup serversString[]
default=[ "127.0.0.1:4161", "127.0.0.1:4163" ] ) A list of nsq lookup serversNumber
default=10 ) The number of times to a message can be requeued before it will be handed to the DISCARD handler and then automatically finished. 0 means that there is no limit. If not DISCARD handler is specified and maxAttempts > 0, then the message will be finished automatically when the number attempts has been exhausted.Number|Null
default=null ) Message timeout in ms or null
for no timeoutNumber|Null
default=null ) Deliver a percentage of all messages received to this connection. 1 <= sampleRate <= 99Number|Null
default=5 ) The delay is in seconds. This is how long nsqd will hold on the message before attempting it again..connect()
- You have to connect the writer before publishing data
- -Return
- -( Writer ): retuns itself for chaining
- - -.disconnect()
- disconnect the client
- -Return
- -( Writer ): retuns itself for chaining
- - -.activate()
- Activate the module
- -Return
- -( Boolean ): true
if it is now activated. false
if it was already active
.deactivate()
- Deactivate the module
- -Return
- -( Boolean ): true
if it is now deactivated. false
if it was already inactive
.active()
- Test if the module is currently active
- -Return
- -( Boolean ): Is active?
- - -.destroy( cb )
- Disconnect and remove all event listeners
- -Arguments
- -Function
) Called after destroymessage
- The main event to catch and process messages from the defined topic.
- -Arguments
- -String|Object|Array
) The message content. A String or parsed JSON data.String
) You have to call this function until the message was processed. This will remove the message from the queue. Otherwise it will be requeued. If you add a argument cb( new Error("Dooh!") )
it will interpreted as an error and this message be requeued immediately Example:
- - -logger.on( "message", function( data, done ){
- // process your message.
- // E.g: writing the data to a db
- myDB.write( "INSERT INTO mylogs VALUES ( " + data + " )", done );
-});
-
ready
- Emitted once the list of topics was received and the readers are created and connected.
-This is just an internal helper. The Method list
will also wait for the first response. The events add
, remove
and change
are active after this first response.
-Example:
topics.on( "ready", function( err ){
- // handle the error
-});
-
writer.ready
- Type: ( Boolean )
- -The writer is ready to use
- - -writer.connected
- Type: ( Boolean )
- -The writer is connected to nsqd
|Version|Date|Description|
-|:--:|:--:|:--|
-|0.0.7|2016-01-20|Added raw nsqjs Message as last argument to the message
event |
-|0.0.6|2015-12-04|Bugfix on setting an array configuration; added code banner|
-|0.0.5|2015-12-03|Added namespaces and made multiple parallel logger instances possible.|
-|0.0.4|2015-12-03|configuration bugfix|
-|0.0.3|2015-12-02|updated object tests|
-|0.0.2|2015-12-02|Internal restructure and docs|
-|0.0.1|2015-12-02|Initial version|
-- - -Initially Generated with generator-mpnodemodule
-
|Name|Description| -|:--|:--| -|nsq-topics|Nsq helper to poll a nsqlookupd service for all it's topics and mirror it locally.| -|nsq-nodes|Nsq helper to poll a nsqlookupd service for all it's nodes and mirror it locally.| -|node-cache|Simple and fast NodeJS internal caching. Node internal in memory cache like memcached.| -|nsq-watch|Watch one or many topics for unprocessed messages.| -|rsmq|A really simple message queue based on redis| -|redis-heartbeat|Pulse a heartbeat to redis. This can be used to detach or attach servers to nginx or similar problems.| -|systemhealth|Node module to run simple custom checks for your machine or it's connections. It will use redis-heartbeat to send the current state to redis.| -|rsmq-cli|a terminal client for rsmq| -|rest-rsmq|REST interface for.| -|redis-sessions|An advanced session store for NodeJS and Redis| -|connect-redis-sessions|A connect or express middleware to simply use the redis sessions. With redis sessions you can handle multiple sessions per user_id.| -|redis-notifications|A redis based notification engine. It implements the rsmq-worker to safely create notifications and recurring reports.| -|hyperrequest|A wrapper around hyperquest to handle the results| -|task-queue-worker|A powerful tool for background processing of tasks that are run by making standard http requests -|soyer|Soyer is small lib for server side use of Google Closure Templates with node.js.| -|grunt-soy-compile|Compile Goggle Closure Templates ( SOY ) templates including the handling of XLIFF language files.| -|backlunr|A solution to bring Backbone Collections together with the browser fulltext search engine Lunr.js| -|domel|A simple dom helper if you want to get rid of jQuery| -|obj-schema|Simple module to validate an object by a predefined schema|
- - -Copyright © 2015 M. Peter, http://www.tcs.de
- -Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
- -The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
- -THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-
-
-
-
-
-
-
-- ¶ - NsqBasic Module --a collection of shared nsq methods - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
- ¶
-
-extend the internal config - |
- - | -
-
- ¶
-
-init errors - |
- 33
-34 @_initErrors()
-35
-36 @initialize( options )
-37
-38 @debug "loaded"
-39 return
-40
-41 fetchClientId: =>
-42 if _.isFunction( @config.clientId )
-43 _cid = @config.clientId()
-44 if not _.isString( @config.clientId )
-45 @_handleError( null, "EINVALIDCLIENTID" )
-46 return
-47
-48 @config.clientId = _cid
-49 return @config.clientId
-50
-51 if _.isString( @config.clientId )
-52 return @config.clientId
-53
-54 @_handleError( null, "EINVALIDCLIENTID" )
-55 return @config.clientId
-56
-57
-58 active: =>
-59 return @config.active
-60
-61 activate: =>
-62 if @config.active
-63 return false
-64 @config.active = true
-65 @connect()
-66 return true
-67
-68 deactivate: =>
-69 if not @config.active
-70 return false
-71 @config.active = false
-72 @disconnect()
-73 return true
-74
-75 connect: =>
-76 if not @config.active
-77 return
-78
-79 @_initClient()
-80
-81 if not @connected
-82 @disconnecting = false
-83 @log "info", "try to connect"
-84 @client.connect()
-85 return @
-86
-87 disconnect: =>
-88 @disconnecting = true
-89 @client.close()
-90 return
-91
-92 reconnect: =>
-93
-94 |
-
-
- ¶
-
-do not reconnect if it's a manual disconnect - |
- - | -
-
- ¶
-
-try a reconnect every 5 sec until the client is online again - |
- 96
-97 @t_reconnect = setTimeout( =>
-98 @connect()
-99 if not @connected
-100 @reconnect()
-101 return
-102 , 5000 )
-103 return
-104
-105 onConnect: =>
-106 @log "debug", "connection established"
-107 if @t_reconnect?
-108 clearTimeout(@t_reconnect)
-109
-110 @connected = true
-111 @emit( "connected" )
-112 return
-113
-114 onDisconnect: =>
-115 @log "warning", "connection lost" if not @disconnecting
-116
-117 |
-
-
- ¶
-
-if it's currently marked as connected start reconnecting - |
- 116
-117 if @connected and not @disconnecting
-118 @reconnect()
-119 @connected = false
-120 @emit( "disconnected" )
-121 return
-122
-123
-124 destroy: ( cb )=>
-125 if @connected
-126 @disconnect()
-127 @on "disconnected", ->
-128 @removeAllListeners()
-129 cb()
-130 return
-131 return
-132 cb()
-133 return
-134
-135 nsTest: ( topic )=>
-136 if not @config.namespace?
-137 return true
-138 return topic[...@config.namespace.length] is @config.namespace
-139
-140 nsRem: ( topic )=>
-141 if not @config.namespace?
-142 return topic
-143 if not @nsTest( topic )
-144 return topic
-145 return topic[@config.namespace.length..]
-146
-147 nsAdd: ( topic )=>
-148 if not @config.namespace?
-149 return topic
-150 if @nsTest( topic )
-151 return topic
-152 return @config.namespace + topic
-153
-154
-155 ERRORS: =>
-156 return @extend {}, super,
-157
-158 |
-
-
- ¶
-
-Exceptions - |
- - | -
-
-
-
-
-- ¶ - Config Module --a collection of shared nsq methods - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
- ¶
-
-GENERAL -active Boolean Configuration to (en/dis)abel the nsq recorder - |
- - | -
-
- ¶
-
-clientId String|Null An identifier used to disambiguate this client. - |
- - | -
-
- ¶
-
-namespace String A namespace for the topics. This will be added/removed transparent to the topics. So only topics within this namespace a relevant. - |
- - | -
-
- ¶
-
-LOGGER -loggerChannel String The channel name for the logger to each topic - |
- - | -
-
- ¶
-
-exceededTopic String A topic name, that will store exceeded messages. - |
- - | -
-
- ¶
-
-ignoreTopics String[]|Function A list of topics that should be ignored or a function that will called to check the ignored topics manually - |
- - | -
-
- ¶
-
-lookupdHTTPAddresses Number Time in seconds to poll the nsqlookupd servers to sync the availible topics - |
- - | -
-
- ¶
-
-READER -maxInFlight Number The maximum number of messages to process at once. This value is shared between nsqd connections. It's highly recommended that this value is greater than the number of nsqd connections. - |
- - | -
-
- ¶
-
-heartbeatInterval Number The frequency in seconds at which the nsqd will send heartbeats to this Reader. - |
- - | -
-
- ¶
-
-lookupdTCPAddresses String[] A list of nsq lookup servers - |
- - | -
-
- ¶
-
-lookupdHTTPAddresses String[] A list of nsq lookup servers - |
- - | -
-
- ¶
-
-maxAttempts Number The number of times to a message can be requeued before it will be handed to the DISCARD handler and then automatically finished. 0 means that there is no limit. If not DISCARD handler is specified and maxAttempts > 0, then the message will be finished automatically when the number attempts has been exhausted. - |
- - | -
-
- ¶
-
-messageTimeout Number|Null Message timeout in ms or |
- - | -
-
- ¶
-
-sampleRate Number|Null Deliver a percentage of all messages received to this connection. 1 <= sampleRate <= 99 - |
- - | -
-
- ¶
-
-requeueDelay Number|Null The delay is in seconds. This is how long nsqd will hold on the message before attempting it again. - |
- - | -
-
- ¶
-
-WRITER -host String Host of a nsqd - |
- - | -
-
- ¶
-
-port Number Port of a nsqd - |
- - | -
-
- ¶
-
-deflate Boolean Use zlib Deflate compression. - |
- - | -
-
- ¶
-
-deflateLevel Number Use zlib Deflate compression level. - |
- 57
-58 deflateLevel: 6
-59
-60 logging:
-61 severity: process.env[ "severity" ] or process.env[ "severity_nsq_logger"] or "warning"
-62 severitys: "fatal,error,warning,info,debug".split( "," )
-63
-64 addGetter = ( prop, _get, context )=>
-65 _obj =
-66 enumerable: true
-67 writable: true
-68
-69 if _.isFunction( _get )
-70 _obj.get = _get
-71 else
-72 _obj.value = _get
-73 Object.defineProperty( context, prop, _obj )
-74 return
-75
-76 class Config
-77
-78 constructor: ( input )->
-79 for _k, _v of DEFAULTS
-80 addGetter( _k, _v, @ )
-81
-82 @set( input )
-83 return
-84
-85 set: ( key, value )=>
-86 if not key?
-87 return
-88 if _.isObject( key )
-89 for _k, _v of key
-90 @set( _k, _v )
-91 return
-92 if _.isObject( @[ key ] ) and _.isObject( value ) and not _.isArray( value )
-93 @[ key ] = extend( true, {}, @[ key ], value )
-94 else
-95 @[ key ] = value
-96 return
-97
-98 module.exports = Config
-99 |
-
- index.coffee- |
- - |
-
- ¶
-
-
- |
- - | -
-
-
-
-
-
-
-
-
-- ¶ - NsqBasic Module --a collection of shared nsq methods - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
- - ¶ - defaults -- |
- - | -
-
- ¶
-
-active Boolean Configuration to (en/dis)abel the nsq recorder - |
- 18
-19 active: true
-20
-21 constructor: ->
-22 @connected = false
-23 super
-24 return
-25
-26 fetchClientId: =>
-27 if _.isFunction( @config.clientId )
-28 _cid = @config.clientId()
-29 if not _.isString( @config.clientId )
-30 @_handleError( null, "EINVALIDCLIENTID" )
-31 return
-32
-33 @config.clientId = _cid
-34 return @config.clientId
-35
-36 if _.isString( @config.clientId )
-37 return @config.clientId
-38
-39 @_handleError( null, "EINVALIDCLIENTID" )
-40 return @config.clientId
-41
-42
-43 active: =>
-44 return @config.active
-45
-46 activate: =>
-47 if @config.active
-48 return false
-49 @config.active = true
-50 @connect()
-51 return true
-52
-53 deactivate: =>
-54 if not @config.active
-55 return false
-56 @config.active = false
-57 @disconnect()
-58 return true
-59
-60 connect: =>
-61 if not @config.active
-62 return
-63
-64 @_initClient()
-65
-66 if not @connected
-67 @disconnecting = false
-68 @log "info", "try to connect"
-69 @client.connect()
-70 return
-71
-72 disconnect: =>
-73 @disconnecting = true
-74 @client.close()
-75 return
-76
-77 reconnect: =>
-78
-79 |
-
-
- ¶
-
-do not reconnect if it's a manual disconnect - |
- - | -
-
- ¶
-
-try a reconnect every 5 sec until the client is online again - |
- 81
-82 @t_reconnect = setTimeout( =>
-83 @connect()
-84 if not @connected
-85 @reconnect()
-86 return
-87 , 5000 )
-88 return
-89
-90 onConnect: =>
-91 @log "debug", "connection established"
-92 if @t_reconnect?
-93 clearTimeout(@t_reconnect)
-94
-95 @connected = true
-96 @emit( "connected" )
-97 return
-98
-99 onDisconnect: =>
-100 @log "warning", "connection lost" if not @disconnecting
-101
-102 |
-
-
- ¶
-
-if it's currently marked as connected start reconnecting - |
- 101
-102 if @connected and not @disconnecting
-103 @reconnect()
-104 @connected = false
-105 @emit( "disconnected" )
-106 return
-107
-108
-109 destroy: ( cb )=>
-110 if @connected
-111 @disconnect()
-112 @on "disconnected", ->
-113 cb()
-114 return
-115 return
-116 cb()
-117 return
-118
-119 ERRORS: =>
-120 return @extend {}, super,
-121
-122 |
-
-
- ¶
-
-Exceptions - |
- - | -
-
-
-
-
-- ¶ - Config Module --a collection of shared nsq methods - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
- ¶
-
-GENERAL -active Boolean Configuration to (en/dis)abel the nsq recorder - |
- - | -
-
- ¶
-
-clientId String|Null An identifier used to disambiguate this client. - |
- - | -
-
- ¶
-
-LOGGER -loggerChannel String The channel name for the logger to each topic - |
- - | -
-
- ¶
-
-exceededTopic String A topic name, that will store exceeded messages. - |
- - | -
-
- ¶
-
-ignoreTopics String[]|Function A list of topics that should be ignored or a function that will called to check the ignored topics manually - |
- - | -
-
- ¶
-
-lookupdHTTPAddresses Number Time in seconds to poll the nsqlookupd servers to sync the availible topics - |
- - | -
-
- ¶
-
-READER -maxInFlight Number The maximum number of messages to process at once. This value is shared between nsqd connections. It's highly recommended that this value is greater than the number of nsqd connections. - |
- - | -
-
- ¶
-
-heartbeatInterval Number The frequency in seconds at which the nsqd will send heartbeats to this Reader. - |
- - | -
-
- ¶
-
-lookupdTCPAddresses String[] A list of nsq lookup servers - |
- - | -
-
- ¶
-
-lookupdHTTPAddresses String[] A list of nsq lookup servers - |
- - | -
-
- ¶
-
-maxAttempts Number The number of times to a message can be requeued before it will be handed to the DISCARD handler and then automatically finished. 0 means that there is no limit. If not DISCARD handler is specified and maxAttempts > 0, then the message will be finished automatically when the number attempts has been exhausted. - |
- - | -
-
- ¶
-
-messageTimeout Number|Null Message timeout in ms or |
- - | -
-
- ¶
-
-sampleRate Number|Null Deliver a percentage of all messages received to this connection. 1 <= sampleRate <= 99 - |
- - | -
-
- ¶
-
-requeueDelay Number|Null The delay is in seconds. This is how long nsqd will hold on the message before attempting it again. - |
- - | -
-
- ¶
-
-WRITER -host String Host of a nsqd - |
- - | -
-
- ¶
-
-port Number Port of a nsqd - |
- - | -
-
- ¶
-
-deflate Boolean Use zlib Deflate compression. - |
- - | -
-
- ¶
-
-deflateLevel Number Use zlib Deflate compression level. - |
- 55
-56 deflateLevel: 6
-57
-58 addGetter = ( prop, _get, context )=>
-59 _obj =
-60 enumerable: true
-61 writable: true
-62
-63 if _.isFunction( _get )
-64 _obj.get = _get
-65 else
-66 _obj.value = _get
-67 Object.defineProperty( context, prop, _obj )
-68 return
-69
-70 class Config
-71
-72 constructor: ( input )->
-73 for _k, _v of DEFAULTS
-74 addGetter( _k, _v, @ )
-75
-76 @set( input )
-77 return
-78
-79 set: ( key, value )=>
-80 if not key?
-81 return
-82 if _.isObject( key )
-83 for _k, _v of key
-84 @set( _k, _v )
-85 return
-86 @[ key ] = value
-87 return
-88
-89 get: ( name, logging = false )->
-90 _cnf = {}
-91
-92 if logging
-93 logging =
-94 logging:
-95 severity: process.env[ "severity" ] or process.env[ "severity_#{name}"] or "warning"
-96 severitys: "fatal,error,warning,info,debug".split( "," )
-97 return extend( true, @, logging, _cnf )
-98 else
-99 return @
-100
-101
-102
-103
-104 module.exports = new Config()
-105 |
-
-
-
-
-
-
-
-
-
-- ¶ - NsqLogger Module --A reader factory to spin up on reader per topic - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
- - ¶ - defaults -- |
- - | -
-
- ¶
-
-loggerChannel String The channel name for the logger to each topic - |
- - | -
-
- ¶
-
-exceededTopic String A topic name, that will store exceeded messages. - |
- - | -
-
- ¶
-
-ignoreTopics String[]|Function A list of topics that should be ignored or a function that will called to check the ignored topics manually - |
- - | -
-
- ¶
-
-add a topic filter to only connect to topics that do not start with a "_" - |
- 44
-45 Topics.filter ( testT )=>
-46 if testT is @config.exceededTopic
-47 return false
-48 if not @config.ignoreTopics?
-49 return true
-50 if _.isArray( @config.ignoreTopics ) and testT in @config.ignoreTopics
-51 return false
-52 if _.isFunction( @config.ignoreTopics )
-53 return @config.ignoreTopics( testT )
-54 return true
-55
-56 Topics.list ( err, topics )=>
-57 if err
-58 @log "error", "initial topic read"
-59
-60 |
-
-
- ¶
-
-on initial read error retry to read the topic after 60 sec - |
- - | -
-
- ¶
-
-create initail readers - |
- - | -
-
- ¶
-
-listen to topic changes - |
- 67
-68 Topics.on "add", @addReader
-69 Topics.on "remove", @removeReader
-70
-71 @ready = true
-72 @emit( "ready" )
-73 return
-74 return
-75
-76 destroy: ( cb )=>
-77 @warning "destroy logger"
-78 if not @ready
-79 return
-80
-81 _count = Object.keys( READERS ).length
-82
-83 Writer.destroy =>
-84 @warning "destroy #{_count} readers"
-85 for _name, _reader of READERS
-86 READERS[ _name ].destroy ->
-87 _count--
-88 if _count <= 0
-89 cb()
-90 return
-91 return
-92
-93 return
-94
-95 addReader: ( topic )=>
-96 if READERS[ topic ]?
-97 @_handleError( "addReader", "EREADEREXISTS", { topic: topic } )
-98 return
-99 READERS[ topic ] = new Reader( @, topic, @config.loggerChannel )
-100 READERS[ topic ].on "message", @message
-101 READERS[ topic ].on "exceeded", @exceeded
-102 @log "info", "reader ´#{topic}´ added"
-103 return
-104
-105 removeReader: ( topic )=>
-106 if not READERS[ topic ]?
-107 @_handleError( "removeReader", "EREADERNOTFOUND", { topic: topic } )
-108 return
-109
-110 READERS[ topic ].destroy ( err )=>
-111 if err
-112 @log "error", "destroy reader", err
-113 return
-114 READERS[ topic ].removeAllListeners()
-115 delete READERS[ topic ]
-116 @log "info", "reader ´#{topic}´ destroyed", Object.keys( READERS )
-117 return
-118 return
-119
-120 message: ( topic, data, cb )=>
-121 @emit( "message", topic, data, cb )
-122 return
-123
-124 exceeded: ( topic, data )=>
-125 _data =
-126 topic: topic
-127 payload: data
-128
-129 Writer.connect()
-130 Writer.publish @config.exceededTopic, _data , ( err )=>
-131 if err
-132 @log "error", "write messag to exceeded list", err
-133 return
-134 return
-135
-136 ERRORS: =>
-137 return @extend {}, super,
-138
-139 |
-
-
- ¶
-
-Exceptions - |
- - | -
-
-
-
-
-
-
-
-
-- ¶ - NsqReader Module --a nsq reader for a single topic - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
- - ¶ - defaults -- |
- - | -
-
- ¶
-
-maxInFlight Number The maximum number of messages to process at once. This value is shared between nsqd connections. It's highly recommended that this value is greater than the number of nsqd connections. - |
- - | -
-
- ¶
-
-heartbeatInterval Number The frequency in seconds at which the nsqd will send heartbeats to this Reader. - |
- - | -
-
- ¶
-
-lookupdTCPAddresses String[] A list of nsq lookup servers - |
- - | -
-
- ¶
-
-lookupdHTTPAddresses String[] A list of nsq lookup servers - |
- - | -
-
- ¶
-
-maxAttempts Number The number of times to a message can be requeued before it will be handed to the DISCARD handler and then automatically finished. 0 means that there is no limit. If not DISCARD handler is specified and maxAttempts > 0, then the message will be finished automatically when the number attempts has been exhausted. - |
- - | -
-
- ¶
-
-messageTimeout Number|Null Message timeout in ms or |
- - | -
-
- ¶
-
-sampleRate Number|Null Deliver a percentage of all messages received to this connection. 1 <= sampleRate <= 99 - |
- - | -
-
- ¶
-
-clientId String|Null An identifier used to disambiguate this client. - |
- - | -
-
- ¶
-
-requeueDelay Number|Null The delay is in seconds. This is how long nsqd will hold on the message before attempting it again. - |
- 35
-36 requeueDelay: 5
-37
-38 constructor: ( @logger, @topic, @channel, options )->
-39 @connected = false
-40
-41 super( options )
-42 if not @config.active
-43 @log "warning", "nsq reader disabled"
-44 return
-45
-46 @fetchClientId()
-47
-48 @connect()
-49 return
-50
-51 _initClient: =>
-52 if @client
-53 return @client
-54 @log "debug", "start reader", [@topic, @channel]
-55 @client = new nsq.Reader( @topic, @channel, @config )
-56
-57 @client.on( nsq.Reader.NSQD_CLOSED, @onDisconnect )
-58 @client.on( nsq.Reader.NSQD_CONNECTED, @onConnect )
-59 @client.on( nsq.Reader.MESSAGE, @onMessage )
-60
-61 @client.on( nsq.Reader.DISCARD, @onDiscard )
-62 @client.on( nsq.Reader.ERROR, @onError )
-63
-64 return @client
-65
-66 onError: ( err )=>
-67 @log "error", "nsq-reader", err
-68 return
-69
-70 onDiscard: ( msg )=>
-71 @emit( "exceeded", @topic, msg.json() )
-72 @log "warning", "message exceeded", @topic, msg.json()
-73 return
-74
-75 onMessage: ( msg )=>
-76 @emit "message", @topic, msg.json(), ( err )=>
-77 if err
-78 @log "error", "message processing", err
-79 msg.requeue( @config.requeueDelay )
-80 return
-81 msg.finish()
-82 return
-83 return
-84
-85
-86 module.exports = ( logger, topic, channel )->
-87 return new NsqReader( logger, topic, channel, config )
-88 |
-
-
-
-
-
-
-
-
-
-- ¶ - NsqTopics Wrapper --Create and configute a nsq-topics instance - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
-
-
-
-
-
-- ¶ - NsqWriter Module --This module is a helper to simply write data to nsq - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
- - ¶ - defaults -- |
- - | -
-
- ¶
-
-host String Host of a nsqd - |
- - | -
-
- ¶
-
-port Number Port of a nsqd - |
- - | -
-
- ¶
-
-deflate Boolean Use zlib Deflate compression. - |
- - | -
-
- ¶
-
-deflateLevel Number Use zlib Deflate compression level. - |
- - | -
-
- ¶
-
-clientId String|Null An identifier used to disambiguate this client. - |
- 26
-27 clientId: null
-28
-29 constructor: ->
-30 @connected = false
-31 super
-32
-33 @publish = @_waitUntil( @_publish, "connected" )
-34
-35 if not @config.active
-36 @warning "nsq writer disabled"
-37 return
-38
-39 @fetchClientId()
-40 return
-41
-42 _initClient: =>
-43 if @client?
-44 return @client
-45
-46 @client = new nsq.Writer( @config.host, @config.port )
-47
-48 @client.on( nsq.Writer.READY, @onConnect )
-49 @client.on( nsq.Writer.CLOSED, @onDisconnect )
-50
-51 @client.on nsq.Writer.ERROR, ( err )=>
-52 @error "nsq error", err
-53 return
-54
-55 @debug "init writer client", @client
-56 return @client
-57
-58 _publish: ( topic, data, cb )=>
-59 if not @config.active
-60 @_handleError( cb, "ENSQOFF" )
-61 return
-62
-63 @debug "publish", topic
-64 @client.publish topic, JSON.stringify(data), ( err )=>
-65 if err
-66 if cb?
-67 cb( err )
-68 else
-69 @error( "publish to topic `#{topic}`", err )
-70 return
-71 @debug "send data to `#{topic}`"
-72 cb( null ) if cb?
-73 return
-74
-75 return
-76
-77 module.exports = new NsqWriter()
-78 |
-
-
-
-
-
-
-
-
-
-- ¶ - NsqLogger Module --A reader factory to spin up on reader per topic - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
- - ¶ - defaults -- |
- - | -
-
- ¶
-
-loggerChannel String The channel name for the logger to each topic - |
- - | -
-
- ¶
-
-exceededTopic String A topic name, that will store exceeded messages. - |
- - | -
-
- ¶
-
-ignoreTopics String[]|Function A list of topics that should be ignored or a function that will called to check the ignored topics manually - |
- - | -
-
- ¶
-
-namespace String A namespace for the topics. This will be added/removed transparent to the topics. So only topics within this namespace a relevant. - |
- - | -
-
- ¶
-
-set flags - |
- 35
-36 @ready = false
-37
-38 super( options )
-39
-40 @debug "config", @config
-41
-42 _writerInst = null
-43 @getter "Writer", =>
-44 if not _writerInst?
-45 _writerInst = new Writer( @config )
-46 return _writerInst
-47
-48 _topicsInst = null
-49 @getter "Topics", =>
-50 if not _topicsInst?
-51 _topicsInst = new Topics( @config )
-52 return _topicsInst
-53
-54 @_start()
-55 return
-56
-57 _start: =>
-58 if @ready
-59 return
-60
-61 @Topics.filter ( testT )=>
-62 if not @nsTest( testT )
-63 return false
-64
-65 _tp = @nsRem( testT )
-66
-67 if _tp is @config.exceededTopic
-68 return false
-69
-70 if @config.ignoreTopics?
-71 if _.isArray( @config.ignoreTopics ) and _tp in @config.ignoreTopics
-72 return false
-73 if _.isFunction( @config.ignoreTopics )
-74 return @config.ignoreTopics( _tp )
-75
-76 return true
-77
-78 @Topics.list ( err, topics )=>
-79 if err
-80 @log "error", "initial topic read"
-81
-82 |
-
-
- ¶
-
-on initial read error retry to read the topic after 60 sec - |
- - | -
-
- ¶
-
-create initail readers - |
- - | -
-
- ¶
-
-listen to topic changes - |
- 88
-89 @Topics.on "add", @addReader
-90 @Topics.on "remove", @removeReader
-91
-92 @ready = true
-93 @emit( "ready" )
-94 return
-95 return
-96
-97 destroy: ( cb )=>
-98 @warning "destroy logger"
-99 if not @ready
-100 return
-101
-102 _count = Object.keys( @READERS ).length
-103
-104 @Topics.deactivate()
-105
-106 @Writer.destroy =>
-107 @warning "destroy #{_count} readers"
-108 for _name, _reader of @READERS
-109 @READERS[ _name ].destroy =>
-110 _count--
-111 if _count <= 0
-112 @removeAllListeners()
-113 cb()
-114 return
-115 return
-116
-117 return
-118
-119 addReader: ( topic )=>
-120 topic = @nsRem( topic )
-121 if @READERS[ topic ]?
-122 @_handleError( "addReader", "EREADEREXISTS", { topic: topic } )
-123 return
-124 @READERS[ topic ] = new Reader( @, topic, @config.loggerChannel, @config )
-125 @READERS[ topic ].on "message", ( data, cb, msg )=>
-126 @emit( "message", topic, data, cb, msg )
-127 return
-128
-129 @READERS[ topic ].on "exceeded", ( data, cb )=>
-130 @exceeded( topic, data, cb )
-131 return
-132
-133 @READERS[ topic ].connect()
-134
-135 @log "info", "reader ´#{topic}´ added"
-136 return
-137
-138 removeReader: ( topic )=>
-139 topic = @nsRem( topic )
-140 if not @READERS[ topic ]?
-141 @_handleError( "removeReader", "EREADERNOTFOUND", { topic: topic } )
-142 return
-143
-144 @READERS[ topic ].destroy ( err )=>
-145 if err
-146 @log "error", "destroy reader", err
-147 return
-148 @READERS[ topic ].removeAllListeners()
-149 delete @READERS[ topic ]
-150 @log "info", "reader ´#{topic}´ destroyed", Object.keys( @READERS )
-151 return
-152 return
-153
-154 exceeded: ( topic, data )=>
-155 _data =
-156 topic: topic
-157 payload: data
-158
-159 @emit( "exceeded", @nsRem( topic ), data )
-160
-161 @Writer.connect()
-162 @Writer.publish @config.exceededTopic, _data , ( err )=>
-163 if err
-164 @log "error", "write messag to exceeded list", err
-165 return
-166 return
-167
-168 ERRORS: =>
-169 return @extend {}, super,
-170
-171 |
-
-
- ¶
-
-Exceptions - |
- - | -
-
-
-
-
-
-
-
-
-- ¶ - NsqReader Module --a nsq reader for a single topic - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
- - ¶ - defaults -- |
- - | -
-
- ¶
-
-maxInFlight Number The maximum number of messages to process at once. This value is shared between nsqd connections. It's highly recommended that this value is greater than the number of nsqd connections. - |
- - | -
-
- ¶
-
-heartbeatInterval Number The frequency in seconds at which the nsqd will send heartbeats to this Reader. - |
- - | -
-
- ¶
-
-lookupdTCPAddresses String[] A list of nsq lookup servers - |
- - | -
-
- ¶
-
-lookupdHTTPAddresses String[] A list of nsq lookup servers - |
- - | -
-
- ¶
-
-maxAttempts Number The number of times to a message can be requeued before it will be handed to the DISCARD handler and then automatically finished. 0 means that there is no limit. If not DISCARD handler is specified and maxAttempts > 0, then the message will be finished automatically when the number attempts has been exhausted. - |
- - | -
-
- ¶
-
-messageTimeout Number|Null Message timeout in ms or |
- - | -
-
- ¶
-
-sampleRate Number|Null Deliver a percentage of all messages received to this connection. 1 <= sampleRate <= 99 - |
- - | -
-
- ¶
-
-clientId String|Null An identifier used to disambiguate this client. - |
- - | -
-
- ¶
-
-requeueDelay Number|Null The delay is in seconds. This is how long nsqd will hold on the message before attempting it again. - |
- - | -
-
- ¶
-
-namespace String A namespace for the topics. This will be added/removed transparent to the topics. So only topics within this namespace a relevant. - |
- 37
-38 namespace: null
-39
-40 constructor: ( @logger, @topic, @channel, options )->
-41 @connected = false
-42
-43 super( options )
-44
-45 @fetchClientId()
-46 return
-47
-48 _initClient: =>
-49 if @client
-50 return @client
-51 @debug "start reader", @topic, @channel, @config.namespace
-52 @client = new nsq.Reader( @nsAdd( @topic ), @channel, @config )
-53
-54 @client.on( nsq.Reader.NSQD_CLOSED, @onDisconnect )
-55 @client.on( nsq.Reader.NSQD_CONNECTED, @onConnect )
-56 @client.on( nsq.Reader.MESSAGE, @onMessage )
-57
-58 @client.on( nsq.Reader.DISCARD, @onDiscard )
-59 @client.on( nsq.Reader.ERROR, @onError )
-60
-61 return @client
-62
-63 onError: ( err )=>
-64 @error "nsq-reader", err
-65 return
-66
-67 onDiscard: ( msg )=>
-68 @emit( "exceeded", msg.json() )
-69 @warning "message exceeded", @topic, msg.json()
-70 return
-71
-72 onMessage: ( msg )=>
-73 @emit( "message", msg.json(), ( err )=>
-74 if err
-75 @error "message processing", err
-76 msg.requeue( @config.requeueDelay )
-77 return
-78 msg.finish()
-79 return
-80 , msg )
-81 return
-82
-83
-84 module.exports = NsqReader
-85 |
-
- data.coffee- |
- - |
-
- ¶
-
-
- |
- 1 async = require('async')
-2 hyperrequest = require('hyperrequest')
-3
-4 utils = require('./utils')
-5
-6
-7 module.exports = ( namespace )->
-8
-9 generatedTopics = []
-10
-11 deleteTopic = ( host, topic )->
-12 return ( cb )->
-13 hyperrequest { url: "http://#{host}/delete_topic", qs: { topic: ( namespace or "" ) + topic } }, ( err, resp )->
-14 if resp?.statusCode is 200
-15 cb( null, topic )
-16 else
-17 cb( resp.body )
-18 return
-19 return
-20
-21 deleteTopics = ( topics, hosts, cb )->
-22 if not topics?.length
-23 cb()
-24 return
-25 aFns = []
-26 for host in hosts
-27 for topic in topics
-28 aFns.push( deleteTopic( host, topic) )
-29
-30 console.log " delete #{topics.length} test topics ... "
-31 async.parallelLimit aFns, 5, ( errs, topics )->
-32 console.log " ... done!"
-33 cb()
-34 return
-35 return
-36 return {
-37 newTopic: ->
-38 _t = utils.randomString( 5 )
-39 generatedTopics.push( _t )
-40 return _t
-41
-42 cleanup: ( hosts, cb )->
-43
-44 deleteTopics( generatedTopics, hosts, cb )
-45
-46 return
-47 }
-48 |
-
- main.coffee- |
- - |
-
- ¶
-
-
- |
- 1 should = require('should')
-2 _ = require('lodash')
-3
-4 config = require( "../config" )
-5
-6 utils = require( "./utils" )
-7
-8 testServers = require( "./nsq-deamons" )
-9 NsqLogger = require( "../." )
-10
-11 CNF =
-12 clientId: "mochaTest"
-13 lookupdPollInterval: 1
-14 logging: {}
-15 lookupdHTTPAddresses: [ "localhost:4161", "localhost:4163" ]
-16 namespace: null
-17
-18
-19 logger = null
-20 writer = null
-21 config = null
-22
-23
-24 namespaces = [ null, "mochatestA_", "mochatestB_" ]
-25
-26
-27 describe "----- nsq-logger TESTS -----", ->
-28
-29 before ( done )->
-30 testServers.start ->
-31 done()
-32 return
-33 return
-34
-35 after: ( done )->
-36 testServers.stop( done )
-37 return
-38
-39 namespaces.forEach ( namespace )->
-40
-41 testData = require( "./data" )( namespace )
-42
-43 describe "Namespace `#{namespace}` Tests", ->
-44
-45 before ( done )->
-46 logger = new NsqLogger( _.extend( {}, CNF, {namespace: namespace} ) )
-47
-48 writer = logger.Writer
-49 config = logger.config
-50 writer.connect()
-51 done()
-52 return
-53
-54 after ( done )->
-55 @timeout( 10000 )
-56 testData.cleanup config.lookupdHTTPAddresses, ->
-57 logger.destroy ->
-58 done()
-59 return
-60 return
-61
-62 describe 'Main Tests', ->
-63 it "wait for a single message", ( done )->
-64 @timeout( 10000 )
-65 _topic = testData.newTopic()
-66 _data = utils.randomString( 1024 )
-67
-68 logger.on "message", ( topic, data, cb, msg )->
-69 cb()
-70
-71
-72 |
-
-
- ¶
-
-wait for the previously generated topic - |
- 72
-73 if topic is _topic
-74 msg.attempts.should.be.Number().equal( 1 )
-75
-76 topic.should.equal( _topic )
-77 data.should.equal( _data )
-78
-79 logger.removeAllListeners( "message" )
-80 done()
-81 return
-82 writer.publish( _topic, _data )
-83 return
-84
-85 it "test many messages within multiple topics", ( done )->
-86 @timeout( 6000 )
-87
-88 logger.removeAllListeners( "message" )
-89
-90 _topics = {}
-91 for idx in [1..5]
-92 _topic = testData.newTopic()
-93 _topics[ _topic ] = []
-94 for idx in [1..20]
-95 _topics[ _topic ].push utils.randomString( utils.randRange( 1, 20 ) * 1024 )
-96
-97 logger.on "message", ( topic, data, cb )->
-98 cb()
-99
-100 |
-
-
- ¶
-
-wait for the previously generated topic - |
- 99
-100 if _topics[topic]?
-101 _idx = _topics[topic].indexOf( data )
-102 _topics[topic][ _idx ] = null
-103
-104 if not _.compact( _topics[topic] ).length
-105 delete _topics[topic]
-106
-107 if not _topics? or _.isEmpty( _topics )
-108 logger.removeAllListeners( "message" )
-109 done()
-110 return
-111
-112 for tpc, datas of _topics
-113 for data in datas
-114 writer.publish( tpc, data )
-115 return
-116
-117 it "test many json messages within multiple topics", ( done )->
-118 @timeout( 6000 )
-119
-120 logger.removeAllListeners( "message" )
-121
-122 _topics = {}
-123 for idx in [1..5]
-124 _topic = testData.newTopic()
-125 _topics[ _topic ] = []
-126 for idx in [1..5]
-127 _topics[ _topic ].push JSON.stringify( utils.randomobj( { maxObjSize: 5, maxDepth: 1, maxComplex: 1, maxStringLength: 50 } ) )
-128
-129 logger.on "message", ( topic, data, cb )->
-130 cb()
-131
-132 |
-
-
- ¶
-
-wait for the previously generated topic - |
- - | -
-
- ¶
-
-use stringified versions to find it within the list - |
- 133
-134 _idx = _topics[topic].indexOf( JSON.stringify( data ) )
-135 _topics[topic][ _idx ] = null
-136
-137 if not _.compact( _topics[topic] ).length
-138 delete _topics[topic]
-139
-140 if not _topics? or _.isEmpty( _topics )
-141 logger.removeAllListeners( "message" )
-142 done()
-143 return
-144 for tpc, datas of _topics
-145 for data in datas
-146 writer.publish( tpc, JSON.parse( data ) )
-147 return
-148
-149 return
-150 return
-151 return
-152 return
-153 |
-
- nsq-deamons.coffee- |
- - |
-
- ¶
-
-
- |
- 1 spawn = require('child_process').spawn
-2 fs = require( "fs" )
-3 pathHelper = require( "path" )
-4
-5 _ = require('lodash')
-6
-7 _nsqDataPath = pathHelper.resolve( "./.nsqdata/" )
-8
-9 try
-10 fs.mkdirSync( _nsqDataPath )
-11
-12 deamons = [
-13 {
-14 "name": "LOOKUP-A"
-15 "bin": "nsqlookupd"
-16 "args": {
-17 "http-address": "127.0.0.1:4161"
-18 "tcp-address": "127.0.0.1:4160"
-19 }
-20 },{
-21 "name": "LOOKUP-B"
-22 "bin": "nsqlookupd"
-23 "args": {
-24 "http-address": "127.0.0.1:4163"
-25 "tcp-address": "127.0.0.1:4162"
-26 }
-27 },{
-28 "name": "NSQ"
-29 "bin": "nsqd"
-30 "args": {
-31 "lookupd-tcp-address": [ "127.0.0.1:4160", "127.0.0.1:4162" ]
-32 "data-path": _nsqDataPath
-33 }
-34 }
-35 ]
-36
-37 class Deamons extends require( "events" ).EventEmitter
-38 constructor: ->
-39 @iRunning = 0
-40 @running = []
-41
-42 @basepath = pathHelper.resolve( "./node_modules/nsq-bundle/bin/" )
-43 return
-44
-45 closedOne: =>
-46 @iRunning--
-47 @emit "close"
-48 if @iRunning <= 0
-49 @emit "closedAll"
-50 return
-51
-52 start: ( cb )=>
-53 for deamon in deamons
-54 @running.push( @create( deamon, @closedOne ) )
-55 @iRunning++
-56
-57 setTimeout( cb, 1000 )
-58 return
-59
-60 create: ( options, closed )->
-61 _args = []
-62 for _k, _v of options.args
-63 if _.isArray( _v )
-64 for _vs in _v
-65 _arg = "-" + _k
-66 if _vs?
-67 _arg += "=" + _vs
-68 _args.push _arg
-69 else
-70 _arg = "-" + _k
-71 if _v?
-72 _arg += "=" + _v
-73 _args.push _arg
-74
-75 if process.env.NSQLOG
-76 console.log "✅ START #{ @basepath }/#{options.bin} #{_args.join( " " )}" if process.env.NSQLOG
-77 else
-78 console.log "✅ START #{options.name}"
-79 deamon = spawn( "#{ @basepath }/#{options.bin}", _args )
-80
-81 deamon.stdout.on "data", ( data )->
-82 console.log "LOG #{options.name}:", data.toString() if process.env.NSQLOG
-83 return
-84
-85 deamon.stderr.on "data", ( data )->
-86 console.error "ERR #{options.name}:", data.toString() if process.env.NSQERR
-87 return
-88
-89 deamon.on "close", ( data )->
-90 console.log "⛔️ STOPPED #{options.name}"
-91 closed()
-92 return
-93
-94 return deamon
-95
-96 stop: ( cb )=>
-97 console.log "STOP deamons!"
-98 if @iRunning <= 0
-99 cb()
-100 return
-101
-102 for rd in @running
-103 rd.kill()
-104
-105 @on "closedAll", ->
-106 cb()
-107 return
-108 return
-109
-110 module.exports = new Deamons()
-111
-112 |
-
- utils.coffee- |
- - |
-
- ¶
-
-
- |
- 1 utils =
-2 randomString: ( string_length = 5, specialLevel = 0 ) ->
-3 chars = "BCDFGHJKLMNPQRSTVWXYZbcdfghjklmnpqrstvwxyz"
-4 chars += "0123456789" if specialLevel >= 1
-5 chars += "_-@:." if specialLevel >= 2
-6 chars += "!\"§$%&/()=?*'_:;,.-#+¬”#£fi^\\˜·¯˙˚«∑€®†Ω¨⁄øπ•‘æœ@∆ºª©ƒ∂‚å–…∞µ~∫√ç≈¥" if specialLevel >= 3
-7
-8 randomstring = ""
-9 i = 0
-10
-11 while i < string_length
-12 rnum = Math.floor(Math.random() * chars.length)
-13 randomstring += chars.substring(rnum, rnum + 1)
-14 i++
-15 randomstring
-16
-17 randRange: ( lowVal, highVal )->
-18 Math.floor( Math.random()*(highVal-lowVal+1 ))+lowVal
-19
-20 clone: (inp)->
-21 return JSON.parse(JSON.stringify(inp))
-22
-23 randomobj: ( opt = {}, depth = 0 )->
-24 tgrt={}
-25 for i in [0..utils.randRange(1,( if opt.maxObjSize? then opt.maxObjSize else 13 ))]
-26 _key = utils.randomString( utils.randRange(2,32),0 )
-27 if not tgrt[ _key ]?
-28 tgrt[ _key ] = utils.randomdata( opt, depth )
-29 return tgrt
-30
-31 randomdata: ( opt = {} , depth = 0 )->
-32 if depth >= ( if opt.maxDepth? then opt.maxDepth else 2 )
-33 _i = utils.randRange(1,2)
-34 else
-35 _i = utils.randRange(1,4)
-36
-37 _depth = depth + 1
-38 switch _i
-39 when 1
-40 return utils.randomString( utils.randRange(1,( if opt.maxStringLength? then opt.maxStringLength else 1024*5 )), ( if opt.maxComplex? then opt.maxComplex else 3 ) )
-41 when 2
-42 return utils.randRange(1,1024*64 )
-43 when 3
-44 _arr = []
-45 for i in [0..utils.randRange(0,13)]
-46 _arr.push utils.randomdata( opt, _depth )
-47 return _arr
-48 when 4
-49 return utils.randomobj( opt, _depth )
-50
-51 module.exports = utils
-52 |
-
-
-
-
-
-
-
-
-
-- ¶ - NsqTopics Wrapper --Create and configute a nsq-topics instance - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
-
-
-
-
-
-- ¶ - NsqWriter Module --This module is a helper to simply write data to nsq - |
- - | -
-
- ¶
-
-npm modules - |
- - | -
-
- ¶
-
-internal modules - |
- - | -
-
-
-
- - ¶ - defaults -- |
- - | -
-
- ¶
-
-host String Host of a nsqd - |
- - | -
-
- ¶
-
-port Number Port of a nsqd - |
- - | -
-
- ¶
-
-deflate Boolean Use zlib Deflate compression. - |
- - | -
-
- ¶
-
-deflateLevel Number Use zlib Deflate compression level. - |
- - | -
-
- ¶
-
-clientId String|Null An identifier used to disambiguate this client. - |
- - | -
-
- ¶
-
-namespace String A namespace for the topics. This will be added/removed transparent to the topics. So only topics within this namespace a relevant. - |
- 29
-30 namespace: null
-31
-32 constructor: ( options )->
-33 @connected = false
-34 super( options )
-35
-36 @publish = @_waitUntil( @_publish, "connected" )
-37
-38 if not @config.active
-39 @warning "nsq writer disabled"
-40 return
-41
-42 @fetchClientId()
-43 return
-44
-45 _initClient: =>
-46 if @client?
-47 return @client
-48
-49 @client = new nsq.Writer( @config.host, @config.port )
-50
-51 @client.on( nsq.Writer.READY, @onConnect )
-52 @client.on( nsq.Writer.CLOSED, @onDisconnect )
-53
-54 @client.on nsq.Writer.ERROR, ( err )=>
-55 @error "nsq error", err
-56 return
-57
-58 @debug "init writer client", @client
-59 return @client
-60
-61 _publish: ( topic, data, cb )=>
-62 if not @config.active
-63 @_handleError( cb, "ENSQOFF" )
-64 return
-65
-66 @debug "publish", topic, @nsAdd( topic )
-67 if _.isString()
-68 _data = data
-69 else
-70 _data = JSON.stringify(data)
-71
-72 @client.publish @nsAdd( topic ), _data, ( err )=>
-73 if err
-74 if cb?
-75 cb( err )
-76 else
-77 @error( "publish to topic `#{topic}`", err )
-78 return
-79 @debug "send data to `#{topic}`"
-80 cb( null ) if cb?
-81 return
-82
-83 return @
-84
-85 module.exports = NsqWriter
-86 |
-