diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java index 77f83fb90b76a..14ec707a2ebb4 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java @@ -61,7 +61,15 @@ public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig, //Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler. KafkaConsumer consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties()); Path committedOffsetsPath = new File(rlmmConfig.logDir(), COMMITTED_OFFSETS_FILE_NAME).toPath(); - consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, topicPartitioner, committedOffsetsPath, time, 60_000L); + consumerTask = new ConsumerTask( + consumer, + rlmmConfig.remoteLogMetadataTopicName(), + remotePartitionMetadataEventHandler, + topicPartitioner, + committedOffsetsPath, + time, + 60_000L + ); consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", consumerTask); } @@ -76,7 +84,8 @@ public void startConsumerThread() { } /** - * Waits if necessary for the consumption to reach the offset of the given {@code recordMetadata}. + * Waits if necessary for the consumption to reach the {@code offset} of the given record + * at a certain {@code partition} of the metadata topic. * * @param recordMetadata record metadata to be checked for consumption. * @throws TimeoutException if this method execution did not complete with in the wait time configured with @@ -87,10 +96,10 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws T } /** - * Waits if necessary for the consumption to reach the offset of the given {@code recordMetadata}. + * Waits if necessary for the consumption to reach the partition/offset of the given {@code RecordMetadata} * * @param recordMetadata record metadata to be checked for consumption. - * @param timeoutMs wait timeout in milli seconds + * @param timeoutMs wait timeout in milliseconds * @throws TimeoutException if this method execution did not complete with in the given {@code timeoutMs}. */ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata, @@ -98,25 +107,29 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata, final int partition = recordMetadata.partition(); final long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs); + log.info("Waiting until consumer is caught up with the target partition: [{}]", partition); + // If the current assignment does not have the subscription for this partition then return immediately. if (!consumerTask.isPartitionAssigned(partition)) { - throw new KafkaException("This consumer is not subscribed to the target partition " + partition + " on which message is produced."); + throw new KafkaException("This consumer is not assigned to the target partition " + partition + ". " + + "Partitions currently assigned: " + consumerTask.metadataPartitionsAssigned()); } final long offset = recordMetadata.offset(); long startTimeMs = time.milliseconds(); while (true) { + log.debug("Checking if partition [{}] is up to date with offset [{}]", partition, offset); long receivedOffset = consumerTask.receivedOffsetForPartition(partition).orElse(-1L); if (receivedOffset >= offset) { return; } - log.debug("Committed offset [{}] for partition [{}], but the target offset: [{}], Sleeping for [{}] to retry again", - offset, partition, receivedOffset, consumeCheckIntervalMs); + log.debug("Expected offset [{}] for partition [{}], but the committed offset: [{}], Sleeping for [{}] to retry again", + offset, partition, receivedOffset, consumeCheckIntervalMs); if (time.milliseconds() - startTimeMs > timeoutMs) { - log.warn("Committed offset for partition:[{}] is : [{}], but the target offset: [{}] ", - partition, receivedOffset, offset); + log.warn("Expected offset for partition:[{}] is : [{}], but the committed offset: [{}] ", + partition, receivedOffset, offset); throw new TimeoutException("Timed out in catching up with the expected offset by consumer."); } @@ -126,7 +139,7 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata, @Override public void close() throws IOException { - // Consumer task will close the task and it internally closes all the resources including the consumer. + // Consumer task will close the task, and it internally closes all the resources including the consumer. Utils.closeQuietly(consumerTask, "ConsumerTask"); // Wait until the consumer thread finishes. diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java index 8e0b52d71dd33..2c95bf399a52d 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java @@ -68,12 +68,13 @@ class ConsumerTask implements Runnable, Closeable { private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); private final KafkaConsumer consumer; + private final String metadataTopicName; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; private final Time time; // It indicates whether the closing process has been started or not. If it is set as true, - // consumer will stop consuming messages and it will not allow partition assignments to be updated. + // consumer will stop consuming messages, and it will not allow partition assignments to be updated. private volatile boolean closing = false; // It indicates whether the consumer needs to assign the partitions or not. This is set when it is @@ -101,12 +102,14 @@ class ConsumerTask implements Runnable, Closeable { private long lastSyncedTimeMs; public ConsumerTask(KafkaConsumer consumer, + String metadataTopicName, RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, Path committedOffsetsPath, Time time, long committedOffsetSyncIntervalMs) { this.consumer = Objects.requireNonNull(consumer); + this.metadataTopicName = Objects.requireNonNull(metadataTopicName); this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); this.topicPartitioner = Objects.requireNonNull(topicPartitioner); this.time = Objects.requireNonNull(time); @@ -143,6 +146,7 @@ private void initializeConsumerAssignment(Path committedOffsetsPath) { // Seek to the committed offsets for (Map.Entry entry : committedOffsets.entrySet()) { + log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey()); partitionToConsumedOffsets.put(entry.getKey(), entry.getValue()); consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue()); } @@ -187,6 +191,7 @@ private void processConsumerRecord(ConsumerRecord record) { } else { log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata); } + log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition()); partitionToConsumedOffsets.put(record.partition(), record.offset()); } } @@ -209,7 +214,7 @@ private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { if (offset != null) { remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset); } else { - log.debug("Skipping syncup of the remote-log-metadata-file for partition:{} , with remote log metadata partition{}, and no offset", + log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset", topicIdPartition, metadataPartition); } } @@ -313,7 +318,7 @@ private void updateAssignmentsForPartitions(Set addedPartition updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp)); } - // Clear removed topic partitions from inmemory cache. + // Clear removed topic partitions from in-memory cache. for (TopicIdPartition removedPartition : removedPartitions) { remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition); } @@ -353,4 +358,8 @@ public void close() { } } } + + public Set metadataPartitionsAssigned() { + return Collections.unmodifiableSet(assignedMetaPartitions); + } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index 0271780174b47..ffd6e14503935 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -41,7 +41,6 @@ import java.io.File; import java.io.IOException; -import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -169,7 +168,8 @@ public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionD * * @param topicIdPartition partition of the given remoteLogMetadata. * @param remoteLogMetadata RemoteLogMetadata to be stored. - * @return + * @return a future with acknowledge and potentially waiting also for consumer to catch up. + * This ensures cache is synchronized with backing topic. * @throws RemoteStorageException if there are any storage errors occur. */ private CompletableFuture storeRemoteLogMetadata(TopicIdPartition topicIdPartition, @@ -182,13 +182,12 @@ private CompletableFuture storeRemoteLogMetadata(TopicIdPartition topicIdP CompletableFuture produceFuture = producerManager.publishMessage(remoteLogMetadata); // Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset. - return produceFuture.thenApplyAsync(recordMetadata -> { + return produceFuture.thenAcceptAsync(recordMetadata -> { try { consumerManager.waitTillConsumptionCatchesUp(recordMetadata); } catch (TimeoutException e) { throw new KafkaException(e); } - return null; }); } catch (KafkaException e) { if (e instanceof RetriableException) { @@ -338,18 +337,18 @@ public void configure(Map configs) { return; } - log.info("Started initializing with configs: {}", configs); + log.info("Started configuring topic-based RLMM with configs: {}", configs); rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs); rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); remotePartitionMetadataStore = new RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath()); configured = true; - log.info("Successfully initialized with rlmmConfig: {}", rlmmConfig); + log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig); // Scheduling the initialization producer/consumer managers in a separate thread. Required resources may // not yet be available now. This thread makes sure that it is retried at regular intervals until it is // successful. - initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", () -> initializeResources()); + initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources); initializationThread.start(); } finally { lock.writeLock().unlock(); @@ -357,14 +356,11 @@ public void configure(Map configs) { } private void initializeResources() { - log.info("Initializing the resources."); + log.info("Initializing topic-based RLMM resources"); final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest(); boolean topicCreated = false; long startTimeMs = time.milliseconds(); - AdminClient adminClient = null; - try { - adminClient = AdminClient.create(rlmmConfig.producerProperties()); - + try (AdminClient adminClient = AdminClient.create(rlmmConfig.commonProperties())) { // Stop if it is already initialized or closing. while (!(initialized.get() || closing.get())) { @@ -417,7 +413,7 @@ private void initializeResources() { } initialized.set(true); - log.info("Initialized resources successfully."); + log.info("Initialized topic-based RLMM resources successfully"); } catch (Exception e) { log.error("Encountered error while initializing producer/consumer", e); return; @@ -425,16 +421,6 @@ private void initializeResources() { lock.writeLock().unlock(); } } - - } finally { - if (adminClient != null) { - try { - adminClient.close(Duration.ofSeconds(10)); - } catch (Exception e) { - // Ignore the error. - log.debug("Error occurred while closing the admin client", e); - } - } } } @@ -515,7 +501,7 @@ public void startConsumerThread() { @Override public void close() throws IOException { // Close all the resources. - log.info("Closing the resources."); + log.info("Closing topic-based RLMM resources"); if (closing.compareAndSet(false, true)) { lock.writeLock().lock(); try { @@ -532,7 +518,7 @@ public void close() throws IOException { Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore"); } finally { lock.writeLock().unlock(); - log.info("Closed the resources."); + log.info("Closed topic-based RLMM resources"); } } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java index 7e52519f2eb01..1ab57f5b8d992 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java @@ -105,6 +105,7 @@ DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS, atLeast(0), LOW, private final long initializationRetryMaxTimeoutMs; private final long initializationRetryIntervalMs; + private Map commonProps; private Map consumerProps; private Map producerProps; @@ -149,6 +150,8 @@ private void initializeProducerConsumerProperties(Map configs) { } } + commonProps = new HashMap<>(commonClientConfigs); + HashMap allProducerConfigs = new HashMap<>(commonClientConfigs); allProducerConfigs.putAll(producerOnlyConfigs); producerProps = createProducerProps(allProducerConfigs); @@ -190,6 +193,10 @@ public String logDir() { return logDir; } + public Map commonProperties() { + return commonProps; + } + public Map consumerProperties() { return consumerProps; } @@ -232,6 +239,7 @@ public String toString() { ", metadataTopicReplicationFactor=" + metadataTopicReplicationFactor + ", initializationRetryMaxTimeoutMs=" + initializationRetryMaxTimeoutMs + ", initializationRetryIntervalMs=" + initializationRetryIntervalMs + + ", commonProps=" + commonProps + ", consumerProps=" + consumerProps + ", producerProps=" + producerProps + '}'; diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java index b8af14e319e6d..b847e7cba3fb9 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java @@ -271,11 +271,13 @@ private void checkListSegments(RemoteLogSegmentLifecycleManager remoteLogSegment throws RemoteStorageException { // cache.listRemoteLogSegments(leaderEpoch) should contain the above segment. Iterator segmentsIter = remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch); - Assertions.assertTrue(segmentsIter.hasNext() && Objects.equals(segmentsIter.next(), expectedSegment)); + Assertions.assertTrue(segmentsIter.hasNext()); + Assertions.assertEquals(expectedSegment, segmentsIter.next()); // cache.listAllRemoteLogSegments() should contain the above segment. Iterator allSegmentsIter = remoteLogSegmentLifecycleManager.listAllRemoteLogSegments(); - Assertions.assertTrue(allSegmentsIter.hasNext() && Objects.equals(allSegmentsIter.next(), expectedSegment)); + Assertions.assertTrue(allSegmentsIter.hasNext()); + Assertions.assertEquals(expectedSegment, allSegmentsIter.next()); } @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}") @@ -285,7 +287,7 @@ public void testCacheSegmentWithCopySegmentStartedState(RemoteLogSegmentLifecycl try { remoteLogSegmentLifecycleManager.initialize(topicIdPartition); - // Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the + // Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the // segments. RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()); RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0, diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java index f66253b46284c..8e3985d0d5fb5 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java @@ -39,13 +39,12 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; public class TopicBasedRemoteLogMetadataManagerConfigTest { - private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class); + private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class); private static final String BOOTSTRAP_SERVERS = "localhost:9091"; @Test public void testValidConfig() { - Map commonClientConfig = new HashMap<>(); commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10); commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L); @@ -64,11 +63,14 @@ public void testValidConfig() { Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount()); // Check for common client configs. + Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); for (Map.Entry entry : commonClientConfig.entrySet()) { log.info("Checking config: " + entry.getKey()); + Assertions.assertEquals(entry.getValue(), + rlmmConfig.commonProperties().get(entry.getKey())); Assertions.assertEquals(entry.getValue(), rlmmConfig.producerProperties().get(entry.getKey())); Assertions.assertEquals(entry.getValue(), @@ -91,12 +93,13 @@ public void testValidConfig() { } @Test - public void testProducerConsumerOverridesConfig() { + public void testCommonProducerConsumerOverridesConfig() { Map.Entry overrideEntry = new AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L); Map commonClientConfig = new HashMap<>(); commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10); commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L); - commonClientConfig.put(overrideEntry.getKey(), overrideEntry.getValue()); + Long overrideCommonPropValue = overrideEntry.getValue(); + commonClientConfig.put(overrideEntry.getKey(), overrideCommonPropValue); Map producerConfig = new HashMap<>(); producerConfig.put(ProducerConfig.ACKS_CONFIG, -1); @@ -111,6 +114,8 @@ public void testProducerConsumerOverridesConfig() { Map props = createValidConfigProps(commonClientConfig, producerConfig, consumerConfig); TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props); + Assertions.assertEquals(overrideCommonPropValue, + rlmmConfig.commonProperties().get(overrideEntry.getKey())); Assertions.assertEquals(overriddenProducerPropValue, rlmmConfig.producerProperties().get(overrideEntry.getKey())); Assertions.assertEquals(overriddenConsumerPropValue, diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index 3dd02962de5ce..714405a30e918 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -120,7 +120,7 @@ public void testRLMMAPIsAfterRestart() throws Exception { // Register these partitions to RLMM. topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition)); - // Add segments for these partitions but they are not available as they have not yet been subscribed. + // Add segments for these partitions, but they are not available as they have not yet been subscribed. RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index 20bad28d79444..a41a9a3869979 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -124,15 +124,15 @@ public void testNewPartitionUpdates() throws Exception { // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - waitUntilConsumerCatchesup(newLeaderTopicIdPartition, newFollowerTopicIdPartition, 30_000L); + waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, newFollowerTopicIdPartition, 30_000L); Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext()); Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); } - private void waitUntilConsumerCatchesup(TopicIdPartition newLeaderTopicIdPartition, - TopicIdPartition newFollowerTopicIdPartition, - long timeoutMs) throws TimeoutException { + private void waitUntilConsumerCatchesUp(TopicIdPartition newLeaderTopicIdPartition, + TopicIdPartition newFollowerTopicIdPartition, + long timeoutMs) throws TimeoutException { int leaderMetadataPartition = topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition); int followerMetadataPartition = topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition);