Skip to content

Commit

Permalink
refactor: renames and moves in RequestScheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
mwalkiewicz committed Oct 19, 2021
1 parent 50b7d22 commit 23c8174
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private <T> void scheduleRequest(
return;
}
this.listenableReferenceCounter.holdReferenceUntilCompletion(
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
RequestScheduling.scheduleRequestAndVerificationWithFlowControl(
requestResourcesDescription,
nextSupplier,
this.mirroringTracer.spanFactory.wrapReadVerificationCallback(scannerNext),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ private <T> void scheduleSequentialReadOperationWithVerification(
final Supplier<ListenableFuture<T>> secondaryGetFutureSupplier,
final FutureCallback<T> verificationCallback) {
this.referenceCounter.holdReferenceUntilCompletion(
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
RequestScheduling.scheduleRequestAndVerificationWithFlowControl(
resultInfo,
secondaryGetFutureSupplier,
this.mirroringTracer.spanFactory.wrapReadVerificationCallback(verificationCallback),
Expand All @@ -731,7 +731,7 @@ public void onFailure(Throwable throwable) {
};

this.referenceCounter.holdReferenceUntilCompletion(
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
RequestScheduling.scheduleRequestAndVerificationWithFlowControl(
writeOperationInfo.requestResourcesDescription,
secondaryResultFutureSupplier,
this.mirroringTracer.spanFactory.wrapWriteOperationCallback(writeErrorCallback),
Expand Down Expand Up @@ -896,7 +896,7 @@ public void onFailure(Throwable throwable) {
};

this.referenceCounter.holdReferenceUntilCompletion(
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
RequestScheduling.scheduleRequestAndVerificationWithFlowControl(
requestResourcesDescription,
invokeBothOperations,
verification,
Expand Down Expand Up @@ -973,7 +973,7 @@ public Void apply(Throwable throwable) {
};

this.referenceCounter.holdReferenceUntilCompletion(
RequestScheduling.scheduleVerificationAndRequestWithFlowControl(
RequestScheduling.scheduleRequestAndVerificationWithFlowControl(
requestResourcesDescription,
this.secondaryAsyncWrapper.batch(operationsToScheduleOnSecondary, resultsSecondary),
verificationFuture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
public class RequestScheduling {
private static final Logger Log = new Logger(RequestScheduling.class);

public static <T> ListenableFuture<Void> scheduleVerificationAndRequestWithFlowControl(
public static <T> ListenableFuture<Void> scheduleRequestAndVerificationWithFlowControl(
final RequestResourcesDescription requestResourcesDescription,
final Supplier<ListenableFuture<T>> secondaryResultFutureSupplier,
final FutureCallback<T> verificationCallback,
final FlowController flowController,
final MirroringTracer mirroringTracer) {
return scheduleVerificationAndRequestWithFlowControl(
return scheduleRequestAndVerificationWithFlowControl(
requestResourcesDescription,
secondaryResultFutureSupplier,
verificationCallback,
Expand All @@ -58,7 +58,7 @@ public Void apply(Throwable t) {
});
}

public static <T> ListenableFuture<Void> scheduleVerificationAndRequestWithFlowControl(
public static <T> ListenableFuture<Void> scheduleRequestAndVerificationWithFlowControl(
final RequestResourcesDescription requestResourcesDescription,
final Supplier<ListenableFuture<T>> invokeOperation,
final FutureCallback<T> verificationCallback,
Expand All @@ -69,38 +69,11 @@ public static <T> ListenableFuture<Void> scheduleVerificationAndRequestWithFlowC

final ListenableFuture<ResourceReservation> reservationRequest =
flowController.asyncRequestResource(requestResourcesDescription);
final ResourceReservation reservation;
try {
final ResourceReservation reservation;
try (Scope scope = mirroringTracer.spanFactory.flowControlScope()) {
reservation = reservationRequest.get();
}
Futures.addCallback(
invokeOperation.get(),
mirroringTracer.spanFactory.wrapWithCurrentSpan(
new FutureCallback<T>() {
@Override
public void onSuccess(@NullableDecl T t) {
try {
Log.trace("starting verification %s", t);
verificationCallback.onSuccess(t);
Log.trace("verification done %s", t);
} finally {
reservation.release();
verificationCompletedFuture.set(null);
}
}

@Override
public void onFailure(Throwable throwable) {
try {
verificationCallback.onFailure(throwable);
} finally {
reservation.release();
verificationCompletedFuture.set(null);
}
}
}),
MoreExecutors.directExecutor());
} catch (InterruptedException | ExecutionException e) {
flowControlReservationErrorConsumer.apply(e);
FlowController.cancelRequest(reservationRequest);
Expand All @@ -113,6 +86,34 @@ public void onFailure(Throwable throwable) {
return verificationCompletedFuture;
}

Futures.addCallback(
invokeOperation.get(),
mirroringTracer.spanFactory.wrapWithCurrentSpan(
new FutureCallback<T>() {
@Override
public void onSuccess(@NullableDecl T t) {
try {
Log.trace("starting verification %s", t);
verificationCallback.onSuccess(t);
Log.trace("verification done %s", t);
} finally {
reservation.release();
verificationCompletedFuture.set(null);
}
}

@Override
public void onFailure(Throwable throwable) {
try {
verificationCallback.onFailure(throwable);
} finally {
reservation.release();
verificationCompletedFuture.set(null);
}
}
}),
MoreExecutors.directExecutor());

return verificationCompletedFuture;
}
}

0 comments on commit 23c8174

Please sign in to comment.