File tree Expand file tree Collapse file tree 1 file changed +6
-1
lines changed
Expand file tree Collapse file tree 1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change @@ -20,6 +20,7 @@ import { retryIfRetriable } from '../utils/retries'
2020import { promisifyCallback } from '../utils/utils'
2121import { ensureTopicExists } from './admin'
2222import { getKafkaConfigFromEnv } from './config'
23+ import { match } from 'assert'
2324
2425const DEFAULT_BATCH_TIMEOUT_MS = 500
2526const SLOW_BATCH_PROCESSING_LOG_THRESHOLD_MS = 10000
@@ -129,9 +130,13 @@ export class KafkaConsumer {
129130 this . maxHealthHeartbeatIntervalMs =
130131 defaultConfig . CONSUMER_MAX_HEARTBEAT_INTERVAL_MS || MAX_HEALTH_HEARTBEAT_INTERVAL_MS
131132
133+ const securityProtocol = defaultConfig . KAFKA_SECURITY_PROTOCOL ?. toLowerCase ( ) as Partial <
134+ 'plaintext' | 'ssl' | 'sasl_plaintext' | 'sasl_ssl'
135+ >
136+
132137 this . consumerConfig = {
133138 'client.id' : hostname ( ) ,
134- 'security.protocol' : defaultConfig . KAFKA_SECURITY_PROTOCOL ,
139+ 'security.protocol' : securityProtocol ,
135140 'metadata.broker.list' : defaultConfig . KAFKA_HOSTS , // Overridden with KAFKA_CONSUMER_METADATA_BROKER_LIST
136141 log_level : 4 , // WARN as the default
137142 'group.id' : this . config . groupId ,
You can’t perform that action at this time.
0 commit comments