Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,15 @@ public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
//Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler.
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties());
Path committedOffsetsPath = new File(rlmmConfig.logDir(), COMMITTED_OFFSETS_FILE_NAME).toPath();
consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, topicPartitioner, committedOffsetsPath, time, 60_000L);
consumerTask = new ConsumerTask(
consumer,
rlmmConfig.remoteLogMetadataTopicName(),
remotePartitionMetadataEventHandler,
topicPartitioner,
committedOffsetsPath,
time,
60_000L
);
consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", consumerTask);
}

Expand All @@ -76,7 +84,8 @@ public void startConsumerThread() {
}

/**
* Waits if necessary for the consumption to reach the offset of the given {@code recordMetadata}.
* Waits if necessary for the consumption to reach the {@code offset} of the given record
* at a certain {@code partition} of the metadata topic.
*
* @param recordMetadata record metadata to be checked for consumption.
* @throws TimeoutException if this method execution did not complete within the wait time configured with
Expand All @@ -87,36 +96,40 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws T
}

/**
* Waits if necessary for the consumption to reach the offset of the given {@code recordMetadata}.
* Waits if necessary for the consumption to reach the partition/offset of the given {@code RecordMetadata}
*
* @param recordMetadata record metadata to be checked for consumption.
* @param timeoutMs wait timeout in milli seconds
* @param timeoutMs wait timeout in milliseconds
* @throws TimeoutException if this method execution did not complete within the given {@code timeoutMs}.
*/
public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
long timeoutMs) throws TimeoutException {
final int partition = recordMetadata.partition();
final long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);

log.info("Waiting until consumer is caught up with the target partition: [{}]", partition);

// If the current assignment does not have the subscription for this partition then return immediately.
if (!consumerTask.isPartitionAssigned(partition)) {
throw new KafkaException("This consumer is not subscribed to the target partition " + partition + " on which message is produced.");
throw new KafkaException("This consumer is not assigned to the target partition " + partition + ". " +
"Partitions currently assigned: " + consumerTask.metadataPartitionsAssigned());
}

final long offset = recordMetadata.offset();
long startTimeMs = time.milliseconds();
while (true) {
log.debug("Checking if partition [{}] is up to date with offset [{}]", partition, offset);
long receivedOffset = consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
if (receivedOffset >= offset) {
return;
}

log.debug("Committed offset [{}] for partition [{}], but the target offset: [{}], Sleeping for [{}] to retry again",
offset, partition, receivedOffset, consumeCheckIntervalMs);
log.debug("Expected offset [{}] for partition [{}], but the committed offset: [{}], Sleeping for [{}] to retry again",
offset, partition, receivedOffset, consumeCheckIntervalMs);

if (time.milliseconds() - startTimeMs > timeoutMs) {
log.warn("Committed offset for partition:[{}] is : [{}], but the target offset: [{}] ",
partition, receivedOffset, offset);
log.warn("Expected offset for partition:[{}] is : [{}], but the committed offset: [{}] ",
partition, receivedOffset, offset);
throw new TimeoutException("Timed out in catching up with the expected offset by consumer.");
}

Expand All @@ -126,7 +139,7 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,

@Override
public void close() throws IOException {
// Consumer task will close the task and it internally closes all the resources including the consumer.
// Consumer task will close the task, and it internally closes all the resources including the consumer.
Utils.closeQuietly(consumerTask, "ConsumerTask");

// Wait until the consumer thread finishes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ class ConsumerTask implements Runnable, Closeable {

private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
private final KafkaConsumer<byte[], byte[]> consumer;
private final String metadataTopicName;
private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
private final RemoteLogMetadataTopicPartitioner topicPartitioner;
private final Time time;

// It indicates whether the closing process has been started or not. If it is set as true,
// consumer will stop consuming messages and it will not allow partition assignments to be updated.
// consumer will stop consuming messages, and it will not allow partition assignments to be updated.
private volatile boolean closing = false;

// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
Expand Down Expand Up @@ -101,12 +102,14 @@ class ConsumerTask implements Runnable, Closeable {
private long lastSyncedTimeMs;

public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
String metadataTopicName,
RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner topicPartitioner,
Path committedOffsetsPath,
Time time,
long committedOffsetSyncIntervalMs) {
this.consumer = Objects.requireNonNull(consumer);
this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
this.time = Objects.requireNonNull(time);
Expand Down Expand Up @@ -143,6 +146,7 @@ private void initializeConsumerAssignment(Path committedOffsetsPath) {

// Seek to the committed offsets
for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey());
partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
}
Expand Down Expand Up @@ -187,6 +191,7 @@ private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
} else {
log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
}
log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
partitionToConsumedOffsets.put(record.partition(), record.offset());
}
}
Expand All @@ -209,7 +214,7 @@ private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
if (offset != null) {
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset);
} else {
log.debug("Skipping syncup of the remote-log-metadata-file for partition:{} , with remote log metadata partition{}, and no offset",
log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset",
topicIdPartition, metadataPartition);
}
}
Expand Down Expand Up @@ -313,7 +318,7 @@ private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartition
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
}

