diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index a1b58577b547..d4cd857245c6 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -91,6 +91,11 @@ testlib test + + io.grpc + grpc-testing + test + diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java index fb0a7cc09f45..226eaa73318a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.models; +import com.google.api.core.InternalExtensionOnly; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import javax.annotation.Nonnull; @@ -40,7 +41,8 @@ * ByteStringRange r2 = r1.clone().endUnbounded(); * } */ -abstract class Range> { +@InternalExtensionOnly +public abstract class Range> { public enum BoundType { OPEN, CLOSED, diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index a10f12e4a2eb..fbcc682155c2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -17,9 +17,11 @@ import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; import com.google.bigtable.v2.ReadRowsRequest; @@ -35,10 +37,13 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import java.io.IOException; import java.util.List; +import org.threeten.bp.Duration; /** * The core client that converts method calls to RPCs. @@ -75,6 +80,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) .setEndpoint(settings.getEndpoint()) .setCredentialsProvider(settings.getCredentialsProvider()); + // ReadRow retries are handled in the overlay: disable retries in the base layer (but make + // sure to preserve the exception callable settings). + baseSettingsBuilder + .readRowsSettings() + .setSimpleTimeoutNoRetries(Duration.ofHours(2)) + .setRetryableCodes(settings.readRowsSettings().getRetryableCodes()) + .setTimeoutCheckInterval(Duration.ZERO) + .setIdleTimeout(Duration.ZERO); + // SampleRowKeys retries are handled in the overlay: disable retries in the base layer (but make // sure to preserve the exception callable settings. baseSettingsBuilder @@ -135,8 +149,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) * dispatch the RPC. *
  • Upon receiving the response stream, it will merge the {@link * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row - * implementation can be configured in {@link - * com.google.cloud.bigtable.data.v2.BigtableDataSettings}. + * implementation can be configured in by the {@code rowAdapter} parameter. *
  • Retry/resume on failure. *
  • Filter out marker rows. * @@ -147,7 +160,27 @@ public ServerStreamingCallable createReadRowsCallable( ServerStreamingCallable merging = new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter); - FilterMarkerRowsCallable filtering = new FilterMarkerRowsCallable<>(merging, rowAdapter); + // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the outer + // Query -> RowT callable or the inner ReadRowsRequest -> ReadRowsResponse callable). + ServerStreamingCallSettings innerSettings = + ServerStreamingCallSettings.newBuilder() + .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter)) + .setRetryableCodes(settings.readRowsSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowsSettings().getRetrySettings()) + .setTimeoutCheckInterval(settings.readRowsSettings().getTimeoutCheckInterval()) + .setIdleTimeout(settings.readRowsSettings().getIdleTimeout()) + .build(); + + // Retry logic is split into 2 parts to workaround a rare edge case described in + // ReadRowsRetryCompletedCallable + ServerStreamingCallable retrying1 = + new ReadRowsRetryCompletedCallable<>(merging); + + ServerStreamingCallable retrying2 = + Callables.retrying(retrying1, innerSettings, clientContext); + + FilterMarkerRowsCallable filtering = + new FilterMarkerRowsCallable<>(retrying2, rowAdapter); ServerStreamingCallable withContext = filtering.withDefaultCallContext(clientContext.getDefaultCallContext()); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java new file mode 100644 index 000000000000..dfc1ea1ef5d1 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -0,0 +1,183 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsRequest.Builder; +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.RowSet; +import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator; +import com.google.cloud.bigtable.data.v2.models.RowAdapter; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; + +/** + * An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks the + * last complete row seen and upon retry can build a request to resume the stream from where it left + * off. + * + *

    This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public class ReadRowsResumptionStrategy + implements StreamResumptionStrategy { + private final RowAdapter rowAdapter; + private ByteString lastKey = ByteString.EMPTY; + // Number of rows processed excluding Marker row. + private long numProcessed; + + public ReadRowsResumptionStrategy(RowAdapter rowAdapter) { + this.rowAdapter = rowAdapter; + } + + @Override + public boolean canResume() { + return true; + } + + @Override + public StreamResumptionStrategy createNew() { + return new ReadRowsResumptionStrategy<>(rowAdapter); + } + + @Override + public void onProgress(RowT response) { + // Last key can come from both the last processed row key and a synthetic row marker. The + // synthetic row marker is emitted when the server has read a lot of data that was filtered out. + // The row marker can be used to trim the start of the scan, but does not contribute to the row + // limit. + lastKey = rowAdapter.getKey(response); + if (!rowAdapter.isScanMarkerRow(response)) { + // Only real rows count towards the rows limit. + numProcessed++; + } + } + + /** + * {@inheritDoc} + * + *

    Given a request, this implementation will narrow that request to exclude all row keys and + * ranges that would produce rows that come before {@link #lastKey}. Furthermore this + * implementation takes care to update the row limit of the request to account for all of the + * received rows. + */ + @Override + public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { + // An empty lastKey means that we have not successfully read the first row, + // so resume with the original request object. + if (lastKey.isEmpty()) { + return request; + } + + ReadRowsRequest originalRequest = request; + + // Special case: empty query implies full table scan, so make this explicit by adding an + // unbounded range to the request + if (request.getRows().getRowKeysList().isEmpty() + && request.getRows().getRowRangesList().isEmpty()) { + + originalRequest = + request + .toBuilder() + .setRows(RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance())) + .build(); + } + + // Start building the resume request. The keys & ranges are cleared and will be recomputed. + Builder builder = originalRequest.toBuilder(); + builder.clearRows(); + + RowSet.Builder rowSetBuilder = RowSet.newBuilder(); + + for (ByteString key : originalRequest.getRows().getRowKeysList()) { + if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) { + rowSetBuilder.addRowKeys(key); + } + } + + for (RowRange rowRange : originalRequest.getRows().getRowRangesList()) { + RowRange.Builder rowRangeBuilder = RowRange.newBuilder(); + + switch (rowRange.getEndKeyCase()) { + case END_KEY_CLOSED: + if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyClosed(), lastKey) > 0) { + rowRangeBuilder.setEndKeyClosed(rowRange.getEndKeyClosed()); + } else { + continue; + } + break; + case END_KEY_OPEN: + if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyOpen(), lastKey) > 0) { + rowRangeBuilder.setEndKeyOpen(rowRange.getEndKeyOpen()); + } else { + continue; + } + break; + case ENDKEY_NOT_SET: + rowRangeBuilder.clearEndKey(); + break; + default: + throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase()); + } + + switch (rowRange.getStartKeyCase()) { + case STARTKEY_NOT_SET: + rowRangeBuilder.setStartKeyOpen(lastKey); + break; + case START_KEY_OPEN: + if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyOpen(), lastKey) < 0) { + rowRangeBuilder.setStartKeyOpen(lastKey); + } else { + rowRangeBuilder.setStartKeyOpen(rowRange.getStartKeyOpen()); + } + break; + case START_KEY_CLOSED: + if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyClosed(), lastKey) <= 0) { + rowRangeBuilder.setStartKeyOpen(lastKey); + } else { + rowRangeBuilder.setStartKeyClosed(rowRange.getStartKeyClosed()); + } + break; + default: + throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase()); + } + rowSetBuilder.addRowRanges(rowRangeBuilder.build()); + } + + // Edge case: retrying a fulfilled request. + // A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it + // had a row limit, has seen enough rows. These requests are replaced with a marker request that + // will be handled by ReadRowsRetryCompletedCallable. See docs in ReadRowsRetryCompletedCallable + // for more details. + if ((rowSetBuilder.getRowRangesCount() == 0 && rowSetBuilder.getRowKeysCount() == 0) + || (originalRequest.getRowsLimit() > 0 && originalRequest.getRowsLimit() == numProcessed)) { + return ReadRowsRetryCompletedCallable.FULFILLED_REQUEST_MARKER; + } + + if (originalRequest.getRowsLimit() > 0) { + Preconditions.checkState( + originalRequest.getRowsLimit() > numProcessed, + "Detected too many rows for the current row limit during a retry."); + builder.setRowsLimit(originalRequest.getRowsLimit() - numProcessed); + } + + builder.setRows(rowSetBuilder.build()); + return builder.build(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java new file mode 100644 index 000000000000..6c698a51ca21 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java @@ -0,0 +1,72 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.bigtable.v2.ReadRowsRequest; + +/** + * This callable addresses edge case of a ReadRows stream receiving all of the rows, but receiving a + * retryable error status instead of an OK. If a retry attempt is scheduled, then it should return + * an OK response. + * + *

    This callable works in tandem with {@link ReadRowsResumptionStrategy}, which will send a + * {@link #FULFILLED_REQUEST_MARKER} to be processed by this callable. Upon receiving the {@link + * #FULFILLED_REQUEST_MARKER}, this callable will promptly notify the {@link ResponseObserver} that + * the stream has been successfully compeleted. + * + *

    This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public final class ReadRowsRetryCompletedCallable + extends ServerStreamingCallable { + static final ReadRowsRequest FULFILLED_REQUEST_MARKER = + ReadRowsRequest.newBuilder().setRowsLimit(-1).build(); + + private final ServerStreamingCallable inner; + + public ReadRowsRetryCompletedCallable(ServerStreamingCallable inner) { + this.inner = inner; + } + + @Override + public void call( + ReadRowsRequest request, ResponseObserver responseObserver, ApiCallContext context) { + + if (request == FULFILLED_REQUEST_MARKER) { + responseObserver.onStart(new DummyController()); + responseObserver.onComplete(); + } else { + inner.call(request, responseObserver, context); + } + } + + private static class DummyController implements StreamController { + @Override + public void cancel() {} + + @Override + public void disableAutoInboundFlowControl() {} + + @Override + public void request(int count) {} + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java index 4ce13eb87daa..dcb7347dda10 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java @@ -25,7 +25,8 @@ * An implementation of a {@link Reframer} that feeds the row merging {@link StateMachine}. * *

    {@link com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver} pushes {@link - * ReadRowsResponse.CellChunk}s into this class and pops fully merged logical rows. Example usage: + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s into this class and pops fully merged logical + * rows. Example usage: * *

    {@code
      * RowMerger rowMerger = new RowMerger<>(myRowBuilder);
    diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java
    new file mode 100644
    index 000000000000..21a0186b49d5
    --- /dev/null
    +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java
    @@ -0,0 +1,40 @@
    +/*
    + * Copyright 2018 Google LLC
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     https://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +/**
    + * Implementation details for {@link
    + * com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub#readRowsCallable()}.
    + *
    + * 

    The ReadRows protocol is optimized for transmission and is not designed to be consumed + * directly. This package implements significant customizations on top of the raw GAPIC generated + * stub to handle row merging and retries. + * + *

      + *
    • ReadRowsUserCallable: Creates protobuf {@link com.google.bigtable.v2.ReadRowsRequest}s from + * user facing wrappers. + *
    • RowMergingCallable (+ helpers): Implements a state machine that merges chunks into logical + * rows. The logical row representation is configurable via a RowAdapter. Please note that + * this will also emit special marker rows that help with retries in the next stage, but need + * to be filtered out. + *
    • RowMerger (+ helpers): Implements resuming retries for gax's Callables#retrying middleware. + *
    • FilterMarkerRowsCallable: Filters out marker rows that are used for efficient retry + * resumes. The marker is an internal implementation detail to communicate state to the + * RowMerger and should not be exposed to the caller. + *
    + * + *

    This package is considered an internal implementation detail and is not meant to be used by + * applications directly. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java new file mode 100644 index 000000000000..f37975b6c267 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java @@ -0,0 +1,403 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.client.util.Lists; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.ServerStream; +import com.google.bigtable.admin.v2.InstanceName; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.ReadRowsResponse.CellChunk; +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.TableName; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.collect.Queues; +import com.google.common.collect.Range; +import com.google.common.truth.Truth; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.io.IOException; +import java.util.List; +import java.util.Queue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ReadRowsRetryTest { + private static final InstanceName instanceName = InstanceName.of("fake-project", "fake-instance"); + + private static final TableName tableName = + TableName.of(instanceName.getProject(), instanceName.getInstance(), "fake-table"); + + @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + private TestBigtableService service; + private BigtableDataClient client; + + @Before + public void setUp() throws IOException { + service = new TestBigtableService(); + serverRule.getServiceRegistry().addService(service); + + BigtableDataSettings settings = + BigtableDataSettings.newBuilder() + .setInstanceName(instanceName) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(serverRule.getChannel()))) + .build(); + + client = BigtableDataClient.create(settings); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + @Test + public void happyPathTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest("k1") + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWith("k1", "r1", "r2")); + + List actualResults = + getResults(Query.create(tableName.getTable()).rowKey("k1").range("r1", "r3")); + Truth.assertThat(actualResults).containsExactly("k1", "r1", "r2").inOrder(); + } + + @Test + public void immediateRetryTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest("k1") + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create() + .expectRequest("k1") + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWith("k1", "r1", "r2")); + + List actualResults = + getResults(Query.create(tableName.getTable()).rowKey("k1").range("r1", "r3")); + Truth.assertThat(actualResults).containsExactly("k1", "r1", "r2").inOrder(); + } + + @Test + public void multipleRetryTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r9")) + .respondWith("r1", "r2", "r3", "r4") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.open("r4", "r9")) + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.open("r4", "r9")) + .respondWith("r5", "r6", "r7") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.open("r7", "r9")).respondWith("r8")); + + List actualResults = getResults(Query.create(tableName.getTable()).range("r1", "r9")); + Truth.assertThat(actualResults) + .containsExactly("r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8") + .inOrder(); + } + + @Test + public void rowLimitTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r3")) + .expectRowLimit(2) + .respondWith("r1") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.open("r1", "r3")) + .expectRowLimit(1) + .respondWith("r2")); + + List actualResults = + getResults(Query.create(tableName.getTable()).range("r1", "r3").limit(2)); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void errorAfterRowLimitMetTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r3")) + .expectRowLimit(2) + .respondWith("r1", "r2") + .respondWithStatus(Code.UNAVAILABLE)); + + // Second retry request is handled locally in ReadRowsRetryCompletedCallable + + List actualResults = + getResults(Query.create(tableName.getTable()).range("r1", "r3").limit(2)); + + Truth.assertThat(actualResults).containsExactly("r1", "r2"); + } + + @Test + public void errorAfterRequestCompleteTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r3")) + .expectRequest("r4") + .respondWith("r2", "r4") + .respondWithStatus(Code.UNAVAILABLE)); + + // Second retry request is handled locally in ReadRowsRetryCompletedCallable + + List actualResults = + getResults(Query.create(tableName.getTable()).range("r1", "r3").rowKey("r4")); + + Truth.assertThat(actualResults).containsExactly("r2", "r4"); + } + + @Test + public void pointTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest("r1", "r2") + .respondWith("r1") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add(RpcExpectation.create().expectRequest("r2").respondWith("r2")); + + List actualResults = + getResults(Query.create(tableName.getTable()).rowKey("r1").rowKey("r2")); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void fullTableScanTest() { + service.expectations.add( + RpcExpectation.create().respondWith("r1").respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.greaterThan("r1")).respondWith("r2")); + List actualResults = getResults(Query.create(tableName.getTable())); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void retryUnboundedStartTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.lessThan("r9")) + .respondWith("r1") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.open("r1", "r9")).respondWith("r2")); + + List actualResults = + getResults( + Query.create(tableName.getTable()).range(ByteStringRange.unbounded().endOpen("r9"))); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void retryUnboundedEndTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.atLeast("r1")) + .respondWith("r1") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.greaterThan("r1")).respondWith("r2")); + + List actualResults = + getResults( + Query.create(tableName.getTable()) + .range(ByteStringRange.unbounded().startClosed("r1"))); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void retryWithLastScannedKeyTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r9")) + .respondWithLastScannedKey("r5") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.open("r5", "r9")).respondWith("r7")); + List actualResults = + getResults(Query.create(tableName.getTable()).range(ByteStringRange.create("r1", "r9"))); + Truth.assertThat(actualResults).containsExactly("r7").inOrder(); + } + + private List getResults(Query query) { + ServerStream actualRows = client.readRows(query); + List actualValues = Lists.newArrayList(); + for (Row row : actualRows) { + actualValues.add(row.getKey().toStringUtf8()); + } + return actualValues; + } + + private static class TestBigtableService extends BigtableGrpc.BigtableImplBase { + Queue expectations = Queues.newArrayDeque(); + int i = -1; + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + + RpcExpectation expectedRpc = expectations.poll(); + i++; + + Truth.assertWithMessage("Unexpected request#" + i + ":" + request.toString()) + .that(expectedRpc) + .isNotNull(); + Truth.assertWithMessage("Unexpected request#" + i) + .that(request) + .isEqualTo(expectedRpc.getExpectedRequest()); + + for (ReadRowsResponse response : expectedRpc.responses) { + responseObserver.onNext(response); + } + if (expectedRpc.statusCode.toStatus().isOk()) { + responseObserver.onCompleted(); + } else { + responseObserver.onError(expectedRpc.statusCode.toStatus().asRuntimeException()); + } + } + } + + private static class RpcExpectation { + ReadRowsRequest.Builder requestBuilder; + Status.Code statusCode; + List responses; + + private RpcExpectation() { + this.requestBuilder = ReadRowsRequest.newBuilder().setTableName(tableName.toString()); + this.statusCode = Status.Code.OK; + this.responses = Lists.newArrayList(); + } + + static RpcExpectation create() { + return new RpcExpectation(); + } + + RpcExpectation expectRequest(String... keys) { + for (String key : keys) { + requestBuilder.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8(key)); + } + return this; + } + + RpcExpectation expectRequest(Range range) { + RowRange.Builder rowRange = requestBuilder.getRowsBuilder().addRowRangesBuilder(); + + if (range.hasLowerBound()) { + switch (range.lowerBoundType()) { + case CLOSED: + rowRange.setStartKeyClosed(ByteString.copyFromUtf8(range.lowerEndpoint())); + break; + case OPEN: + rowRange.setStartKeyOpen(ByteString.copyFromUtf8(range.lowerEndpoint())); + break; + default: + throw new IllegalArgumentException( + "Unexpected lowerBoundType: " + range.lowerBoundType()); + } + } else { + rowRange.clearStartKey(); + } + + if (range.hasUpperBound()) { + switch (range.upperBoundType()) { + case CLOSED: + rowRange.setEndKeyClosed(ByteString.copyFromUtf8(range.upperEndpoint())); + break; + case OPEN: + rowRange.setEndKeyOpen(ByteString.copyFromUtf8(range.upperEndpoint())); + break; + default: + throw new IllegalArgumentException( + "Unexpected upperBoundType: " + range.upperBoundType()); + } + } else { + rowRange.clearEndKey(); + } + + return this; + } + + RpcExpectation expectRowLimit(int limit) { + requestBuilder.setRowsLimit(limit); + return this; + } + + RpcExpectation respondWithStatus(Status.Code code) { + this.statusCode = code; + return this; + } + + RpcExpectation respondWith(String... responses) { + for (String response : responses) { + this.responses.add( + ReadRowsResponse.newBuilder() + .addChunks( + CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8(response)) + .setFamilyName(StringValue.newBuilder().setValue("family").build()) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.EMPTY).build()) + .setTimestampMicros(10_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build()); + } + return this; + } + + RpcExpectation respondWithLastScannedKey(String key) { + this.responses.add( + ReadRowsResponse.newBuilder().setLastScannedRowKey(ByteString.copyFromUtf8(key)).build()); + return this; + } + + ReadRowsRequest getExpectedRequest() { + return requestBuilder.build(); + } + } +}