Skip to content

Commit b7fd389

Browse files
committed
Add async lock for consume/disconnect
1 parent 962defb commit b7fd389

File tree

6 files changed

+156
-31
lines changed

6 files changed

+156
-31
lines changed

LICENSE.kafkajs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
The promisified API (lib/kafkajs) is inspired by kafkajs (github.com/tulios/kafkajs).
22
The promisified tests (test/promisified) are also adapted from there.
3+
An async lock implementation and many error types are also adapted from there.
34
The license notice is reproduced below.
45

56
----

lib/kafkajs/_common.js

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,83 @@ function notImplemented(msg = 'Not implemented') {
540540
throw new error.KafkaJSError(msg, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
541541
}
542542

543+
/* Code from the async lock is from github.com/tulios/kafkajs.
544+
* For more details, see LICENSE.kafkajs */
545+
const LockStates = Object.freeze({
546+
LOCKED: 'locked',
547+
TIMEOUT: 'timeout',
548+
WAITING: 'waiting',
549+
TIMEOUT_ERROR_MESSAGE: 'timeoutErrorMessage',
550+
});
551+
552+
class Lock {
553+
constructor({ timeout, description = null } = {}) {
554+
if (typeof timeout !== 'number') {
555+
throw new TypeError(`'timeout' is not a number, received '${typeof timeout}'`);
556+
}
557+
558+
this[LockStates.LOCKED] = false;
559+
this[LockStates.TIMEOUT] = timeout;
560+
this[LockStates.WAITING] = new Set();
561+
this[LockStates.TIMEOUT_ERROR_MESSAGE] = () => {
562+
const timeoutMessage = `Timeout while acquiring lock (${this[LockStates.WAITING].size} waiting locks)`;
563+
return description ? `${timeoutMessage}: "${description}"` : timeoutMessage;
564+
}
565+
}
566+
567+
async acquire() {
568+
return new Promise((resolve, reject) => {
569+
if (!this[LockStates.LOCKED]) {
570+
this[LockStates.LOCKED] = true;
571+
return resolve();
572+
}
573+
574+
let timeoutId = null;
575+
const tryToAcquire = async () => {
576+
if (!this[LockStates.LOCKED]) {
577+
this[LockStates.LOCKED] = true;
578+
clearTimeout(timeoutId);
579+
this[LockStates.WAITING].delete(tryToAcquire);
580+
return resolve();
581+
}
582+
}
583+
584+
this[LockStates.WAITING].add(tryToAcquire);
585+
timeoutId = setTimeout(() => {
586+
// The message should contain the number of waiters _including_ this one
587+
const e = new error.KafkaJSLockTimeout(this[LockStates.TIMEOUT_ERROR_MESSAGE]());
588+
this[LockStates.WAITING].delete(tryToAcquire);
589+
reject(e);
590+
}, this[LockStates.TIMEOUT]);
591+
})
592+
}
593+
594+
async release() {
595+
this[LockStates.LOCKED] = false;
596+
const waitingLock = this[LockStates.WAITING].values().next().value;
597+
598+
if (waitingLock) {
599+
return waitingLock();
600+
}
601+
}
602+
}
603+
604+
/**
605+
* Acquires a lock, or logs an error if it fails.
606+
* @param {Lock} lock
607+
* @param {import("../../types/kafkajs").Logger} logger
608+
* @returns {boolean} true if the lock was acquired, false otherwise.
609+
*/
610+
async function acquireOrLog(lock, logger) {
611+
try {
612+
await lock.acquire();
613+
return true;
614+
} catch (e) {
615+
logger.error(`Failed to acquire lock: ${e.message}`);
616+
}
617+
return false;
618+
}
619+
543620
module.exports = {
544621
kafkaJSToRdKafkaConfig,
545622
topicPartitionOffsetToRdKafka,
@@ -553,5 +630,7 @@ module.exports = {
553630
CompatibilityErrorMessages,
554631
severityToLogLevel,
555632
checkAllowedKeys,
556-
checkIfKafkaJsKeysPresent
633+
checkIfKafkaJsKeysPresent,
634+
Lock,
635+
acquireOrLog,
557636
};

lib/kafkajs/_consumer.js

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ const {
1111
CompatibilityErrorMessages,
1212
severityToLogLevel,
1313
checkAllowedKeys,
14-
logLevel
14+
logLevel,
15+
Lock,
16+
acquireOrLog
1517
} = require('./_common');
1618
const { Buffer } = require('buffer');
1719

@@ -91,6 +93,15 @@ class Consumer {
9193
*/
9294
#logger = new DefaultLogger();
9395

96+
/**
97+
* A lock for consuming and disconnecting.
98+
* This lock should be held whenever we want to change the state from CONNECTED to any state other than CONNECTED.
99+
* In practical terms, this lock is held whenever we're consuming a message, or disconnecting.
100+
* We set the timeout to 5 seconds, after which we log an error, but keep trying to acquire the lock.
101+
* @type {Lock}
102+
*/
103+
#lock = new Lock({ timeout: 5000 });
104+
94105
/**
95106
* @constructor
96107
* @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
@@ -575,21 +586,21 @@ class Consumer {
575586
/* Internal polling loop. It accepts the same config object that `run` accepts. */
576587
async #runInternal(config) {
577588
while (this.#state === ConsumerState.CONNECTED) {
589+
590+
/* We need to acquire a lock here, because we need to ensure that we don't
591+
* disconnect while in the middle of processing a message. */
592+
if (!(await acquireOrLog(this.#lock, this.#logger)))
593+
continue;
594+
578595
const m = await this.#consumeSingle().catch(e => {
579596
/* Since this error cannot be exposed to the user in the current situation, just log and retry.
580597
* This is due to restartOnFailure being set to always true. */
581598
if (this.#logger)
582599
this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${JSON.stringify(e)}`);
583600
});
584601

585-
/* It's possible for the state to change as we await something.
586-
* Unfortunately, we have no alternative but to break and drop the message, if it exists.
587-
* TODO: fix this, possibly with a flag in disconnect that waits until the run loop winds down.
588-
*/
589-
if (this.#state !== ConsumerState.CONNECTED)
590-
break;
591-
592602
if (!m) {
603+
this.#lock.release();
593604
continue;
594605
}
595606

@@ -608,27 +619,15 @@ class Consumer {
608619
const invalidateMessage = await this.#seekInternal({ topic: m.topic, partition: m.partition });
609620
if (invalidateMessage) {
610621
/* Don't pass this message on to the user if this topic partition was seeked to. */
622+
this.#lock.release();
611623
continue;
612624
}
613-
614-
/* It's possible for the state to change as we await something.
615-
* Unfortunately, we have no alternative but to break and drop the message.
616-
* TODO: fix this, possibly with a flag in disconnect that waits until the run loop winds down.
617-
*/
618-
if (this.#state !== ConsumerState.CONNECTED)
619-
break;
620625
}
621626

622627
try {
623628
await config.eachMessage(
624629
this.#createPayload(m)
625630
)
626-
627-
/* It's possible for the state to change as we await something.
628-
* Unfortunately, we have no alternative but to break without taking any action that the user might need.
629-
*/
630-
if (this.#state !== ConsumerState.CONNECTED)
631-
break;
632631
} catch (e) {
633632
/* It's not only possible, but expected that an error will be thrown by eachMessage.
634633
* This is especially true since the pattern of pause() followed by throwing an error
@@ -645,7 +644,7 @@ class Consumer {
645644
/* Force a immediate seek here. It's possible that there are no more messages to be passed to the user,
646645
* but the user seeked in the call to eachMessage, or else we encountered the error catch block.
647646
* In that case, the results of that seek will never be reflected unless we do this. */
648-
if (this.#checkPendingSeeks && this.#state === ConsumerState.CONNECTED)
647+
if (this.#checkPendingSeeks)
649648
await this.#seekInternal();
650649

651650
/* TODO: another check we need to do here is to see how kafkaJS is handling
@@ -654,13 +653,8 @@ class Consumer {
654653
* inside this function.
655654
*/
656655

657-
/* Yield for a bit to allow other scheduled tasks on the event loop to run.
658-
* For instance, if disconnect() is called during/after we await eachMessage, and
659-
* we don't await anything else after that, this loop will run despite needing to
660-
* disconnect.
661-
* It's better than any pending tasks be processed here, while we've processed one
662-
* message completely, rather than between message processing. */
663-
await new Promise((resolve) => setTimeout(resolve, 0));
656+
/* Release the lock so that any pending disconnect can go through. */
657+
await this.#lock.release();
664658
}
665659
}
666660

@@ -952,7 +946,13 @@ class Consumer {
952946
return;
953947
}
954948

949+
while (!(await acquireOrLog(this.#lock, this.#logger))); /* Just retry... */
950+
955951
this.#state = ConsumerState.DISCONNECTING;
952+
953+
/* Since there are state-checks before everything, we are safe to proceed without the lock. */
954+
await this.#lock.release();
955+
956956
await new Promise((resolve, reject) => {
957957
const cb = (err) => {
958958
if (err) {

lib/kafkajs/_error.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,13 @@ class KafkaJSTimeout extends KafkaJSError {
130130
}
131131
}
132132

133+
class KafkaJSLockTimeout extends KafkaJSTimeout {
134+
constructor() {
135+
super(...arguments)
136+
this.name = 'KafkaJSLockTimeout'
137+
}
138+
}
139+
133140
/**
134141
* @typedef {Object} KafkaJSAggregateError represents an error raised when multiple errors occur at once.
135142
*/
@@ -179,6 +186,7 @@ module.exports = {
179186
KafkaJSGroupCoordinatorNotFound,
180187
KafkaJSNotImplemented,
181188
KafkaJSTimeout,
189+
KafkaJSLockTimeout,
182190
KafkaJSAggregateError,
183191
KafkaJSNoBrokerAvailableError,
184192
isRebalancing,

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ jest.setTimeout(30000)
22

33
const { ErrorCodes, CompressionTypes } = require('../../../lib').KafkaJS;
44

5+
const { doesNotMatch } = require('assert');
56
const {
67
secureRandom,
78
createTopic,
@@ -443,6 +444,43 @@ describe('Consumer', () => {
443444
expect(offsetsConsumed.length).toEqual(messages.length)
444445
});
445446

447+
it('does not disconnect in the middle of message processing', async () => {
448+
await producer.connect();
449+
await consumer.connect();
450+
await consumer.subscribe({ topic: topicName });
451+
452+
let calls = 0;
453+
let failedSeek = false;
454+
consumer.run({
455+
eachMessage: async ({ message }) => {
456+
/* Take a long time to process the message. */
457+
await sleep(7000);
458+
try {
459+
consumer.seek({ topic: topicName, partition: 0, offset: message.offset });
460+
} catch (e) {
461+
failedSeek = true;
462+
}
463+
calls++;
464+
}
465+
});
466+
467+
await producer.send({
468+
topic: topicName,
469+
messages: [{ key: '1', value: '1' }],
470+
});
471+
472+
/* Waiting for assignment and then a bit more means that the first eachMessage starts running. */
473+
await waitFor(() => consumer.assignment().length > 0, () => { }, { delay: 50 });
474+
await sleep(200);
475+
await consumer.disconnect();
476+
477+
/* Even without explicitly waiting for it, a pending call to eachMessage must complete before disconnect does. */
478+
expect(calls).toEqual(1);
479+
expect(failedSeek).toEqual(false);
480+
481+
await producer.disconnect();
482+
});
483+
446484
describe('transactions', () => {
447485
it('accepts messages from an idempotent producer', async () => {
448486
producer = createProducer({

test/promisified/testhelpers.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
const crypto = require('crypto');
22
const process = require('process');
3-
const { logLevel } = require('../../lib/kafkajs');
43
const { Kafka } = require('../../lib').KafkaJS;
54

65
// TODO: pick this up from a file

0 commit comments

Comments
 (0)