Skip to content

Commit

Permalink
HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for…
Browse files Browse the repository at this point in the history
… wal file in all oldWALs directory. (#3636)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>

Conflicts:
	hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
  • Loading branch information
shahrs87 authored and apurtell committed Sep 2, 2021
1 parent 284103b commit 820ba21
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,9 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
}
return res;
} catch (IOException e) {
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
if (logFile != archivedLog) {
Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
// archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
openReader(archivedLog);
// Try call again in recursion
return nextKeyValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
Expand All @@ -31,6 +33,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
Expand All @@ -43,6 +46,7 @@
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
Expand Down Expand Up @@ -91,6 +95,11 @@ private static String getName() {
return "TestWALRecordReader";
}

private static String getServerName() {
ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
return serverName.toString();
}

@Before
public void setUp() throws Exception {
fs.delete(hbaseDir, true);
Expand Down Expand Up @@ -272,4 +281,53 @@ private void testSplit(InputSplit split, byte[]... columns) throws Exception {
assertFalse(reader.nextKeyValue());
reader.close();
}

/**
* Create a new reader from the split, match the edits against the passed columns,
* moving WAL to archive in between readings
*/
private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
WALRecordReader<WALKey> reader = getReader();
reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

assertTrue(reader.nextKeyValue());
Cell cell = reader.getCurrentValue().getCells().get(0);
if (!Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength())) {
assertTrue(
"expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString(
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
false);
}
// Move log file to archive directory
// While WAL record reader is open
WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
Path logFile = new Path(split_.getLogFileName());
Path archivedLogDir = getWALArchiveDir(conf);
Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());

assertTrue(fs.rename(logFile, archivedLogLocation));
assertTrue(fs.exists(archivedLogDir));
assertFalse(fs.exists(logFile));
// TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
// TODO: the archivedLogLocation to read next key value.
assertTrue(reader.nextKeyValue());
cell = reader.getCurrentValue().getCells().get(0);
if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength())) {
assertTrue(
"expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString(
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
false);
}
reader.close();
}

private Path getWALArchiveDir(Configuration conf) throws IOException {
Path rootDir = CommonFSUtils.getWALRootDir(conf);
String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
return new Path(rootDir, archiveDir);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -400,8 +400,12 @@ private long getFileSize(Path currentPath) throws IOException {
try {
fileSize = fs.getContentSummary(currentPath).getLength();
} catch (FileNotFoundException e) {
currentPath = getArchivedLogPath(currentPath, conf);
fileSize = fs.getContentSummary(currentPath).getLength();
Path archivedLogPath = findArchivedLog(currentPath, conf);
// archivedLogPath can be null if unable to locate in archiveDir.
if (archivedLogPath == null) {
throw new FileNotFoundException("Couldn't find path: " + currentPath);
}
fileSize = fs.getContentSummary(archivedLogPath).getLength();
}
return fileSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) {
if (!fs.exists(path)) {
// There is a chance that wal has moved to oldWALs directory, so look there also.
path = AbstractFSWALProvider.findArchivedLog(path, conf);
// path is null if it couldn't find archive path.
// path can be null if unable to locate in archiveDir.
}
if (path != null && fs.getFileStatus(path).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ private boolean openNextLog() throws IOException {
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
// If the log was archived, continue reading from there
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
// archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
openReader(archivedLog);
} else {
Expand Down Expand Up @@ -383,6 +384,7 @@ private void resetReader() throws IOException {
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
// archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
openReader(archivedLog);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -448,36 +447,6 @@ public static boolean isArchivedLogFile(Path p) {
return p.toString().contains(oldLog);
}

/**
* Get the archived WAL file path
* @param path - active WAL file path
* @param conf - configuration
* @return archived path if exists, path - otherwise
* @throws IOException exception
*/
public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
Path rootDir = CommonFSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
ServerName serverName = getServerNameFromWALDirectoryName(path);
if (serverName == null) {
LOG.error("Couldn't locate log: " + path);
return path;
}
oldLogDir = new Path(oldLogDir, serverName.getServerName());
}
Path archivedLogLocation = new Path(oldLogDir, path.getName());
final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);

if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
} else {
LOG.error("Couldn't locate log: " + path);
return path;
}
}

/**
* Find the archived WAL file path if it is not able to locate in WALs dir.
* @param path - active WAL file path
Expand Down Expand Up @@ -510,7 +479,6 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}

LOG.error("Couldn't locate log: " + path);
return null;
}
Expand All @@ -536,8 +504,9 @@ public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Confi
return reader;
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
if (!Objects.equals(path, archivedLog)) {
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
// archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
return openReader(archivedLog, conf);
} else {
throw fnfe;
Expand Down

0 comments on commit 820ba21

Please sign in to comment.