Skip to content

Commit

Permalink
feat: Add ChangeStreamRecordAdapter and ChangeStreamStateMachine (#1334)
Browse files Browse the repository at this point in the history
* Add ChangeStreamRecordAdapter and ChangeStreamStateMachine

These will be used later for ChangeStreamMergingCallable.

* fix: Fix styles and add some tests.

* fix: Address comments

* fix: Update comments

Co-authored-by: Teng Zhong <tengzhong@google.com>
  • Loading branch information
tengzhonger and Teng Zhong authored Aug 3, 2022
1 parent 53dd0f0 commit cb7b455
Show file tree
Hide file tree
Showing 8 changed files with 1,457 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,28 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ChangeStreamMutation otherChangeStreamMutation = (ChangeStreamMutation) o;
return Objects.equal(this.hashCode(), otherChangeStreamMutation.hashCode());
ChangeStreamMutation other = (ChangeStreamMutation) o;
return Objects.equal(this.rowKey, other.rowKey)
&& Objects.equal(this.type, other.type)
&& Objects.equal(this.sourceClusterId, other.sourceClusterId)
&& Objects.equal(this.commitTimestamp, other.commitTimestamp)
&& Objects.equal(this.tieBreaker, other.tieBreaker)
&& Objects.equal(this.token, other.token)
&& Objects.equal(this.lowWatermark, other.lowWatermark)
&& Objects.equal(this.entries.build(), other.entries.build());
}

@Override
public int hashCode() {
return Objects.hashCode(
rowKey, type, sourceClusterId, commitTimestamp, tieBreaker, token, lowWatermark, entries);
rowKey,
type,
sourceClusterId,
commitTimestamp,
tieBreaker,
token,
lowWatermark,
entries.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import javax.annotation.Nonnull;

/**
* An extension point that allows end users to plug in a custom implementation of logical change
* stream records. This is useful in cases where the user would like to apply advanced client side
* filtering(for example, only keep DeleteFamily in the mutations). This adapter acts like a factory
* for a SAX style change stream record builder.
*/
public interface ChangeStreamRecordAdapter<ChangeStreamRecordT> {
/** Creates a new instance of a {@link ChangeStreamRecordBuilder}. */
ChangeStreamRecordBuilder<ChangeStreamRecordT> createChangeStreamRecordBuilder();

/** Checks if the given change stream record is a Heartbeat. */
@InternalApi("Used in Changestream beam pipeline.")
boolean isHeartbeat(ChangeStreamRecordT record);

/**
* Get the token from the given Heartbeat record. If the given record is not a Heartbeat, it will
* throw an Exception.
*/
@InternalApi("Used in Changestream beam pipeline.")
String getTokenFromHeartbeat(ChangeStreamRecordT heartbeatRecord);

/** Checks if the given change stream record is a ChangeStreamMutation. */
@InternalApi("Used in Changestream beam pipeline.")
boolean isChangeStreamMutation(ChangeStreamRecordT record);

/**
* Get the token from the given ChangeStreamMutation record. If the given record is not a
* ChangeStreamMutation, it will throw an Exception.
*/
@InternalApi("Used in Changestream beam pipeline.")
String getTokenFromChangeStreamMutation(ChangeStreamRecordT record);

/**
* A SAX style change stream record factory. It is responsible for creating one of the three types
* of change stream record: heartbeat, close stream, and a change stream mutation.
*
* <p>State management is handled external to the implementation of this class:
*
* <ol>
* Case 1: Heartbeat
* <li>Exactly 1 {@code onHeartbeat}.
* </ol>
*
* <ol>
* Case 2: CloseStream
* <li>Exactly 1 {@code onCloseStream}.
* </ol>
*
* <ol>
* Case 3: ChangeStreamMutation. A change stream mutation consists of one or more mods, where
* the SetCells might be chunked. There are 3 different types of mods that a ReadChangeStream
* response can have:
* <li>DeleteFamily -> Exactly 1 {@code deleteFamily}
* <li>DeleteCell -> Exactly 1 {@code deleteCell}
* <li>SetCell -> Exactly 1 {@code startCell}, At least 1 {@code CellValue}, Exactly 1 {@code
* finishCell}.
* </ol>
*
* <p>The whole flow of constructing a ChangeStreamMutation is:
*
* <ol>
* <li>Exactly 1 {@code startUserMutation} or {@code startGcMutation}.
* <li>At least 1 DeleteFamily/DeleteCell/SetCell mods.
* <li>Exactly 1 {@code finishChangeStreamMutation}.
* </ol>
*
* <p>Note: For a non-chunked SetCell, only 1 {@code CellValue} will be called. For a chunked
* SetCell, more than 1 {@code CellValue}s will be called.
*
* <p>Note: DeleteRow's won't appear in data changes since they'll be converted to multiple
* DeleteFamily's.
*/
interface ChangeStreamRecordBuilder<ChangeStreamRecordT> {
/**
* Called to create a heartbeat. This will be called at most once. If called, the current change
* stream record must not include any data changes or close stream messages.
*/
ChangeStreamRecordT onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat);

/**
* Called to create a close stream message. This will be called at most once. If called, the
* current change stream record must not include any data changes or heartbeats.
*/
ChangeStreamRecordT onCloseStream(ReadChangeStreamResponse.CloseStream closeStream);

/**
* Called to start a new user initiated ChangeStreamMutation. This will be called at most once.
* If called, the current change stream record must not include any close stream message or
* heartbeat.
*/
void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
int tieBreaker);

/**
* Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most
* once. If called, the current change stream record must not include any close stream message
* or heartbeat.
*/
void startGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker);

