Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ producerRun().then(consumerRun).catch(console.error);
```

* A transactional producer (with a `transactionId`) set, **cannot** send messages without initiating a transaction using `producer.transaction()`.
* While using `sendOffsets` from a transactional producer, the `consumerGroupId` argument must be omitted, and rather, the consumer object itself must be passed instead.

### Consumer

Expand Down
2 changes: 2 additions & 0 deletions lib/kafkajs/_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ const CompatibilityErrorMessages = Object.freeze({
createReplacementErrorMessage('producer', fn, 'timeout', 'timeout: <number>', 'timeout: <number>', false),
sendBatchMandatoryMissing: () =>
"The argument passed to sendbatch must be an object, and must contain the 'topicMessages' property: { topicMessages: {topic: string, messages: Message[]}[] } \n",
sendOffsetsMustProvideConsumer: () =>
"The sendOffsets method must be called with a connected consumer instance and without a consumerGroupId parameter.\n",

/* Consumer */
partitionAssignors: () =>
Expand Down
28 changes: 7 additions & 21 deletions lib/kafkajs/_producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const { kafkaJSToRdKafkaConfig,
CompatibilityErrorMessages,
logLevel,
} = require('./_common');
const { Consumer } = require('./_consumer');
const error = require('./_error');
const { Buffer } = require('buffer');

Expand Down Expand Up @@ -371,7 +370,7 @@ class Producer {
return;
}
this.#state = ProducerState.DISCONNECTED;
this.#logger.info("Producr disconnected", this.#createProducerBindingMessageMetadata());
this.#logger.info("Producer disconnected", this.#createProducerBindingMessageMetadata());
resolve();
};
this.#internalClient.disconnect(5000 /* default timeout, 5000ms */, cb);
Expand Down Expand Up @@ -465,17 +464,20 @@ class Producer {
/**
* Send offsets for the transaction.
* @param {object} arg - The arguments to sendOffsets
* @param {string} arg.consumerGroupId - The consumer group id to send offsets for.
* @param {Consumer} arg.consumer - The consumer to send offsets for.
* @param {import("../../types/kafkajs").TopicOffsets[]} arg.topics - The topics, partitions and the offsets to send.
*
* @note only one of consumerGroupId or consumer must be set. It is recommended to use `consumer`.
* @returns {Promise<void>} Resolves when the offsets are sent.
*/
async sendOffsets(arg) {
let { consumerGroupId, topics, consumer } = arg;

if ((!consumerGroupId && !consumer) || !Array.isArray(topics) || topics.length === 0) {
/* If the user has not supplied a consumer, or supplied a consumerGroupId, throw immediately. */
if (consumerGroupId || !consumer) {
throw new error.KafkaJSError(CompatibilityErrorMessages.sendOffsetsMustProvideConsumer(), { code: error.ErrorCodes.ERR__INVALID_ARG });
}

if (!Array.isArray(topics) || topics.length === 0) {
throw new error.KafkaJSError("sendOffsets arguments are invalid", { code: error.ErrorCodes.ERR__INVALID_ARG });
}

Expand All @@ -487,27 +489,11 @@ class Producer {
throw new error.KafkaJSError("Cannot sendOffsets, no transaction ongoing.", { code: error.ErrorCodes.ERR__STATE });
}

// If we don't have a consumer, we must create a consumer at this point internally.
// This isn't exactly efficient, but we expect people to use either a consumer,
// or we will need to change the C/C++ code to facilitate using the consumerGroupId
// directly.
// TODO: Change the C/C++ code to facilitate this if we go to release with this.

let consumerCreated = false;
if (!consumer) {
const config = Object.assign({ 'group.id': consumerGroupId }, this.rdKafkaConfig);
consumer = new Consumer(config);
consumerCreated = true;
await consumer.connect();
}

return new Promise((resolve, reject) => {
this.#internalClient.sendOffsetsToTransaction(
this.#flattenTopicPartitionOffsets(topics).map(topicPartitionOffsetToRdKafka),
consumer._getInternalConsumer(),
async err => {
if (consumerCreated)
await consumer.disconnect();
if (err)
reject(createKafkaJsErrorFromLibRdKafkaError(err));
else
Expand Down
156 changes: 156 additions & 0 deletions test/promisified/producer/eos.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
jest.setTimeout(30000);

const {
secureRandom,
createConsumer,
createProducer,
createTopic,
waitForMessages,
} = require('../testhelpers');
const { ErrorCodes } = require('../../../lib').KafkaJS;

describe('Producer > Transactional producer', () => {
let producer, basicProducer, topicName, topicName2, transactionalId, message, consumer, groupId;

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`;
topicName2 = `test-topic2-${secureRandom()}`;
transactionalId = `transactional-id-${secureRandom()}`;
message = { key: `key-${secureRandom()}`, value: `value-${secureRandom()}` };
groupId = `group-id-${secureRandom()}`;

