Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for promisified consumer API #12

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@

#### Semantic and Per-Method Changes

* `sendBatch` is not supported (YET). However, the actual batching semantics are handled by librdkafka.
* Changes to `send`:
* `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be configured in the configuration.
Before:
Expand Down Expand Up @@ -178,6 +177,8 @@
```

* Error-handling for a failed `send` is stricter. While sending multiple messages, even if one of the messages fails, the method throws an error.
* `sendBatch` is supported. However, the actual batching semantics are handled by librdkafka, and it just acts as a wrapper around `send` (See `send` for changes).
* A transactional producer (with a `transactionId`) set, can only send messages after calling `producer.transaction()`.

### Consumer

Expand Down Expand Up @@ -219,7 +220,10 @@
#### Semantic and Per-Method Changes


* While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used.
* Changes to subscribe:
* Regex flags are ignored while passing a topic subscription (like 'i' or 'g').
* Subscribe must be called after `connect`.
* While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used.
Before:
```javascript
const kafka = new Kafka({ /* ... */ });
Expand All @@ -229,7 +233,6 @@
await consumer.connect();
await consumer.subscribe({ topics: ["topic"], fromBeginning: true});
```

After:
```javascript
const kafka = new Kafka({ /* ... */ });
Expand Down Expand Up @@ -287,8 +290,13 @@
* The `partitionsConsumedConcurrently` property is not supported (YET).
* The `eachBatch` method is not supported.
* `commitOffsets` does not (YET) support sending metadata for topic partitions being committed.
* `paused()` is not (YET) supported.
* `paused()` is supported without any changes.
* Custom partition assignors are not supported.
* Changes to `seek`:
* The restriction to call seek only after `run` is removed. It can be called any time.
* Rather than the `autoCommit` property of `run` deciding if the offset is committed, the librdkafka property `enable.auto.commit` of the consumer config is used.
* `pause` and `resume` MUST be called after the consumer group is joined. In practice, this means it can be called whenever `consumer.assignment()` has a non-zero size, or within the `eachMessage`
callback.

### Admin Client

Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ endif
NODE ?= node
CPPLINT ?= cpplint.py
BUILDTYPE ?= Release
TESTS = "test/**/*.js"
TESTS = $(ls test/producer/*.js test/*.js test/tools/*.js)
PROMISIFIED_TESTS = "test/promisified"
E2E_TESTS = $(wildcard e2e/*.spec.js)
TEST_REPORTER =
TEST_OUTPUT =
Expand All @@ -24,7 +25,7 @@ CONFIG_OUTPUTS = \

CPPLINT_FILES = $(wildcard src/*.cc src/*.h)
CPPLINT_FILTER = -legal/copyright
JSLINT_FILES = lib/*.js test/*.js e2e/*.js
JSLINT_FILES = lib/*.js test/*.js e2e/*.js lib/kafkajs/*.js

PACKAGE = $(shell node -pe 'require("./package.json").name.split("/")[1]')
VERSION = $(shell node -pe 'require("./package.json").version')
Expand Down Expand Up @@ -58,6 +59,7 @@ $(CONFIG_OUTPUTS): node_modules/.dirstamp binding.gyp

test: node_modules/.dirstamp
@./node_modules/.bin/mocha --ui exports $(TEST_REPORTER) $(TESTS) $(TEST_OUTPUT)
@./node_modules/.bin/jest --ci --runInBand $(PROMISIFIED_TESTS)

check: node_modules/.dirstamp
@$(NODE) util/test-compile.js
Expand Down
4 changes: 3 additions & 1 deletion examples/kafkajs/eos.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ async function eosStart() {
{
topic,
partitions: [
{ partition, offset: message.offset },
/* The message.offset indicates current offset, so we need to add 1 to it, since committed offset denotes
* the next offset to consume. */
{ partition, offset: message.offset + 1 },
],
}
],
Expand Down
6 changes: 6 additions & 0 deletions lib/kafkajs/_admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ class Admin {
* @returns {Promise<void>} Resolves when disconnect is complete, rejects on error.
*/
async disconnect() {
/* Not yet connected - no error. */
if (this.#state == AdminState.INIT) {
return;
}

/* Already disconnecting, or disconnected. */
if (this.#state >= AdminState.DISCONNECTING) {
return;
}
Expand Down
Loading