Skip to content

Commit

Permalink
HBASE-25575 Should validate Puts in RowMutations
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
brfrn169 committed Feb 22, 2021
1 parent 5356edf commit cfbae4d
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locato
}

// validate for well-formedness
static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
static void validatePut(Put put, int maxKeyValueSize) {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert");
}
Expand All @@ -585,6 +585,14 @@ static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentExce
}
}

static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) {
for (Mutation mutation : rowMutations.getMutations()) {
if (mutation instanceof Put) {
validatePut((Put) mutation, maxKeyValueSize);
}
}
}

/**
* Select the priority for the rpc call.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.google.protobuf.RpcChannel;
Expand Down Expand Up @@ -381,6 +382,7 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
Expand Down Expand Up @@ -441,6 +443,7 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {

@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
Expand All @@ -458,9 +461,6 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)

@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
}
if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
|| checkAndMutate.getAction() instanceof Increment
|| checkAndMutate.getAction() instanceof Append) {
Expand All @@ -480,6 +480,7 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
rowMutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
Expand Down Expand Up @@ -552,6 +553,7 @@ public void run(MultiResponse resp) {

@Override
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
writeRpcTimeoutNs).action((controller, loc, stub) ->
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
Expand Down Expand Up @@ -653,7 +655,12 @@ private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long r
CheckAndMutate checkAndMutate = (CheckAndMutate) action;
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
} else if (checkAndMutate.getAction() instanceof RowMutations) {
validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(),
conn.connConf.getMaxKeyValueSize());
}
} else if (action instanceof RowMutations) {
validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize());
}
}
return conn.callerFactory.batch().table(tableName).actions(actions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1673,4 +1673,47 @@ public void testInvalidPut() {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}

@Test
public void testInvalidPutInRowMutations() throws IOException {
final byte[] row = Bytes.toBytes(0);
try {
getTable.get().mutateRow(new RowMutations(row).add((Mutation) new Put(row)));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}

try {
getTable.get()
.mutateRow(new RowMutations(row).add((Mutation) new Put(row)
.addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}

@Test
public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException {
final byte[] row = Bytes.toBytes(0);
try {
getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(new RowMutations(row).add((Mutation) new Put(row))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}

try {
getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(new RowMutations(row).add((Mutation) new Put(row)
.addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]))));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,57 @@ public void testInvalidPut() {
}
}

@Test
public void testInvalidPutInRowMutations() throws IOException {
final byte[] row = Bytes.toBytes(0);

AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
try {
table.batch(Arrays.asList(new Delete(row), new RowMutations(row)
.add((Mutation) new Put(row))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}

try {
table.batch(
Arrays.asList(new RowMutations(row).add((Mutation) new Put(row)
.addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE])),
new Delete(row)));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}

@Test
public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException {
final byte[] row = Bytes.toBytes(0);

AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
try {
table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, CQ)
.build(new RowMutations(row).add((Mutation) new Put(row)))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}

try {
table.batch(
Arrays.asList(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, CQ)
.build(new RowMutations(row).add((Mutation) new Put(row)
.addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]))),
new Delete(row)));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}

@Test
public void testWithCheckAndMutate() throws Exception {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
Expand Down

0 comments on commit cfbae4d

Please sign in to comment.