From 8847fed7f77ce4715c197ca1cfcc3108e0fa1004 Mon Sep 17 00:00:00 2001 From: tengzhonger <109308630+tengzhonger@users.noreply.github.com> Date: Wed, 1 Mar 2023 17:06:14 -0500 Subject: [PATCH] feat: Add getNewPartitions method to CloseStream for Bigtable ChangeStream (#1655) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md). --- .../clirr-ignored-differences.xml | 6 ++ .../bigtable/data/v2/models/CloseStream.java | 31 ++++++- .../ReadChangeStreamResumptionStrategy.java | 8 +- .../v2/models/ChangeStreamRecordTest.java | 75 ++++++++++++++++- .../DefaultChangeStreamRecordAdapterTest.java | 2 - ...ChangeStreamRecordMergingCallableTest.java | 6 +- ...ReadChangeStreamMergingAcceptanceTest.java | 9 +++ .../ReadChangeStreamRetryTest.java | 30 +++++-- .../src/test/resources/changestream.json | 80 ++++++++++++++++++- 9 files changed, 228 insertions(+), 19 deletions(-) diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index a0ffe39bd1..da5feada67 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -100,6 +100,12 @@ *getStatus* com.google.cloud.bigtable.common.Status + + + 7013 + com/google/cloud/bigtable/data/v2/models/CloseStream + *getNewPartitions* + 7006 diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index d5e121e664..221b05f587 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -19,6 +19,8 @@ import com.google.auto.value.AutoValue; import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.cloud.bigtable.common.Status; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.List; @@ -35,8 +37,22 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable { private static CloseStream create( com.google.rpc.Status status, - List changeStreamContinuationTokens) { - return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens); + List changeStreamContinuationTokens, + List newPartitions) { + if (status.getCode() == 0) { + Preconditions.checkState( + changeStreamContinuationTokens.isEmpty(), + "An OK CloseStream should not have continuation tokens."); + } else { + Preconditions.checkState( + !changeStreamContinuationTokens.isEmpty(), + "A non-OK CloseStream should have continuation token(s)."); + Preconditions.checkState( + changeStreamContinuationTokens.size() == newPartitions.size(), + "Number of continuation tokens does not match number of new partitions."); + } + return new AutoValue_CloseStream( + Status.fromProto(status), changeStreamContinuationTokens, newPartitions); } /** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */ @@ -46,6 +62,13 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea closeStream.getStatus(), closeStream.getContinuationTokensList().stream() .map(ChangeStreamContinuationToken::fromProto) + .collect(ImmutableList.toImmutableList()), + closeStream.getNewPartitionsList().stream() + .map( + newPartition -> + ByteStringRange.create( + newPartition.getRowRange().getStartKeyClosed(), + newPartition.getRowRange().getEndKeyOpen())) .collect(ImmutableList.toImmutableList())); } @@ -56,4 +79,8 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea @InternalApi("Intended for use by the BigtableIO in apache/beam only.") @Nonnull public abstract List getChangeStreamContinuationTokens(); + + @InternalApi("Intended for use by the BigtableIO in apache/beam only.") + @Nonnull + public abstract List getNewPartitions(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java index 660466db95..fda608eda5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java @@ -56,7 +56,13 @@ public StreamResumptionStrategy cr public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) { // Update the token from a Heartbeat or a ChangeStreamMutation. // We don't worry about resumption after CloseStream, since the server - // will return an OK status right after sending a CloseStream. + // will close the stream with an OK status right after sending a CloseStream, + // no matter what status the CloseStream.Status is: + // 1) ... => CloseStream.Ok => final OK. This means the read finishes successfully. + // 2) ... => CloseStream.Error => final OK. This means the client should start + // a new ReadChangeStream call with the continuation tokens specified in + // CloseStream. + // Either case, we don't need to retry after receiving a CloseStream. if (changeStreamRecordAdapter.isHeartbeat(response)) { this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response); } else if (changeStreamRecordAdapter.isChangeStreamMutation(response)) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index 688ce46bcf..c00221be3d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -30,7 +30,11 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.function.ThrowingRunnable; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Instant; @@ -38,6 +42,8 @@ @RunWith(JUnit4.class) public class ChangeStreamRecordTest { + @Rule public ExpectedException expect = ExpectedException.none(); + @Test public void heartbeatSerializationTest() throws IOException, ClassNotFoundException { ReadChangeStreamResponse.Heartbeat heartbeatProto = @@ -60,7 +66,7 @@ public void heartbeatSerializationTest() throws IOException, ClassNotFoundExcept @Test public void closeStreamSerializationTest() throws IOException, ClassNotFoundException { - Status status = Status.newBuilder().setCode(0).build(); + Status status = Status.newBuilder().setCode(11).build(); RowRange rowRange1 = RowRange.newBuilder() .setStartKeyClosed(ByteString.copyFromUtf8("")) @@ -85,6 +91,8 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce .setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build()) .setToken(token2) .build()) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1)) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2)) .setStatus(status) .build(); CloseStream closeStream = CloseStream.fromProto(closeStreamProto); @@ -98,6 +106,7 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce assertThat(actual.getChangeStreamContinuationTokens()) .isEqualTo(closeStream.getChangeStreamContinuationTokens()); assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus()); + assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions()); } @Test @@ -129,7 +138,7 @@ public void heartbeatTest() { @Test public void closeStreamTest() { - Status status = Status.newBuilder().setCode(0).build(); + Status status = Status.newBuilder().setCode(11).build(); RowRange rowRange1 = RowRange.newBuilder() .setStartKeyClosed(ByteString.copyFromUtf8("")) @@ -154,6 +163,8 @@ public void closeStreamTest() { .setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build()) .setToken(token2) .build()) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1)) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2)) .setStatus(status) .build(); CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto); @@ -169,5 +180,65 @@ public void closeStreamTest() { ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen())); assertThat(token2) .isEqualTo(actualCloseStream.getChangeStreamContinuationTokens().get(1).getToken()); + assertThat(actualCloseStream.getNewPartitions().get(0)) + .isEqualTo( + ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen())); + assertThat(actualCloseStream.getNewPartitions().get(1)) + .isEqualTo( + ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen())); + } + + // Tests that an OK CloseStream should not have continuation tokens. + @Test(expected = IllegalStateException.class) + public void closeStreamOkWithContinuationTokenShouldFail() { + Status status = Status.newBuilder().setCode(0).build(); + RowRange rowRange = + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("")) + .setEndKeyOpen(ByteString.copyFromUtf8("apple")) + .build(); + String token = "close-stream-token-1"; + ReadChangeStreamResponse.CloseStream closeStreamProto = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) + .setToken(token)) + .setStatus(status) + .build(); + Assert.assertThrows( + IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto)); + } + + // Tests that a non-OK CloseStream should have continuation tokens. + @Test(expected = IllegalStateException.class) + public void closeStreamErrorWithoutContinuationTokenShouldFail() { + Status status = Status.newBuilder().setCode(11).build(); + ReadChangeStreamResponse.CloseStream closeStreamProto = + ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(status).build(); + Assert.assertThrows( + IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto)); + } + + // Tests that the number of continuation tokens should match the number of new partitions. + @Test(expected = IllegalStateException.class) + public void closeStreamTokenAndNewPartitionCountMismatchedTest() { + Status status = Status.newBuilder().setCode(11).build(); + RowRange rowRange = + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("")) + .setEndKeyOpen(ByteString.copyFromUtf8("apple")) + .build(); + String token = "close-stream-token-1"; + ReadChangeStreamResponse.CloseStream closeStreamProto = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) + .setToken(token)) + .setStatus(status) + .build(); + Assert.assertThrows( + IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto)); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java index 99af76fb03..22270bc269 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -150,8 +150,6 @@ public void heartbeatTest() { public void closeStreamTest() { ReadChangeStreamResponse.CloseStream expectedCloseStream = ReadChangeStreamResponse.CloseStream.newBuilder() - .addContinuationTokens( - StreamContinuationToken.newBuilder().setToken("random-token").build()) .setStatus(Status.newBuilder().setCode(0).build()) .build(); assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java index 736491a0af..f0939fb0cf 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java @@ -102,7 +102,8 @@ public void closeStreamTest() { ReadChangeStreamResponse.CloseStream closeStreamProto = ReadChangeStreamResponse.CloseStream.newBuilder() .addContinuationTokens(streamContinuationToken) - .setStatus(Status.newBuilder().setCode(0).build()) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange)) + .setStatus(Status.newBuilder().setCode(11)) .build(); ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder().setCloseStream(closeStreamProto).build(); @@ -127,5 +128,8 @@ public void closeStreamTest() { .isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); assertThat(changeStreamContinuationToken.getToken()) .isEqualTo(streamContinuationToken.getToken()); + assertThat(closeStream.getNewPartitions().size()).isEqualTo(1); + assertThat(closeStream.getNewPartitions().get(0)) + .isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java index 67d6a99f7b..7c3243ecfe 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java @@ -38,6 +38,7 @@ import com.google.cloud.bigtable.data.v2.models.DeleteFamily; import com.google.cloud.bigtable.data.v2.models.Entry; import com.google.cloud.bigtable.data.v2.models.Heartbeat; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.SetCell; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi; import com.google.cloud.conformance.bigtable.v2.ChangeStreamTestDefinition.ChangeStreamTestFile; @@ -173,6 +174,14 @@ public void test() throws Exception { .setToken(token.getToken()) .build()); } + for (ByteStringRange newPartition : closeStream.getNewPartitions()) { + builder.addNewPartitions( + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(newPartition.getStart()) + .setEndKeyOpen(newPartition.getEnd()))); + } ReadChangeStreamResponse.CloseStream closeStreamProto = builder.build(); actualResults.add( ReadChangeStreamTest.Result.newBuilder() diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java index c994f3fc8d..48a62bfee8 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java @@ -122,6 +122,15 @@ private StreamContinuationToken createStreamContinuationToken(@Nonnull String to .build(); } + private StreamPartition createNewPartitionForCloseStream() { + return StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED)) + .setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN))) + .build(); + } + private ReadChangeStreamResponse.Heartbeat createHeartbeat( StreamContinuationToken streamContinuationToken) { return ReadChangeStreamResponse.Heartbeat.newBuilder() @@ -130,11 +139,18 @@ private ReadChangeStreamResponse.Heartbeat createHeartbeat( .build(); } - private ReadChangeStreamResponse.CloseStream createCloseStream() { - return ReadChangeStreamResponse.CloseStream.newBuilder() - .addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN)) - .setStatus(com.google.rpc.Status.newBuilder().setCode(0).build()) - .build(); + private ReadChangeStreamResponse.CloseStream createCloseStream(boolean isOk) { + ReadChangeStreamResponse.CloseStream.Builder builder = + ReadChangeStreamResponse.CloseStream.newBuilder(); + if (isOk) { + builder.setStatus(com.google.rpc.Status.newBuilder().setCode(0)); + } else { + builder + .setStatus(com.google.rpc.Status.newBuilder().setCode(11)) + .addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN)) + .addNewPartitions(createNewPartitionForCloseStream()); + } + return builder.build(); } private ReadChangeStreamResponse.DataChange createDataChange(boolean done) { @@ -178,7 +194,7 @@ public void happyPathHeartbeatTest() { @Test public void happyPathCloseStreamTest() { ReadChangeStreamResponse closeStreamResponse = - ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); + ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(true)).build(); service.expectations.add( RpcExpectation.create().expectInitialRequest().respondWith(closeStreamResponse)); List actualResults = getResults(); @@ -221,7 +237,7 @@ public void singleHeartbeatImmediateRetryTest() { public void singleCloseStreamImmediateRetryTest() { // CloseStream. ReadChangeStreamResponse closeStreamResponse = - ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); + ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(false)).build(); service.expectations.add( RpcExpectation.create().expectInitialRequest().respondWithStatus(Code.UNAVAILABLE)); // Resume with the exact same request. diff --git a/google-cloud-bigtable/src/test/resources/changestream.json b/google-cloud-bigtable/src/test/resources/changestream.json index 9d9e2d46cc..661bf1b4cb 100644 --- a/google-cloud-bigtable/src/test/resources/changestream.json +++ b/google-cloud-bigtable/src/test/resources/changestream.json @@ -61,11 +61,25 @@ "partition": { "row_range": { "start_key_closed": "0000000000000001", - "end_key_open": "0000000000000002" + "end_key_open": "0000000000000003" } }, "token": "close-stream-token-2" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + }, + { + "row_range": { + "start_key_closed": "0000000000000002", + "end_key_open": "0000000000000003" + } + } ] } } @@ -92,11 +106,25 @@ "partition": { "row_range": { "start_key_closed": "0000000000000001", - "end_key_open": "0000000000000002" + "end_key_open": "0000000000000003" } }, "token": "close-stream-token-2" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + }, + { + "row_range": { + "start_key_closed": "0000000000000002", + "end_key_open": "0000000000000003" + } + } ] } }, @@ -137,6 +165,14 @@ }, "token": "close-stream-token-1" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + } ] } } @@ -176,6 +212,14 @@ }, "token": "close-stream-token-1" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + } ] } }, @@ -1280,11 +1324,25 @@ "partition": { "row_range": { "start_key_closed": "0000000000000001", - "end_key_open": "0000000000000002" + "end_key_open": "0000000000000003" } }, "token": "close-stream-token-2" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + }, + { + "row_range": { + "start_key_closed": "0000000000000002", + "end_key_open": "0000000000000003" + } + } ] } } @@ -1363,11 +1421,25 @@ "partition": { "row_range": { "start_key_closed": "0000000000000001", - "end_key_open": "0000000000000002" + "end_key_open": "0000000000000003" } }, "token": "close-stream-token-2" } + ], + "new_partitions": [ + { + "row_range": { + "start_key_closed": "", + "end_key_open": "0000000000000002" + } + }, + { + "row_range": { + "start_key_closed": "0000000000000002", + "end_key_open": "0000000000000003" + } + } ] } },