diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index b79b184e7a..10571ecd1f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -321,14 +321,28 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - ChangeStreamMutation otherChangeStreamMutation = (ChangeStreamMutation) o; - return Objects.equal(this.hashCode(), otherChangeStreamMutation.hashCode()); + ChangeStreamMutation other = (ChangeStreamMutation) o; + return Objects.equal(this.rowKey, other.rowKey) + && Objects.equal(this.type, other.type) + && Objects.equal(this.sourceClusterId, other.sourceClusterId) + && Objects.equal(this.commitTimestamp, other.commitTimestamp) + && Objects.equal(this.tieBreaker, other.tieBreaker) + && Objects.equal(this.token, other.token) + && Objects.equal(this.lowWatermark, other.lowWatermark) + && Objects.equal(this.entries.build(), other.entries.build()); } @Override public int hashCode() { return Objects.hashCode( - rowKey, type, sourceClusterId, commitTimestamp, tieBreaker, token, lowWatermark, entries); + rowKey, + type, + sourceClusterId, + commitTimestamp, + tieBreaker, + token, + lowWatermark, + entries.build()); } @Override diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java new file mode 100644 index 0000000000..6e9715a407 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java @@ -0,0 +1,173 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import com.google.api.core.InternalApi; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import javax.annotation.Nonnull; + +/** + * An extension point that allows end users to plug in a custom implementation of logical change + * stream records. This is useful in cases where the user would like to apply advanced client side + * filtering(for example, only keep DeleteFamily in the mutations). This adapter acts like a factory + * for a SAX style change stream record builder. + */ +public interface ChangeStreamRecordAdapter { + /** Creates a new instance of a {@link ChangeStreamRecordBuilder}. */ + ChangeStreamRecordBuilder createChangeStreamRecordBuilder(); + + /** Checks if the given change stream record is a Heartbeat. */ + @InternalApi("Used in Changestream beam pipeline.") + boolean isHeartbeat(ChangeStreamRecordT record); + + /** + * Get the token from the given Heartbeat record. If the given record is not a Heartbeat, it will + * throw an Exception. + */ + @InternalApi("Used in Changestream beam pipeline.") + String getTokenFromHeartbeat(ChangeStreamRecordT heartbeatRecord); + + /** Checks if the given change stream record is a ChangeStreamMutation. */ + @InternalApi("Used in Changestream beam pipeline.") + boolean isChangeStreamMutation(ChangeStreamRecordT record); + + /** + * Get the token from the given ChangeStreamMutation record. If the given record is not a + * ChangeStreamMutation, it will throw an Exception. + */ + @InternalApi("Used in Changestream beam pipeline.") + String getTokenFromChangeStreamMutation(ChangeStreamRecordT record); + + /** + * A SAX style change stream record factory. It is responsible for creating one of the three types + * of change stream record: heartbeat, close stream, and a change stream mutation. + * + *

State management is handled external to the implementation of this class: + * + *

    + * Case 1: Heartbeat + *
  1. Exactly 1 {@code onHeartbeat}. + *
+ * + *
    + * Case 2: CloseStream + *
  1. Exactly 1 {@code onCloseStream}. + *
+ * + *
    + * Case 3: ChangeStreamMutation. A change stream mutation consists of one or more mods, where + * the SetCells might be chunked. There are 3 different types of mods that a ReadChangeStream + * response can have: + *
  1. DeleteFamily -> Exactly 1 {@code deleteFamily} + *
  2. DeleteCell -> Exactly 1 {@code deleteCell} + *
  3. SetCell -> Exactly 1 {@code startCell}, At least 1 {@code CellValue}, Exactly 1 {@code + * finishCell}. + *
+ * + *

The whole flow of constructing a ChangeStreamMutation is: + * + *

    + *
  1. Exactly 1 {@code startUserMutation} or {@code startGcMutation}. + *
  2. At least 1 DeleteFamily/DeleteCell/SetCell mods. + *
  3. Exactly 1 {@code finishChangeStreamMutation}. + *
+ * + *

Note: For a non-chunked SetCell, only 1 {@code CellValue} will be called. For a chunked + * SetCell, more than 1 {@code CellValue}s will be called. + * + *

Note: DeleteRow's won't appear in data changes since they'll be converted to multiple + * DeleteFamily's. + */ + interface ChangeStreamRecordBuilder { + /** + * Called to create a heartbeat. This will be called at most once. If called, the current change + * stream record must not include any data changes or close stream messages. + */ + ChangeStreamRecordT onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat); + + /** + * Called to create a close stream message. This will be called at most once. If called, the + * current change stream record must not include any data changes or heartbeats. + */ + ChangeStreamRecordT onCloseStream(ReadChangeStreamResponse.CloseStream closeStream); + + /** + * Called to start a new user initiated ChangeStreamMutation. This will be called at most once. + * If called, the current change stream record must not include any close stream message or + * heartbeat. + */ + void startUserMutation( + @Nonnull ByteString rowKey, + @Nonnull String sourceClusterId, + @Nonnull Timestamp commitTimestamp, + int tieBreaker); + + /** + * Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most + * once. If called, the current change stream record must not include any close stream message + * or heartbeat. + */ + void startGcMutation( + @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker); + + /** Called to add a DeleteFamily mod. */ + void deleteFamily(@Nonnull String familyName); + + /** Called to add a DeleteCell mod. */ + void deleteCells( + @Nonnull String familyName, + @Nonnull ByteString qualifier, + @Nonnull TimestampRange timestampRange); + + /** + * Called to start a SetCell. + * + *

    + * In case of a non-chunked cell, the following order is guaranteed: + *
  1. Exactly 1 {@code startCell}. + *
  2. Exactly 1 {@code cellValue}. + *
  3. Exactly 1 {@code finishCell}. + *
+ * + *
    + * In case of a chunked cell, the following order is guaranteed: + *
  1. Exactly 1 {@code startCell}. + *
  2. At least 2 {@code cellValue}. + *
  3. Exactly 1 {@code finishCell}. + *
+ */ + void startCell(String family, ByteString qualifier, long timestampMicros); + + /** + * Called once per non-chunked cell, or at least twice per chunked cell to concatenate the cell + * value. + */ + void cellValue(ByteString value); + + /** Called once per cell to signal the end of the value (unless reset). */ + void finishCell(); + + /** Called once per stream record to signal that all mods have been processed (unless reset). */ + ChangeStreamRecordT finishChangeStreamMutation( + @Nonnull String token, @Nonnull Timestamp lowWatermark); + + /** Called when the current in progress change stream record should be dropped */ + void reset(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java new file mode 100644 index 0000000000..d8eb632e54 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java @@ -0,0 +1,175 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import javax.annotation.Nonnull; + +/** + * Default implementation of a {@link ChangeStreamRecordAdapter} that uses {@link + * ChangeStreamRecord}s to represent change stream records. + */ +public class DefaultChangeStreamRecordAdapter + implements ChangeStreamRecordAdapter { + + /** {@inheritDoc} */ + @Override + public ChangeStreamRecordBuilder createChangeStreamRecordBuilder() { + return new DefaultChangeStreamRecordBuilder(); + } + + /** {@inheritDoc} */ + @Override + public boolean isHeartbeat(ChangeStreamRecord record) { + return record instanceof Heartbeat; + } + + /** {@inheritDoc} */ + @Override + public String getTokenFromHeartbeat(ChangeStreamRecord record) { + Preconditions.checkArgument(isHeartbeat(record), "record is not a Heartbeat."); + return ((Heartbeat) record).getChangeStreamContinuationToken().getToken(); + } + + /** {@inheritDoc} */ + @Override + public boolean isChangeStreamMutation(ChangeStreamRecord record) { + return record instanceof ChangeStreamMutation; + } + + /** {@inheritDoc} */ + @Override + public String getTokenFromChangeStreamMutation(ChangeStreamRecord record) { + Preconditions.checkArgument( + isChangeStreamMutation(record), "record is not a ChangeStreamMutation."); + return ((ChangeStreamMutation) record).getToken(); + } + + /** {@inheritDoc} */ + static class DefaultChangeStreamRecordBuilder + implements ChangeStreamRecordBuilder { + private ChangeStreamMutation.Builder changeStreamMutationBuilder = null; + + // For the current SetCell. + private String family; + private ByteString qualifier; + private long timestampMicros; + private ByteString value; + + public DefaultChangeStreamRecordBuilder() { + reset(); + } + + /** {@inheritDoc} */ + @Override + public ChangeStreamRecord onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + Preconditions.checkArgument( + this.changeStreamMutationBuilder == null, + "Can not create a Heartbeat when there is an existing ChangeStreamMutation being built."); + return Heartbeat.fromProto(heartbeat); + } + + /** {@inheritDoc} */ + @Override + public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + Preconditions.checkArgument( + this.changeStreamMutationBuilder == null, + "Can not create a CloseStream when there is an existing ChangeStreamMutation being built."); + return CloseStream.fromProto(closeStream); + } + + /** {@inheritDoc} */ + @Override + public void startUserMutation( + @Nonnull ByteString rowKey, + @Nonnull String sourceClusterId, + @Nonnull Timestamp commitTimestamp, + int tieBreaker) { + this.changeStreamMutationBuilder = + ChangeStreamMutation.createUserMutation( + rowKey, sourceClusterId, commitTimestamp, tieBreaker); + } + + /** {@inheritDoc} */ + @Override + public void startGcMutation( + @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) { + this.changeStreamMutationBuilder = + ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker); + } + + /** {@inheritDoc} */ + @Override + public void deleteFamily(@Nonnull String familyName) { + this.changeStreamMutationBuilder.deleteFamily(familyName); + } + + /** {@inheritDoc} */ + @Override + public void deleteCells( + @Nonnull String familyName, + @Nonnull ByteString qualifier, + @Nonnull TimestampRange timestampRange) { + this.changeStreamMutationBuilder.deleteCells(familyName, qualifier, timestampRange); + } + + /** {@inheritDoc} */ + @Override + public void startCell(String family, ByteString qualifier, long timestampMicros) { + this.family = family; + this.qualifier = qualifier; + this.timestampMicros = timestampMicros; + this.value = ByteString.EMPTY; + } + + /** {@inheritDoc} */ + @Override + public void cellValue(ByteString value) { + this.value = this.value.concat(value); + } + + /** {@inheritDoc} */ + @Override + public void finishCell() { + this.changeStreamMutationBuilder.setCell( + this.family, this.qualifier, this.timestampMicros, this.value); + } + + /** {@inheritDoc} */ + @Override + public ChangeStreamRecord finishChangeStreamMutation( + @Nonnull String token, @Nonnull Timestamp lowWatermark) { + this.changeStreamMutationBuilder.setToken(token); + this.changeStreamMutationBuilder.setLowWatermark(lowWatermark); + return this.changeStreamMutationBuilder.build(); + } + + /** {@inheritDoc} */ + @Override + public void reset() { + changeStreamMutationBuilder = null; + + family = null; + qualifier = null; + timestampMicros = 0; + value = null; + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index db82657e49..f2371c8507 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -26,7 +26,7 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608504L; - public static Heartbeat create( + private static Heartbeat create( ChangeStreamContinuationToken changeStreamContinuationToken, Timestamp lowWatermark) { return new AutoValue_Heartbeat(changeStreamContinuationToken, lowWatermark); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java new file mode 100644 index 0000000000..7ab7fa2b7b --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java @@ -0,0 +1,582 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.changestream; + +import com.google.bigtable.v2.Mutation; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter.ChangeStreamRecordBuilder; +import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; +import com.google.common.base.Preconditions; + +/** + * A state machine to produce change stream records from a stream of {@link + * ReadChangeStreamResponse}. A change stream record can be a Heartbeat, a CloseStream or a + * ChangeStreamMutation. + * + *

There could be two types of chunking for a ChangeStreamMutation: + * + *

    + *
  • Non-SetCell chunking. For example, a ChangeStreamMutation has two mods, DeleteFamily and + * DeleteColumn. DeleteFamily is sent in the first {@link ReadChangeStreamResponse} and + * DeleteColumn is sent in the second {@link ReadChangeStreamResponse}. + *
  • {@link ReadChangeStreamResponse.MutationChunk} has a chunked {@link + * com.google.bigtable.v2.Mutation.SetCell} mutation. For example, a logical mutation has one + * big {@link Mutation.SetCell} mutation which is chunked into two {@link + * ReadChangeStreamResponse}s. The first {@link ReadChangeStreamResponse.DataChange} has the + * first half of the cell value, and the second {@link ReadChangeStreamResponse.DataChange} + * has the second half. + *
+ * + * This state machine handles both types of chunking. + * + *

Building of the actual change stream record object is delegated to a {@link + * ChangeStreamRecordBuilder}. This class is not thread safe. + * + *

The inputs are: + * + *

    + *
  • {@link ReadChangeStreamResponse.Heartbeat}s. + *
  • {@link ReadChangeStreamResponse.CloseStream}s. + *
  • {@link ReadChangeStreamResponse.DataChange}s, that must be merged to a + * ChangeStreamMutation. + *
  • ChangeStreamRecord consumption events that reset the state machine for the next change + * stream record. + *
+ * + *

The outputs are: + * + *

    + *
  • Heartbeat records. + *
  • CloseStream records. + *
  • ChangeStreamMutation records. + *
+ * + *

Expected Usage: + * + *

{@code
+ * ChangeStreamStateMachine changeStreamStateMachine = new ChangeStreamStateMachine<>(myChangeStreamRecordAdapter);
+ * while(responseIterator.hasNext()) {
+ *   ReadChangeStreamResponse response = responseIterator.next();
+ *   if (response.hasHeartbeat()) {
+ *     changeStreamStateMachine.handleHeartbeat(response.getHeartbeat());
+ *   } else if (response.hasCloseStream()) {
+ *     changeStreamStateMachine.handleCloseStream(response.getCloseStream());
+ *   } else {
+ *       changeStreamStateMachine.handleDataChange(response.getDataChange());
+ *   }
+ *   if (changeStreamStateMachine.hasCompleteChangeStreamRecord()) {
+ *       MyChangeStreamRecord = changeStreamStateMachine.consumeChangeStreamRecord();
+ *       // do something with the change stream record.
+ *   }
+ * }
+ * }
+ * + *

Package-private for internal use. + * + * @param The type of row the adapter will build + */ +final class ChangeStreamStateMachine { + private final ChangeStreamRecordBuilder builder; + private State currentState; + // debug stats + private int numHeartbeats = 0; + private int numCloseStreams = 0; + private int numDataChanges = 0; + private int numNonCellMods = 0; + private int numCellChunks = 0; // 1 for non-chunked cell. + private int actualTotalSizeOfChunkedSetCell = 0; + private ChangeStreamRecordT completeChangeStreamRecord; + + /** + * Initialize a new state machine that's ready for a new change stream record. + * + * @param builder The builder that will build the final change stream record. + */ + ChangeStreamStateMachine(ChangeStreamRecordBuilder builder) { + this.builder = builder; + reset(); + } + + /** + * Handle heartbeat events from the server. + * + *

+ *
Valid states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD} + *
Resulting states: + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME} + *
+ */ + void handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + try { + numHeartbeats++; + currentState = currentState.handleHeartbeat(heartbeat); + } catch (RuntimeException e) { + currentState = null; + throw e; + } + } + + /** + * Handle CloseStream events from the server. + * + *
+ *
Valid states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD} + *
Resulting states: + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME} + *
+ */ + void handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + try { + numCloseStreams++; + currentState = currentState.handleCloseStream(closeStream); + } catch (RuntimeException e) { + currentState = null; + throw e; + } + } + + /** + * Feeds a new dataChange into the state machine. If the dataChange is invalid, the state machine + * will throw an exception and should not be used for further input. + * + *
+ *
Valid states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD} + *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD} + *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE} + *
Resulting states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD} + *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE} + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME} + *
+ * + * @param dataChange The new chunk to process. + * @throws ChangeStreamStateMachine.InvalidInputException When the chunk is not applicable to the + * current state. + * @throws IllegalStateException When the internal state is inconsistent + */ + void handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { + try { + numDataChanges++; + currentState = currentState.handleMod(dataChange, 0); + } catch (RuntimeException e) { + currentState = null; + throw e; + } + } + + /** + * Returns the completed change stream record and transitions to {@link + * ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD}. + * + * @return The completed change stream record. + * @throws IllegalStateException If the last dataChange did not complete a change stream record. + */ + ChangeStreamRecordT consumeChangeStreamRecord() { + Preconditions.checkState( + completeChangeStreamRecord != null, "No change stream record to consume."); + Preconditions.checkState( + currentState == AWAITING_STREAM_RECORD_CONSUME, + "Change stream record is not ready to consume: " + currentState); + ChangeStreamRecordT changeStreamRecord = completeChangeStreamRecord; + reset(); + return changeStreamRecord; + } + + /** Checks if there is a complete change stream record to be consumed. */ + boolean hasCompleteChangeStreamRecord() { + return completeChangeStreamRecord != null && currentState == AWAITING_STREAM_RECORD_CONSUME; + } + /** + * Checks if the state machine is in the middle of processing a change stream record. + * + * @return True If there is a change stream record in progress. + */ + boolean isChangeStreamRecordInProgress() { + return currentState != AWAITING_NEW_STREAM_RECORD; + } + + private void reset() { + currentState = AWAITING_NEW_STREAM_RECORD; + numHeartbeats = 0; + numCloseStreams = 0; + numDataChanges = 0; + numNonCellMods = 0; + numCellChunks = 0; + actualTotalSizeOfChunkedSetCell = 0; + completeChangeStreamRecord = null; + + builder.reset(); + } + + /** + * Base class for all the state machine's internal states. + * + *

Each state can consume 3 events: Heartbeat, CloseStream and a Mod. By default, the default + * implementation will just throw an IllegalStateException unless the subclass adds explicit + * handling for these events. + */ + abstract static class State { + /** + * Accepts a Heartbeat by the server. And completes the current change stream record. + * + * @throws IllegalStateException If the subclass can't handle heartbeat events. + */ + ChangeStreamStateMachine.State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + throw new IllegalStateException(); + } + + /** + * Accepts a CloseStream by the server. And completes the current change stream record. + * + * @throws IllegalStateException If the subclass can't handle CloseStream events. + */ + ChangeStreamStateMachine.State handleCloseStream( + ReadChangeStreamResponse.CloseStream closeStream) { + throw new IllegalStateException(); + } + + /** + * Accepts a new mod and transitions to the next state. A mod could be a DeleteFamily, a + * DeleteColumn, or a SetCell. + * + * @param dataChange The DataChange that holds the new mod to process. + * @param index The index of the mod in the DataChange. + * @return The next state. + * @throws IllegalStateException If the subclass can't handle the mod. + * @throws ChangeStreamStateMachine.InvalidInputException If the subclass determines that this + * dataChange is invalid. + */ + ChangeStreamStateMachine.State handleMod( + ReadChangeStreamResponse.DataChange dataChange, int index) { + throw new IllegalStateException(); + } + } + + /** + * The default state when the state machine is awaiting a ReadChangeStream response to start a new + * change stream record. It will notify the builder of the new change stream record and transits + * to one of the following states: + * + *

+ *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}, in case of a Heartbeat + * or a CloseStream. + *
Same as {@link ChangeStreamStateMachine#AWAITING_NEW_MOD}, depending on the DataChange. + *
+ */ + private final State AWAITING_NEW_STREAM_RECORD = + new State() { + @Override + State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + validate( + completeChangeStreamRecord == null, + "AWAITING_NEW_STREAM_RECORD: Existing ChangeStreamRecord not consumed yet."); + completeChangeStreamRecord = builder.onHeartbeat(heartbeat); + return AWAITING_STREAM_RECORD_CONSUME; + } + + @Override + State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + validate( + completeChangeStreamRecord == null, + "AWAITING_NEW_STREAM_RECORD: Existing ChangeStreamRecord not consumed yet."); + completeChangeStreamRecord = builder.onCloseStream(closeStream); + return AWAITING_STREAM_RECORD_CONSUME; + } + + @Override + State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + validate( + completeChangeStreamRecord == null, + "AWAITING_NEW_STREAM_RECORD: Existing ChangeStreamRecord not consumed yet."); + validate( + !dataChange.getRowKey().isEmpty(), + "AWAITING_NEW_STREAM_RECORD: First data change missing rowKey."); + validate( + dataChange.hasCommitTimestamp(), + "AWAITING_NEW_STREAM_RECORD: First data change missing commit timestamp."); + validate( + index == 0, + "AWAITING_NEW_STREAM_RECORD: First data change should start with the first mod."); + validate( + dataChange.getChunksCount() > 0, + "AWAITING_NEW_STREAM_RECORD: First data change missing mods."); + if (dataChange.getType() == Type.GARBAGE_COLLECTION) { + validate( + dataChange.getSourceClusterId().isEmpty(), + "AWAITING_NEW_STREAM_RECORD: GC mutation shouldn't have source cluster id."); + builder.startGcMutation( + dataChange.getRowKey(), + dataChange.getCommitTimestamp(), + dataChange.getTiebreaker()); + } else if (dataChange.getType() == Type.USER) { + validate( + !dataChange.getSourceClusterId().isEmpty(), + "AWAITING_NEW_STREAM_RECORD: User initiated data change missing source cluster id."); + builder.startUserMutation( + dataChange.getRowKey(), + dataChange.getSourceClusterId(), + dataChange.getCommitTimestamp(), + dataChange.getTiebreaker()); + } else { + validate(false, "AWAITING_NEW_STREAM_RECORD: Unexpected type: " + dataChange.getType()); + } + return AWAITING_NEW_MOD.handleMod(dataChange, index); + } + }; + + /** + * A state to handle the next Mod. + * + *
+ *
Valid exit states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD}. Current mod is added, and we have more + * mods to expect. + *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE}. Current mod is the first chunk of a + * chunked SetCell. + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. Current mod is the last + * mod of the current logical mutation. + *
+ */ + private final State AWAITING_NEW_MOD = + new State() { + @Override + State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + throw new IllegalStateException( + "AWAITING_NEW_MOD: Can't handle a Heartbeat in the middle of building a ChangeStreamMutation."); + } + + @Override + State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + throw new IllegalStateException( + "AWAITING_NEW_MOD: Can't handle a CloseStream in the middle of building a ChangeStreamMutation."); + } + + @Override + State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + validate( + 0 <= index && index <= dataChange.getChunksCount() - 1, + "AWAITING_NEW_MOD: Index out of bound."); + ReadChangeStreamResponse.MutationChunk chunk = dataChange.getChunks(index); + Mutation mod = chunk.getMutation(); + // Case 1: SetCell + if (mod.hasSetCell()) { + // Start the Cell and delegate to AWAITING_CELL_VALUE to add the cell value. + Mutation.SetCell setCell = chunk.getMutation().getSetCell(); + if (chunk.hasChunkInfo()) { + // If it has chunk info, it must be the first chunk of a chunked SetCell. + validate( + chunk.getChunkInfo().getChunkedValueOffset() == 0, + "First chunk of a chunked cell must start with offset==0."); + actualTotalSizeOfChunkedSetCell = 0; + } + builder.startCell( + setCell.getFamilyName(), + setCell.getColumnQualifier(), + setCell.getTimestampMicros()); + return AWAITING_CELL_VALUE.handleMod(dataChange, index); + } + // Case 2: DeleteFamily + if (mod.hasDeleteFromFamily()) { + numNonCellMods++; + builder.deleteFamily(mod.getDeleteFromFamily().getFamilyName()); + return checkAndFinishMutationIfNeeded(dataChange, index + 1); + } + // Case 3: DeleteCell + if (mod.hasDeleteFromColumn()) { + numNonCellMods++; + builder.deleteCells( + mod.getDeleteFromColumn().getFamilyName(), + mod.getDeleteFromColumn().getColumnQualifier(), + TimestampRange.create( + mod.getDeleteFromColumn().getTimeRange().getStartTimestampMicros(), + mod.getDeleteFromColumn().getTimeRange().getEndTimestampMicros())); + return checkAndFinishMutationIfNeeded(dataChange, index + 1); + } + throw new IllegalStateException("AWAITING_NEW_MOD: Unexpected mod type"); + } + }; + + /** + * A state that represents a cell's value continuation. + * + *
+ *
Valid exit states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD}. Current chunked SetCell is added, and + * we have more mods to expect. + *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE}. Current chunked SetCell has more + * cell values to expect. + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. Current chunked SetCell + * is the last mod of the current logical mutation. + *
+ */ + private final State AWAITING_CELL_VALUE = + new State() { + @Override + State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + throw new IllegalStateException( + "AWAITING_CELL_VALUE: Can't handle a Heartbeat in the middle of building a SetCell."); + } + + @Override + State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + throw new IllegalStateException( + "AWAITING_CELL_VALUE: Can't handle a CloseStream in the middle of building a SetCell."); + } + + @Override + State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + validate( + 0 <= index && index <= dataChange.getChunksCount() - 1, + "AWAITING_CELL_VALUE: Index out of bound."); + ReadChangeStreamResponse.MutationChunk chunk = dataChange.getChunks(index); + validate( + chunk.getMutation().hasSetCell(), + "AWAITING_CELL_VALUE: Current mod is not a SetCell."); + Mutation.SetCell setCell = chunk.getMutation().getSetCell(); + numCellChunks++; + builder.cellValue(setCell.getValue()); + // Case 1: Current SetCell is chunked. For example: [ReadChangeStreamResponse1: + // {DeleteColumn, DeleteFamily, SetCell_1}, ReadChangeStreamResponse2: {SetCell_2, + // DeleteFamily}]. + if (chunk.hasChunkInfo()) { + validate( + chunk.getChunkInfo().getChunkedValueSize() > 0, + "AWAITING_CELL_VALUE: Chunked value size must be positive."); + actualTotalSizeOfChunkedSetCell += setCell.getValue().size(); + // If it's the last chunk of the chunked SetCell, finish the cell. + if (chunk.getChunkInfo().getLastChunk()) { + builder.finishCell(); + validate( + actualTotalSizeOfChunkedSetCell == chunk.getChunkInfo().getChunkedValueSize(), + "Chunked value size in ChunkInfo doesn't match the actual total size. " + + "ChunkInfo: " + + chunk.getChunkInfo().getChunkedValueSize() + + "; actual total size: " + + actualTotalSizeOfChunkedSetCell); + return checkAndFinishMutationIfNeeded(dataChange, index + 1); + } else { + // If this is not the last chunk of a chunked SetCell, then this must be the last mod + // of the current response, and we're expecting the rest of the chunked cells in the + // following ReadChangeStream response. + validate( + index == dataChange.getChunksCount() - 1, + "AWAITING_CELL_VALUE: Current mod is a chunked SetCell " + + "but not the last chunk, but it's not the last mod of the current response."); + return AWAITING_CELL_VALUE; + } + } + // Case 2: Current SetCell is not chunked. + builder.finishCell(); + return checkAndFinishMutationIfNeeded(dataChange, index + 1); + } + }; + + /** + * A state that represents a completed change stream record. It prevents new change stream records + * from being read until the current one has been consumed. The caller is supposed to consume the + * change stream record by calling {@link ChangeStreamStateMachine#consumeChangeStreamRecord()} + * which will reset the state to {@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD}. + */ + private final State AWAITING_STREAM_RECORD_CONSUME = + new State() { + @Override + State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + throw new IllegalStateException( + "AWAITING_STREAM_RECORD_CONSUME: Skipping completed change stream record."); + } + + @Override + State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + throw new IllegalStateException( + "AWAITING_STREAM_RECORD_CONSUME: Skipping completed change stream record."); + } + + @Override + State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + throw new IllegalStateException( + "AWAITING_STREAM_RECORD_CONSUME: Skipping completed change stream record."); + } + }; + + /** + * Check if we should continue handling mods in the current DataChange or wrap up. There are 3 + * cases: + * + *
    + *
  • 1) index < dataChange.getChunksCount() -> continue to handle the next mod. + *
  • 2_1) index == dataChange.getChunksCount() && dataChange.done == true -> current change + * stream mutation is complete. Wrap it up and return {@link + * ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. + *
  • 2_2) index == dataChange.getChunksCount() && dataChange.done != true -> current change + * stream mutation isn't complete. Return {@link ChangeStreamStateMachine#AWAITING_NEW_MOD} + * to wait for more mods in the next ReadChangeStreamResponse. + *
