[kafka] StreamMetadataProvider.getTopics() to use admin client instead of consumer (#14678)

suvodeep-pyne authored Dec 18, 2024
1 parent b4c0d2b commit 772bea7
Showing 5 changed files with 104 additions and 31 deletions.
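
In short: getTopics() previously reused the partition-level KafkaConsumer to list topics; this commit builds a short-lived AdminClient from the same consumer properties instead. A minimal standalone sketch of the new listing path (the broker address and client setup here are illustrative assumptions, not Pinot code):

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListTopicsSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

    // An admin client needs no deserializers, group id, or partition
    // assignment, which suits a metadata-only call like topic listing.
    try (AdminClient adminClient = AdminClient.create(props)) {
      Set<String> topics = adminClient.listTopics().names().get(); // names() returns a KafkaFuture<Set<String>>
      topics.forEach(System.out::println);
    }
  }
}
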
@@ -24,6 +24,8 @@
 import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -53,12 +55,21 @@ public abstract class KafkaPartitionLevelConnectionHandler {
   protected final String _topic;
   protected final Consumer<String, Bytes> _consumer;
   protected final TopicPartition _topicPartition;
+  protected final Properties _consumerProp;
 
   public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
     _config = new KafkaPartitionLevelStreamConfig(streamConfig);
     _clientId = clientId;
     _partition = partition;
     _topic = _config.getKafkaTopicName();
+    _consumerProp = buildProperties(streamConfig);
+    KafkaSSLUtils.initSSL(_consumerProp);
+    _consumer = createConsumer(_consumerProp);
+    _topicPartition = new TopicPartition(_topic, _partition);
+    _consumer.assign(Collections.singletonList(_topicPartition));
+  }
+
+  private Properties buildProperties(StreamConfig streamConfig) {
     Properties consumerProp = new Properties();
     consumerProp.putAll(streamConfig.getStreamConfigsMap());
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
@@ -68,28 +79,32 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
-    KafkaSSLUtils.initSSL(consumerProp);
-    _consumer = createConsumer(consumerProp);
-    _topicPartition = new TopicPartition(_topic, _partition);
-    _consumer.assign(Collections.singletonList(_topicPartition));
+    return consumerProp;
   }
 
   private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+  }
+
+  protected AdminClient createAdminClient() {
+    return retry(() -> AdminClient.create(_consumerProp), 5);
+  }
+
+  private static <T> T retry(Supplier<T> s, int nRetries) {
     // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
     // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
     // like a good balance of not waiting too long for a retry, but also not retrying too many times.
-    int maxTries = 5;
     int tries = 0;
     while (true) {
       try {
-        return new KafkaConsumer<>(consumerProp);
+        return s.get();
       } catch (KafkaException e) {
         tries++;
-        if (tries >= maxTries) {
+        if (tries >= nRetries) {
           LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
           throw e;
         }
-        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, nRetries, e);
         // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
         // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
         // stuck in ERROR state.
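The consumer-specific retry loop is generalized here into retry(Supplier<T>, int) so the same policy covers both KafkaConsumer and AdminClient creation. A self-contained sketch of that pattern under the policy the comments describe (five attempts, two-second pauses); note the original sleeps uninterruptibly via Guava, while this sketch uses a plain interruptible sleep:

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.common.KafkaException;

final class RetrySketch {
  // Retry a client factory up to nRetries times, pausing between attempts,
  // and rethrow the last failure once the retry budget is exhausted.
  static <T> T retry(Supplier<T> factory, int nRetries) throws InterruptedException {
    int tries = 0;
    while (true) {
      try {
        return factory.get();
      } catch (KafkaException e) {
        if (++tries >= nRetries) {
          throw e;
        }
        TimeUnit.SECONDS.sleep(2); // the real code sleeps uninterruptibly (Guava)
      }
    }
  }
}
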
@@ -28,8 +28,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -169,14 +172,19 @@ public Map<String, PartitionLagState> getCurrentPartitionLagState(
 
   @Override
   public List<TopicMetadata> getTopics() {
-    Map<String, List<PartitionInfo>> namePartitionsMap = _consumer.listTopics();
-    if (namePartitionsMap == null) {
-      return Collections.emptyList();
+    try (AdminClient adminClient = createAdminClient()) {
+      ListTopicsResult result = adminClient.listTopics();
+      if (result == null) {
+        return Collections.emptyList();
+      }
+      return result.names()
+          .get()
+          .stream()
+          .map(topic -> new KafkaTopicMetadata().setName(topic))
+          .collect(Collectors.toList());
+    } catch (ExecutionException | InterruptedException e) {
+      throw new RuntimeException(e);
     }
-    return namePartitionsMap.keySet()
-        .stream()
-        .map(topic -> new KafkaTopicMetadata().setName(topic))
-        .collect(Collectors.toList());
   }
 
   public static class KafkaTopicMetadata implements TopicMetadata {
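One caveat with the new getTopics() path above: names() returns a KafkaFuture, and the bare get() blocks until the broker responds. The standard client API allows bounding the call if desired; a sketch with assumed timeout values:

import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;

final class BoundedListTopics {
  // List topic names, but fail fast rather than blocking indefinitely.
  static Set<String> topicNames(AdminClient adminClient) throws Exception {
    ListTopicsOptions options = new ListTopicsOptions()
        .listInternal(false); // skip internal topics such as __consumer_offsets
    options.timeoutMs(10_000); // broker request timeout; value is an assumption
    return adminClient.listTopics(options)
        .names()
        .get(15, TimeUnit.SECONDS); // client-side bound on the future; assumed value
  }
}
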
@@ -20,9 +20,11 @@
 
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -39,6 +41,7 @@
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMessage;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -399,4 +402,29 @@ public void testOffsetsExpired()
     }
     assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "700");
   }
+
+  @Test
+  public void testGetTopics() {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", "NON_EXISTING_TOPIC");
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+
+    KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig);
+    List<StreamMetadataProvider.TopicMetadata> topics = streamMetadataProvider.getTopics();
+    List<String> topicNames = topics.stream()
+        .map(StreamMetadataProvider.TopicMetadata::getName)
+        .collect(Collectors.toList());
+    assertTrue(topicNames.containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)));
+  }
 }
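
The assertion expects TEST_TOPIC_1 through TEST_TOPIC_3 to exist on the test broker, so they are presumably created in class-level setup that this diff does not show; note the stream config deliberately points at NON_EXISTING_TOPIC, since listing topics should not depend on the configured topic name. Hypothetically, that setup could look like this (the topic literals, partition count, and replication factor are assumptions):

import java.util.List;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

final class TestTopicSetup {
  // Hypothetical setup: create the topics the assertion expects, each with
  // a single partition and replication factor 1 (typical for an embedded broker).
  static void createTestTopics(AdminClient adminClient) throws Exception {
    adminClient.createTopics(List.of(
        new NewTopic("TEST_TOPIC_1", 1, (short) 1),
        new NewTopic("TEST_TOPIC_2", 1, (short) 1),
        new NewTopic("TEST_TOPIC_3", 1, (short) 1)))
        .all()
        .get(); // block until the broker has created all three topics
  }
}
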
@@ -24,6 +24,8 @@
 import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -38,7 +40,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * KafkaPartitionLevelConnectionHandler provides low level APIs to access Kafka partition level information.
  * E.g. partition counts, offsets per partition.
@@ -53,12 +54,21 @@ public abstract class KafkaPartitionLevelConnectionHandler {
   protected final String _topic;
   protected final Consumer<String, Bytes> _consumer;
   protected final TopicPartition _topicPartition;
+  protected final Properties _consumerProp;
 
   public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
     _config = new KafkaPartitionLevelStreamConfig(streamConfig);
     _clientId = clientId;
     _partition = partition;
     _topic = _config.getKafkaTopicName();
+    _consumerProp = buildProperties(streamConfig);
+    KafkaSSLUtils.initSSL(_consumerProp);
+    _consumer = createConsumer(_consumerProp);
+    _topicPartition = new TopicPartition(_topic, _partition);
+    _consumer.assign(Collections.singletonList(_topicPartition));
+  }
+
+  private Properties buildProperties(StreamConfig streamConfig) {
     Properties consumerProp = new Properties();
     consumerProp.putAll(streamConfig.getStreamConfigsMap());
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
@@ -68,28 +78,32 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
-    KafkaSSLUtils.initSSL(consumerProp);
-    _consumer = createConsumer(consumerProp);
-    _topicPartition = new TopicPartition(_topic, _partition);
-    _consumer.assign(Collections.singletonList(_topicPartition));
+    return consumerProp;
   }
 
   private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+  }
+
+  protected AdminClient createAdminClient() {
+    return retry(() -> AdminClient.create(_consumerProp), 5);
+  }
+
+  private static <T> T retry(Supplier<T> s, int nRetries) {
     // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
     // We arbitrarily chose 5 retries with 2 seconds sleep in between retries. 10 seconds total felt
     // like a good balance of not waiting too long for a retry, but also not retrying too many times.
-    int maxTries = 5;
     int tries = 0;
     while (true) {
       try {
-        return new KafkaConsumer<>(consumerProp);
+        return s.get();
       } catch (KafkaException e) {
         tries++;
-        if (tries >= maxTries) {
+        if (tries >= nRetries) {
           LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
           throw e;
         }
-        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, nRetries, e);
         // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
         // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
         // stuck in ERROR state.
@@ -28,8 +28,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -169,14 +172,19 @@ public Map<String, PartitionLagState> getCurrentPartitionLagState(
 
   @Override
   public List<TopicMetadata> getTopics() {
-    Map<String, List<PartitionInfo>> namePartitionsMap = _consumer.listTopics();
-    if (namePartitionsMap == null) {
-      return Collections.emptyList();
+    try (AdminClient adminClient = createAdminClient()) {
+      ListTopicsResult result = adminClient.listTopics();
+      if (result == null) {
+        return Collections.emptyList();
+      }
+      return result.names()
+          .get()
+          .stream()
+          .map(topic -> new KafkaTopicMetadata().setName(topic))
+          .collect(Collectors.toList());
+    } catch (ExecutionException | InterruptedException e) {
+      throw new RuntimeException(e);
     }
-    return namePartitionsMap.keySet()
-        .stream()
-        .map(topic -> new KafkaTopicMetadata().setName(topic))
-        .collect(Collectors.toList());
   }
 
   public static class KafkaTopicMetadata implements TopicMetadata {
