Skip to content

Commit

Permalink
MirroringTable: count references to scanners and secondary wrapper (#21)
Browse files Browse the repository at this point in the history
Current MirroringTable implementaion does not count its references held
by MirroringResultScanners and SecondaryAsyncWrapper, thus
MirroringConnection consideres it closed before we are sure that all
asynchronous operations have completed.

This PR adds correct reference counting of MirroringTable based on work
done in previously merged PRs.
  • Loading branch information
mwalkiewicz authored and dopiera committed Sep 6, 2021
1 parent a77e8dc commit 5538411
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.mirroring.hbase1_x.asyncwrappers.AsyncTableWrapper;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.ListenableCloseable;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.ListenableReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.RequestScheduling;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
Expand Down Expand Up @@ -73,9 +74,9 @@ public class MirroringTable implements Table, ListenableCloseable {
Table secondaryTable;
AsyncTableWrapper secondaryAsyncWrapper;
VerificationContinuationFactory verificationContinuationFactory;
private List<Runnable> onCloseListeners = new ArrayList<>();
private ListenableFuture<Void> closeFuture;
private FlowController flowController;
private ListenableReferenceCounter referenceCounter;
private boolean closed = false;

/**
* @param executorService ExecutorService is used to perform operations on secondaryTable and
Expand All @@ -96,6 +97,8 @@ public MirroringTable(
new AsyncTableWrapper(
this.secondaryTable, MoreExecutors.listeningDecorator(executorService));
this.flowController = flowController;
this.referenceCounter = new ListenableReferenceCounter();
this.referenceCounter.holdReferenceUntilClosing(this.secondaryAsyncWrapper);
}

@Override
Expand Down Expand Up @@ -193,12 +196,15 @@ public Result[] get(List<Get> list) throws IOException {

@Override
public ResultScanner getScanner(Scan scan) throws IOException {
return new MirroringResultScanner(
scan,
this.primaryTable.getScanner(scan),
this.secondaryAsyncWrapper,
this.verificationContinuationFactory,
this.flowController);
MirroringResultScanner scanner =
new MirroringResultScanner(
scan,
this.primaryTable.getScanner(scan),
this.secondaryAsyncWrapper,
this.verificationContinuationFactory,
this.flowController);
this.referenceCounter.holdReferenceUntilClosing(scanner);
return scanner;
}

@Override
Expand All @@ -221,10 +227,13 @@ public void close() throws IOException {
}

public synchronized ListenableFuture<Void> asyncClose() throws IOException {
if (this.closeFuture != null) {
return this.closeFuture;
if (closed) {
return this.referenceCounter.getOnLastReferenceClosed();
}

this.closed = true;
this.referenceCounter.decrementReferenceCount();

IOException primaryException = null;
try {
this.primaryTable.close();
Expand All @@ -233,20 +242,8 @@ public synchronized ListenableFuture<Void> asyncClose() throws IOException {
}

try {
this.closeFuture = this.secondaryAsyncWrapper.asyncClose();
this.closeFuture.addListener(
new Runnable() {
@Override
public void run() {
MirroringTable.this.runOnCloseListeners();
}
},
MoreExecutors.directExecutor());
this.secondaryAsyncWrapper.asyncClose();
} catch (RuntimeException e) {
// If scheduling close failed, run listeners now, and behave as if we have closed for
// correct reference counting.
this.runOnCloseListeners();

if (primaryException != null) {
primaryException.addSuppressed(e);
throw primaryException;
Expand All @@ -258,7 +255,7 @@ public void run() {
if (primaryException != null) {
throw primaryException;
}
return closeFuture;
return this.referenceCounter.getOnLastReferenceClosed();
}

@Override
Expand Down Expand Up @@ -498,40 +495,38 @@ public void setWriteRpcTimeout(int i) {

@Override
public void addOnCloseListener(Runnable listener) {
this.onCloseListeners.add(listener);
}

public void runOnCloseListeners() {
for (Runnable listener : this.onCloseListeners) {
listener.run();
}
this.referenceCounter
.getOnLastReferenceClosed()
.addListener(listener, MoreExecutors.directExecutor());
}

private <T> void scheduleVerificationAndRequestWithFlowControl(
final RequestResourcesDescription resultInfo,
final ListenableFuture<T> secondaryGetFuture,
final FutureCallback<T> verificationCallback) {
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
resultInfo, secondaryGetFuture, verificationCallback, this.flowController);
this.referenceCounter.holdReferenceUntilCompletion(
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
resultInfo, secondaryGetFuture, verificationCallback, this.flowController));
}

public <T> void scheduleWriteWithControlFlow(
final WriteOperationInfo writeOperationInfo,
final ListenableFuture<T> secondaryResultFuture,
final FlowController flowController) {
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
writeOperationInfo.requestResourcesDescription,
secondaryResultFuture,
new FutureCallback<T>() {
@Override
public void onSuccess(@NullableDecl T t) {}

@Override
public void onFailure(Throwable throwable) {
handleFailedOperations(writeOperationInfo.operations);
}
},
flowController);
this.referenceCounter.holdReferenceUntilCompletion(
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
writeOperationInfo.requestResourcesDescription,
secondaryResultFuture,
new FutureCallback<T>() {
@Override
public void onSuccess(@NullableDecl T t) {}

@Override
public void onFailure(Throwable throwable) {
handleFailedOperations(writeOperationInfo.operations);
}
},
flowController));
}

public static class WriteOperationInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ private void waitForExecutor() {
}
}

private void waitForMirroringScanner(ResultScanner mirroringScanner)
throws InterruptedException, ExecutionException, TimeoutException {
((MirroringResultScanner) mirroringScanner).asyncClose().get(3, TimeUnit.SECONDS);
}

@Test
public void testMismatchDetectorIsCalledOnGetSingle() throws IOException {
mockFlowController();
Expand Down Expand Up @@ -297,7 +302,8 @@ public void testSecondaryReadExceptionCallsVerificationErrorHandlerOnExistsAll()
}

@Test
public void testMismatchDetectorIsCalledOnScannerNextOne() throws IOException {
public void testMismatchDetectorIsCalledOnScannerNextOne()
throws IOException, InterruptedException, ExecutionException, TimeoutException {
mockFlowController();
Result expected1 = createResult("test1", "value1");
Result expected2 = createResult("test2", "value2");
Expand All @@ -321,6 +327,7 @@ public void testMismatchDetectorIsCalledOnScannerNextOne() throws IOException {
assertThat(result2).isEqualTo(expected2);
assertThat(result3).isNull();

waitForMirroringScanner(mirroringScanner);
waitForExecutor();

verify(mismatchDetector, times(1)).scannerNext(scan, 0, expected1, expected1);
Expand All @@ -332,7 +339,7 @@ public void testMismatchDetectorIsCalledOnScannerNextOne() throws IOException {

@Test
public void testSecondaryReadExceptionCallsVerificationErrorHandlerOnScannerNextOne()
throws IOException {
throws IOException, InterruptedException, ExecutionException, TimeoutException {
mockFlowController();
Result expected1 = createResult("test1", "value1");
Result expected2 = createResult("test2", "value2");
Expand Down Expand Up @@ -360,6 +367,7 @@ public void testSecondaryReadExceptionCallsVerificationErrorHandlerOnScannerNext
assertThat(result2).isEqualTo(expected2);
assertThat(result3).isNull();

waitForMirroringScanner(mirroringScanner);
waitForExecutor();

verify(mismatchDetector, times(1)).scannerNext(scan, 0, expected1, expected1);
Expand All @@ -370,7 +378,8 @@ public void testSecondaryReadExceptionCallsVerificationErrorHandlerOnScannerNext
}

@Test
public void testMismatchDetectorIsCalledOnScannerNextMultiple() throws IOException {
public void testMismatchDetectorIsCalledOnScannerNextMultiple()
throws IOException, InterruptedException, ExecutionException, TimeoutException {
mockFlowController();
Result[] expected =
new Result[] {createResult("test1", "value1"), createResult("test2", "value2")};
Expand All @@ -390,14 +399,15 @@ public void testMismatchDetectorIsCalledOnScannerNextMultiple() throws IOExcepti

assertThat(result).isEqualTo(expected);

waitForMirroringScanner(mirroringScanner);
waitForExecutor();

verify(mismatchDetector, times(1)).scannerNext(scan, 0, expected, expected);
}

@Test
public void testSecondaryReadExceptionCallsVerificationErrorHandlerOnScannerNextMultiple()
throws IOException {
throws IOException, InterruptedException, ExecutionException, TimeoutException {
mockFlowController();
Result[] expected =
new Result[] {createResult("test1", "value1"), createResult("test2", "value2")};
Expand All @@ -418,13 +428,15 @@ public void testSecondaryReadExceptionCallsVerificationErrorHandlerOnScannerNext

assertThat(result).isEqualTo(expected);

waitForMirroringScanner(mirroringScanner);
waitForExecutor();

verify(mismatchDetector, times(1)).scannerNext(scan, 0, 2, expectedException);
}

@Test
public void testScannerClose() throws IOException {
public void testScannerClose()
throws IOException, InterruptedException, ExecutionException, TimeoutException {
ResultScanner primaryScannerMock = mock(ResultScanner.class);
when(primaryTable.getScanner((Scan) any())).thenReturn(primaryScannerMock);

Expand All @@ -435,13 +447,15 @@ public void testScannerClose() throws IOException {
ResultScanner mirroringScanner = mirroringTable.getScanner(scan);
mirroringScanner.close();

waitForMirroringScanner(mirroringScanner);
waitForExecutor();
verify(primaryScannerMock, times(1)).close();
verify(secondaryScannerMock, times(1)).close();
}

@Test
public void testScannerRenewLease() throws IOException {
public void testScannerRenewLease()
throws IOException, InterruptedException, ExecutionException, TimeoutException {
ResultScanner primaryScannerMock = mock(ResultScanner.class);
when(primaryScannerMock.renewLease()).thenReturn(true);
when(primaryTable.getScanner((Scan) any())).thenReturn(primaryScannerMock);
Expand All @@ -456,34 +470,21 @@ public void testScannerRenewLease() throws IOException {
// primary scanner lease was renewed, so we waited for the second one, and it returned false.
assertThat(mirroringScanner.renewLease()).isFalse();

waitForMirroringScanner(mirroringScanner);
waitForExecutor();
verify(secondaryScannerMock, times(1)).renewLease();
}

@Test
public void testClosingTableWithFutureDecreasesListenableCounter() throws IOException {
public void testClosingTableWithFutureDecreasesListenableCounter()
throws IOException, InterruptedException, ExecutionException, TimeoutException {
ListenableReferenceCounter listenableReferenceCounter = spy(new ListenableReferenceCounter());
listenableReferenceCounter.holdReferenceUntilClosing(mirroringTable);

verify(listenableReferenceCounter, times(1)).incrementReferenceCount();
verify(listenableReferenceCounter, never()).decrementReferenceCount();
IOException expectedException = new IOException("expected");
doThrow(expectedException).when(secondaryTable).close();

final ListenableFuture<Void> closingFuture = mirroringTable.asyncClose();
ExecutionException e =
assertThrows(
ExecutionException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
closingFuture.get();
}
});

assertThat(e).hasCauseThat().isInstanceOf(IOException.class);
assertThat(e).hasCauseThat().hasMessageThat().contains("expected");

closingFuture.get(3, TimeUnit.SECONDS);
verify(listenableReferenceCounter, times(1)).decrementReferenceCount();
}

Expand Down

0 comments on commit 5538411

Please sign in to comment.