Skip to content

Commit

Permalink
feat: support APM Server intake API version 2 (#465)
Browse files Browse the repository at this point in the history
Closes #356
  • Loading branch information
watson authored and Stephen Belanger committed Nov 8, 2018
1 parent 1d52685 commit a629cc0
Show file tree
Hide file tree
Showing 62 changed files with 1,365 additions and 2,135 deletions.
2 changes: 1 addition & 1 deletion docs/agent-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ Defaults to `unnamed`
You can alternatively set this via <<span-type,`span.type`>>.
Defaults to `custom.code`

When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.

See <<span-api,Span API>> docs for details on how to use custom spans.

Expand Down
19 changes: 1 addition & 18 deletions docs/span-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Defaults to `unnamed`
You can alternatively set this via <<span-type,`span.type`>>.
Defaults to `custom.code`

When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.

[[span-end]]
==== `span.end()`
Expand All @@ -86,20 +86,3 @@ span.end()
End the span.
If the span has already ended,
nothing happens.

A span that isn't ended before the parent transaction ends will be <<span-truncate,truncated>>.

[[span-truncate]]
==== `span.truncate()`

[source,js]
----
span.truncate()
----

Truncates and ends the span.
If the span is already ended or truncated,
nothing happens.

A truncated span is a special type of ended span.
It's used to indicate that the measured event took longer than the duration recorded by the span.
2 changes: 1 addition & 1 deletion docs/transaction-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Think of the transaction result as equivalent to the status code of an HTTP resp
transaction.end([result])
----

Ends the transaction and <<span-truncate,truncates>> all un-ended child spans.
Ends the transaction.
If the transaction has already ended,
nothing happens.

Expand Down
65 changes: 56 additions & 9 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ var connect = require('./middleware/connect')
var Filters = require('./filters')
var Instrumentation = require('./instrumentation')
var parsers = require('./parsers')
var request = require('./request')
var stackman = require('./stackman')
var symbols = require('./symbols')
var truncate = require('./truncate')

var IncomingMessage = http.IncomingMessage
var ServerResponse = http.ServerResponse
Expand All @@ -32,6 +32,7 @@ function Agent () {

this._instrumentation = new Instrumentation(this)
this._filters = new Filters()
this._apmServer = null

this._conf = null
this._httpClient = null
Expand All @@ -52,6 +53,10 @@ Object.defineProperty(Agent.prototype, 'currentTransaction', {
}
})

Agent.prototype.destroy = function () {
if (this._apmServer) this._apmServer.destroy()
}

Agent.prototype.startTransaction = function () {
return this._instrumentation.startTransaction.apply(this._instrumentation, arguments)
}
Expand Down Expand Up @@ -129,15 +134,35 @@ Agent.prototype.start = function (opts) {
})
}

