From 9cfeec0deb79e1e8541e237b4f02fb590556372c Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Mon, 22 Feb 2021 10:46:16 +0900 Subject: [PATCH] HBASE-25575 Should validate Puts in RowMutations Signed-off-by: Duo Zhang --- .../hadoop/hbase/client/ConnectionUtils.java | 10 +++- .../hbase/client/RawAsyncTableImpl.java | 13 +++-- .../hadoop/hbase/client/TestAsyncTable.java | 43 ++++++++++++++++ .../hbase/client/TestAsyncTableBatch.java | 51 +++++++++++++++++++ 4 files changed, 113 insertions(+), 4 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index d6f21944a1a7..245c5194cb01 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -584,7 +584,7 @@ static CompletableFuture timelineConsistentRead(AsyncRegionLocator locato } // validate for well-formedness - static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException { + static void validatePut(Put put, int maxKeyValueSize) { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); } @@ -599,6 +599,14 @@ static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentExce } } + static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) { + for (Mutation mutation : rowMutations.getMutations()) { + if (mutation instanceof Put) { + validatePut((Put) mutation, maxKeyValueSize); + } + } + } + /** * Select the priority for the rpc call. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 63ade0d207fe..1222d83d0545 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; +import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.RpcChannel; @@ -381,6 +382,7 @@ public CompletableFuture thenDelete(Delete delete) { @Override public CompletableFuture thenMutate(RowMutations mutation) { preCheck(); + validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, @@ -441,6 +443,7 @@ public CompletableFuture thenDelete(Delete delete) { @Override public CompletableFuture thenMutate(RowMutations mutation) { + validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, @@ -458,9 +461,6 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) @Override public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { - if (checkAndMutate.getAction() instanceof Put) { - validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); - } if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete || checkAndMutate.getAction() instanceof Increment || checkAndMutate.getAction() instanceof Append) { @@ -480,6 +480,7 @@ public CompletableFuture checkAndMutate(CheckAndMutate che .call(); } else if (checkAndMutate.getAction() instanceof RowMutations) { RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); + validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize()); return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> @@ -552,6 +553,7 @@ public void run(MultiResponse resp) { @Override public CompletableFuture mutateRow(RowMutations mutations) { + validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); return this. newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs).action((controller, loc, stub) -> this. mutateRow(controller, loc, stub, mutations, @@ -653,7 +655,12 @@ private List> batch(List actions, long r CheckAndMutate checkAndMutate = (CheckAndMutate) action; if (checkAndMutate.getAction() instanceof Put) { validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); + } else if (checkAndMutate.getAction() instanceof RowMutations) { + validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(), + conn.connConf.getMaxKeyValueSize()); } + } else if (action instanceof RowMutations) { + validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize()); } } return conn.callerFactory.batch().table(tableName).actions(actions) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index c863ec12a96d..453f3b871997 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -1672,4 +1672,47 @@ public void testInvalidPut() { assertThat(e.getMessage(), containsString("KeyValue size too large")); } } + + @Test + public void testInvalidPutInRowMutations() throws IOException { + final byte[] row = Bytes.toBytes(0); + try { + getTable.get().mutateRow(new RowMutations(row).add((Mutation) new Put(row))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + getTable.get() + .mutateRow(new RowMutations(row).add((Mutation) new Put(row) + .addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } + + @Test + public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException { + final byte[] row = Bytes.toBytes(0); + try { + getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER) + .build(new RowMutations(row).add((Mutation) new Put(row)))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER) + .build(new RowMutations(row).add((Mutation) new Put(row) + .addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index 2d7e3d4cbd50..a73a6b7d20ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -335,6 +335,57 @@ public void testInvalidPut() { } } + @Test + public void testInvalidPutInRowMutations() throws IOException { + final byte[] row = Bytes.toBytes(0); + + AsyncTable table = tableGetter.apply(TABLE_NAME); + try { + table.batch(Arrays.asList(new Delete(row), new RowMutations(row) + .add((Mutation) new Put(row)))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + table.batch( + Arrays.asList(new RowMutations(row).add((Mutation) new Put(row) + .addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE])), + new Delete(row))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } + + @Test + public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException { + final byte[] row = Bytes.toBytes(0); + + AsyncTable table = tableGetter.apply(TABLE_NAME); + try { + table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, CQ) + .build(new RowMutations(row).add((Mutation) new Put(row))))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + table.batch( + Arrays.asList(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, CQ) + .build(new RowMutations(row).add((Mutation) new Put(row) + .addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]))), + new Delete(row))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } + @Test public void testWithCheckAndMutate() throws Exception { AsyncTable table = tableGetter.apply(TABLE_NAME);