Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Change CDC related APIs to return ByteStringRange instead of Ro… #1355

Merged
merged 3 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
Expand Down Expand Up @@ -1503,11 +1503,11 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
* String tableId = "[TABLE]";
*
* try {
* ServerStream<RowRange> stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId);
* ServerStream<ByteStringRange> stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId);
* int count = 0;
*
* // Iterator style
* for (RowRange partition : stream) {
* for (ByteStringRange partition : stream) {
* if (++count > 10) {
* stream.cancel();
* break;
Expand All @@ -1525,7 +1525,7 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStream<RowRange> generateInitialChangeStreamPartitions(String tableId) {
public ServerStream<ByteStringRange> generateInitialChangeStreamPartitions(String tableId) {
return generateInitialChangeStreamPartitionsCallable().call(tableId);
}

Expand All @@ -1545,7 +1545,7 @@ public ServerStream<RowRange> generateInitialChangeStreamPartitions(String table
* public void onStart(StreamController controller) {
* this.controller = controller;
* }
* public void onResponse(RowRange partition) {
* public void onResponse(ByteStringRange partition) {
* if (++count > 10) {
* controller.cancel();
* return;
Expand All @@ -1568,7 +1568,7 @@ public ServerStream<RowRange> generateInitialChangeStreamPartitions(String table
*/
@InternalApi("Used in Changestream beam pipeline.")
public void generateInitialChangeStreamPartitionsAsync(
String tableId, ResponseObserver<RowRange> observer) {
String tableId, ResponseObserver<ByteStringRange> observer) {
generateInitialChangeStreamPartitionsCallable().call(tableId, observer);
}

Expand All @@ -1584,7 +1584,7 @@ public void generateInitialChangeStreamPartitionsAsync(
*
* // Iterator style
* try {
* for(RowRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) {
* for(ByteStringRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) {
* // Do something with partition
* }
* } catch (NotFoundException e) {
Expand All @@ -1595,18 +1595,18 @@ public void generateInitialChangeStreamPartitionsAsync(
*
* // Sync style
* try {
* List<RowRange> partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
* List<ByteStringRange> partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Point look up
* ApiFuture<RowRange> partitionFuture =
* ApiFuture<ByteStringRange> partitionFuture =
* bigtableDataClient.generateInitialChangeStreamPartitionsCallable().first().futureCall(tableId);
*
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<RowRange>() {
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<ByteStringRange>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
Expand All @@ -1626,7 +1626,8 @@ public void generateInitialChangeStreamPartitionsAsync(
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStreamingCallable<String, RowRange> generateInitialChangeStreamPartitionsCallable() {
public ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable() {
return stub.generateInitialChangeStreamPartitionsCallable();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ public ChangeStreamContinuationToken(
.build();
}

// TODO: Change this to return ByteStringRange.
public RowRange getRowRange() {
return this.tokenProto.getPartition().getRowRange();
/**
* Get the partition of the current continuation token, represented by a {@link ByteStringRange}.
*/
public ByteStringRange getPartition() {
return ByteStringRange.create(
this.tokenProto.getPartition().getRowRange().getStartKeyClosed(),
this.tokenProto.getPartition().getRowRange().getEndKeyOpen());
}

public String getToken() {
Expand Down Expand Up @@ -95,19 +99,19 @@ public boolean equals(Object o) {
return false;
}
ChangeStreamContinuationToken otherToken = (ChangeStreamContinuationToken) o;
return Objects.equal(getRowRange(), otherToken.getRowRange())
return Objects.equal(getPartition(), otherToken.getPartition())
&& Objects.equal(getToken(), otherToken.getToken());
}

@Override
public int hashCode() {
return Objects.hashCode(getRowRange(), getToken());
return Objects.hashCode(getPartition(), getToken());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("rowRange", getRowRange())
.add("partition", getPartition())
.add("token", getToken())
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.bigtable.v2.RowRange;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
Expand Down Expand Up @@ -395,6 +398,22 @@ private void writeObject(ObjectOutputStream output) throws IOException {
output.defaultWriteObject();
}

@InternalApi("Used in Changestream beam pipeline.")
public static ByteString toByteString(ByteStringRange byteStringRange) {
return RowRange.newBuilder()
.setStartKeyClosed(byteStringRange.getStart())
.setEndKeyOpen(byteStringRange.getEnd())
.build()
.toByteString();
}

@InternalApi("Used in Changestream beam pipeline.")
public static ByteStringRange toByteStringRange(ByteString byteString)
throws InvalidProtocolBufferException {
RowRange rowRange = RowRange.newBuilder().mergeFrom(byteString).build();
return ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen());
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
Expand Down Expand Up @@ -155,7 +156,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;

private final ServerStreamingCallable<String, RowRange>
private final ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable;

private final ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecord>
Expand Down Expand Up @@ -833,7 +834,7 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
* RowRange}.
* </ul>
*/
private ServerStreamingCallable<String, RowRange>
private ServerStreamingCallable<String, ByteStringRange>
createGenerateInitialChangeStreamPartitionsCallable() {
ServerStreamingCallable<
GenerateInitialChangeStreamPartitionsRequest,
Expand Down Expand Up @@ -862,22 +863,22 @@ public Map<String, String> extract(
.build(),
settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes());

ServerStreamingCallable<String, RowRange> userCallable =
ServerStreamingCallable<String, ByteStringRange> userCallable =
new GenerateInitialChangeStreamPartitionsUserCallable(base, requestContext);

ServerStreamingCallable<String, RowRange> withStatsHeaders =
ServerStreamingCallable<String, ByteStringRange> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(userCallable);

// Sometimes GenerateInitialChangeStreamPartitions connections are disconnected via an RST
// frame. This error is transient and should be treated similar to UNAVAILABLE. However, this
// exception has an INTERNAL error code which by default is not retryable. Convert the exception
// so it can be retried in the client.
ServerStreamingCallable<String, RowRange> convertException =
ServerStreamingCallable<String, ByteStringRange> convertException =
new ConvertStreamExceptionCallable<>(withStatsHeaders);

// Copy idle timeout settings for watchdog.
ServerStreamingCallSettings<String, RowRange> innerSettings =
ServerStreamingCallSettings.<String, RowRange>newBuilder()
ServerStreamingCallSettings<String, ByteStringRange> innerSettings =
ServerStreamingCallSettings.<String, ByteStringRange>newBuilder()
.setRetryableCodes(
settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes())
.setRetrySettings(
Expand All @@ -886,17 +887,17 @@ public Map<String, String> extract(
settings.generateInitialChangeStreamPartitionsSettings().getIdleTimeout())
.build();

ServerStreamingCallable<String, RowRange> watched =
ServerStreamingCallable<String, ByteStringRange> watched =
Callables.watched(convertException, innerSettings, clientContext);

ServerStreamingCallable<String, RowRange> withBigtableTracer =
ServerStreamingCallable<String, ByteStringRange> withBigtableTracer =
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, RowRange> retrying =
ServerStreamingCallable<String, ByteStringRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);

SpanName span = getSpanName("GenerateInitialChangeStreamPartitions");
ServerStreamingCallable<String, RowRange> traced =
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
Expand Down Expand Up @@ -1039,7 +1040,8 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
}

/** Returns a streaming generate initial change stream partitions callable */
public ServerStreamingCallable<String, RowRange> generateInitialChangeStreamPartitionsCallable() {
public ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable() {
return generateInitialChangeStreamPartitionsCallable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.auth.Credentials;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.Version;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
Expand Down Expand Up @@ -212,7 +212,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final BigtableBulkReadRowsCallSettings bulkReadRowsSettings;
private final UnaryCallSettings<ConditionalRowMutation, Boolean> checkAndMutateRowSettings;
private final UnaryCallSettings<ReadModifyWriteRow, Row> readModifyWriteRowSettings;
private final ServerStreamingCallSettings<String, RowRange>
private final ServerStreamingCallSettings<String, ByteStringRange>
generateInitialChangeStreamPartitionsSettings;
private final ServerStreamingCallSettings<ReadChangeStreamQuery, ChangeStreamRecord>
readChangeStreamSettings;
Expand Down Expand Up @@ -537,7 +537,7 @@ public UnaryCallSettings<ReadModifyWriteRow, Row> readModifyWriteRowSettings() {
return readModifyWriteRowSettings;
}

public ServerStreamingCallSettings<String, RowRange>
public ServerStreamingCallSettings<String, ByteStringRange>
generateInitialChangeStreamPartitionsSettings() {
return generateInitialChangeStreamPartitionsSettings;
}
Expand Down Expand Up @@ -571,7 +571,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private final UnaryCallSettings.Builder<ConditionalRowMutation, Boolean>
checkAndMutateRowSettings;
private final UnaryCallSettings.Builder<ReadModifyWriteRow, Row> readModifyWriteRowSettings;
private final ServerStreamingCallSettings.Builder<String, RowRange>
private final ServerStreamingCallSettings.Builder<String, ByteStringRange>
generateInitialChangeStreamPartitionsSettings;
private final ServerStreamingCallSettings.Builder<ReadChangeStreamQuery, ChangeStreamRecord>
readChangeStreamSettings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest;
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;

/**
* Simple wrapper for GenerateInitialChangeStreamPartitions to wrap the request and response
* protobufs.
*/
public class GenerateInitialChangeStreamPartitionsUserCallable
extends ServerStreamingCallable<String, RowRange> {
extends ServerStreamingCallable<String, ByteStringRange> {
private final RequestContext requestContext;
private final ServerStreamingCallable<
GenerateInitialChangeStreamPartitionsRequest,
Expand All @@ -49,7 +49,7 @@ public GenerateInitialChangeStreamPartitionsUserCallable(

@Override
public void call(
String tableId, ResponseObserver<RowRange> responseObserver, ApiCallContext context) {
String tableId, ResponseObserver<ByteStringRange> responseObserver, ApiCallContext context) {
String tableName =
NameUtil.formatTableName(
requestContext.getProjectId(), requestContext.getInstanceId(), tableId);
Expand All @@ -62,12 +62,12 @@ public void call(
inner.call(request, new ConvertPartitionToRangeObserver(responseObserver), context);
}

private class ConvertPartitionToRangeObserver
private static class ConvertPartitionToRangeObserver
implements ResponseObserver<GenerateInitialChangeStreamPartitionsResponse> {

private final ResponseObserver<RowRange> outerObserver;
private final ResponseObserver<ByteStringRange> outerObserver;

ConvertPartitionToRangeObserver(ResponseObserver<RowRange> observer) {
ConvertPartitionToRangeObserver(ResponseObserver<ByteStringRange> observer) {
this.outerObserver = observer;
}

Expand All @@ -78,12 +78,11 @@ public void onStart(final StreamController controller) {

@Override
public void onResponse(GenerateInitialChangeStreamPartitionsResponse response) {
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(response.getPartition().getRowRange().getStartKeyClosed())
.setEndKeyOpen(response.getPartition().getRowRange().getEndKeyOpen())
.build();
outerObserver.onResponse(rowRange);
ByteStringRange byteStringRange =
ByteStringRange.create(
response.getPartition().getRowRange().getStartKeyClosed(),
response.getPartition().getRowRange().getEndKeyOpen());
outerObserver.onResponse(byteStringRange);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Mutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
Expand Down Expand Up @@ -83,7 +83,7 @@ public class BigtableDataClientTests {
@Mock private Batcher<ByteString, Row> mockBulkReadRowsBatcher;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ServerStreamingCallable<String, RowRange>
private ServerStreamingCallable<String, ByteStringRange>
mockGenerateInitialChangeStreamPartitionsCallable;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
Expand Down Expand Up @@ -342,7 +342,7 @@ public void proxyGenerateInitialChangeStreamPartitionsAsyncTest() {
.thenReturn(mockGenerateInitialChangeStreamPartitionsCallable);

@SuppressWarnings("unchecked")
ResponseObserver<RowRange> mockObserver = Mockito.mock(ResponseObserver.class);
ResponseObserver<ByteStringRange> mockObserver = Mockito.mock(ResponseObserver.class);
bigtableDataClient.generateInitialChangeStreamPartitionsAsync("fake-table", mockObserver);

Mockito.verify(mockGenerateInitialChangeStreamPartitionsCallable)
Expand Down
Loading