Skip to content

Commit

Permalink
refactor: Move HBaseOperation into WriteOperationInfo (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwalkiewicz authored Oct 12, 2021
1 parent 9b9e4ea commit e89ea05
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,7 @@ public Void call() throws IOException {
},
HBaseOperation.PUT);
scheduleWriteWithControlFlow(
new WriteOperationInfo(put),
this.secondaryAsyncWrapper.put(put),
this.flowController,
HBaseOperation.PUT);
new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put), this.flowController);
}
}

Expand Down Expand Up @@ -499,8 +496,7 @@ public Void call() throws IOException {
scheduleWriteWithControlFlow(
new WriteOperationInfo(delete),
this.secondaryAsyncWrapper.delete(delete),
this.flowController,
HBaseOperation.DELETE);
this.flowController);
}
}

Expand Down Expand Up @@ -569,8 +565,7 @@ public Void call() throws IOException {
scheduleWriteWithControlFlow(
new WriteOperationInfo(rowMutations),
this.secondaryAsyncWrapper.mutateRow(rowMutations),
this.flowController,
HBaseOperation.MUTATE_ROW);
this.flowController);
}
}

Expand All @@ -589,11 +584,10 @@ public Result call() throws IOException {
},
HBaseOperation.APPEND);

Put put = makePutFromResult(result);

scheduleWriteWithControlFlow(
new WriteOperationInfo(append),
this.secondaryAsyncWrapper.put(makePutFromResult(result)),
this.flowController,
HBaseOperation.PUT);
new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put), this.flowController);
return result;
}
}
Expand All @@ -613,11 +607,10 @@ public Result call() throws IOException {
},
HBaseOperation.INCREMENT);

Put put = makePutFromResult(result);

scheduleWriteWithControlFlow(
new WriteOperationInfo(increment),
this.secondaryAsyncWrapper.put(makePutFromResult(result)),
this.flowController,
HBaseOperation.PUT);
new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put), this.flowController);
return result;
}
}
Expand Down Expand Up @@ -719,8 +712,7 @@ public Boolean call() throws IOException {
scheduleWriteWithControlFlow(
new WriteOperationInfo(rowMutations),
this.secondaryAsyncWrapper.mutateRow(rowMutations),
this.flowController,
HBaseOperation.MUTATE_ROW);
this.flowController);
}
return wereMutationsApplied;
}
Expand Down Expand Up @@ -807,13 +799,13 @@ private <T> void scheduleVerificationAndRequestWithFlowControl(
private <T> void scheduleWriteWithControlFlow(
final WriteOperationInfo writeOperationInfo,
final Supplier<ListenableFuture<T>> secondaryResultFutureSupplier,
final FlowController flowController,
final HBaseOperation operation) {
final FlowController flowController) {
WriteOperationFutureCallback<T> writeErrorCallback =
new WriteOperationFutureCallback<T>() {
@Override
public void onFailure(Throwable throwable) {
secondaryWriteErrorConsumer.consume(operation, writeOperationInfo.operations);
secondaryWriteErrorConsumer.consume(
writeOperationInfo.hBaseOperation, writeOperationInfo.operations);
}
};

Expand All @@ -827,7 +819,8 @@ public void onFailure(Throwable throwable) {
new Runnable() {
@Override
public void run() {
secondaryWriteErrorConsumer.consume(operation, writeOperationInfo.operations);
secondaryWriteErrorConsumer.consume(
writeOperationInfo.hBaseOperation, writeOperationInfo.operations);
}
}));
}
Expand Down Expand Up @@ -917,37 +910,35 @@ private List<? extends Row> rewriteIncrementsAndAppendsAsPuts(
public static class WriteOperationInfo {
public final RequestResourcesDescription requestResourcesDescription;
public final List<? extends Row> operations;
public final HBaseOperation hBaseOperation;

public WriteOperationInfo(Put operation) {
this(new RequestResourcesDescription(operation), operation);
this(new RequestResourcesDescription(operation), operation, HBaseOperation.PUT);
}

public WriteOperationInfo(Delete operation) {
this(new RequestResourcesDescription(operation), operation);
this(new RequestResourcesDescription(operation), operation, HBaseOperation.DELETE);
}

public WriteOperationInfo(Append operation) {
this(new RequestResourcesDescription(operation), operation);
this(new RequestResourcesDescription(operation), operation, HBaseOperation.APPEND);
}

public WriteOperationInfo(Increment operation) {
this(new RequestResourcesDescription(operation), operation);
this(new RequestResourcesDescription(operation), operation, HBaseOperation.INCREMENT);
}

public WriteOperationInfo(RowMutations operation) {
this(new RequestResourcesDescription(operation), operation);
this(new RequestResourcesDescription(operation), operation, HBaseOperation.MUTATE_ROW);
}

private WriteOperationInfo(
RequestResourcesDescription requestResourcesDescription, Row operation) {
RequestResourcesDescription requestResourcesDescription,
Row operation,
HBaseOperation hBaseOperation) {
this.requestResourcesDescription = requestResourcesDescription;
this.operations = Collections.singletonList(operation);
}

public WriteOperationInfo(List<? extends Row> successfulOperations, Result[] readResults) {
this.operations = successfulOperations;
this.requestResourcesDescription =
new RequestResourcesDescription(this.operations, readResults);
this.hBaseOperation = hBaseOperation;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ public CompletableFuture<Void> put(Put put) {
return writeWithFlowControl(
new MirroringTable.WriteOperationInfo(put),
primaryFuture,
() -> this.secondaryTable.put(put),
() -> this.secondaryWriteErrorConsumer.consume(HBaseOperation.PUT, put));
() -> this.secondaryTable.put(put));
}

@Override
Expand All @@ -119,8 +118,7 @@ public CompletableFuture<Void> delete(Delete delete) {
return writeWithFlowControl(
new MirroringTable.WriteOperationInfo(delete),
primaryFuture,
() -> this.secondaryTable.delete(delete),
() -> this.secondaryWriteErrorConsumer.consume(HBaseOperation.DELETE, delete));
() -> this.secondaryTable.delete(delete));
}

@Override
Expand All @@ -129,8 +127,7 @@ public CompletableFuture<Result> append(Append append) {
return writeWithFlowControl(
new MirroringTable.WriteOperationInfo(append),
primaryFuture,
() -> this.secondaryTable.append(append),
() -> this.secondaryWriteErrorConsumer.consume(HBaseOperation.APPEND, append));
() -> this.secondaryTable.append(append));
}

