Skip to content

Commit

Permalink
feat: add support for awaiting Data Boost (#2329)
Browse files Browse the repository at this point in the history
* Introduce ConsistencyParams model

Change-Id: I7ae07cc4f13e8ffe9ea4a55fb407eae1d64f8547

* Create AwaitConsistencyCallable, a delegate for AwaitReplicationCallable

Change-Id: I16d7c69b4e1b9153f93e83f7f846d6c172aae8a6

* Address some PR comments

Change-Id: Icdf94f2f6a13f55d2c9204774d4ebbac5aa6a8b3

* Remove unused imports from AwaitReplicationCallable

Change-Id: Ia4861f1a5796061ca86844c67c68f54711fdbb94

* Plumb the Consistency callable through to some places, add some tests

Change-Id: Ibe60e2a1044933af1008c0cd1b84f757dd6867a8

* Add integration test

Change-Id: Ie3b2b2983ca585cb1d6a2cdb8b18b55e81205759

* Rework the ConsistencyRequest model, plumb through RequestContext to BigtableTableAdminClient

Change-Id: I840282587d3d6cb4150dfbdd568c347dc32a732d

* Fix imports

Change-Id: Ic7588b3d04877a56089c23036d6df73a5c9b0cd5

* Fix more imports, fix some tests

Change-Id: I2723fd67bd301a4eb3aeae80d91fa663cdd6ab01

* Rename some things

Change-Id: Ie1bc8478c418d49b0c2e014edbeb6f56b56b0dd1

* Add tests for ConsistencyRequest model

Change-Id: I3548b7aa673be5a92cd4c180e3edb8649657811c

* Add newline

Change-Id: Icdd22ce2857e5b4316c6fa3f0e139ea9de825178

* Fix broken test

Change-Id: Idbd7c0f10ebe575d104ab7ac46a3a1e347e35fe8

* Make request context a final variable in test

Change-Id: I81f2a25fe4493021bab150ab0af65d7318ba2399

* Get test working using correct expectations

Change-Id: Ie34d5171bd7a472fc695d603849e260054aedfbd

* Add a couple of tests for AwaitReplicationCallable

Change-Id: I70014db2c0a1d4e74c23b18de7ef591bc70cda2a

* Use RequestContextNoAP class

Change-Id: I897b343cd1067d43bcc644cac3db44e88bbf1e69

* Make ConsistencyRequest model an AutoValue

Change-Id: I9529fb79da69e12a834a2d0fea032d72ae6ea157

* Fix license year, fix some formatting

Change-Id: Ibcca1ca9f49988764fdbeeacc59cac5d276ab266

* Run auto formatter

Change-Id: I9f5e3f7c7fd79262092c507a523e16a533bc4382

* Rename new RequestContext to TableAdminRequestContext, re run auto formatter

Change-Id: Ib3f5918ef0f5b1ac53147baf93dcb72c476d877b

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add license header to ConsistencyRequestTest

Change-Id: I733d2f8c082647ad32b72b04b218cd5ba79d2377

* Add EnhancedBigtableTableAdminStub to clirr-ignored-differences

Change-Id: I7eefeda777305dd3d7c5664097bda87ac63daa72

* Fix IT tests, skip data boost one for now until we run it concurrently

Change-Id: I764190b0f91614753080e0a96e7e11e3dfb1fde0

* Run autoformatter

Change-Id: Iba4671e4781f1b333279a2410563869f53b284d5

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
djyau and gcf-owl-bot[bot] authored Sep 16, 2024
1 parent 3a9b5a6 commit 8556574
Show file tree
Hide file tree
Showing 11 changed files with 592 additions and 166 deletions.
6 changes: 6 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,10 @@
<className>com/google/cloud/bigtable/admin/v2/models/Type$Int64$Encoding$BigEndianBytes</className>
<method>*</method>
</difference>
<difference>
<!-- change method args is ok because EnhancedBigtableTableAdminStub is InternalApi -->
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/admin/v2/stub/EnhancedBigtableTableAdminStub</className>
<method>*</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.cloud.bigtable.admin.v2.internal.NameUtil;
import com.google.cloud.bigtable.admin.v2.models.AuthorizedView;
import com.google.cloud.bigtable.admin.v2.models.Backup;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyRequest;
import com.google.cloud.bigtable.admin.v2.models.CopyBackupRequest;
import com.google.cloud.bigtable.admin.v2.models.CreateAuthorizedViewRequest;
import com.google.cloud.bigtable.admin.v2.models.CreateBackupRequest;
Expand All @@ -61,6 +62,7 @@
import com.google.cloud.bigtable.admin.v2.models.UpdateBackupRequest;
import com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest;
import com.google.cloud.bigtable.admin.v2.stub.EnhancedBigtableTableAdminStub;
import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -154,8 +156,10 @@ public static BigtableTableAdminClient create(
/** Constructs an instance of BigtableTableAdminClient with the given settings. */
public static BigtableTableAdminClient create(@Nonnull BigtableTableAdminSettings settings)
throws IOException {
TableAdminRequestContext requestContext =
TableAdminRequestContext.create(settings.getProjectId(), settings.getInstanceId());
EnhancedBigtableTableAdminStub stub =
EnhancedBigtableTableAdminStub.createEnhanced(settings.getStubSettings());
EnhancedBigtableTableAdminStub.createEnhanced(settings.getStubSettings(), requestContext);
return create(settings.getProjectId(), settings.getInstanceId(), stub);
}

Expand Down Expand Up @@ -917,6 +921,11 @@ public void awaitReplication(String tableId) {
stub.awaitReplicationCallable().futureCall(tableName));
}

public void awaitConsistency(ConsistencyRequest consistencyRequest) {
ApiExceptions.callAndTranslateApiException(
stub.awaitConsistencyCallable().futureCall(consistencyRequest));
}

/**
* Creates a backup with the specified configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.admin.v2.models;

import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.DataBoostReadLocalWrites;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
import com.google.bigtable.admin.v2.StandardReadRemoteWrites;
import com.google.bigtable.admin.v2.TableName;
import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext;
import javax.annotation.Nonnull;

@AutoValue
public abstract class ConsistencyRequest {
@Nonnull
protected abstract String getTableId();

@Nonnull
protected abstract CheckConsistencyRequest.ModeCase getMode();

public static ConsistencyRequest forReplication(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
}

public static ConsistencyRequest forDataBoost(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES);
}

@InternalApi
public CheckConsistencyRequest toCheckConsistencyProto(
TableAdminRequestContext requestContext, String token) {
CheckConsistencyRequest.Builder builder = CheckConsistencyRequest.newBuilder();
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());

if (getMode().equals(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES)) {
builder.setStandardReadRemoteWrites(StandardReadRemoteWrites.newBuilder().build());
} else {
builder.setDataBoostReadLocalWrites(DataBoostReadLocalWrites.newBuilder().build());
}

return builder.setName(tableName.toString()).setConsistencyToken(token).build();
}

@InternalApi
public GenerateConsistencyTokenRequest toGenerateTokenProto(
TableAdminRequestContext requestContext) {
GenerateConsistencyTokenRequest.Builder builder = GenerateConsistencyTokenRequest.newBuilder();
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());

return builder.setName(tableName.toString()).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.admin.v2.stub;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.ExponentialPollAlgorithm;
import com.google.api.gax.retrying.NonCancellableFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.CheckConsistencyResponse;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyRequest;
import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;

/**
* Callable that waits until either replication or Data Boost has caught up to the point it was
* called.
*
* <p>This callable wraps GenerateConsistencyToken and CheckConsistency RPCs. It will generate a
* token then poll until isConsistent is true.
*/
class AwaitConsistencyCallable extends UnaryCallable<ConsistencyRequest, Void> {
private final UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable;
private final UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable;
private final RetryingExecutor<CheckConsistencyResponse> executor;

private final TableAdminRequestContext requestContext;

static AwaitConsistencyCallable create(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
ClientContext clientContext,
RetrySettings pollingSettings,
TableAdminRequestContext requestContext) {

RetryAlgorithm<CheckConsistencyResponse> retryAlgorithm =
new RetryAlgorithm<>(
new PollResultAlgorithm(),
new ExponentialPollAlgorithm(pollingSettings, clientContext.getClock()));

RetryingExecutor<CheckConsistencyResponse> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new AwaitConsistencyCallable(
generateCallable, checkCallable, retryingExecutor, requestContext);
}

@VisibleForTesting
AwaitConsistencyCallable(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
RetryingExecutor<CheckConsistencyResponse> executor,
TableAdminRequestContext requestContext) {
this.generateCallable = generateCallable;
this.checkCallable = checkCallable;
this.executor = executor;
this.requestContext = requestContext;
}

@Override
public ApiFuture<Void> futureCall(
final ConsistencyRequest consistencyRequest, final ApiCallContext apiCallContext) {
ApiFuture<GenerateConsistencyTokenResponse> tokenFuture =
generateToken(consistencyRequest.toGenerateTokenProto(requestContext), apiCallContext);

return ApiFutures.transformAsync(
tokenFuture,
new ApiAsyncFunction<GenerateConsistencyTokenResponse, Void>() {
@Override
public ApiFuture<Void> apply(GenerateConsistencyTokenResponse input) {
CheckConsistencyRequest request =
consistencyRequest.toCheckConsistencyProto(
requestContext, input.getConsistencyToken());
return pollToken(request, apiCallContext);
}
},
MoreExecutors.directExecutor());
}

private ApiFuture<GenerateConsistencyTokenResponse> generateToken(
GenerateConsistencyTokenRequest generateRequest, ApiCallContext context) {
return generateCallable.futureCall(generateRequest, context);
}

private ApiFuture<Void> pollToken(CheckConsistencyRequest request, ApiCallContext context) {
AttemptCallable<CheckConsistencyRequest, CheckConsistencyResponse> attemptCallable =
new AttemptCallable<>(checkCallable, request, context);
RetryingFuture<CheckConsistencyResponse> retryingFuture =
executor.createFuture(attemptCallable);
attemptCallable.setExternalFuture(retryingFuture);
attemptCallable.call();

return ApiFutures.transform(
retryingFuture,
new ApiFunction<CheckConsistencyResponse, Void>() {
@Override
public Void apply(CheckConsistencyResponse input) {
return null;
}
},
MoreExecutors.directExecutor());
}

/** A callable representing an attempt to make an RPC call. */
private static class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
private final UnaryCallable<RequestT, ResponseT> callable;
private final RequestT request;

private volatile RetryingFuture<ResponseT> externalFuture;
private volatile ApiCallContext callContext;

AttemptCallable(
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
this.callable = callable;
this.request = request;
this.callContext = callContext;
}

void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
this.externalFuture = externalFuture;
}

@Override
public ResponseT call() {
try {
// NOTE: unlike gax's AttemptCallable, this ignores rpc timeouts
externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
if (externalFuture.isDone()) {
return null;
}
ApiFuture<ResponseT> internalFuture = callable.futureCall(request, callContext);
externalFuture.setAttemptFuture(internalFuture);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));
}

return null;
}
}

/**
* A polling algorithm for waiting for a consistent {@link CheckConsistencyResponse}. Please note
* that this class doesn't handle retryable errors and expects the underlying callable chain to
* handle this.
*/
private static class PollResultAlgorithm
implements ResultRetryAlgorithm<CheckConsistencyResponse> {
@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable,
CheckConsistencyResponse prevResponse,
TimedAttemptSettings prevSettings) {
return null;
}

@Override
public boolean shouldRetry(Throwable prevThrowable, CheckConsistencyResponse prevResponse)
throws CancellationException {
return prevResponse != null && !prevResponse.getConsistent();
}
}
}
Loading

0 comments on commit 8556574

Please sign in to comment.