Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;

import java.io.IOException;
import java.text.ParseException;
Expand Down Expand Up @@ -846,6 +847,7 @@ private Tool initializeWalPlayer(long startTime, long endTime) {
Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
conf.setBoolean(IGNORE_EMPTY_FILES, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this flag is basically only used for Continuous Backup? And should it stay false for every use case other than Continuous Backup?

nit: maybe add a Javadoc comment in the code explaining in which situations we should and should not use this flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the default behavior is false, so I didn’t want to change it as it might be assumed in other parts of the code.

nit: maybe add a Javadoc comment explaining in what situations we should or shouldn't use this flag.

Sure — this flag controls whether the WALPlayer job should throw an exception when it encounters an empty file that it can't parse as a valid WAL file, or whether it should skip it silently.

Tool walPlayer = new WALPlayer();
walPlayer.setConf(conf);
return walPlayer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,21 @@ List<InputSplit> getSplits(final JobContext context, final String startKey, fina
throw e;
}
}

boolean ignoreEmptyFiles =
conf.getBoolean(WALPlayer.IGNORE_EMPTY_FILES, WALPlayer.DEFAULT_IGNORE_EMPTY_FILES);
List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
for (FileStatus file : allFiles) {
if (ignoreEmptyFiles && file.getLen() == 0) {
LOG.warn("Ignoring empty file: " + file.getPath());
continue;
}
splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
}
return splits;
}

