diff --git a/MIGRATION.md b/MIGRATION.md index d1c7e0e8..3c28761b 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -199,6 +199,7 @@ producerRun().then(consumerRun).catch(console.error); ``` * A transactional producer (with a `transactionId`) set, **cannot** send messages without initiating a transaction using `producer.transaction()`. +* When using `sendOffsets` from a transactional producer, the `consumerGroupId` argument must be omitted; the consumer object itself must be passed instead. ### Consumer diff --git a/lib/kafkajs/_common.js b/lib/kafkajs/_common.js index 73e774de..2d02ce5d 100644 --- a/lib/kafkajs/_common.js +++ b/lib/kafkajs/_common.js @@ -240,6 +240,8 @@ const CompatibilityErrorMessages = Object.freeze({ createReplacementErrorMessage('producer', fn, 'timeout', 'timeout: ', 'timeout: ', false), sendBatchMandatoryMissing: () => "The argument passed to sendbatch must be an object, and must contain the 'topicMessages' property: { topicMessages: {topic: string, messages: Message[]}[] } \n", + sendOffsetsMustProvideConsumer: () => + "The sendOffsets method must be called with a connected consumer instance and without a consumerGroupId parameter.\n", /* Consumer */ partitionAssignors: () => diff --git a/lib/kafkajs/_producer.js b/lib/kafkajs/_producer.js index cdcfb872..f2542f39 100644 --- a/lib/kafkajs/_producer.js +++ b/lib/kafkajs/_producer.js @@ -11,7 +11,6 @@ const { kafkaJSToRdKafkaConfig, CompatibilityErrorMessages, logLevel, } = require('./_common'); -const { Consumer } = require('./_consumer'); const error = require('./_error'); const { Buffer } = require('buffer'); @@ -371,7 +370,7 @@ class Producer { return; } this.#state = ProducerState.DISCONNECTED; - this.#logger.info("Producr disconnected", this.#createProducerBindingMessageMetadata()); + this.#logger.info("Producer disconnected", this.#createProducerBindingMessageMetadata()); resolve(); }; this.#internalClient.disconnect(5000 /* default timeout, 5000ms */, cb); @@ -465,17 
+464,20 @@ class Producer { /** * Send offsets for the transaction. * @param {object} arg - The arguments to sendOffsets - * @param {string} arg.consumerGroupId - The consumer group id to send offsets for. * @param {Consumer} arg.consumer - The consumer to send offsets for. * @param {import("../../types/kafkajs").TopicOffsets[]} arg.topics - The topics, partitions and the offsets to send. * - * @note only one of consumerGroupId or consumer must be set. It is recommended to use `consumer`. * @returns {Promise} Resolves when the offsets are sent. */ async sendOffsets(arg) { let { consumerGroupId, topics, consumer } = arg; - if ((!consumerGroupId && !consumer) || !Array.isArray(topics) || topics.length === 0) { + /* If the user has not supplied a consumer, or supplied a consumerGroupId, throw immediately. */ + if (consumerGroupId || !consumer) { + throw new error.KafkaJSError(CompatibilityErrorMessages.sendOffsetsMustProvideConsumer(), { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + + if (!Array.isArray(topics) || topics.length === 0) { throw new error.KafkaJSError("sendOffsets arguments are invalid", { code: error.ErrorCodes.ERR__INVALID_ARG }); } @@ -487,27 +489,11 @@ class Producer { throw new error.KafkaJSError("Cannot sendOffsets, no transaction ongoing.", { code: error.ErrorCodes.ERR__STATE }); } - // If we don't have a consumer, we must create a consumer at this point internally. - // This isn't exactly efficient, but we expect people to use either a consumer, - // or we will need to change the C/C++ code to facilitate using the consumerGroupId - // directly. - // TODO: Change the C/C++ code to facilitate this if we go to release with this. 
- - let consumerCreated = false; - if (!consumer) { - const config = Object.assign({ 'group.id': consumerGroupId }, this.rdKafkaConfig); - consumer = new Consumer(config); - consumerCreated = true; - await consumer.connect(); - } - return new Promise((resolve, reject) => { this.#internalClient.sendOffsetsToTransaction( this.#flattenTopicPartitionOffsets(topics).map(topicPartitionOffsetToRdKafka), consumer._getInternalConsumer(), async err => { - if (consumerCreated) - await consumer.disconnect(); if (err) reject(createKafkaJsErrorFromLibRdKafkaError(err)); else diff --git a/test/promisified/producer/eos.spec.js b/test/promisified/producer/eos.spec.js new file mode 100644 index 00000000..9f8f21d3 --- /dev/null +++ b/test/promisified/producer/eos.spec.js @@ -0,0 +1,156 @@ +jest.setTimeout(30000); + +const { + secureRandom, + createConsumer, + createProducer, + createTopic, + waitForMessages, +} = require('../testhelpers'); +const { ErrorCodes } = require('../../../lib').KafkaJS; + +describe('Producer > Transactional producer', () => { + let producer, basicProducer, topicName, topicName2, transactionalId, message, consumer, groupId; + + beforeEach(async () => { + topicName = `test-topic-${secureRandom()}`; + topicName2 = `test-topic2-${secureRandom()}`; + transactionalId = `transactional-id-${secureRandom()}`; + message = { key: `key-${secureRandom()}`, value: `value-${secureRandom()}` }; + groupId = `group-id-${secureRandom()}`; + + producer = createProducer({ + idempotent: true, + transactionalId, + transactionTimeout: 1000, + }); + + basicProducer = createProducer({}); + + consumer = createConsumer({ groupId, autoCommit: false, fromBeginning: true }); + + await createTopic({ topic: topicName, partitions: 1 }); + await createTopic({ topic: topicName2 }); + }); + + afterEach(async () => { + consumer && (await consumer.disconnect()); + producer && (await producer.disconnect()); + basicProducer && (await basicProducer.disconnect()); + }); + + it('fails when using 
consumer group id while sending offsets from transactional producer', async () => { + await producer.connect(); + await basicProducer.connect(); + await consumer.connect(); + + await basicProducer.send({ topic: topicName, messages: [message] }); + + await consumer.subscribe({ topic: topicName }); + + let messagesConsumed = []; + await consumer.run({ + eachMessage: async ({ message }) => { + const transaction = await producer.transaction(); + await transaction.send({ topic: topicName, messages: [message] }); + + await expect( + transaction.sendOffsets({ consumerGroupId: groupId })).rejects.toHaveProperty('code', ErrorCodes.ERR__INVALID_ARG); + await expect( + transaction.sendOffsets({ consumerGroupId: groupId, consumer })).rejects.toHaveProperty('code', ErrorCodes.ERR__INVALID_ARG); + + await transaction.abort(); + messagesConsumed.push(message); + } + }); + + await waitForMessages(messagesConsumed, { number: 1 }); + expect(messagesConsumed.length).toBe(1); + }); + + it('sends offsets when transaction is committed', async () => { + await producer.connect(); + await basicProducer.connect(); + await consumer.connect(); + + await basicProducer.send({ topic: topicName, messages: [message] }); + + await consumer.subscribe({ topic: topicName }); + + let messagesConsumed = []; + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + const transaction = await producer.transaction(); + await transaction.send({ topic: topicName2, messages: [message] }); + + await transaction.sendOffsets({ consumer, topics: [ + { + topic, + partitions: [ + { partition, offset: Number(message.offset) + 1 }, + ], + } + ], }); + + await transaction.commit(); + messagesConsumed.push(message); + } + }); + + await waitForMessages(messagesConsumed, { number: 1 }); + expect(messagesConsumed.length).toBe(1); + const committed = await consumer.committed(); + expect(committed).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + topic: topicName, + offset: '1', + 
partition: 0, + }), + ]) + ); + }); + + it('sends no offsets when transaction is aborted', async () => { + await producer.connect(); + await basicProducer.connect(); + await consumer.connect(); + + await basicProducer.send({ topic: topicName, messages: [message] }); + + await consumer.subscribe({ topic: topicName }); + + let messagesConsumed = []; + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + const transaction = await producer.transaction(); + await transaction.send({ topic: topicName2, messages: [message] }); + + await transaction.sendOffsets({ consumer, topics: [ + { + topic, + partitions: [ + { partition, offset: Number(message.offset) + 1 }, + ], + } + ], }); + + await transaction.abort(); + messagesConsumed.push(message); + } + }); + + await waitForMessages(messagesConsumed, { number: 1 }); + expect(messagesConsumed.length).toBe(1); + const committed = await consumer.committed(); + expect(committed).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + topic: topicName, + offset: null, + partition: 0, + }), + ]) + ); + }); +}); diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index bf233db1..259fe07f 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -168,7 +168,7 @@ export type Producer = Client & { transaction(): Promise commit(): Promise abort(): Promise - sendOffsets(args: { consumerGroupId?: string, consumer?: Consumer, topics: TopicOffsets[] }): Promise + sendOffsets(args: { consumer: Consumer, topics: TopicOffsets[] }): Promise isActive(): boolean }