Skip to content

Commit

Permalink
HBASE-28457 Introduce a version field in file based tracker record (a…
Browse files Browse the repository at this point in the history
…pache#5784)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
(cherry picked from commit c1012a9)
  • Loading branch information
Apache9 committed Apr 7, 2024
1 parent afd4da0 commit 479c756
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ message StoreFileEntry {
message StoreFileList {
required uint64 timestamp = 1;
repeated StoreFileEntry store_file = 2;
optional uint64 version = 3 [default = 1];
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -47,15 +48,24 @@
* without error on partial bytes if you stop at some special points, but the return message will
* have incorrect field value. We should try our best to prevent this happens because loading an
* incorrect store file list file usually leads to data loss.
* <p/>
* To prevent failing silently while downgrading, where we may miss some newly introduced fields in
* {@link StoreFileList} which are necessary, we introduce a 'version' field in
* {@link StoreFileList}. If we find out that we are reading a {@link StoreFileList} with higher
* version, we will fail immediately and tell users that you need extra steps while downgrading, to
* prevent potential data loss.
*/
@InterfaceAudience.Private
class StoreFileListFile {

private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);

// the current version for StoreFileList
static final long VERSION = 1;

static final String TRACK_FILE_DIR = ".filelist";

private static final String TRACK_FILE = "f1";
static final String TRACK_FILE = "f1";

private static final String TRACK_FILE_ROTATE = "f2";

Expand Down Expand Up @@ -101,7 +111,18 @@ private StoreFileList load(Path path) throws IOException {
throw new IOException(
"Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
}
return StoreFileList.parseFrom(data);
StoreFileList storeFileList = StoreFileList.parseFrom(data);
if (storeFileList.getVersion() > VERSION) {
LOG.error(
"The loaded store file list is in version {}, which is higher than expected"
+ " version {}. Stop loading to prevent potential data loss. This usually because your"
+ " cluster is downgraded from a newer version. You need extra steps before downgrading,"
+ " like switching back to default store file tracker.",
storeFileList.getVersion(), VERSION);
throw new IOException("Higher store file list version detected, expected " + VERSION
+ ", got " + storeFileList.getVersion());
}
return storeFileList;
}

private int select(StoreFileList[] lists) {
Expand Down Expand Up @@ -134,30 +155,38 @@ StoreFileList load() throws IOException {
return lists[winnerIndex];
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/StoreFileListFile.java|.*/src/test/.*")
static void write(FileSystem fs, Path file, StoreFileList storeFileList) throws IOException {
byte[] data = storeFileList.toByteArray();
CRC32 crc32 = new CRC32();
crc32.update(data);
int checksum = (int) crc32.getValue();
// 4 bytes length at the beginning, plus 4 bytes checksum
try (FSDataOutputStream out = fs.create(file, true)) {
out.writeInt(data.length);
out.write(data);
out.writeInt(checksum);
}
}

/**
* We will set the timestamp in this method so just pass the builder in
* We will set the timestamp and version in this method so just pass the builder in
*/
void update(StoreFileList.Builder builder) throws IOException {
if (nextTrackFile < 0) {
// we need to call load first to load the prevTimestamp and also the next file
load();
}
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray();
CRC32 crc32 = new CRC32();
crc32.update(actualData);
int checksum = (int) crc32.getValue();
// 4 bytes length at the beginning, plus 4 bytes checksum
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
out.writeInt(actualData.length);
out.write(actualData);
out.writeInt(checksum);
}
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
write(fs, trackFiles[nextTrackFile],
builder.setTimestamp(timestamp).setVersion(VERSION).build());
// record timestamp
prevTimestamp = timestamp;
// rotate the file
nextTrackFile = 1 - nextTrackFile;

try {
fs.delete(trackFiles[nextTrackFile], false);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
Expand All @@ -35,6 +36,7 @@
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
Expand Down Expand Up @@ -162,4 +164,18 @@ public void testChecksumMismatch() throws IOException {
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
assertThrows(IOException.class, () -> storeFileListFile.load());
}

@Test
public void testLoadHigherVersion() throws IOException {
// write a fake StoreFileList file with higher version
StoreFileList storeFileList =
StoreFileList.newBuilder().setVersion(StoreFileListFile.VERSION + 1)
.setTimestamp(EnvironmentEdgeManager.currentTime()).build();
Path trackFileDir = new Path(testDir, StoreFileListFile.TRACK_FILE_DIR);
StoreFileListFile.write(FileSystem.get(UTIL.getConfiguration()),
new Path(trackFileDir, StoreFileListFile.TRACK_FILE), storeFileList);
IOException error = assertThrows(IOException.class, () -> storeFileListFile.load());
assertEquals("Higher store file list version detected, expected " + StoreFileListFile.VERSION
+ ", got " + (StoreFileListFile.VERSION + 1), error.getMessage());
}
}

0 comments on commit 479c756

Please sign in to comment.