diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java index e7cb823dff4b..88a0b47ef617 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java @@ -33,6 +33,7 @@ public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar { public List getCoderProviders() { return ImmutableList.of( HBaseMutationCoder.getCoderProvider(), + HBaseRowMutationsCoder.getCoderProvider(), CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of())); } } diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index ca6e67517c89..db63d9d0f32b 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -60,11 +61,13 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,6 +136,8 @@ * *

Writing to HBase

* + *

Writing {@link Mutation}

+ * *

The HBase sink executes a set of row mutations on a single table. It takes as input a {@link * PCollection PCollection<Mutation>}, where each {@link Mutation} represents an idempotent * transformation on a row. @@ -150,6 +155,31 @@ * .withTableId("table")); * } * + *

Writing {@link RowMutations}

+ * + *

An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes + * as input a {@link PCollection>}, representing KVs of bytes row keys and + * {@link RowMutations}. + * + *

This implementation is useful for preserving mutation order if the upstream is ordered by row + * key, as RowMutations will only be applied after previous RowMutations are successful. + * + *

To configure the sink, you must supply a table id string and a {@link Configuration} to + * identify the HBase instance, for example: + * + *

{@code
+ * Configuration configuration = ...;
+ * PCollection> data = ...;
+ *
+ * data.apply("write",
+ *     HBaseIO.writeRowMutations()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table"));
+ * }
+ * + *

Note that the transformation emits the number of RowMutations written as an integer after + * successfully writing to HBase. + * *

Experimental

* *

The design of the API for HBaseIO is currently related to the BigtableIO one, it can evolve or @@ -765,4 +795,194 @@ public void populateDisplayData(DisplayData.Builder builder) { private transient BufferedMutator mutator; } } + + public static WriteRowMutations writeRowMutations() { + return new WriteRowMutations(null /* Configuration */, ""); + } + + /** Transformation that writes RowMutation objects to a Hbase table. */ + public static class WriteRowMutations + extends PTransform>, PDone> { + + /** Writes to the HBase instance indicated by the given Configuration. */ + public WriteRowMutations withConfiguration(Configuration configuration) { + checkNotNull(configuration, "configuration cannot be null"); + return new WriteRowMutations(configuration, tableId); + } + + /** Writes to the specified table. */ + public WriteRowMutations withTableId(String tableId) { + checkNotNull(tableId, "tableId cannot be null"); + return new WriteRowMutations(configuration, tableId); + } + + private WriteRowMutations(Configuration configuration, String tableId) { + this.configuration = configuration; + this.tableId = tableId; + } + + @Override + public PDone expand(PCollection> input) { + checkNotNull(configuration, "withConfiguration() is required"); + checkNotNull(tableId, "withTableId() is required"); + checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty"); + + input.apply(ParDo.of(new WriteRowMutationsFn(this))); + return PDone.in(input.getPipeline()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("configuration", configuration.toString())); + builder.add(DisplayData.item("tableId", tableId)); + } + + public Configuration getConfiguration() { + return configuration; + } + + public String getTableId() { + return tableId; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WriteRowMutations writeRowMutations = (WriteRowMutations) o; + return configuration.toString().equals(writeRowMutations.configuration.toString()) + && Objects.equals(tableId, writeRowMutations.tableId); + } + + @Override + public int hashCode() { + return Objects.hash(configuration, tableId); + } + + /** + * The writeReplace method allows the developer to provide a replacement object that will be + * serialized instead of the original one. We use this to keep the enclosed class immutable. For + * more details on the technique see this + * article. + */ + private Object writeReplace() { + return new SerializationProxy(this); + } + + private static class SerializationProxy implements Serializable { + public SerializationProxy() {} + + public SerializationProxy(WriteRowMutations writeRowMutations) { + configuration = writeRowMutations.configuration; + tableId = writeRowMutations.tableId; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + SerializableCoder.of(SerializableConfiguration.class) + .encode(new SerializableConfiguration(this.configuration), out); + + StringUtf8Coder.of().encode(this.tableId, out); + } + + private void readObject(ObjectInputStream in) throws IOException { + this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get(); + this.tableId = StringUtf8Coder.of().decode(in); + } + + Object readResolve() { + return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId); + } + + private Configuration configuration; + private String tableId; + } + + private final Configuration configuration; + private final String tableId; + + /** Function to write row mutations to a hbase table. */ + private class WriteRowMutationsFn extends DoFn, Integer> { + + public WriteRowMutationsFn( + WriteRowMutations writeRowMutations) { // , HbaseSharedConnection hbaseSharedConnection) { + checkNotNull(writeRowMutations.tableId, "tableId"); + checkNotNull(writeRowMutations.configuration, "configuration"); + } + + @Setup + public void setup() throws Exception { + connection = HBaseSharedConnection.getOrCreate(configuration); + } + + @StartBundle + public void startBundle(StartBundleContext c) throws IOException { + table = connection.getTable(TableName.valueOf(tableId)); + recordsWritten = 0; + } + + @FinishBundle + public void finishBundle() throws Exception { + if (table != null) { + table.close(); + table = null; + } + + LOG.debug("Wrote {} records", recordsWritten); + } + + @Teardown + public void tearDown() throws Exception { + + if (table != null) { + table.close(); + table = null; + } + + HBaseSharedConnection.close(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + RowMutations mutations = c.element().getValue(); + + try { + // Use Table instead of BufferedMutator to preserve mutation-ordering + table.mutateRow(mutations); + recordsWritten++; + } catch (Exception e) { + throw new Exception( + (String.join( + " ", + "Table", + tableId, + "row", + Bytes.toString(mutations.getRow()), + "mutation failed.", + "\nTable Available/Enabled:", + Boolean.toString( + connection.getAdmin().isTableAvailable(TableName.valueOf(tableId))), + Boolean.toString( + connection.getAdmin().isTableEnabled(TableName.valueOf(tableId))), + "\nConnection Closed/Aborted/Locks:", + Boolean.toString(connection.isClosed()), + Boolean.toString(connection.isAborted())))); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(WriteRowMutations.this); + } + + private long recordsWritten; + private transient Connection connection; + private transient Table table; + } + } } diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java new file mode 100644 index 000000000000..c2b657427df2 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; + +/** + * Row mutations coder to provide serialization support for Hbase RowMutations object, which isn't + * natively serializable. + */ +public class HBaseRowMutationsCoder extends AtomicCoder implements Serializable { + private static final HBaseRowMutationsCoder INSTANCE = new HBaseRowMutationsCoder(); + + public HBaseRowMutationsCoder() {} + + public static HBaseRowMutationsCoder of() { + return INSTANCE; + } + + @Override + public void encode(RowMutations value, OutputStream outStream) throws IOException { + + // encode row key + byte[] rowKey = value.getRow(); + int rowKeyLen = rowKey.length; + + // serialize row key + outStream.write(rowKeyLen); + outStream.write(rowKey); + + // serialize mutation list + List mutations = value.getMutations(); + int mutationsSize = mutations.size(); + outStream.write(mutationsSize); + for (Mutation mutation : mutations) { + MutationType type = getType(mutation); + MutationProto proto = ProtobufUtil.toMutation(type, mutation); + proto.writeDelimitedTo(outStream); + } + } + + @Override + public RowMutations decode(InputStream inStream) throws IOException { + + int rowKeyLen = inStream.read(); + byte[] rowKey = new byte[rowKeyLen]; + inStream.read(rowKey); + + RowMutations rowMutations = new RowMutations(rowKey); + int mutationListSize = inStream.read(); + for (int i = 0; i < mutationListSize; i++) { + Mutation m = ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream)); + MutationType type = getType(m); + + if (type == MutationType.PUT) { + rowMutations.add((Put) m); + } else if (type == MutationType.DELETE) { + rowMutations.add((Delete) m); + } + } + return rowMutations; + } + + private static MutationType getType(Mutation mutation) { + if (mutation instanceof Put) { + return MutationType.PUT; + } else if (mutation instanceof Delete) { + return MutationType.DELETE; + } else { + throw new IllegalArgumentException("Only Put and Delete are supported"); + } + } + + /** + * Returns a {@link CoderProvider} which uses the {@link HBaseRowMutationsCoder} for {@link + * RowMutations}. + */ + static CoderProvider getCoderProvider() { + return HBASE_ROW_MUTATIONS_CODER_PROVIDER; + } + + private static final CoderProvider HBASE_ROW_MUTATIONS_CODER_PROVIDER = + new HBaseRowMutationsCoderProvider(); + + /** A {@link CoderProvider} for {@link Mutation mutations}. */ + private static class HBaseRowMutationsCoderProvider extends CoderProvider { + @Override + public Coder coderFor( + TypeDescriptor typeDescriptor, List> componentCoders) + throws CannotProvideCoderException { + if (!typeDescriptor.isSubtypeOf(HBASE_ROW_MUTATIONS_TYPE_DESCRIPTOR)) { + throw new CannotProvideCoderException( + String.format( + "Cannot provide %s because %s is not a subclass of %s", + HBaseRowMutationsCoder.class.getSimpleName(), + typeDescriptor, + Mutation.class.getName())); + } + + try { + @SuppressWarnings("unchecked") + Coder coder = (Coder) HBaseRowMutationsCoder.of(); + return coder; + } catch (IllegalArgumentException e) { + throw new CannotProvideCoderException(e); + } + } + } + + private static final TypeDescriptor HBASE_ROW_MUTATIONS_TYPE_DESCRIPTOR = + new TypeDescriptor() {}; +} diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java new file mode 100644 index 000000000000..7786fe20e67e --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Static connection shared between all threads of a worker. Connectors are not persisted between + * worker machines as Connection serialization is not implemented. Each worker will create its own + * connection and share it between all its threads. + */ +public class HBaseSharedConnection implements Serializable { + private static final long serialVersionUID = 5252999807656940415L; + private static final Logger LOG = LoggerFactory.getLogger(HBaseSharedConnection.class); + + // Transient connection to be initialized per worker + // Wrap Connection in array because static Connection cannot be non-null in beam repo + private static Connection[] connection = new Connection[1]; + // Number of threads using the shared connection, close connection if connectionCount goes to 0 + private static int connectionCount; + + /** + * Create or return existing Hbase connection. + * + * @param configuration Hbase configuration + * @return Hbase connection + * @throws IOException + */ + public static synchronized Connection getOrCreate(Configuration configuration) + throws IOException { + if (connection[0] == null || connection[0].isClosed()) { + forceCreate(configuration); + } + connectionCount++; + return connection[0]; + } + + /** + * Forcibly create new connection. + * + * @param configuration + * @throws IOException + */ + public static synchronized void forceCreate(Configuration configuration) throws IOException { + connection[0] = ConnectionFactory.createConnection(configuration); + connectionCount = 0; + } + + /** + * Decrement connector count and close connection if no more connector is using it. + * + * @throws IOException + */ + public static synchronized void close() throws IOException { + connectionCount--; + if (connectionCount == 0) { + forceClose(); + } + if (connectionCount < 0) { + LOG.warn("Connection count at " + connectionCount + ", should not be possible"); + } + } + + /** + * Forcibly close connection. + * + * @throws IOException + */ + public static synchronized void forceClose() throws IOException { + if (connection != null) { + connection[0].close(); + connectionCount = 0; + } + } + + public String getDebugString() { + return String.format( + "Connection down: %s\n" + "Connectors: %s\n", + (connection[0] == null || connection[0].isClosed()), connectionCount); + } + + public int getConnectionCount() { + return connectionCount; + } +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java index 25369fc50a20..fcfc59dc8de3 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -40,4 +41,9 @@ public void testMutationCoderIsRegistered() throws Exception { CoderRegistry.createDefault().getCoder(Put.class); CoderRegistry.createDefault().getCoder(Delete.class); } + + @Test + public void testRowMutationsCoderIsRegistered() throws Exception { + CoderRegistry.createDefault().getCoder(RowMutations.class); + } } diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseIOWriteRowMutationsTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseIOWriteRowMutationsTest.java new file mode 100644 index 000000000000..87798ae8bb5d --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseIOWriteRowMutationsTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.rowKey; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.rowKey2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.timeT; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.value; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.value2; + +import java.io.IOException; +import org.apache.beam.sdk.io.hbase.utils.TestHBaseUtils; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Unit tests for Hbase row mutation IO. */ +@RunWith(JUnit4.class) +public class HbaseIOWriteRowMutationsTest { + private static final Logger LOG = LoggerFactory.getLogger(HbaseIOWriteRowMutationsTest.class); + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private static HBaseTestingUtility htu; + private static final Configuration conf = HBaseConfiguration.create(); + + public HbaseIOWriteRowMutationsTest() {} + + @BeforeClass + public static void setUpCluster() throws Exception { + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + // Try to bind the hostname to localhost to solve an issue when it is not configured or + // no DNS resolution available. + conf.setStrings("hbase.master.hostname", "localhost"); + conf.setStrings("hbase.regionserver.hostname", "localhost"); + + // Create an HBase test cluster with one table. + htu = new HBaseTestingUtility(); + // We don't use the full htu.startMiniCluster() to avoid starting unneeded HDFS/MR daemons + htu.startMiniZKCluster(); + MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 4); + hbm.waitForActiveAndReadyMaster(); + LOG.info("Hbase test cluster started."); + } + + @AfterClass + public static void tearDownCluster() throws Exception { + if (htu != null) { + htu.shutdownMiniHBaseCluster(); + htu.shutdownMiniZKCluster(); + htu.cleanupTestDir(); + htu = null; + } + } + + @Before + public void setUp() throws IOException, InterruptedException { + // Provide custom encoder to non-serializable RowMutations class. + pipeline + .getCoderRegistry() + .registerCoderForClass(RowMutations.class, HBaseRowMutationsCoder.of()); + } + + @Test + public void testWritesPuts() throws Exception { + + Table table = TestHBaseUtils.createTable(htu); + + // Write two cells in one row mutations object + RowMutations rowMutationsOnTwoColumnFamilies = new RowMutations(rowKey); + rowMutationsOnTwoColumnFamilies.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + rowMutationsOnTwoColumnFamilies.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily2, colQualifier2, value2, timeT)); + + // Two mutations on same cell, later one should overwrite earlier one + RowMutations overwritingRowMutations = new RowMutations(rowKey2); + overwritingRowMutations.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey2, colFamily, colQualifier, value, timeT)); + overwritingRowMutations.add( + // Overwrites previous mutation + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey2, colFamily, colQualifier, value2, timeT)); + + pipeline + .apply( + "Create row mutations", + Create.of( + KV.of(rowKey, rowMutationsOnTwoColumnFamilies), + KV.of(rowKey2, overwritingRowMutations))) + .apply( + "Write to hbase", + HBaseIO.writeRowMutations() + .withConfiguration(htu.getConfiguration()) + .withTableId(table.getName().getNameAsString())); + + pipeline.run().waitUntilFinish(); + + Assert.assertEquals(2, TestHBaseUtils.getRowResult(table, rowKey).size()); + Assert.assertArrayEquals(value, TestHBaseUtils.getCell(table, rowKey, colFamily, colQualifier)); + Assert.assertArrayEquals( + value2, TestHBaseUtils.getCell(table, rowKey, colFamily2, colQualifier2)); + + Assert.assertEquals(1, TestHBaseUtils.getRowResult(table, rowKey2).size()); + Assert.assertArrayEquals( + value2, TestHBaseUtils.getCell(table, rowKey2, colFamily, colQualifier)); + } + + @Test + public void testWritesDeletes() throws Exception { + Table table = TestHBaseUtils.createTable(htu); + + // Expect deletes to result in empty row. + RowMutations deleteCellMutation = new RowMutations(rowKey); + deleteCellMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + deleteCellMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createDelete(rowKey, colFamily, colQualifier, timeT)); + // Expect delete family to delete entire row. + RowMutations deleteColFamilyMutation = new RowMutations(rowKey2); + deleteColFamilyMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey2, colFamily, colQualifier, value, timeT)); + deleteColFamilyMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey2, colFamily, colQualifier2, value2, timeT)); + deleteColFamilyMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey2, colFamily, Long.MAX_VALUE)); + + pipeline + .apply( + "Create row mutations", + Create.of(KV.of(rowKey, deleteCellMutation), KV.of(rowKey2, deleteColFamilyMutation))) + .apply( + "Write to hbase", + HBaseIO.writeRowMutations() + .withConfiguration(htu.getConfiguration()) + .withTableId(table.getName().getNameAsString())); + + pipeline.run().waitUntilFinish(); + + Assert.assertTrue(TestHBaseUtils.getRowResult(table, rowKey).isEmpty()); + Assert.assertTrue(TestHBaseUtils.getRowResult(table, rowKey2).isEmpty()); + } + + @Test + public void testWritesDeletesThenPutsInOrderByTimestamp() throws Exception { + Table table = TestHBaseUtils.createTable(htu); + + // RowMutations entry ordering does not guarantee mutation ordering, as Hbase operations + // are ordered by timestamp. See https://issues.apache.org/jira/browse/HBASE-2256 + RowMutations putDeletePut = new RowMutations(rowKey); + putDeletePut.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + putDeletePut.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT + 1)); + putDeletePut.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value2, timeT + 2)); + + pipeline + .apply("Create row mutations", Create.of(KV.of(rowKey, putDeletePut))) + .apply( + "Write to hbase", + HBaseIO.writeRowMutations() + .withConfiguration(htu.getConfiguration()) + .withTableId(table.getName().getNameAsString())); + + pipeline.run().waitUntilFinish(); + + Assert.assertArrayEquals( + value2, TestHBaseUtils.getCell(table, rowKey, colFamily, colQualifier)); + } +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java new file mode 100644 index 000000000000..5aac7c122679 --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.rowKey; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.timeT; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.value; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.beam.sdk.io.hbase.utils.HashUtils; +import org.apache.beam.sdk.io.hbase.utils.TestHBaseUtils; +import org.apache.beam.sdk.io.hbase.utils.TestHBaseUtils.HBaseMutationBuilder; +import org.apache.hadoop.hbase.client.RowMutations; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test that {@link HBaseRowMutationsCoder} encoding does not change {@link RowMutations} object. + */ +@RunWith(JUnit4.class) +public class RowMutationsCoderTest { + + private final HBaseRowMutationsCoder coder = HBaseRowMutationsCoder.of(); + private ByteArrayOutputStream outputStream; + private ByteArrayInputStream inputStream; + + @Before + public void setUp() { + outputStream = new ByteArrayOutputStream(); + } + + @After + public void tearDown() throws IOException { + outputStream.close(); + } + + @Test + public void testEncodePut() throws Exception { + RowMutations put = new RowMutations(rowKey); + put.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + coder.encode(put, outputStream); + + inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + + RowMutations decodedPut = coder.decode(inputStream); + + Assert.assertTrue(inputStream.available() == 0); + HashUtils.assertRowMutationsEquals(put, decodedPut); + } + + @Test + public void testEncodeMultipleMutations() throws Exception { + RowMutations multipleMutations = new RowMutations(rowKey); + multipleMutations.add( + HBaseMutationBuilder.createPut(rowKey, colFamily, colQualifier, value, timeT)); + multipleMutations.add( + TestHBaseUtils.HBaseMutationBuilder.createDelete(rowKey, colFamily, colQualifier, timeT)); + multipleMutations.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + coder.encode(multipleMutations, outputStream); + + inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + + RowMutations decodedMultipleMutations = coder.decode(inputStream); + + Assert.assertTrue(inputStream.available() == 0); + HashUtils.assertRowMutationsEquals(multipleMutations, decodedMultipleMutations); + } + + @Test + public void testEncodeMultipleRowMutations() throws Exception { + RowMutations put = new RowMutations(rowKey); + put.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + RowMutations deleteCols = new RowMutations(rowKey); + deleteCols.add( + TestHBaseUtils.HBaseMutationBuilder.createDelete(rowKey, colFamily, colQualifier, timeT)); + RowMutations deleteFamily = new RowMutations(rowKey); + deleteFamily.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + + coder.encode(put, outputStream); + coder.encode(deleteCols, outputStream); + coder.encode(deleteFamily, outputStream); + + inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + + RowMutations decodedPut = coder.decode(inputStream); + RowMutations decodedDeleteCols = coder.decode(inputStream); + RowMutations decodedDeleteFamily = coder.decode(inputStream); + + Assert.assertTrue(inputStream.available() == 0); + HashUtils.assertRowMutationsEquals(put, decodedPut); + HashUtils.assertRowMutationsEquals(deleteCols, decodedDeleteCols); + HashUtils.assertRowMutationsEquals(deleteFamily, decodedDeleteFamily); + } + + @Test + public void testEncodeMultipleComplexRowMutations() throws Exception { + RowMutations complexMutation = new RowMutations(rowKey); + complexMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + complexMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createDelete( + rowKey, colFamily2, colQualifier2, timeT + 1)); + complexMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + + coder.encode(complexMutation, outputStream); + coder.encode(complexMutation, outputStream); + coder.encode(complexMutation, outputStream); + + inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + + RowMutations decodedComplexMutation = coder.decode(inputStream); + RowMutations decodedComplexMutation2 = coder.decode(inputStream); + RowMutations decodedComplexMutation3 = coder.decode(inputStream); + + Assert.assertTrue(inputStream.available() == 0); + HashUtils.assertRowMutationsEquals(complexMutation, decodedComplexMutation); + HashUtils.assertRowMutationsEquals(complexMutation, decodedComplexMutation2); + HashUtils.assertRowMutationsEquals(complexMutation, decodedComplexMutation3); + } +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java new file mode 100644 index 000000000000..3cc81c45c161 --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase.utils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; + +/** Utility functions to help assert equality between mutation lists for testing purposes. */ +public class HashUtils { + + /** + * Asserts two {@link RowMutations} objects are equal by rowkey and list of {@link Mutation}. + * + * @param rowMutationA + * @param rowMutationB + * @throws Exception if hash function fails + */ + public static void assertRowMutationsEquals(RowMutations rowMutationA, RowMutations rowMutationB) + throws Exception { + if (rowMutationA == null || rowMutationB == null) { + Assert.assertEquals(rowMutationA, rowMutationB); + } + Assert.assertTrue(Bytes.equals(rowMutationA.getRow(), rowMutationB.getRow())); + Assert.assertEquals( + hashMutationList(rowMutationA.getMutations()), + hashMutationList(rowMutationB.getMutations())); + } + + /** + * Hashes list of {@link Mutation} into String, by iterating through Mutation {@link Cell} and + * picking out relevant attributes for comparison. + * + *

Different mutation types may have different hashing treatments. + * + * @param mutationList + * @return list of mutation strings that can be compared to other hashed mutation lists. + */ + public static List hashMutationList(List mutationList) throws Exception { + List mutations = new ArrayList<>(); + for (Mutation mutation : mutationList) { + List cells = new ArrayList<>(); + + CellScanner scanner = mutation.cellScanner(); + while (scanner.advance()) { + Cell c = scanner.current(); + String mutationType = ""; + long ts = 0; + + if (KeyValue.Type.codeToType(c.getTypeByte()).equals(KeyValue.Type.DeleteFamily)) { + // DeleteFamily has its timestamp created at runtime and cannot be compared with accuracy + // during tests, so we remove the timestamp altogether. + mutationType = "DELETE_FAMILY"; + ts = 0L; + } else if (KeyValue.Type.codeToType(c.getTypeByte()).equals(KeyValue.Type.DeleteColumn)) { + mutationType = "DELETE_COLUMN"; + ts = c.getTimestamp(); + } else if (KeyValue.Type.codeToType(c.getTypeByte()).equals(KeyValue.Type.Put)) { + mutationType = "PUT"; + ts = c.getTimestamp(); + } else { + throw new Exception("hashMutationList error: Cell type is not supported."); + } + + String cellHash = + String.join( + "_", + mutationType, + Long.toString(ts), + Bytes.toString(CellUtil.cloneRow(c)), + Bytes.toString(CellUtil.cloneFamily(c)), + Bytes.toString(CellUtil.cloneQualifier(c)), + Bytes.toString(CellUtil.cloneValue(c))); + + cells.add(cellHash); + } + + mutations.add(String.join(" > ", cells)); + } + + return mutations; + } + + /** + * {@link RowMutations} assert equality on rowkey only and does not guarantee that its mutations + * are the same nor that they are in the same order. + * + *

This transform splits a RowMutations object into so + * that two RowMutations objects can be compared via {@link org.apache.beam.sdk.testing.PAssert}. + */ + public static class HashHbaseRowMutations + extends PTransform< + PCollection>, PCollection>>> { + + @Override + public PCollection>> expand( + PCollection> input) { + return input.apply(ParDo.of(new HashHbaseRowMutationsFn())); + } + } + + static class HashHbaseRowMutationsFn + extends DoFn, KV>> { + + public HashHbaseRowMutationsFn() {} + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + RowMutations rowMutations = c.element().getValue(); + + if (!Bytes.equals(c.element().getKey(), rowMutations.getRow())) { + throw new Exception("Hash error, KV rowkey is not the same as rowMutations rowkey"); + } + + c.output( + KV.of( + Bytes.toString(rowMutations.getRow()), + hashMutationList(rowMutations.getMutations()))); + } + } +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java new file mode 100644 index 000000000000..a6e01245dc94 --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase.utils; + +import java.nio.charset.StandardCharsets; + +/** Constants used for testing purposes. */ +public class TestConstants { + // Base timestamp, assumed to be in milliseconds. + public static long timeT = 123456000; + + public static byte[] rowKey = "row-key-1".getBytes(StandardCharsets.UTF_8); + + public static byte[] colFamily = "cf".getBytes(StandardCharsets.UTF_8); + public static byte[] colQualifier = "col1".getBytes(StandardCharsets.UTF_8); + public static byte[] value = "val-1".getBytes(StandardCharsets.UTF_8); + + public static byte[] rowKey2 = "row-key-2".getBytes(StandardCharsets.UTF_8); + public static byte[] colFamily2 = "cf2".getBytes(StandardCharsets.UTF_8); + public static byte[] colQualifier2 = "col2".getBytes(StandardCharsets.UTF_8); + public static byte[] value2 = "long-value-2".getBytes(StandardCharsets.UTF_8); + + // Variables for bidirectional replication. + public static String cbtQualifier = "SOURCE_CBT"; + public static String hbaseQualifier = "SOURCE_HBASE"; + + // Bigtable change stream constants. + public static String testCluster = "test-cluster-1"; + public static String testToken = "test-token"; +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestHBaseUtils.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestHBaseUtils.java new file mode 100644 index 000000000000..72751fcb6801 --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestHBaseUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase.utils; + +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily2; + +import java.io.IOException; +import java.util.UUID; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; + +/** Hbase-related convenience functions. */ +public class TestHBaseUtils { + + public static byte[] getCell(Table table, byte[] rowKey, byte[] colFamily, byte[] colQualifier) + throws IOException { + return getRowResult(table, rowKey).getValue(colFamily, colQualifier); + } + + public static Result getRowResult(Table table, byte[] rowKey) throws IOException { + return table.get(new Get(rowKey)); + } + + public static Table createTable(HBaseTestingUtility htu) throws IOException { + return createTable(htu, UUID.randomUUID().toString()); + } + + public static Table createTable(HBaseTestingUtility htu, String name) throws IOException { + TableName tableName = TableName.valueOf(name); + return htu.createTable( + tableName, new String[] {Bytes.toString(colFamily), Bytes.toString(colFamily2)}); + } + + /** Builder class for Hbase mutations. */ + public static class HBaseMutationBuilder { + + public static Put createPut( + byte[] rowKey, byte[] colFamily, byte[] colQualifier, byte[] value, long atTimestamp) { + return new Put(rowKey, atTimestamp).addColumn(colFamily, colQualifier, value); + } + + public static Delete createDelete( + byte[] rowKey, byte[] colFamily, byte[] colQualifier, long atTimestamp) { + return new Delete(rowKey, atTimestamp).addColumns(colFamily, colQualifier); + } + + public static Delete createDeleteFamily(byte[] rowKey, byte[] colFamily, long atTimestamp) { + return new Delete(rowKey, atTimestamp).addFamily(colFamily); + } + } +}