@Override
Expand All @@ -139,8 +136,7 @@ public CompletableFuture<Result> increment(Increment increment) {
return writeWithFlowControl(
new MirroringTable.WriteOperationInfo(increment),
primaryFuture,
() -> this.secondaryTable.increment(increment),
() -> this.secondaryWriteErrorConsumer.consume(HBaseOperation.INCREMENT, increment));
() -> this.secondaryTable.increment(increment));
}

@Override
Expand All @@ -149,8 +145,7 @@ public CompletableFuture<Void> mutateRow(RowMutations rowMutations) {
return writeWithFlowControl(
new MirroringTable.WriteOperationInfo(rowMutations),
primaryFuture,
() -> this.secondaryTable.mutateRow(rowMutations),
() -> this.secondaryWriteErrorConsumer.consume(HBaseOperation.MUTATE_ROW, rowMutations));
() -> this.secondaryTable.mutateRow(rowMutations));
}

@Override
Expand Down Expand Up @@ -311,8 +306,12 @@ private <T> CompletableFuture<T> readWithVerificationAndFlowControl(
private <T> CompletableFuture<T> writeWithFlowControl(
final MirroringTable.WriteOperationInfo writeOperationInfo,
final CompletableFuture<T> primaryFuture,
final Supplier<CompletableFuture<T>> secondaryFutureSupplier,
final Runnable secondaryWriteErrorHandler) {
final Supplier<CompletableFuture<T>> secondaryFutureSupplier) {
final Runnable secondaryWriteErrorHandler =
() ->
this.secondaryWriteErrorConsumer.consume(
writeOperationInfo.hBaseOperation, writeOperationInfo.operations);

return reserveFlowControlResourcesThenScheduleSecondary(
primaryFuture,
FutureConverter.toCompletable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -299,7 +300,8 @@ public void testPutWithSecondaryErrorCallsErrorHandler()

verify(primaryTable, times(1)).put(put);
verify(secondaryTable, times(1)).put(put);
verify(secondaryWriteErrorConsumer, times(1)).consume(HBaseOperation.PUT, put);
verify(secondaryWriteErrorConsumer, times(1))
.consume(HBaseOperation.PUT, Collections.singletonList(put));
}

<T> List<T> waitForAll(List<CompletableFuture<T>> futures) {
Expand Down

0 comments on commit e89ea05

Please sign in to comment.