Skip to content

Commit

Permalink
HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOl…
Browse files Browse the repository at this point in the history
…dWALsDirectory
  • Loading branch information
Apache9 committed Mar 19, 2023
1 parent e6977a9 commit 30f8dec
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,27 +298,34 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
final Comparator<Path> LOG_NAME_COMPARATOR =
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

private static final class WalProps {
private static final class WALProps {

/**
* Map the encoded region name to the highest sequence id.
* <p/>
* Contains all the regions it has an entry for.
*/
public final Map<byte[], Long> encodedName2HighestSequenceId;
final Map<byte[], Long> encodedName2HighestSequenceId;

/**
* The log file size. Notice that the size may not be accurate if we do asynchronous close in
* sub classes.
*/
public final long logSize;
final long logSize;

/**
* The nanoTime of the log rolling, used to determine the time interval that has passed since.
*/
public final long rollTimeNs;
final long rollTimeNs;

public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
/**
* If we do asynchronous close in sub classes, it is possible that when adding WALProps to the
* rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
* for safety.
*/
volatile boolean closed = false;

public WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
this.logSize = logSize;
this.rollTimeNs = System.nanoTime();
Expand All @@ -329,7 +336,7 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
* Map of WAL log file to properties. The map is sorted by the log file creation timestamp
* (contained in the log file name).
*/
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

/**
Expand All @@ -348,6 +355,9 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {

protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

// Cached pool of daemon threads named "Close-WAL-Writer-%d". It is protected so that sub classes
// (e.g. AsyncFSWAL.closeWriter) can hand writer.close() calls to it instead of closing inline.
protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

// Run in caller if we get reject execution exception, to avoid aborting region server when we get
// reject execution exception. Usually this should not happen but let's make it more robust.
private final ExecutorService logArchiveExecutor =
Expand Down Expand Up @@ -697,7 +707,7 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
Map<byte[], List<byte[]>> regions = null;
int logCount = getNumRolledLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
regions =
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
}
Expand All @@ -720,17 +730,34 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
return regions;
}

/**
 * Flags the WAL file at {@code path} as closed, then runs {@code cleanOldLogs()} so that the file
 * can be considered for archiving.
 * @param path the WAL file whose entry in {@code walFile2Props} should be marked as closed
 */
protected final void markClosedAndClean(Path path) {
  WALProps walProps = walFile2Props.get(path);
  if (walProps == null) {
    // Normally an entry is present for the path. A missing entry is not a big issue, so do
    // nothing here instead of failing, to keep the code robust.
    return;
  }
  walProps.closed = true;
  cleanOldLogs();
}

/**
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
*/
private void cleanOldLogs() throws IOException {
private void cleanOldLogs() {
List<Pair<Path, Long>> logsToArchive = null;
long now = System.nanoTime();
boolean mayLogTooOld = nextLogTooOldNs <= now;
ArrayList<byte[]> regionsBlockingWal = null;
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
// are older than what is currently in memory, the WAL can be GC'd.
for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
if (!e.getValue().closed) {
LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
continue;
}
Path log = e.getKey();
ArrayList<byte[]> regionsBlockingThisWal = null;
long ageNs = now - e.getValue().rollTimeNs;
Expand Down Expand Up @@ -834,7 +861,7 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
if (oldPath != null) {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -179,9 +178,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

private final long batchSize;

private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

private volatile AsyncFSOutput fsOut;

private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
Expand Down Expand Up @@ -778,32 +774,35 @@ private void waitForSafePoint() {
}
}

protected final long closeWriter(AsyncWriter writer, Path path) {
if (writer != null) {
inflightWALClosures.put(path.getName(), writer);
long fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
} finally {
inflightWALClosures.remove(path.getName());
}
});
return fileLength;
} else {
return 0L;
}
/**
 * Closes the given writer on {@code closeExecutor} rather than on the calling thread.
 * <p/>
 * The writer is registered in {@code inflightWALClosures} under the file name before the close
 * task is submitted, and is removed again when the task finishes. An {@link IOException} from
 * the close is only logged; it is not propagated to the caller.
 * @param writer the writer to close; it is dereferenced without a null check, so callers are
 *               expected to pass a non-null writer (doReplaceWriter checks before calling)
 * @param path   the path of the WAL file being closed; its name is the key used in
 *               {@code inflightWALClosures}
 */
private void closeWriter(AsyncWriter writer, Path path) {
// register first so that the writer is tracked as in-flight while the async close is pending
inflightWALClosures.put(path.getName(), writer);
closeExecutor.execute(() -> {
try {
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
} finally {
// call this even if the above close fails, as there is no other chance we can set closed to
// true, it will not cause big problems.
markClosedAndClean(path);
// the close attempt is over (successfully or not), so stop tracking the writer as in-flight
inflightWALClosures.remove(path.getName());
}
});
}

@Override
protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
long oldFileLen = closeWriter(this.writer, oldPath);
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
if (writer != null) {
long oldFileLen = writer.getLength();
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
closeWriter(writer, oldPath);
} else {
logRollAndSetupWalProps(oldPath, newPath, 0);
}

this.writer = nextWriter;
if (nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -169,8 +167,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private final AtomicInteger closeErrorCount = new AtomicInteger();

private final int waitOnShutdownInSeconds;
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

/**
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
Expand Down Expand Up @@ -377,10 +373,10 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
LOG.warn(
"Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
}
long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread.
if (this.writer != null) {
oldFileLen = this.writer.getLength();
long oldFileLen = this.writer.getLength();
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
// In case of having unflushed entries or we already reached the
// closeErrorsTolerated count, call the closeWriter inline rather than in async
// way so that in case of an IOE we will throw it back and abort RS.
Expand All @@ -393,12 +389,20 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
try {
closeWriter(localWriter, oldPath, false);
} catch (IOException e) {
// We will never reach here.
LOG.warn("close old writer failed", e);
} finally {
// call this even if the above close fails, as there is no other chance we can set
// closed to true, it will not cause big problems.
markClosedAndClean(oldPath);
inflightWALClosures.remove(oldPath.getName());
}
});
}
} else {
logRollAndSetupWalProps(oldPath, newPath, 0);
}
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);

this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
Expand Down Expand Up @@ -453,8 +457,6 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws
}
LOG.warn("Riding over failed WAL close of " + path
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
} finally {
inflightWALClosures.remove(path.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,11 @@ public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
serverName = ServerName.parseServerName(logDirName);
} catch (IllegalArgumentException | IllegalStateException ex) {
serverName = null;
LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
LOG.warn("Cannot parse a server name from path={}", logFile, ex);
}
if (serverName != null && serverName.getStartCode() < 0) {
LOG.warn("Invalid log file path=" + logFile);
LOG.warn("Invalid log file path={}, start code {} is less than 0", logFile,
serverName.getStartCode());
serverName = null;
}
return serverName;
Expand Down Expand Up @@ -483,6 +484,11 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep
}

ServerName serverName = getServerNameFromWALDirectoryName(path);
if (serverName == null) {
LOG.warn("Can not extract server name from path {}, "
+ "give up searching the separated old log dir", path);
return null;
}
// Try finding the log in separate old log dir
oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
Expand Down

0 comments on commit 30f8dec

Please sign in to comment.