Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static File producerSnapshotFile(File logDir, long offset) {
* @param offset The offset to use in the file name
* @return The filename
*/
private static String filenamePrefixFromOffset(long offset) {
public static String filenamePrefixFromOffset(long offset) {
NumberFormat nf = NumberFormat.getInstance();
nf.setMinimumIntegerDigits(20);
nf.setMaximumFractionDigits(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,25 +81,25 @@
* The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
* <p>
* The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
* follows the structure UuidBase64-FileType.
* follows the structure startOffset-UuidBase64-FileType.
* <p>
* Given the root directory of the storage, segments and associated files are organized as represented below.
* </p>
* <code>
* / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA / bCqX9U--S-6U8XUM9II25Q.log
* . . bCqX9U--S-6U8XUM9II25Q.index
* . . bCqX9U--S-6U8XUM9II25Q.timeindex
* . . h956soEzTzi9a-NOQ-DvKA.log
* . . h956soEzTzi9a-NOQ-DvKA.index
* . . h956soEzTzi9a-NOQ-DvKA.timeindex
* / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA / 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.log
* . . 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.index
* . . 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.timeindex
* . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.log
* . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.index
* . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.timeindex
* .
* / topic-1-LWgrMmVrT0a__7a4SasuPA / o8CQPT86QQmbFmi3xRmiHA.log
* . . o8CQPT86QQmbFmi3xRmiHA.index
* . . o8CQPT86QQmbFmi3xRmiHA.timeindex
* / topic-1-LWgrMmVrT0a__7a4SasuPA / 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.log
* . . 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.index
* . . 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.timeindex
* .
* / btopic-3-DRagLm_PS9Wl8fz1X43zVg / jvj3vhliTGeU90sIosmp_g.log
* . . jvj3vhliTGeU90sIosmp_g.index
* . . jvj3vhliTGeU90sIosmp_g.timeindex
* / topic-3-DRagLm_PS9Wl8fz1X43zVg / 00000000000000000011-jvj3vhliTGeU90sIosmp_g.log
* . . 00000000000000000011-jvj3vhliTGeU90sIosmp_g.index
* . . 00000000000000000011-jvj3vhliTGeU90sIosmp_g.timeindex
* </code>
*/
public final class LocalTieredStorage implements RemoteStorageManager {
Expand Down Expand Up @@ -310,7 +310,7 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final Lo
RemoteLogSegmentFileset fileset = null;

try {
fileset = openFileset(storageDirectory, id);
fileset = openFileset(storageDirectory, metadata);

logger.info("Offloading log segment for {} from segment={}", id.topicIdPartition(), data.logSegment());

Expand Down Expand Up @@ -359,7 +359,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata metadata,
eventBuilder.withStartPosition(startPos).withEndPosition(endPos);

try {
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata.remoteLogSegmentId());
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);

final InputStream inputStream = newInputStream(fileset.getFile(SEGMENT).toPath(), READ);
inputStream.skip(startPos);
Expand All @@ -386,7 +386,7 @@ public InputStream fetchIndex(RemoteLogSegmentMetadata metadata, IndexType index
final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(eventType, metadata);

try {
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata.remoteLogSegmentId());
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);

File file = fileset.getFile(fileType);
final InputStream inputStream = (fileType.isOptional() && !file.exists()) ?
Expand All @@ -411,7 +411,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata metadata) throws
if (deleteEnabled) {
try {
final RemoteLogSegmentFileset fileset = openFileset(
storageDirectory, metadata.remoteLogSegmentId());
storageDirectory, metadata);

if (!fileset.delete()) {
throw new RemoteStorageException("Failed to delete remote log segment with id:" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,19 @@ public void copyEmptyLogSegment() throws RemoteStorageException {
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
tieredStorage.copyLogSegmentData(metadata, segment);

remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
}

@Test
public void copyDataFromLogSegment() throws RemoteStorageException {
final byte[] data = new byte[]{0, 1, 2};
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment(data);

tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
tieredStorage.copyLogSegmentData(metadata, segment);

remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(id, segment);
remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(metadata, segment);
}

@Test
Expand Down Expand Up @@ -201,49 +202,52 @@ public void fetchProducerSnapshot() throws RemoteStorageException {
@Test
public void deleteLogSegment() throws RemoteStorageException {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();

tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);

tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyLogSegmentFilesAbsent(id);
remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
}

@Test
public void deletePartition() throws RemoteStorageException {
int segmentCount = 10;
List<RemoteLogSegmentId> segmentIds = new ArrayList<>();
List<RemoteLogSegmentMetadata> segmentMetadatas = new ArrayList<>();
for (int i = 0; i < segmentCount; i++) {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
segmentIds.add(id);
tieredStorage.copyLogSegmentData(metadata, segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
segmentMetadatas.add(metadata);
}
tieredStorage.deletePartition(topicIdPartition);
remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath());
for (RemoteLogSegmentId segmentId: segmentIds) {
remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentId);
for (RemoteLogSegmentMetadata segmentMetadata: segmentMetadatas) {
remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentMetadata);
}
}

@Test
public void deleteLogSegmentWithoutOptionalFiles() throws RemoteStorageException {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();
segment.transactionIndex().get().toFile().delete();

tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id, path -> {
tieredStorage.copyLogSegmentData(metadata, segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata, path -> {
String fileName = path.getFileName().toString();
if (!fileName.contains(LogFileUtils.TXN_INDEX_FILE_SUFFIX)) {
remoteStorageVerifier.assertFileExists(path);
}
});

tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyLogSegmentFilesAbsent(id);
remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
}

@Test
Expand All @@ -252,12 +256,12 @@ public void segmentsAreNotDeletedIfDeleteApiIsDisabled(TestInfo testInfo) throws

final RemoteLogSegmentId id = newRemoteLogSegmentId();
final LogSegmentData segment = localLogSegments.nextSegment();

final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);

tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
}

@Test
Expand Down Expand Up @@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, final TopicIdPartition t
this.topicIdPartition = requireNonNull(topicIdPartition);
}

private List<Path> expectedPaths(final RemoteLogSegmentId id) {
private List<Path> expectedPaths(final RemoteLogSegmentMetadata metadata) {
final String rootPath = getStorageRootDirectory();
TopicPartition tp = topicIdPartition.topicPartition();
final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(),
topicIdPartition.topicId());
final String uuid = id.id().toString();
final String uuid = metadata.remoteLogSegmentId().id().toString();
final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());

return Arrays.asList(
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we want the test implementation to be as close to the actual log file implementation as possible. Considering that, could we use LogFileUtils#logFile(File dir, long offset) here? Same for index file names.

Copy link
Contributor Author

@Owen-CH-Leung Owen-CH-Leung Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya Thanks for your feedback. I think the actual log file was named as [offset.filetype]. Looking at the implementation of LogFileUtils#logFile(File dir, long offset), I don't think it will allow us to insert a uuid in the middle as part of the filename.

If we are to keep the [offset-uuid.filetype] pattern, instead of using LogFileUtils#logFile(File dir, long offset), maybe we should make LogFileUtils#filenamePrefixFromOffset(long offset) as a public method so that we can construct a real offset using this method. What do you think ?

FYI, the method to create these offloaded files is RemoteLogSegmentFileset#openFileset(final File storageDir, final RemoteLogSegmentId id) . Currently my PR has changed this method to accept RemoteLogSegmentMetadata instead of RemoteLogSegmentId , get offset from metadata, and prepend it to the filename. (So yes, it's not close to the actual log file implementation, as the offset was just "0" without formatting, instead of "0000000")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it will allow us to insert a uuid in the middle as part of the filename.

Ack. I missed that.

maybe we should make LogFileUtils#filenamePrefixFromOffset(long offset) as a public method so that we can construct a real offset using this method. What do you think ?

Yes please. Let's use that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya I've changed the code to use LogFileUtils#filenamePrefixFromOffset(long offset). The filename now should look like a real log file implementation like 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log

Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
);
}

Expand All @@ -424,37 +429,37 @@ public Path expectedPartitionPath() {
return Paths.get(rootPath, topicPartitionSubpath);
}

public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id, final Consumer<Path> action) {
expectedPaths(id).forEach(action);
/**
 * Applies the given {@code action} to every file path expected for the remote log
 * segment described by {@code metadata} (log file, indexes, checkpoint, snapshot).
 *
 * @param metadata The metadata of the remote log segment and associated resources.
 * @param action Invoked once for each expected path.
 */
public void verifyContainsLogSegmentFiles(final RemoteLogSegmentMetadata metadata, final Consumer<Path> action) {
    for (final Path expectedPath : expectedPaths(metadata)) {
        action.accept(expectedPath);
    }
}

/**
* Verify the remote storage contains remote log segment and associated files for the provided {@code metadata}.
*
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes).
* @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
*/
public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id) {
expectedPaths(id).forEach(this::assertFileExists);
public void verifyContainsLogSegmentFiles(final RemoteLogSegmentMetadata metadata) {
    // Assert every expected file (segment, indexes, checkpoint, snapshot) is present.
    for (final Path expectedPath : expectedPaths(metadata)) {
        assertFileExists(expectedPath);
    }
}

/**
* Verify the remote storage does NOT contain remote log segment and associated files for the provided {@code metadata}.
*
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes).
* @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
*/
public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentId id) {
expectedPaths(id).forEach(this::assertFileDoesNotExist);
public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentMetadata metadata) {
    // Assert none of the segment's expected files remain on the remote storage.
    for (final Path expectedPath : expectedPaths(metadata)) {
        assertFileDoesNotExist(expectedPath);
    }
}

/**
* Compare the content of the remote segment with the provided {@link LogSegmentData}.
* This method does not fetch from the remote storage.
*
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes).
* @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
* @param seg The segment stored on Kafka's local storage.
*/
public void verifyRemoteLogSegmentMatchesLocal(final RemoteLogSegmentId id, final LogSegmentData seg) {
final Path remoteSegmentPath = expectedPaths(id).get(0);
public void verifyRemoteLogSegmentMatchesLocal(final RemoteLogSegmentMetadata metadata, final LogSegmentData seg) {
    // The first expected path is the segment (.log) file itself; compare its bytes
    // with the local segment's content.
    assertFileDataEquals(expectedPaths(metadata).get(0), seg.logSegment());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
* the local tiered storage:
*
* <code>
* / storage-directory / topic-partition-uuidBase64 / oAtiIQ95REujbuzNd_lkLQ.log
* . oAtiIQ95REujbuzNd_lkLQ.index
* . oAtiIQ95REujbuzNd_lkLQ.timeindex
* / storage-directory / topic-partition-uuidBase64 / 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log
* . 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.index
* . 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.timeindex
* </code>
*/
public final class RemoteLogSegmentFileset {
Expand All @@ -73,9 +73,9 @@ public final class RemoteLogSegmentFileset {
* The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
* follows the structure startOffset-UUID-FileType.
*/
private static final Pattern FILENAME_FORMAT = compile("([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
private static final int GROUP_UUID = 1;
private static final int GROUP_FILE_TYPE = 2;
private static final Pattern FILENAME_FORMAT = compile("(\\d+-)([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
private static final int GROUP_UUID = 2;
private static final int GROUP_FILE_TYPE = 3;

/**
* Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
Expand All @@ -98,10 +98,10 @@ public enum RemoteLogSegmentFileType {

/**
* Provides the name of the file of this type for the given start offset and UUID in the local tiered storage,
* e.g. uuid.log.
* e.g. 00000000000000000011-uuid.log.
*/
public String toFilename(final Uuid uuid) {
return uuid.toString() + suffix;
public String toFilename(final String startOffset, final Uuid uuid) {
    // Filename layout: <startOffset>-<base64 uuid><type suffix>,
    // e.g. 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.log when the caller
    // passes a zero-padded offset prefix.
    return new StringBuilder(startOffset)
            .append('-')
            .append(uuid)
            .append(suffix)
            .toString();
}

/**
Expand Down Expand Up @@ -155,19 +155,21 @@ public String getSuffix() {
* the log segment offloaded are not created on the file system until transfer happens.
*
* @param storageDir The root directory of the local tiered storage.
* @param id Remote log segment id assigned to a log segment in Kafka.
* @param metadata Remote log metadata about a topic partition's remote log.
* @return A new fileset instance.
*/
public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) {
public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentMetadata metadata) {

final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(
metadata.remoteLogSegmentId().topicIdPartition(), storageDir);
final File partitionDirectory = tpDir.getDirectory();
final Uuid uuid = id.id();
final Uuid uuid = metadata.remoteLogSegmentId().id();
final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());

final Map<RemoteLogSegmentFileType, File> files = stream(RemoteLogSegmentFileType.values())
.collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(uuid))));
.collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(startOffset, uuid))));

return new RemoteLogSegmentFileset(tpDir, id, files);
return new RemoteLogSegmentFileset(tpDir, metadata.remoteLogSegmentId(), files);
}

/**
Expand All @@ -183,7 +185,7 @@ public static RemoteLogSegmentFileset openExistingFileset(final RemoteTopicParti
try {
final Map<RemoteLogSegmentFileType, File> files =
Files.list(tpDirectory.getDirectory().toPath())
.filter(path -> path.getFileName().toString().startsWith(uuid.toString()))
.filter(path -> path.getFileName().toString().contains(uuid.toString()))
.collect(toMap(path -> getFileType(path.getFileName().toString()), Path::toFile));

final Set<RemoteLogSegmentFileType> expectedFileTypes = stream(RemoteLogSegmentFileType.values())
Expand Down