Skip to content

Commit

Permalink
feat: Add ChangeStreamMutation which is a ChangeStreamRecord (#1324)
Browse files Browse the repository at this point in the history
* Add ChangeStreamMutation which is a ChangeStreamRecord

A ChangeStreamMutation holds a list of mods, represented by List<Entry>,
where an Entry is one of DeleteFamily/DeleteCells/SetCell.

* fix: Fix styles

* fix: Address comments

* fix: Update Heartbeat to use AutoValue

* fix: Add more comments

* fix: Address comments

* fix: Fix unit test due to toString().

Can't compare ByteString.toString() directly even though the contents
are the same. So we compare their fields and toRowMutation.

Co-authored-by: Teng Zhong <tengzhong@google.com>
  • Loading branch information
tengzhonger and Teng Zhong authored Aug 1, 2022
1 parent 39a7b58 commit 53dd0f0
Show file tree
Hide file tree
Showing 9 changed files with 958 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;

/**
* A ChangeStreamMutation represents a list of mods(represented by List<{@link Entry}>) targeted at
* a single row, which is concatenated by (TODO:ChangeStreamRecordMerger). It represents a logical
* row mutation and can be converted to the original write request(i.e. {@link RowMutation} or
* {@link RowMutationEntry}.
*
* <p>A ChangeStreamMutation can be constructed in two ways, depending on whether it's a user
* initiated mutation or a Garbage Collection mutation. Either way, the caller should explicitly set
* `token` and `lowWatermark` before build(), otherwise it'll raise an error.
*
* <p>Case 1) User initiated mutation.
*
* <pre>{@code
* ChangeStreamMutation.Builder builder = ChangeStreamMutation.createUserMutation(...);
* builder.setCell(...);
* builder.deleteFamily(...);
* builder.deleteCells(...);
* ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build();
* }</pre>
*
* Case 2) Garbage Collection mutation.
*
* <pre>{@code
* ChangeStreamMutation.Builder builder = ChangeStreamMutation.createGcMutation(...);
* builder.setCell(...);
* builder.deleteFamily(...);
* builder.deleteCells(...);
* ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build();
* }</pre>
*/
public final class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 8419520253162024218L;

private final ByteString rowKey;

/** Possible values: USER/GARBAGE_COLLECTION. */
private final Type type;

/** This should only be set when type==USER. */
private final String sourceClusterId;

private final Timestamp commitTimestamp;

private final int tieBreaker;

private transient ImmutableList.Builder<Entry> entries = ImmutableList.builder();

private String token;

private Timestamp lowWatermark;

private ChangeStreamMutation(Builder builder) {
this.rowKey = builder.rowKey;
this.type = builder.type;
this.sourceClusterId = builder.sourceClusterId;
this.commitTimestamp = builder.commitTimestamp;
this.tieBreaker = builder.tieBreaker;
this.token = builder.token;
this.lowWatermark = builder.lowWatermark;
this.entries = builder.entries;
}

/**
* Creates a new instance of a user initiated mutation. It returns a builder instead of a
* ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish
* building the logical mutation.
*/
static Builder createUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
int tieBreaker) {
return new Builder(rowKey, Type.USER, sourceClusterId, commitTimestamp, tieBreaker);
}

/**
* Creates a new instance of a GC mutation. It returns a builder instead of a ChangeStreamMutation
* because `token` and `loWatermark` must be set later when we finish building the logical
* mutation.
*/
static Builder createGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
input.defaultReadObject();

@SuppressWarnings("unchecked")
ImmutableList<Entry> deserialized = (ImmutableList<Entry>) input.readObject();
this.entries = ImmutableList.<Entry>builder().addAll(deserialized);
}

private void writeObject(ObjectOutputStream output) throws IOException {
output.defaultWriteObject();
output.writeObject(entries.build());
}

/** Get the row key of the current mutation. */
@Nonnull
public ByteString getRowKey() {
return this.rowKey;
}

/** Get the type of the current mutation. */
@Nonnull
public Type getType() {
return this.type;
}

/** Get the source cluster id of the current mutation. Null for Garbage collection mutation. */
public String getSourceClusterId() {
return this.sourceClusterId;
}

/** Get the commit timestamp of the current mutation. */
@Nonnull
public Timestamp getCommitTimestamp() {
return this.commitTimestamp;
}

/**
* Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
* mutations are applied to different clusters at the same time.
*/
public int getTieBreaker() {
return this.tieBreaker;
}

/** Get the token of the current mutation, which can be used to resume the changestream. */
public String getToken() {
return this.token;
}

/** Get the low watermark of the current mutation. */
public Timestamp getLowWatermark() {
return this.lowWatermark;
}

/** Get the list of mods of the current mutation. */
@Nonnull
public List<Entry> getEntries() {
return this.entries.build();
}

/** Returns a builder containing all the values of this ChangeStreamMutation class. */
Builder toBuilder() {
return new Builder(this);
}

