Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend #470

Merged
merged 1 commit into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -548,4 +548,10 @@ public DeleteTracker postInstantiateDeleteTracker(
throws IOException {
return delTracker;
}

@Override
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
WALEdit edit) throws IOException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1341,4 +1341,16 @@ Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
DeleteTracker postInstantiateDeleteTracker(
final ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
throws IOException;

/**
* Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
* coprocessors to add extended attributes to the WALKey that then get persisted to the
* WAL, and are available to replication endpoints to use in processing WAL Entries.
* @param ctx the environment provided by the region server
* @param key the WALKey associated with a particular append to a WAL
* @param edit the WALEdit associated with a particular append to a WAL
*/
void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
WALEdit edit)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3527,6 +3527,7 @@ private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOE
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
preWALAppend(walKey, walEdit);
txid = this.wal
.append(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, true);
Expand Down Expand Up @@ -7593,6 +7594,7 @@ public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce, mvcc);
preWALAppend(walKey, walEdit);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, true);
}
Expand Down Expand Up @@ -7884,6 +7886,7 @@ public Result append(Append mutate, long nonceGroup, long nonce) throws IOExcept
nonceGroup,
nonce,
mvcc);
preWALAppend(walKey, walEdits);
txid =
this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
} else {
Expand Down Expand Up @@ -7973,6 +7976,12 @@ public Result append(Append mutate, long nonceGroup, long nonce) throws IOExcept
return mutate.isReturnResults() ? Result.create(allKVs) : null;
}

private void preWALAppend(WALKey walKey, WALEdit walEdits) throws IOException {
if (this.coprocessorHost != null && !walEdits.isMetaEdit()) {
this.coprocessorHost.preWALAppend(walKey, walEdits);
}
}

private static Cell getNewCell(final byte [] row, final long ts, final Cell cell,
final Cell oldCell, final byte [] tagBytes) {
// allocate an empty cell once
Expand Down Expand Up @@ -8124,6 +8133,7 @@ private Result doIncrement(Increment increment, long nonceGroup, long nonce) thr
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce,
getMVCC());
preWALAppend(walKey, walEdits);
txid =
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,19 @@ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnviro
});
}

public void preWALAppend(final WALKey key, final WALEdit edit) throws IOException {
if (coprocessors.isEmpty()){
return;
}
execOperation(new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
oserver.preWALAppend(ctx, key, edit);
}
});
}

