From 75b5770ad281378cb57ba5e0f5a21702a51582a9 Mon Sep 17 00:00:00 2001 From: Lorenzo Aiello Date: Tue, 24 Mar 2020 03:22:04 -0700 Subject: [PATCH] reformatted backend for ES 7 Signed-off-by: Lorenzo Aiello --- lib/default_format.js | 76 +++++------ lib/elasticsearch.js | 310 ++++++++++++++++++++---------------------- lib/regex_format.js | 58 -------- 3 files changed, 185 insertions(+), 259 deletions(-) delete mode 100644 lib/regex_format.js diff --git a/lib/default_format.js b/lib/default_format.js index a3206bc..267a12a 100644 --- a/lib/default_format.js +++ b/lib/default_format.js @@ -1,49 +1,49 @@ const counters = function (key, value, ts, bucket) { - if (value) { - bucket.push({ - "name": key, - "val":value, - "@timestamp": ts - }); - return 1; - } + if (value) { + bucket.push({ + "name": key, + "val": value, + "@timestamp": ts + }); + return 1; + } - return 0; -} + return 0; +}; const timers = function (key, series, ts, bucket) { - let counter = 0; - for (keyTimer in series) { - if (series[keyTimer]) { - bucket.push({ - "name": key, - "val":series[keyTimer], - "@timestamp": ts - }); - counter ++; + let counter = 0; + for (let keyTimer in series) { + if (series[keyTimer]) { + bucket.push({ + "name": key, + "val": series[keyTimer], + "@timestamp": ts + }); + counter++; + } } - } - return counter; -} + return counter; +}; const timer_data = function (key, value, ts, bucket) { - value["@timestamp"] = ts; - value["name"] = key; - let shouldPush = false; - if (value['histogram']) { - for (var keyH in value['histogram']) { - shouldPush = shouldPush || !!value['histogram'][keyH]; - value[keyH] = value['histogram'][keyH]; + value["@timestamp"] = ts; + value["name"] = key; + let shouldPush = false; + if (value['histogram']) { + for (var keyH in value['histogram']) { + shouldPush = shouldPush || !!value['histogram'][keyH]; + value[keyH] = value['histogram'][keyH]; + } + delete value['histogram']; } - delete value['histogram']; - } - if (shouldPush) { - bucket.push(value); - } -} + if (shouldPush) { + bucket.push(value); + } +}; -exports.counters = counters; -exports.timers = timers; +exports.counters = counters; +exports.timers = timers; exports.timer_data = timer_data; -exports.gauges = counters; +exports.gauges = counters; \ No newline at end of file diff --git a/lib/elasticsearch.js b/lib/elasticsearch.js index f374a4f..88d6e00 100644 --- a/lib/elasticsearch.js +++ b/lib/elasticsearch.js @@ -4,11 +4,9 @@ * To enable this backend, include 'elastic' in the backends * configuration array: * - * backends: ['./backends/elastic'] + * backends: ['statsd-elasticsearch-backend'] * (if the config file is in the statsd folder) * - * A sample configuration can be found in exampleElasticConfig.js - * * This backend supports the following config options: * * host: hostname or IP of ElasticSearch server @@ -16,155 +14,142 @@ * path: http path of Elastic Search Server (default: '/') * indexPrefix: Prefix of the dynamic index to be created (default: 'statsd') * indexTimestamp: Timestamping format of the index, either "year", "month", "day", or "hour" - * indexType: The dociment type of the saved stat (default: 'stat') */ -var net = require('net'), - util = require('util'), - http = require('http'), - https = require('https'), - fs = require('fs'), - path = require('path'); +const http = require('http'), + https = require('https'), + fs = require('fs'); // this will be instantiated to the logger -var lg; -var debug; -var flushInterval; -var elasticHost; -var elasticPort; -var elasticPath; -var elasticIndex; -var elasticIndexTimestamp; -var elasticType; -var elasticCountType; -var elasticTimerType; -var elasticUsername; -var elasticPassword; -var elasticHttps; -var elasticCertCa; - -var elasticStats = {}; - -function transform(stats, index, statsdType, esType) { - const payload = []; - - for (let i = 0; i < stats.length; i++) { - payload.push(JSON.stringify({ - index: { - "_index": index, - "_type": esType - } - })); - stats[i].type = statsdType; - payload.push(JSON.stringify(stats[i])); - } +let lg; +let debug; +let flushInterval; +let elasticHost; +let elasticPort; +let elasticPath; +let elasticIndex; +let elasticIndexTimestamp; +let elasticCountType; +let elasticTimerType; +let elasticUsername; +let elasticPassword; +let elasticHttps; +let elasticCertCa; + +let elasticStats = {}; + +function transform(stats, index, statsdType) { + const payload = []; + + for (let i = 0; i < stats.length; i++) { + payload.push(JSON.stringify({ + index: { + "_index": index + } + })); + stats[i].type = statsdType; + payload.push(JSON.stringify(stats[i])); + } - return payload; + return payload; } function insert(listCounters, listTimers, listTimerData, listGaugeData) { - const indexDate = new Date(); - let statsdIndex = elasticIndex + '-' + indexDate.getUTCFullYear(); - - if (elasticIndexTimestamp === 'month' || elasticIndexTimestamp === 'day' || elasticIndexTimestamp === 'hour'){ - let indexMo = indexDate.getUTCMonth() +1; - if (indexMo < 10) { - indexMo = '0'+indexMo; + const indexDate = new Date(); + let statsdIndex = elasticIndex + '-' + indexDate.getUTCFullYear(); + + if (elasticIndexTimestamp === 'month' || elasticIndexTimestamp === 'day' || elasticIndexTimestamp === 'hour') { + let indexMo = indexDate.getUTCMonth() + 1; + if (indexMo < 10) { + indexMo = '0' + indexMo; + } + statsdIndex += '.' + indexMo; } - statsdIndex += '.' + indexMo; - } - if (elasticIndexTimestamp === 'day' || elasticIndexTimestamp === 'hour'){ - let indexDt = indexDate.getUTCDate(); - if (indexDt < 10) { - indexDt = '0'+indexDt; + if (elasticIndexTimestamp === 'day' || elasticIndexTimestamp === 'hour') { + let indexDt = indexDate.getUTCDate(); + if (indexDt < 10) { + indexDt = '0' + indexDt; + } + statsdIndex += '.' + indexDt; } - statsdIndex += '.' + indexDt; - } - if (elasticIndexTimestamp === 'hour'){ - let indexDt = indexDate.getUTCHours(); - if (indexDt < 10) { - indexDt = '0' + indexDt; + if (elasticIndexTimestamp === 'hour') { + let indexDt = indexDate.getUTCHours(); + if (indexDt < 10) { + indexDt = '0' + indexDt; + } + statsdIndex += '.' + indexDt; } - statsdIndex += '.' + indexDt; - } - - let payload = transform(listCounters, statsdIndex, elasticCountType, elasticType); - payload = payload.concat(transform(listTimers, statsdIndex, elasticTimerType, elasticType)); - payload = payload.concat(transform(listTimerData, statsdIndex, elasticTimerType, elasticType)); - payload = payload.concat(transform(listGaugeData, statsdIndex, elasticGaugeDataType, elasticType)); - - if (payload.length === 0) { - // No work to do - return; - } - - payload = payload.join("\n") + "\n"; + let payload = transform(listCounters, statsdIndex, elasticCountType); + payload = payload.concat(transform(listTimers, statsdIndex, elasticTimerType)); + payload = payload.concat(transform(listTimerData, statsdIndex, elasticTimerType)); + payload = payload.concat(transform(listGaugeData, statsdIndex, elasticGaugeDataType)); - const optionsPost = { - host: elasticHost, - port: elasticPort, - path: `${elasticPath}${statsdIndex}/_bulk`, - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Content-Length': payload.length + if (payload.length === 0) { + // No work to do + return; } - }; - if (elasticUsername && elasticPassword) { - optionsPost.auth = elasticUsername + ':' + elasticPassword; - } + payload = payload.join("\n") + "\n"; + + const optionsPost = { + host: elasticHost, + port: elasticPort, + path: `${elasticPath}${statsdIndex}/_bulk`, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': payload.length + } + }; + + if (elasticUsername && elasticPassword) { + optionsPost.auth = elasticUsername + ':' + elasticPassword; + } - let httpClient = http; - if (elasticHttps) { - httpClient = https; + let httpClient = http; + if (elasticHttps) { + httpClient = https; - if (elasticCertCa) { - var ca = fs.readFileSync(elasticCertCa); - optionsPost.ca = ca; - optionsPost.agent = new https.Agent(optionsPost); + if (elasticCertCa) { + optionsPost.ca = fs.readFileSync(elasticCertCa); + optionsPost.agent = new https.Agent(optionsPost); + } } - } - const req = httpClient.request(optionsPost, function(res) { - res.on('data', (d) => { - lg.log(`ES responded with ${res.statusCode}`); - if (res.statusCode >= 400) { - var errdata = "HTTP " + res.statusCode + ": " + d; - lg.log('error', errdata); - } + const req = httpClient.request(optionsPost, function (res) { + res.on('data', (d) => { + lg.log(`ES responded with ${res.statusCode}`); + if (res.statusCode >= 400) { + let errdata = "HTTP " + res.statusCode + ": " + d; + lg.log('error', errdata); + } + }); + }).on('error', function (err) { + lg.log('error', 'Error with HTTP request, no stats flushed.'); + console.log(err); }); - }).on('error', function(err) { - lg.log('error', 'Error with HTTP request, no stats flushed.'); - console.log(err); - }); - if (debug) { - lg.log('ES payload:'); - lg.log(payload); - } + if (debug) { + lg.log('ES payload:'); + lg.log(payload); + } - req.write(payload); - req.end(); + req.write(payload); + req.end(); } -var flush_stats = function elastic_flush(ts, metrics) { - var statString = ''; - var numStats = 0; - var key; - var array_counts = new Array(); - var array_timers = new Array(); - var array_timer_data = new Array(); - var array_gauges = new Array(); +const flush_stats = function elastic_flush(ts, metrics) { + let numStats = 0; + let key; + let array_counts = new Array(); + let array_timers = new Array(); + let array_timer_data = new Array(); + let array_gauges = new Array(); ts = ts * 1000; -/* - var gauges = metrics.gauges; - var pctThreshold = metrics.pctThreshold; -*/ for (key in metrics.counters) { numStats += fm.counters(key, metrics.counters[key], ts, array_counts); @@ -186,7 +171,7 @@ var flush_stats = function elastic_flush(ts, metrics) { if (debug) { lg.log('metrics:'); - lg.log( JSON.stringify(metrics) ); + lg.log(JSON.stringify(metrics)); } insert(array_counts, array_timers, array_timer_data, array_gauges); @@ -196,51 +181,50 @@ var flush_stats = function elastic_flush(ts, metrics) { } }; -var elastic_backend_status = function (writeCb) { - for (stat in elasticStats) { +const elastic_backend_status = function (writeCb) { + for (let stat in elasticStats) { writeCb(null, 'elastic', stat, elasticStats[stat]); } }; -exports.init = function(startup_time, config, events, logger) { - - debug = config.debug; - lg = logger; - - var configEs = config.elasticsearch || { }; - - elasticHost = configEs.host || 'localhost'; - elasticPort = configEs.port || 9200; - elasticPath = configEs.path || '/'; - elasticIndex = configEs.indexPrefix || 'statsd'; - elasticIndexTimestamp = configEs.indexTimestamp || 'day'; - elasticType = configEs.indexType || 'stats'; - elasticCountType = configEs.countType || 'counter'; - elasticTimerType = configEs.timerType || 'timer'; - elasticTimerDataType = configEs.timerDataType || elasticTimerType + '_stats'; - elasticGaugeDataType = configEs.gaugeDataType || 'gauge'; - elasticFormatter = configEs.formatter || 'default_format'; - elasticUsername = configEs.username || undefined; - elasticPassword = configEs.password || undefined; - elasticHttps = configEs.secure || false; - elasticCertCa = configEs.ca || undefined; - - fm = require('./' + elasticFormatter + '.js') - if (debug) { - lg.log("debug", "loaded formatter " + elasticFormatter); - } +exports.init = function (startup_time, config, events, logger) { + + debug = config.debug; + lg = logger; + + let configEs = config.elasticsearch || {}; + + elasticHost = configEs.host || 'localhost'; + elasticPort = configEs.port || 9200; + elasticPath = configEs.path || '/'; + elasticIndex = configEs.indexPrefix || 'statsd'; + elasticIndexTimestamp = configEs.indexTimestamp || 'day'; + elasticCountType = configEs.countType || 'counter'; + elasticTimerType = configEs.timerType || 'timer'; + elasticTimerDataType = configEs.timerDataType || elasticTimerType + '_stats'; + elasticGaugeDataType = configEs.gaugeDataType || 'gauge'; + elasticFormatter = configEs.formatter || 'default_format'; + elasticUsername = configEs.username || undefined; + elasticPassword = configEs.password || undefined; + elasticHttps = configEs.secure || false; + elasticCertCa = configEs.ca || undefined; + + fm = require('./' + elasticFormatter + '.js'); + if (debug) { + lg.log("debug", "loaded formatter " + elasticFormatter); + } - if (fm.init) { - fm.init(configEs); - } - flushInterval = config.flushInterval; + if (fm.init) { + fm.init(configEs); + } + flushInterval = config.flushInterval; - elasticStats.last_flush = startup_time; - elasticStats.last_exception = startup_time; + elasticStats.last_flush = startup_time; + elasticStats.last_exception = startup_time; - events.on('flush', flush_stats); - events.on('status', elastic_backend_status); + events.on('flush', flush_stats); + events.on('status', elastic_backend_status); - return true; + return true; }; diff --git a/lib/regex_format.js b/lib/regex_format.js deleted file mode 100644 index 171fae5..0000000 --- a/lib/regex_format.js +++ /dev/null @@ -1,58 +0,0 @@ -/* - -Remember that setting a new regix in the config for statsd uses -a native regular expression data type, not a string! -*/ -var named = require('named-regexp').named - , extend = require('util')._extend; - -var keyRegex; - -var init = function(config) { - keyRegex = named(config.keyRegex || /^(:[^.]+)\.(:[^.]+)\.(:[^.]+)(?:\.(:[^.]+))?/); -} - -var counters = function (key, value, ts, bucket) { - var matched = keyRegex.exec(key); - if (matched === null) return 0; - - bucket.push(extend(matched.captures, { - "val": value, - "@timestamp": ts - })); - return 1; -} - -var timers = function (key, series, ts, bucket) { - var matched = keyRegex.exec(key); - if (matched === null) return 0; - for (keyTimer in series) { - bucket.push(extend({ - "val": series[keyTimer], - "@timestamp": ts - }, matched.captures)); - } - return series.length; -} - -var timer_data = function (key, value, ts, bucket) { - var matched = keyRegex.exec(key); - if (matched === null) return; - - var value = extend(value, matched.captures); - - value["@timestamp"] = ts; - if (value['histogram']) { - for (var keyH in value['histogram']) { - value[keyH] = value['histogram'][keyH]; - } - delete value['histogram']; - } - bucket.push(value); -} - -exports.counters = counters; -exports.gauges = counters; -exports.timers = timers; -exports.timer_data = timer_data; -exports.init = init;