Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/subscribers/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ class Subscriber {
asyncStart.runStores(context, () => {
try {
if (callback) {
return Reflect.apply(callback, this, arguments)
const cbResult = Reflect.apply(callback, this, arguments)
context.cbResult = cbResult
return cbResult
}
} finally {
asyncEnd.publish(context)
Expand Down
17 changes: 14 additions & 3 deletions lib/subscribers/message-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,22 @@ class MessageConsumerSubscriber extends Subscriber {
}

/**
* Ends the transaction created for the consumption callback.
* Checks the result of the message handler.
* If it's a promise, waits for it to resolve before ending transaction.
* This ensures that the transaction stays active until the promise resolves.
* @param {object} data the data associated with the `asyncEnd` event
*/
asyncEnd() {
asyncEnd(data) {
const ctx = this.agent.tracer.getContext()
ctx?.transaction?.end()
const result = data.cbResult
if (typeof result?.then === 'function') {
const prom = Promise.resolve(result)
prom.finally(() => {
ctx?.transaction?.end()
})
} else {
ctx?.transaction?.end()
}
}

enable() {
Expand Down
140 changes: 140 additions & 0 deletions test/unit/lib/subscribers/base.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,146 @@ test('should bind callback and invoke the asyncStart/error/asyncError events whe
await plan.completed
})

test('should bind callback and invoke the asyncStart/error/asyncError events with right context when callback succeeds', async (t) => {
const plan = tspl(t, { plan: 10 })
const { agent, subscriber } = t.nr
const name = 'test-segment'
const expectedResult = 'test-result'
subscriber.callback = -1
subscriber.error = (data) => {
plan.fail('should not call error when cb succeeds')
}

subscriber.asyncStart = (data) => {
plan.equal(data.callback, true)
plan.ok(!data.error)
plan.equal(data.result, expectedResult)
}
subscriber.asyncEnd = (data) => {
plan.equal(data.callback, true)
plan.ok(!data.error)
plan.equal(data.cbResult, undefined)
plan.equal(data.result, expectedResult)
}
subscriber.enable()
subscriber.events = ['asyncStart', 'asyncEnd', 'error']
subscriber.subscribe()
subscriber.handler = function handler(data, ctx) {
plan.equal(data.name, name)
return subscriber.createSegment({
name: data?.name,
ctx
})
}

function testCb(err, result) {
plan.equal(result, expectedResult)
plan.equal(err, null)
}

helper.runInTransaction(agent, () => {
const event = { name, arguments: [testCb] }
subscriber.channel.start.runStores(event, () => {
event.arguments[0](null, expectedResult)
})
})

await plan.completed
})

test('should bind callback and invoke the asyncStart/error/asyncError events with right context when callback returns a promise', async (t) => {
const plan = tspl(t, { plan: 10 })
const { agent, subscriber } = t.nr
const name = 'test-segment'
const expectedResult = 'test-result'
subscriber.callback = -1
subscriber.error = (data) => {
plan.fail('should not call error when cb succeeds')
}

subscriber.asyncStart = (data) => {
plan.equal(data.callback, true)
plan.ok(!data.error)
plan.equal(data.result, expectedResult)
}
subscriber.asyncEnd = (data) => {
plan.equal(data.callback, true)
plan.ok(!data.error)
plan.ok(data.cbResult instanceof Promise)
plan.equal(data.result, expectedResult)
}
subscriber.enable()
subscriber.events = ['asyncStart', 'asyncEnd', 'error']
subscriber.subscribe()
subscriber.handler = function handler(data, ctx) {
plan.equal(data.name, name)
return subscriber.createSegment({
name: data?.name,
ctx
})
}

async function testCb(err, result) {
plan.equal(result, expectedResult)
plan.equal(err, null)
}

helper.runInTransaction(agent, () => {
const event = { name, arguments: [testCb] }
subscriber.channel.start.runStores(event, () => {
event.arguments[0](null, expectedResult)
})
})

await plan.completed
})

test('should bind callback and invoke the asyncStart/error/asyncError events with right context when callback fails', async (t) => {
const plan = tspl(t, { plan: 10 })
const { agent, subscriber } = t.nr
const name = 'test-segment'
const expectedErr = new Error('cb failed')
subscriber.callback = -1
subscriber.error = (data) => {
plan.equal(data.callback, true)
plan.deepEqual(data.error, expectedErr)
}

subscriber.asyncStart = (data) => {
plan.equal(data.callback, true)
plan.deepEqual(data.error, expectedErr)
}
subscriber.asyncEnd = (data) => {
plan.equal(data.callback, true)
plan.deepEqual(data.error, expectedErr)
plan.equal(data.cbResult, undefined)
}
subscriber.enable()
subscriber.events = ['asyncStart', 'asyncEnd', 'error']
subscriber.subscribe()
subscriber.handler = function handler(data, ctx) {
plan.equal(data.name, name)
return subscriber.createSegment({
name: data?.name,
ctx
})
}

function testCb(err, result) {
plan.deepEqual(err, expectedErr)
plan.equal(result, undefined)
}

helper.runInTransaction(agent, () => {
const event = { name, arguments: [testCb] }
subscriber.channel.start.runStores(event, () => {
event.arguments[0](expectedErr)
})
})

await plan.completed
})

test('should bind callback and invoke asyncStart/asyncEnd events and propagateContext', async (t) => {
const plan = tspl(t, { plan: 6 })
const { agent, subscriber } = t.nr
Expand Down
122 changes: 100 additions & 22 deletions test/versioned/amqplib/callback.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,7 @@ const { removeMatchedModules } = require('../../lib/cache-buster')
const promiseResolvers = require('../../lib/promise-resolvers')
const { version } = require('amqplib/package.json')
const { assertPackageMetrics } = require('../../lib/custom-assertions')

/*
TODO:

- promise API
- callback API

consumer
- off by default for rum
- value of the attribute is limited to 255 bytes

*/
const PROMISE_WAIT = 100

test('amqplib callback instrumentation', async function (t) {
t.beforeEach(async function (ctx) {
Expand Down Expand Up @@ -399,7 +388,7 @@ test('amqplib callback instrumentation', async function (t) {
})

await t.test('consume out of transaction', function (t, end) {
const { agent, api, channel } = t.nr
const { agent, channel } = t.nr
const exchange = amqpUtils.DIRECT_EXCHANGE
let queue = null

Expand All @@ -421,17 +410,12 @@ test('amqplib callback instrumentation', async function (t) {
channel.consume(
queue,
function (msg) {
const tx = api.getTransaction()
assert.ok(msg, 'should receive a message')

const body = msg.content.toString('utf8')
assert.equal(body, 'hello', 'should receive expected body')

channel.ack(msg)

setImmediate(function () {
tx.end()
})
},
null,
function (err) {
Expand Down Expand Up @@ -472,14 +456,108 @@ test('amqplib callback instrumentation', async function (t) {
channel.consume(
queue,
function (msg) {
const tx = api.getTransaction()
api.setTransactionName('foobar')

channel.ack(msg)
},
null,
function (err) {
assert.ok(!err, 'should not error subscribing consumer')

setImmediate(function () {
tx.end()
})
channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello'))
}
)
})
})
})
})

await t.test('consume async handler', function (t, end) {
const { agent, channel } = t.nr
const exchange = amqpUtils.DIRECT_EXCHANGE
let queue = null

agent.on('transactionFinished', function (tx) {
amqpUtils.verifyConsumeTransaction(tx, exchange, queue, 'consume-tx-key')
assert.ok(tx.trace.getDurationInMillis() >= PROMISE_WAIT, 'transaction should account for async work')
end()
})

channel.assertExchange(exchange, 'direct', null, function (err) {
assert.ok(!err, 'should not error asserting exchange')

channel.assertQueue('', { exclusive: true }, function (err, res) {
assert.ok(!err, 'should not error asserting queue')
queue = res.queue

channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) {
assert.ok(!err, 'should not error binding queue')

channel.consume(
queue,
async function (msg) {
assert.ok(msg, 'should receive a message')

const body = msg.content.toString('utf8')
assert.equal(body, 'hello', 'should receive expected body')

await new Promise((resolve) => setTimeout(resolve, PROMISE_WAIT))
channel.ack(msg)
},
null,
function (err) {
assert.ok(!err, 'should not error subscribing consumer')

channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello'))
}
)
})
})
})
})

await t.test('consume async handler that rejects', function (t, end) {
const { agent, channel } = t.nr
const exchange = amqpUtils.DIRECT_EXCHANGE
let queue = null

agent.on('transactionFinished', function (tx) {
amqpUtils.verifyConsumeTransaction(tx, exchange, queue, 'consume-tx-key')
assert.ok(tx.trace.getDurationInMillis() >= PROMISE_WAIT, 'transaction should account for async work')
end()
})

channel.assertExchange(exchange, 'direct', null, function (err) {
assert.ok(!err, 'should not error asserting exchange')

channel.assertQueue('', { exclusive: true }, function (err, res) {
assert.ok(!err, 'should not error asserting queue')
queue = res.queue

channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) {
assert.ok(!err, 'should not error binding queue')

channel.consume(
queue,
async function (msg) {
assert.ok(msg, 'should receive a message')

const body = msg.content.toString('utf8')
assert.equal(body, 'hello', 'should receive expected body')

try {
const err = new Error('async handler failure')
await new Promise((_resolve, reject) => {
setTimeout(() => {
reject(err)
}, PROMISE_WAIT)
})
assert.fail('should not resolve successfully')
} catch (err) {
assert.equal(err.message, 'async handler failure')
} finally {
channel.ack(msg)
}
},
null,
function (err) {
Expand Down
Loading
Loading