private Path[] getInputPaths(Configuration conf) {
Path[] getInputPaths(Configuration conf) {
String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
return StringUtils
.stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
Expand All @@ -349,7 +356,7 @@ private Path[] getInputPaths(Configuration conf) {
* equal to this value else we will filter out the file. If name does not seem to
* have a timestamp, we will just return it w/o filtering.
*/
private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If this is only for testing, can we add @VisibleForTesting here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is used in the WALInputFormat class itself. I just made it package-private so that I can use it in tests.

throws IOException {
List<FileStatus> result = new ArrayList<>();
LOG.debug("Scanning " + dir.toString() + " for WAL files");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,24 @@ public class WALPlayer extends Configured implements Tool {
public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support";

/**
 * Configuration flag that controls how the WALPlayer handles empty (zero-length) input WAL
 * files.
 * <p>
 * If set to {@code true}, {@code WALInputFormat} leaves every input file whose length is 0 out of
 * the generated input splits and logs a warning naming the skipped file. This is useful in
 * scenarios where such files are expected (e.g., due to partial writes or cleanup operations).
 * </p>
 * <p>
 * If set to {@code false} (default), empty files are still turned into input splits, and the
 * WALPlayer job is expected to fail when it cannot read them as valid WAL files. This is useful
 * for catching unexpected data issues early.
 * </p>
 * <p>
 * NOTE(review): the split computation checks only the file length; a non-empty file that cannot
 * be parsed does not appear to be affected by this flag -- confirm against the record reader.
 * </p>
 * <p>
 * Default value: {@link #DEFAULT_IGNORE_EMPTY_FILES} ({@code false})
 * </p>
 */
public final static String IGNORE_EMPTY_FILES = "wal.input.ignore.empty.files";
/** Value used for {@link #IGNORE_EMPTY_FILES} when the configuration does not set it. */
public final static boolean DEFAULT_IGNORE_EMPTY_FILES = false;

protected static final String tableSeparator = ";";

private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -74,4 +81,44 @@ public void testAddFile() {
WALInputFormat.addFile(lfss, lfs, now, now);
assertEquals(8, lfss.size());
}

/**
 * With {@code IGNORE_EMPTY_FILES} switched on, a zero-length input file must not produce any
 * split.
 */
@Test
public void testEmptyFileIsIgnoredWhenConfigured() throws IOException, InterruptedException {
  boolean noSplitsProduced = getSplitsForEmptyFile(true).isEmpty();
  assertTrue("Empty file should be ignored when IGNORE_EMPTY_FILES is true", noSplitsProduced);
}

/**
 * With {@code IGNORE_EMPTY_FILES} switched off, a zero-length input file must still be turned
 * into exactly one split.
 */
@Test
public void testEmptyFileIsIncludedWhenNotIgnored() throws IOException, InterruptedException {
  int splitCount = getSplitsForEmptyFile(false).size();
  assertEquals("Empty file should be included when IGNORE_EMPTY_FILES is false", 1,
    splitCount);
}

/**
 * Computes the input splits for a single mocked, zero-length file.
 * <p>
 * The {@code WALInputFormat} under test is subclassed so that neither the input path lookup nor
 * the file listing touches a real file system: the only file reported is a mock whose length is
 * 0 and whose path is {@code /empty.wal}.
 * </p>
 * @param ignoreEmptyFiles value stored under {@code WALPlayer.IGNORE_EMPTY_FILES} in the job
 *                         configuration
 * @return the splits returned by {@code getSplits} for that configuration
 */
private List<InputSplit> getSplitsForEmptyFile(boolean ignoreEmptyFiles)
  throws IOException, InterruptedException {
  // The one and only "file" the input format will see: zero bytes long.
  final LocatedFileStatus zeroLengthFile = Mockito.mock(LocatedFileStatus.class);
  Mockito.when(zeroLengthFile.getPath()).thenReturn(new Path("/empty.wal"));
  Mockito.when(zeroLengthFile.getLen()).thenReturn(0L);

  // Job context that only has to hand out the configuration carrying the flag.
  Configuration jobConf = new Configuration();
  jobConf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, ignoreEmptyFiles);
  JobContext context = Mockito.mock(JobContext.class);
  Mockito.when(context.getConfiguration()).thenReturn(jobConf);

  WALInputFormat stubbedFormat = new WALInputFormat() {
    @Override
    Path[] getInputPaths(Configuration conf) {
      return new Path[] { new Path("/input") };
    }

    @Override
    List<FileStatus> getFiles(FileSystem fs, Path inputPath, long startTime, long endTime) {
      return Collections.singletonList(zeroLengthFile);
    }
  };

  return stubbedFormat.getSplits(context, "", "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -31,10 +32,12 @@

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -338,4 +341,47 @@ public void testMainMethod() throws Exception {

}

/**
 * Runs the WALPlayer over a directory that holds only a zero-byte WAL file with
 * {@code IGNORE_EMPTY_FILES} enabled and expects a clean (zero) exit code.
 */
@Test
public void testIgnoreEmptyWALFiles() throws Exception {
  Path walDir = createEmptyWALFile("empty-wal-dir");
  FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
  Path walFile = new Path(walDir, "empty.wal");

  // Sanity-check the fixture before exercising the tool.
  assertTrue("Empty WAL file should exist", fs.exists(walFile));
  long walLength = fs.getFileStatus(walFile).getLen();
  assertEquals("WAL file should be 0 bytes", 0, walLength);

  Configuration playerConf = new Configuration(TEST_UTIL.getConfiguration());
  playerConf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true);

  String[] playerArgs = { walDir.toString() };
  int exitCode = ToolRunner.run(playerConf, new WALPlayer(playerConf), playerArgs);
  assertEquals("WALPlayer should exit cleanly even with empty files", 0, exitCode);
}

@Test
public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception {
Path inputDir = createEmptyWALFile("fail-empty-wal-dir");
FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
Path emptyWAL = new Path(inputDir, "empty.wal");

assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL));
assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen());

Copy link
Contributor

@kgeisz kgeisz Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: For consistency, maybe add the same assertions you have in the other test method regarding the newly created empty WAL file:

Suggested change
assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL));
assertEquals("WAL file should be 0 bytes", 0, dfs.getFileStatus(emptyWAL).getLen());

Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, false);

int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] { inputDir.toString() });
assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode);
}

/**
 * Creates the directory {@code /<walDir>} on the file system returned by
 * {@code TEST_UTIL.getDFSCluster()} and places a zero-byte file named {@code empty.wal} in it.
 * @param walDir name of the directory to create directly under the root
 * @return the path of the created directory (not of the file inside it)
 * @throws IOException if the directory or the file cannot be created
 */
private Path createEmptyWALFile(String walDir) throws IOException {
  FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
  Path walDirPath = new Path("/" + walDir);
  fs.mkdirs(walDirPath);

  // Open the file and close it straight away: nothing is written, so its length stays 0.
  Path walFile = new Path(walDirPath, "empty.wal");
  FSDataOutputStream stream = fs.create(walFile);
  stream.close();

  return walDirPath;
}
}