private static abstract class CoprocessorOperation
extends ObserverContext<RegionCoprocessorEnvironment> {
public CoprocessorOperation() {
Expand Down
12 changes: 12 additions & 0 deletions hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,18 @@ public UUID getOriginatingClusterId(){
return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
}

/**
* Add a named String value to this WALKey to be persisted into the WAL
* @param attributeKey Name of the attribute
* @param attributeValue Value of the attribute
*/
public void addExtendedAttribute(String attributeKey, byte[] attributeValue){
if (extendedAttributes == null){
extendedAttributes = new HashMap<String, byte[]>();
}
extendedAttributes.put(attributeKey, attributeValue);
}

/**
* Return a named String value injected into the WALKey during processing, such as by a
* coprocessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");

public void setThrowOnPostFlush(Boolean val){
throwOnPostFlush.set(val);
Expand Down Expand Up @@ -718,6 +720,15 @@ public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironme
return reader;
}

@Override
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
WALKey key, WALEdit edit) throws IOException {
ctPreWALAppend.incrementAndGet();

key.addExtendedAttribute(Integer.toString(ctPreWALAppend.get()),
Bytes.toBytes("foo"));
}

public boolean hadPreGet() {
return ctPreGet.get() > 0;
}
Expand Down Expand Up @@ -975,6 +986,10 @@ public int getCtPostWALRestoreDeprecated() {
return ctPostWALRestoreDeprecated.get();
}

public int getCtPreWALAppend() {
return ctPreWALAppend.get();
}

public boolean wasStoreFileReaderOpenCalled() {
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
Expand All @@ -42,6 +43,8 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -78,11 +81,14 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category(MediumTests.class)
public class TestRegionObserverInterface {
Expand All @@ -93,6 +99,7 @@ public class TestRegionObserverInterface {
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
public final static byte[] ROW = Bytes.toBytes("testrow");
public final static byte[] FAMILY = Bytes.toBytes("f");

private static HBaseTestingUtility util = new HBaseTestingUtility();
private static MiniHBaseCluster cluster = null;
Expand Down Expand Up @@ -750,6 +757,100 @@ public void testPreWALRestoreSkip() throws Exception {
table.close();
}

//called from testPreWALAppendIsWrittenToWAL
private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
int expectedCalls = 0;
String [] methodArray = new String[1];
methodArray[0] = "getCtPreWALAppend";
Object[] resultArray = new Object[1];

Put p = new Put(ROW);
p.addColumn(A, A, A);
table.put(p);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

Append a = new Append(ROW);
a.add(B, B, B);
table.append(a);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

Increment i = new Increment(ROW);
i.addColumn(C, C, 1);
table.increment(i);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

Delete d = new Delete(ROW);
table.delete(d);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
}

@Test
public void testPreWALAppend() throws Exception {
SimpleRegionObserver sro = new SimpleRegionObserver();
ObserverContext ctx = Mockito.mock(ObserverContext.class);
WALKey key = new WALKey(Bytes.toBytes("region"), TEST_TABLE,
EnvironmentEdgeManager.currentTime());
WALEdit edit = new WALEdit();
sro.preWALAppend(ctx, key, edit);
Assert.assertEquals(1, key.getExtendedAttributes().size());
Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
}

@Test
public void testPreWALAppendIsWrittenToWAL() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
".testPreWALAppendIsWrittenToWAL");
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(A));
htd.addFamily(new HColumnDescriptor(B));
htd.addFamily(new HColumnDescriptor(C));
htd.addCoprocessor(SimpleRegionObserver.class.getName());
Table table = util.createTable(htd, null);
PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
//should be only one region
HRegion region = regions.get(0);
region.getWAL().registerWALActionsListener(listener);
testPreWALAppendHook(table, tableName);
boolean[] expectedResults = {true, true, true, true};
Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());

}

@Test
public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
".testPreWALAppendNotCalledOnMetaEdt");

HTableDescriptor td = new HTableDescriptor(tableName);
td.addCoprocessor(SimpleRegionObserver.class.getName());
td.addFamily(new HColumnDescriptor(FAMILY));
Table table = util.createTable(td, new byte[][] { A, B, C });

PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
//should be only one region
HRegion region = regions.get(0);

region.getWAL().registerWALActionsListener(listener);
//flushing should write to the WAL
region.flush(true);
//so should compaction
region.compact(false);
//and so should closing the region
region.close();

//but we still shouldn't have triggered preWALAppend because no user data was written
String[] methods = new String[] {"getCtPreWALAppend"};
Object[] expectedResult = new Integer[]{0};
verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
}

// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class<?> c, String methodName[], TableName tableName,
Object value[]) throws IOException {
Expand Down Expand Up @@ -800,4 +901,69 @@ private static void createHFile(
writer.close();
}
}

private static class PreWALAppendWALActionsListener implements WALActionsListener {
boolean[] walKeysCorrect = {false, false, false, false};

@Override
public void postAppend(long entryLen, long elapsedTimeMillis,
WALKey logKey, WALEdit logEdit) throws IOException {
for (int k = 0; k < 4; k++) {
if (!walKeysCorrect[k]) {
walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
logKey.getExtendedAttribute(Integer.toString(k + 1)));
}
}
}

@Override
public void postSync(long timeInNanos, int handlerSyncs) {

}

boolean[] getWalKeysCorrectArray() {
return walKeysCorrect;
}
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {

}

@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {

}

@Override
public void preLogArchive(Path oldPath, Path newPath) throws IOException {

}

@Override
public void postLogArchive(Path oldPath, Path newPath) throws IOException {

}

@Override
public void logRollRequested(RollRequestReason reason) {

}

@Override
public void logCloseRequested() {

}

@Override
public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
WALEdit logEdit) {

}

@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
WALEdit logEdit) throws IOException {

}
}
}