producer = createProducer({
idempotent: true,
transactionalId,
transactionTimeout: 1000,
});

basicProducer = createProducer({});

consumer = createConsumer({ groupId, autoCommit: false, fromBeginning: true });

await createTopic({ topic: topicName, partitions: 1 });
await createTopic({ topic: topicName2 });
});

afterEach(async () => {
consumer && (await consumer.disconnect());
producer && (await producer.disconnect());
basicProducer && (await basicProducer.disconnect());
});

it('fails when using consumer group id while sending offsets from transactional producer', async () => {
await producer.connect();
await basicProducer.connect();
await consumer.connect();

await basicProducer.send({ topic: topicName, messages: [message] });

await consumer.subscribe({ topic: topicName });

let messagesConsumed = [];
await consumer.run({
eachMessage: async ({ message }) => {
const transaction = await producer.transaction();
await transaction.send({ topic: topicName, messages: [message] });

await expect(
transaction.sendOffsets({ consumerGroupId: groupId })).rejects.toHaveProperty('code', ErrorCodes.ERR__INVALID_ARG);
await expect(
transaction.sendOffsets({ consumerGroupId: groupId, consumer })).rejects.toHaveProperty('code', ErrorCodes.ERR__INVALID_ARG);

await transaction.abort();
messagesConsumed.push(message);
}
});

await waitForMessages(messagesConsumed, { number: 1 });
expect(messagesConsumed.length).toBe(1);
});

it('sends offsets when transaction is committed', async () => {
await producer.connect();
await basicProducer.connect();
await consumer.connect();

await basicProducer.send({ topic: topicName, messages: [message] });

await consumer.subscribe({ topic: topicName });

let messagesConsumed = [];
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const transaction = await producer.transaction();
await transaction.send({ topic: topicName2, messages: [message] });

await transaction.sendOffsets({ consumer, topics: [
{
topic,
partitions: [
{ partition, offset: Number(message.offset) + 1 },
],
}
], });

await transaction.commit();
messagesConsumed.push(message);
}
});

await waitForMessages(messagesConsumed, { number: 1 });
expect(messagesConsumed.length).toBe(1);
const committed = await consumer.committed();
expect(committed).toEqual(
expect.arrayContaining([
expect.objectContaining({
topic: topicName,
offset: '1',
partition: 0,
}),
])
);
});

it('sends no offsets when transaction is aborted', async () => {
await producer.connect();
await basicProducer.connect();
await consumer.connect();

await basicProducer.send({ topic: topicName, messages: [message] });

await consumer.subscribe({ topic: topicName });

let messagesConsumed = [];
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const transaction = await producer.transaction();
await transaction.send({ topic: topicName2, messages: [message] });

await transaction.sendOffsets({ consumer, topics: [
{
topic,
partitions: [
{ partition, offset: Number(message.offset) + 1 },
],
}
], });

await transaction.abort();
messagesConsumed.push(message);
}
});

await waitForMessages(messagesConsumed, { number: 1 });
expect(messagesConsumed.length).toBe(1);
const committed = await consumer.committed();
expect(committed).toEqual(
expect.arrayContaining([
expect.objectContaining({
topic: topicName,
offset: null,
partition: 0,
}),
])
);
});
});
2 changes: 1 addition & 1 deletion types/kafkajs.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export type Producer = Client & {
transaction(): Promise<Transaction>
commit(): Promise<void>
abort(): Promise<void>
sendOffsets(args: { consumerGroupId?: string, consumer?: Consumer, topics: TopicOffsets[] }): Promise<void>
sendOffsets(args: { consumer: Consumer, topics: TopicOffsets[] }): Promise<void>
isActive(): boolean
}

Expand Down