Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. #3636

Merged
merged 10 commits into from
Sep 2, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,9 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
}
return res;
} catch (IOException e) {
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
if (logFile != archivedLog) {
Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
// archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
openReader(archivedLog);
// Try call again in recursion
return nextKeyValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
Expand All @@ -31,6 +33,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
Expand Down Expand Up @@ -92,6 +95,11 @@ private static String getName() {
return "TestWALRecordReader";
}

private static String getServerName() {
ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
return serverName.toString();
}

@Before
public void setUp() throws Exception {
fs.delete(hbaseDir, true);
Expand Down Expand Up @@ -282,7 +290,6 @@ public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
LOG.debug("log="+logDir+" file="+ split.getLogFileName());

testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));

}

protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
Expand Down Expand Up @@ -335,13 +342,16 @@ private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2)
// Move log file to archive directory
// While WAL record reader is open
WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;

Path logFile = new Path(split_.getLogFileName());
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
boolean result = fs.rename(logFile, archivedLog);
assertTrue(result);
result = fs.exists(archivedLog);
assertTrue(result);
Path archivedLogDir = getWALArchiveDir(conf);
Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());

assertTrue(fs.rename(logFile, archivedLogLocation));
assertTrue(fs.exists(archivedLogDir));
assertFalse(fs.exists(logFile));
// TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is not testing what it is intending to.
Before this patch, it was using AbstractFSWALProvider.getArchivedLogPath(logFile, conf); to get archivedLogLocation. But AbstractFSWALProvider#getArchivedLogPath was returning the same path as logFile from here

After this patch, we are able to successfully rename the file to oldWALs directory but somehow it is not triggering the condition within nextKeyValue method to look into archiveDir.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you mean we should file another issue to address this? Anyway, I think this PR is a critical one as it could cause data loss, so let me merge it to all branches first in case of not blocking the upcoming 2.4.x release.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to cut 2.4.6RC0 today but saw this issue. Will wait until this is merged to branch-2.4.

Copy link
Contributor Author

@shahrs87 shahrs87 Sep 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you mean we should file another issue to address this

Yes, we need to open a new jira to modify the test. Will file it soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// TODO: the archivedLogLocation to read next key value.
assertTrue(reader.nextKeyValue());
cell = reader.getCurrentValue().getCells().get(0);
if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
Expand All @@ -353,4 +363,10 @@ private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2)
}
reader.close();
}

private Path getWALArchiveDir(Configuration conf) throws IOException {
Path rootDir = CommonFSUtils.getWALRootDir(conf);
String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
return new Path(rootDir, archiveDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -396,8 +396,12 @@ private long getFileSize(Path currentPath) throws IOException {
try {
fileSize = fs.getContentSummary(currentPath).getLength();
} catch (FileNotFoundException e) {
currentPath = getArchivedLogPath(currentPath, conf);
fileSize = fs.getContentSummary(currentPath).getLength();
Path archivedLogPath = findArchivedLog(currentPath, conf);
// archivedLogPath can be null if unable to locate in archiveDir.
if (archivedLogPath == null) {
throw new FileNotFoundException("Couldn't find path: " + currentPath);
}
fileSize = fs.getContentSummary(archivedLogPath).getLength();
}
return fileSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private boolean handleEofException(Exception e, WALEntryBatch batch) {
if (!fs.exists(path)) {
// There is a chance that wal has moved to oldWALs directory, so look there also.
path = AbstractFSWALProvider.findArchivedLog(path, conf);
// path is null if it couldn't find archive path.
// path can be null if unable to locate in archiveDir.
}
if (path != null && fs.getFileStatus(path).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ private boolean openNextLog() throws IOException {
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
// If the log was archived, continue reading from there
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
// archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
openReader(archivedLog);
} else {
Expand Down Expand Up @@ -384,6 +385,7 @@ private void resetReader() throws IOException {
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
// archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
openReader(archivedLog);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -469,36 +468,6 @@ public static boolean isArchivedLogFile(Path p) {
return p.toString().contains(oldLog);
}

/**
* Get the archived WAL file path
* @param path - active WAL file path
* @param conf - configuration
* @return archived path if exists, path - otherwise
* @throws IOException exception
*/
public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
Path rootDir = CommonFSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
ServerName serverName = getServerNameFromWALDirectoryName(path);
if (serverName == null) {
LOG.error("Couldn't locate log: " + path);
return path;
}
oldLogDir = new Path(oldLogDir, serverName.getServerName());
}
Path archivedLogLocation = new Path(oldLogDir, path.getName());
final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);

if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
} else {
LOG.error("Couldn't locate log: " + path);
return path;
}
}

/**
* Find the archived WAL file path if it is not able to locate in WALs dir.
* @param path - active WAL file path
Expand Down Expand Up @@ -531,7 +500,6 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}

LOG.error("Couldn't locate log: " + path);
return null;
}
Expand All @@ -557,8 +525,9 @@ public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Confi
return reader;
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
if (!Objects.equals(path, archivedLog)) {
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
// archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
return openReader(archivedLog, conf);
} else {
throw fnfe;
Expand Down