From 4fcf3477c5b150601e9efc26d1699c259c91ea34 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 13 Feb 2025 16:39:41 -0500 Subject: [PATCH] chore: extend resumption strategy --- .../BigtableStreamResumptionStrategy.java | 27 +++++++++++++++++++ .../readrows/ReadRowsResumptionStrategy.java | 15 ++++++++--- .../ServerStreamingAttemptCallable.java | 5 ++++ 3 files changed, 43 insertions(+), 4 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableStreamResumptionStrategy.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableStreamResumptionStrategy.java new file mode 100644 index 0000000000..d10a10a24f --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableStreamResumptionStrategy.java @@ -0,0 +1,27 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.StreamResumptionStrategy; + +@InternalApi +/** Expand StreamResumptionStrategy to also process the error. */ +public abstract class BigtableStreamResumptionStrategy + implements StreamResumptionStrategy { + + public abstract Throwable processError(Throwable throwable); +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java index 2db46c0c29..68af76c34e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -22,20 +22,21 @@ import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.data.v2.internal.RowSetUtil; import com.google.cloud.bigtable.data.v2.models.RowAdapter; +import com.google.cloud.bigtable.data.v2.stub.BigtableStreamResumptionStrategy; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; /** - * An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks the - * last complete row seen and upon retry can build a request to resume the stream from where it left - * off. + * An implementation of a {@link BigtableStreamResumptionStrategy} for merged rows. This class + * tracks the last complete row seen and upon retry can build a request to resume the stream from + * where it left off. * *

This class is considered an internal implementation detail and not meant to be used by * applications. */ @InternalApi public class ReadRowsResumptionStrategy - implements StreamResumptionStrategy { + extends BigtableStreamResumptionStrategy { private final RowAdapter rowAdapter; private ByteString lastKey = ByteString.EMPTY; // Number of rows processed excluding Marker row. @@ -69,6 +70,12 @@ public RowT processResponse(RowT response) { return response; } + @Override + public Throwable processError(Throwable throwable) { + // Noop + return throwable; + } + /** * {@inheritDoc} * diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java index 793cf2e91c..7ec29f8b77 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java @@ -25,6 +25,7 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StateCheckingResponseObserver; import com.google.api.gax.rpc.StreamController; +import com.google.cloud.bigtable.data.v2.stub.BigtableStreamResumptionStrategy; import com.google.common.base.Preconditions; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -345,6 +346,10 @@ private void onAttemptError(Throwable throwable) { localCancellationCause = cancellationCause; } + if (resumptionStrategy instanceof BigtableStreamResumptionStrategy) { + throwable = ((BigtableStreamResumptionStrategy) resumptionStrategy).processError(throwable); + } + if (localCancellationCause != null) { // Take special care to preserve the cancellation's stack trace. innerAttemptFuture.setException(localCancellationCause);