Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-8458 Support for batch version of checkAndMutate() #1648

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,22 +256,22 @@ private void failAll(Stream<Action> actions, int tries) {
}

private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
List<CellScannable> cells, Map<Integer, Integer> indexMap) throws IOException {
ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
// action list.
RequestConverter.buildNoDataRegionActions(entry.getKey(),
entry.getValue().actions.stream()
.sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
.collect(Collectors.toList()),
cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
rowMutationsIndexMap);
cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder,
nonceGroup, indexMap);
}
return multiRequestBuilder.build();
}
Expand Down Expand Up @@ -367,10 +367,10 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
List<CellScannable> cells = new ArrayList<>();
// Map from a created RegionAction to the original index for a RowMutations within
// the original list of actions. This will be used to process the results when there
// is RowMutations in the action list.
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
// is RowMutations/CheckAndMutate in the action list.
Map<Integer, Integer> indexMap = new HashMap<>();
try {
req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
req = buildReq(serverReq.actionsByRegion, cells, indexMap);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;
Expand All @@ -387,7 +387,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
} else {
try {
onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req,
rowMutationsIndexMap, resp, controller.cellScanner()));
indexMap, resp, controller.cellScanner()));
} catch (Exception e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,20 @@ default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family,
* });
* </code>
* </pre>
*
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);

/**
* A helper class for sending checkAndMutate request.
*
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
interface CheckAndMutateBuilder {

/**
Expand Down Expand Up @@ -309,12 +317,20 @@ default CheckAndMutateBuilder ifEquals(byte[] value) {
* });
* </code>
* </pre>
*
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);

/**
* A helper class for sending checkAndMutate request with a filter.
*
* @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
interface CheckAndMutateWithFilterBuilder {

/**
Expand Down Expand Up @@ -344,6 +360,37 @@ interface CheckAndMutateWithFilterBuilder {
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
}

/**
* checkAndMutate that atomically checks if a row matches the specified condition. If it does,
* it performs the specified action.
*
* @param checkAndMutate The CheckAndMutate object.
* @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate.
*/
CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate);

/**
* Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
* that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
* atomically (and thus, each may fail independently of others).
*
* @param checkAndMutates The list of CheckAndMutate.
* @return A list of {@link CompletableFuture}s that represent the result for each
* CheckAndMutate.
*/
List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates);

/**
* A simple version of batch checkAndMutate. It will fail if there are any failures.
*
* @param checkAndMutates The list of rows to apply.
* @return A {@link CompletableFuture} that wrapper the result boolean list.
*/
default CompletableFuture<List<Boolean>> checkAndMutateAll(
List<CheckAndMutate> checkAndMutates) {
return allOf(checkAndMutate(checkAndMutates));
}

/**
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
};
}

@Override
public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate) {
return wrap(rawTable.checkAndMutate(checkAndMutate));
}

@Override
public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
return rawTable.checkAndMutate(checkAndMutates).stream()
.map(this::wrap).collect(toList());
}

@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));
Expand Down
Loading