diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java
new file mode 100644
index 0000000000..b79b184e7a
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java
@@ -0,0 +1,351 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.models;
+
+import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
+import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nonnull;
+
+/**
+ * A ChangeStreamMutation represents a list of mods(represented by List<{@link Entry}>) targeted at
+ * a single row, which is concatenated by (TODO:ChangeStreamRecordMerger). It represents a logical
+ * row mutation and can be converted to the original write request(i.e. {@link RowMutation} or
+ * {@link RowMutationEntry}.
+ *
+ *
A ChangeStreamMutation can be constructed in two ways, depending on whether it's a user
+ * initiated mutation or a Garbage Collection mutation. Either way, the caller should explicitly set
+ * `token` and `lowWatermark` before build(), otherwise it'll raise an error.
+ *
+ *
Case 1) User initiated mutation.
+ *
+ *
{@code
+ * ChangeStreamMutation.Builder builder = ChangeStreamMutation.createUserMutation(...);
+ * builder.setCell(...);
+ * builder.deleteFamily(...);
+ * builder.deleteCells(...);
+ * ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build();
+ * }
+ *
+ * Case 2) Garbage Collection mutation.
+ *
+ * {@code
+ * ChangeStreamMutation.Builder builder = ChangeStreamMutation.createGcMutation(...);
+ * builder.setCell(...);
+ * builder.deleteFamily(...);
+ * builder.deleteCells(...);
+ * ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build();
+ * }
+ */
+public final class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
+ private static final long serialVersionUID = 8419520253162024218L;
+
+ private final ByteString rowKey;
+
+ /** Possible values: USER/GARBAGE_COLLECTION. */
+ private final Type type;
+
+ /** This should only be set when type==USER. */
+ private final String sourceClusterId;
+
+ private final Timestamp commitTimestamp;
+
+ private final int tieBreaker;
+
+ private transient ImmutableList.Builder entries = ImmutableList.builder();
+
+ private String token;
+
+ private Timestamp lowWatermark;
+
+ private ChangeStreamMutation(Builder builder) {
+ this.rowKey = builder.rowKey;
+ this.type = builder.type;
+ this.sourceClusterId = builder.sourceClusterId;
+ this.commitTimestamp = builder.commitTimestamp;
+ this.tieBreaker = builder.tieBreaker;
+ this.token = builder.token;
+ this.lowWatermark = builder.lowWatermark;
+ this.entries = builder.entries;
+ }
+
+ /**
+ * Creates a new instance of a user initiated mutation. It returns a builder instead of a
+ * ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish
+ * building the logical mutation.
+ */
+ static Builder createUserMutation(
+ @Nonnull ByteString rowKey,
+ @Nonnull String sourceClusterId,
+ @Nonnull Timestamp commitTimestamp,
+ int tieBreaker) {
+ return new Builder(rowKey, Type.USER, sourceClusterId, commitTimestamp, tieBreaker);
+ }
+
+ /**
+ * Creates a new instance of a GC mutation. It returns a builder instead of a ChangeStreamMutation
+ * because `token` and `loWatermark` must be set later when we finish building the logical
+ * mutation.
+ */
+ static Builder createGcMutation(
+ @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
+ return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
+ }
+
+ private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
+ input.defaultReadObject();
+
+ @SuppressWarnings("unchecked")
+ ImmutableList deserialized = (ImmutableList) input.readObject();
+ this.entries = ImmutableList.builder().addAll(deserialized);
+ }
+
+ private void writeObject(ObjectOutputStream output) throws IOException {
+ output.defaultWriteObject();
+ output.writeObject(entries.build());
+ }
+
+ /** Get the row key of the current mutation. */
+ @Nonnull
+ public ByteString getRowKey() {
+ return this.rowKey;
+ }
+
+ /** Get the type of the current mutation. */
+ @Nonnull
+ public Type getType() {
+ return this.type;
+ }
+
+ /** Get the source cluster id of the current mutation. Null for Garbage collection mutation. */
+ public String getSourceClusterId() {
+ return this.sourceClusterId;
+ }
+
+ /** Get the commit timestamp of the current mutation. */
+ @Nonnull
+ public Timestamp getCommitTimestamp() {
+ return this.commitTimestamp;
+ }
+
+ /**
+ * Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
+ * mutations are applied to different clusters at the same time.
+ */
+ public int getTieBreaker() {
+ return this.tieBreaker;
+ }
+
+ /** Get the token of the current mutation, which can be used to resume the changestream. */
+ public String getToken() {
+ return this.token;
+ }
+
+ /** Get the low watermark of the current mutation. */
+ public Timestamp getLowWatermark() {
+ return this.lowWatermark;
+ }
+
+ /** Get the list of mods of the current mutation. */
+ @Nonnull
+ public List getEntries() {
+ return this.entries.build();
+ }
+
+ /** Returns a builder containing all the values of this ChangeStreamMutation class. */
+ Builder toBuilder() {
+ return new Builder(this);
+ }
+
+ /** Helper class to create a ChangeStreamMutation. */
+ public static class Builder {
+ private final ByteString rowKey;
+
+ private final Type type;
+
+ private final String sourceClusterId;
+
+ private final Timestamp commitTimestamp;
+
+ private final int tieBreaker;
+
+ private transient ImmutableList.Builder entries = ImmutableList.builder();
+
+ private String token;
+
+ private Timestamp lowWatermark;
+
+ private Builder(
+ ByteString rowKey,
+ Type type,
+ String sourceClusterId,
+ Timestamp commitTimestamp,
+ int tieBreaker) {
+ this.rowKey = rowKey;
+ this.type = type;
+ this.sourceClusterId = sourceClusterId;
+ this.commitTimestamp = commitTimestamp;
+ this.tieBreaker = tieBreaker;
+ }
+
+ private Builder(ChangeStreamMutation changeStreamMutation) {
+ this.rowKey = changeStreamMutation.rowKey;
+ this.type = changeStreamMutation.type;
+ this.sourceClusterId = changeStreamMutation.sourceClusterId;
+ this.commitTimestamp = changeStreamMutation.commitTimestamp;
+ this.tieBreaker = changeStreamMutation.tieBreaker;
+ this.entries = changeStreamMutation.entries;
+ this.token = changeStreamMutation.token;
+ this.lowWatermark = changeStreamMutation.lowWatermark;
+ }
+
+ Builder setCell(
+ @Nonnull String familyName,
+ @Nonnull ByteString qualifier,
+ long timestamp,
+ @Nonnull ByteString value) {
+ this.entries.add(SetCell.create(familyName, qualifier, timestamp, value));
+ return this;
+ }
+
+ Builder deleteCells(
+ @Nonnull String familyName,
+ @Nonnull ByteString qualifier,
+ @Nonnull TimestampRange timestampRange) {
+ this.entries.add(DeleteCells.create(familyName, qualifier, timestampRange));
+ return this;
+ }
+
+ Builder deleteFamily(@Nonnull String familyName) {
+ this.entries.add(DeleteFamily.create(familyName));
+ return this;
+ }
+
+ public Builder setToken(@Nonnull String token) {
+ this.token = token;
+ return this;
+ }
+
+ public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
+ this.lowWatermark = lowWatermark;
+ return this;
+ }
+
+ public ChangeStreamMutation build() {
+ Preconditions.checkArgument(
+ token != null && lowWatermark != null,
+ "ChangeStreamMutation must have a continuation token and low watermark.");
+ return new ChangeStreamMutation(this);
+ }
+ }
+
+ public RowMutation toRowMutation(@Nonnull String tableId) {
+ RowMutation rowMutation = RowMutation.create(tableId, rowKey);
+ for (Entry entry : this.entries.build()) {
+ if (entry instanceof DeleteFamily) {
+ rowMutation.deleteFamily(((DeleteFamily) entry).getFamilyName());
+ } else if (entry instanceof DeleteCells) {
+ DeleteCells deleteCells = (DeleteCells) entry;
+ rowMutation.deleteCells(
+ deleteCells.getFamilyName(),
+ deleteCells.getQualifier(),
+ deleteCells.getTimestampRange());
+ } else if (entry instanceof SetCell) {
+ SetCell setCell = (SetCell) entry;
+ rowMutation.setCell(
+ setCell.getFamilyName(),
+ setCell.getQualifier(),
+ setCell.getTimestamp(),
+ setCell.getValue());
+ } else {
+ throw new IllegalArgumentException("Unexpected Entry type.");
+ }
+ }
+ return rowMutation;
+ }
+
+ public RowMutationEntry toRowMutationEntry() {
+ RowMutationEntry rowMutationEntry = RowMutationEntry.create(rowKey);
+ for (Entry entry : this.entries.build()) {
+ if (entry instanceof DeleteFamily) {
+ rowMutationEntry.deleteFamily(((DeleteFamily) entry).getFamilyName());
+ } else if (entry instanceof DeleteCells) {
+ DeleteCells deleteCells = (DeleteCells) entry;
+ rowMutationEntry.deleteCells(
+ deleteCells.getFamilyName(),
+ deleteCells.getQualifier(),
+ deleteCells.getTimestampRange());
+ } else if (entry instanceof SetCell) {
+ SetCell setCell = (SetCell) entry;
+ rowMutationEntry.setCell(
+ setCell.getFamilyName(),
+ setCell.getQualifier(),
+ setCell.getTimestamp(),
+ setCell.getValue());
+ } else {
+ throw new IllegalArgumentException("Unexpected Entry type.");
+ }
+ }
+ return rowMutationEntry;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ChangeStreamMutation otherChangeStreamMutation = (ChangeStreamMutation) o;
+ return Objects.equal(this.hashCode(), otherChangeStreamMutation.hashCode());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ rowKey, type, sourceClusterId, commitTimestamp, tieBreaker, token, lowWatermark, entries);
+ }
+
+ @Override
+ public String toString() {
+ List entriesAsStrings = new ArrayList<>();
+ for (Entry entry : this.entries.build()) {
+ entriesAsStrings.add(entry.toString());
+ }
+ String entryString = "[" + String.join(";\t", entriesAsStrings) + "]";
+ return MoreObjects.toStringHelper(this)
+ .add("rowKey", this.rowKey.toStringUtf8())
+ .add("type", this.type)
+ .add("sourceClusterId", this.sourceClusterId)
+ .add("commitTimestamp", this.commitTimestamp.toString())
+ .add("token", this.token)
+ .add("lowWatermark", this.lowWatermark)
+ .add("entries", entryString)
+ .toString();
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DeleteCells.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DeleteCells.java
new file mode 100644
index 0000000000..238ddb1638
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DeleteCells.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.models;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+
+/** Representation of a DeleteCells mod in a data change. */
+@AutoValue
+public abstract class DeleteCells implements Entry, Serializable {
+ private static final long serialVersionUID = 851772158721462017L;
+
+ public static DeleteCells create(
+ @Nonnull String familyName,
+ @Nonnull ByteString qualifier,
+ @Nonnull TimestampRange timestampRange) {
+ return new AutoValue_DeleteCells(familyName, qualifier, timestampRange);
+ }
+
+ /** Get the column family of the current DeleteCells. */
+ @Nonnull
+ public abstract String getFamilyName();
+
+ /** Get the column qualifier of the current DeleteCells. */
+ @Nonnull
+ public abstract ByteString getQualifier();
+
+ /** Get the timestamp range of the current DeleteCells. */
+ @Nonnull
+ public abstract TimestampRange getTimestampRange();
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DeleteFamily.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DeleteFamily.java
new file mode 100644
index 0000000000..171ecccb41
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DeleteFamily.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.models;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+
+/** Representation of a DeleteFamily mod in a data change. */
+@AutoValue
+public abstract class DeleteFamily implements Entry, Serializable {
+ private static final long serialVersionUID = 81806775917145615L;
+
+ public static DeleteFamily create(@Nonnull String familyName) {
+ return new AutoValue_DeleteFamily(familyName);
+ }
+
+ /** Get the column family of the current DeleteFamily. */
+ @Nonnull
+ public abstract String getFamilyName();
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Entry.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Entry.java
new file mode 100644
index 0000000000..c5c30016f4
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Entry.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.models;
+
+import com.google.api.core.InternalExtensionOnly;
+
+/**
+ * Default representation of a mod in a data change, which can be a {@link DeleteFamily}, a {@link
+ * DeleteCells}, or a {@link SetCell} This class will be used by {@link ChangeStreamMutation} to
+ * represent a list of mods in a logical change stream mutation.
+ */
+@InternalExtensionOnly
+public interface Entry {}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java
index 73876f887b..db82657e49 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java
@@ -16,64 +16,31 @@
package com.google.cloud.bigtable.data.v2.models;
import com.google.api.core.InternalApi;
+import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
import com.google.protobuf.Timestamp;
import java.io.Serializable;
import javax.annotation.Nonnull;
-public final class Heartbeat implements ChangeStreamRecord, Serializable {
+@AutoValue
+public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608504L;
- private final Timestamp lowWatermark;
- private final ChangeStreamContinuationToken changeStreamContinuationToken;
- private Heartbeat(
- Timestamp lowWatermark, ChangeStreamContinuationToken changeStreamContinuationToken) {
- this.lowWatermark = lowWatermark;
- this.changeStreamContinuationToken = changeStreamContinuationToken;
- }
-
- @InternalApi("Used in Changestream beam pipeline.")
- public ChangeStreamContinuationToken getChangeStreamContinuationToken() {
- return changeStreamContinuationToken;
- }
-
- @InternalApi("Used in Changestream beam pipeline.")
- public Timestamp getLowWatermark() {
- return lowWatermark;
+ public static Heartbeat create(
+ ChangeStreamContinuationToken changeStreamContinuationToken, Timestamp lowWatermark) {
+ return new AutoValue_Heartbeat(changeStreamContinuationToken, lowWatermark);
}
/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
- return new Heartbeat(
- heartbeat.getLowWatermark(),
- ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()));
+ return create(
+ ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
+ heartbeat.getLowWatermark());
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Heartbeat record = (Heartbeat) o;
- return Objects.equal(lowWatermark, record.getLowWatermark())
- && Objects.equal(changeStreamContinuationToken, record.getChangeStreamContinuationToken());
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(lowWatermark, changeStreamContinuationToken);
- }
+ @InternalApi("Used in Changestream beam pipeline.")
+ public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken();
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("lowWatermark", lowWatermark)
- .add("changeStreamContinuationToken", changeStreamContinuationToken)
- .toString();
- }
+ @InternalApi("Used in Changestream beam pipeline.")
+ public abstract Timestamp getLowWatermark();
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/SetCell.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/SetCell.java
new file mode 100644
index 0000000000..a157b5cd73
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/SetCell.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.models;
+
+import com.google.auto.value.AutoValue;
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+
+/**
+ * Representation of a SetCell mod in a data change, whose value is concatenated by
+ * (TODO:ChangeStreamRecordMerger) in case of SetCell value chunking.
+ */
+@AutoValue
+public abstract class SetCell implements Entry, Serializable {
+ private static final long serialVersionUID = 77123872266724154L;
+
+ public static SetCell create(
+ @Nonnull String familyName,
+ @Nonnull ByteString qualifier,
+ long timestamp,
+ @Nonnull ByteString value) {
+ return new AutoValue_SetCell(familyName, qualifier, timestamp, value);
+ }
+
+ /** Get the column family of the current SetCell. */
+ @Nonnull
+ public abstract String getFamilyName();
+
+ /** Get the column qualifier of the current SetCell. */
+ @Nonnull
+ public abstract ByteString getQualifier();
+
+ /** Get the timestamp of the current SetCell. */
+ public abstract long getTimestamp();
+
+ /** Get the value of the current SetCell. */
+ @Nonnull
+ public abstract ByteString getValue();
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java
new file mode 100644
index 0000000000..938213fb36
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.models;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.ReadChangeStreamResponse;
+import com.google.cloud.bigtable.data.v2.internal.NameUtil;
+import com.google.cloud.bigtable.data.v2.internal.RequestContext;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChangeStreamMutationTest {
+ private static final String PROJECT_ID = "fake-project";
+ private static final String INSTANCE_ID = "fake-instance";
+ private static final String TABLE_ID = "fake-table";
+ private static final String APP_PROFILE_ID = "fake-profile";
+ private static final RequestContext REQUEST_CONTEXT =
+ RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID);
+
+ @Rule public ExpectedException expect = ExpectedException.none();
+
+ @Test
+ public void userInitiatedMutationTest() throws IOException, ClassNotFoundException {
+ // Create a user initiated logical mutation.
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .setCell(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ 1000,
+ ByteString.copyFromUtf8("fake-value"))
+ .deleteFamily("fake-family")
+ .deleteCells(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ Range.TimestampRange.create(1000L, 2000L))
+ .setToken("fake-token")
+ .setLowWatermark(fakeLowWatermark)
+ .build();
+
+ // Test the getters.
+ Assert.assertEquals(changeStreamMutation.getRowKey(), ByteString.copyFromUtf8("key"));
+ Assert.assertEquals(
+ changeStreamMutation.getType(), ReadChangeStreamResponse.DataChange.Type.USER);
+ Assert.assertEquals(changeStreamMutation.getSourceClusterId(), "fake-source-cluster-id");
+ Assert.assertEquals(changeStreamMutation.getCommitTimestamp(), fakeCommitTimestamp);
+ Assert.assertEquals(changeStreamMutation.getTieBreaker(), 0);
+ Assert.assertEquals(changeStreamMutation.getToken(), "fake-token");
+ Assert.assertEquals(changeStreamMutation.getLowWatermark(), fakeLowWatermark);
+
+ // Test serialization.
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(changeStreamMutation);
+ oos.close();
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ ChangeStreamMutation actual = (ChangeStreamMutation) ois.readObject();
+ Assert.assertEquals(actual.getRowKey(), changeStreamMutation.getRowKey());
+ Assert.assertEquals(actual.getType(), changeStreamMutation.getType());
+ Assert.assertEquals(actual.getSourceClusterId(), changeStreamMutation.getSourceClusterId());
+ Assert.assertEquals(actual.getCommitTimestamp(), changeStreamMutation.getCommitTimestamp());
+ Assert.assertEquals(actual.getTieBreaker(), changeStreamMutation.getTieBreaker());
+ Assert.assertEquals(actual.getToken(), changeStreamMutation.getToken());
+ Assert.assertEquals(actual.getLowWatermark(), changeStreamMutation.getLowWatermark());
+ assertThat(actual.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT))
+ .isEqualTo(changeStreamMutation.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT));
+ }
+
+ @Test
+ public void gcMutationTest() throws IOException, ClassNotFoundException {
+ // Create a GC mutation.
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createGcMutation(
+ ByteString.copyFromUtf8("key"), fakeCommitTimestamp, 0)
+ .setCell(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ 1000,
+ ByteString.copyFromUtf8("fake-value"))
+ .deleteFamily("fake-family")
+ .deleteCells(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ Range.TimestampRange.create(1000L, 2000L))
+ .setToken("fake-token")
+ .setLowWatermark(fakeLowWatermark)
+ .build();
+
+ // Test the getters.
+ Assert.assertEquals(changeStreamMutation.getRowKey(), ByteString.copyFromUtf8("key"));
+ Assert.assertEquals(
+ changeStreamMutation.getType(),
+ ReadChangeStreamResponse.DataChange.Type.GARBAGE_COLLECTION);
+ Assert.assertNull(changeStreamMutation.getSourceClusterId());
+ Assert.assertEquals(changeStreamMutation.getCommitTimestamp(), fakeCommitTimestamp);
+ Assert.assertEquals(changeStreamMutation.getTieBreaker(), 0);
+ Assert.assertEquals(changeStreamMutation.getToken(), "fake-token");
+ Assert.assertEquals(changeStreamMutation.getLowWatermark(), fakeLowWatermark);
+
+ // Test serialization.
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(changeStreamMutation);
+ oos.close();
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ ChangeStreamMutation actual = (ChangeStreamMutation) ois.readObject();
+ Assert.assertEquals(actual.getRowKey(), changeStreamMutation.getRowKey());
+ Assert.assertEquals(actual.getType(), changeStreamMutation.getType());
+ Assert.assertEquals(actual.getSourceClusterId(), changeStreamMutation.getSourceClusterId());
+ Assert.assertEquals(actual.getCommitTimestamp(), changeStreamMutation.getCommitTimestamp());
+ Assert.assertEquals(actual.getTieBreaker(), changeStreamMutation.getTieBreaker());
+ Assert.assertEquals(actual.getToken(), changeStreamMutation.getToken());
+ Assert.assertEquals(actual.getLowWatermark(), changeStreamMutation.getLowWatermark());
+ assertThat(actual.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT))
+ .isEqualTo(changeStreamMutation.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT));
+ }
+
+ @Test
+ public void toRowMutationTest() {
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .setCell(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ 1000,
+ ByteString.copyFromUtf8("fake-value"))
+ .deleteFamily("fake-family")
+ .deleteCells(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ Range.TimestampRange.create(1000L, 2000L))
+ .setToken("fake-token")
+ .setLowWatermark(fakeLowWatermark)
+ .build();
+
+ // Convert it to a rowMutation and construct a MutateRowRequest.
+ RowMutation rowMutation = changeStreamMutation.toRowMutation(TABLE_ID);
+ MutateRowRequest mutateRowRequest = rowMutation.toProto(REQUEST_CONTEXT);
+ String tableName =
+ NameUtil.formatTableName(
+ REQUEST_CONTEXT.getProjectId(), REQUEST_CONTEXT.getInstanceId(), TABLE_ID);
+ assertThat(mutateRowRequest.getTableName()).isEqualTo(tableName);
+ assertThat(mutateRowRequest.getMutationsList()).hasSize(3);
+ assertThat(mutateRowRequest.getMutations(0).getSetCell().getValue())
+ .isEqualTo(ByteString.copyFromUtf8("fake-value"));
+ assertThat(mutateRowRequest.getMutations(1).getDeleteFromFamily().getFamilyName())
+ .isEqualTo("fake-family");
+ assertThat(mutateRowRequest.getMutations(2).getDeleteFromColumn().getFamilyName())
+ .isEqualTo("fake-family");
+ assertThat(mutateRowRequest.getMutations(2).getDeleteFromColumn().getColumnQualifier())
+ .isEqualTo(ByteString.copyFromUtf8("fake-qualifier"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void toRowMutationWithoutTokenShouldFailTest() {
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .deleteFamily("fake-family")
+ .setLowWatermark(fakeLowWatermark)
+ .build();
+ expect.expect(IllegalArgumentException.class);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void toRowMutationWithoutLowWatermarkShouldFailTest() {
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .deleteFamily("fake-family")
+ .setToken("fake-token")
+ .build();
+ expect.expect(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void toRowMutationEntryTest() {
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .setCell(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ 1000,
+ ByteString.copyFromUtf8("fake-value"))
+ .deleteFamily("fake-family")
+ .deleteCells(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ Range.TimestampRange.create(1000L, 2000L))
+ .setToken("fake-token")
+ .setLowWatermark(fakeLowWatermark)
+ .build();
+
+ // Convert it to a rowMutationEntry and construct a MutateRowRequest.
+ RowMutationEntry rowMutationEntry = changeStreamMutation.toRowMutationEntry();
+ MutateRowsRequest.Entry mutateRowsRequestEntry = rowMutationEntry.toProto();
+ assertThat(mutateRowsRequestEntry.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key"));
+ assertThat(mutateRowsRequestEntry.getMutationsList()).hasSize(3);
+ assertThat(mutateRowsRequestEntry.getMutations(0).getSetCell().getValue())
+ .isEqualTo(ByteString.copyFromUtf8("fake-value"));
+ assertThat(mutateRowsRequestEntry.getMutations(1).getDeleteFromFamily().getFamilyName())
+ .isEqualTo("fake-family");
+ assertThat(mutateRowsRequestEntry.getMutations(2).getDeleteFromColumn().getFamilyName())
+ .isEqualTo("fake-family");
+ assertThat(mutateRowsRequestEntry.getMutations(2).getDeleteFromColumn().getColumnQualifier())
+ .isEqualTo(ByteString.copyFromUtf8("fake-qualifier"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void toRowMutationEntryWithoutTokenShouldFailTest() {
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .deleteFamily("fake-family")
+ .setLowWatermark(fakeLowWatermark)
+ .build();
+ expect.expect(IllegalArgumentException.class);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void toRowMutationEntryWithoutLowWatermarkShouldFailTest() {
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .deleteFamily("fake-family")
+ .setToken("fake-token")
+ .build();
+ expect.expect(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void testWithLongValue() {
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .setCell(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ 1000L,
+ ByteString.copyFrom(Longs.toByteArray(1L)))
+ .setToken("fake-token")
+ .setLowWatermark(fakeLowWatermark)
+ .build();
+
+ RowMutation rowMutation = changeStreamMutation.toRowMutation(TABLE_ID);
+ MutateRowRequest mutateRowRequest = rowMutation.toProto(REQUEST_CONTEXT);
+ String tableName =
+ NameUtil.formatTableName(
+ REQUEST_CONTEXT.getProjectId(), REQUEST_CONTEXT.getInstanceId(), TABLE_ID);
+ assertThat(mutateRowRequest.getTableName()).isEqualTo(tableName);
+ assertThat(mutateRowRequest.getMutationsList()).hasSize(1);
+ assertThat(mutateRowRequest.getMutations(0).getSetCell().getValue())
+ .isEqualTo(ByteString.copyFromUtf8("\000\000\000\000\000\000\000\001"));
+ }
+
+ @Test
+ public void toBuilderTest() {
+ // Create a user initiated logical mutation.
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build();
+ ChangeStreamMutation changeStreamMutation =
+ ChangeStreamMutation.createUserMutation(
+ ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0)
+ .setCell(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ 1000,
+ ByteString.copyFromUtf8("fake-value"))
+ .deleteFamily("fake-family")
+ .deleteCells(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ Range.TimestampRange.create(1000L, 2000L))
+ .setToken("fake-token")
+ .setLowWatermark(fakeLowWatermark)
+ .build();
+
+ // Test round-trip of a ChangeStreamMutation through `toBuilder().build()`.
+ ChangeStreamMutation otherMutation = changeStreamMutation.toBuilder().build();
+ assertThat(changeStreamMutation).isEqualTo(otherMutation);
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java
index c82aae7330..05df603959 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java
@@ -23,6 +23,7 @@
import com.google.bigtable.v2.StreamPartition;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
+import com.google.rpc.Status;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -57,7 +58,7 @@ public void heartbeatSerializationTest() throws IOException, ClassNotFoundExcept
@Test
public void closeStreamSerializationTest() throws IOException, ClassNotFoundException {
- com.google.rpc.Status status = com.google.rpc.Status.newBuilder().setCode(0).build();
+ Status status = Status.newBuilder().setCode(0).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
@@ -124,7 +125,7 @@ public void heartbeatTest() {
@Test
public void closeStreamTest() {
- com.google.rpc.Status status = com.google.rpc.Status.newBuilder().setCode(0).build();
+ Status status = Status.newBuilder().setCode(0).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/EntryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/EntryTest.java
new file mode 100644
index 0000000000..11ff0a9f02
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/EntryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.models;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class EntryTest {
+ private void validateSerializationRoundTrip(Object obj)
+ throws IOException, ClassNotFoundException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(obj);
+ oos.close();
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ assertThat(ois.readObject()).isEqualTo(obj);
+ }
+
+ @Test
+ public void serializationTest() throws IOException, ClassNotFoundException {
+ // DeleteFamily
+ Entry deleteFamilyEntry = DeleteFamily.create("fake-family");
+ validateSerializationRoundTrip(deleteFamilyEntry);
+
+ // DeleteCell
+ Entry deleteCellsEntry =
+ DeleteCells.create(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ Range.TimestampRange.create(1000L, 2000L));
+ validateSerializationRoundTrip(deleteCellsEntry);
+
+ // SetCell
+ Entry setCellEntry =
+ SetCell.create(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ 1000,
+ ByteString.copyFromUtf8("fake-value"));
+ validateSerializationRoundTrip(setCellEntry);
+ }
+
+ @Test
+ public void deleteFamilyTest() {
+ Entry deleteFamilyEntry = DeleteFamily.create("fake-family");
+ DeleteFamily deleteFamily = (DeleteFamily) deleteFamilyEntry;
+ Assert.assertEquals("fake-family", deleteFamily.getFamilyName());
+ }
+
+ @Test
+ public void deleteCellsTest() {
+ Entry deleteCellEntry =
+ DeleteCells.create(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ Range.TimestampRange.create(1000L, 2000L));
+ DeleteCells deleteCells = (DeleteCells) deleteCellEntry;
+ Assert.assertEquals("fake-family", deleteCells.getFamilyName());
+ Assert.assertEquals(ByteString.copyFromUtf8("fake-qualifier"), deleteCells.getQualifier());
+ Assert.assertEquals(Range.TimestampRange.create(1000L, 2000L), deleteCells.getTimestampRange());
+ }
+
+ @Test
+ public void setSellTest() {
+ Entry setCellEntry =
+ SetCell.create(
+ "fake-family",
+ ByteString.copyFromUtf8("fake-qualifier"),
+ 1000,
+ ByteString.copyFromUtf8("fake-value"));
+ SetCell setCell = (SetCell) setCellEntry;
+ Assert.assertEquals("fake-family", setCell.getFamilyName());
+ Assert.assertEquals(ByteString.copyFromUtf8("fake-qualifier"), setCell.getQualifier());
+ Assert.assertEquals(1000, setCell.getTimestamp());
+ Assert.assertEquals(ByteString.copyFromUtf8("fake-value"), setCell.getValue());
+ }
+}