Skip to content

Commit

Permalink
fix: Updated amqplib instrumentation to properly parse host/port from…
Browse files Browse the repository at this point in the history
… connect (#2461)
  • Loading branch information
bizob2828 authored Aug 14, 2024
1 parent 2b67623 commit 91636a8
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 94 deletions.
47 changes: 34 additions & 13 deletions lib/instrumentation/amqplib/amqplib.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ const {
OperationSpec,
params: { DatastoreParameters }
} = require('../../shim/specs')
const url = require('url')
const wrapModel = require('./channel-model')
const { setCallback } = require('./utils')
const { setCallback, parseConnectionArgs } = require('./utils')
const wrapChannel = require('./channel')
const { amqpConnection } = require('../../symbols')

module.exports.instrumentPromiseAPI = instrumentChannelAPI
module.exports.instrumentCallbackAPI = instrumentCallbackAPI
Expand Down Expand Up @@ -73,31 +73,52 @@ function instrumentAMQP(shim, amqp, promiseMode) {
/**
*
* Instruments the connect method
* We have to both wrap and record because
* we need the host/port for all subsequent calls on the model/channel
* but record only completes in an active transaction
*
* @param {Shim} shim instance of shim
* @param {object} amqp amqplib object
* @param {boolean} promiseMode is this promise based?
*/
function wrapConnect(shim, amqp, promiseMode) {
shim.record(amqp, 'connect', function recordConnect(shim, connect, name, args) {
let [connArgs] = args
const params = new DatastoreParameters()
shim.wrap(amqp, 'connect', function wrapConnect(shim, connect) {
return function wrappedConnect() {
const args = shim.argsToArray.apply(shim, arguments)
const [connArgs] = args
const params = parseConnectionArgs({ shim, connArgs })
const cb = args[args.length - 1]
if (!promiseMode) {
args[args.length - 1] = function wrappedCallback() {
const cbArgs = shim.argsToArray.apply(shim, arguments)
const [, c] = cbArgs
c.connection[amqpConnection] = params
return cb.apply(this, cbArgs)
}
}

if (shim.isString(connArgs)) {
connArgs = url.parse(connArgs)
params.host = connArgs.hostname
if (connArgs.port) {
params.port = connArgs.port
const result = connect.apply(this, args)
if (promiseMode) {
return result.then((c) => {
c.connection[amqpConnection] = params
return c
})
}
return result
}
})

shim.record(amqp, 'connect', function recordConnect(shim, connect, name, args) {
const [connArgs] = args
const params = parseConnectionArgs({ shim, connArgs })
return new OperationSpec({
name: 'amqplib.connect',
callback: setCallback(shim, promiseMode),
promise: promiseMode,
parameters: params,
stream: null,
recorder: null
parameters: new DatastoreParameters({
host: params.host,
port_path_or_id: params.port
})
})
})
}
13 changes: 4 additions & 9 deletions lib/instrumentation/amqplib/channel-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

'use strict'
const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs')
const { amqpConnection } = require('../../symbols')
const CHANNEL_METHODS = [
'close',
'open',
Expand All @@ -22,13 +23,7 @@ const CHANNEL_METHODS = [
'prefetch',
'recover'
]
const {
describeMessage,
setCallback,
parseConnect,
getParametersFromMessage,
TEMP_RE
} = require('./utils')
const { describeMessage, setCallback, getParametersFromMessage, TEMP_RE } = require('./utils')

/**
*
Expand Down Expand Up @@ -89,7 +84,7 @@ function recordPurge({ shim, proto, promiseMode }) {

function recordGet({ shim, proto, promiseMode }) {
shim.recordConsume(proto, 'get', function wrapGet() {
const { host, port } = parseConnect(this?.connection?.stream)
const { host, port } = this?.connection?.[amqpConnection] || {}
return new MessageSpec({
destinationName: shim.FIRST,
callback: setCallback(shim, promiseMode),
Expand All @@ -115,7 +110,7 @@ function recordGet({ shim, proto, promiseMode }) {

function recordConsume({ shim, proto, promiseMode }) {
shim.recordSubscribedConsume(proto, 'consume', function consume() {
const { host, port } = parseConnect(this?.connection?.stream)
const { host, port } = this?.connection?.[amqpConnection] || {}
return new MessageSubscribeSpec({
name: 'amqplib.Channel#consume',
queue: shim.FIRST,
Expand Down
41 changes: 22 additions & 19 deletions lib/instrumentation/amqplib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

'use strict'
const { MessageSpec } = require('../../shim/specs')
const { parseConnect, getParameters, TEMP_RE } = require('./utils')
const { amqpConnection } = require('../../symbols')
const { getParameters, TEMP_RE } = require('./utils')

/**
*
Expand Down Expand Up @@ -47,24 +48,26 @@ module.exports = function wrapChannel(shim) {
}
})

shim.recordProduce(proto, 'sendMessage', function recordSendMessage(shim, fn, n, args) {
const fields = args[0]
if (!fields) {
return null
}
const isDefault = fields.exchange === ''
let exchange = 'Default'
if (!isDefault) {
exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange
}
const { host, port } = parseConnect(this?.connection?.stream)
shim.recordProduce(proto, 'sendMessage', recordSendMessage)
}

function recordSendMessage(shim, fn, n, args) {
const fields = args[0]
if (!fields) {
return null
}
const isDefault = fields.exchange === ''
let exchange = 'Default'
if (!isDefault) {
exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange
}
const { host, port } = this?.connection?.[amqpConnection] || {}

return new MessageSpec({
destinationName: exchange,
destinationType: shim.EXCHANGE,
routingKey: fields.routingKey,
headers: fields.headers,
parameters: getParameters({ parameters: Object.create(null), fields, host, port })
})
return new MessageSpec({
destinationName: exchange,
destinationType: shim.EXCHANGE,
routingKey: fields.routingKey,
headers: fields.headers,
parameters: getParameters({ parameters: Object.create(null), fields, host, port })
})
}
44 changes: 23 additions & 21 deletions lib/instrumentation/amqplib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const {
MessageSpec,
params: { QueueMessageParameters }
} = require('../../shim/specs')
const { amqpConnection } = require('../../symbols')
const TEMP_RE = /^amq\./

/**
Expand Down Expand Up @@ -100,25 +99,6 @@ function getParametersFromMessage({ message, host, port }) {
return parameters
}

/**
* Extracts the host/port from the amqp socket connection.
* Stores on connection as symbol to only parse once.
*
* @param {Socket} socket amqp connection
* @returns {object} {host, port } of connection
*/
function parseConnect(socket) {
if (socket[amqpConnection]) {
return socket[amqpConnection]
}
const host = ['127.0.0.1', '::1', '[::1]'].includes(socket?.remoteAddress)
? 'localhost'
: socket?.remoteAddress
const port = socket?.remotePort
socket[amqpConnection] = { host, port }
return { host, port }
}

/**
* Helper to set the appropriate value of the callback property
* in the spec. If it's a promise set to null otherwise set it to `shim.LAST`
Expand All @@ -131,11 +111,33 @@ function setCallback(shim, promiseMode) {
return promiseMode ? null : shim.LAST
}

/**
* Parses the connection args to return host/port
*
* @param {string|object} connArgs connection arguments
* @returns {object} {host, port }
*/
function parseConnectionArgs({ shim, connArgs }) {
const params = {}
if (shim.isString(connArgs)) {
connArgs = new URL(connArgs)
params.host = connArgs.hostname
if (connArgs.port) {
params.port = parseInt(connArgs.port, 10)
}
} else {
params.port = connArgs.port || (connArgs.protocol === 'amqp' ? 5672 : 5671)
params.host = connArgs.hostname
}

return params
}

module.exports = {
describeMessage,
getParameters,
getParametersFromMessage,
parseConnect,
parseConnectionArgs,
setCallback,
TEMP_RE
}
59 changes: 59 additions & 0 deletions test/unit/instrumentation/amqplib/utils.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'
const test = require('node:test')
const assert = require('node:assert')
const { parseConnectionArgs } = require('../../../../lib/instrumentation/amqplib/utils')

test('should parse host port if connection args is a string', () => {
const stub = {
isString() {
return true
}
}
const params = parseConnectionArgs({ shim: stub, connArgs: 'amqp://host:5388/' })
assert.equal(params.host, 'host')
assert.equal(params.port, 5388)
})

test('should parse host port if connection is an object', () => {
const stub = {
isString() {
return false
}
}
const params = parseConnectionArgs({ shim: stub, connArgs: { hostname: 'host', port: 5388 } })
assert.equal(params.host, 'host')
assert.equal(params.port, 5388)
})

test('should default port to 5672 if protocol is amqp:', () => {
const stub = {
isString() {
return false
}
}
const params = parseConnectionArgs({
shim: stub,
connArgs: { hostname: 'host', protocol: 'amqp' }
})
assert.equal(params.host, 'host')
assert.equal(params.port, 5672)
})

test('should default port to 5671 if protocol is amqps:', () => {
const stub = {
isString() {
return false
}
}
const params = parseConnectionArgs({
shim: stub,
connArgs: { hostname: 'host', protocol: 'amqps' }
})
assert.equal(params.host, 'host')
assert.equal(params.port, 5671)
})
25 changes: 10 additions & 15 deletions test/versioned/amqplib/callback.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,16 @@ tap.test('amqplib callback instrumentation', function (t) {
})

t.test('connect in a transaction', function (t) {
helper.runInTransaction(agent, function () {
t.doesNotThrow(function () {
amqplib.connect(amqpUtils.CON_STRING, null, function (err, _conn) {
t.error(err, 'should not break connection')
if (!t.passing()) {
t.bailout('Can not connect to RabbitMQ, stopping tests.')
}
_conn.close(t.end)
})
}, 'should not error when connecting')

// If connect threw, we need to end the test immediately.
if (!t.passing()) {
t.end()
}
helper.runInTransaction(agent, function (tx) {
amqplib.connect(amqpUtils.CON_STRING, null, function (err, _conn) {
t.error(err, 'should not break connection')
const [segment] = tx.trace.root.children
t.equal(segment.name, 'amqplib.connect')
const attrs = segment.getAttributes()
t.equal(attrs.host, 'localhost')
t.equal(attrs.port_path_or_id, 5672)
_conn.close(t.end)
})
})
})

Expand Down
31 changes: 14 additions & 17 deletions test/versioned/amqplib/promises.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,20 @@ tap.test('amqplib promise instrumentation', function (t) {
})

t.test('connect in a transaction', function (t) {
helper.runInTransaction(agent, function () {
t.doesNotThrow(function () {
amqplib.connect(amqpUtils.CON_STRING).then(
function (_conn) {
_conn.close().then(t.end)
},
function (err) {
t.error(err, 'should not break connection')
t.bailout('Can not connect to RabbitMQ, stopping tests.')
}
)
}, 'should not error when connecting')

// If connect threw, we need to end the test immediately.
if (!t.passing()) {
t.end()
}
helper.runInTransaction(agent, function (tx) {
amqplib.connect(amqpUtils.CON_STRING).then(
function (_conn) {
const [segment] = tx.trace.root.children
t.equal(segment.name, 'amqplib.connect')
const attrs = segment.getAttributes()
t.equal(attrs.host, 'localhost')
t.equal(attrs.port_path_or_id, 5672)
_conn.close().then(t.end)
},
function (err) {
t.error(err, 'should not break connection')
}
)
})
})

Expand Down

0 comments on commit 91636a8

Please sign in to comment.