Skip to content

Commit

Permalink
fix: RequestScheduling should handle rejected resource reservations (#24
Browse files Browse the repository at this point in the history
)

Custom FlowControlerStrategy implementations might, contrary to the
default implementation, resolve reservation requests with exception,
what we should handle by not performing the action that had to acquire
the resources.
  • Loading branch information
mwalkiewicz authored Sep 27, 2021
1 parent 01df100 commit 5a29253
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,13 @@ public void onFailure(Throwable throwable) {
secondaryWriteErrorConsumer.consume(writeOperationInfo.operations);
}
},
flowController));
flowController,
new Runnable() {
@Override
public void run() {
secondaryWriteErrorConsumer.consume(writeOperationInfo.operations);
}
}));
}

private void scheduleSecondaryWriteBatchOperations(
Expand Down Expand Up @@ -631,7 +637,13 @@ private void scheduleSecondaryWriteBatchOperations(
this.secondaryAsyncWrapper.batch(
primarySplitResponse.allSuccessfulOperations, resultsSecondary),
verificationFuture,
this.flowController);
this.flowController,
new Runnable() {
@Override
public void run() {
secondaryWriteErrorConsumer.consume(primarySplitResponse.successfulWrites);
}
});
}

public static class WriteOperationInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ public static <T> ListenableFuture<Void> scheduleVerificationAndRequestWithFlowC
final Supplier<ListenableFuture<T>> secondaryResultFutureSupplier,
final FutureCallback<T> verificationCallback,
final FlowController flowController) {
return scheduleVerificationAndRequestWithFlowControl(
requestResourcesDescription,
secondaryResultFutureSupplier,
verificationCallback,
flowController,
new Runnable() {
@Override
public void run() {}
});
}

