diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index c98b40d87..0495b3f74 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -17,9 +17,9 @@ pr:
 variables:
   - group: website_secrets
   - name: KAFKA_VERSION
-    value: 2.2
+    value: 2.3
   - name: COMPOSE_FILE
-    value: docker-compose.2_2.yml
+    value: docker-compose.2_3.yml
 
 ####### Linter
 jobs:
diff --git a/docker-compose.2_3.yml b/docker-compose.2_3.yml
new file mode 100644
index 000000000..7ec5ddf58
--- /dev/null
+++ b/docker-compose.2_3.yml
@@ -0,0 +1,146 @@
+version: '2'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    hostname: zookeeper
+    container_name: zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/server-jaas.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider"
+    volumes:
+      - ./testHelpers/kafka/server-jaas.conf:/etc/kafka/server-jaas.conf
+
+  kafka1:
+    image: confluentinc/cp-kafka:5.3.1
+    hostname: kafka1
+    container_name: kafka1
+    labels:
+      - "custom.project=kafkajs"
+      - "custom.service=kafka1"
+    depends_on:
+      - zookeeper
+    ports:
+      - "29092:29092"
+      - "9092:9092"
+      - "29093:29093"
+      - "9093:9093"
+      - "29094:29094"
+      - "9094:9094"
+    environment:
+      KAFKA_BROKER_ID: 0
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
+      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
+      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
+      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
+      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
+      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
+      # suppress verbosity
+      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
+      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
+      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
+      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
+      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
+      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf
+
+  kafka2:
+    image: confluentinc/cp-kafka:5.3.1
+    hostname: kafka2
+    container_name: kafka2
+    labels:
+      - "custom.project=kafkajs"
+      - "custom.service=kafka2"
+    depends_on:
+      - zookeeper
+    ports:
+      - "29095:29095"
+      - "9095:9095"
+      - "29096:29096"
+      - "9096:9096"
+      - "29097:29097"
+      - "9097:9097"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
+      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
+      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
+      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
+      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
+      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
+      # suppress verbosity
+      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
+      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
+      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
+      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
+      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
+      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf
+
+  kafka3:
+    image: confluentinc/cp-kafka:5.3.1
+    hostname: kafka3
+    container_name: kafka3
+    labels:
+      - "custom.project=kafkajs"
+      - "custom.service=kafka3"
+    depends_on:
+      - zookeeper
+    ports:
+      - "29098:29098"
+      - "9098:9098"
+      - "29099:29099"
+      - "9099:9099"
+      - "29100:29100"
+      - "9100:9100"
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
+      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
+      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
+      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
+      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
+      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
+      # suppress verbosity
+      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
+      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
+      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
+      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
+      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
+      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf
\ No newline at end of file
diff --git a/docs/DevelopmentEnvironment.md b/docs/DevelopmentEnvironment.md
index 9b1442219..30f1e9a40 100644
--- a/docs/DevelopmentEnvironment.md
+++ b/docs/DevelopmentEnvironment.md
@@ -20,13 +20,13 @@ For testing KafkaJS we use a multi-broker Kafka cluster as well as Zookeeper for
 This boots the Kafka cluster using the default docker-compose.yml file described in [scripts/dockerComposeUp.sh](https://github.com/tulios/kafkajs/blob/master/scripts/dockerComposeUp.sh). If you want to run a different version of Kafka, specify a different compose file using the `COMPOSE_FILE` environment variable:
 
 ```sh
-COMPOSE_FILE="docker-compose.2_2.yml" ./scripts/dockerComposeUp.sh
+COMPOSE_FILE="docker-compose.2_3.yml" ./scripts/dockerComposeUp.sh
 ```
 
-If you run `docker-compose -f docker-compose.2_2.yml ps` (specify whichever compose file you used in the step above), you should see something like:
+If you run `docker-compose -f docker-compose.2_3.yml ps` (specify whichever compose file you used in the step above), you should see something like:
 
 ```sh
-$ docker-compose -f docker-compose.2_2.yml ps
+$ docker-compose -f docker-compose.2_3.yml ps
 WARNING: The HOST_IP variable is not set. Defaulting to a blank string.
   Name               Command            State                                         Ports
 ----------------------------------------------------------------------------------------------------------------------------------
diff --git a/docs/DockerLocal.md b/docs/DockerLocal.md
index 97ae5f40a..c1f5ce932 100644
--- a/docs/DockerLocal.md
+++ b/docs/DockerLocal.md
@@ -45,4 +45,4 @@ You will now be able to connect to your Kafka broker at `$(HOST_IP):9092`. See t
 
 ## SSL & authentication methods
 
-To configure Kafka to use SSL and/or authentication methods such as SASL, see [docker-compose.yml](https://github.com/tulios/kafkajs/blob/master/docker-compose.2_2.yml). This configuration is used while developing KafkaJS, and is more complicated to set up, but may give you a more production-like development environment.
\ No newline at end of file
+To configure Kafka to use SSL and/or authentication methods such as SASL, see [docker-compose.yml](https://github.com/tulios/kafkajs/blob/master/docker-compose.2_3.yml). This configuration is used while developing KafkaJS, and is more complicated to set up, but may give you a more production-like development environment.
\ No newline at end of file
diff --git a/package.json b/package.json
index e02d36747..af712b863 100644
--- a/package.json
+++ b/package.json
@@ -23,7 +23,7 @@
   },
   "homepage": "https://kafka.js.org",
   "scripts": {
-    "jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.2'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest",
+    "jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.3'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest",
     "test:local": "yarn jest --detectOpenHandles",
     "test:debug": "NODE_ENV=test KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch",
     "test:local:watch": "yarn test:local --watch",
diff --git a/scripts/dockerComposeUp.sh b/scripts/dockerComposeUp.sh
index b7aa72c3b..9ce743b86 100755
--- a/scripts/dockerComposeUp.sh
+++ b/scripts/dockerComposeUp.sh
@@ -1,6 +1,6 @@
 #!/bin/bash -e
 
-COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_2.yml"}
+COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_3.yml"}
 
 echo "Running compose file: ${COMPOSE_FILE}:"
 docker-compose -f "${COMPOSE_FILE}" up --force-recreate -d
diff --git a/scripts/testWithKafka.sh b/scripts/testWithKafka.sh
index 14c5126d3..39684c44c 100755
--- a/scripts/testWithKafka.sh
+++ b/scripts/testWithKafka.sh
@@ -3,7 +3,7 @@
 testCommand="$1"
 extraArgs="$2"
 
-export COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_2.yml"}
+export COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_3.yml"}
 export KAFKAJS_DEBUG_PROTOCOL_BUFFERS=${KAFKAJS_DEBUG_PROTOCOL_BUFFERS:=1}
 
 find_container_id() {
diff --git a/src/broker/__tests__/describeConfigs.spec.js b/src/broker/__tests__/describeConfigs.spec.js
index b4c550f9f..df2db655a 100644
--- a/src/broker/__tests__/describeConfigs.spec.js
+++ b/src/broker/__tests__/describeConfigs.spec.js
@@ -116,7 +116,7 @@ describe('Broker > describeConfigs', () => {
         },
         {
          configName: 'message.format.version',
-          configValue: expect.stringMatching(/^(0\.11\.0-IV2|1\.1-IV0|2\.2-IV1)$/),
+          configValue: expect.stringMatching(/^(0\.11\.0-IV2|1\.1-IV0|2\.[23]-IV1)$/),
           isDefault: false,
           isSensitive: false,
           readOnly: false,
@@ -138,6 +138,22 @@
           readOnly: false,
           configSynonyms: [],
         },
+        {
+          configName: 'max.compaction.lag.ms',
+          configSynonyms: [],
+          configValue: '9223372036854775807',
+          isDefault: false,
+          isSensitive: false,
+          readOnly: false,
+        },
+        {
+          configName: 'message.downconversion.enable',
+          configSynonyms: [],
+          configValue: 'true',
+          isDefault: false,
+          isSensitive: false,
+          readOnly: false,
+        },
         {
           configName: 'max.message.bytes',
           configValue: '1000012',
diff --git a/src/broker/__tests__/findGroupCoordinator.spec.js b/src/broker/__tests__/findGroupCoordinator.spec.js
index 76a1446fc..1575721c9 100644
--- a/src/broker/__tests__/findGroupCoordinator.spec.js
+++ b/src/broker/__tests__/findGroupCoordinator.spec.js
@@ -25,7 +25,7 @@ describe('Broker > FindGroupCoordinator', () => {
 
     expect(response).toEqual({
       errorCode: 0,
-      errorMessage: null,
+      errorMessage: expect.toBeOneOf([null, 'NONE']),
       throttleTime: 0,
       coordinator: {
         nodeId: expect.any(Number),
diff --git a/src/consumer/batch.js b/src/consumer/batch.js
index c94114eab..31c9e97cd 100644
--- a/src/consumer/batch.js
+++ b/src/consumer/batch.js
@@ -8,26 +8,27 @@ const filterAbortedMessages = require('./filterAbortedMessages')
  */
 module.exports = class Batch {
   constructor(topic, fetchedOffset, partitionData) {
-    const longFetchedOffset = Long.fromValue(fetchedOffset)
-    const { abortedTransactions } = partitionData
+    this.fetchedOffset = fetchedOffset
+    const longFetchedOffset = Long.fromValue(this.fetchedOffset)
+    const { abortedTransactions, messages } = partitionData
 
     this.topic = topic
     this.partition = partitionData.partition
     this.highWatermark = partitionData.highWatermark
 
+    this.rawMessages = messages
     // Apparently fetch can return different offsets than the target offset provided to the fetch API.
     // Discard messages that are not in the requested offset
     // https://github.com/apache/kafka/blob/bf237fa7c576bd141d78fdea9f17f65ea269c290/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L912
-    const messagesWithinOffset = partitionData.messages.filter(message =>
+    this.messagesWithinOffset = this.rawMessages.filter(message =>
       Long.fromValue(message.offset).gte(longFetchedOffset)
     )
-    this.unfilteredMessages = messagesWithinOffset
 
     // 1. Don't expose aborted messages
     // 2. Don't expose control records
     // @see https://kafka.apache.org/documentation/#controlbatch
     this.messages = filterAbortedMessages({
-      messages: messagesWithinOffset,
+      messages: this.messagesWithinOffset,
       abortedTransactions,
     }).filter(message => !message.isControlRecord)
   }
@@ -37,33 +38,52 @@ module.exports = class Batch {
   }
 
   isEmptyIncludingFiltered() {
-    return this.unfilteredMessages.length === 0
+    return this.messagesWithinOffset.length === 0
   }
 
   isEmptyControlRecord() {
-    return this.isEmpty() && this.unfilteredMessages.some(({ isControlRecord }) => isControlRecord)
+    return (
+      this.isEmpty() && this.messagesWithinOffset.some(({ isControlRecord }) => isControlRecord)
+    )
+  }
+
+  /**
+   * With compressed messages, it's possible for the returned messages to have offsets smaller than the starting offset.
+   * These messages will be filtered out (i.e. they are not even included in this.messagesWithinOffset)
+   * If these are the only messages, the batch will appear as an empty batch.
+   *
+   * isEmpty() and isEmptyIncludingFiltered() will always return true if the batch is empty,
+   * but this method will only return true if the batch is empty due to log compacted messages.
+   *
+   * @returns boolean True if the batch is empty, because of log compacted messages in the partition.
+   */
+  isEmptyDueToLogCompactedMessages() {
+    const hasMessages = this.rawMessages.length > 0
+    return hasMessages && this.isEmptyIncludingFiltered()
   }
 
   firstOffset() {
-    return this.isEmptyIncludingFiltered() ? null : this.unfilteredMessages[0].offset
+    return this.isEmptyIncludingFiltered() ? null : this.messagesWithinOffset[0].offset
   }
 
   lastOffset() {
-    return this.isEmptyIncludingFiltered()
-      ? Long.fromValue(this.highWatermark)
-          .add(-1)
-          .toString()
-      : this.unfilteredMessages[this.unfilteredMessages.length - 1].offset
+    if (this.isEmptyDueToLogCompactedMessages()) {
+      return this.fetchedOffset
+    }
+
+    if (this.isEmptyIncludingFiltered()) {
+      return Long.fromValue(this.highWatermark)
+        .add(-1)
+        .toString()
+    }
+
+    return this.messagesWithinOffset[this.messagesWithinOffset.length - 1].offset
   }
 
   /**
    * Returns the lag based on the last offset in the batch (also known as "high")
    */
   offsetLag() {
-    if (this.isEmptyIncludingFiltered()) {
-      return '0'
-    }
-
     const lastOffsetOfPartition = Long.fromValue(this.highWatermark).add(-1)
     const lastConsumedOffset = Long.fromValue(this.lastOffset())
     return lastOffsetOfPartition.add(lastConsumedOffset.multiply(-1)).toString()
diff --git a/src/consumer/consumerGroup.js b/src/consumer/consumerGroup.js
index 49d59eeba..af0888c6d 100644
--- a/src/consumer/consumerGroup.js
+++ b/src/consumer/consumerGroup.js
@@ -437,7 +437,7 @@ module.exports = class ConsumerGroup {
      *
      * @see https://github.com/apache/kafka/blob/9aa660786e46c1efbf5605a6a69136a1dac6edb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1499-L1505
      */
-    if (batch.isEmptyControlRecord()) {
+    if (batch.isEmptyControlRecord() || batch.isEmptyDueToLogCompactedMessages()) {
       this.resolveOffset({
         topic: batch.topic,
         partition: batch.partition,
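---

To make the intent of the `Batch` changes concrete, here is a minimal sketch of the new behaviour. It is illustrative only and not part of the patch; the topic name and partition data are fabricated to mimic a fetch response for a compressed, log-compacted partition whose messages all sit below the requested offset:

```js
// Illustrative only: fabricated partition data for a fetch at offset 10 where
// compression + log compaction left only messages below the requested offset.
const Batch = require('./src/consumer/batch')

const partitionData = {
  partition: 0,
  highWatermark: '20',
  abortedTransactions: [],
  messages: [{ offset: '8' }, { offset: '9' }], // both filtered out, offsets < 10
}

const batch = new Batch('test-topic', '10', partitionData)

console.log(batch.isEmpty()) // true — no consumable messages
console.log(batch.isEmptyDueToLogCompactedMessages()) // true — raw messages existed, all below the fetched offset
console.log(batch.lastOffset()) // '10' — the fetched offset, not highWatermark - 1
```

Previously such a batch was simply skipped without being resolved, so the consumer kept fetching the same offset and never advanced past the compacted region; with this change, `consumerGroup` treats it like an empty control batch, resolves the fetched offset, and moves on.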