Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix message key #704

Merged
merged 4 commits into from
Jul 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
// If set true, consumer will fetch message from the given offset in the payloads
fromOffset: false,
// If set to 'buffer', values will be returned as raw buffer objects.
encoding: 'utf8'
encoding: 'utf8',
keyEncoding: 'utf-8'
}
```
Example:
Expand Down
10 changes: 8 additions & 2 deletions docker/createTopic.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ function createTopic (topicName, partitions, replicas) {
assert(partitions && partitions > 0);
assert(replicas && replicas > 0);
const topic = `${topicName}:${partitions}:${replicas}`;
const createResult = execa('docker-compose', ['exec', 'kafka', 'bash', '-c', `KAFKA_CREATE_TOPICS=${topic} KAFKA_PORT=9092 /usr/bin/create-topics.sh`]);
createResult.stdout.pipe(process.stdout);
const createResult = execa('docker-compose', [
'exec',
'kafka',
'bash',
'-c',
`KAFKA_CREATE_TOPICS=${topic} KAFKA_PORT=9092 /usr/bin/create-topics.sh`
]);
// createResult.stdout.pipe(process.stdout);
return createResult;
}

Expand Down
16 changes: 7 additions & 9 deletions lib/baseProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@ function BaseProducer (client, options, defaultPartitionerType, customPartitione
this.ready = false;
this.client = client;

this.requireAcks = options.requireAcks === undefined
? DEFAULTS.requireAcks
: options.requireAcks;
this.ackTimeoutMs = options.ackTimeoutMs === undefined
? DEFAULTS.ackTimeoutMs
: options.ackTimeoutMs;
this.requireAcks = options.requireAcks === undefined ? DEFAULTS.requireAcks : options.requireAcks;
this.ackTimeoutMs = options.ackTimeoutMs === undefined ? DEFAULTS.ackTimeoutMs : options.ackTimeoutMs;

if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) {
throw new Error('Partitioner Type must be custom if providing a customPartitioner.');
Expand Down Expand Up @@ -125,16 +121,18 @@ BaseProducer.prototype.send = function (payloads, cb) {

BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
payloads.forEach(p => {
p.partition = p.hasOwnProperty('partition')
? p.partition
: this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];

messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
return new Message(0, 0, '', message);
return new Message(0, 0, p.key, message);
});

let key = p.topic + p.partition;
Expand Down
13 changes: 10 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ Client.prototype.closeBrokers = function (brokers) {
});
};

/**
 * Convert a fetched value (message payload or key) to a string using the
 * requested encoding. When the encoding is 'buffer' the raw Buffer is
 * passed through untouched; null/undefined values are returned as-is.
 *
 * @param {String} encoding - target encoding (e.g. 'utf8') or 'buffer'
 * @param {Buffer|null|undefined} value - raw value from the fetch response
 * @returns {String|Buffer|null|undefined} decoded value
 */
function decodeValue (encoding, value) {
  // Raw buffers and missing values are passed straight through.
  if (encoding === 'buffer' || value == null) {
    return value;
  }
  return value.toString(encoding);
}

Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
Expand All @@ -186,11 +193,11 @@ Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs
}

var encoding = consumer.options.encoding;
const keyEncoding = consumer.options.keyEncoding;

if (type === 'message') {
if (encoding !== 'buffer' && message.value) {
message.value = message.value.toString(encoding);
}
message.value = decodeValue(encoding, message.value);
message.key = decodeValue(keyEncoding || encoding, message.key);

consumer.emit('message', message);
} else {
Expand Down
14 changes: 13 additions & 1 deletion lib/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ function ConsumerGroup (memberOptions, topics) {

if (this.options.migrateHLC) {
if (this.client instanceof KafkaClient) {
this.client.close(_.noop);
throw new Error('KafkaClient cannot be used to migrate from Zookeeper use Client instead');
}

Expand Down Expand Up @@ -414,6 +415,11 @@ ConsumerGroup.prototype.connect = function () {
return;
}

if (this.closed) {
logger.warn('Connect ignored. Consumer closed.');
return;
}

logger.debug('Connecting %s', this.client.clientId);
var self = this;

Expand Down Expand Up @@ -600,7 +606,13 @@ ConsumerGroup.prototype.close = function (force, cb) {
self.client.close(callback);
}
],
cb
function (error) {
if (error) {
return cb(error);
}
self.closed = true;
cb(null);
}
);
};

Expand Down
32 changes: 17 additions & 15 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, hi
.word8bs('attributes')
.word32bs('key')
.tap(function (vars) {
if (vars.key === -1) return;
if (vars.key === -1) {
vars.key = null;
return;
}
cur += vars.key;
this.buffer('key', vars.key);
})
/**
 * Serialize a single message into Kafka's on-wire Message format:
 * CRC32 (Int32) | magic (Int8) | attributes (Int8) | key | value,
 * where key and value are each Int32 length-prefixed (-1 when absent).
 *
 * The original span contained diff residue (the replaced inline key-encoding
 * branch); this reconstruction keeps only the current implementation, which
 * delegates both key and value encoding to setValueOnBuffer.
 *
 * @param {Message} message - message with magic, attributes, key and value
 * @returns {Buffer} the encoded message, CRC field first
 */
function encodeMessage (message) {
  var m = new Buffermaker().Int8(message.magic).Int8(message.attributes);

  setValueOnBuffer(m, message.key);
  setValueOnBuffer(m, message.value);

  // The CRC covers everything after the CRC field itself.
  m = m.make();
  var crc = crc32.signed(m);
  return new Buffermaker().Int32BE(crc).string(m).make();
}

if (value !== null && value !== undefined) {
/**
 * Append a length-prefixed value (message key or payload) to a Buffermaker.
 * Kafka's wire format prefixes each field with an Int32 byte length; a null
 * or undefined value is encoded as length -1 (Kafka's "null" marker).
 *
 * Note: the original span contained diff residue (`m.Int32BE(...)`,
 * `m = m.make()` and the CRC computation belonging to encodeMessage) that
 * referenced an undefined `m`; that residue is removed here.
 *
 * @param {Object} buffer - Buffermaker instance being assembled
 * @param {Buffer|String|*} value - value to append; Buffers are measured by
 *   their own length, anything else is stringified first
 */
function setValueOnBuffer (buffer, value) {
  if (value != null) {
    if (Buffer.isBuffer(value)) {
      buffer.Int32BE(value.length);
    } else {
      // Coerce non-strings (e.g. numeric keys) so byte length is well-defined.
      if (typeof value !== 'string') value = value.toString();
      buffer.Int32BE(Buffer.byteLength(value));
    }
    buffer.string(value);
  } else {
    // Kafka encodes a missing key/value as length -1.
    buffer.Int32BE(-1);
  }
}

function decodeProduceResponse (resp) {
Expand Down
191 changes: 191 additions & 0 deletions test/test.baseProducer.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,201 @@
'use strict';

const BaseProducer = require('../lib/baseProducer');
const ConsumerGroup = require('../lib/consumerGroup');
const KafkaClient = require('../lib/kafkaClient');
const Client = require('./mocks/mockClient');
const uuid = require('uuid');
const sinon = require('sinon');
const async = require('async');
const should = require('should');

describe('BaseProducer', function () {
describe('encoding and decoding key attribute', function () {
const KAFKA_HOST = 'localhost:9092';
let consumerGroup, topic, producer;
beforeEach(function (done) {
topic = uuid.v4();

const createTopic = require('../docker/createTopic');

async.series(
[
function (callback) {
createTopic(topic, 1, 1)
.then(function () {
callback(null);
})
.catch(error => callback(error));
},
function (callback) {
const client = new KafkaClient({
kafkaHost: KAFKA_HOST
});

producer = new BaseProducer(client, {}, BaseProducer.PARTITIONER_TYPES.default);
producer.once('ready', function () {
callback(null);
});
},
function (callback) {
consumerGroup = new ConsumerGroup(
{
kafkaHost: KAFKA_HOST,
groupId: uuid.v4()
},
topic
);
consumerGroup.once('connect', function () {
callback(null);
});
}
],
done
);
});

afterEach(function (done) {
producer.close();
consumerGroup.close(done);
});

it('verify key string value makes it into the message', function (done) {
producer.send(
[
{
topic: topic,
messages: 'this is my message',
key: 'myKeyIsHere'
}
],
function (error) {
if (error) {
done(error);
}
}
);

consumerGroup.on('message', function (message) {
message.key.should.be.exactly('myKeyIsHere');
done();
});
});

it('verify empty key string value makes it into the message', function (done) {
producer.send(
[
{
topic: topic,
messages: 'this is my message',
key: ''
}
],
function (error) {
if (error) {
done(error);
}
}
);

consumerGroup.on('message', function (message) {
message.key.should.be.exactly('');
done();
});
});

it('verify key value of 0 makes it into the message', function (done) {
producer.send(
[
{
topic: topic,
messages: 'this is my message',
key: 0
}
],
function (error) {
if (error) {
done(error);
}
}
);

consumerGroup.on('message', function (message) {
message.key.should.be.exactly('0');
done();
});
});

it('verify key value of null makes it into the message as null', function (done) {
producer.send(
[
{
topic: topic,
messages: 'this is my message',
key: null
}
],
function (error) {
if (error) {
done(error);
}
}
);

consumerGroup.on('message', function (message) {
should(message.key).be.null;
done();
});
});

it('verify key value of undefined makes it into the message as null', function (done) {
producer.send(
[
{
topic: topic,
messages: 'this is my message',
key: undefined
}
],
function (error) {
if (error) {
done(error);
}
}
);

consumerGroup.on('message', function (message) {
should(message.key).be.null;
done();
});
});

it('verify key value of buffer makes it into the message as untouched buffer', function (done) {
const keyBuffer = Buffer.from('testing123');
producer.send(
[
{
topic: topic,
messages: 'this is my message',
key: keyBuffer
}
],
function (error) {
if (error) {
done(error);
}
}
);

consumerGroup.options.keyEncoding = 'buffer';

consumerGroup.on('message', function (message) {
should(message.key).not.be.empty;
keyBuffer.equals(message.key).should.be.true;
done();
});
});
});

describe('On Brokers Changed', function () {
it('should emit error when refreshMetadata fails', function (done) {
const fakeClient = new Client();
Expand Down
2 changes: 1 addition & 1 deletion test/test.consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ describe('ConsumerGroup', function () {
new ConsumerGroup({
kafkaHost: 'localhost:9092',
migrateHLC: true
});
}, 'TestTopic');
});
});
});
Expand Down