Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,13 @@ void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
* @param partitions topic partitions that have been stopped.
*/
void onStopPartitions(Set<TopicIdPartition> partitions);

/**
* Returns total size of the log for the given leader epoch in remote storage.
*
* @param topicIdPartition topic partition for which size needs to be calculated.
* @param leaderEpoch Size will only include segments belonging to this epoch.
* @return Total size of the log stored in remote storage in bytes.
*/
long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
public void onStopPartitions(Set<TopicIdPartition> partitions) {
}

@Override
public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) {
return 0;
}

@Override
public void close() throws IOException {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public void onStopPartitions(Set<TopicIdPartition> partitions) {
});
}

@Override
public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
return withClassLoader(() -> delegate.remoteLogSize(topicIdPartition, leaderEpoch));
}

@Override
public void configure(Map<String, ?> configs) {
withClassLoader(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,23 @@ public void onStopPartitions(Set<TopicIdPartition> partitions) {
}
}

@Override
public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
long remoteLogSize = 0L;
// This is a simple-to-understand but not the most optimal solution.
// The TopicBasedRemoteLogMetadataManager's remote metadata store is file-based. During design discussions
// at https://lists.apache.org/thread/kxd6fffq02thbpd0p5y4mfbs062g7jr6
// we reached a consensus that sequential iteration over files on the local file system is performant enough.
// Should this stop being the case, the remote log size could be calculated by incrementing/decrementing
// counters during API calls for a more performant implementation.
Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator = remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);
while (remoteLogSegmentMetadataIterator.hasNext()) {
RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next();
remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
}
return remoteLogSize;
}

@Override
public void configure(Map<String, ?> configs) {
Objects.requireNonNull(configs, "configs can not be null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -168,4 +169,86 @@ private void waitUntilConsumerCatchesUp(TopicIdPartition newLeaderTopicIdPartiti
}
}

@Test
public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0));
}

@Test
public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();

RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(0, 0L));

topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(thirdSegmentMetadata);

topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);

Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0);

Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize);
}

@Test
public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();

RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));
RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(2, 200L));

topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(thirdSegmentMetadata);

topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);

Assertions.assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0));
Assertions.assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1));
Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2));
}

@Test
public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();

RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L));

topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata);
topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata);

topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);

Assertions.assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public void onStopPartitions(Set<TopicIdPartition> partitions) {
remoteLogMetadataManagerHarness.remoteLogMetadataManager().onStopPartitions(partitions);
}

@Override
public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
return remoteLogMetadataManagerHarness.remoteLogMetadataManager().remoteLogSize(topicIdPartition, leaderEpoch);
}

@Override
public void close() throws IOException {
remoteLogMetadataManagerHarness.remoteLogMetadataManager().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,18 @@ public void onStopPartitions(Set<TopicIdPartition> partitions) {
// this instance. It does not depend upon stopped partitions.
}

@Override
public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
long remoteLogSize = 0L;
RemoteLogMetadataCache remoteLogMetadataCache = getRemoteLogMetadataCache(topicIdPartition);
Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator = remoteLogMetadataCache.listAllRemoteLogSegments();
while (remoteLogSegmentMetadataIterator.hasNext()) {
RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next();
remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
}
return remoteLogSize;
}

@Override
public void close() throws IOException {
// Clearing the references to the map and assigning empty immutable maps.
Expand Down