// Clear removed topic partitions from inmemory cache.
// Clear removed topic partitions from in-memory cache.
for (TopicIdPartition removedPartition : removedPartitions) {
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
}
Expand Down Expand Up @@ -353,4 +358,8 @@ public void close() {
}
}
}

/**
 * Exposes the metadata partitions held in {@code assignedMetaPartitions}.
 *
 * <p>The result is a read-only wrapper created by {@link Collections#unmodifiableSet(Set)};
 * callers cannot modify it. It is a view rather than a copy, so it reflects the wrapped set.
 *
 * @return an unmodifiable view of the assigned metadata partition numbers.
 */
public Set<Integer> metadataPartitionsAssigned() {
    final Set<Integer> readOnlyAssignments = Collections.unmodifiableSet(assignedMetaPartitions);
    return readOnlyAssignments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -169,7 +168,8 @@ public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionD
*
* @param topicIdPartition partition of the given remoteLogMetadata.
* @param remoteLogMetadata RemoteLogMetadata to be stored.
* @return
* @return a future with acknowledge and potentially waiting also for consumer to catch up.
* This ensures cache is synchronized with backing topic.
* @throws RemoteStorageException if there are any storage errors occur.
*/
private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdPartition,
Expand All @@ -182,13 +182,12 @@ private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdP
CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata);

// Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset.
return produceFuture.thenApplyAsync(recordMetadata -> {
return produceFuture.thenAcceptAsync(recordMetadata -> {
try {
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
} catch (TimeoutException e) {
throw new KafkaException(e);
}
return null;
});
} catch (KafkaException e) {
if (e instanceof RetriableException) {
Expand Down Expand Up @@ -338,33 +337,30 @@ public void configure(Map<String, ?> configs) {
return;
}

log.info("Started initializing with configs: {}", configs);
log.info("Started configuring topic-based RLMM with configs: {}", configs);

rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
remotePartitionMetadataStore = new RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
configured = true;
log.info("Successfully initialized with rlmmConfig: {}", rlmmConfig);
log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);

// Scheduling the initialization producer/consumer managers in a separate thread. Required resources may
// not yet be available now. This thread makes sure that it is retried at regular intervals until it is
// successful.
initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", () -> initializeResources());
initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);
initializationThread.start();
} finally {
lock.writeLock().unlock();
}
}

private void initializeResources() {
log.info("Initializing the resources.");
log.info("Initializing topic-based RLMM resources");
final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest();
boolean topicCreated = false;
long startTimeMs = time.milliseconds();
AdminClient adminClient = null;
try {
adminClient = AdminClient.create(rlmmConfig.producerProperties());

try (AdminClient adminClient = AdminClient.create(rlmmConfig.commonProperties())) {
// Stop if it is already initialized or closing.
while (!(initialized.get() || closing.get())) {

Expand Down Expand Up @@ -417,24 +413,14 @@ private void initializeResources() {
}

initialized.set(true);
log.info("Initialized resources successfully.");
log.info("Initialized topic-based RLMM resources successfully");
} catch (Exception e) {
log.error("Encountered error while initializing producer/consumer", e);
return;
} finally {
lock.writeLock().unlock();
}
}

} finally {
if (adminClient != null) {
try {
adminClient.close(Duration.ofSeconds(10));
} catch (Exception e) {
// Ignore the error.
log.debug("Error occurred while closing the admin client", e);
}
}
}
}

