Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BIgtable: 11. Implement ReadRows retries #2986

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@
<classifier>testlib</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalExtensionOnly;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import javax.annotation.Nonnull;
Expand All @@ -40,7 +41,8 @@
* ByteStringRange r2 = r1.clone().endUnbounded();
* }</pre>
*/
abstract class Range<T, R extends Range<T, R>> {
@InternalExtensionOnly
public abstract class Range<T, R extends Range<T, R>> {
public enum BoundType {
OPEN,
CLOSED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.ReadRowsRequest;
Expand All @@ -35,10 +37,13 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import java.io.IOException;
import java.util.List;
import org.threeten.bp.Duration;

/**
* The core client that converts method calls to RPCs.
Expand Down Expand Up @@ -75,6 +80,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
.setEndpoint(settings.getEndpoint())
.setCredentialsProvider(settings.getCredentialsProvider());

// ReadRows retries are handled in the overlay: disable retries in the base layer (but make

This comment was marked as spam.

This comment was marked as spam.

// sure to preserve the exception callable settings).
baseSettingsBuilder
.readRowsSettings()
.setSimpleTimeoutNoRetries(Duration.ofHours(2))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setTimeoutCheckInterval(Duration.ZERO)
.setIdleTimeout(Duration.ZERO);

// SampleRowKeys retries are handled in the overlay: disable retries in the base layer (but make
// sure to preserve the exception callable settings).
baseSettingsBuilder
Expand Down Expand Up @@ -135,8 +149,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
* dispatch the RPC.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in {@link
* com.google.cloud.bigtable.data.v2.BigtableDataSettings}.
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
Expand All @@ -147,7 +160,27 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);

FilterMarkerRowsCallable<RowT> filtering = new FilterMarkerRowsCallable<>(merging, rowAdapter);
// Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the outer
// Query -> RowT callable or the inner ReadRowsRequest -> ReadRowsResponse callable).
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setRetrySettings(settings.readRowsSettings().getRetrySettings())
.setTimeoutCheckInterval(settings.readRowsSettings().getTimeoutCheckInterval())
.setIdleTimeout(settings.readRowsSettings().getIdleTimeout())
.build();

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(merging);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);

FilterMarkerRowsCallable<RowT> filtering =
new FilterMarkerRowsCallable<>(retrying2, rowAdapter);

ServerStreamingCallable<ReadRowsRequest, RowT> withContext =
filtering.withDefaultCallContext(clientContext.getDefaultCallContext());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.StreamResumptionStrategy;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsRequest.Builder;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;

/**
 * A {@link StreamResumptionStrategy} for streams of merged rows. It remembers the key of the most
 * recently received row and how many non-marker rows have been received, and uses that state to
 * rewrite a {@link ReadRowsRequest} so that a retry only asks for what is still outstanding.
 *
 * <p>This class is considered an internal implementation detail and not meant to be used by
 * applications.
 */
@InternalApi
public class ReadRowsResumptionStrategy<RowT>
    implements StreamResumptionStrategy<ReadRowsRequest, RowT> {
  private final RowAdapter<RowT> rowAdapter;
  // Key of the most recently received row (real or scan marker); empty until the first row.
  private ByteString lastKey = ByteString.EMPTY;
  // Number of rows processed excluding Marker row.
  private long numProcessed;

  /**
   * Creates a strategy with no progress recorded.
   *
   * @param rowAdapter used to extract the key of a row and to detect scan marker rows
   */
  public ReadRowsResumptionStrategy(RowAdapter<RowT> rowAdapter) {
    this.rowAdapter = rowAdapter;
  }

  /** Always reports that the stream can be resumed. */
  @Override
  public boolean canResume() {
    return true;
  }

  /** Returns a fresh strategy that shares the row adapter but none of the recorded progress. */
  @Override
  public StreamResumptionStrategy<ReadRowsRequest, RowT> createNew() {
    return new ReadRowsResumptionStrategy<>(rowAdapter);
  }

  /**
   * Records progress for a received row.
   *
   * <p>The resume position advances for every response, including synthetic scan marker rows
   * (emitted when the server has read a lot of data that was filtered out). Marker rows can trim
   * the start of the scan but are not counted towards the row limit.
   *
   * @param response the row that was just received
   */
  @Override
  public void onProgress(RowT response) {
    boolean isRealRow = !rowAdapter.isScanMarkerRow(response);
    lastKey = rowAdapter.getKey(response);
    if (isRealRow) {
      // Only real rows count towards the rows limit.
      numProcessed++;
    }
  }

  /**
   * {@inheritDoc}
   *
   * <p>The returned request drops every row key and every part of a row range that is not
   * strictly after the last received key, and reduces a positive row limit by the number of rows
   * already received. If no progress was recorded, the request is returned unchanged. If nothing
   * is left to read, {@link ReadRowsRetryCompletedCallable#FULFILLED_REQUEST_MARKER} is returned
   * instead.
   *
   * @param request the request that started the stream
   * @return the request to send for the retry attempt
   * @throws IllegalStateException if more rows were received than a positive row limit allows
   * @throws IllegalArgumentException if a row range reports an unknown start or end key case
   */
  @Override
  public ReadRowsRequest getResumeRequest(ReadRowsRequest request) {
    // No row has been read yet, so there is nothing to trim: retry the request as is.
    if (lastKey.isEmpty()) {
      return request;
    }

    // An empty row set implies a full table scan; model it as a single unbounded range so that it
    // can be narrowed like any other range.
    RowSet requestedRows = request.getRows();
    if (requestedRows.getRowKeysList().isEmpty() && requestedRows.getRowRangesList().isEmpty()) {
      requestedRows = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
    }

    // Recompute the keys and ranges, keeping only what lies after lastKey.
    RowSet.Builder remainingRows = RowSet.newBuilder();

    for (ByteString key : requestedRows.getRowKeysList()) {
      if (isAfterLastKey(key)) {
        remainingRows.addRowKeys(key);
      }
    }

    for (RowRange range : requestedRows.getRowRangesList()) {
      RowRange.Builder narrowed = RowRange.newBuilder();
      // The end key is inspected first: a range ending at or before lastKey is dropped entirely.
      if (!copyEndKeyIfUnfinished(range, narrowed)) {
        continue;
      }
      copyStartKeyAfterLastKey(range, narrowed);
      remainingRows.addRowRanges(narrowed.build());
    }

    long rowsLimit = request.getRowsLimit();
    boolean hasRowsLimit = rowsLimit > 0;

    // Edge case: retrying a fulfilled request.
    // A request is fulfilled once all of its keys and ranges have been served or, when it has a
    // row limit, once that many rows were seen. Such requests are replaced by a marker request
    // that is handled by ReadRowsRetryCompletedCallable.
    boolean nothingLeftToRead =
        remainingRows.getRowRangesCount() == 0 && remainingRows.getRowKeysCount() == 0;
    if (nothingLeftToRead || (hasRowsLimit && rowsLimit == numProcessed)) {
      return ReadRowsRetryCompletedCallable.FULFILLED_REQUEST_MARKER;
    }

    Builder resumeRequest = request.toBuilder();
    if (hasRowsLimit) {
      Preconditions.checkState(
          rowsLimit > numProcessed,
          "Detected too many rows for the current row limit during a retry.");
      resumeRequest.setRowsLimit(rowsLimit - numProcessed);
    }

    return resumeRequest.setRows(remainingRows.build()).build();
  }

  /** Returns true if {@code key} sorts strictly after the last received key. */
  private boolean isAfterLastKey(ByteString key) {
    return ByteStringComparator.INSTANCE.compare(key, lastKey) > 0;
  }

  /**
   * Copies the end bound of {@code source} into {@code target}, unless the bound is not strictly
   * after the last received key.
   *
   * @return false if the range ends at or before the last received key and must be skipped
   */
  private boolean copyEndKeyIfUnfinished(RowRange source, RowRange.Builder target) {
    switch (source.getEndKeyCase()) {
      case END_KEY_CLOSED:
        if (!isAfterLastKey(source.getEndKeyClosed())) {
          return false;
        }
        target.setEndKeyClosed(source.getEndKeyClosed());
        return true;
      case END_KEY_OPEN:
        if (!isAfterLastKey(source.getEndKeyOpen())) {
          return false;
        }
        target.setEndKeyOpen(source.getEndKeyOpen());
        return true;
      case ENDKEY_NOT_SET:
        // No end bound on the source range: leave the narrowed range without one as well.
        target.clearEndKey();
        return true;
      default:
        throw new IllegalArgumentException("Unknown endKeyCase: " + source.getEndKeyCase());
    }
  }

  /**
   * Sets the start bound of {@code target} to whichever is later: the start bound of {@code
   * source}, or an open bound at the last received key.
   */
  private void copyStartKeyAfterLastKey(RowRange source, RowRange.Builder target) {
    switch (source.getStartKeyCase()) {
      case STARTKEY_NOT_SET:
        target.setStartKeyOpen(lastKey);
        return;
      case START_KEY_OPEN:
        {
          ByteString start = source.getStartKeyOpen();
          boolean startsBeforeLastKey = ByteStringComparator.INSTANCE.compare(start, lastKey) < 0;
          target.setStartKeyOpen(startsBeforeLastKey ? lastKey : start);
          return;
        }
      case START_KEY_CLOSED:
        if (isAfterLastKey(source.getStartKeyClosed())) {
          target.setStartKeyClosed(source.getStartKeyClosed());
        } else {
          // A closed start at or before lastKey would re-read lastKey, so exclude it.
          target.setStartKeyOpen(lastKey);
        }
        return;
      default:
        throw new IllegalArgumentException("Unknown startKeyCase: " + source.getStartKeyCase());
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ReadRowsRequest;

/**
 * Handles the edge case where a ReadRows stream has already delivered every requested row but
 * then ends with a retryable error status instead of an OK. When a retry attempt is made for such
 * a stream, it should complete successfully without sending another request.
 *
 * <p>This callable works in tandem with {@link ReadRowsResumptionStrategy}, which substitutes
 * {@link #FULFILLED_REQUEST_MARKER} for the resume request. When this callable is invoked with
 * that exact marker instance, it immediately notifies the {@link ResponseObserver} that the
 * stream has been successfully completed; any other request is forwarded to the wrapped callable.
 *
 * <p>This class is considered an internal implementation detail and not meant to be used by
 * applications.
 */
@InternalApi
public final class ReadRowsRetryCompletedCallable<RowT>
    extends ServerStreamingCallable<ReadRowsRequest, RowT> {
  // Sentinel request; it is recognized by reference identity, not by its contents.
  static final ReadRowsRequest FULFILLED_REQUEST_MARKER =
      ReadRowsRequest.newBuilder().setRowsLimit(-1).build();

  private final ServerStreamingCallable<ReadRowsRequest, RowT> inner;

  /**
   * Wraps the given callable.
   *
   * @param inner the callable that receives every request other than the fulfilled marker
   */
  public ReadRowsRetryCompletedCallable(ServerStreamingCallable<ReadRowsRequest, RowT> inner) {
    this.inner = inner;
  }

  /**
   * Completes the stream right away for {@link #FULFILLED_REQUEST_MARKER}; otherwise delegates to
   * the wrapped callable.
   *
   * @param request the request to send, or the fulfilled marker
   * @param responseObserver the observer to notify
   * @param context the call context passed through to the wrapped callable
   */
  @Override
  public void call(
      ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {

    if (request != FULFILLED_REQUEST_MARKER) {
      inner.call(request, responseObserver, context);
      return;
    }

    // Nothing is left to read: start the observer with an inert controller and finish the stream.
    responseObserver.onStart(new NoopController());
    responseObserver.onComplete();
  }

  /** A {@link StreamController} whose operations do nothing. */
  private static class NoopController implements StreamController {
    @Override
    public void cancel() {}

    @Override
    public void disableAutoInboundFlowControl() {}

    @Override
    public void request(int count) {}
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
* An implementation of a {@link Reframer} that feeds the row merging {@link StateMachine}.
*
* <p>{@link com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver} pushes {@link
* ReadRowsResponse.CellChunk}s into this class and pops fully merged logical rows. Example usage:
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s into this class and pops fully merged logical
* rows. Example usage:
*
* <pre>{@code
* RowMerger<Row> rowMerger = new RowMerger<>(myRowBuilder);
Expand Down
Loading