this._instrumentation.start()
this._apmServer = new ElasticAPMHttpClient({
// metadata
agentName: 'nodejs',
agentVersion: version,
serviceName: this._conf.serviceName,
serviceVersion: this._conf.serviceVersion,
frameworkName: this._conf.frameworkName,
frameworkVersion: this._conf.frameworkVersion,
hostname: this._conf.hostname,

this._httpClient = new ElasticAPMHttpClient({
// Sanitize conf
truncateStringsAt: config.INTAKE_STRING_MAX_SIZE, // TODO: Do we need to set this (it's the default value)

// HTTP conf
secretToken: this._conf.secretToken,
userAgent: userAgent,
serverUrl: this._conf.serverUrl,
rejectUnauthorized: this._conf.verifyServerCert,
serverTimeout: this._conf.serverTimeout * 1000
serverTimeout: this._conf.serverTimeout * 1000,

// Streaming conf
size: this._conf.apiRequestSize,
time: this._conf.apiRequestTime * 1000
})
this._apmServer.on('error', err => {
this.logger.error('An error occrued while communicating with the APM Server:', err.message)
})

this._instrumentation.start()

Error.stackTraceLimit = this._conf.stackTraceLimit
if (this._conf.captureExceptions) this.handleUncaughtExceptions()
Expand Down Expand Up @@ -285,10 +310,27 @@ Agent.prototype.captureError = function (err, opts, cb) {
}

function send (error) {
agent.logger.info('logging error %s with Elastic APM', id)
request.errors(agent, [error], (err) => {
if (cb) cb(err, error.id)
})
error = agent._filters.process(error) // TODO: Update filter to expect this format

if (!error) {
agent.logger.debug('error ignored by filter %o', { id: id })
cb(null, id)
return
}

truncate.error(error, agent._conf)

if (agent._apmServer) {
agent.logger.info(`Sending error ${id} to Elastic APM`)
agent._apmServer.sendError(error, function () {
agent._apmServer.flush(function (err) {
cb(err, id)
})
})
} else {
// TODO: Swallow this error just as it's done in agent.flush()?
process.nextTick(cb.bind(null, new Error('cannot capture error before agent is started'), id))
}
}
}

Expand All @@ -314,7 +356,12 @@ Agent.prototype.handleUncaughtExceptions = function (cb) {
}

Agent.prototype.flush = function (cb) {
this._instrumentation.flush(cb)
if (this._apmServer) {
this._apmServer.flush(cb)
} else {
this.logger.warn(new Error('cannot flush agent before it is started'))
process.nextTick(cb)
}
}

Agent.prototype.lambda = function wrapLambda (type, fn) {
Expand Down
12 changes: 8 additions & 4 deletions lib/config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

var fs = require('fs')
var os = require('os')
var path = require('path')

var consoleLogLevel = require('console-log-level')
Expand Down Expand Up @@ -35,7 +34,8 @@ var DEFAULTS = {
verifyServerCert: true,
active: true,
logLevel: 'info',
hostname: os.hostname(),
apiRequestSize: 1024 * 1024, // TODO: Is this the right default
apiRequestTime: 10, // TODO: Is this the right default. Should this be ms?
stackTraceLimit: 50,
captureExceptions: true,
filterHttpHeaders: true,
Expand All @@ -51,10 +51,10 @@ var DEFAULTS = {
sourceLinesSpanAppFrames: 0,
sourceLinesSpanLibraryFrames: 0,
errorMessageMaxLength: 2048,
flushInterval: 10,
flushInterval: 10, // TODO: Deprecate
transactionMaxSpans: 500,
transactionSampleRate: 1.0,
maxQueueSize: 100,
maxQueueSize: 100, // TODO: Deprecate
serverTimeout: 30,
disableInstrumentations: []
}
Expand All @@ -68,6 +68,8 @@ var ENV_TABLE = {
active: 'ELASTIC_APM_ACTIVE',
logLevel: 'ELASTIC_APM_LOG_LEVEL',
hostname: 'ELASTIC_APM_HOSTNAME',
apiRequestSize: 'ELASTIC_APM_API_REQUEST_SIZE',
apiRequestTime: 'ELASTIC_APM_API_REQUEST_TIME',
frameworkName: 'ELASTIC_APM_FRAMEWORK_NAME',
frameworkVersion: 'ELASTIC_APM_FRAMEWORK_VERSION',
stackTraceLimit: 'ELASTIC_APM_STACK_TRACE_LIMIT',
Expand Down Expand Up @@ -105,6 +107,8 @@ var BOOL_OPTS = [
]

var NUM_OPTS = [
'apiRequestSize',
'apiRequestTime',
'stackTraceLimit',
'abortedErrorThreshold',
'flushInterval',
Expand Down
73 changes: 32 additions & 41 deletions lib/instrumentation/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
var fs = require('fs')
var path = require('path')

var AsyncValuePromise = require('async-value-promise')
var hook = require('require-in-the-middle')
var semver = require('semver')

var Queue = require('../queue')
var request = require('../request')
var Transaction = require('./transaction')
var truncate = require('../truncate')
var shimmer = require('./shimmer')

var MODULES = [
Expand Down Expand Up @@ -46,7 +44,6 @@ module.exports = Instrumentation

function Instrumentation (agent) {
this._agent = agent
this._queue = null
this._hook = null // this._hook is only exposed for testing purposes
this._started = false
this.currentTransaction = null
Expand All @@ -60,21 +57,6 @@ Instrumentation.prototype.start = function () {
var self = this
this._started = true

var qopts = {
flushInterval: this._agent._conf.flushInterval,
maxQueueSize: this._agent._conf.maxQueueSize,
logger: this._agent.logger
}
this._queue = new Queue(qopts, function onFlush (transactions, done) {
AsyncValuePromise.all(transactions).then(function (transactions) {
if (self._agent._conf.active && transactions.length > 0) {
request.transactions(self._agent, transactions, done)
} else {
done()
}
}, done)
})

if (this._agent._conf.asyncHooks && semver.gte(process.version, '8.2.0')) {
require('./async-hooks')(this)
} else {
Expand Down Expand Up @@ -111,27 +93,44 @@ Instrumentation.prototype._patchModule = function (exports, name, version, enabl
}

Instrumentation.prototype.addEndedTransaction = function (transaction) {
var agent = this._agent

if (this._started) {
var queue = this._queue
var payload = agent._filters.process(transaction._encode()) // TODO: Update filter to expect this format
if (!payload) return agent.logger.debug('transaction ignored by filter %o', { id: transaction.id })
truncate.transaction(payload)
agent.logger.debug('sending transaction %o', { id: transaction.id })
agent._apmServer.sendTransaction(payload)
} else {
agent.logger.debug('ignoring transaction %o', { id: transaction.id })
}
}

this._agent.logger.debug('adding transaction to queue %o', { id: transaction.id })
Instrumentation.prototype.addEndedSpan = function (span) {
var agent = this._agent

var payload = new AsyncValuePromise()
if (this._started) {
agent.logger.debug('encoding span %o', { trans: span.transaction.id, name: span.name, type: span.type })
span._encode(function (err, payload) {
if (err) {
agent.logger.error('error encoding span %o', { trans: span.transaction.id, name: span.name, type: span.type, error: err.message })
return
}

payload.catch(function (err) {
this._agent.logger.error('error encoding transaction %s: %s', transaction.id, err.message)
})
payload = agent._filters.process(payload) // TODO: Update filter to expect this format

// Add the transaction payload to the queue instead of the transation
// object it self to free up the transaction for garbage collection
transaction._encode(function (err, _payload) {
if (err) payload.reject(err)
else payload.resolve(_payload)
})
if (!payload) {
agent.logger.debug('span ignored by filter %o', { trans: span.transaction.id, name: span.name, type: span.type })
return
}

queue.add(payload)
truncate.span(payload)

agent.logger.debug('sending span %o', { trans: span.transaction.id, name: span.name, type: span.type })
if (agent._apmServer) agent._apmServer.sendSpan(payload)
})
} else {
this._agent.logger.debug('ignoring transaction %o', { id: transaction.id })
agent.logger.debug('ignoring span %o', { trans: span.transaction.id, name: span.name, type: span.type })
}
}

Expand Down Expand Up @@ -221,11 +220,3 @@ Instrumentation.prototype._recoverTransaction = function (trans) {

this.currentTransaction = trans
}

Instrumentation.prototype.flush = function (cb) {
if (this._queue) {
this._queue.flush(cb)
} else {
process.nextTick(cb)
}
}
21 changes: 5 additions & 16 deletions lib/instrumentation/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ module.exports = Span
function Span (transaction) {
this.transaction = transaction
this.started = false
this.truncated = false
this.ended = false
this.name = null
this.type = null
Expand Down Expand Up @@ -50,18 +49,6 @@ Span.prototype.customStackTrace = function (stackObj) {
this._recordStackTrace(stackObj)
}

Span.prototype.truncate = function () {
if (!this.started) {
this._agent.logger.debug('tried to truncate non-started span - ignoring %o', { id: this.transaction.id, name: this.name, type: this.type })
return
} else if (this.ended) {
this._agent.logger.debug('tried to truncate already ended span - ignoring %o', { id: this.transaction.id, name: this.name, type: this.type })
return
}
this.truncated = true
this.end()
}

Span.prototype.end = function () {
if (!this.started) {
this._agent.logger.debug('tried to call span.end() on un-started span %o', { id: this.transaction.id, name: this.name, type: this.type })
Expand All @@ -75,8 +62,8 @@ Span.prototype.end = function () {
this._agent._instrumentation._recoverTransaction(this.transaction)

this.ended = true
this._agent.logger.debug('ended span %o', { id: this.transaction.id, name: this.name, type: this.type, truncated: this.truncated })
this.transaction._recordEndedSpan(this)
this._agent.logger.debug('ended span %o', { id: this.transaction.id, name: this.name, type: this.type })
this._agent._instrumentation.addEndedSpan(this)
}

Span.prototype.duration = function () {
Expand Down Expand Up @@ -157,8 +144,10 @@ Span.prototype._encode = function (cb) {
}

var payload = {
transactionId: self.transaction.id,
timestamp: self.transaction.timestamp,
name: self.name,
type: self.truncated ? self.type + '.truncated' : self.type,
type: self.type,
start: self.offsetTime(),
duration: self.duration()
}
Expand Down
Loading

0 comments on commit a629cc0

Please sign in to comment.