Skip to content

Commit

Permalink
Overload HbaseIO with KV<RowKey, RowMutations> (#25864)
Browse files Browse the repository at this point in the history
* Added coder, coder-test, HbaseIO, Shared Connection

* RowMutationsCoderTests added

* Added HbaseRowMutationsIO tests, linted HbaseIO

* Wrote up Javadocs for HbaseIO.writeRowMutations
  • Loading branch information
georgecma authored Mar 28, 2023
1 parent 469101d commit c0f0f2d
Show file tree
Hide file tree
Showing 11 changed files with 1,259 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
public List<CoderProvider> getCoderProviders() {
  // Providers are created in the same order in which they are registered below.
  final CoderProvider mutationProvider = HBaseMutationCoder.getCoderProvider();
  final CoderProvider rowMutationsProvider = HBaseRowMutationsCoder.getCoderProvider();
  // Result has no dedicated provider factory, so wrap its coder for the exact Result type.
  final CoderProvider resultProvider =
      CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of());

  return ImmutableList.of(mutationProvider, rowMutationsProvider, resultProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
Expand All @@ -60,11 +61,13 @@
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -133,6 +136,8 @@
*
* <h3>Writing to HBase</h3>
*
* <h4>Writing {@link Mutation}</h4>
*
* <p>The HBase sink executes a set of row mutations on a single table. It takes as input a {@link
* PCollection PCollection&lt;Mutation&gt;}, where each {@link Mutation} represents an idempotent
* transformation on a row.
Expand All @@ -150,6 +155,31 @@
* .withTableId("table"));
* }</pre>
*
* <h4>Writing {@link RowMutations}</h4>
*
* <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
* as input a {@link PCollection PCollection&lt;KV&lt;byte[], RowMutations&gt;&gt;}, representing
* KVs of row keys (as bytes) and {@link RowMutations}.
*
* <p>This implementation is useful for preserving mutation order if the upstream is ordered by row
* key, as RowMutations will only be applied after previous RowMutations are successful.
*
* <p>To configure the sink, you must supply a table id string and a {@link Configuration} to
* identify the HBase instance, for example:
*
* <pre>{@code
* Configuration configuration = ...;
* PCollection<KV<byte[], RowMutations>> data = ...;
*
* data.apply("write",
* HBaseIO.writeRowMutations()
* .withConfiguration(configuration)
* .withTableId("table"));
* }</pre>
*
* <p>Note that the transform returns {@link PDone} and does not emit any elements; the number of
* RowMutations written in each bundle is only logged at debug level.
*
* <h3>Experimental</h3>
*
* <p>The design of the API for HBaseIO is currently related to the BigtableIO one, it can evolve or
Expand Down Expand Up @@ -765,4 +795,195 @@ public void populateDisplayData(DisplayData.Builder builder) {
private transient BufferedMutator mutator;
}
}

/**
 * Creates an unconfigured {@link WriteRowMutations} transform.
 *
 * <p>The returned transform has no {@link Configuration} and an empty table id; both must be
 * supplied through {@link WriteRowMutations#withConfiguration(Configuration)} and {@link
 * WriteRowMutations#withTableId(String)} before the transform is applied.
 */
public static WriteRowMutations writeRowMutations() {
  final Configuration unsetConfiguration = null;
  final String unsetTableId = "";
  return new WriteRowMutations(unsetConfiguration, unsetTableId);
}

/**
 * A {@link PTransform} that writes {@link RowMutations} objects to an HBase table.
 *
 * <p>The input is a {@code PCollection<KV<byte[], RowMutations>>}; only the value of each element
 * is used for the write, and each value is applied with {@link Table#mutateRow(RowMutations)}. The
 * transform returns {@link PDone}.
 *
 * <p>Instances are immutable: {@link #withConfiguration(Configuration)} and {@link
 * #withTableId(String)} return new instances. Serialization goes through a {@link
 * SerializationProxy} because the HBase {@link Configuration} is wrapped in a {@link
 * SerializableConfiguration} when written.
 */
