diff --git a/test/versioned/amqplib/amqp-utils.js b/test/versioned/amqplib/amqp-utils.js index f05f2e988f..b2bf0f66c0 100644 --- a/test/versioned/amqplib/amqp-utils.js +++ b/test/versioned/amqplib/amqp-utils.js @@ -4,17 +4,13 @@ */ 'use strict' - -const semver = require('semver') - +const assert = require('node:assert') const DESTINATIONS = require('../../../lib/config/attribute-filter').DESTINATIONS const params = require('../../lib/params') const metrics = require('../../lib/metrics_helper') +const { assertMetrics, assertSegments } = require('./../../lib/custom-assertions') const CON_STRING = 'amqp://' + params.rabbitmq_host + ':' + params.rabbitmq_port -const { version: pkgVersion } = require('amqplib/package') -const NATIVE_PROMISES = semver.gte(pkgVersion, '0.10.0') - exports.CON_STRING = CON_STRING exports.DIRECT_EXCHANGE = 'test-direct-exchange' exports.FANOUT_EXCHANGE = 'test-fanout-exchange' @@ -30,99 +26,95 @@ exports.verifySendToQueue = verifySendToQueue exports.verifyTransaction = verifyTransaction exports.getChannel = getChannel -function verifySubscribe(t, tx, exchange, routingKey) { +function verifySubscribe(tx, exchange, routingKey) { const isCallback = !!metrics.findSegment(tx.trace.root, 'Callback: ') let segments = [] if (isCallback) { segments = [ - 'amqplib.Channel#consume', ['Callback: ', ['MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchange]] ] - } else if (NATIVE_PROMISES) { - segments = [ - 'amqplib.Channel#consume', - 'MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchange - ] } else { - segments = [ - 'amqplib.Channel#consume', - ['MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchange] - ] + segments = ['MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchange] } - t.assertSegments(tx.trace.root, segments) + assertSegments(tx.trace.root, segments) - t.assertMetrics( + assertMetrics( tx.metrics, [[{ name: 'MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchange }]], false, false ) - t.notMatch(tx.getFullName(), /^OtherTransaction\/Message/, 'should not set transaction name') + assert.equal(tx.getFullName(), null, 'should not set transaction name') const consume = metrics.findSegment( tx.trace.root, 'MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchange ) - t.equal(consume.getAttributes().routing_key, routingKey, 'should store routing key') + assert.equal(consume.getAttributes().routing_key, routingKey, 'should store routing key') } -function verifyCAT(t, produceTransaction, consumeTransaction) { - t.equal( +function verifyCAT(produceTransaction, consumeTransaction) { + assert.equal( consumeTransaction.incomingCatId, produceTransaction.agent.config.cross_process_id, 'should have the proper incoming CAT id' ) - t.equal( + assert.equal( consumeTransaction.referringTransactionGuid, produceTransaction.id, 'should have the the correct referring transaction guid' ) - t.equal(consumeTransaction.tripId, produceTransaction.id, 'should have the the correct trip id') - t.notOk( - consumeTransaction.invalidIncomingExternalTransaction, + assert.equal( + consumeTransaction.tripId, + produceTransaction.id, + 'should have the the correct trip id' + ) + assert.ok( + !consumeTransaction.invalidIncomingExternalTransaction, 'invalid incoming external transaction should be false' ) } -function verifyDistributedTrace(t, produceTransaction, consumeTransaction) { - t.ok(produceTransaction.isDistributedTrace, 'should mark producer as distributed') - t.ok(consumeTransaction.isDistributedTrace, 'should mark consumer as distributed') - - t.equal(consumeTransaction.incomingCatId, null, 'should not set old CAT properties') - - t.equal(produceTransaction.id, consumeTransaction.parentId, 'should have proper parent id') - t.equal(produceTransaction.traceId, consumeTransaction.traceId, 'should have proper trace id') - // native promises flatten the segment tree, grab the product segment as 2nd child of root - let produceSegment = - NATIVE_PROMISES && produceTransaction.trace.root.children.length > 1 - ? produceTransaction.trace.root.children[1] - : produceTransaction.trace.root.children[0].children[0] - produceSegment = produceSegment.children[0] || produceSegment - t.equal(produceSegment.id, consumeTransaction.parentSpanId, 'should have proper parentSpanId') - t.equal(consumeTransaction.parentTransportType, 'AMQP', 'should have correct transport type') +function verifyDistributedTrace(produceTransaction, consumeTransaction) { + assert.ok(produceTransaction.isDistributedTrace, 'should mark producer as distributed') + assert.ok(consumeTransaction.isDistributedTrace, 'should mark consumer as distributed') + + assert.equal(consumeTransaction.incomingCatId, null, 'should not set old CAT properties') + + assert.equal(produceTransaction.id, consumeTransaction.parentId, 'should have proper parent id') + assert.equal( + produceTransaction.traceId, + consumeTransaction.traceId, + 'should have proper trace id' + ) + const produceSegment = produceTransaction.trace.root.children[0] + assert.equal( + produceSegment.id, + consumeTransaction.parentSpanId, + 'should have proper parentSpanId' + ) + assert.equal(consumeTransaction.parentTransportType, 'AMQP', 'should have correct transport type') } -function verifyConsumeTransaction(t, tx, exchange, queue, routingKey) { - t.doesNotThrow(function () { - t.assertMetrics( - tx.metrics, - [ - [{ name: 'OtherTransaction/Message/RabbitMQ/Exchange/Named/' + exchange }], - [{ name: 'OtherTransactionTotalTime/Message/RabbitMQ/Exchange/Named/' + exchange }], - [{ name: 'OtherTransaction/Message/all' }], - [{ name: 'OtherTransaction/all' }], - [{ name: 'OtherTransactionTotalTime' }] - ], - false, - false - ) - }, 'should have expected metrics') - - t.equal( +function verifyConsumeTransaction(tx, exchange, queue, routingKey) { + assertMetrics( + tx.metrics, + [ + [{ name: 'OtherTransaction/Message/RabbitMQ/Exchange/Named/' + exchange }], + [{ name: 'OtherTransactionTotalTime/Message/RabbitMQ/Exchange/Named/' + exchange }], + [{ name: 'OtherTransaction/Message/all' }], + [{ name: 'OtherTransaction/all' }], + [{ name: 'OtherTransactionTotalTime' }] + ], + false, + false + ) + + assert.equal( tx.getFullName(), 'OtherTransaction/Message/RabbitMQ/Exchange/Named/' + exchange, 'should not set transaction name' @@ -132,24 +124,28 @@ function verifyConsumeTransaction(t, tx, exchange, queue, routingKey) { tx.trace.root, 'OtherTransaction/Message/RabbitMQ/Exchange/Named/' + exchange ) - t.equal(consume, tx.baseSegment) + assert.equal(consume, tx.baseSegment) const segmentAttrs = consume.getAttributes() - t.equal(segmentAttrs.host, params.rabbitmq_host, 'should have host on segment') - t.equal(segmentAttrs.port, params.rabbitmq_port, 'should have port on segment') + assert.equal(segmentAttrs.host, params.rabbitmq_host, 'should have host on segment') + assert.equal(segmentAttrs.port, params.rabbitmq_port, 'should have port on segment') const attributes = tx.trace.attributes.get(DESTINATIONS.TRANS_TRACE) - t.equal( + assert.equal( attributes['message.routingKey'], routingKey, 'should have routing key transaction parameter' ) - t.equal(attributes['message.queueName'], queue, 'should have queue name transaction parameter') + assert.equal( + attributes['message.queueName'], + queue, + 'should have queue name transaction parameter' + ) } -function verifySendToQueue(t, tx) { - t.assertSegments(tx.trace.root, ['MessageBroker/RabbitMQ/Exchange/Produce/Named/Default']) +function verifySendToQueue(tx) { + assertSegments(tx.trace.root, ['MessageBroker/RabbitMQ/Exchange/Produce/Named/Default']) - t.assertMetrics( + assertMetrics( tx.metrics, [[{ name: 'MessageBroker/RabbitMQ/Exchange/Produce/Named/Default' }]], false, @@ -161,14 +157,14 @@ function verifySendToQueue(t, tx) { 'MessageBroker/RabbitMQ/Exchange/Produce/Named/Default' ) const attributes = segment.getAttributes() - t.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') - t.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') - t.equal(attributes.routing_key, 'testQueue', 'should store routing key') - t.equal(attributes.reply_to, 'my.reply.queue', 'should store reply to') - t.equal(attributes.correlation_id, 'correlation-id', 'should store correlation id') + assert.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') + assert.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') + assert.equal(attributes.routing_key, 'testQueue', 'should store routing key') + assert.equal(attributes.reply_to, 'my.reply.queue', 'should store reply to') + assert.equal(attributes.correlation_id, 'correlation-id', 'should store correlation id') } -function verifyProduce(t, tx, exchangeName, routingKey) { +function verifyProduce(tx, exchangeName, routingKey) { const isCallback = !!metrics.findSegment(tx.trace.root, 'Callback: ') let segments = [] @@ -194,26 +190,18 @@ function verifyProduce(t, tx, exchangeName, routingKey) { ] // 0.9.0 flattened the segment tree // See: https://github.com/amqp-node/amqplib/pull/635/files - } else if (semver.gte(pkgVersion, '0.9.0')) { + } else { segments = [ 'Channel#assertExchange', 'Channel#assertQueue', 'Channel#bindQueue', 'MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchangeName ] - } else { - segments = [ - 'Channel#assertExchange', - [ - 'Channel#assertQueue', - ['Channel#bindQueue', ['MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchangeName]] - ] - ] } - t.assertSegments(tx.trace.root, segments, 'should have expected segments') + assertSegments(tx.trace.root, segments, 'should have expected segments') - t.assertMetrics( + assertMetrics( tx.metrics, [[{ name: 'MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchangeName }]], false, @@ -226,35 +214,35 @@ function verifyProduce(t, tx, exchangeName, routingKey) { ) const attributes = segment.getAttributes() if (routingKey) { - t.equal(attributes.routing_key, routingKey, 'should have routing key') + assert.equal(attributes.routing_key, routingKey, 'should have routing key') } else { - t.notOk(attributes.routing_key, 'should not have routing key') + assert.ok(!attributes.routing_key, 'should not have routing key') } - t.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') - t.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') + assert.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') + assert.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') } -function verifyGet({ t, tx, exchangeName, routingKey, queue, assertAttr }) { +function verifyGet({ tx, exchangeName, routingKey, queue, assertAttr }) { const isCallback = !!metrics.findSegment(tx.trace.root, 'Callback: ') const produceName = 'MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchangeName const consumeName = 'MessageBroker/RabbitMQ/Exchange/Consume/Named/' + queue if (isCallback) { - t.assertSegments(tx.trace.root, [produceName, consumeName, ['Callback: ']]) + assertSegments(tx.trace.root, [produceName, consumeName, ['Callback: ']]) } else { - t.assertSegments(tx.trace.root, [produceName, consumeName]) + assertSegments(tx.trace.root, [produceName, consumeName]) } - t.assertMetrics(tx.metrics, [[{ name: produceName }], [{ name: consumeName }]], false, false) + assertMetrics(tx.metrics, [[{ name: produceName }], [{ name: consumeName }]], false, false) if (assertAttr) { const segment = metrics.findSegment(tx.trace.root, consumeName) const attributes = segment.getAttributes() - t.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') - t.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') - t.equal(attributes.routing_key, routingKey, 'should have routing key on get') + assert.equal(attributes.host, params.rabbitmq_host, 'should have host on segment') + assert.equal(attributes.port, params.rabbitmq_port, 'should have port on segment') + assert.equal(attributes.routing_key, routingKey, 'should have routing key on get') } } -function verifyPurge(t, tx) { +function verifyPurge(tx) { const isCallback = !!metrics.findSegment(tx.trace.root, 'Callback: ') let segments = [] @@ -278,31 +266,23 @@ function verifyPurge(t, tx) { ] ] ] - // 0.9.0 flattened the segment tree - // See: https://github.com/amqp-node/amqplib/pull/635/files - } else if (semver.gte(pkgVersion, '0.9.0')) { + } else { segments = [ 'Channel#assertExchange', 'Channel#assertQueue', 'Channel#bindQueue', 'MessageBroker/RabbitMQ/Queue/Purge/Temp' ] - } else { - segments = [ - 'Channel#assertExchange', - ['Channel#assertQueue', ['Channel#bindQueue', ['MessageBroker/RabbitMQ/Queue/Purge/Temp']]] - ] } + assertSegments(tx.trace.root, segments, 'should have expected segments') - t.assertSegments(tx.trace.root, segments, 'should have expected segments') - - t.assertMetrics(tx.metrics, [[{ name: 'MessageBroker/RabbitMQ/Queue/Purge/Temp' }]], false, false) + assertMetrics(tx.metrics, [[{ name: 'MessageBroker/RabbitMQ/Queue/Purge/Temp' }]], false, false) } -function verifyTransaction(t, tx, msg) { +function verifyTransaction(tx, msg) { const seg = tx.agent.tracer.getSegment() - if (t.ok(seg, 'should have transaction state in ' + msg)) { - t.equal(seg.transaction.id, tx.id, 'should have correct transaction in ' + msg) + if (seg) { + assert.equal(seg.transaction.id, tx.id, 'should have correct transaction in ' + msg) } } diff --git a/test/versioned/amqplib/callback.tap.js b/test/versioned/amqplib/callback.test.js similarity index 51% rename from test/versioned/amqplib/callback.tap.js rename to test/versioned/amqplib/callback.test.js index 28b7ff5425..3001d7a412 100644 --- a/test/versioned/amqplib/callback.tap.js +++ b/test/versioned/amqplib/callback.test.js @@ -4,12 +4,13 @@ */ 'use strict' - +const assert = require('node:assert') +const test = require('node:test') const amqpUtils = require('./amqp-utils') const API = require('../../../api') const helper = require('../../lib/agent_helper') const { removeMatchedModules } = require('../../lib/cache-buster') -const tap = require('tap') +const promiseResolvers = require('../../lib/promise-resolvers') /* TODO: @@ -23,17 +24,10 @@ consumer */ -tap.test('amqplib callback instrumentation', function (t) { - t.autoend() - - let amqplib = null - let conn = null - let channel = null - let agent = null - let api = null - - t.beforeEach(function () { - agent = helper.instrumentMockedAgent({ +test('amqplib callback instrumentation', async function (t) { + t.beforeEach(async function (ctx) { + const { promise, resolve, reject } = promiseResolvers() + const agent = helper.instrumentMockedAgent({ attributes: { enabled: true } @@ -47,51 +41,54 @@ tap.test('amqplib callback instrumentation', function (t) { agent.config._fromServer(params, 'cross_process_id') agent.config.trusted_account_ids = [1234] - api = new API(agent) + const api = new API(agent) - amqplib = require('amqplib/callback_api') - return new Promise((resolve, reject) => { - amqpUtils.getChannel(amqplib, function (err, result) { - if (err) { - reject(err) - } + const amqplib = require('amqplib/callback_api') + amqpUtils.getChannel(amqplib, function (err, result) { + if (err) { + reject(err) + } - conn = result.connection - channel = result.channel - channel.assertQueue('testQueue', null, resolve) - }) + ctx.nr.conn = result.connection + ctx.nr.channel = result.channel + ctx.nr.channel.assertQueue('testQueue', null, resolve) }) + ctx.nr = { + agent, + api, + amqplib + } + await promise }) - t.afterEach(function () { - helper.unloadAgent(agent) + t.afterEach(async function (ctx) { + const { promise, resolve } = promiseResolvers() + helper.unloadAgent(ctx.nr.agent) removeMatchedModules(/amqplib/) - - if (!conn) { - return - } - - return conn.close() + ctx.nr.conn.close(resolve) + await promise }) - t.test('connect in a transaction', function (t) { + await t.test('connect in a transaction', function (t, end) { + const { agent, amqplib } = t.nr helper.runInTransaction(agent, function (tx) { amqplib.connect(amqpUtils.CON_STRING, null, function (err, _conn) { - t.error(err, 'should not break connection') + assert.ok(!err, 'should not break connection') const [segment] = tx.trace.root.children - t.equal(segment.name, 'amqplib.connect') + assert.equal(segment.name, 'amqplib.connect') const attrs = segment.getAttributes() - t.equal(attrs.host, 'localhost') - t.equal(attrs.port_path_or_id, 5672) - _conn.close(t.end) + assert.equal(attrs.host, 'localhost') + assert.equal(attrs.port_path_or_id, 5672) + _conn.close(end) }) }) }) - t.test('sendToQueue', function (t) { + await t.test('sendToQueue', function (t, end) { + const { agent, channel } = t.nr agent.on('transactionFinished', function (tx) { - amqpUtils.verifySendToQueue(t, tx) - t.end() + amqpUtils.verifySendToQueue(tx) + end() }) helper.runInTransaction(agent, function transactionInScope(tx) { @@ -103,28 +100,29 @@ tap.test('amqplib callback instrumentation', function (t) { }) }) - t.test('publish to fanout exchange', function (t) { + await t.test('publish to fanout exchange', function (t, end) { + const { agent, channel } = t.nr const exchange = amqpUtils.FANOUT_EXCHANGE agent.on('transactionFinished', function (tx) { - amqpUtils.verifyProduce(t, tx, exchange) - t.end() + amqpUtils.verifyProduce(tx, exchange) + end() }) helper.runInTransaction(agent, function (tx) { - t.ok(agent.tracer.getSegment(), 'should start in transaction') + assert.ok(agent.tracer.getSegment(), 'should start in transaction') channel.assertExchange(exchange, 'fanout', null, function (err) { - t.error(err, 'should not error asserting exchange') - amqpUtils.verifyTransaction(t, tx, 'assertExchange') + assert.ok(!err, 'should not error asserting exchange') + amqpUtils.verifyTransaction(tx, 'assertExchange') channel.assertQueue('', { exclusive: true }, function (err, result) { - t.error(err, 'should not error asserting queue') - amqpUtils.verifyTransaction(t, tx, 'assertQueue') + assert.ok(!err, 'should not error asserting queue') + amqpUtils.verifyTransaction(tx, 'assertQueue') const queueName = result.queue channel.bindQueue(queueName, exchange, '', null, function (err) { - t.error(err, 'should not error binding queue') - amqpUtils.verifyTransaction(t, tx, 'bindQueue') + assert.ok(!err, 'should not error binding queue') + amqpUtils.verifyTransaction(tx, 'bindQueue') channel.publish(exchange, '', Buffer.from('hello')) setImmediate(function () { tx.end() @@ -135,27 +133,28 @@ tap.test('amqplib callback instrumentation', function (t) { }) }) - t.test('publish to direct exchange', function (t) { + await t.test('publish to direct exchange', function (t, end) { + const { agent, channel } = t.nr const exchange = amqpUtils.DIRECT_EXCHANGE agent.on('transactionFinished', function (tx) { - amqpUtils.verifyProduce(t, tx, exchange, 'key1') - t.end() + amqpUtils.verifyProduce(tx, exchange, 'key1') + end() }) helper.runInTransaction(agent, function (tx) { channel.assertExchange(exchange, 'direct', null, function (err) { - t.error(err, 'should not error asserting exchange') - amqpUtils.verifyTransaction(t, tx, 'assertExchange') + assert.ok(!err, 'should not error asserting exchange') + amqpUtils.verifyTransaction(tx, 'assertExchange') channel.assertQueue('', { exclusive: true }, function (err, result) { - t.error(err, 'should not error asserting queue') - amqpUtils.verifyTransaction(t, tx, 'assertQueue') + assert.ok(!err, 'should not error asserting queue') + amqpUtils.verifyTransaction(tx, 'assertQueue') const queueName = result.queue channel.bindQueue(queueName, exchange, 'key1', null, function (err) { - t.error(err, 'should not error binding queue') - amqpUtils.verifyTransaction(t, tx, 'bindQueue') + assert.ok(!err, 'should not error binding queue') + amqpUtils.verifyTransaction(tx, 'bindQueue') channel.publish(exchange, 'key1', Buffer.from('hello')) setImmediate(function () { tx.end() @@ -166,30 +165,31 @@ tap.test('amqplib callback instrumentation', function (t) { }) }) - t.test('purge queue', function (t) { + await t.test('purge queue', function (t, end) { + const { agent, channel } = t.nr const exchange = amqpUtils.DIRECT_EXCHANGE let queueName = null agent.on('transactionFinished', function (tx) { - amqpUtils.verifyPurge(t, tx) - t.end() + amqpUtils.verifyPurge(tx) + end() }) helper.runInTransaction(agent, function (tx) { channel.assertExchange(exchange, 'direct', null, function (err) { - t.error(err, 'should not error asserting exchange') - amqpUtils.verifyTransaction(t, tx, 'assertExchange') + assert.ok(!err, 'should not error asserting exchange') + amqpUtils.verifyTransaction(tx, 'assertExchange') channel.assertQueue('', { exclusive: true }, function (err, result) { - t.error(err, 'should not error asserting queue') - amqpUtils.verifyTransaction(t, tx, 'assertQueue') + assert.ok(!err, 'should not error asserting queue') + amqpUtils.verifyTransaction(tx, 'assertQueue') queueName = result.queue channel.bindQueue(queueName, exchange, 'key1', null, function (err) { - t.error(err, 'should not error binding queue') - amqpUtils.verifyTransaction(t, tx, 'bindQueue') + assert.ok(!err, 'should not error binding queue') + amqpUtils.verifyTransaction(tx, 'bindQueue') channel.purgeQueue(queueName, function (err) { - t.error(err, 'should not error purging queue') + assert.ok(!err, 'should not error purging queue') setImmediate(function () { tx.end() }) @@ -200,42 +200,42 @@ tap.test('amqplib callback instrumentation', function (t) { }) }) - t.test('get a message', function (t) { + await t.test('get a message', function (t, end) { + const { agent, channel } = t.nr const exchange = amqpUtils.DIRECT_EXCHANGE let queue = null channel.assertExchange(exchange, 'direct', null, function (err) { - t.error(err, 'should not error asserting exchange') + assert.ok(!err, 'should not error asserting exchange') channel.assertQueue('', { exclusive: true }, function (err, res) { - t.error(err, 'should not error asserting queue') + assert.ok(!err, 'should not error asserting queue') queue = res.queue channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) { - t.error(err, 'should not error binding queue') + assert.ok(!err, 'should not error binding queue') helper.runInTransaction(agent, function (tx) { channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) channel.get(queue, {}, function (err, msg) { - t.notOk(err, 'should not cause an error') - t.ok(msg, 'should receive a message') + assert.ok(!err, 'should not cause an error') + assert.ok(msg, 'should receive a message') - amqpUtils.verifyTransaction(t, tx, 'get') + amqpUtils.verifyTransaction(tx, 'get') const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') + assert.equal(body, 'hello', 'should receive expected body') channel.ack(msg) setImmediate(function () { tx.end() amqpUtils.verifyGet({ - t, tx, exchangeName: exchange, routingKey: 'consume-tx-key', queue, assertAttr: true }) - t.end() + end() }) }) }) @@ -244,42 +244,41 @@ tap.test('amqplib callback instrumentation', function (t) { }) }) - t.test('get a message disable parameters', function (t) { + await t.test('get a message disable parameters', function (t, end) { + const { agent, channel } = t.nr agent.config.message_tracer.segment_parameters.enabled = false const exchange = amqpUtils.DIRECT_EXCHANGE let queue = null channel.assertExchange(exchange, 'direct', null, function (err) { - t.error(err, 'should not error asserting exchange') + assert.ok(!err, 'should not error asserting exchange') channel.assertQueue('', { exclusive: true }, function (err, res) { - t.error(err, 'should not error asserting queue') + assert.ok(!err, 'should not error asserting queue') queue = res.queue channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) { - t.error(err, 'should not error binding queue') + assert.ok(!err, 'should not error binding queue') helper.runInTransaction(agent, function (tx) { channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) channel.get(queue, {}, function (err, msg) { - t.notOk(err, 'should not cause an error') - t.ok(msg, 'should receive a message') + assert.ok(!err, 'should not cause an error') + assert.ok(msg, 'should receive a message') - amqpUtils.verifyTransaction(t, tx, 'get') + amqpUtils.verifyTransaction(tx, 'get') const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') + assert.equal(body, 'hello', 'should receive expected body') channel.ack(msg) setImmediate(function () { tx.end() amqpUtils.verifyGet({ - t, tx, exchangeName: exchange, - routingKey: 'consume-tx-key', queue }) - t.end() + end() }) }) }) @@ -288,64 +287,54 @@ tap.test('amqplib callback instrumentation', function (t) { }) }) - t.test('consume in a transaction with old CAT', function (t) { + await t.test('consume in a transaction with old CAT', function (t, end) { + const { agent, api, channel } = t.nr agent.config.cross_application_tracer.enabled = true agent.config.distributed_tracing.enabled = false const exchange = amqpUtils.DIRECT_EXCHANGE let queue = null channel.assertExchange(exchange, 'direct', null, function (err) { - t.error(err, 'should not error asserting exchange') + assert.ok(!err, 'should not error asserting exchange') channel.assertQueue('', { exclusive: true }, function (err, res) { - t.error(err, 'should not error asserting queue') + assert.ok(!err, 'should not error asserting queue') queue = res.queue channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) { - t.error(err, 'should not error binding queue') - + assert.ok(!err, 'should not error binding queue') + let produceTx + // set up consume, this creates its own transaction + channel.consume(queue, function (msg) { + const consumeTxnHandle = api.getTransaction() + const consumeTxn = consumeTxnHandle._transaction + assert.notStrictEqual(consumeTxn, produceTx, 'should not be in original transaction') + assert.ok(msg, 'should receive a message') + + const body = msg.content.toString('utf8') + assert.equal(body, 'hello', 'should receive expected body') + + channel.ack(msg) + produceTx.end() + amqpUtils.verifySubscribe(produceTx, exchange, 'consume-tx-key') + consumeTxnHandle.end(function () { + amqpUtils.verifyConsumeTransaction(consumeTxn, exchange, queue, 'consume-tx-key') + amqpUtils.verifyCAT(produceTx, consumeTxn) + end() + }) + }) helper.runInTransaction(agent, function (tx) { - channel.consume( - queue, - function (msg) { - const consumeTxnHandle = api.getTransaction() - const consumeTxn = consumeTxnHandle._transaction - t.not(consumeTxn, tx, 'should not be in original transaction') - t.ok(msg, 'should receive a message') - - const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') - - channel.ack(msg) - tx.end() - amqpUtils.verifySubscribe(t, tx, exchange, 'consume-tx-key') - consumeTxnHandle.end(function () { - amqpUtils.verifyConsumeTransaction( - t, - consumeTxn, - exchange, - queue, - 'consume-tx-key' - ) - amqpUtils.verifyCAT(t, tx, consumeTxn) - t.end() - }) - }, - null, - function (err) { - t.error(err, 'should not error subscribing consumer') - amqpUtils.verifyTransaction(t, tx, 'consume') - - channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) - } - ) + produceTx = tx + amqpUtils.verifyTransaction(tx, 'consume') + channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) }) }) }) }) }) - t.test('consume in a transaction with distributed tracing', function (t) { + await t.test('consume in a transaction with distributed tracing', function (t, end) { + const { agent, api, channel } = t.nr agent.config.span_events.enabled = true agent.config.account_id = 1234 agent.config.primary_application_id = 4321 @@ -355,83 +344,75 @@ tap.test('amqplib callback instrumentation', function (t) { let queue = null channel.assertExchange(exchange, 'direct', null, function (err) { - t.error(err, 'should not error asserting exchange') + assert.ok(!err, 'should not error asserting exchange') channel.assertQueue('', { exclusive: true }, function (err, res) { - t.error(err, 'should not error asserting queue') + assert.ok(!err, 'should not error asserting queue') queue = res.queue channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) { - t.error(err, 'should not error binding queue') + assert.ok(!err, 'should not error binding queue') + let produceTx + // set up consume, this creates its own transaction + channel.consume(queue, function (msg) { + const consumeTxnHandle = api.getTransaction() + const consumeTxn = consumeTxnHandle._transaction + assert.notStrictEqual(consumeTxn, produceTx, 'should not be in original transaction') + assert.ok(msg, 'should receive a message') + + const body = msg.content.toString('utf8') + assert.equal(body, 'hello', 'should receive expected body') + + channel.ack(msg) + produceTx.end() + amqpUtils.verifySubscribe(produceTx, exchange, 'consume-tx-key') + consumeTxnHandle.end(function () { + amqpUtils.verifyConsumeTransaction(consumeTxn, exchange, queue, 'consume-tx-key') + amqpUtils.verifyDistributedTrace(produceTx, consumeTxn) + end() + }) + }) helper.runInTransaction(agent, function (tx) { - channel.consume( - queue, - function (msg) { - const consumeTxnHandle = api.getTransaction() - const consumeTxn = consumeTxnHandle._transaction - t.not(consumeTxn, tx, 'should not be in original transaction') - t.ok(msg, 'should receive a message') - - const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') - - channel.ack(msg) - tx.end() - amqpUtils.verifySubscribe(t, tx, exchange, 'consume-tx-key') - consumeTxnHandle.end(function () { - amqpUtils.verifyConsumeTransaction( - t, - consumeTxn, - exchange, - queue, - 'consume-tx-key' - ) - amqpUtils.verifyDistributedTrace(t, tx, consumeTxn) - t.end() - }) - }, - null, - function (err) { - t.error(err, 'should not error subscribing consumer') - amqpUtils.verifyTransaction(t, tx, 'consume') - - channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) - } - ) + produceTx = tx + assert.ok(!err, 'should not error subscribing consumer') + amqpUtils.verifyTransaction(tx, 'consume') + + channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) }) }) }) }) }) - t.test('consume out of transaction', function (t) { + await t.test('consume out of transaction', function (t, end) { + const { agent, api, channel } = t.nr const exchange = amqpUtils.DIRECT_EXCHANGE let queue = null agent.on('transactionFinished', function (tx) { - amqpUtils.verifyConsumeTransaction(t, tx, exchange, queue, 'consume-tx-key') - t.end() + amqpUtils.verifyConsumeTransaction(tx, exchange, queue, 'consume-tx-key') + end() }) channel.assertExchange(exchange, 'direct', null, function (err) { - t.error(err, 'should not error asserting exchange') + assert.ok(!err, 'should not error asserting exchange') channel.assertQueue('', { exclusive: true }, function (err, res) { - t.error(err, 'should not error asserting queue') + assert.ok(!err, 'should not error asserting queue') queue = res.queue channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) { - t.error(err, 'should not error binding queue') + assert.ok(!err, 'should not error binding queue') channel.consume( queue, function (msg) { const tx = api.getTransaction() - t.ok(msg, 'should receive a message') + assert.ok(msg, 'should receive a message') const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') + assert.equal(body, 'hello', 'should receive expected body') channel.ack(msg) @@ -441,7 +422,7 @@ tap.test('amqplib callback instrumentation', function (t) { }, null, function (err) { - t.error(err, 'should not error subscribing consumer') + assert.ok(!err, 'should not error subscribing consumer') channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello')) } @@ -451,28 +432,29 @@ tap.test('amqplib callback instrumentation', function (t) { }) }) - t.test('rename message consume transaction', function (t) { + await t.test('rename message consume transaction', function (t, end) { + const { agent, api, channel } = t.nr const exchange = amqpUtils.DIRECT_EXCHANGE let queue = null agent.on('transactionFinished', function (tx) { - t.equal( + assert.equal( tx.getFullName(), 'OtherTransaction/Message/Custom/foobar', 'should have specified name' ) - t.end() + end() }) channel.assertExchange(exchange, 'direct', null, function (err) { - t.error(err, 'should not error asserting exchange') + assert.ok(!err, 'should not error asserting exchange') channel.assertQueue('', { exclusive: true }, function (err, res) { - t.error(err, 'should not error asserting queue') + assert.ok(!err, 'should not error asserting queue') queue = res.queue channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) { - t.error(err, 'should not error binding queue') + assert.ok(!err, 'should not error binding queue') channel.consume( queue, @@ -488,7 +470,7 @@ tap.test('amqplib callback instrumentation', function (t) { }, null, function (err) { - t.error(err, 'should not error subscribing consumer') + assert.ok(!err, 'should not error subscribing consumer') channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello')) } diff --git a/test/versioned/amqplib/package.json b/test/versioned/amqplib/package.json index a793829d19..94d67dfba2 100644 --- a/test/versioned/amqplib/package.json +++ b/test/versioned/amqplib/package.json @@ -12,8 +12,8 @@ "amqplib": ">=0.5.0" }, "files": [ - "callback.tap.js", - "promises.tap.js" + "callback.test.js", + "promises.test.js" ] } ] diff --git a/test/versioned/amqplib/promises.tap.js b/test/versioned/amqplib/promises.tap.js deleted file mode 100644 index ec93395be4..0000000000 --- a/test/versioned/amqplib/promises.tap.js +++ /dev/null @@ -1,482 +0,0 @@ -/* - * Copyright 2020 New Relic Corporation. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -'use strict' - -const amqpUtils = require('./amqp-utils') -const API = require('../../../api') -const helper = require('../../lib/agent_helper') -const { removeMatchedModules } = require('../../lib/cache-buster') -const tap = require('tap') - -/* -TODO: - -- promise API -- callback API - -consumer -- off by default for rum -- value of the attribute is limited to 255 bytes - - */ - -tap.test('amqplib promise instrumentation', function (t) { - t.autoend() - - let amqplib = null - let conn = null - let channel = null - let agent = null - let api = null - - t.beforeEach(function () { - // In promise mode, versions older than 0.10.0 of amqplib load bluebird. In our tests we unwrap the - // instrumentation after each one. This is fine for first-order modules - // which the test itself re-requires, but second-order modules (deps of - // instrumented methods) are not reloaded and thus not re-instrumented. To - // resolve this we just delete everything. Kill it all. - removeMatchedModules(/amqplib|bluebird/) - - agent = helper.instrumentMockedAgent({ - attributes: { - enabled: true - } - }) - - const params = { - encoding_key: 'this is an encoding key', - cross_process_id: '1234#4321' - } - agent.config._fromServer(params, 'encoding_key') - agent.config._fromServer(params, 'cross_process_id') - agent.config.trusted_account_ids = [1234] - - api = new API(agent) - - amqplib = require('amqplib') - return amqpUtils.getChannel(amqplib).then(function (result) { - conn = result.connection - channel = result.channel - return channel.assertQueue('testQueue') - }) - }) - - t.afterEach(function () { - helper.unloadAgent(agent) - - if (!conn) { - return - } - - return conn.close() - }) - - t.test('connect in a transaction', function (t) { - helper.runInTransaction(agent, function (tx) { - amqplib.connect(amqpUtils.CON_STRING).then( - function (_conn) { - const [segment] = tx.trace.root.children - t.equal(segment.name, 'amqplib.connect') - const attrs = segment.getAttributes() - t.equal(attrs.host, 'localhost') - t.equal(attrs.port_path_or_id, 5672) - _conn.close().then(t.end) - }, - function (err) { - t.error(err, 'should not break connection') - } - ) - }) - }) - - t.test('sendToQueue', function (t) { - agent.on('transactionFinished', function (tx) { - amqpUtils.verifySendToQueue(t, tx) - t.end() - }) - - helper.runInTransaction(agent, function transactionInScope(tx) { - channel.sendToQueue('testQueue', Buffer.from('hello'), { - replyTo: 'my.reply.queue', - correlationId: 'correlation-id' - }) - tx.end() - }) - }) - - t.test('publish to fanout exchange', function (t) { - agent.on('transactionFinished', function (tx) { - amqpUtils.verifyProduce(t, tx, amqpUtils.FANOUT_EXCHANGE) - t.end() - }) - - helper.runInTransaction(agent, function (tx) { - t.ok(agent.tracer.getSegment(), 'should start in transaction') - channel - .assertExchange(amqpUtils.FANOUT_EXCHANGE, 'fanout') - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'assertExchange') - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (result) { - amqpUtils.verifyTransaction(t, tx, 'assertQueue') - const queueName = result.queue - return channel.bindQueue(queueName, amqpUtils.FANOUT_EXCHANGE) - }) - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'bindQueue') - channel.publish(amqpUtils.FANOUT_EXCHANGE, '', Buffer.from('hello')) - tx.end() - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) - }) - - t.test('publish to direct exchange', function (t) { - agent.on('transactionFinished', function (tx) { - amqpUtils.verifyProduce(t, tx, amqpUtils.DIRECT_EXCHANGE, 'key1') - t.end() - }) - - helper.runInTransaction(agent, function (tx) { - channel - .assertExchange(amqpUtils.DIRECT_EXCHANGE, 'direct') - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'assertExchange') - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (result) { - amqpUtils.verifyTransaction(t, tx, 'assertQueue') - const queueName = result.queue - return channel.bindQueue(queueName, amqpUtils.DIRECT_EXCHANGE, 'key1') - }) - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'bindQueue') - channel.publish(amqpUtils.DIRECT_EXCHANGE, 'key1', Buffer.from('hello')) - tx.end() - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) - }) - - t.test('purge queue', function (t) { - let queueName = null - - agent.on('transactionFinished', function (tx) { - amqpUtils.verifyPurge(t, tx) - t.end() - }) - - helper.runInTransaction(agent, function (tx) { - channel - .assertExchange(amqpUtils.DIRECT_EXCHANGE, 'direct') - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'assertExchange') - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (result) { - amqpUtils.verifyTransaction(t, tx, 'assertQueue') - queueName = result.queue - return channel.bindQueue(queueName, amqpUtils.DIRECT_EXCHANGE, 'key1') - }) - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'bindQueue') - return channel.purgeQueue(queueName) - }) - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'purgeQueue') - tx.end() - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) - }) - - t.test('get a message', function (t) { - let queue = null - const exchange = amqpUtils.DIRECT_EXCHANGE - - channel - .assertExchange(exchange, 'direct') - .then(function () { - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (res) { - queue = res.queue - return channel.bindQueue(queue, exchange, 'consume-tx-key') - }) - .then(function () { - return helper.runInTransaction(agent, function (tx) { - channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) - return channel - .get(queue) - .then(function (msg) { - t.ok(msg, 'should receive a message') - - const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') - - amqpUtils.verifyTransaction(t, tx, 'get') - channel.ack(msg) - }) - .then(function () { - tx.end() - amqpUtils.verifyGet({ - t, - tx, - exchangeName: exchange, - routingKey: 'consume-tx-key', - queue, - assertAttr: true - }) - t.end() - }) - }) - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) - - t.test('get a message disable parameters', function (t) { - agent.config.message_tracer.segment_parameters.enabled = false - let queue = null - const exchange = amqpUtils.DIRECT_EXCHANGE - - channel - .assertExchange(exchange, 'direct') - .then(function () { - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (res) { - queue = res.queue - return channel.bindQueue(queue, exchange, 'consume-tx-key') - }) - .then(function () { - return helper.runInTransaction(agent, function (tx) { - channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) - return channel - .get(queue) - .then(function (msg) { - t.ok(msg, 'should receive a message') - - const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') - - amqpUtils.verifyTransaction(t, tx, 'get') - channel.ack(msg) - }) - .then(function () { - tx.end() - amqpUtils.verifyGet({ - t, - tx, - exchangeName: exchange, - routingKey: 'consume-tx-key', - queue - }) - t.end() - }) - }) - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) - - t.test('consume in a transaction with old CAT', function (t) { - agent.config.cross_application_tracer.enabled = true - agent.config.distributed_tracing.enabled = false - let queue = null - let consumeTxn = null - const exchange = amqpUtils.DIRECT_EXCHANGE - - channel - .assertExchange(exchange, 'direct') - .then(function () { - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (res) { - queue = res.queue - return channel.bindQueue(queue, exchange, 'consume-tx-key') - }) - .then(function () { - return helper.runInTransaction(agent, function (tx) { - return channel - .consume(queue, function (msg) { - const consumeTxnHandle = api.getTransaction() - consumeTxn = consumeTxnHandle._transaction - t.not(consumeTxn, tx, 'should not be in original transaction') - t.ok(msg, 'should receive a message') - - const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') - - channel.ack(msg) - tx.end() - amqpUtils.verifySubscribe(t, tx, exchange, 'consume-tx-key') - consumeTxnHandle.end(function () { - amqpUtils.verifyConsumeTransaction(t, consumeTxn, exchange, queue, 'consume-tx-key') - amqpUtils.verifyCAT(t, tx, consumeTxn) - t.end() - }) - }) - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'consume') - channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) - }) - }) - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) - - t.test('consume in a transaction with distributed tracing', function (t) { - agent.config.account_id = 1234 - agent.config.primary_application_id = 4321 - agent.config.trusted_account_key = 1234 - - let queue = null - let consumeTxn = null - const exchange = amqpUtils.DIRECT_EXCHANGE - - channel - .assertExchange(exchange, 'direct') - .then(function () { - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (res) { - queue = res.queue - return channel.bindQueue(queue, exchange, 'consume-tx-key') - }) - .then(function () { - return helper.runInTransaction(agent, function (tx) { - return channel - .consume(queue, function (msg) { - const consumeTxnHandle = api.getTransaction() - consumeTxn = consumeTxnHandle._transaction - t.not(consumeTxn, tx, 'should not be in original transaction') - t.ok(msg, 'should receive a message') - - const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') - - channel.ack(msg) - tx.end() - amqpUtils.verifySubscribe(t, tx, exchange, 'consume-tx-key') - consumeTxnHandle.end(function () { - amqpUtils.verifyConsumeTransaction(t, consumeTxn, exchange, queue, 'consume-tx-key') - amqpUtils.verifyDistributedTrace(t, tx, consumeTxn) - t.end() - }) - }) - .then(function () { - amqpUtils.verifyTransaction(t, tx, 'consume') - channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) - }) - }) - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) - - t.test('consume out of transaction', function (t) { - let queue = null - - agent.on('transactionFinished', function (tx) { - amqpUtils.verifyConsumeTransaction(t, tx, amqpUtils.DIRECT_EXCHANGE, queue, 'consume-tx-key') - t.end() - }) - - channel - .assertExchange(amqpUtils.DIRECT_EXCHANGE, 'direct') - .then(function () { - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (res) { - queue = res.queue - return channel.bindQueue(queue, amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key') - }) - .then(function () { - return channel - .consume(queue, function (msg) { - t.ok(msg, 'should receive a message') - - const body = msg.content.toString('utf8') - t.equal(body, 'hello', 'should receive expected body') - - channel.ack(msg) - - return new Promise(function (resolve) { - setImmediate(resolve) - }) - }) - .then(function () { - channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello')) - }) - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) - - t.test('rename message consume transaction', function (t) { - let queue = null - - agent.on('transactionFinished', function (tx) { - t.equal( - tx.getFullName(), - 'OtherTransaction/Message/Custom/foobar', - 'should have specified name' - ) - t.end() - }) - - channel - .assertExchange(amqpUtils.DIRECT_EXCHANGE, 'direct') - .then(function () { - return channel.assertQueue('', { exclusive: true }) - }) - .then(function (res) { - queue = res.queue - return channel.bindQueue(queue, amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key') - }) - .then(function () { - return channel - .consume(queue, function (msg) { - api.setTransactionName('foobar') - - channel.ack(msg) - - return new Promise(function (resolve) { - setImmediate(resolve) - }) - }) - .then(function () { - channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello')) - }) - }) - .catch(function (err) { - t.fail(err) - t.end() - }) - }) -}) diff --git a/test/versioned/amqplib/promises.test.js b/test/versioned/amqplib/promises.test.js new file mode 100644 index 0000000000..abf90c0190 --- /dev/null +++ b/test/versioned/amqplib/promises.test.js @@ -0,0 +1,323 @@ +/* + * Copyright 2020 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' +const assert = require('node:assert') +const test = require('node:test') +const amqpUtils = require('./amqp-utils') +const API = require('../../../api') +const helper = require('../../lib/agent_helper') +const { removeMatchedModules } = require('../../lib/cache-buster') +const promiseResolvers = require('../../lib/promise-resolvers') + +/* +TODO: + +- promise API +- callback API + +consumer +- off by default for rum +- value of the attribute is limited to 255 bytes + + */ + +test('amqplib promise instrumentation', async function (t) { + t.beforeEach(async function (ctx) { + const agent = helper.instrumentMockedAgent({ + attributes: { + enabled: true + } + }) + + const params = { + encoding_key: 'this is an encoding key', + cross_process_id: '1234#4321' + } + agent.config._fromServer(params, 'encoding_key') + agent.config._fromServer(params, 'cross_process_id') + agent.config.trusted_account_ids = [1234] + + const api = new API(agent) + + const amqplib = require('amqplib') + + const { connection: conn, channel } = await amqpUtils.getChannel(amqplib) + ctx.nr = { + agent, + amqplib, + api, + channel, + conn + } + await channel.assertQueue('testQueue') + }) + + t.afterEach(async function (ctx) { + helper.unloadAgent(ctx.nr.agent) + removeMatchedModules(/amqplib/) + await ctx.nr.conn.close() + }) + + await t.test('connect in a transaction', async function (t) { + const { agent, amqplib } = t.nr + await helper.runInTransaction(agent, async function (tx) { + const _conn = await amqplib.connect(amqpUtils.CON_STRING) + const [segment] = tx.trace.root.children + assert.equal(segment.name, 'amqplib.connect') + const attrs = segment.getAttributes() + assert.equal(attrs.host, 'localhost') + assert.equal(attrs.port_path_or_id, 5672) + await _conn.close() + }) + }) + + await t.test('sendToQueue', async function (t) { + const { agent, channel } = t.nr + const { promise, resolve } = promiseResolvers() + + helper.runInTransaction(agent, function transactionInScope(tx) { + channel.sendToQueue('testQueue', Buffer.from('hello'), { + replyTo: 'my.reply.queue', + correlationId: 'correlation-id' + }) + tx.end() + amqpUtils.verifySendToQueue(tx) + resolve() + }) + await promise + }) + + await t.test('publish to fanout exchange', async function (t) { + const { agent, channel } = t.nr + await helper.runInTransaction(agent, async function (tx) { + assert.ok(agent.tracer.getSegment(), 'should start in transaction') + await channel.assertExchange(amqpUtils.FANOUT_EXCHANGE, 'fanout') + amqpUtils.verifyTransaction(tx, 'assertExchange') + const result = await channel.assertQueue('', { exclusive: true }) + amqpUtils.verifyTransaction(tx, 'assertQueue') + const queueName = result.queue + await channel.bindQueue(queueName, amqpUtils.FANOUT_EXCHANGE) + amqpUtils.verifyTransaction(tx, 'bindQueue') + channel.publish(amqpUtils.FANOUT_EXCHANGE, '', Buffer.from('hello')) + tx.end() + amqpUtils.verifyProduce(tx, amqpUtils.FANOUT_EXCHANGE) + }) + }) + + await t.test('publish to direct exchange', async function (t) { + const { agent, channel } = t.nr + await helper.runInTransaction(agent, async function (tx) { + await channel.assertExchange(amqpUtils.DIRECT_EXCHANGE, 'direct') + amqpUtils.verifyTransaction(tx, 'assertExchange') + const result = await channel.assertQueue('', { exclusive: true }) + amqpUtils.verifyTransaction(tx, 'assertQueue') + const queueName = result.queue + await channel.bindQueue(queueName, amqpUtils.DIRECT_EXCHANGE, 'key1') + amqpUtils.verifyTransaction(tx, 'bindQueue') + channel.publish(amqpUtils.DIRECT_EXCHANGE, 'key1', Buffer.from('hello')) + tx.end() + amqpUtils.verifyProduce(tx, amqpUtils.DIRECT_EXCHANGE, 'key1') + }) + }) + + await t.test('purge queue', async function (t) { + const { agent, channel } = t.nr + + await helper.runInTransaction(agent, async function (tx) { + await channel.assertExchange(amqpUtils.DIRECT_EXCHANGE, 'direct') + amqpUtils.verifyTransaction(tx, 'assertExchange') + const result = await channel.assertQueue('', { exclusive: true }) + amqpUtils.verifyTransaction(tx, 'assertQueue') + const queueName = result.queue + await channel.bindQueue(queueName, amqpUtils.DIRECT_EXCHANGE, 'key1') + amqpUtils.verifyTransaction(tx, 'bindQueue') + await channel.purgeQueue(queueName) + amqpUtils.verifyTransaction(tx, 'purgeQueue') + tx.end() + amqpUtils.verifyPurge(tx) + }) + }) + + await t.test('get a message', async function (t) { + const { agent, channel } = t.nr + const exchange = amqpUtils.DIRECT_EXCHANGE + + await channel.assertExchange(exchange, 'direct') + const { queue } = await channel.assertQueue('', { exclusive: true }) + await channel.bindQueue(queue, exchange, 'consume-tx-key') + await helper.runInTransaction(agent, async function (tx) { + channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) + const msg = await channel.get(queue) + assert.ok(msg, 'should receive a message') + const body = msg.content.toString('utf8') + assert.equal(body, 'hello', 'should receive expected body') + + amqpUtils.verifyTransaction(tx, 'get') + channel.ack(msg) + tx.end() + amqpUtils.verifyGet({ + tx, + exchangeName: exchange, + routingKey: 'consume-tx-key', + queue, + assertAttr: true + }) + }) + }) + + await t.test('get a message disable parameters', async function (t) { + const { agent, channel } = t.nr + agent.config.message_tracer.segment_parameters.enabled = false + const exchange = amqpUtils.DIRECT_EXCHANGE + + await channel.assertExchange(exchange, 'direct') + const { queue } = await channel.assertQueue('', { exclusive: true }) + await channel.bindQueue(queue, exchange, 'consume-tx-key') + await helper.runInTransaction(agent, async function (tx) { + channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) + const msg = await channel.get(queue) + assert.ok(msg, 'should receive a message') + + const body = msg.content.toString('utf8') + assert.equal(body, 'hello', 'should receive expected body') + + amqpUtils.verifyTransaction(tx, 'get') + channel.ack(msg) + tx.end() + amqpUtils.verifyGet({ + tx, + exchangeName: exchange, + queue + }) + }) + }) + + await t.test('consume in a transaction with old CAT', async function (t) { + const { agent, api, channel } = t.nr + const { promise, resolve } = promiseResolvers() + agent.config.cross_application_tracer.enabled = true + agent.config.distributed_tracing.enabled = false + const exchange = amqpUtils.DIRECT_EXCHANGE + + await channel.assertExchange(exchange, 'direct') + const { queue } = await channel.assertQueue('', { exclusive: true }) + await channel.bindQueue(queue, exchange, 'consume-tx-key') + let publishTx + let consumeTx + // set up consume, this creates its own transaction + channel.consume(queue, function (msg) { + const consumeTxnHandle = api.getTransaction() + consumeTx = consumeTxnHandle._transaction + assert.notStrictEqual(consumeTx, publishTx, 'should not be in original transaction') + assert.ok(msg, 'should receive a message') + + const body = msg.content.toString('utf8') + assert.equal(body, 'hello', 'should receive expected body') + + channel.ack(msg) + publishTx.end() + consumeTx.end() + resolve() + }) + await helper.runInTransaction(agent, async function (tx) { + publishTx = tx + amqpUtils.verifyTransaction(tx, 'consume') + channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) + }) + await promise + amqpUtils.verifySubscribe(publishTx, exchange, 'consume-tx-key') + amqpUtils.verifyConsumeTransaction(consumeTx, exchange, queue, 'consume-tx-key') + amqpUtils.verifyCAT(publishTx, consumeTx) + }) + + await t.test('consume in a transaction with distributed tracing', async function (t) { + const { agent, api, channel } = t.nr + const { promise, resolve } = promiseResolvers() + agent.config.account_id = 1234 + agent.config.primary_application_id = 4321 + agent.config.trusted_account_key = 1234 + + const exchange = amqpUtils.DIRECT_EXCHANGE + await channel.assertExchange(exchange, 'direct') + const { queue } = await channel.assertQueue('', { exclusive: true }) + await channel.bindQueue(queue, exchange, 'consume-tx-key') + let publishTx + let consumeTx + // set up consume, this creates its own transaction + channel.consume(queue, function (msg) { + const consumeTxnHandle = api.getTransaction() + consumeTx = consumeTxnHandle._transaction + assert.notStrictEqual(consumeTx, publishTx, 'should not be in original transaction') + assert.ok(msg, 'should receive a message') + + const body = msg.content.toString('utf8') + assert.equal(body, 'hello', 'should receive expected body') + + channel.ack(msg) + publishTx.end() + consumeTx.end() + resolve() + }) + await helper.runInTransaction(agent, async function (tx) { + publishTx = tx + amqpUtils.verifyTransaction(tx, 'consume') + channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) + }) + await promise + amqpUtils.verifySubscribe(publishTx, exchange, 'consume-tx-key') + amqpUtils.verifyConsumeTransaction(consumeTx, exchange, queue, 'consume-tx-key') + amqpUtils.verifyDistributedTrace(publishTx, consumeTx) + }) + + await t.test('consume out of transaction', async function (t) { + const { api, channel } = t.nr + const { promise, resolve } = promiseResolvers() + + await channel.assertExchange(amqpUtils.DIRECT_EXCHANGE, 'direct') + const { queue } = await channel.assertQueue('', { exclusive: true }) + await channel.bindQueue(queue, amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key') + let tx + channel.consume(queue, function (msg) { + ;({ _transaction: tx } = api.getTransaction()) + assert.ok(msg, 'should receive a message') + + const body = msg.content.toString('utf8') + assert.equal(body, 'hello', 'should receive expected body') + + channel.ack(msg) + tx.end() + resolve() + }) + channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello')) + await promise + amqpUtils.verifyConsumeTransaction(tx, amqpUtils.DIRECT_EXCHANGE, queue, 'consume-tx-key') + }) + + await t.test('rename message consume transaction', async function (t) { + const { api, channel } = t.nr + const { promise, resolve } = promiseResolvers() + + await channel.assertExchange(amqpUtils.DIRECT_EXCHANGE, 'direct') + const { queue } = await channel.assertQueue('', { exclusive: true }) + await channel.bindQueue(queue, amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key') + let tx + channel.consume(queue, function (msg) { + api.setTransactionName('foobar') + + channel.ack(msg) + ;({ _transaction: tx } = api.getTransaction()) + tx.end() + resolve() + }) + channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello')) + await promise + assert.equal( + tx.getFullName(), + 'OtherTransaction/Message/Custom/foobar', + 'should have specified name' + ) + }) +})