Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22539 WAL corruption due to early DBBs re-use when Durability.A… #437

Merged
merged 1 commit into from
Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
Expand Down Expand Up @@ -51,7 +51,7 @@
* the result.
*/
@InterfaceAudience.Private
abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {

protected final int id; // the client's call id
protected final BlockingService service;
Expand Down Expand Up @@ -91,6 +91,12 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;

// This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
// second bit is for the WAL reference. The request cleanup can only run once both of them are
// zero. We cannot use general reference counting because cleanup may be called multiple times in
// the current implementation. We should fix this in the future.
private final AtomicInteger reference = new AtomicInteger(0b01);

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Expand Down Expand Up @@ -141,14 +147,43 @@ public void done() {
cleanup();
}

private void release(int mask) {
openinx marked this conversation as resolved.
Show resolved Hide resolved
for (;;) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should rather wait for notifications from cleanup()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I can not get your point. What's the problem here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to put a wait call inside this for block, so that it does not keep iterating indefinitely. But I guess condition on #153 will already halt it on the 2nd iteration, so should be fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for CAS. You can see the code in AtomicInteger.getAndUpdate, almost the same.

int ref = reference.get();
if ((ref & mask) == 0) {
return;
}
int nextRef = ref & (~mask);
if (reference.compareAndSet(ref, nextRef)) {
if (nextRef == 0) {
if (this.reqCleanup != null) {
this.reqCleanup.run();
}
}
return;
}
}
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here we rely on cleanup() never being called before retainByWAL(); otherwise the rpc call may end before the WAL write is done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should call retainByWAL before cleanup. So we must call retainByWAL in the rpc handler thread, before putting the entry into the ring buffer of the WAL. I think this is enough to make sure that we call retainByWAL before cleanup.

public void cleanup() {
if (this.reqCleanup != null) {
this.reqCleanup.run();
this.reqCleanup = null;
release(0b01);
}

/**
 * Marks this call as referenced by the WAL by setting the second bit ({@code 0b10}) of
 * {@code reference}. The bit is cleared again through {@link #releaseByWAL()}.
 */
public void retainByWAL() {
// CAS retry loop: re-read the current value and try again whenever another thread changed
// {@code reference} between the get() and the compareAndSet().
for (;;) {
wchevreuil marked this conversation as resolved.
Show resolved Hide resolved
int ref = reference.get();
// Set the WAL bit while leaving the other bits untouched.
int nextRef = ref | 0b10;
if (reference.compareAndSet(ref, nextRef)) {
return;
}
}
}

/**
 * Drops the WAL's reference on this call by clearing the second bit ({@code 0b10}) of the
 * reference field via {@code release(int)}.
 */
public void releaseByWAL() {
  final int walReferenceBit = 0b10;
  release(walReferenceBit);
}

@Override
public String toString() {
return toShortString() + " param: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
Expand Down Expand Up @@ -971,7 +973,7 @@ boolean isUnflushedEntries() {
* Exposed for testing only. Use for tricks like halting the ring buffer appending.
*/
@VisibleForTesting
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
// Noop
}

Expand Down Expand Up @@ -1061,8 +1063,10 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the currentCall should always be a ServerCall, so there is no need for the extra instanceof check? It's also OK to keep it if you think it's fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for safety.

.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,9 @@ private void syncFailed(long epochWhenSync, Throwable error) {
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
highestSyncedTxid.set(processedTxid);
for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
if (iter.next().getTxid() <= processedTxid) {
FSWALEntry entry = iter.next();
if (entry.getTxid() <= processedTxid) {
entry.release();
iter.remove();
} else {
break;
Expand Down Expand Up @@ -487,6 +489,7 @@ private void drainNonMarkerEditsAndFailSyncs() {
while (iter.hasNext()) {
FSWALEntry entry = iter.next();
if (!entry.getEdit().isMetaEdit()) {
entry.release();
hasNonMarkerEdits = true;
break;
}
Expand All @@ -497,7 +500,10 @@ private void drainNonMarkerEditsAndFailSyncs() {
if (!iter.hasNext()) {
break;
}
iter.next();
iter.next().release();
}
for (FSWALEntry entry : unackedAppends) {
entry.release();
}
unackedAppends.clear();
// fail the sync futures which are under the txid of the first remaining edit, if none, fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
Expand All @@ -39,7 +38,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -64,6 +62,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
Expand Down Expand Up @@ -985,7 +984,6 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
//TODO handle htrace API change, see HBASE-18895
//TraceScope scope = Trace.continueSpan(entry.detachSpan());
try {

if (this.exception != null) {
// Return to keep processing events coming off the ringbuffer
return;
Expand All @@ -1002,6 +1000,8 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
: new DamagedWALException("On sync", this.exception));
// Return to keep processing events coming off the ringbuffer
return;
} finally {
entry.release();
}
} else {
// What is this if not an append or sync. Fail all up to this!!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static java.util.stream.Collectors.toCollection;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
Expand All @@ -56,19 +54,24 @@ class FSWALEntry extends Entry {
private final transient boolean inMemstore;
private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
private final transient Optional<ServerCall<?>> rpcCall;

FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
final RegionInfo regionInfo, final boolean inMemstore) {
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
final boolean inMemstore, ServerCall<?> rpcCall) {
super(key, edit);
this.inMemstore = inMemstore;
this.regionInfo = regionInfo;
this.txid = txid;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
Set<byte []> families = edit.getFamilies();
this.familyNames = families != null? families: collectFamilies(edit.getCells());
Set<byte[]> families = edit.getFamilies();
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
} else {
this.familyNames = Collections.<byte[]>emptySet();
this.familyNames = Collections.<byte[]> emptySet();
}
this.rpcCall = Optional.ofNullable(rpcCall);
if (rpcCall != null) {
rpcCall.retainByWAL();
}
}

Expand All @@ -77,12 +80,13 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
if (CollectionUtils.isEmpty(cells)) {
return Collections.emptySet();
} else {
return cells.stream()
.filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY))
.collect(toCollection(() -> new TreeSet<>(CellComparator.getInstance()::compareFamilies)))
.stream()
.map(CellUtil::cloneFamily)
.collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
for (Cell cell: cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
set.add(CellUtil.cloneFamily(cell));
}
}
return set;
}
}

Expand Down Expand Up @@ -129,4 +133,8 @@ long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws
/**
 * @return the set of family names stored on this entry at construction time
 */
Set<byte[]> getFamilyNames() {
  return this.familyNames;
}

/**
 * Releases the WAL's hold on the rpc call associated with this entry, if there is one, by
 * invoking {@code releaseByWAL()} on it. Does nothing when no rpc call is present.
 */
void release() {
  rpcCall.ifPresent(call -> call.releaseByWAL());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1156,9 +1156,8 @@ private WALEdit createWALEdit(final byte[] rowName, final byte[] family, Environ
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
FSWALEntry entry =
new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
rowName, family, ee, index), hri, true);
FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
createWALEdit(rowName, family, ee, index), hri, true, null);
entry.stampRegionSequenceId(mvcc.begin());
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir
failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {

@Override
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir
prefix, suffix) {

@Override
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
Expand Down
Loading