From 284c18d7d3d9c0183f7e7536c969d5a8687cc4c2 Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Tue, 22 Apr 2025 16:25:26 -0700 Subject: [PATCH] Increase Kafka topic polling time in test to 10 seconds We've seen multiple failures where the 3 second timeout was breached. This happens in the test setup of many different tests so multiple individual test failures have been reported. Also replace deprecated method in Kafka API and simplify a couple other code paths. Signed-off-by: Andrew Ross --- .../opensearch/plugin/kafka/KafkaUtils.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java index 543a5ecfc5497..62e7a529fcfbd 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java @@ -53,19 +53,16 @@ public static void createTopic(String topicName, int numOfPartitions, String boo } // validates topic is created - await().atMost(3, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers)); + await().atMost(10, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers)); } public static boolean checkTopicExistence(String topicName, String bootstrapServers) { return getAdminClient(bootstrapServers, (client -> { - Map> topics = client.describeTopics(List.of(topicName)).values(); + Map> topics = client.describeTopics(List.of(topicName)).topicNameValues(); try { return topics.containsKey(topicName) && topics.get(topicName).get().name().equals(topicName); - } catch (InterruptedException e) { - LOGGER.error("error on checkTopicExistence", e); - return false; - } catch (ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { LOGGER.error("error on checkTopicExistence", e); return false; } @@ -73,13 +70,12 @@ public static boolean checkTopicExistence(String topicName, String bootstrapServ } private static Rep getAdminClient(String bootstrapServer, Function function) { - AdminClient adminClient = KafkaAdminClient.create( - Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test") - ); - try { + try ( + AdminClient adminClient = KafkaAdminClient.create( + Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test") + ) + ) { return function.apply(adminClient); - } finally { - adminClient.close(); } } }