public static class WriteRowMutations
    extends PTransform<PCollection<KV<byte[], RowMutations>>, PDone> {

  /** Writes to the HBase instance indicated by the given Configuration. */
  public WriteRowMutations withConfiguration(Configuration configuration) {
    checkNotNull(configuration, "configuration cannot be null");
    return new WriteRowMutations(configuration, tableId);
  }

  /** Writes to the specified table. */
  public WriteRowMutations withTableId(String tableId) {
    checkNotNull(tableId, "tableId cannot be null");
    return new WriteRowMutations(configuration, tableId);
  }

  private WriteRowMutations(Configuration configuration, String tableId) {
    this.configuration = configuration;
    this.tableId = tableId;
  }

  /**
   * Validates that a configuration and a non-empty table id were supplied, then applies the
   * writing {@link DoFn} to the input.
   */
  @Override
  public PDone expand(PCollection<KV<byte[], RowMutations>> input) {
    checkNotNull(configuration, "withConfiguration() is required");
    checkNotNull(tableId, "withTableId() is required");
    checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");

    input.apply(ParDo.of(new WriteRowMutationsFn(this)));
    return PDone.in(input.getPipeline());
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder.add(DisplayData.item("configuration", configuration.toString()));
    builder.add(DisplayData.item("tableId", tableId));
  }

  /** Returns the configuration of the target HBase instance, or null if none was set. */
  public Configuration getConfiguration() {
    return configuration;
  }

  /** Returns the id of the target table (empty if none was set). */
  public String getTableId() {
    return tableId;
  }

  /**
   * Two transforms are equal when their configurations have the same string form and their table
   * ids are equal. Transforms without a configuration are compared on the table id alone.
   */
  @Override
  public boolean equals(@Nullable Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    WriteRowMutations that = (WriteRowMutations) o;
    return Objects.equals(configurationKey(), that.configurationKey())
        && Objects.equals(tableId, that.tableId);
  }

  /** Hashes exactly the values compared by {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    // Hash the string form, not the Configuration object itself: equals() compares string forms,
    // so hashing the object could give equal transforms different hash codes.
    return Objects.hash(configurationKey(), tableId);
  }

  /** Returns the value that identifies the configuration in equals/hashCode, or null if unset. */
  private @Nullable String configurationKey() {
    return configuration == null ? null : configuration.toString();
  }

  /**
   * The writeReplace method allows the developer to provide a replacement object that will be
   * serialized instead of the original one. We use this to keep the enclosed class immutable. For
   * more details on the technique see <a
   * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
   * article</a>.
   */
  private Object writeReplace() {
    return new SerializationProxy(this);
  }

  /** Serialized stand-in for {@link WriteRowMutations}; carries the configuration and table id. */
  private static class SerializationProxy implements Serializable {
    public SerializationProxy() {}

    public SerializationProxy(WriteRowMutations writeRowMutations) {
      configuration = writeRowMutations.configuration;
      tableId = writeRowMutations.tableId;
    }

    /** Writes the configuration (wrapped so it is serializable) followed by the table id. */
    private void writeObject(ObjectOutputStream out) throws IOException {
      SerializableCoder.of(SerializableConfiguration.class)
          .encode(new SerializableConfiguration(this.configuration), out);

      StringUtf8Coder.of().encode(this.tableId, out);
    }

    /** Reads the fields back in the same order in which writeObject wrote them. */
    private void readObject(ObjectInputStream in) throws IOException {
      this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
      this.tableId = StringUtf8Coder.of().decode(in);
    }

    /** Rebuilds the transform through its public factory and builder methods. */
    Object readResolve() {
      return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
    }

    private Configuration configuration;
    private String tableId;
  }

  @SuppressFBWarnings("SE_BAD_FIELD")
  private final Configuration configuration;

  private final String tableId;

  /** Function to write row mutations to a hbase table. */
  private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>, Integer> {

    public WriteRowMutationsFn(WriteRowMutations writeRowMutations) {
      checkNotNull(writeRowMutations.tableId, "tableId");
      checkNotNull(writeRowMutations.configuration, "configuration");
    }

    /** Obtains the connection for this configuration from {@link HBaseSharedConnection}. */
    @Setup
    public void setup() throws Exception {
      connection = HBaseSharedConnection.getOrCreate(configuration);
    }

    /** Opens the target table and resets the per-bundle counter. */
    @StartBundle
    public void startBundle(StartBundleContext c) throws IOException {
      table = connection.getTable(TableName.valueOf(tableId));
      recordsWritten = 0;
    }

    /** Closes the table opened for the bundle and logs how many records were written. */
    @FinishBundle
    public void finishBundle() throws Exception {
      if (table != null) {
        table.close();
        table = null;
      }

      LOG.debug("Wrote {} records", recordsWritten);
    }

    /** Closes any table left open and releases this function's use of the shared connection. */
    @Teardown
    public void tearDown() throws Exception {
      try {
        if (table != null) {
          table.close();
          table = null;
        }
      } finally {
        // Release the shared connection even when closing the table fails; otherwise the
        // matching close for setup()'s getOrCreate would be skipped.
        HBaseSharedConnection.close(configuration);
      }
    }

    /**
     * Applies the element's {@link RowMutations} to the table.
     *
     * @throws RuntimeException if the write fails; the message describes the table and connection
     *     state and the original {@link IOException} is attached as the cause
     */
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      RowMutations mutations = c.element().getValue();

      try {
        // Use Table instead of BufferedMutator to preserve mutation-ordering
        table.mutateRow(mutations);
        recordsWritten++;
      } catch (IOException e) {
        throw new RuntimeException(
            String.join(
                " ",
                "Table",
                tableId,
                "row",
                Bytes.toString(mutations.getRow()),
                "mutation failed.",
                "\nTable Available/Enabled:",
                describeTableState(e),
                "\nConnection Closed/Aborted/Locks:",
                Boolean.toString(connection.isClosed()),
                Boolean.toString(connection.isAborted())),
            e);
      }
    }

    /**
     * Returns "available enabled" flags for the target table, for use in failure messages.
     *
     * <p>This is best-effort: if the lookup itself fails, that failure is attached to {@code
     * writeFailure} as a suppressed exception so it never replaces the original write error.
     */
    private String describeTableState(IOException writeFailure) {
      TableName name = TableName.valueOf(tableId);
      // The Admin is closed after use; the fully qualified name avoids requiring a new import.
      try (org.apache.hadoop.hbase.client.Admin admin = connection.getAdmin()) {
        return String.join(
            " ",
            Boolean.toString(admin.isTableAvailable(name)),
            Boolean.toString(admin.isTableEnabled(name)));
      } catch (IOException | RuntimeException diagnosticFailure) {
        writeFailure.addSuppressed(diagnosticFailure);
        return "unknown unknown";
      }
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      builder.delegate(WriteRowMutations.this);
    }

    private long recordsWritten;
    private transient Connection connection;
    private transient Table table;
  }
}
}
Loading

0 comments on commit c0f0f2d

Please sign in to comment.