/** Helper class to create a ChangeStreamMutation. */
public static class Builder {
private final ByteString rowKey;

private final Type type;

private final String sourceClusterId;

private final Timestamp commitTimestamp;

private final int tieBreaker;

private transient ImmutableList.Builder<Entry> entries = ImmutableList.builder();

private String token;

private Timestamp lowWatermark;

private Builder(
ByteString rowKey,
Type type,
String sourceClusterId,
Timestamp commitTimestamp,
int tieBreaker) {
this.rowKey = rowKey;
this.type = type;
this.sourceClusterId = sourceClusterId;
this.commitTimestamp = commitTimestamp;
this.tieBreaker = tieBreaker;
}

private Builder(ChangeStreamMutation changeStreamMutation) {
this.rowKey = changeStreamMutation.rowKey;
this.type = changeStreamMutation.type;
this.sourceClusterId = changeStreamMutation.sourceClusterId;
this.commitTimestamp = changeStreamMutation.commitTimestamp;
this.tieBreaker = changeStreamMutation.tieBreaker;
this.entries = changeStreamMutation.entries;
this.token = changeStreamMutation.token;
this.lowWatermark = changeStreamMutation.lowWatermark;
}

Builder setCell(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
long timestamp,
@Nonnull ByteString value) {
this.entries.add(SetCell.create(familyName, qualifier, timestamp, value));
return this;
}

Builder deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange) {
this.entries.add(DeleteCells.create(familyName, qualifier, timestampRange));
return this;
}

Builder deleteFamily(@Nonnull String familyName) {
this.entries.add(DeleteFamily.create(familyName));
return this;
}

public Builder setToken(@Nonnull String token) {
this.token = token;
return this;
}

public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
this.lowWatermark = lowWatermark;
return this;
}

public ChangeStreamMutation build() {
Preconditions.checkArgument(
token != null && lowWatermark != null,
"ChangeStreamMutation must have a continuation token and low watermark.");
return new ChangeStreamMutation(this);
}
}

public RowMutation toRowMutation(@Nonnull String tableId) {
RowMutation rowMutation = RowMutation.create(tableId, rowKey);
for (Entry entry : this.entries.build()) {
if (entry instanceof DeleteFamily) {
rowMutation.deleteFamily(((DeleteFamily) entry).getFamilyName());
} else if (entry instanceof DeleteCells) {
DeleteCells deleteCells = (DeleteCells) entry;
rowMutation.deleteCells(
deleteCells.getFamilyName(),
deleteCells.getQualifier(),
deleteCells.getTimestampRange());
} else if (entry instanceof SetCell) {
SetCell setCell = (SetCell) entry;
rowMutation.setCell(
setCell.getFamilyName(),
setCell.getQualifier(),
setCell.getTimestamp(),
setCell.getValue());
} else {
throw new IllegalArgumentException("Unexpected Entry type.");
}
}
return rowMutation;
}

public RowMutationEntry toRowMutationEntry() {
RowMutationEntry rowMutationEntry = RowMutationEntry.create(rowKey);
for (Entry entry : this.entries.build()) {
if (entry instanceof DeleteFamily) {
rowMutationEntry.deleteFamily(((DeleteFamily) entry).getFamilyName());
} else if (entry instanceof DeleteCells) {
DeleteCells deleteCells = (DeleteCells) entry;
rowMutationEntry.deleteCells(
deleteCells.getFamilyName(),
deleteCells.getQualifier(),
deleteCells.getTimestampRange());
} else if (entry instanceof SetCell) {
SetCell setCell = (SetCell) entry;
rowMutationEntry.setCell(
setCell.getFamilyName(),
setCell.getQualifier(),
setCell.getTimestamp(),
setCell.getValue());
} else {
throw new IllegalArgumentException("Unexpected Entry type.");
}
}
return rowMutationEntry;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChangeStreamMutation otherChangeStreamMutation = (ChangeStreamMutation) o;
return Objects.equal(this.hashCode(), otherChangeStreamMutation.hashCode());
}

@Override
public int hashCode() {
return Objects.hashCode(
rowKey, type, sourceClusterId, commitTimestamp, tieBreaker, token, lowWatermark, entries);
}

@Override
public String toString() {
List<String> entriesAsStrings = new ArrayList<>();
for (Entry entry : this.entries.build()) {
entriesAsStrings.add(entry.toString());
}
String entryString = "[" + String.join(";\t", entriesAsStrings) + "]";
return MoreObjects.toStringHelper(this)
.add("rowKey", this.rowKey.toStringUtf8())
.add("type", this.type)
.add("sourceClusterId", this.sourceClusterId)
.add("commitTimestamp", this.commitTimestamp.toString())
.add("token", this.token)
.add("lowWatermark", this.lowWatermark)
.add("entries", entryString)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.auto.value.AutoValue;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import javax.annotation.Nonnull;

/**
 * Value type for a DeleteCells mod in a data change. It carries a column family name, a column
 * qualifier and a timestamp range; instances are obtained through {@link #create}.
 */
@AutoValue
public abstract class DeleteCells implements Entry, Serializable {
  private static final long serialVersionUID = 851772158721462017L;

  /** Returns the column family name this mod was created with. */
  @Nonnull
  public abstract String getFamilyName();

  /** Returns the column qualifier this mod was created with. */
  @Nonnull
  public abstract ByteString getQualifier();

  /** Returns the timestamp range this mod was created with. */
  @Nonnull
  public abstract TimestampRange getTimestampRange();

  /** Creates a DeleteCells mod from the given family name, qualifier and timestamp range. */
  public static DeleteCells create(
      @Nonnull String familyName,
      @Nonnull ByteString qualifier,
      @Nonnull TimestampRange timestampRange) {
    final DeleteCells mod = new AutoValue_DeleteCells(familyName, qualifier, timestampRange);
    return mod;
  }
}
Loading

0 comments on commit 53dd0f0

Please sign in to comment.