/** Called to add a DeleteFamily mod. */
void deleteFamily(@Nonnull String familyName);

/** Called to add a DeleteCell mod. */
void deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange);

/**
* Called to start a SetCell.
*
* <ol>
* In case of a non-chunked cell, the following order is guaranteed:
* <li>Exactly 1 {@code startCell}.
* <li>Exactly 1 {@code cellValue}.
* <li>Exactly 1 {@code finishCell}.
* </ol>
*
* <ol>
* In case of a chunked cell, the following order is guaranteed:
* <li>Exactly 1 {@code startCell}.
* <li>At least 2 {@code cellValue}.
* <li>Exactly 1 {@code finishCell}.
* </ol>
*/
void startCell(String family, ByteString qualifier, long timestampMicros);

/**
* Called once per non-chunked cell, or at least twice per chunked cell to concatenate the cell
* value.
*/
void cellValue(ByteString value);

/** Called once per cell to signal the end of the value (unless reset). */
void finishCell();

/** Called once per stream record to signal that all mods have been processed (unless reset). */
ChangeStreamRecordT finishChangeStreamMutation(
@Nonnull String token, @Nonnull Timestamp lowWatermark);

/** Called when the current in progress change stream record should be dropped */
void reset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import javax.annotation.Nonnull;

/**
* Default implementation of a {@link ChangeStreamRecordAdapter} that uses {@link
* ChangeStreamRecord}s to represent change stream records.
*/
public class DefaultChangeStreamRecordAdapter
implements ChangeStreamRecordAdapter<ChangeStreamRecord> {

/** {@inheritDoc} */
@Override
public ChangeStreamRecordBuilder<ChangeStreamRecord> createChangeStreamRecordBuilder() {
return new DefaultChangeStreamRecordBuilder();
}

/** {@inheritDoc} */
@Override
public boolean isHeartbeat(ChangeStreamRecord record) {
return record instanceof Heartbeat;
}

/** {@inheritDoc} */
@Override
public String getTokenFromHeartbeat(ChangeStreamRecord record) {
Preconditions.checkArgument(isHeartbeat(record), "record is not a Heartbeat.");
return ((Heartbeat) record).getChangeStreamContinuationToken().getToken();
}

/** {@inheritDoc} */
@Override
public boolean isChangeStreamMutation(ChangeStreamRecord record) {
return record instanceof ChangeStreamMutation;
}

/** {@inheritDoc} */
@Override
public String getTokenFromChangeStreamMutation(ChangeStreamRecord record) {
Preconditions.checkArgument(
isChangeStreamMutation(record), "record is not a ChangeStreamMutation.");
return ((ChangeStreamMutation) record).getToken();
}

/** {@inheritDoc} */
static class DefaultChangeStreamRecordBuilder
implements ChangeStreamRecordBuilder<ChangeStreamRecord> {
private ChangeStreamMutation.Builder changeStreamMutationBuilder = null;

// For the current SetCell.
private String family;
private ByteString qualifier;
private long timestampMicros;
private ByteString value;

public DefaultChangeStreamRecordBuilder() {
reset();
}

/** {@inheritDoc} */
@Override
public ChangeStreamRecord onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
Preconditions.checkArgument(
this.changeStreamMutationBuilder == null,
"Can not create a Heartbeat when there is an existing ChangeStreamMutation being built.");
return Heartbeat.fromProto(heartbeat);
}

/** {@inheritDoc} */
@Override
public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
Preconditions.checkArgument(
this.changeStreamMutationBuilder == null,
"Can not create a CloseStream when there is an existing ChangeStreamMutation being built.");
return CloseStream.fromProto(closeStream);
}

/** {@inheritDoc} */
@Override
public void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createUserMutation(
rowKey, sourceClusterId, commitTimestamp, tieBreaker);
}

/** {@inheritDoc} */
@Override
public void startGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker);
}

/** {@inheritDoc} */
@Override
public void deleteFamily(@Nonnull String familyName) {
this.changeStreamMutationBuilder.deleteFamily(familyName);
}

/** {@inheritDoc} */
@Override
public void deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange) {
this.changeStreamMutationBuilder.deleteCells(familyName, qualifier, timestampRange);
}

/** {@inheritDoc} */
@Override
public void startCell(String family, ByteString qualifier, long timestampMicros) {
this.family = family;
this.qualifier = qualifier;
this.timestampMicros = timestampMicros;
this.value = ByteString.EMPTY;
}

/** {@inheritDoc} */
@Override
public void cellValue(ByteString value) {
this.value = this.value.concat(value);
}

/** {@inheritDoc} */
@Override
public void finishCell() {
this.changeStreamMutationBuilder.setCell(
this.family, this.qualifier, this.timestampMicros, this.value);
}

/** {@inheritDoc} */
@Override
public ChangeStreamRecord finishChangeStreamMutation(
@Nonnull String token, @Nonnull Timestamp lowWatermark) {
this.changeStreamMutationBuilder.setToken(token);
this.changeStreamMutationBuilder.setLowWatermark(lowWatermark);
return this.changeStreamMutationBuilder.build();
}

/** {@inheritDoc} */
@Override
public void reset() {
changeStreamMutationBuilder = null;

family = null;
qualifier = null;
timestampMicros = 0;
value = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608504L;

public static Heartbeat create(
private static Heartbeat create(
ChangeStreamContinuationToken changeStreamContinuationToken, Timestamp lowWatermark) {
return new AutoValue_Heartbeat(changeStreamContinuationToken, lowWatermark);
}
Expand Down
Loading

0 comments on commit cb7b455

Please sign in to comment.