public static <T> ListenableFuture<Void> scheduleVerificationAndRequestWithFlowControl(
final RequestResourcesDescription requestResourcesDescription,
final Supplier<ListenableFuture<T>> secondaryResultFutureSupplier,
final FutureCallback<T> verificationCallback,
final FlowController flowController,
final Runnable flowControlReservationErrorConsumer) {
final SettableFuture<Void> verificationCompletedFuture = SettableFuture.create();

final ListenableFuture<ResourceReservation> reservationRequest =
Expand Down Expand Up @@ -72,14 +89,15 @@ public void onFailure(Throwable throwable) {
}
},
MoreExecutors.directExecutor());
} catch (InterruptedException e) {
} catch (InterruptedException | ExecutionException e) {
flowControlReservationErrorConsumer.run();
FlowController.cancelRequest(reservationRequest);

verificationCompletedFuture.set(null);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// We couldn't obtain reservation, this shouldn't happen.
assert false;
verificationCompletedFuture.set(null);

if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
return verificationCompletedFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@

/**
* FlowController limits the number of concurrently performed requests to the secondary database.
* Call to {@link #asyncRequestResource(RequestResourcesDescription)} returns object with a future
* that will be notified when {@link FlowControlStrategy} decides that it can be allowed to perform
* the requests.
* Call to {@link #asyncRequestResource(RequestResourcesDescription)} returns a future that will be
* completed when {@link FlowControlStrategy} decides that it can be allowed to perform the
* requests. The future might also be completed exceptionally if the resource was not allowed to
* obtain the resources.
*
* <p>Order of allowing requests in determined by {@link FlowControlStrategy}.
*
Expand All @@ -45,15 +46,17 @@ public ListenableFuture<ResourceReservation> asyncRequestResource(
}

public static void cancelRequest(Future<ResourceReservation> resourceReservationFuture) {
// The cancellation may fail - then the resources have already been allocated by FlowController.
// Then we must release them - the user wouldn't be able to do it on their own.
// The cancellation may fail if the resources were already allocated by the FlowController, then
// we should free them, or when the reservation was rejected, which we should ignore.
if (!resourceReservationFuture.cancel(true)) {
try {
resourceReservationFuture.get().release();
} catch (InterruptedException | ExecutionException ex) {
} catch (InterruptedException ex) {
// If we couldn't cancel the request, it must have already been set, we assume
// that we will get the reservation without problems
assert false;
} catch (ExecutionException ex) {
// The request was rejected.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController.ResourceReservation;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -1121,4 +1122,86 @@ public void testBatchCallbackWithoutResultParameter() throws IOException, Interr
verify(primaryTable, times(1)).batchCallback(eq(mutations), any(Object[].class), eq(callback));
verify(secondaryTable, never()).batch(ArgumentMatchers.<Row>anyList(), any(Object[].class));
}

@Test
public void testFlowControllerExceptionInGetPreventsSecondaryOperation() throws IOException {
SettableFuture<ResourceReservation> resourceReservationFuture = SettableFuture.create();
resourceReservationFuture.setException(new Exception("test"));

doReturn(resourceReservationFuture)
.when(flowController)
.asyncRequestResource(any(RequestResourcesDescription.class));

Get request = createGet("test");
Result expectedResult = createResult("test", "value");

when(primaryTable.get(request)).thenReturn(expectedResult);

Result result = mirroringTable.get(request);
executorServiceRule.waitForExecutor();

assertThat(result).isEqualTo(expectedResult);

verify(primaryTable, times(1)).get(request);
verify(secondaryTable, never()).get(any(Get.class));
}

@Test
public void testFlowControllerExceptionInPutExecutesWriteErrorHandler() throws IOException {
SettableFuture<ResourceReservation> resourceReservationFuture = SettableFuture.create();
resourceReservationFuture.setException(new Exception("test"));

doReturn(resourceReservationFuture)
.when(flowController)
.asyncRequestResource(any(RequestResourcesDescription.class));

Put request = createPut("test", "f1", "q1", "v1");

mirroringTable.put(request);
executorServiceRule.waitForExecutor();

verify(primaryTable, times(1)).put(request);
verify(secondaryTable, never()).get(any(Get.class));
verify(secondaryWriteErrorConsumer, times(1)).consume(ImmutableList.of(request));
}

@Test
public void testFlowControllerExceptionInBatchExecutesWriteErrorHandler()
throws IOException, InterruptedException {
SettableFuture<ResourceReservation> resourceReservationFuture = SettableFuture.create();
resourceReservationFuture.setException(new Exception("test"));

doReturn(resourceReservationFuture)
.when(flowController)
.asyncRequestResource(any(RequestResourcesDescription.class));

Put put1 = createPut("test0", "f1", "q1", "v1");
Put put2 = createPut("test1", "f1", "q2", "v1");
List<? extends Row> request = ImmutableList.of(put1, put2, createGet("test2"));

doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
Object[] args = invocationOnMock.getArguments();
Object[] result = (Object[]) args[1];

// secondary
result[0] = Result.create(new Cell[0]);
result[1] = Result.create(new Cell[0]);
result[2] = Result.create(new Cell[0]);
return null;
}
})
.when(primaryTable)
.batch(eq(request), any(Object[].class));

Object[] results = new Object[3];
mirroringTable.batch(request, results);
executorServiceRule.waitForExecutor();

verify(primaryTable, times(1)).batch(request, results);
verify(secondaryTable, never()).batch(ArgumentMatchers.<Row>anyList(), any(Object[].class));
verify(secondaryWriteErrorConsumer, times(1)).consume(ImmutableList.of(put1, put2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,14 @@ public void testCancellingPendingReservationFuture() {
FlowController.cancelRequest(grantedFuture);
verify(reservation, never()).release();
}

@Test
public void testCancellingRejectedReservationFuture() {
ResourceReservation reservation = mock(ResourceReservation.class);
SettableFuture<ResourceReservation> notGrantedFuture = SettableFuture.create();
notGrantedFuture.setException(new Exception("test"));

FlowController.cancelRequest(notGrantedFuture);
verify(reservation, never()).release();
}
}

0 comments on commit 5a29253

Please sign in to comment.