+ */ + private State checkAndFinishMutationIfNeeded( + ReadChangeStreamResponse.DataChange dataChange, int index) { + validate( + 0 <= index && index <= dataChange.getChunksCount(), + "checkAndFinishMutationIfNeeded: index out of bound."); + // Case 1): Handle the next mod. + if (index < dataChange.getChunksCount()) { + return AWAITING_NEW_MOD.handleMod(dataChange, index); + } + // If we reach here, it means that all the mods in this DataChange have been handled. We should + // finish up the logical mutation or wait for more mods in the next ReadChangeStreamResponse, + // depending on whether the current response is the last response for the logical mutation. + if (dataChange.getDone()) { + // Case 2_1): Current change stream mutation is complete. + validate(!dataChange.getToken().isEmpty(), "Last data change missing token"); + validate(dataChange.hasLowWatermark(), "Last data change missing lowWatermark"); + completeChangeStreamRecord = + builder.finishChangeStreamMutation(dataChange.getToken(), dataChange.getLowWatermark()); + return AWAITING_STREAM_RECORD_CONSUME; + } + // Case 2_2): The current DataChange itself is chunked, so wait for the next + // ReadChangeStreamResponse. Note that we should wait for the new mods instead + // of for the new change stream record since the current record hasn't finished yet. + return AWAITING_NEW_MOD; + } + + private void validate(boolean condition, String message) { + if (!condition) { + throw new ChangeStreamStateMachine.InvalidInputException( + message + + ". numHeartbeats: " + + numHeartbeats + + ", numCloseStreams: " + + numCloseStreams + + ", numDataChanges: " + + numDataChanges + + ", numNonCellMods: " + + numNonCellMods + + ", numCellChunks: " + + numCellChunks + + ", actualTotalSizeOfChunkedSetCell: " + + actualTotalSizeOfChunkedSetCell); + } + } + + static class InvalidInputException extends RuntimeException { + InvalidInputException(String message) { + super(message); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java index 938213fb36..a14fe001cd 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java @@ -87,15 +87,7 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti oos.close(); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); ChangeStreamMutation actual = (ChangeStreamMutation) ois.readObject(); - Assert.assertEquals(actual.getRowKey(), changeStreamMutation.getRowKey()); - Assert.assertEquals(actual.getType(), changeStreamMutation.getType()); - Assert.assertEquals(actual.getSourceClusterId(), changeStreamMutation.getSourceClusterId()); - Assert.assertEquals(actual.getCommitTimestamp(), changeStreamMutation.getCommitTimestamp()); - Assert.assertEquals(actual.getTieBreaker(), changeStreamMutation.getTieBreaker()); - Assert.assertEquals(actual.getToken(), changeStreamMutation.getToken()); - Assert.assertEquals(actual.getLowWatermark(), changeStreamMutation.getLowWatermark()); - assertThat(actual.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT)) - .isEqualTo(changeStreamMutation.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT)); + Assert.assertEquals(actual, changeStreamMutation); } @Test @@ -138,15 +130,7 @@ public void gcMutationTest() throws IOException, ClassNotFoundException { oos.close(); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); ChangeStreamMutation actual = (ChangeStreamMutation) ois.readObject(); - Assert.assertEquals(actual.getRowKey(), changeStreamMutation.getRowKey()); - Assert.assertEquals(actual.getType(), changeStreamMutation.getType()); - Assert.assertEquals(actual.getSourceClusterId(), changeStreamMutation.getSourceClusterId()); - Assert.assertEquals(actual.getCommitTimestamp(), changeStreamMutation.getCommitTimestamp()); - Assert.assertEquals(actual.getTieBreaker(), changeStreamMutation.getTieBreaker()); - Assert.assertEquals(actual.getToken(), changeStreamMutation.getToken()); - Assert.assertEquals(actual.getLowWatermark(), changeStreamMutation.getLowWatermark()); - assertThat(actual.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT)) - .isEqualTo(changeStreamMutation.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT)); + Assert.assertEquals(actual, changeStreamMutation); } @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java new file mode 100644 index 0000000000..e29b914ffc --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -0,0 +1,446 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.Mutation; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.TimestampRange; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter.ChangeStreamRecordBuilder; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.rpc.Status; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DefaultChangeStreamRecordAdapterTest { + + private final DefaultChangeStreamRecordAdapter adapter = new DefaultChangeStreamRecordAdapter(); + private ChangeStreamRecordBuilder changeStreamRecordBuilder; + + @Rule public ExpectedException expect = ExpectedException.none(); + + @Before + public void setUp() { + changeStreamRecordBuilder = adapter.createChangeStreamRecordBuilder(); + } + + @Test + public void isHeartbeatTest() { + ChangeStreamRecord heartbeatRecord = + Heartbeat.fromProto(ReadChangeStreamResponse.Heartbeat.getDefaultInstance()); + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + ChangeStreamRecord changeStreamMutationRecord = + ChangeStreamMutation.createGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0) + .setToken("token") + .setLowWatermark(Timestamp.getDefaultInstance()) + .build(); + Assert.assertTrue(adapter.isHeartbeat(heartbeatRecord)); + Assert.assertFalse(adapter.isHeartbeat(closeStreamRecord)); + Assert.assertFalse(adapter.isHeartbeat(changeStreamMutationRecord)); + } + + @Test + public void getTokenFromHeartbeatTest() { + ChangeStreamRecord heartbeatRecord = + Heartbeat.fromProto( + ReadChangeStreamResponse.Heartbeat.newBuilder() + .setLowWatermark(Timestamp.newBuilder().setSeconds(1000).build()) + .setContinuationToken( + StreamContinuationToken.newBuilder().setToken("heartbeat-token").build()) + .build()); + Assert.assertEquals(adapter.getTokenFromHeartbeat(heartbeatRecord), "heartbeat-token"); + } + + @Test(expected = IllegalArgumentException.class) + public void getTokenFromHeartbeatInvalidTypeTest() { + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + adapter.getTokenFromHeartbeat(closeStreamRecord); + expect.expectMessage("record is not a Heartbeat."); + } + + @Test + public void isChangeStreamMutationTest() { + ChangeStreamRecord heartbeatRecord = + Heartbeat.fromProto(ReadChangeStreamResponse.Heartbeat.getDefaultInstance()); + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + ChangeStreamRecord changeStreamMutationRecord = + ChangeStreamMutation.createGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0) + .setToken("token") + .setLowWatermark(Timestamp.getDefaultInstance()) + .build(); + Assert.assertFalse(adapter.isChangeStreamMutation(heartbeatRecord)); + Assert.assertFalse(adapter.isChangeStreamMutation(closeStreamRecord)); + Assert.assertTrue(adapter.isChangeStreamMutation(changeStreamMutationRecord)); + } + + @Test + public void getTokenFromChangeStreamMutationTest() { + ChangeStreamRecord changeStreamMutationRecord = + ChangeStreamMutation.createGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0) + .setToken("change-stream-mutation-token") + .setLowWatermark(Timestamp.getDefaultInstance()) + .build(); + Assert.assertEquals( + adapter.getTokenFromChangeStreamMutation(changeStreamMutationRecord), + "change-stream-mutation-token"); + } + + @Test(expected = IllegalArgumentException.class) + public void getTokenFromChangeStreamMutationInvalidTypeTest() { + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + adapter.getTokenFromChangeStreamMutation(closeStreamRecord); + expect.expectMessage("record is not a ChangeStreamMutation."); + } + + @Test + public void heartbeatTest() { + ReadChangeStreamResponse.Heartbeat expectedHeartbeat = + ReadChangeStreamResponse.Heartbeat.newBuilder() + .setLowWatermark(Timestamp.newBuilder().setSeconds(1000).build()) + .setContinuationToken( + StreamContinuationToken.newBuilder().setToken("random-token").build()) + .build(); + assertThat(changeStreamRecordBuilder.onHeartbeat(expectedHeartbeat)) + .isEqualTo(Heartbeat.fromProto(expectedHeartbeat)); + // Call again. + assertThat(changeStreamRecordBuilder.onHeartbeat(expectedHeartbeat)) + .isEqualTo(Heartbeat.fromProto(expectedHeartbeat)); + } + + @Test + public void closeStreamTest() { + ReadChangeStreamResponse.CloseStream expectedCloseStream = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder().setToken("random-token").build()) + .setStatus(Status.newBuilder().setCode(0).build()) + .build(); + assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) + .isEqualTo(CloseStream.fromProto(expectedCloseStream)); + // Call again. + assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) + .isEqualTo(CloseStream.fromProto(expectedCloseStream)); + } + + @Test(expected = IllegalArgumentException.class) + public void createHeartbeatWithExistingMutationShouldFailTest() { + changeStreamRecordBuilder.startGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0); + changeStreamRecordBuilder.onHeartbeat(ReadChangeStreamResponse.Heartbeat.getDefaultInstance()); + } + + @Test(expected = IllegalArgumentException.class) + public void createCloseStreamWithExistingMutationShouldFailTest() { + changeStreamRecordBuilder.startGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0); + changeStreamRecordBuilder.onCloseStream( + ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + } + + @Test + public void singleDeleteFamilyTest() { + // Suppose this is the mod we get from the ReadChangeStreamResponse. + Mutation.DeleteFromFamily deleteFromFamily = + Mutation.DeleteFromFamily.newBuilder().setFamilyName("fake-family").build(); + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + + // Expected logical mutation in the change stream record. + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .deleteFamily("fake-family") + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.deleteFamily(deleteFromFamily.getFamilyName()); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + // Call again. + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } + + @Test + public void singleDeleteCellTest() { + // Suppose this is the mod we get from the ReadChangeStreamResponse. + Mutation.DeleteFromColumn deleteFromColumn = + Mutation.DeleteFromColumn.newBuilder() + .setFamilyName("fake-family") + .setColumnQualifier(ByteString.copyFromUtf8("fake-qualifier")) + .setTimeRange( + TimestampRange.newBuilder() + .setStartTimestampMicros(1000L) + .setEndTimestampMicros(2000L) + .build()) + .build(); + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + + // Expected logical mutation in the change stream record. + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .deleteCells( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + Range.TimestampRange.create(1000L, 2000L)) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.deleteCells( + deleteFromColumn.getFamilyName(), + deleteFromColumn.getColumnQualifier(), + Range.TimestampRange.create( + deleteFromColumn.getTimeRange().getStartTimestampMicros(), + deleteFromColumn.getTimeRange().getEndTimestampMicros())); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + // Call again. + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } + + @Test + public void singleNonChunkedCellTest() { + // Expected logical mutation in the change stream record. + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("fake-value")) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. + // Suppose the SetCell is not chunked and the state machine calls `cellValue()` once. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("fake-value")); + changeStreamRecordBuilder.finishCell(); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + // Call again. + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } + + @Test + public void singleChunkedCellTest() { + // Expected logical mutation in the change stream record. + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("fake-value1-value2")) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. + // Suppose the SetCell is chunked into two pieces and the state machine calls `cellValue()` + // twice. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("fake-value1")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value2")); + changeStreamRecordBuilder.finishCell(); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + // Call again. + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } + + @Test + public void multipleChunkedCellsTest() { + // Expected logical mutation in the change stream record. + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation.Builder expectedChangeStreamMutationBuilder = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + for (int i = 0; i < 10; ++i) { + expectedChangeStreamMutationBuilder.setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8(i + "-fake-value1-value2-value3")); + } + expectedChangeStreamMutationBuilder.setToken("fake-token").setLowWatermark(fakeLowWatermark); + + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + for (int i = 0; i < 10; ++i) { + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8(i + "-fake-value1")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value2")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value3")); + changeStreamRecordBuilder.finishCell(); + } + // Check that they're the same. + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutationBuilder.build()); + // Call again. + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutationBuilder.build()); + } + + @Test + public void multipleDifferentModsTest() { + // Expected logical mutation in the change stream record, which contains one DeleteFromFamily, + // one non-chunked cell, and one chunked cell. + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation.Builder expectedChangeStreamMutationBuilder = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .deleteFamily("fake-family") + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("non-chunked-value")) + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("chunked-value")) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark); + + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.deleteFamily("fake-family"); + // Add non-chunked cell. + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("non-chunked-value")); + changeStreamRecordBuilder.finishCell(); + // Add chunked cell. + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("chunked")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value")); + changeStreamRecordBuilder.finishCell(); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutationBuilder.build()); + } + + @Test + public void resetTest() { + // Build a Heartbeat. + ReadChangeStreamResponse.Heartbeat expectedHeartbeat = + ReadChangeStreamResponse.Heartbeat.getDefaultInstance(); + assertThat(changeStreamRecordBuilder.onHeartbeat(expectedHeartbeat)) + .isEqualTo(Heartbeat.fromProto(expectedHeartbeat)); + + // Reset and build a CloseStream. + changeStreamRecordBuilder.reset(); + ReadChangeStreamResponse.CloseStream expectedCloseStream = + ReadChangeStreamResponse.CloseStream.getDefaultInstance(); + assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) + .isEqualTo(CloseStream.fromProto(expectedCloseStream)); + + // Reset and build a DeleteFamily. + changeStreamRecordBuilder.reset(); + Mutation deleteFromFamily = + Mutation.newBuilder() + .setDeleteFromFamily( + Mutation.DeleteFromFamily.newBuilder().setFamilyName("fake-family").build()) + .build(); + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .deleteFamily("fake-family") + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.deleteFamily(deleteFromFamily.getDeleteFromFamily().getFamilyName()); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + + // Reset a build a cell. + changeStreamRecordBuilder.reset(); + expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("fake-value1-value2")) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("fake-value1")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value2")); + changeStreamRecordBuilder.finishCell(); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java new file mode 100644 index 0000000000..d86df91c35 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.changestream; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; +import com.google.cloud.bigtable.data.v2.models.DefaultChangeStreamRecordAdapter; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ChangeStreamStateMachineTest { + ChangeStreamStateMachine changeStreamStateMachine; + + @Before + public void setUp() throws Exception { + changeStreamStateMachine = + new ChangeStreamStateMachine<>( + new DefaultChangeStreamRecordAdapter().createChangeStreamRecordBuilder()); + } + + @Test + public void testErrorHandlingStats() { + ReadChangeStreamResponse.DataChange dataChange = + ReadChangeStreamResponse.DataChange.newBuilder().build(); + + ChangeStreamStateMachine.InvalidInputException actualError = null; + try { + changeStreamStateMachine.handleDataChange(dataChange); + } catch (ChangeStreamStateMachine.InvalidInputException e) { + actualError = e; + } + + assertThat(actualError) + .hasMessageThat() + .containsMatch("AWAITING_NEW_STREAM_RECORD: First data change missing rowKey"); + assertThat(actualError).hasMessageThat().contains("numHeartbeats: 0"); + assertThat(actualError).hasMessageThat().contains("numCloseStreams: 0"); + assertThat(actualError).hasMessageThat().contains("numDataChanges: 1"); + assertThat(actualError).hasMessageThat().contains("numNonCellMods: 0"); + assertThat(actualError).hasMessageThat().contains("numCellChunks: 0"); + assertThat(actualError).hasMessageThat().contains("actualTotalSizeOfChunkedSetCell: 0"); + } +}