From 731486ca579fb2625a90349d1b1f107eac611c97 Mon Sep 17 00:00:00 2001 From: mgarolera Date: Tue, 28 Oct 2014 13:07:33 -0400 Subject: [PATCH 1/4] Use Result instead of Object when possible, even if the table.batch uses Object[] and we have to make an array copy. --- .../cloud/anviltop/hbase/AnvilTopTable.java | 15 ++- .../cloud/anviltop/hbase/BatchExecutor.java | 108 +++--------------- 2 files changed, 26 insertions(+), 97 deletions(-) diff --git a/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java b/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java index f2c9af8805..87934ff532 100644 --- a/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java +++ b/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -163,7 +164,9 @@ public Boolean[] exists(List gets) throws IOException { @Override public void batch(List actions, Object[] results) throws IOException, InterruptedException { - batchExecutor.batch(actions, results); + Result[] tempResults = new Result[results.length]; + batchExecutor.batch(actions, tempResults); + System.arraycopy(tempResults, 0, results, 0, results.length); } @Override @@ -174,7 +177,9 @@ public Object[] batch(List actions) throws IOException, Interrupt @Override public void batchCallback(List actions, Object[] results, Batch.Callback callback) throws IOException, InterruptedException { - batchExecutor.batchCallback(actions, results, callback); + Result[] tempResults = new Result[results.length]; + batchExecutor.batchCallback(actions, tempResults, callback); + System.arraycopy(tempResults, 0, results, 0, results.length); } @Override @@ -207,7 +212,7 @@ public Result get(Get get) throws IOException { @Override public Result[] get(List gets) throws IOException { - return batchExecutor.get(gets); + return batchExecutor.batch(gets); } @Override @@ -264,7 +269,7 @@ public void put(Put put) throws IOException { @Override public void put(List puts) throws IOException { - batchExecutor.put(puts); + batchExecutor.batch(puts); } @Override @@ -299,7 +304,7 @@ public void delete(Delete delete) throws IOException { @Override public void delete(List deletes) throws IOException { - batchExecutor.delete(deletes); + batchExecutor.batch(deletes); } @Override diff --git a/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java b/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java index 4587f841ec..9a04c2190d 100644 --- a/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java +++ b/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java @@ -65,14 +65,14 @@ static abstract class RpcResultFutureCallback private final Row row; private final Batch.Callback callback; private final int index; - private final Object[] resultsArray; + private final Result[] resultsArray; private final SettableFuture resultFuture; public RpcResultFutureCallback( Row row, Batch.Callback callback, int index, - Object[] resultsArray, + Result[] resultsArray, SettableFuture resultFuture) { this.row = row; this.callback = callback; @@ -84,7 +84,7 @@ public RpcResultFutureCallback( /** * Adapt a proto result into a client result */ - abstract Object adaptResponse(T response); + abstract Result adaptResponse(T response); @SuppressWarnings("unchecked") R unchecked(Object o) { @@ -94,7 +94,7 @@ R unchecked(Object o) { @Override public void onSuccess(T t) { try { - Object result = adaptResponse(t); + Result result = adaptResponse(t); resultsArray[index] = result; if (callback != null) { callback.update(NO_REGION, row.getRow(), unchecked(result)); @@ -321,7 +321,7 @@ ListenableFuture issueRowMutationsRequest( * @return A ListenableFuture that will have the result when the RPC completes. */ ListenableFuture issueRowRequest( - final Row row, final Batch.Callback callback, final Object[] results, final int index) { + final Row row, final Batch.Callback callback, final Result[] results, final int index) { final SettableFuture resultFuture = SettableFuture.create(); results[index] = null; if (row instanceof Delete) { @@ -331,7 +331,7 @@ ListenableFuture issueRowRequest( new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Object adaptResponse(AnviltopServices.MutateRowResponse response) { + Result adaptResponse(AnviltopServices.MutateRowResponse response) { return new Result(); } }, @@ -343,7 +343,7 @@ Object adaptResponse(AnviltopServices.MutateRowResponse response) { new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Object adaptResponse(AnviltopServices.GetRowResponse response) { + Result adaptResponse(AnviltopServices.GetRowResponse response) { return getRowResponseAdapter.adaptResponse(response); } }, @@ -355,7 +355,7 @@ Object adaptResponse(AnviltopServices.GetRowResponse response) { new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Object adaptResponse(AnviltopServices.IncrementRowResponse response) { + Result adaptResponse(AnviltopServices.IncrementRowResponse response) { return incrRespAdapter.adaptResponse(response); } }, @@ -367,7 +367,7 @@ Object adaptResponse(AnviltopServices.IncrementRowResponse response) { new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Object adaptResponse(AnviltopServices.MutateRowResponse response) { + Result adaptResponse(AnviltopServices.MutateRowResponse response) { return new Result(); } }, @@ -379,7 +379,7 @@ Object adaptResponse(AnviltopServices.MutateRowResponse response) { new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Object adaptResponse(AnviltopServices.MutateRowResponse response) { + Result adaptResponse(AnviltopServices.MutateRowResponse response) { return new Result(); } }, @@ -396,10 +396,10 @@ Object adaptResponse(AnviltopServices.MutateRowResponse response) { /** * Implementation of {@link org.apache.hadoop.hbase.client.HTable#batch(List, Object[])} */ - public void batch(List actions, @Nullable Object[] results) + public void batch(List actions, @Nullable Result[] results) throws IOException, InterruptedException { if (results == null) { - results = new Object[actions.size()]; + results = new Result[actions.size()]; } Preconditions.checkArgument(results.length == actions.size(), "Result array must have same dimensions as actions list."); @@ -436,7 +436,7 @@ public void batch(List actions, @Nullable Object[] results) /** * Implementation of {@link org.apache.hadoop.hbase.client.HTable#batch(List)} */ - public Object[] batch(List actions) throws IOException { + public Result[] batch(List actions) throws IOException { Result[] results = new Result[actions.size()]; try { batch(actions, results); @@ -450,7 +450,7 @@ public Object[] batch(List actions) throws IOException { * Implementation of * {@link org.apache.hadoop.hbase.client.HTable#batchCallback(List, Batch.Callback)} */ - public Object[] batchCallback( + public Result[] batchCallback( List actions, Batch.Callback callback) throws IOException, InterruptedException { Result[] results = new Result[actions.size()]; @@ -472,7 +472,7 @@ public Object[] batchCallback( * {@link org.apache.hadoop.hbase.client.HTable#batchCallback(List, Object[], Batch.Callback)} */ public void batchCallback(List actions, - Object[] results, Batch.Callback callback) throws IOException, InterruptedException { + Result[] results, Batch.Callback callback) throws IOException, InterruptedException { Preconditions.checkArgument(results.length == actions.size(), "Result array must be the same length as actions."); int index = 0; @@ -489,34 +489,12 @@ public void batchCallback(List actions, } } - /** - * Implementation of {@link org.apache.hadoop.hbase.client.HTable#delete(List)} - */ - public List delete(List deletes) throws IOException { - List responses; - try { - /* The following is from Table#delete(List): "If there are any failures even after retries, - * there will be a null in the results array for those Gets, AND an exception will be thrown." - * This sentence makes my head hurt. The best interpretation I can come up with is "execute - * all gets and then throw an exception". This is what allAsList will do. Wait for all to - * complete and throw an exception for failed futures. - */ - responses = Futures.allAsList(issueDeleteRequests(deletes)).get(); - } catch (ExecutionException | InterruptedException e) { - // TODO: For Execution exception, add inspection of ExecutionException#getCause to get the - // real issue. - throw new IOException("Error in batch delete", e); - } - - return responses; - } - /** * Implementation of {@link org.apache.hadoop.hbase.client.HTable#exists(List)}. */ public Boolean[] exists(List gets) throws IOException { // get(gets) will throw if there are any errors: - Result[] getResults = get(gets); + Result[] getResults = batch(gets); Boolean[] exists = new Boolean[getResults.length]; for (int index = 0; index < getResults.length; index++) { @@ -525,58 +503,4 @@ public Boolean[] exists(List gets) throws IOException { return exists; } - /** - * Implementation of {@link org.apache.hadoop.hbase.client.HTable#get(List)} - */ - public Result[] get(List gets) throws IOException { - List responses; - try { - /* The following is from Table#get(List): "If there are any failures even after retries, - * there will be a null in the results array for those Gets, AND an exception will be thrown." - * This sentence makes my head hurt. The best interpretation I can come up with is "execute - * all gets and then throw an exception". This is what allAsList will do. Wait for all to - * complete and throw an exception for failed futures. - */ - responses = Futures.allAsList(issueGetRequests(gets)).get(); - } catch (ExecutionException | InterruptedException e) { - // TODO: For Execution exception, add inspection of ExecutionException#getCause to get the - // real issue. - throw new IOException("Error in batch get", e); - } - - List resultList = - Lists.transform(responses, new Function(){ - @Override - public Result apply(@Nullable AnviltopServices.GetRowResponse getRowResponse) { - return getRowResponseAdapter.adaptResponse(getRowResponse); - } - }); - - int resultCount = resultList.size(); - Result[] results = new Result[resultCount]; - resultList.toArray(results); - return results; - } - - /** - * Implementation of {@link org.apache.hadoop.hbase.client.HTable#put(List)} - */ - public List put(List puts) throws IOException { - List responses; - try { - /* The following is from Table#put(List): "If there are any failures even after retries, - * there will be a null in the results array for those Gets, AND an exception will be thrown." - * This sentence makes my head hurt. The best interpretation I can come up with is "execute - * all gets and then throw an exception". This is what allAsList will do. Wait for all to - * complete and throw an exception for failed futures. - */ - responses = Futures.allAsList(issuePutRequests(puts)).get(); - } catch (ExecutionException | InterruptedException e) { - // TODO: For Execution exception, add inspection of ExecutionException#getCause to get the - // real issue. - throw new IOException("Error in batch put", e); - } - - return responses; - } } From f6f97288a1842e8ba7cf1a83529f767b8f37e4a2 Mon Sep 17 00:00:00 2001 From: mgarolera Date: Tue, 28 Oct 2014 15:37:42 -0400 Subject: [PATCH 2/4] Remove unused import. --- src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java b/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java index 87934ff532..59039ab3a7 100644 --- a/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java +++ b/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; From 936e2997a6de04a6b7b3a6bc8a36180a6b861291 Mon Sep 17 00:00:00 2001 From: mgarolera Date: Wed, 29 Oct 2014 10:51:01 -0400 Subject: [PATCH 3/4] Revert Object->Result replacement. --- .../cloud/anviltop/hbase/AnvilTopTable.java | 10 ++----- .../cloud/anviltop/hbase/BatchExecutor.java | 30 +++++++++---------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java b/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java index 59039ab3a7..8a5af77346 100644 --- a/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java +++ b/src/main/java/com/google/cloud/anviltop/hbase/AnvilTopTable.java @@ -163,9 +163,7 @@ public Boolean[] exists(List gets) throws IOException { @Override public void batch(List actions, Object[] results) throws IOException, InterruptedException { - Result[] tempResults = new Result[results.length]; - batchExecutor.batch(actions, tempResults); - System.arraycopy(tempResults, 0, results, 0, results.length); + batchExecutor.batch(actions, results); } @Override @@ -176,9 +174,7 @@ public Object[] batch(List actions) throws IOException, Interrupt @Override public void batchCallback(List actions, Object[] results, Batch.Callback callback) throws IOException, InterruptedException { - Result[] tempResults = new Result[results.length]; - batchExecutor.batchCallback(actions, tempResults, callback); - System.arraycopy(tempResults, 0, results, 0, results.length); + batchExecutor.batchCallback(actions, results, callback); } @Override @@ -211,7 +207,7 @@ public Result get(Get get) throws IOException { @Override public Result[] get(List gets) throws IOException { - return batchExecutor.batch(gets); + return (Result[]) batchExecutor.batch(gets); } @Override diff --git a/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java b/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java index 9a04c2190d..d5c92bcf1a 100644 --- a/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java +++ b/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java @@ -65,14 +65,14 @@ static abstract class RpcResultFutureCallback private final Row row; private final Batch.Callback callback; private final int index; - private final Result[] resultsArray; + private final Object[] resultsArray; private final SettableFuture resultFuture; public RpcResultFutureCallback( Row row, Batch.Callback callback, int index, - Result[] resultsArray, + Object[] resultsArray, SettableFuture resultFuture) { this.row = row; this.callback = callback; @@ -84,7 +84,7 @@ public RpcResultFutureCallback( /** * Adapt a proto result into a client result */ - abstract Result adaptResponse(T response); + abstract Object adaptResponse(T response); @SuppressWarnings("unchecked") R unchecked(Object o) { @@ -94,7 +94,7 @@ R unchecked(Object o) { @Override public void onSuccess(T t) { try { - Result result = adaptResponse(t); + Object result = adaptResponse(t); resultsArray[index] = result; if (callback != null) { callback.update(NO_REGION, row.getRow(), unchecked(result)); @@ -321,7 +321,7 @@ ListenableFuture issueRowMutationsRequest( * @return A ListenableFuture that will have the result when the RPC completes. */ ListenableFuture issueRowRequest( - final Row row, final Batch.Callback callback, final Result[] results, final int index) { + final Row row, final Batch.Callback callback, final Object[] results, final int index) { final SettableFuture resultFuture = SettableFuture.create(); results[index] = null; if (row instanceof Delete) { @@ -331,7 +331,7 @@ ListenableFuture issueRowRequest( new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Result adaptResponse(AnviltopServices.MutateRowResponse response) { + Object adaptResponse(AnviltopServices.MutateRowResponse response) { return new Result(); } }, @@ -343,7 +343,7 @@ Result adaptResponse(AnviltopServices.MutateRowResponse response) { new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Result adaptResponse(AnviltopServices.GetRowResponse response) { + Object adaptResponse(AnviltopServices.GetRowResponse response) { return getRowResponseAdapter.adaptResponse(response); } }, @@ -355,7 +355,7 @@ Result adaptResponse(AnviltopServices.GetRowResponse response) { new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Result adaptResponse(AnviltopServices.IncrementRowResponse response) { + Object adaptResponse(AnviltopServices.IncrementRowResponse response) { return incrRespAdapter.adaptResponse(response); } }, @@ -367,7 +367,7 @@ Result adaptResponse(AnviltopServices.IncrementRowResponse response) { new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Result adaptResponse(AnviltopServices.MutateRowResponse response) { + Object adaptResponse(AnviltopServices.MutateRowResponse response) { return new Result(); } }, @@ -379,7 +379,7 @@ Result adaptResponse(AnviltopServices.MutateRowResponse response) { new RpcResultFutureCallback( row, callback, index, results, resultFuture) { @Override - Result adaptResponse(AnviltopServices.MutateRowResponse response) { + Object adaptResponse(AnviltopServices.MutateRowResponse response) { return new Result(); } }, @@ -396,10 +396,10 @@ Result adaptResponse(AnviltopServices.MutateRowResponse response) { /** * Implementation of {@link org.apache.hadoop.hbase.client.HTable#batch(List, Object[])} */ - public void batch(List actions, @Nullable Result[] results) + public void batch(List actions, @Nullable Object[] results) throws IOException, InterruptedException { if (results == null) { - results = new Result[actions.size()]; + results = new Object[actions.size()]; } Preconditions.checkArgument(results.length == actions.size(), "Result array must have same dimensions as actions list."); @@ -436,7 +436,7 @@ public void batch(List actions, @Nullable Result[] results) /** * Implementation of {@link org.apache.hadoop.hbase.client.HTable#batch(List)} */ - public Result[] batch(List actions) throws IOException { + public Object[] batch(List actions) throws IOException { Result[] results = new Result[actions.size()]; try { batch(actions, results); @@ -472,7 +472,7 @@ public Result[] batchCallback( * {@link org.apache.hadoop.hbase.client.HTable#batchCallback(List, Object[], Batch.Callback)} */ public void batchCallback(List actions, - Result[] results, Batch.Callback callback) throws IOException, InterruptedException { + Object[] results, Batch.Callback callback) throws IOException, InterruptedException { Preconditions.checkArgument(results.length == actions.size(), "Result array must be the same length as actions."); int index = 0; @@ -494,7 +494,7 @@ public void batchCallback(List actions, */ public Boolean[] exists(List gets) throws IOException { // get(gets) will throw if there are any errors: - Result[] getResults = batch(gets); + Result[] getResults = (Result[]) batch(gets); Boolean[] exists = new Boolean[getResults.length]; for (int index = 0; index < getResults.length; index++) { From fd12ca4ca3c87bd393c056f6df501bcd59323d92 Mon Sep 17 00:00:00 2001 From: mgarolera Date: Wed, 29 Oct 2014 10:53:09 -0400 Subject: [PATCH 4/4] Revert one more Object->Result --- .../java/com/google/cloud/anviltop/hbase/BatchExecutor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java b/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java index d5c92bcf1a..653fc62e28 100644 --- a/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java +++ b/src/main/java/com/google/cloud/anviltop/hbase/BatchExecutor.java @@ -450,7 +450,7 @@ public Object[] batch(List actions) throws IOException { * Implementation of * {@link org.apache.hadoop.hbase.client.HTable#batchCallback(List, Batch.Callback)} */ - public Result[] batchCallback( + public Object[] batchCallback( List actions, Batch.Callback callback) throws IOException, InterruptedException { Result[] results = new Result[actions.size()]; @@ -472,7 +472,7 @@ public Result[] batchCallback( * {@link org.apache.hadoop.hbase.client.HTable#batchCallback(List, Object[], Batch.Callback)} */ public void batchCallback(List actions, - Object[] results, Batch.Callback callback) throws IOException, InterruptedException { + Object[] results, Batch.Callback callback) throws IOException, InterruptedException { Preconditions.checkArgument(results.length == actions.size(), "Result array must be the same length as actions."); int index = 0;