Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bf: BB-419 notification: don't wait for delivery report #2421

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extensions/notification/NotificationConfigManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class NotificationConfigManager {
= 'error setting bucket notification configuration';
this.log.error(errMsg, {
method: 'BucketNotificationConfigManager.setConfig',
error: err,
error: err.message,
bucket,
config,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,18 @@ class KafkaNotificationDestination extends NotificationDestination {
* Process entry in the sub-class and send it
*
* @param {Object[]} messages - message to be sent
* @param {function} done - callback
* @param {function} done - callback when delivery report has been received
* @return {undefined}
*/
send(messages, done) {
this._notificationProducer.send(messages, error => {
if (error) {
this._log.error('error publishing message', {
const { host, topic } = this._destinationConfig;
this._log.error('error in message delivery to external Kafka destination', {
method: 'KafkaNotificationDestination.send',
error,
host,
topic,
error: error.message,
});
}
done();
Expand Down
2 changes: 2 additions & 0 deletions extensions/notification/destination/KafkaProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class KafkaProducer extends EventEmitter {
const authObject = auth ? authUtil.generateKafkaAuthObject(auth) : {};
const producerOptions = {
'metadata.broker.list': this._kafkaHosts,
'batch.num.messages': 100000,
'queue.buffering.max.ms': 10,
'message.max.bytes': messageMaxBytes,
'dr_cb': true,
};
Expand Down
7 changes: 4 additions & 3 deletions extensions/notification/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,19 @@ class QueueProcessor extends EventEmitter {
eventTime: eventRecord.eventTime,
destination: this.destinationId,
});
return this._destination.send([msg], done);
this._destination.send([msg], () => {});
}
return done();
}
// skip if there is no bucket notification configuration
return done();
} catch (error) {
if (error) {
this.logger.err('error processing entry', {
this.logger.error('error processing entry', {
bucket,
key,
error,
error: error.message,
errorStack: error.stack,
});
}
return done();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "backbeat",
"version": "7.10.10",
"version": "7.10.11",
"description": "Asynchronous queue and job manager",
"main": "index.js",
"scripts": {
Expand Down
103 changes: 103 additions & 0 deletions tests/unit/notification/QueueProcessor.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
const assert = require('assert');
const sinon = require('sinon');

const QueueProcessor = require(
'../../../extensions/notification/queueProcessor/QueueProcessor');

describe('notification QueueProcessor', () => {
let qp;

before(done => {
qp = new QueueProcessor(null, null, null, {
host: 'external-kafka-host',
}, 'destId');
qp.bnConfigManager = {
getConfig: () => ({
notificationConfiguration: {
queueConfig: [
{
queueArn: 'arn:scality:bucketnotif:::destId',
events: [
's3:ObjectCreated:*',
],
},
],
},
}),
setConfig: () => {},
};
qp.bnConfigManager.setConfig('mybucket', {
host: 'foo',
});
qp._setupDestination('kafka', done);
});

it('should send notification to external destination and invoke callback immediately', done => {
const send = sinon.spy();
qp._destination._notificationProducer = {
send,
};
qp.processKafkaEntry({
value: JSON.stringify({
bucket: 'mybucket',
key: 'key',
eventType: 's3:ObjectCreated:Put',
value: '{}',
}),
}, err => {
assert.ifError(err);

assert(send.calledOnce);
assert(Array.isArray(send.args[0][0]));
assert(send.args[0][0][0].key === 'mybucket/key');
assert(typeof send.args[0][0][0].message === 'object');
assert(Array.isArray(send.args[0][0][0].message.Records));
assert.strictEqual(send.args[0][0][0].message.Records.length, 1);
assert.strictEqual(
send.args[0][0][0].message.Records[0].eventName,
's3:ObjectCreated:Put'
);
done();
});
});

it('should send notification to external destination with delivery error', done => {
const send = sinon.stub().callsFake((messages, cb) => cb(new Error('delivery error')));
qp._destination._notificationProducer = {
send,
};
qp.processKafkaEntry({
value: JSON.stringify({
bucket: 'mybucket',
key: 'key',
eventType: 's3:ObjectCreated:Put',
value: '{}',
}),
}, err => {
assert.ifError(err);

assert(send.calledOnce);
setTimeout(done, 100);
});
});

it('should not send an entry without "eventType" attribute', done => {
const send = sinon.spy();
qp._destination._notificationProducer = {
send,
};
qp.processKafkaEntry({
value: JSON.stringify({
bucket: 'mybucket',
key: 'key',
// no "eventType"
value: '{}',
}),
}, err => {
assert.ifError(err);
// should not have been sent
assert(!send.calledOnce);
done();
});
});
});