Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get correct next offset for compacted topics #577

Merged
merged 8 commits on
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ pr:
variables:
- group: website_secrets
- name: KAFKA_VERSION
value: 2.2
value: 2.3
- name: COMPOSE_FILE
value: docker-compose.2_2.yml
value: docker-compose.2_3.yml

####### Linter
jobs:
Expand Down
146 changes: 146 additions & 0 deletions docker-compose.2_3.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      # Enable SASL on Zookeeper so the brokers can authenticate
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/server-jaas.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider"
    volumes:
      - ./testHelpers/kafka/server-jaas.conf:/etc/kafka/server-jaas.conf

  kafka1:
    image: confluentinc/cp-kafka:5.3.1
    hostname: kafka1
    container_name: kafka1
    labels:
      - "custom.project=kafkajs"
      - "custom.service=kafka1"
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "29093:29093"
      - "9093:9093"
      - "29094:29094"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
      # suppress verbosity
      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf

  kafka2:
    image: confluentinc/cp-kafka:5.3.1
    hostname: kafka2
    container_name: kafka2
    labels:
      - "custom.project=kafkajs"
      - "custom.service=kafka2"
    depends_on:
      - zookeeper
    ports:
      - "29095:29095"
      - "9095:9095"
      - "29096:29096"
      - "9096:9096"
      - "29097:29097"
      - "9097:9097"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
      # suppress verbosity
      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf

  kafka3:
    image: confluentinc/cp-kafka:5.3.1
    hostname: kafka3
    container_name: kafka3
    labels:
      - "custom.project=kafkajs"
      - "custom.service=kafka3"
    depends_on:
      - zookeeper
    ports:
      - "29098:29098"
      - "9098:9098"
      - "29099:29099"
      - "9099:9099"
      - "29100:29100"
      - "9100:9100"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
      # suppress verbosity
      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf
