From 5356edfe63d1deff636db6157f110a7f0b52fa0e Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Mon, 22 Feb 2021 09:37:30 +0900 Subject: [PATCH] HBASE-25574 Revisit put/delete/increment/append related RegionObserver methods Signed-off-by: Duo Zhang --- .../hbase/coprocessor/RegionObserver.java | 187 ++++++++++++++++-- .../hadoop/hbase/regionserver/HRegion.java | 17 +- .../regionserver/RegionCoprocessorHost.java | 57 +++--- 3 files changed, 208 insertions(+), 53 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 0782dae067d5..2bd766557fee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -374,10 +374,30 @@ default boolean postExists(ObserverContext c, Get * @param put The Put object * @param edit The WALEdit object that will be written to the wal * @param durability Persistence guarantee for this Put + * @deprecated since 2.5.0 and will be removed in 4.0.0. Use + * {@link #prePut(ObserverContext, Put, WALEdit)} instead. */ + @Deprecated default void prePut(ObserverContext c, Put put, WALEdit edit, Durability durability) throws IOException {} + /** + * Called before the client stores a value. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param put The Put object + * @param edit The WALEdit object that will be written to the wal + */ + default void prePut(ObserverContext c, Put put, WALEdit edit) + throws IOException { + prePut(c, put, edit, put.getDurability()); + } + /** * Called after the client stores a value. *

@@ -387,10 +407,27 @@ default void prePut(ObserverContext c, Put put, WA * @param put The Put object * @param edit The WALEdit object for the wal * @param durability Persistence guarantee for this Put + * @deprecated since 2.5.0 and will be removed in 4.0.0. Use + * {@link #postPut(ObserverContext, Put, WALEdit)} instead. */ + @Deprecated default void postPut(ObserverContext c, Put put, WALEdit edit, Durability durability) throws IOException {} + /** + * Called after the client stores a value. + *

+ * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param put The Put object + * @param edit The WALEdit object for the wal + */ + default void postPut(ObserverContext c, Put put, WALEdit edit) + throws IOException { + postPut(c, put, edit, put.getDurability()); + } + /** * Called before the client deletes a value. *

@@ -403,10 +440,30 @@ default void postPut(ObserverContext c, Put put, W * @param delete The Delete object * @param edit The WALEdit object for the wal * @param durability Persistence guarantee for this Delete + * @deprecated since 2.5.0 and will be removed in 4.0.0. Use + * {@link #preDelete(ObserverContext, Delete, WALEdit)} instead. */ + @Deprecated default void preDelete(ObserverContext c, Delete delete, WALEdit edit, Durability durability) throws IOException {} + /** + * Called before the client deletes a value. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param delete The Delete object + * @param edit The WALEdit object for the wal + */ + default void preDelete(ObserverContext c, Delete delete, + WALEdit edit) throws IOException { + preDelete(c, delete, edit, delete.getDurability()); + } + /** * Called before the server updates the timestamp for version delete with latest timestamp. *

@@ -434,10 +491,27 @@ default void prePrepareTimeStampForDeleteVersion(ObserverContext c, Delete delete, WALEdit edit, Durability durability) throws IOException {} + /** + * Called after the client deletes a value. + *

+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param delete The Delete object + * @param edit The WALEdit object for the wal + */ + default void postDelete(ObserverContext c, Delete delete, + WALEdit edit) throws IOException { + postDelete(c, delete, edit, delete.getDurability()); + } + /** * This will be called for every batch mutation operation happening at the server. This will be * called after acquiring the locks on the mutating rows and after applying the proper timestamp @@ -457,10 +531,10 @@ default void preBatchMutate(ObserverContext c, /** * This will be called after applying a batch of Mutations on a region. The Mutations are added * to memstore and WAL. The difference of this one with - * {@link #postPut(ObserverContext, Put, WALEdit, Durability)} - * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} - * and {@link #postIncrement(ObserverContext, Increment, Result)} - * and {@link #postAppend(ObserverContext, Append, Result)} is + * {@link #postPut(ObserverContext, Put, WALEdit)} + * and {@link #postDelete(ObserverContext, Delete, WALEdit)} + * and {@link #postIncrement(ObserverContext, Increment, Result, WALEdit)} + * and {@link #postAppend(ObserverContext, Append, Result, WALEdit)} is * this hook will be executed before the mvcc transaction completion. *

* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation. @@ -968,12 +1042,33 @@ default CheckAndMutateResult postCheckAndMutate(ObserverContext c, Append append) - throws IOException { + throws IOException { return null; } + /** + * Called before Append. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in 'append' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param append Append object + * @param edit The WALEdit object that will be written to the wal + * @return result to return to the client if bypassing default processing + */ + default Result preAppend(ObserverContext c, Append append, + WALEdit edit) throws IOException { + return preAppend(c, append); + } + /** * Called before Append but after acquiring rowlock. *

@@ -989,9 +1084,12 @@ default Result preAppend(ObserverContext c, Append * @param c the environment provided by the region server * @param append Append object * @return result to return to the client if bypassing default processing + * @deprecated since 2.5.0 and will be removed in 4.0.0. Use + * {@link #preBatchMutate(ObserverContext, MiniBatchOperationInProgress)} instead. */ + @Deprecated default Result preAppendAfterRowLock(ObserverContext c, - Append append) throws IOException { + Append append) throws IOException { return null; } @@ -1004,12 +1102,31 @@ default Result preAppendAfterRowLock(ObserverContext c, Append append, - Result result) throws IOException { + Result result) throws IOException { return result; } + /** + * Called after Append + *

+ * Note: Do not retain references to any Cells in 'append' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param append Append object + * @param result the result returned by increment + * @param edit The WALEdit object for the wal + * @return the result to return to the client + */ + default Result postAppend(ObserverContext c, Append append, + Result result, WALEdit edit) throws IOException { + return postAppend(c, append, result); + } + /** * Called before Increment. *

@@ -1021,12 +1138,33 @@ default Result postAppend(ObserverContext c, Appen * @param c the environment provided by the region server * @param increment increment object * @return result to return to the client if bypassing default processing + * @deprecated since 2.5.0 and will be removed in 4.0.0. Use + * {@link #preIncrement(ObserverContext, Increment, WALEdit)} instead. */ + @Deprecated default Result preIncrement(ObserverContext c, Increment increment) - throws IOException { + throws IOException { return null; } + /** + * Called before Increment. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param increment increment object + * @param edit The WALEdit object that will be written to the wal + * @return result to return to the client if bypassing default processing + */ + default Result preIncrement(ObserverContext c, Increment increment, + WALEdit edit) throws IOException { + return preIncrement(c, increment); + } + /** * Called before Increment but after acquiring rowlock. *

@@ -1040,15 +1178,15 @@ default Result preIncrement(ObserverContext c, Inc * Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation. * If need a Cell reference for later use, copy the cell and use that. * - * @param c - * the environment provided by the region server - * @param increment - * increment object + * @param c the environment provided by the region server + * @param increment increment object * @return result to return to the client if bypassing default processing - * if an error occurred on the coprocessor + * @deprecated since 2.5.0 and will be removed in 4.0.0. Use + * {@link #preBatchMutate(ObserverContext, MiniBatchOperationInProgress)} instead. */ + @Deprecated default Result preIncrementAfterRowLock(ObserverContext c, - Increment increment) throws IOException { + Increment increment) throws IOException { return null; } @@ -1061,12 +1199,31 @@ default Result preIncrementAfterRowLock(ObserverContext c, Increment increment, - Result result) throws IOException { + Result result) throws IOException { return result; } + /** + * Called after increment + *

+ * Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param increment increment object + * @param result the result returned by increment + * @param edit The WALEdit object for the wal + * @return the result to return to the client + */ + default Result postIncrement(ObserverContext c, Increment increment, + Result result, WALEdit edit) throws IOException { + return postIncrement(c, increment, result); + } + /** * Called before the client opens a new scanner. *

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index eec6e5113521..0b7d76e3a5fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4159,18 +4159,19 @@ public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress m if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS) { Mutation m = getMutation(i); if (m instanceof Put) { - region.coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); + region.coprocessorHost.postPut((Put) m, walEdit); } else if (m instanceof Delete) { - region.coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); + region.coprocessorHost.postDelete((Delete) m, walEdit); } else if (m instanceof Increment) { Result result = region.getCoprocessorHost().postIncrement((Increment) m, - results[i]); + results[i], walEdit); if (result != results[i]) { retCodeDetails[i] = new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result); } } else if (m instanceof Append) { - Result result = region.getCoprocessorHost().postAppend((Append) m, results[i]); + Result result = region.getCoprocessorHost().postAppend((Append) m, results[i], + walEdit); if (result != results[i]) { retCodeDetails[i] = new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result); @@ -4232,7 +4233,7 @@ private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] m throws IOException { Mutation m = getMutation(index); if (m instanceof Put) { - if (region.coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { + if (region.coprocessorHost.prePut((Put) m, walEdit)) { // pre hook says skip this Put // mark as success and skip in doMiniBatchMutation metrics[0]++; @@ -4246,7 +4247,7 @@ private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] m // Can this be avoided? region.prepareDelete(curDel); } - if (region.coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { + if (region.coprocessorHost.preDelete(curDel, walEdit)) { // pre hook says skip this Delete // mark as success and skip in doMiniBatchMutation metrics[1]++; @@ -4254,7 +4255,7 @@ private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] m } } else if (m instanceof Increment) { Increment increment = (Increment) m; - Result result = region.coprocessorHost.preIncrement(increment); + Result result = region.coprocessorHost.preIncrement(increment, walEdit); if (result != null) { // pre hook says skip this Increment // mark as success and skip in doMiniBatchMutation @@ -4263,7 +4264,7 @@ private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] m } } else if (m instanceof Append) { Append append = (Append) m; - Result result = region.coprocessorHost.preAppend(append); + Result result = region.coprocessorHost.preAppend(append, walEdit); if (result != null) { // pre hook says skip this Append // mark as success and skip in doMiniBatchMutation diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 581852491e7f..6961bfdaf1a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; @@ -912,12 +911,10 @@ public Boolean call(RegionObserver observer) throws IOException { * Supports Coprocessor 'bypass'. * @param put The Put object * @param edit The WALEdit object. - * @param durability The durability used * @return true if default processing should be bypassed * @exception IOException Exception */ - public boolean prePut(final Put put, final WALEdit edit, final Durability durability) - throws IOException { + public boolean prePut(final Put put, final WALEdit edit) throws IOException { if (coprocEnvironments.isEmpty()) { return false; } @@ -925,7 +922,7 @@ public boolean prePut(final Put put, final WALEdit edit, final Durability durabi return execOperation(new RegionObserverOperationWithoutResult(bypassable) { @Override public void call(RegionObserver observer) throws IOException { - observer.prePut(this, put, edit, durability); + observer.prePut(this, put, edit); } }); } @@ -959,18 +956,16 @@ public void call(RegionObserver observer) throws IOException { /** * @param put The Put object * @param edit The WALEdit object. - * @param durability The durability used * @exception IOException Exception */ - public void postPut(final Put put, final WALEdit edit, final Durability durability) - throws IOException { + public void postPut(final Put put, final WALEdit edit) throws IOException { if (coprocEnvironments.isEmpty()) { return; } execOperation(new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { - observer.postPut(this, put, edit, durability); + observer.postPut(this, put, edit); } }); } @@ -979,12 +974,10 @@ public void call(RegionObserver observer) throws IOException { * Supports Coprocessor 'bypass'. * @param delete The Delete object * @param edit The WALEdit object. - * @param durability The durability used * @return true if default processing should be bypassed * @exception IOException Exception */ - public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) - throws IOException { + public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException { if (this.coprocEnvironments.isEmpty()) { return false; } @@ -992,7 +985,7 @@ public boolean preDelete(final Delete delete, final WALEdit edit, final Durabili return execOperation(new RegionObserverOperationWithoutResult(bypassable) { @Override public void call(RegionObserver observer) throws IOException { - observer.preDelete(this, delete, edit, durability); + observer.preDelete(this, delete, edit); } }); } @@ -1000,18 +993,16 @@ public void call(RegionObserver observer) throws IOException { /** * @param delete The Delete object * @param edit The WALEdit object. - * @param durability The durability used * @exception IOException Exception */ - public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) - throws IOException { + public void postDelete(final Delete delete, final WALEdit edit) throws IOException { execOperation(coprocEnvironments.isEmpty()? null: - new RegionObserverOperationWithoutResult() { - @Override - public void call(RegionObserver observer) throws IOException { - observer.postDelete(this, delete, edit, durability); - } - }); + new RegionObserverOperationWithoutResult() { + @Override + public void call(RegionObserver observer) throws IOException { + observer.postDelete(this, delete, edit); + } + }); } public void preBatchMutate( @@ -1127,10 +1118,11 @@ public CheckAndMutateResult call(RegionObserver observer) throws IOException { /** * Supports Coprocessor 'bypass'. * @param append append object + * @param edit The WALEdit object. * @return result to return to client if default operation should be bypassed, null otherwise * @throws IOException if an error occurred on the coprocessor */ - public Result preAppend(final Append append) throws IOException { + public Result preAppend(final Append append, final WALEdit edit) throws IOException { boolean bypassable = true; Result defaultResult = null; if (this.coprocEnvironments.isEmpty()) { @@ -1141,7 +1133,7 @@ public Result preAppend(final Append append) throws IOException { bypassable) { @Override public Result call(RegionObserver observer) throws IOException { - return observer.preAppend(this, append); + return observer.preAppend(this, append, edit); } }); } @@ -1171,10 +1163,11 @@ public Result call(RegionObserver observer) throws IOException { /** * Supports Coprocessor 'bypass'. * @param increment increment object + * @param edit The WALEdit object. * @return result to return to client if default operation should be bypassed, null otherwise * @throws IOException if an error occurred on the coprocessor */ - public Result preIncrement(final Increment increment) throws IOException { + public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException { boolean bypassable = true; Result defaultResult = null; if (coprocEnvironments.isEmpty()) { @@ -1185,7 +1178,7 @@ public Result preIncrement(final Increment increment) throws IOException { bypassable) { @Override public Result call(RegionObserver observer) throws IOException { - return observer.preIncrement(this, increment); + return observer.preIncrement(this, increment, edit); } }); } @@ -1215,9 +1208,11 @@ public Result call(RegionObserver observer) throws IOException { /** * @param append Append object * @param result the result returned by the append + * @param edit The WALEdit object. * @throws IOException if an error occurred on the coprocessor */ - public Result postAppend(final Append append, final Result result) throws IOException { + public Result postAppend(final Append append, final Result result, final WALEdit edit) + throws IOException { if (this.coprocEnvironments.isEmpty()) { return result; } @@ -1225,7 +1220,7 @@ public Result postAppend(final Append append, final Result result) throws IOExce new ObserverOperationWithResult(regionObserverGetter, result) { @Override public Result call(RegionObserver observer) throws IOException { - return observer.postAppend(this, append, result); + return observer.postAppend(this, append, result, edit); } }); } @@ -1233,9 +1228,11 @@ public Result call(RegionObserver observer) throws IOException { /** * @param increment increment object * @param result the result returned by postIncrement + * @param edit The WALEdit object. * @throws IOException if an error occurred on the coprocessor */ - public Result postIncrement(final Increment increment, Result result) throws IOException { + public Result postIncrement(final Increment increment, Result result, final WALEdit edit) + throws IOException { if (this.coprocEnvironments.isEmpty()) { return result; } @@ -1243,7 +1240,7 @@ public Result postIncrement(final Increment increment, Result result) throws IOE new ObserverOperationWithResult(regionObserverGetter, result) { @Override public Result call(RegionObserver observer) throws IOException { - return observer.postIncrement(this, increment, getResult()); + return observer.postIncrement(this, increment, getResult(), edit); } }); }