diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 3fb0858c4949..8353a4753918 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -548,4 +548,10 @@ public DeleteTracker postInstantiateDeleteTracker( throws IOException { return delTracker; } + + @Override + public void preWALAppend(ObserverContext ctx, WALKey key, + WALEdit edit) throws IOException { + + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index ad16b9772640..ea831cafe6cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -1341,4 +1341,16 @@ Cell postMutationBeforeWAL(ObserverContext ctx, DeleteTracker postInstantiateDeleteTracker( final ObserverContext ctx, DeleteTracker delTracker) throws IOException; + + /** + * Called just before the WAL Entry is appended to the WAL. Implementing this hook allows + * coprocessors to add extended attributes to the WALKey that then get persisted to the + * WAL, and are available to replication endpoints to use in processing WAL Entries. + * @param ctx the environment provided by the region server + * @param key the WALKey associated with a particular append to a WAL + * @param edit the WALEdit associated with a particular append to a WAL + */ + void preWALAppend(ObserverContext ctx, WALKey key, + WALEdit edit) + throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3cb0de506b83..d7844149c492 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3527,6 +3527,7 @@ private long doMiniBatchMutation(BatchOperationInProgress batchOp) throws IOE walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); + preWALAppend(walKey, walEdit); txid = this.wal .append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); @@ -7593,6 +7594,7 @@ public void processRowsWithLocks(RowProcessor processor, long timeout, walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, processor.getClusterIds(), nonceGroup, nonce, mvcc); + preWALAppend(walKey, walEdit); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); } @@ -7884,6 +7886,7 @@ public Result append(Append mutate, long nonceGroup, long nonce) throws IOExcept nonceGroup, nonce, mvcc); + preWALAppend(walKey, walEdits); txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true); } else { @@ -7973,6 +7976,12 @@ public Result append(Append mutate, long nonceGroup, long nonce) throws IOExcept return mutate.isReturnResults() ? Result.create(allKVs) : null; } + private void preWALAppend(WALKey walKey, WALEdit walEdits) throws IOException { + if (this.coprocessorHost != null && !walEdits.isMetaEdit()) { + this.coprocessorHost.preWALAppend(walKey, walEdits); + } + } + private static Cell getNewCell(final byte [] row, final long ts, final Cell cell, final Cell oldCell, final byte [] tagBytes) { // allocate an empty cell once @@ -8124,6 +8133,7 @@ private Result doIncrement(Increment increment, long nonceGroup, long nonce) thr walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, getMVCC()); + preWALAppend(walKey, walEdits); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index bc5af2022bc4..c0f357d85c60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1691,6 +1691,19 @@ public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preWALAppend(ctx, key, edit); + } + }); + } + private static abstract class CoprocessorOperation extends ObserverContext { public CoprocessorOperation() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 6c231edb8d9d..b031fdd1d490 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -464,6 +464,18 @@ public UUID getOriginatingClusterId(){ return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0); } + /** + * Add a named String value to this WALKey to be persisted into the WAL + * @param attributeKey Name of the attribute + * @param attributeValue Value of the attribute + */ + public void addExtendedAttribute(String attributeKey, byte[] attributeValue){ + if (extendedAttributes == null){ + extendedAttributes = new HashMap(); + } + extendedAttributes.put(attributeKey, attributeValue); + } + /** * Return a named String value injected into the WALKey during processing, such as by a * coprocessor diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index dbcf2e9bb473..7b8c323ba2a0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -139,8 +139,10 @@ public class SimpleRegionObserver extends BaseRegionObserver { final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0); final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0); final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0); + final AtomicInteger ctPreWALAppend = new AtomicInteger(0); final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false); static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE"; + static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo"); public void setThrowOnPostFlush(Boolean val){ throwOnPostFlush.set(val); @@ -718,6 +720,15 @@ public Reader postStoreFileReaderOpen(ObserverContext ctx, + WALKey key, WALEdit edit) throws IOException { + ctPreWALAppend.incrementAndGet(); + + key.addExtendedAttribute(Integer.toString(ctPreWALAppend.get()), + Bytes.toBytes("foo")); + } + public boolean hadPreGet() { return ctPreGet.get() > 0; } @@ -975,6 +986,10 @@ public int getCtPostWALRestoreDeprecated() { return ctPostWALRestoreDeprecated.get(); } + public int getCtPreWALAppend() { + return ctPreWALAppend.get(); + } + public boolean wasStoreFileReaderOpenCalled() { return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index dc8040eec34b..75c8bab8d821 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.commons.logging.Log; @@ -42,6 +43,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; @@ -78,11 +81,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WALKey; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; @Category(MediumTests.class) public class TestRegionObserverInterface { @@ -93,6 +99,7 @@ public class TestRegionObserverInterface { public final static byte[] B = Bytes.toBytes("b"); public final static byte[] C = Bytes.toBytes("c"); public final static byte[] ROW = Bytes.toBytes("testrow"); + public final static byte[] FAMILY = Bytes.toBytes("f"); private static HBaseTestingUtility util = new HBaseTestingUtility(); private static MiniHBaseCluster cluster = null; @@ -750,6 +757,100 @@ public void testPreWALRestoreSkip() throws Exception { table.close(); } + //called from testPreWALAppendIsWrittenToWAL + private void testPreWALAppendHook(Table table, TableName tableName) throws IOException { + int expectedCalls = 0; + String [] methodArray = new String[1]; + methodArray[0] = "getCtPreWALAppend"; + Object[] resultArray = new Object[1]; + + Put p = new Put(ROW); + p.addColumn(A, A, A); + table.put(p); + resultArray[0] = ++expectedCalls; + verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray); + + Append a = new Append(ROW); + a.add(B, B, B); + table.append(a); + resultArray[0] = ++expectedCalls; + verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray); + + Increment i = new Increment(ROW); + i.addColumn(C, C, 1); + table.increment(i); + resultArray[0] = ++expectedCalls; + verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray); + + Delete d = new Delete(ROW); + table.delete(d); + resultArray[0] = ++expectedCalls; + verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray); + } + + @Test + public void testPreWALAppend() throws Exception { + SimpleRegionObserver sro = new SimpleRegionObserver(); + ObserverContext ctx = Mockito.mock(ObserverContext.class); + WALKey key = new WALKey(Bytes.toBytes("region"), TEST_TABLE, + EnvironmentEdgeManager.currentTime()); + WALEdit edit = new WALEdit(); + sro.preWALAppend(ctx, key, edit); + Assert.assertEquals(1, key.getExtendedAttributes().size()); + Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES, + key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend()))); + } + + @Test + public void testPreWALAppendIsWrittenToWAL() throws Exception { + final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + + ".testPreWALAppendIsWrittenToWAL"); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(A)); + htd.addFamily(new HColumnDescriptor(B)); + htd.addFamily(new HColumnDescriptor(C)); + htd.addCoprocessor(SimpleRegionObserver.class.getName()); + Table table = util.createTable(htd, null); + PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener(); + List regions = util.getHBaseCluster().getRegions(tableName); + //should be only one region + HRegion region = regions.get(0); + region.getWAL().registerWALActionsListener(listener); + testPreWALAppendHook(table, tableName); + boolean[] expectedResults = {true, true, true, true}; + Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray()); + + } + + @Test + public void testPreWALAppendNotCalledOnMetaEdit() throws Exception { + final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + + ".testPreWALAppendNotCalledOnMetaEdt"); + + HTableDescriptor td = new HTableDescriptor(tableName); + td.addCoprocessor(SimpleRegionObserver.class.getName()); + td.addFamily(new HColumnDescriptor(FAMILY)); + Table table = util.createTable(td, new byte[][] { A, B, C }); + + PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener(); + List regions = util.getHBaseCluster().getRegions(tableName); + //should be only one region + HRegion region = regions.get(0); + + region.getWAL().registerWALActionsListener(listener); + //flushing should write to the WAL + region.flush(true); + //so should compaction + region.compact(false); + //and so should closing the region + region.close(); + + //but we still shouldn't have triggered preWALAppend because no user data was written + String[] methods = new String[] {"getCtPreWALAppend"}; + Object[] expectedResult = new Integer[]{0}; + verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult); + } + // check each region whether the coprocessor upcalls are called or not. private void verifyMethodResult(Class c, String methodName[], TableName tableName, Object value[]) throws IOException { @@ -800,4 +901,69 @@ private static void createHFile( writer.close(); } } + + private static class PreWALAppendWALActionsListener implements WALActionsListener { + boolean[] walKeysCorrect = {false, false, false, false}; + + @Override + public void postAppend(long entryLen, long elapsedTimeMillis, + WALKey logKey, WALEdit logEdit) throws IOException { + for (int k = 0; k < 4; k++) { + if (!walKeysCorrect[k]) { + walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES, + logKey.getExtendedAttribute(Integer.toString(k + 1))); + } + } + } + + @Override + public void postSync(long timeInNanos, int handlerSyncs) { + + } + + boolean[] getWalKeysCorrectArray() { + return walKeysCorrect; + } + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + + } + + @Override + public void preLogArchive(Path oldPath, Path newPath) throws IOException { + + } + + @Override + public void postLogArchive(Path oldPath, Path newPath) throws IOException { + + } + + @Override + public void logRollRequested(RollRequestReason reason) { + + } + + @Override + public void logCloseRequested() { + + } + + @Override + public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, + WALEdit logEdit) { + + } + + @Override + public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, + WALEdit logEdit) throws IOException { + + } + } }