Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF #2990

Merged
merged 2 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -66,7 +66,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -265,6 +264,11 @@ public void enqueueLog(Path wal) {
}
}

/**
 * Exposes the queues held by {@code logQueue}.
 * The map is handed back exactly as {@code logQueue.getQueues()} returns it; this method
 * makes no copy of its own.
 * NOTE(review): keys are presumably WAL group ids -- confirm against the log queue class.
 * @return the map of queue name to WAL path queue obtained from {@code logQueue}
 */
@InterfaceAudience.Private
public Map<String, PriorityBlockingQueue<Path>> getQueues() {
  final Map<String, PriorityBlockingQueue<Path>> walQueues = logQueue.getQueues();
  return walQueues;
}

@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

Expand Down Expand Up @@ -123,44 +122,64 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
@Override
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
WALEntryBatch batch = null;
WALEntryStream entryStream = null;
try {
// we only loop back here if something fatal happened to our stream
while (isReaderRunning()) {
try {
entryStream =
new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}

batch = createBatch(entryStream);
batch = readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
if (batch == null) {
// either the queue has no WAL to read
// or got no new entries (didn't advance position in WAL)
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
} else {
addBatchToShippingQueue(batch);
}
}
WALEntryBatch batch = readWALEntries(entryStream);
currentPosition = entryStream.getPosition();
if (batch != null) {
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
} catch (IOException e) { // stream related
if (handleEofException(e, batch)) {
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch(entryStream.getCurrentPath());
entryStream.reset(); // reuse stream
} else {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
} finally {
entryStream.close();
}
} catch (IOException e) { // stream related
if (!handleEofException(e)) {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier ++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
} catch (IOException e) {
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
}

Expand Down Expand Up @@ -189,14 +208,19 @@ protected static final boolean switched(WALEntryStream entryStream, Path path) {
return newPath == null || !path.getName().equals(newPath.getName());
}

protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
// We need to get the WALEntryBatch from the caller so we can add entries to it.
// This is required in case there is any exception while reading entries:
// we do not want to lose the existing entries in the batch.
protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
WALEntryBatch batch) throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
// This would mean either no more files in the queue
// or there is no new data yet on the current wal
return null;
}
}
Expand All @@ -208,7 +232,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
// when reading from the entry stream first time we will enter here
currentPath = entryStream.getCurrentPath();
}
WALEntryBatch batch = createBatch(entryStream);
batch.setLastWalPath(currentPath);
for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
Expand All @@ -231,10 +255,12 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
return batch;
}

private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
private void handleEmptyWALEntryBatch() throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
if (source.isRecovered()) {
// we're done with queue recovery, shut ourself down
if (logQueue.getQueue(walGroupId).isEmpty()) {
// we're done with current queue, either this is a recovered queue, or it is the special group
// for a sync replication peer and the peer has been transited to DA or S state.
LOG.debug("Stopping the replication source wal reader");
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
Expand All @@ -244,22 +270,38 @@ private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedExcept
}

/**
* if we get an EOF due to a zero-length log, and there are other logs in queue
* (highly likely we've closed the current log), and autorecovery is
* enabled, then dump the log
* This is to handle the EOFException from the WAL entry stream. EOFException should
* be handled carefully because there are chances of data loss because of never replicating
* the data. Thus we should always try to ship existing batch of entries here.
* If there was only one log in the queue before EOF, we ship the empty batch here
* and since reader is still active, in the next iteration of reader we will
* stop the reader.
* If there was more than one log in the queue before EOF, we ship the existing batch
* and reset the wal path and position to the log with EOF, so shipper can remove
* logs from replication queue
* @return true only if the IOE can be handled
*/
private boolean handleEofException(IOException e) {
private boolean handleEofException(IOException e, WALEntryBatch batch)
throws InterruptedException {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
if ((e instanceof EOFException || e.getCause() instanceof EOFException)
&& (source.isRecovered() || queue.size() > 1)
&& this.eofAutoRecovery) {
Path head = queue.peek();
try {
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
if (fs.getFileStatus(head).getLen() == 0) {
// head of the queue is an empty log file
LOG.warn("Forcing removal of 0 length log in queue: {}", head);
logQueue.remove(walGroupId);
currentPosition = 0;
// After we removed the WAL from the queue, we should
// try shipping the existing batch of entries and set the wal position
// and path to the wal just dequeued to correctly remove logs from the zk
batch.setLastWalPath(head);
batch.setLastWalPosition(currentPosition);
addBatchToShippingQueue(batch);
return true;
}
} catch (IOException ioe) {
Expand All @@ -269,6 +311,20 @@ private boolean handleEofException(IOException e) {
return false;
}

/**
* Logs the number of entries in the given batch and puts the batch on
* {@code entryBatchQueue}. This method returns nothing; it completes once the
* {@code put} call returns.
* @param batch Batch of entries to ship; it is dereferenced for logging, so it must not be null
* @throws InterruptedException propagated from {@code entryBatchQueue.put(batch)}
*   (presumably when interrupted while waiting for queue space -- confirm the queue type)
* @throws IOException declared in the signature; no statement in this body visibly raises it
*/
private void addBatchToShippingQueue(WALEntryBatch batch)
throws InterruptedException, IOException {
// need to propagate the batch even if it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
}

public Path getCurrentPath() {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
}

@Override
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
Expand All @@ -70,7 +70,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
currentPath = entryStream.getCurrentPath();
}
long positionBefore = entryStream.getPosition();
WALEntryBatch batch = createBatch(entryStream);
batch = createBatch(entryStream);
for (;;) {
Entry entry = entryStream.peek();
boolean doFiltering = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ public Path getLastWalPath() {
return lastWalPath;
}

/**
* Overwrites the WAL path stored in this batch.
* @param lastWalPath the path that {@link #getLastWalPath()} will return afterwards
*/
public void setLastWalPath(Path lastWalPath) {
this.lastWalPath = lastWalPath;
}

/**
* @return the position in the last WAL that was read.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
* @param walFileLengthProvider provides the length of the WAL file
* @param serverName the server name which all WALs belong to
* @param metrics the replication metrics
* @throws IOException
* @throws IOException throw IO exception from stream
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
Expand Down Expand Up @@ -368,7 +368,9 @@ private void openReader(Path path) throws IOException {
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
if (!(ioe instanceof FileNotFoundException)) throw ioe;
if (!(ioe instanceof FileNotFoundException)) {
throw ioe;
}
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
Expand Down
Loading