6 changes: 3 additions & 3 deletions docs/DevelopmentEnvironment.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ For testing KafkaJS we use a multi-broker Kafka cluster as well as Zookeeper for
This boots the Kafka cluster using the default docker-compose.yml file described in [scripts/dockerComposeUp.sh](https://github.com/tulios/kafkajs/blob/master/scripts/dockerComposeUp.sh). If you want to run a different version of Kafka, specify a different compose file using the `COMPOSE_FILE` environment variable:

```sh
COMPOSE_FILE="docker-compose.2_2.yml" ./scripts/dockerComposeUp.sh
COMPOSE_FILE="docker-compose.2_3.yml" ./scripts/dockerComposeUp.sh
```

If you run `docker-compose -f docker-compose.2_2.yml ps` (specify whichever compose file you used in the step above), you should see something like:
If you run `docker-compose -f docker-compose.2_3.yml ps` (specify whichever compose file you used in the step above), you should see something like:

```sh
$ docker-compose -f docker-compose.2_2.yml ps
$ docker-compose -f docker-compose.2_3.yml ps
WARNING: The HOST_IP variable is not set. Defaulting to a blank string.
Name Command State Ports
----------------------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion docs/DockerLocal.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ You will now be able to connect to your Kafka broker at `$(HOST_IP):9092`. See t

## SSL & authentication methods

To configure Kafka to use SSL and/or authentication methods such as SASL, see [docker-compose.yml](https://github.com/tulios/kafkajs/blob/master/docker-compose.2_2.yml). This configuration is used while developing KafkaJS, and is more complicated to set up, but may give you a more production-like development environment.
To configure Kafka to use SSL and/or authentication methods such as SASL, see [docker-compose.yml](https://github.com/tulios/kafkajs/blob/master/docker-compose.2_3.yml). This configuration is used while developing KafkaJS, and is more complicated to set up, but may give you a more production-like development environment.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
},
"homepage": "https://kafka.js.org",
"scripts": {
"jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.2'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest",
"jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.3'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest",
"test:local": "yarn jest --detectOpenHandles",
"test:debug": "NODE_ENV=test KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch",
"test:local:watch": "yarn test:local --watch",
Expand Down
2 changes: 1 addition & 1 deletion scripts/dockerComposeUp.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash -e

COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_2.yml"}
COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_3.yml"}

echo "Running compose file: ${COMPOSE_FILE}:"
docker-compose -f "${COMPOSE_FILE}" up --force-recreate -d
Expand Down
2 changes: 1 addition & 1 deletion scripts/testWithKafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
testCommand="$1"
extraArgs="$2"

export COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_2.yml"}
export COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_3.yml"}
export KAFKAJS_DEBUG_PROTOCOL_BUFFERS=${KAFKAJS_DEBUG_PROTOCOL_BUFFERS:=1}

find_container_id() {
Expand Down
18 changes: 17 additions & 1 deletion src/broker/__tests__/describeConfigs.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe('Broker > describeConfigs', () => {
},
{
configName: 'message.format.version',
configValue: expect.stringMatching(/^(0\.11\.0-IV2|1\.1-IV0|2\.2-IV1)$/),
configValue: expect.stringMatching(/^(0\.11\.0-IV2|1\.1-IV0|2\.[2,3]-IV1)$/),
isDefault: false,
isSensitive: false,
readOnly: false,
Expand All @@ -138,6 +138,22 @@ describe('Broker > describeConfigs', () => {
readOnly: false,
configSynonyms: [],
},
{
configName: 'max.compaction.lag.ms',
configSynonyms: [],
configValue: '9223372036854775807',
isDefault: false,
isSensitive: false,
readOnly: false,
},
{
configName: 'message.downconversion.enable',
configSynonyms: [],
configValue: 'true',
isDefault: false,
isSensitive: false,
readOnly: false,
},
{
configName: 'max.message.bytes',
configValue: '1000012',
Expand Down
2 changes: 1 addition & 1 deletion src/broker/__tests__/findGroupCoordinator.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('Broker > FindGroupCoordinator', () => {

expect(response).toEqual({
errorCode: 0,
errorMessage: null,
errorMessage: expect.toBeOneOf([null, 'NONE']),
throttleTime: 0,
coordinator: {
nodeId: expect.any(Number),
Expand Down
54 changes: 37 additions & 17 deletions src/consumer/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,27 @@ const filterAbortedMessages = require('./filterAbortedMessages')
*/
module.exports = class Batch {
constructor(topic, fetchedOffset, partitionData) {
const longFetchedOffset = Long.fromValue(fetchedOffset)
const { abortedTransactions } = partitionData
this.fetchedOffset = fetchedOffset
const longFetchedOffset = Long.fromValue(this.fetchedOffset)
const { abortedTransactions, messages } = partitionData

this.topic = topic
this.partition = partitionData.partition
this.highWatermark = partitionData.highWatermark

this.rawMessages = messages
// Apparently fetch can return different offsets than the target offset provided to the fetch API.
// Discard messages that are not in the requested offset
// https://github.com/apache/kafka/blob/bf237fa7c576bd141d78fdea9f17f65ea269c290/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L912
const messagesWithinOffset = partitionData.messages.filter(message =>
this.messagesWithinOffset = this.rawMessages.filter(message =>
Long.fromValue(message.offset).gte(longFetchedOffset)
)
this.unfilteredMessages = messagesWithinOffset

// 1. Don't expose aborted messages
// 2. Don't expose control records
// @see https://kafka.apache.org/documentation/#controlbatch
this.messages = filterAbortedMessages({
messages: messagesWithinOffset,
messages: this.messagesWithinOffset,
abortedTransactions,
}).filter(message => !message.isControlRecord)
}
Expand All @@ -37,33 +38,52 @@ module.exports = class Batch {
}

isEmptyIncludingFiltered() {
return this.unfilteredMessages.length === 0
return this.messagesWithinOffset.length === 0
}

isEmptyControlRecord() {
return this.isEmpty() && this.unfilteredMessages.some(({ isControlRecord }) => isControlRecord)
return (
this.isEmpty() && this.messagesWithinOffset.some(({ isControlRecord }) => isControlRecord)
)
}

/**
* With compressed messages, it's possible for the returned messages to have offsets smaller than the starting offset.
* These messages will be filtered out (i.e. they are not even included in this.messagesWithinOffset)
* If these are the only messages, the batch will appear as an empty batch.
*
* isEmpty() and isEmptyIncludingFiltered() will always return true if the batch is empty,
* but this method will only return true if the batch is empty due to log compacted messages.
*
* @returns boolean True if the batch is empty, because of log compacted messages in the partition.
*/
isEmptyDueToLogCompactedMessages() {
const hasMessages = this.rawMessages.length > 0
return hasMessages && this.isEmptyIncludingFiltered()
}

firstOffset() {
return this.isEmptyIncludingFiltered() ? null : this.unfilteredMessages[0].offset
return this.isEmptyIncludingFiltered() ? null : this.messagesWithinOffset[0].offset
}

lastOffset() {
return this.isEmptyIncludingFiltered()
? Long.fromValue(this.highWatermark)
.add(-1)
.toString()
: this.unfilteredMessages[this.unfilteredMessages.length - 1].offset
if (this.isEmptyDueToLogCompactedMessages()) {
return this.fetchedOffset
}

if (this.isEmptyIncludingFiltered()) {
return Long.fromValue(this.highWatermark)
.add(-1)
.toString()
}

return this.messagesWithinOffset[this.messagesWithinOffset.length - 1].offset
}

/**
* Returns the lag based on the last offset in the batch (also known as "high")
*/
offsetLag() {
if (this.isEmptyIncludingFiltered()) {
return '0'
}

const lastOffsetOfPartition = Long.fromValue(this.highWatermark).add(-1)
const lastConsumedOffset = Long.fromValue(this.lastOffset())
return lastOffsetOfPartition.add(lastConsumedOffset.multiply(-1)).toString()
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ module.exports = class ConsumerGroup {
*
* @see https://github.com/apache/kafka/blob/9aa660786e46c1efbf5605a6a69136a1dac6edb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1499-L1505
*/
if (batch.isEmptyControlRecord()) {
if (batch.isEmptyControlRecord() || batch.isEmptyDueToLogCompactedMessages()) {
this.resolveOffset({
topic: batch.topic,
partition: batch.partition,
Expand Down