Expand Down Expand Up @@ -515,7 +501,7 @@ public void startConsumerThread() {
@Override
public void close() throws IOException {
// Close all the resources.
log.info("Closing the resources.");
log.info("Closing topic-based RLMM resources");
if (closing.compareAndSet(false, true)) {
lock.writeLock().lock();
try {
Expand All @@ -532,7 +518,7 @@ public void close() throws IOException {
Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore");
} finally {
lock.writeLock().unlock();
log.info("Closed the resources.");
log.info("Closed topic-based RLMM resources");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS, atLeast(0), LOW,
private final long initializationRetryMaxTimeoutMs;
private final long initializationRetryIntervalMs;

private Map<String, Object> commonProps;
private Map<String, Object> consumerProps;
private Map<String, Object> producerProps;

Expand Down Expand Up @@ -149,6 +150,8 @@ private void initializeProducerConsumerProperties(Map<String, ?> configs) {
}
}

commonProps = new HashMap<>(commonClientConfigs);

HashMap<String, Object> allProducerConfigs = new HashMap<>(commonClientConfigs);
allProducerConfigs.putAll(producerOnlyConfigs);
producerProps = createProducerProps(allProducerConfigs);
Expand Down Expand Up @@ -190,6 +193,10 @@ public String logDir() {
return logDir;
}

/**
 * Returns the client properties stored in {@code commonProps}.
 *
 * <p>The map is handed out directly, without a defensive copy.
 *
 * @return the common client properties map held by this config.
 */
public Map<String, Object> commonProperties() {
    return this.commonProps;
}

/**
 * Returns the client properties stored in {@code consumerProps}.
 *
 * <p>The map is handed out directly, without a defensive copy.
 *
 * @return the consumer properties map held by this config.
 */
public Map<String, Object> consumerProperties() {
    return this.consumerProps;
}
Expand Down Expand Up @@ -232,6 +239,7 @@ public String toString() {
", metadataTopicReplicationFactor=" + metadataTopicReplicationFactor +
", initializationRetryMaxTimeoutMs=" + initializationRetryMaxTimeoutMs +
", initializationRetryIntervalMs=" + initializationRetryIntervalMs +
", commonProps=" + commonProps +
", consumerProps=" + consumerProps +
", producerProps=" + producerProps +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,13 @@ private void checkListSegments(RemoteLogSegmentLifecycleManager remoteLogSegment
throws RemoteStorageException {
// cache.listRemoteLogSegments(leaderEpoch) should contain the above segment.
Iterator<RemoteLogSegmentMetadata> segmentsIter = remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch);
Assertions.assertTrue(segmentsIter.hasNext() && Objects.equals(segmentsIter.next(), expectedSegment));
Assertions.assertTrue(segmentsIter.hasNext());
Assertions.assertEquals(expectedSegment, segmentsIter.next());

// cache.listAllRemoteLogSegments() should contain the above segment.
Iterator<RemoteLogSegmentMetadata> allSegmentsIter = remoteLogSegmentLifecycleManager.listAllRemoteLogSegments();
Assertions.assertTrue(allSegmentsIter.hasNext() && Objects.equals(allSegmentsIter.next(), expectedSegment));
Assertions.assertTrue(allSegmentsIter.hasNext());
Assertions.assertEquals(expectedSegment, allSegmentsIter.next());
}

@ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
Expand All @@ -285,7 +287,7 @@ public void testCacheSegmentWithCopySegmentStartedState(RemoteLogSegmentLifecycl
try {
remoteLogSegmentLifecycleManager.initialize(topicIdPartition);

// Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the
// Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the
// segments.
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0,
Expand Down
Loading