Skip to content

Commit

Permalink
HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend
Browse files Browse the repository at this point in the history
  • Loading branch information
gjacoby126 committed Jul 29, 2019
1 parent 11f30de commit 7c911fa
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand Down Expand Up @@ -1104,4 +1105,18 @@ default DeleteTracker postInstantiateDeleteTracker(
throws IOException {
return delTracker;
}

/**
* Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
* coprocessors to add extended attributes to the WALKey that then get persisted to the
* WAL, and are available to replication endpoints to use in processing WAL Entries.
* @param ctx
* @param key
* @return
* @throws IOException
*/
default WALKeyImpl preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKeyImpl key)
throws IOException {
return key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7933,7 +7933,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
/**
* @return writeEntry associated with this append
*/
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
private WriteEntry doWALAppend( WALEdit walEdit, Durability durability, List<UUID> clusterIds,
long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(),
"WALEdit is null or empty!");
Expand All @@ -7952,6 +7952,9 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
if (walEdit.isReplay()) {
walKey.setOrigLogSeqNum(origLogSeqNum);
}
if (this.coprocessorHost != null) {
walKey = this.coprocessorHost.preWALAppend(walKey);
}
WriteEntry writeEntry = null;
try {
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1720,6 +1721,19 @@ public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
});
}

public WALKeyImpl preWALAppend(WALKeyImpl key) throws IOException {
if (this.coprocEnvironments.isEmpty()){
return key;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, WALKeyImpl>(
regionObserverGetter, key) {
@Override
public WALKeyImpl call(RegionObserver observer) throws IOException {
return observer.preWALAppend(this, key);
}
});
}

public Message preEndpointInvocation(final Service service, final String methodName,
Message request) throws IOException {
if (coprocEnvironments.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,42 @@ public WALKeyImpl(final byte[] encodedRegionName,
mvcc, null, null);
}

/**
* Copy constructor that takes in an existing WALKeyImpl plus some extended attributes.
* Intended for coprocessors to add annotations to a system-generated WALKey
* for persistence to the WAL.
* @param key
* @param extendedAttributes
*/
public WALKeyImpl(WALKeyImpl key,
Map<String, byte[]> extendedAttributes){
init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
key.getWriteTime(), key.getClusterIds(), key.getNonceGroup(), key.getNonce(),
key.getMvcc(), key.getReplicationScopes(), extendedAttributes);

}

/**
* Copy constructor that takes in an existing WALKey, the extra WALKeyImpl fields that the
* parent interface is missing, plus some extended attributes. Intended
* for coprocessors to add annotations to a system-generated WALKey for
* persistence to the WAL.
* @param key
* @param clusterIds
* @param mvcc
* @param replicationScopes
* @param extendedAttributes
*/
public WALKeyImpl(WALKey key,
List<UUID> clusterIds,
MultiVersionConcurrencyControl mvcc,
final NavigableMap<byte[], Integer> replicationScopes,
Map<String, byte[]> extendedAttributes){
init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
key.getWriteTime(), clusterIds, key.getNonceGroup(), key.getNonce(),
mvcc, replicationScopes, extendedAttributes);

}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;

/**
* A sample region observer that tests the RegionObserver interface.
Expand Down Expand Up @@ -124,7 +126,11 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
final AtomicInteger ctPreWALAppend = new AtomicInteger(0);

static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
Map<String, byte[]> extendedAttributes = new HashMap<String,byte[]>();
static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");

public void setThrowOnPostFlush(Boolean val){
throwOnPostFlush.set(val);
Expand Down Expand Up @@ -631,6 +637,16 @@ public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessor
return reader;
}

@Override
public WALKeyImpl preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
WALKeyImpl key) throws IOException {
ctPreWALAppend.incrementAndGet();
extendedAttributes = new HashMap<>();
extendedAttributes.put(Integer.toString(ctPreWALAppend.get()),
Bytes.toBytes("foo"));
return new WALKeyImpl(key, extendedAttributes);
}

public boolean hadPreGet() {
return ctPreGet.get() > 0;
}
Expand Down Expand Up @@ -864,6 +880,8 @@ public int getCtPostWALRestore() {
return ctPostWALRestore.get();
}

public int getCtPreWALAppend() { return ctPreWALAppend.get(); }

public boolean wasStoreFileReaderOpenCalled() {
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -70,20 +71,26 @@
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -663,6 +670,67 @@ public void testPreWALRestoreSkip() throws Exception {
table.close();
}

//called from testPreWALAppendIsWrittenToWAL
private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
int expectedCalls = 0;
String [] methodArray = new String[1];
methodArray[0] = "getCtPreWALAppend";
Object[] resultArray = new Object[1];

Put p = new Put(ROW);
p.addColumn(A, A, A);
table.put(p);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

Append a = new Append(ROW);
a.addColumn(B, B, B);
table.append(a);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

Increment i = new Increment(ROW);
i.addColumn(C, C, 1);
table.increment(i);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);

Delete d = new Delete(ROW);
table.delete(d);
resultArray[0] = ++expectedCalls;
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
}

@Test
public void testPreWALAppend() throws Exception {
SimpleRegionObserver sro = new SimpleRegionObserver();
ObserverContext ctx = Mockito.mock(ObserverContext.class);
WALKeyImpl key = new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE, EnvironmentEdgeManager.currentTime());
WALKeyImpl postCPKey = sro.preWALAppend(ctx, key);
Assert.assertEquals(key.getEncodedRegionName(), postCPKey.getEncodedRegionName());
Assert.assertEquals(key.getTableName(), postCPKey.getTableName());
Assert.assertEquals(key.getOriginatingClusterId(), postCPKey.getOriginatingClusterId());
Assert.assertEquals(key.getClusterIds(), postCPKey.getClusterIds());
Assert.assertEquals(1, postCPKey.getExtendedAttributes().size());
Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
postCPKey.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
}

@Test
public void testPreWALAppendIsWrittenToWAL() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
Table table = util.createTable(tableName, new byte[][] { A, B, C });

PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
//should be only one region
HRegion region = regions.get(0);
region.getWAL().registerWALActionsListener(listener);
testPreWALAppendHook(table, tableName);
boolean[] expectedResults = {true, true, true, true};
Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());

}
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
Object value[]) throws IOException {
Expand Down Expand Up @@ -711,4 +779,21 @@ private static void createHFile(Configuration conf, FileSystem fs, Path path, by
writer.close();
}
}

private static class PreWALAppendWALActionsListener implements WALActionsListener {
boolean[] walKeysCorrect = {false, false, false, false};

@Override
public void postAppend(long entryLen, long elapsedTimeMillis,
WALKey logKey, WALEdit logEdit) throws IOException {
for (int k = 0; k < 4; k++) {
if (!walKeysCorrect[k]) {
walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
logKey.getExtendedAttribute(Integer.toString(k + 1)));
}
}
}

public boolean[] getWalKeysCorrectArray() { return walKeysCorrect; }
}
}

0 comments on commit 7c911fa

Please sign in to comment.