Skip to content

Commit

Permalink
HBASE-22539 WAL corruption due to early DBBs re-use when Durability.A…
Browse files Browse the repository at this point in the history
…SYNC_WAL is used (#437)

Signed-off-by: Zheng Hu <openinx@gmail.com>
  • Loading branch information
Apache9 committed Aug 5, 2019
1 parent 3971773 commit 10d22b5
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
Expand Down Expand Up @@ -51,7 +51,7 @@
* the result.
*/
@InterfaceAudience.Private
abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {

protected final int id; // the client's call id
protected final BlockingService service;
Expand Down Expand Up @@ -91,6 +91,12 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;

// This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
// second bit is for WAL reference. We can only call release if both of them are zero. The reason
// why we can not use a general reference counting is that, we may call cleanup multiple times in
// the current implementation. We should fix this in the future.
private final AtomicInteger reference = new AtomicInteger(0b01);

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Expand Down Expand Up @@ -141,14 +147,43 @@ public void done() {
cleanup();
}

private void release(int mask) {
for (;;) {
int ref = reference.get();
if ((ref & mask) == 0) {
return;
}
int nextRef = ref & (~mask);
if (reference.compareAndSet(ref, nextRef)) {
if (nextRef == 0) {
if (this.reqCleanup != null) {
this.reqCleanup.run();
}
}
return;
}
}
}

@Override
public void cleanup() {
if (this.reqCleanup != null) {
this.reqCleanup.run();
this.reqCleanup = null;
release(0b01);
}

public void retainByWAL() {
for (;;) {
int ref = reference.get();
int nextRef = ref | 0b10;
if (reference.compareAndSet(ref, nextRef)) {
return;
}
}
}

public void releaseByWAL() {
release(0b10);
}

@Override
public String toString() {
return toShortString() + " param: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
Expand Down Expand Up @@ -929,7 +931,7 @@ boolean isUnflushedEntries() {
* Exposed for testing only. Use to tricks like halt the ring buffer appending.
*/
@VisibleForTesting
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
// Noop
}

Expand Down Expand Up @@ -1019,8 +1021,10 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,9 @@ private void syncFailed(long epochWhenSync, Throwable error) {
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
highestSyncedTxid.set(processedTxid);
for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
if (iter.next().getTxid() <= processedTxid) {
FSWALEntry entry = iter.next();
if (entry.getTxid() <= processedTxid) {
entry.release();
iter.remove();
} else {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
Expand All @@ -39,7 +38,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -64,6 +62,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
Expand Down Expand Up @@ -981,7 +980,6 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
//TODO handle htrace API change, see HBASE-18895
//TraceScope scope = Trace.continueSpan(entry.detachSpan());
try {

if (this.exception != null) {
// Return to keep processing events coming off the ringbuffer
return;
Expand All @@ -998,6 +996,8 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
: new DamagedWALException("On sync", this.exception));
// Return to keep processing events coming off the ringbuffer
return;
} finally {
entry.release();
}
} else {
// What is this if not an append or sync. Fail all up to this!!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static java.util.stream.Collectors.toCollection;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
Expand All @@ -56,19 +54,24 @@ class FSWALEntry extends Entry {
private final transient boolean inMemstore;
private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
private final transient Optional<ServerCall<?>> rpcCall;

FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
final RegionInfo regionInfo, final boolean inMemstore) {
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
final boolean inMemstore, ServerCall<?> rpcCall) {
super(key, edit);
this.inMemstore = inMemstore;
this.regionInfo = regionInfo;
this.txid = txid;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
Set<byte []> families = edit.getFamilies();
this.familyNames = families != null? families: collectFamilies(edit.getCells());
Set<byte[]> families = edit.getFamilies();
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
} else {
this.familyNames = Collections.<byte[]>emptySet();
this.familyNames = Collections.<byte[]> emptySet();
}
this.rpcCall = Optional.ofNullable(rpcCall);
if (rpcCall != null) {
rpcCall.retainByWAL();
}
}

Expand All @@ -77,12 +80,13 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
if (CollectionUtils.isEmpty(cells)) {
return Collections.emptySet();
} else {
return cells.stream()
.filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY))
.collect(toCollection(() -> new TreeSet<>(CellComparator.getInstance()::compareFamilies)))
.stream()
.map(CellUtil::cloneFamily)
.collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
for (Cell cell: cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
set.add(CellUtil.cloneFamily(cell));
}
}
return set;
}
}

Expand Down Expand Up @@ -129,4 +133,8 @@ long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws
Set<byte[]> getFamilyNames() {
return familyNames;
}

void release() {
rpcCall.ifPresent(ServerCall::releaseByWAL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1156,9 +1156,8 @@ private WALEdit createWALEdit(final byte[] rowName, final byte[] family, Environ
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
FSWALEntry entry =
new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
rowName, family, ee, index), hri, true);
FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
createWALEdit(rowName, family, ee, index), hri, true, null);
entry.stampRegionSequenceId(mvcc.begin());
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir
AsyncFSWAL asyncFSWAL = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners,
failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir
conf, listeners, failIfWALExists, prefix, suffix) {

@Override
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

/**
* Testcase for HBASE-22539
*/
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAsyncFSWALCorruptionDueToDanglingByteBuffer
extends WALCorruptionDueToDanglingByteBufferTestBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class);

public static final class PauseWAL extends AsyncFSWAL {

public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
eventLoopGroup, channelClass);
}

@Override
protected void atHeadOfRingBufferEventHandlerAppend() {
if (ARRIVE != null) {
ARRIVE.countDown();
try {
RESUME.await();
} catch (InterruptedException e) {
}
}
}
}

public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {

private EventLoopGroup eventLoopGroup;

private Class<? extends Channel> channelClass;

@Override
protected PauseWAL createWAL() throws IOException {
return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
channelClass);
}

@Override
protected void doInit(Configuration conf) throws IOException {
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
}
}

@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
WALProvider.class);
UTIL.startMiniCluster(1);
UTIL.createTable(TABLE_NAME, CF);
UTIL.waitTableAvailable(TABLE_NAME);
}

@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
}
Loading

0 comments on commit 10d22b5

Please sign in to comment.