[KIP-848]: Add testing changes and describe consumer group changes for KIP-848 #329
Changes to the consumer configuration mapping (#kafkaJSToConsumerConfig):

@@ -474,7 +474,7 @@
     }
   }

-  #kafkaJSToConsumerConfig(kjsConfig) {
+  #kafkaJSToConsumerConfig(kjsConfig, isClassicProtocol = true) {
     if (!kjsConfig || Object.keys(kjsConfig).length === 0) {
       return {};
     }

@@ -498,37 +498,53 @@
     }

     if (Object.hasOwn(kjsConfig, 'partitionAssignors')) {
+      if (!isClassicProtocol) {
+        throw new error.KafkaJSError(
+          "partitionAssignors is not supported when group.protocol is not 'classic'.",
+          { code: error.ErrorCodes.ERR__INVALID_ARG }
+        );
+      }
       if (!Array.isArray(kjsConfig.partitionAssignors)) {
         throw new error.KafkaJSError(CompatibilityErrorMessages.partitionAssignors(), { code: error.ErrorCodes.ERR__INVALID_ARG });
       }

       kjsConfig.partitionAssignors.forEach(assignor => {
         if (typeof assignor !== 'string')
           throw new error.KafkaJSError(CompatibilityErrorMessages.partitionAssignors(), { code: error.ErrorCodes.ERR__INVALID_ARG });
       });

       rdKafkaConfig['partition.assignment.strategy'] = kjsConfig.partitionAssignors.join(',');
-    } else {
+    } else if (isClassicProtocol) {
       rdKafkaConfig['partition.assignment.strategy'] = PartitionAssigners.roundRobin;
     }

     if (Object.hasOwn(kjsConfig, 'sessionTimeout')) {
+      if (!isClassicProtocol) {
+        throw new error.KafkaJSError(
+          "sessionTimeout is not supported when group.protocol is not 'classic'.",
+          { code: error.ErrorCodes.ERR__INVALID_ARG }
+        );
+      }
       rdKafkaConfig['session.timeout.ms'] = kjsConfig.sessionTimeout;
-    } else {
+    } else if (isClassicProtocol) {
       rdKafkaConfig['session.timeout.ms'] = 30000;
     }

+    if (Object.hasOwn(kjsConfig, 'heartbeatInterval')) {
+      if (!isClassicProtocol) {
+        throw new error.KafkaJSError(
+          "heartbeatInterval is not supported when group.protocol is not 'classic'.",
+          { code: error.ErrorCodes.ERR__INVALID_ARG }
+        );
+      }
+      rdKafkaConfig['heartbeat.interval.ms'] = kjsConfig.heartbeatInterval;
+    }
+
     if (Object.hasOwn(kjsConfig, 'rebalanceTimeout')) {
       /* In librdkafka, we use the max poll interval as the rebalance timeout as well. */
       rdKafkaConfig['max.poll.interval.ms'] = +kjsConfig.rebalanceTimeout;
     } else if (!rdKafkaConfig['max.poll.interval.ms']) {
       rdKafkaConfig['max.poll.interval.ms'] = 300000; /* librdkafka default */
     }

-    if (Object.hasOwn(kjsConfig, 'heartbeatInterval')) {
-      rdKafkaConfig['heartbeat.interval.ms'] = kjsConfig.heartbeatInterval;
-    }
-
     if (Object.hasOwn(kjsConfig, 'metadataMaxAge')) {
       rdKafkaConfig['topic.metadata.refresh.interval.ms'] = kjsConfig.metadataMaxAge;
     }

@@ -605,8 +621,10 @@
   }

   #finalizedConfig() {
+    const protocol = this.#userConfig['group.protocol'];
+    const isClassicProtocol = protocol === undefined || protocol === 'classic';
     /* Creates an rdkafka config based off the kafkaJS block. Switches to compatibility mode if the block exists. */
-    let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS);
+    let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS, isClassicProtocol);

     /* There can be multiple different and conflicting config directives for setting the log level:
      * 1. If there's a kafkaJS block:
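Under the KIP-848 'consumer' group protocol, session timeouts, heartbeats and partition assignment are negotiated broker-side, so the kafkaJS compatibility options `sessionTimeout`, `heartbeatInterval` and `partitionAssignors` are now rejected with `ERR__INVALID_ARG` instead of being mapped to librdkafka properties. The following is a minimal sketch of how a caller might opt into the new protocol, assuming the `@confluentinc/kafka-javascript` package and its KafkaJS-compatible API; broker address, group id and topic name are illustrative, not taken from the PR:

```js
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

const kafka = new Kafka({ kafkaJS: { brokers: ['localhost:9092'] } });

// 'group.protocol' is a librdkafka-level property, so it sits outside the
// kafkaJS block; omitting it (or setting 'classic') keeps the old behaviour.
const consumer = kafka.consumer({
  kafkaJS: {
    groupId: 'kip848-demo-group',
    fromBeginning: true,
    // sessionTimeout: 30000,   // would now throw ERR__INVALID_ARG
    // heartbeatInterval: 3000, // would now throw ERR__INVALID_ARG
  },
  'group.protocol': 'consumer',
});

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topics: ['test-topic'] });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({ topic, partition, value: message.value.toString() });
    },
  });
}

run().catch(console.error);
```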
New file: docker-compose service definition for an Apache Kafka 4.0.0 broker:

@@ -0,0 +1,14 @@
services:
  kafka:
    image: apache/kafka:4.0.0
    restart: unless-stopped
    container_name: kafka
    ports:
      - 9092:29092
      - 9093:29093
    volumes:
      - ./kafka_jaas.conf:/etc/kafka/kafka_jaas.conf
      - ./kraft/server.properties:/mnt/shared/config/server.properties
    environment:
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf"
New file: kafka_jaas.conf, defining the broker's SASL/PLAIN users:

@@ -0,0 +1,13 @@
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_testuser="testpass";
};

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin";
};
New file: kraft/server.properties for the combined KRaft broker/controller:

@@ -0,0 +1,31 @@
broker.id=0
port=9092
reserved.broker.max.id=65536
listeners=PLAINTEXT://:9092,CONTROLLER://:38705,SASL_PLAINTEXT://:9093,DOCKER://:29092,DOCKER_SASL_PLAINTEXT://:29093
advertised.listeners=PLAINTEXT://kafka:9092,SASL_PLAINTEXT://kafka:9093,DOCKER://localhost:9092,DOCKER_SASL_PLAINTEXT://localhost:9093
num.partitions=4
auto.create.topics.enable=true
delete.topic.enable=true
default.replication.factor=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.controller.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
super.users=User:admin
allow.everyone.if.no.acl.found=true

broker.rack=RACK1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
group.coordinator.rebalance.protocols=classic,consumer
connections.max.reauth.ms=10000
log.retention.bytes=1000000000
process.roles=broker,controller
controller.listener.names=CONTROLLER
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,CONTROLLER:SASL_PLAINTEXT,DOCKER:PLAINTEXT,DOCKER_SASL_PLAINTEXT:SASL_PLAINTEXT
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
sasl.enabled.mechanisms=PLAIN
controller.quorum.voters=0@kafka:38705
group.consumer.min.session.timeout.ms=6000
group.consumer.session.timeout.ms=10000
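This broker enables both rebalance protocols (group.coordinator.rebalance.protocols=classic,consumer) and advertises a SASL_PLAINTEXT listener on localhost:9093. A sketch of a client connecting to that listener with the testuser account from kafka_jaas.conf, assuming the `@confluentinc/kafka-javascript` package and its KafkaJS-compatible configuration block (the mapping of ssl/sasl options shown here is how that package documents it; nothing below is part of the diff):

```js
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

// Connects over the DOCKER_SASL_PLAINTEXT listener advertised as localhost:9093.
// 'testuser'/'testpass' come from the user_testuser entry in kafka_jaas.conf.
const kafka = new Kafka({
  kafkaJS: {
    brokers: ['localhost:9093'],
    ssl: false, // SASL over plaintext, matching the listener's security protocol
    sasl: {
      mechanism: 'plain',
      username: 'testuser',
      password: 'testpass',
    },
  },
});

async function main() {
  const admin = kafka.admin();
  await admin.connect();
  console.log(await admin.listTopics());
  await admin.disconnect();
}

main().catch(console.error);
```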
Test changes for Admin > describeGroups, making the assertions protocol-aware:

@@ -1,6 +1,7 @@
 jest.setTimeout(30000);

 const {
+  testConsumerGroupProtocolClassic,
   createConsumer,
   createProducer,
   secureRandom,
@@ -9,7 +10,7 @@ const {
   createAdmin,
   sleep,
 } = require('../testhelpers');
-const { ConsumerGroupStates, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS;
+const { ConsumerGroupStates, ConsumerGroupTypes, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS;

 describe('Admin > describeGroups', () => {
   let topicName, groupId, consumer, admin, groupInstanceId, producer;
@@ -81,11 +82,12 @@ describe('Admin > describeGroups', () => {
     expect(describeGroupsResult.groups[0]).toEqual(
       expect.objectContaining({
         groupId,
-        protocol: 'roundrobin',
-        partitionAssignor: 'roundrobin',
+        protocol: expect.any(String),
+        partitionAssignor: expect.any(String),
         isSimpleConsumerGroup: false,
         protocolType: 'consumer',
         state: ConsumerGroupStates.STABLE,
+        type: testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER,
         coordinator: expect.objectContaining({
           id: expect.any(Number),
           host: expect.any(String),
@@ -134,9 +136,10 @@ describe('Admin > describeGroups', () => {
     expect(describeGroupsResult.groups[0]).toEqual(
       expect.objectContaining({
         groupId,
-        protocol: '',
-        partitionAssignor: '',
+        protocol: expect.any(String),
+        partitionAssignor: expect.any(String),
         state: ConsumerGroupStates.EMPTY,
+        type: testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER,
         protocolType: 'consumer',
         isSimpleConsumerGroup: false,
         coordinator: expect.objectContaining({
Review comment (nit): since we're not actually using it, why have it here?
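The test assertions above branch on a new testhelpers export, testConsumerGroupProtocolClassic(), so the same suite can run against either rebalance protocol. The diff does not show the helper's body; the following is a hypothetical sketch of what such a switch might look like, where the environment variable name and its default are assumptions rather than something taken from the PR:

```js
// Hypothetical testhelpers addition: reports which rebalance protocol the test
// run targets. The real helper in the PR may read a different switch.
function testConsumerGroupProtocolClassic() {
  // Assumed environment variable name; defaults to the classic protocol.
  const protocol = process.env.TEST_CONSUMER_GROUP_PROTOCOL || 'classic';
  return protocol === 'classic';
}

module.exports = { testConsumerGroupProtocolClassic };
```

With a switch like this, the group's `type` is expected to be `ConsumerGroupTypes.CLASSIC` on the classic protocol and `ConsumerGroupTypes.CONSUMER` under KIP-848, while `protocol` and `partitionAssignor` are only checked to be strings, since their concrete values differ between the two protocols.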