From 5221333ca958adcd64491e90ccabb3c404fb35bd Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 9 Aug 2024 11:37:49 +0200 Subject: [PATCH 1/6] Refactor bigtable API to use v2 client --- build.sbt | 4 - .../spotify/scio/bigtable/BigtableIT.scala | 120 +++---- .../scio/examples/extra/BigtableExample.scala | 9 +- .../scio/bigtable/BigtableBulkWriter.java | 220 ------------ .../spotify/scio/bigtable/BigtableDoFn.java | 39 ++- .../spotify/scio/bigtable/BigtableUtil.java | 166 --------- .../scio/bigtable/ChannelPoolCreator.java | 59 ---- .../gcp/bigtable/BigtableServiceHelper.java | 48 --- .../com/spotify/scio/bigtable/Admin.scala | 271 +++++++++++++++ .../spotify/scio/bigtable/BigTableIO.scala | 173 +++++---- .../spotify/scio/bigtable/TableAdmin.scala | 297 ---------------- .../bigtable/syntax/SCollectionSyntax.scala | 45 ++- .../bigtable/syntax/ScioContextSyntax.scala | 328 +++--------------- .../scio/bigtable/BigtableBulkWriterTest.java | 200 ----------- .../scio/bigtable/BigtableUtilTest.java | 33 -- .../scio/bigtable/BigtableDoFnTest.scala | 17 +- 16 files changed, 518 insertions(+), 1511 deletions(-) delete mode 100644 scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBulkWriter.java delete mode 100644 scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableUtil.java delete mode 100644 scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/ChannelPoolCreator.java delete mode 100644 scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceHelper.java create mode 100644 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala delete mode 100644 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/TableAdmin.scala delete mode 100644 scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableBulkWriterTest.java delete mode 100644 scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableUtilTest.java diff --git a/build.sbt b/build.sbt index d70772b0a1..682d18f1ce 100644 --- a/build.sbt +++ b/build.sbt @@ -36,7 +36,6 @@ val beamVersion = "2.58.0" val autoServiceVersion = "1.0.1" val autoValueVersion = "1.9" val bigdataossVersion = "2.2.16" -val bigtableClientVersion = "1.28.0" val commonsCodecVersion = "1.17.0" val commonsCompressVersion = "1.26.2" val commonsIoVersion = "2.16.1" @@ -967,7 +966,6 @@ lazy val `scio-google-cloud-platform` = project "com.google.api-client" % "google-api-client" % gcpBom.key.value, "com.google.api.grpc" % "grpc-google-cloud-pubsub-v1" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1beta1" % gcpBom.key.value, - "com.google.api.grpc" % "proto-google-cloud-bigtable-admin-v2" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-datastore-v1" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % gcpBom.key.value, @@ -979,8 +977,6 @@ lazy val `scio-google-cloud-platform` = project "com.google.cloud" % "google-cloud-core" % gcpBom.key.value, "com.google.cloud" % "google-cloud-spanner" % gcpBom.key.value, "com.google.cloud.bigdataoss" % "util" % bigdataossVersion, - "com.google.cloud.bigtable" % "bigtable-client-core" % bigtableClientVersion, - "com.google.cloud.bigtable" % "bigtable-client-core-config" % bigtableClientVersion, "com.google.guava" % "guava" % guavaVersion, "com.google.http-client" % "google-http-client" % gcpBom.key.value, "com.google.http-client" % 
"google-http-client-gson" % gcpBom.key.value, diff --git a/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala b/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala index 29e0407d3c..779227770f 100644 --- a/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala @@ -18,11 +18,8 @@ package com.spotify.scio.bigtable import java.util.UUID - -import com.google.bigtable.admin.v2.{DeleteTableRequest, GetTableRequest, ListTablesRequest} import com.google.bigtable.v2.{Mutation, Row, RowFilter} -import com.google.cloud.bigtable.config.BigtableOptions -import com.google.cloud.bigtable.grpc._ +import com.google.cloud.bigtable.admin.v2.{BigtableInstanceAdminClient, BigtableTableAdminClient} import com.google.protobuf.ByteString import com.spotify.scio._ import com.spotify.scio.testing._ @@ -41,12 +38,6 @@ object BigtableIT { def testData(id: String): Seq[(String, Long)] = Seq((s"$id-key1", 1L), (s"$id-key2", 2L), (s"$id-key3", 3L)) - val bigtableOptions: BigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - val FAMILY_NAME: String = "count" val COLUMN_QUALIFIER: ByteString = ByteString.copyFromUtf8("long") @@ -67,12 +58,6 @@ object BigtableIT { def fromRow(r: Row): (String, Long) = (r.getKey.toStringUtf8, r.getValue(FAMILY_NAME, COLUMN_QUALIFIER).get.toStringUtf8.toLong) - - def listTables(client: BigtableTableAdminGrpcClient): Set[String] = { - val instancePath = s"projects/$projectId/instances/$instanceId" - val tables = client.listTables(ListTablesRequest.newBuilder().setParent(instancePath).build) - tables.getTablesList.asScala.map(t => new BigtableTableName(t.getName).getTableId).toSet - } } class BigtableIT extends PipelineSpec { @@ -80,18 +65,22 @@ class BigtableIT extends PipelineSpec { // "Update number of bigtable nodes" should "work" in { ignore should "update number of bigtable nodes" in { - val bt = new BigtableClusterUtilities(bigtableOptions) - val sc = ScioContext() - sc.updateNumberOfBigtableNodes(projectId, instanceId, 4, Duration.standardSeconds(10)) - sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 4 - bt.getClusterNodeCount(clusterId, zoneId) shouldBe 4 - sc.updateNumberOfBigtableNodes(projectId, instanceId, 3, Duration.standardSeconds(10)) - sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 3 - bt.getClusterNodeCount(clusterId, zoneId) shouldBe 3 + val client = BigtableInstanceAdminClient.create(projectId) + try { + val sc = ScioContext() + sc.updateNumberOfBigtableNodes(projectId, instanceId, 4, Duration.standardSeconds(10)) + sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 4 + client.getCluster(clusterId, zoneId).getServeNodes shouldBe 4 + sc.updateNumberOfBigtableNodes(projectId, instanceId, 3, Duration.standardSeconds(10)) + sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 3 + client.getCluster(clusterId, zoneId).getServeNodes shouldBe 3 + } finally { + client.close() + } } "BigtableIO" should "work in default mode" in { - TableAdmin.ensureTables(bigtableOptions, Map(tableId -> List(FAMILY_NAME))) + Admin.Table.ensureTable(projectId, instanceId, tableId, List(FAMILY_NAME)) val id = testId() val data = testData(id) try { @@ -126,12 +115,7 @@ class BigtableIT extends PipelineSpec { } it should "work in bulk mode" in { - TableAdmin.ensureTables(bigtableOptions, Map(tableId -> List(FAMILY_NAME))) - val options = 
BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build() + Admin.Table.ensureTable(projectId, instanceId, tableId, List(FAMILY_NAME)) val id = testId() val data = testData(id) @@ -140,7 +124,7 @@ class BigtableIT extends PipelineSpec { runWithRealContext() { sc => sc .parallelize(data.map(kv => toWriteMutation(kv._1, kv._2))) - .saveAsBigtable(options, tableId, 1) + .saveAsBigtable(projectId, instanceId, tableId) }.waitUntilDone() // Read rows back @@ -166,48 +150,48 @@ class BigtableIT extends PipelineSpec { }.waitUntilFinish() } - "TableAdmin" should "work" in { + "Admin.Table" should "work" in { val id = testId() val tables = Map( s"scio-bigtable-empty-table-$id" -> List(), s"scio-bigtable-one-cf-table-$id" -> List("colfam1"), s"scio-bigtable-two-cf-table-$id" -> List("colfam1", "colfam2") ) - val channel = ChannelPoolCreator.createPool(bigtableOptions) - val executorService = BigtableSessionSharedThreadPools.getInstance().getRetryExecutor - val client = new BigtableTableAdminGrpcClient(channel, executorService, bigtableOptions) - val instancePath = s"projects/$projectId/instances/$instanceId" - val tableIds = tables.keys.toSet - def tablePath(table: String): String = s"$instancePath/tables/$table" - def deleteTable(table: String): Unit = - client.deleteTable(DeleteTableRequest.newBuilder().setName(tablePath(table)).build) - - // Delete any tables that could be left around from previous IT run. - val oldTables = listTables(client).intersect(tableIds) - oldTables.foreach(deleteTable) - - // Ensure that the tables don't exist now - listTables(client).intersect(tableIds) shouldBe empty - - // Run UUT - TableAdmin.ensureTables(bigtableOptions, tables) - - // Tables must exist - listTables(client).intersect(tableIds) shouldEqual tableIds - - // Assert Column families exist - for ((table, columnFamilies) <- tables) { - val tableInfo = client.getTable( - GetTableRequest - .newBuilder() - .setName(tablePath(table)) - .build - ) - val actualColumnFamilies = tableInfo.getColumnFamiliesMap.asScala.keys - actualColumnFamilies should contain theSameElementsAs columnFamilies - } - // Clean up and delete - tables.keys.foreach(deleteTable) + val client = BigtableTableAdminClient.create(projectId, instanceId) + try { + val tableIds = tables.keys.toSet + + // Delete any tables that could be left around from previous IT run. 
+      client
+        .listTables()
+        .asScala
+        .filter(tableIds.contains)
+        .foreach(client.deleteTable)
+
+      // Ensure that the tables don't exist now
+      client.listTables().asScala.toSet.intersect(tableIds) shouldBe empty
+
+      // Run UUT
+      tables.foreach { case (tableId, cfs) =>
+        Admin.Table.ensureTable(projectId, instanceId, tableId, cfs)
+      }
+
+      // Tables must exist
+      client.listTables().asScala should contain allElementsOf tableIds
+
+      // Assert Column families exist
+      tables.foreach { case (id, columnFamilies) =>
+        val table = client.getTable(id)
+        val actualFamilies = table.getColumnFamilies.asScala.map(_.getId)
+
+        actualFamilies should contain theSameElementsAs columnFamilies
+      }
+
+      // Clean up and delete
+      tableIds.foreach(client.deleteTable)
+    } finally {
+      client.close()
+    }
   }
 }
diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala
index 1ef57e301b..1187e04b7b 100644
--- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala
+++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala
@@ -72,13 +72,12 @@ object BigtableWriteExample {
     // before the ingestion starts.
     sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, 15)
 
-    // Ensure that destination tables and column families exist
-    sc.ensureTables(
+    // Ensure that destination table and column families exist
+    sc.ensureTable(
       btProjectId,
       btInstanceId,
-      Map(
-        btTableId -> List(BigtableExample.FAMILY_NAME)
-      )
+      btTableId,
+      List(BigtableExample.FAMILY_NAME)
     )
 
     sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR))
diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBulkWriter.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBulkWriter.java
deleted file mode 100644
index b8eb71eac1..0000000000
--- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBulkWriter.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Copyright 2018 Spotify AB.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ - -package com.spotify.scio.bigtable; - -import com.google.bigtable.v2.Mutation; -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceHelper; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigtableBulkWriter - extends PTransform>>, PDone> { - - private static final Logger LOG = LoggerFactory.getLogger(BigtableBulkWriter.class); - - private final BigtableOptions bigtableOptions; - private final String tableName; - private final int numOfShards; - private final Duration flushInterval; - - public BigtableBulkWriter( - final String tableName, - final BigtableOptions bigtableOptions, - final int numOfShards, - final Duration flushInterval) { - this.bigtableOptions = bigtableOptions; - this.tableName = tableName; - this.numOfShards = numOfShards; - this.flushInterval = flushInterval; - } - - @Override - public PDone expand(PCollection>> input) { - createBulkShards(input, numOfShards, flushInterval) - .apply("Bigtable BulkWrite", ParDo.of(new BigtableBulkWriterFn())); - return PDone.in(input.getPipeline()); - } - - @VisibleForTesting - static PCollection>>> createBulkShards( - final PCollection>> input, - final int numOfShards, - final Duration flushInterval) { - return input - .apply("Assign To Shard", ParDo.of(new AssignToShard(numOfShards))) - .apply( - "Window", - Window.>>>into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(flushInterval))) - .discardingFiredPanes() - .withAllowedLateness(Duration.ZERO)) - .apply("Group By Shard", GroupByKey.create()) - .apply( - "Gets Mutations", - ParDo.of( - new DoFn< - KV>>>, - Iterable>>>() { - @ProcessElement - public void process( - @Element KV>>> element, - OutputReceiver>>> out) { - out.output(element.getValue()); - } - })); - } - - private class BigtableBulkWriterFn - extends DoFn>>, Void> { - - private BigtableServiceHelper.Writer bigtableWriter; - private long recordsWritten; - private final ConcurrentLinkedQueue failures; - - public BigtableBulkWriterFn() { - this.failures = new ConcurrentLinkedQueue<>(); - } - - @StartBundle - public void startBundle(StartBundleContext c) throws IOException { - bigtableWriter = - new BigtableServiceHelper(bigtableOptions, c.getPipelineOptions()) - .openForWriting(tableName); - recordsWritten = 0; - } - - @ProcessElement - public void processElement(@Element Iterable>> element) - throws Exception { - checkForFailures(failures); - for (KV> r : element) { - bigtableWriter - 
.writeRecord(r) - .whenComplete( - (mutationResult, exception) -> { - if (exception != null) { - failures.add(new BigtableWriteException(r, exception)); - } - }); - ++recordsWritten; - } - } - - @FinishBundle - public void finishBundle() throws Exception { - // close the writer and wait for all writes to complete - bigtableWriter.close(); - checkForFailures(failures); - LOG.debug("Wrote {} records", recordsWritten); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("Records Written", recordsWritten)); - } - - /** If any write has asynchronously failed, fail the bundle with a useful error. */ - private void checkForFailures(final ConcurrentLinkedQueue failures) - throws IOException { - // Note that this function is never called by multiple threads and is the only place that - // we remove from failures, so this code is safe. - if (failures.isEmpty()) { - return; - } - - StringBuilder logEntry = new StringBuilder(); - int i = 0; - List suppressed = new ArrayList<>(); - for (; i < 10 && !failures.isEmpty(); ++i) { - BigtableWriteException exc = failures.remove(); - logEntry.append("\n").append(exc.getMessage()); - if (exc.getCause() != null) { - logEntry.append(": ").append(exc.getCause().getMessage()); - } - suppressed.add(exc); - } - String message = - String.format( - "At least %d errors occurred writing to Bigtable. First %d errors: %s", - i + failures.size(), i, logEntry.toString()); - LOG.error(message); - IOException exception = new IOException(message); - for (BigtableWriteException e : suppressed) { - exception.addSuppressed(e); - } - throw exception; - } - - /** An exception that puts information about the failed record being written in its message. */ - class BigtableWriteException extends IOException { - - public BigtableWriteException( - final KV> record, Throwable cause) { - super( - String.format( - "Error mutating row %s with mutations %s", - record.getKey().toStringUtf8(), record.getValue()), - cause); - } - } - } - - static class AssignToShard - extends DoFn< - KV>, KV>>> { - - private final int numOfShards; - - AssignToShard(final int numOfShards) { - this.numOfShards = numOfShards; - } - - @ProcessElement - public void processElement( - @Element KV> element, - OutputReceiver>>> out) { - // assign this element to a random shard - final long shard = ThreadLocalRandom.current().nextLong(numOfShards); - out.output(KV.of(shard, element)); - } - } -} diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java index 0248693fc8..33b8e7524e 100644 --- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java +++ b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java @@ -17,12 +17,13 @@ package com.spotify.scio.bigtable; -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.grpc.BigtableSession; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.common.util.concurrent.ListenableFuture; import com.spotify.scio.transforms.BaseAsyncLookupDoFn; import com.spotify.scio.transforms.GuavaAsyncLookupDoFn; import java.io.IOException; +import java.util.function.Supplier; import org.apache.beam.sdk.transforms.DoFn; /** @@ -31,53 +32,53 @@ * @param input element type. 
 * @param <B> Bigtable lookup value type.
 */
-public abstract class BigtableDoFn<A, B> extends GuavaAsyncLookupDoFn<A, B, BigtableSession> {
+public abstract class BigtableDoFn<A, B> extends GuavaAsyncLookupDoFn<A, B, BigtableDataClient> {
 
-  private final BigtableOptions options;
+  private final Supplier<BigtableDataSettings> settingsSupplier;
 
   /** Perform asynchronous Bigtable lookup. */
-  public abstract ListenableFuture<B> asyncLookup(BigtableSession session, A input);
+  public abstract ListenableFuture<B> asyncLookup(BigtableDataClient client, A input);
 
   /**
    * Create a {@link BigtableDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    */
-  public BigtableDoFn(BigtableOptions options) {
-    this(options, 1000);
+  public BigtableDoFn(Supplier<BigtableDataSettings> settingsSupplier) {
+    this(settingsSupplier, 1000);
   }
 
   /**
    * Create a {@link BigtableDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    * @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
    *     prevents runner from timing out and retrying bundles.
    */
-  public BigtableDoFn(BigtableOptions options, int maxPendingRequests) {
-    this(options, maxPendingRequests, new BaseAsyncLookupDoFn.NoOpCacheSupplier<>());
+  public BigtableDoFn(Supplier<BigtableDataSettings> settingsSupplier, int maxPendingRequests) {
+    this(settingsSupplier, maxPendingRequests, new BaseAsyncLookupDoFn.NoOpCacheSupplier<>());
   }
 
   /**
    * Create a {@link BigtableDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    * @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
    *     prevents runner from timing out and retrying bundles.
    * @param cacheSupplier supplier for lookup cache.
    */
   public BigtableDoFn(
-      BigtableOptions options,
+      Supplier<BigtableDataSettings> settingsSupplier,
       int maxPendingRequests,
       BaseAsyncLookupDoFn.CacheSupplier<A, B> cacheSupplier) {
     super(maxPendingRequests, cacheSupplier);
-    this.options = options;
+    this.settingsSupplier = settingsSupplier;
   }
 
   /**
    * Create a {@link BigtableDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    * @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
    *     prevents runner from timing out and retrying bundles.
    * @param deduplicate if an attempt should be made to de-duplicate simultaneous requests for the
    *     same input
    * @param cacheSupplier supplier for lookup cache.
    */
   public BigtableDoFn(
-      BigtableOptions options,
+      Supplier<BigtableDataSettings> settingsSupplier,
       int maxPendingRequests,
       boolean deduplicate,
       BaseAsyncLookupDoFn.CacheSupplier<A, B> cacheSupplier) {
     super(maxPendingRequests, deduplicate, cacheSupplier);
-    this.options = options;
+    this.settingsSupplier = settingsSupplier;
   }
 
   @Override
@@ -99,9 +100,9 @@ public ResourceType getResourceType() {
     return ResourceType.PER_INSTANCE;
   }
 
-  protected BigtableSession newClient() {
+  protected BigtableDataClient newClient() {
     try {
-      return new BigtableSession(options);
+      return BigtableDataClient.create(settingsSupplier.get());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableUtil.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableUtil.java
deleted file mode 100644
index 752dc41976..0000000000
--- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableUtil.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Copyright 2016 Spotify AB.
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.bigtable; - -import com.google.bigtable.admin.v2.Cluster; -import com.google.bigtable.admin.v2.ListClustersRequest; -import com.google.bigtable.admin.v2.ListClustersResponse; -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.grpc.BigtableClusterUtilities; -import com.google.cloud.bigtable.grpc.BigtableInstanceClient; -import com.google.cloud.bigtable.grpc.BigtableInstanceGrpcClient; -import com.google.cloud.bigtable.grpc.io.ChannelPool; -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import org.joda.time.Duration; -import org.joda.time.format.PeriodFormatter; -import org.joda.time.format.PeriodFormatterBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Utilities to deal with Bigtable. */ -public final class BigtableUtil { - - private BigtableUtil() {} - - private static final Logger LOG = LoggerFactory.getLogger(BigtableUtil.class); - - private static final PeriodFormatter formatter = - new PeriodFormatterBuilder() - .appendDays() - .appendSuffix("d") - .appendHours() - .appendSuffix("h") - .appendMinutes() - .appendSuffix("m") - .appendSeconds() - .appendSuffix("s") - .toFormatter(); - - /** - * Updates all clusters within the specified Bigtable instance to a specified number of nodes. - * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the - * end to lower costs yet still get high throughput during bulk ingests/dumps. - * - * @param bigtableOptions Bigtable Options - * @param numberOfNodes New number of nodes in the cluster - * @param sleepDuration How long to sleep after updating the number of nodes. Google recommends at - * least 20 minutes before the new nodes are fully functional - * @throws IOException If setting up channel pool fails - * @throws InterruptedException If sleep fails - */ - public static void updateNumberOfBigtableNodes( - final BigtableOptions bigtableOptions, final int numberOfNodes, final Duration sleepDuration) - throws IOException, InterruptedException { - updateNumberOfBigtableNodes( - bigtableOptions, numberOfNodes, sleepDuration, Collections.emptySet()); - } - - /** - * Updates all clusters within the specified Bigtable instance to a specified number of nodes. - * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the - * end to lower costs yet still get high throughput during bulk ingests/dumps. - * - * @param bigtableOptions Bigtable Options - * @param numberOfNodes New number of nodes in the cluster - * @param sleepDuration How long to sleep after updating the number of nodes. 
Google recommends at - * least 20 minutes before the new nodes are fully functional - * @param clusterNames Names of clusters to be updated, all if empty - * @throws IOException If setting up channel pool fails - * @throws InterruptedException If sleep fails - */ - public static void updateNumberOfBigtableNodes( - final BigtableOptions bigtableOptions, - final int numberOfNodes, - final Duration sleepDuration, - final Set clusterNames) - throws IOException, InterruptedException { - final ChannelPool channelPool = ChannelPoolCreator.createPool(bigtableOptions); - - try { - final BigtableInstanceClient bigtableInstanceClient = - new BigtableInstanceGrpcClient(channelPool); - - final String instanceName = bigtableOptions.getInstanceName().toString(); - - // Fetch clusters in Bigtable instance - final ListClustersRequest clustersRequest = - ListClustersRequest.newBuilder().setParent(instanceName).build(); - final ListClustersResponse clustersResponse = - bigtableInstanceClient.listCluster(clustersRequest); - final List clustersToUpdate = - clusterNames.isEmpty() - ? clustersResponse.getClustersList() - : clustersResponse.getClustersList().stream() - .filter(c -> clusterNames.contains(shorterName(c.getName()))) - .collect(Collectors.toList()); - - // For each cluster update the number of nodes - for (Cluster cluster : clustersToUpdate) { - final Cluster updatedCluster = - Cluster.newBuilder().setName(cluster.getName()).setServeNodes(numberOfNodes).build(); - LOG.info("Updating number of nodes to {} for cluster {}", numberOfNodes, cluster.getName()); - bigtableInstanceClient.updateCluster(updatedCluster); - } - - // Wait for the new nodes to be provisioned - if (sleepDuration.getMillis() > 0) { - LOG.info("Sleeping for {} after update", formatter.print(sleepDuration.toPeriod())); - Thread.sleep(sleepDuration.getMillis()); - } - } finally { - channelPool.shutdownNow(); - } - } - - /** - * Get size of all clusters for specified Bigtable instance. - * - * @param projectId GCP projectId - * @param instanceId Bigtable instanceId - * @return map of clusterId to its number of nodes - * @throws IOException If setting up channel pool fails - * @throws GeneralSecurityException If security-related exceptions occurs - */ - public static Map getClusterSizes( - final String projectId, final String instanceId) - throws IOException, GeneralSecurityException { - try (BigtableClusterUtilities clusterUtil = - BigtableClusterUtilities.forInstance(projectId, instanceId)) { - return Collections.unmodifiableMap( - clusterUtil.getClusters().getClustersList().stream() - .collect( - Collectors.toMap( - cn -> cn.getName().substring(cn.getName().indexOf("/clusters/") + 10), - Cluster::getServeNodes))); - } - } - - static String shorterName(String name) { - if (name.lastIndexOf('/') != -1) { - return name.substring(name.lastIndexOf('/') + 1, name.length()); - } else { - return name; - } - } -} diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/ChannelPoolCreator.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/ChannelPoolCreator.java deleted file mode 100644 index 3303c1d6eb..0000000000 --- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/ChannelPoolCreator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2016 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.bigtable; - -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.grpc.BigtableSession; -import com.google.cloud.bigtable.grpc.io.ChannelPool; -import com.google.cloud.bigtable.grpc.io.CredentialInterceptorCache; -import io.grpc.ClientInterceptor; -import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ChannelPoolCreator { - private static final Logger LOG = LoggerFactory.getLogger(ChannelPoolCreator.class); - - private static ClientInterceptor[] getClientInterceptors(final BigtableOptions options) { - try { - final ClientInterceptor interceptor = - CredentialInterceptorCache.getInstance() - .getCredentialsInterceptor(options.getCredentialOptions()); - // If credentials are unset (i.e. via local emulator), CredentialsInterceptor will return null - if (interceptor == null) { - return new ClientInterceptor[] {}; - } else { - return new ClientInterceptor[] {interceptor}; - } - } catch (Exception e) { - LOG.error( - "Failed to get credentials interceptor. No interceptor will be used for the channel.", e); - return new ClientInterceptor[] {}; - } - } - - public static ChannelPool createPool(final BigtableOptions options) throws IOException { - final ClientInterceptor[] interceptors = getClientInterceptors(options); - - return new ChannelPool( - () -> - BigtableSession.createNettyChannel( - options.getAdminHost(), options, false, interceptors), - 1); - } -} diff --git a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceHelper.java b/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceHelper.java deleted file mode 100644 index dc185b6f51..0000000000 --- a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceHelper.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2018 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigtable; - -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.data.v2.BigtableDataSettings; -import java.io.IOException; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; - -/** Wrap {@link BigtableServiceImpl} and expose package private methods. 
*/ -public class BigtableServiceHelper extends BigtableServiceImpl { - private static final BigtableConfig EMPTY_CONFIG = - BigtableConfig.builder().setValidate(true).build(); - - public BigtableServiceHelper(BigtableOptions bigtableOptions, PipelineOptions pipelineOptions) - throws IOException { - super(translateToVeneerSettings(bigtableOptions, pipelineOptions)); - } - - public Writer openForWriting(String tableId) { - BigtableWriteOptions options = - BigtableWriteOptions.builder().setTableId(StaticValueProvider.of(tableId)).build(); - return openForWriting(options); - } - - private static BigtableDataSettings translateToVeneerSettings( - BigtableOptions bigtableOptions, PipelineOptions pipelineOptions) throws IOException { - final BigtableConfig config = - BigtableConfigTranslator.translateToBigtableConfig(EMPTY_CONFIG, bigtableOptions); - return BigtableConfigTranslator.translateToVeneerSettings(config, pipelineOptions); - } -} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala new file mode 100644 index 0000000000..4976a1884f --- /dev/null +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala @@ -0,0 +1,271 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.bigtable + +import com.google.cloud.bigtable.admin.v2.models.GCRules.GCRule +import com.google.cloud.bigtable.admin.v2.models.{ + CreateTableRequest, + GCRules, + ModifyColumnFamiliesRequest +} +import com.google.cloud.bigtable.admin.v2.{BigtableInstanceAdminClient, BigtableTableAdminClient} +import org.joda.time.Duration +import org.slf4j.{Logger, LoggerFactory} + +import java.util.concurrent.TimeUnit + +import scala.collection.concurrent +import scala.collection.concurrent.TrieMap +import scala.jdk.CollectionConverters._ + +/** + * Bigtable Table Admin API helper commands. + * + * Caches Bigtable clients and exposes basic operations + */ +object Admin { + + private val logger: Logger = LoggerFactory.getLogger(classOf[Admin.type]) + sys.addShutdownHook { + logger.info("Shutting down Bigtable clients") + Instance.clients.values.foreach(_.close()) + Table.clients.values.foreach(_.close()) + } + + object Table { + + sealed trait CreateDisposition + + object CreateDisposition { + case object Never extends CreateDisposition + + case object CreateIfNeeded extends CreateDisposition + + val default: CreateDisposition = CreateIfNeeded + } + + private[bigtable] val clients: concurrent.Map[(String, String), BigtableTableAdminClient] = + TrieMap.empty + private def getOrCreateClient( + projectId: String, + instanceId: String + ): BigtableTableAdminClient = { + val key = (projectId, instanceId) + clients.getOrElseUpdate( + key, + BigtableTableAdminClient.create(projectId, instanceId) + ) + } + + /** + * Retrieves a set of tables from the given instancePath. + * + * @param client + * Client for calling Bigtable. 
+   * @return
+   *   the ids of the tables present in the instance
+   */
+  private def fetchTables(client: BigtableTableAdminClient): Set[String] =
+    client.listTables().asScala.toSet
+
+  /**
+   * Ensure that a table and its column families exist. Checks for existence of the table and
+   * creates it if it does not exist. Also checks for existence of the given column families and
+   * creates any that do not exist.
+   *
+   * @param columnFamilies
+   *   The column family names to ensure on the table.
+   */
+  def ensureTable(
+    projectId: String,
+    instanceId: String,
+    tableId: String,
+    columnFamilies: Iterable[String],
+    createDisposition: CreateDisposition = CreateDisposition.default
+  ): Unit = {
+    val tcf = columnFamilies.map(cf => cf -> None)
+    ensureTableImpl(projectId, instanceId, tableId, tcf, createDisposition)
+  }
+
+  /**
+   * Ensure that a table and its column families exist. Checks for existence of the table and
+   * creates it if it does not exist. Also checks for existence of the given column families and
+   * creates any that do not exist.
+   *
+   * @param columnFamilies
+   *   Column family names along with the desired cell expiration. Cell expiration is the
+   *   duration before which garbage collection of a cell may occur. Note: minimum granularity is
+   *   one second.
+   */
+  def ensureTablesWithExpiration(
+    projectId: String,
+    instanceId: String,
+    tableId: String,
+    columnFamilies: Iterable[(String, Option[Duration])],
+    createDisposition: CreateDisposition = CreateDisposition.default
+  ): Unit = {
+    // Convert Duration to GCRule
+    val x = columnFamilies.map { case (columnFamily, duration) =>
+      (columnFamily, duration.map(maxAgeGcRule))
+    }
+    ensureTableImpl(projectId, instanceId, tableId, x, createDisposition)
+  }
+
+  private def maxAgeGcRule(duration: Duration): GCRule =
+    GCRules.GCRULES.maxAge(duration.getStandardSeconds, TimeUnit.SECONDS)
+
+  /**
+   * Ensure that a table and its column families exist. Checks for existence of the table and
+   * creates it if it does not exist. Also checks for existence of the given column families and
+   * creates any that do not exist.
+   *
+   * @param columnFamilies
+   *   Column family names along with the desired GCRule.
+   */
+  def ensureTableWithGcRules(
+    projectId: String,
+    instanceId: String,
+    tableId: String,
+    columnFamilies: Iterable[(String, Option[GCRule])],
+    createDisposition: CreateDisposition = CreateDisposition.default
+  ): Unit = ensureTableImpl(projectId, instanceId, tableId, columnFamilies, createDisposition)
+
+  /**
+   * Ensure that a table and its column families exist. Checks for existence of the table and
+   * creates it if it does not exist. Also checks for existence of the given column families and
+   * creates any that do not exist.
+   *
+   * @param columnFamilies
+   *   Column family names along with an optional GCRule.
+   */
+  private def ensureTableImpl(
+    projectId: String,
+    instanceId: String,
+    tableId: String,
+    columnFamilies: Iterable[(String, Option[GCRule])],
+    createDisposition: CreateDisposition
+  ): Unit = {
+    logger.info("Ensuring table and column families exist in instance {}", instanceId)
+
+    val client = getOrCreateClient(projectId, instanceId)
+    val exists = fetchTables(client).contains(tableId)
+    (exists, createDisposition) match {
+      case (true, _) =>
+        logger.info("Table {} exists", tableId)
+      case (false, CreateDisposition.CreateIfNeeded) =>
+        logger.info("Creating table {}", tableId)
+        client.createTable(CreateTableRequest.of(tableId))
+      case (false, CreateDisposition.Never) =>
+        throw new IllegalStateException(s"Table $tableId does not exist")
+    }
+
+    // Ensure column families on both pre-existing and freshly created tables
+    if (createDisposition == CreateDisposition.CreateIfNeeded && columnFamilies.nonEmpty) {
+      val existingFamilies =
+        client.getTable(tableId).getColumnFamilies.asScala.map(c => c.getId -> c).toMap
+
+      val modifyRequest = columnFamilies.foldLeft(ModifyColumnFamiliesRequest.of(tableId)) {
+        case (mr, (id, gcrOpt)) =>
+          val gcRule = gcrOpt.getOrElse(GCRules.GCRULES.defaultRule())
+          existingFamilies.get(id) match {
+            case None    => mr.addFamily(id, gcRule)
+            case Some(_) => mr.updateFamily(id, gcRule)
+          }
+      }
+
+      logger.info("Modifying column families for table {}", tableId)
+      client.modifyFamilies(modifyRequest)
+    }
+  }
+
+  /**
+   * Permanently deletes a row range from the specified table that matches a particular prefix.
+   *
+   * @param tableId
+   *   table id
+   * @param rowPrefix
+   *   row key prefix
+   */
+  def dropRowRange(
+    projectId: String,
+    instanceId: String,
+    tableId: String,
+    rowPrefix: String
+  ): Unit = {
+    val client = getOrCreateClient(projectId, instanceId)
+    client.dropRowRange(tableId, rowPrefix)
+  }
+ }
+
+ object Instance {
+  private[bigtable] val clients: concurrent.Map[String, BigtableInstanceAdminClient] =
+    TrieMap.empty
+  private def getOrCreateClient(projectId: String): BigtableInstanceAdminClient = {
+    clients.getOrElseUpdate(
+      projectId,
+      BigtableInstanceAdminClient.create(projectId)
+    )
+  }
+
+  /**
+   * Updates clusters within the specified Bigtable instance to a specified number of nodes.
+   * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the
+   * end to lower costs yet still get high throughput during bulk ingests/dumps.
+   *
+   * @param clusterIds
+   *   Ids of the clusters to resize; all clusters in the instance if empty
+   * @param numberOfNodes
+   *   New number of nodes in the cluster
+   * @param sleepDuration
+   *   How long to sleep after updating the number of nodes. Google recommends at least 20 minutes
+   *   before the new nodes are fully functional
+   */
+  def updateNumberOfBigtableNodes(
+    projectId: String,
+    instanceId: String,
+    clusterIds: Set[String],
+    numberOfNodes: Int,
+    sleepDuration: Duration
+  ): Unit = {
+    val client = getOrCreateClient(projectId)
+    val ids =
+      if (clusterIds.isEmpty) client.listClusters(instanceId).asScala.map(_.getId)
+      else clusterIds
+
+    ids.foreach { clusterId =>
+      logger.info("Updating number of nodes to {} for cluster {}", numberOfNodes, clusterId)
+      client.resizeCluster(instanceId, clusterId, numberOfNodes)
+    }
+
+    if (sleepDuration.getMillis > 0) {
+      logger.info("Sleeping for {} after update", sleepDuration.toPeriod)
+      Thread.sleep(sleepDuration.getMillis)
+    }
+  }
+
+  /**
+   * Get size of all clusters for specified Bigtable instance.
+ * + * @return + * map of clusterId to its number of nodes + */ + def getClusterSizes(projectId: String, instanceId: String): Map[String, Int] = { + val client = getOrCreateClient(projectId) + client.listClusters(instanceId).asScala.map(c => c.getId -> c.getServeNodes).toMap + } + } +} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala index 1cabe73a7f..3425dd87ca 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala @@ -18,54 +18,50 @@ package com.spotify.scio.bigtable import com.google.bigtable.v2._ -import com.google.cloud.bigtable.config.BigtableOptions import com.google.protobuf.ByteString import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} -import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT, TestIO} -import com.spotify.scio.util.Functions -import com.spotify.scio.values.SCollection +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT, TestIO, WriteResultIO} +import com.spotify.scio.values.{SCollection, SideOutput, SideOutputCollections} +import org.apache.beam.sdk.io.gcp.bigtable.{BigtableWriteResult, BigtableWriteResultCoder} import org.apache.beam.sdk.io.gcp.{bigtable => beam} import org.apache.beam.sdk.io.range.ByteKeyRange -import org.apache.beam.sdk.values.KV +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} +import org.apache.beam.sdk.values.{KV, PCollectionTuple} import org.joda.time.Duration -import org.typelevel.scalaccompat.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.util.chaining._ -sealed trait BigtableIO[T] extends ScioIO[T] { +sealed abstract class BigtableIO[T](projectId: String, instanceId: String, tableId: String) + extends ScioIO[T] { final override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + override def testId: String = s"BigtableIO($projectId:$instanceId:$tableId)" } object BigtableIO { final def apply[T](projectId: String, instanceId: String, tableId: String): BigtableIO[T] = - new BigtableIO[T] with TestIO[T] { - override def testId: String = - s"BigtableIO($projectId\t$instanceId\t$tableId)" - } + new BigtableIO[T](projectId, instanceId, tableId) with TestIO[T] } -final case class BigtableRead(bigtableOptions: BigtableOptions, tableId: String) - extends BigtableIO[Row] { +final case class BigtableRead(projectId: String, instanceId: String, tableId: String) + extends BigtableIO[Row](projectId, instanceId, tableId) { override type ReadP = BigtableRead.ReadParam override type WriteP = Nothing - override def testId: String = - s"BigtableIO(${bigtableOptions.getProjectId}\t${bigtableOptions.getInstanceId}\t$tableId)" - override protected def read(sc: ScioContext, params: ReadP): SCollection[Row] = { val coder = CoderMaterializer.beam(sc, Coder.protoMessageCoder[Row]) - val opts = bigtableOptions // defeat closure val read = beam.BigtableIO .read() - .withProjectId(bigtableOptions.getProjectId) - .withInstanceId(bigtableOptions.getInstanceId) + .withProjectId(projectId) + .withInstanceId(instanceId) .withTableId(tableId) - .withBigtableOptionsConfigurator(Functions.serializableFn(_ => opts.toBuilder)) - .withMaxBufferElementCount(params.maxBufferElementCount.map(Int.box).orNull) - .pipe(r => if (params.keyRanges.isEmpty) r else 
r.withKeyRanges(params.keyRanges.asJava)) - .pipe(r => Option(params.rowFilter).fold(r)(r.withRowFilter)): @nowarn("cat=deprecation") + .withKeyRanges(params.keyRanges.asJava) + .pipe(r => Option(params.rowFilter).fold(r)(r.withRowFilter)) + .pipe(r => params.maxBufferElementCount.fold(r)(r.withMaxBufferElementCount(_))) + .pipe(r => Option(params.appProfileId).fold(r)(r.withAppProfileId)) + .pipe(r => Option(params.attemptTimeout).fold(r)(r.withAttemptTimeout)) + .pipe(r => Option(params.operationTimeout).fold(r)(r.withOperationTimeout)) sc.applyTransform(read).setCoder(coder) } @@ -81,40 +77,33 @@ final case class BigtableRead(bigtableOptions: BigtableOptions, tableId: String) object BigtableRead { object ReadParam { - val DefaultKeyRanges: Seq[ByteKeyRange] = Seq.empty[ByteKeyRange] + val DefaultKeyRanges: Seq[ByteKeyRange] = Seq(ByteKeyRange.ALL_KEYS) val DefaultRowFilter: RowFilter = null val DefaultMaxBufferElementCount: Option[Int] = None - - def apply(keyRange: ByteKeyRange) = new ReadParam(Seq(keyRange)) - - def apply(keyRange: ByteKeyRange, rowFilter: RowFilter): ReadParam = - new ReadParam(Seq(keyRange), rowFilter) + val DefaultAppProfileId: String = null + val DefaultAttemptTimeout: Duration = null + val DefaultOperationTimeout: Duration = null } final case class ReadParam private ( keyRanges: Seq[ByteKeyRange] = ReadParam.DefaultKeyRanges, rowFilter: RowFilter = ReadParam.DefaultRowFilter, - maxBufferElementCount: Option[Int] = ReadParam.DefaultMaxBufferElementCount + maxBufferElementCount: Option[Int] = ReadParam.DefaultMaxBufferElementCount, + appProfileId: String = ReadParam.DefaultAppProfileId, + attemptTimeout: Duration = ReadParam.DefaultAttemptTimeout, + operationTimeout: Duration = ReadParam.DefaultOperationTimeout ) - - final def apply(projectId: String, instanceId: String, tableId: String): BigtableRead = { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - BigtableRead(bigtableOptions, tableId) - } } -final case class BigtableWrite[T <: Mutation](bigtableOptions: BigtableOptions, tableId: String) - extends BigtableIO[(ByteString, Iterable[T])] { - override type ReadP = Nothing +final case class BigtableWrite[T <: Mutation]( + projectId: String, + instanceId: String, + tableId: String +) extends BigtableIO[(ByteString, Iterable[T])](projectId, instanceId, tableId) + with WriteResultIO[(ByteString, Iterable[T])] { + override type ReadP = Unit override type WriteP = BigtableWrite.WriteParam - override def testId: String = - s"BigtableIO(${bigtableOptions.getProjectId}\t${bigtableOptions.getInstanceId}\t$tableId)" - override protected def read( sc: ScioContext, params: ReadP @@ -123,33 +112,33 @@ final case class BigtableWrite[T <: Mutation](bigtableOptions: BigtableOptions, "BigtableWrite is write-only, use Row to read from Bigtable" ) - override protected def write( + override protected def writeWithResult( data: SCollection[(ByteString, Iterable[T])], params: WriteP - ): Tap[Nothing] = { - val sink = - params match { - case BigtableWrite.Default => - val opts = bigtableOptions // defeat closure - beam.BigtableIO - .write() - .withProjectId(bigtableOptions.getProjectId) - .withInstanceId(bigtableOptions.getInstanceId) - .withTableId(tableId) - .withBigtableOptionsConfigurator( - Functions.serializableFn(_ => opts.toBuilder) - ): @nowarn("cat=deprecation") - case BigtableWrite.Bulk(numOfShards, flushInterval) => - new BigtableBulkWriter(tableId, bigtableOptions, numOfShards, flushInterval) - } - 
data.transform_("Bigtable write") { coll => + ): (Tap[Nothing], SideOutputCollections) = { + val t = beam.BigtableIO + .write() + .withProjectId(projectId) + .withInstanceId(instanceId) + .withTableId(tableId) + .withFlowControl(params.flowControl) + .pipe(w => Option(params.errorHandler).fold(w)(w.withErrorHandler)) + .pipe(w => Option(params.appProfileId).fold(w)(w.withAppProfileId)) + .pipe(w => Option(params.attemptTimeout).fold(w)(w.withAttemptTimeout)) + .pipe(w => Option(params.operationTimeout).fold(w)(w.withOperationTimeout)) + .pipe(w => params.maxBytesPerBatch.fold(w)(w.withMaxBytesPerBatch)) + .pipe(w => params.maxElementsPerBatch.fold(w)(w.withMaxElementsPerBatch)) + .pipe(w => params.maxOutstandingBytes.fold(w)(w.withMaxOutstandingBytes)) + .pipe(w => params.maxOutstandingElements.fold(w)(w.withMaxOutstandingElements)) + .withWriteResults() + + val result = data.transform_("Bigtable write") { coll => coll - .map { case (key, value) => - KV.of(key, value.asJava.asInstanceOf[java.lang.Iterable[Mutation]]) - } - .applyInternal(sink) + .map { case (key, mutations) => KV.of(key, (mutations: Iterable[Mutation]).asJava) } + .applyInternal(t) } - EmptyTap + val sideOutput = PCollectionTuple.of(BigtableWrite.BigtableWriteResult.tupleTag, result) + (tap(()), SideOutputCollections(sideOutput, data.context)) } override def tap(params: ReadP): Tap[Nothing] = @@ -157,28 +146,34 @@ final case class BigtableWrite[T <: Mutation](bigtableOptions: BigtableOptions, } object BigtableWrite { - sealed trait WriteParam - object Default extends WriteParam - object Bulk { - private[bigtable] val DefaultFlushInterval = Duration.standardSeconds(1) + // TODO should this be here ? + implicit val bigtableWriteResultCoder: Coder[BigtableWriteResult] = + Coder.beam(new BigtableWriteResultCoder) + + lazy val BigtableWriteResult: SideOutput[BigtableWriteResult] = SideOutput() + + object WriteParam { + val DefaultFlowControl: Boolean = false + val DefaultErrorHandler: ErrorHandler[BadRecord, _] = null + val DefaultAppProfileId: String = null + val DefaultAttemptTimeout: Duration = null + val DefaultOperationTimeout: Duration = null + val DefaultMaxBytesPerBatch: Option[Long] = None + val DefaultMaxElementsPerBatch: Option[Long] = None + val DefaultMaxOutstandingBytes: Option[Long] = None + val DefaultMaxOutstandingElements: Option[Long] = None } - final case class Bulk private ( - numOfShards: Int, - flushInterval: Duration = Bulk.DefaultFlushInterval - ) extends WriteParam - - final def apply[T <: Mutation]( - projectId: String, - instanceId: String, - tableId: String - ): BigtableWrite[T] = { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - BigtableWrite[T](bigtableOptions, tableId) - } + final case class WriteParam private ( + flowControl: Boolean = WriteParam.DefaultFlowControl, + errorHandler: ErrorHandler[BadRecord, _] = WriteParam.DefaultErrorHandler, + appProfileId: String = WriteParam.DefaultAppProfileId, + attemptTimeout: Duration = WriteParam.DefaultAttemptTimeout, + operationTimeout: Duration = WriteParam.DefaultOperationTimeout, + maxBytesPerBatch: Option[Long] = WriteParam.DefaultMaxBytesPerBatch, + maxElementsPerBatch: Option[Long] = WriteParam.DefaultMaxElementsPerBatch, + maxOutstandingBytes: Option[Long] = WriteParam.DefaultMaxOutstandingBytes, + maxOutstandingElements: Option[Long] = WriteParam.DefaultMaxOutstandingElements + ) } diff --git 
a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/TableAdmin.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/TableAdmin.scala deleted file mode 100644 index 1a316382e2..0000000000 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/TableAdmin.scala +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Copyright 2019 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.bigtable - -import java.nio.charset.Charset - -import com.google.bigtable.admin.v2._ -import com.google.bigtable.admin.v2.ModifyColumnFamiliesRequest.Modification -import com.google.cloud.bigtable.config.BigtableOptions -import com.google.cloud.bigtable.grpc._ -import com.google.protobuf.{ByteString, Duration => ProtoDuration} -import org.joda.time.Duration -import org.slf4j.{Logger, LoggerFactory} - -import scala.jdk.CollectionConverters._ -import scala.util.Try - -/** Bigtable Table Admin API helper commands. */ -object TableAdmin { - - sealed trait CreateDisposition - object CreateDisposition { - case object Never extends CreateDisposition - case object CreateIfNeeded extends CreateDisposition - val default = CreateIfNeeded - } - - private val log: Logger = LoggerFactory.getLogger(TableAdmin.getClass) - - private def adminClient[A]( - bigtableOptions: BigtableOptions - )(f: BigtableTableAdminClient => A): Try[A] = { - val channel = - ChannelPoolCreator.createPool(bigtableOptions) - val executorService = - BigtableSessionSharedThreadPools.getInstance().getRetryExecutor - val client = new BigtableTableAdminGrpcClient(channel, executorService, bigtableOptions) - - val result = Try(f(client)) - channel.shutdownNow() - result - } - - /** - * Retrieves a set of tables from the given instancePath. - * - * @param client - * Client for calling Bigtable. - * @param instancePath - * String of the form "projects/$project/instances/$instance". - * @return - */ - private def fetchTables(client: BigtableTableAdminClient, instancePath: String): Set[String] = - client - .listTables( - ListTablesRequest - .newBuilder() - .setParent(instancePath) - .build() - ) - .getTablesList - .asScala - .map(_.getName) - .toSet - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names. - */ - def ensureTables( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[String]], - createDisposition: CreateDisposition = CreateDisposition.default - ): Unit = { - val tcf = tablesAndColumnFamilies.iterator.map { case (k, l) => - k -> l.map(_ -> None) - }.toMap - ensureTablesImpl(bigtableOptions, tcf, createDisposition).get - } - - /** - * Ensure that tables and column families exist. 
Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names along with the desired cell expiration. Cell expiration is the duration before which - * garbage collection of a cell may occur. Note: minimum granularity is one second. - */ - def ensureTablesWithExpiration( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[(String, Option[Duration])]], - createDisposition: CreateDisposition = CreateDisposition.default - ): Unit = { - // Convert Duration to GcRule - val x = tablesAndColumnFamilies.iterator.map { case (k, v) => - k -> v.map { case (columnFamily, duration) => - (columnFamily, duration.map(gcRuleFromDuration)) - } - }.toMap - - ensureTablesImpl(bigtableOptions, x, createDisposition).get - } - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names along with the desired GcRule. - */ - def ensureTablesWithGcRules( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[(String, Option[GcRule])]], - createDisposition: CreateDisposition = CreateDisposition.default - ): Unit = - ensureTablesImpl(bigtableOptions, tablesAndColumnFamilies, createDisposition).get - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names. - */ - private def ensureTablesImpl( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[(String, Option[GcRule])]], - createDisposition: CreateDisposition - ): Try[Unit] = { - val project = bigtableOptions.getProjectId - val instance = bigtableOptions.getInstanceId - val instancePath = s"projects/$project/instances/$instance" - - log.info("Ensuring tables and column families exist in instance {}", instance) - - adminClient(bigtableOptions) { client => - val existingTables = fetchTables(client, instancePath) - - tablesAndColumnFamilies.foreach { case (table, columnFamilies) => - val tablePath = s"$instancePath/tables/$table" - - val exists = existingTables.contains(tablePath) - createDisposition match { - case _ if exists => - log.info("Table {} exists", table) - case CreateDisposition.CreateIfNeeded => - log.info("Creating table {}", table) - client.createTable( - CreateTableRequest - .newBuilder() - .setParent(instancePath) - .setTableId(table) - .build() - ) - case CreateDisposition.Never => - throw new IllegalStateException(s"Table $table does not exist") - } - - ensureColumnFamilies(client, tablePath, columnFamilies, createDisposition) - } - } - } - - /** - * Ensure that column families exist. Checks for existence of column families and creates them if - * they don't exist. 
- * - * @param tablePath - * A full table path that the bigtable API expects, in the form of - * `projects/projectId/instances/instanceId/tables/tableId` - * @param columnFamilies - * A list of column family names. - */ - private def ensureColumnFamilies( - client: BigtableTableAdminClient, - tablePath: String, - columnFamilies: Iterable[(String, Option[GcRule])], - createDisposition: CreateDisposition - ): Unit = - createDisposition match { - case CreateDisposition.CreateIfNeeded => - val tableInfo = - client.getTable(GetTableRequest.newBuilder().setName(tablePath).build) - - val cfList = columnFamilies - .map { case (n, gcRule) => - val cf = tableInfo - .getColumnFamiliesOrDefault(n, ColumnFamily.newBuilder().build()) - .toBuilder - .setGcRule(gcRule.getOrElse(GcRule.getDefaultInstance)) - .build() - - (n, cf) - } - val modifications = - cfList.map { case (n, cf) => - val mod = Modification.newBuilder().setId(n) - if (tableInfo.containsColumnFamilies(n)) { - mod.setUpdate(cf) - } else { - mod.setCreate(cf) - } - mod.build() - } - - log.info( - "Modifying or updating {} column families for table {}", - modifications.size, - tablePath - ) - - if (modifications.nonEmpty) { - client.modifyColumnFamily( - ModifyColumnFamiliesRequest - .newBuilder() - .setName(tablePath) - .addAllModifications(modifications.asJava) - .build - ) - } - () - case CreateDisposition.Never => - () - } - - private def gcRuleFromDuration(duration: Duration): GcRule = { - val protoDuration = ProtoDuration.newBuilder.setSeconds(duration.getStandardSeconds) - GcRule.newBuilder.setMaxAge(protoDuration).build - } - - /** - * Permanently deletes a row range from the specified table that match a particular prefix. - * - * @param table - * table name - * @param rowPrefix - * row key prefix - */ - def dropRowRange(bigtableOptions: BigtableOptions, table: String, rowPrefix: String): Try[Unit] = - adminClient(bigtableOptions) { client => - val project = bigtableOptions.getProjectId - val instance = bigtableOptions.getInstanceId - val instancePath = s"projects/$project/instances/$instance" - val tablePath = s"$instancePath/tables/$table" - - dropRowRange(tablePath, rowPrefix, client) - } - - /** - * Permanently deletes a row range from the specified table that match a particular prefix. 
- * - * @param tablePath - * A full table path that the bigtable API expects, in the form of - * `projects/projectId/instances/instanceId/tables/tableId` - * @param rowPrefix - * row key prefix - */ - private def dropRowRange( - tablePath: String, - rowPrefix: String, - client: BigtableTableAdminClient - ): Unit = { - val request = DropRowRangeRequest - .newBuilder() - .setName(tablePath) - .setRowKeyPrefix(ByteString.copyFrom(rowPrefix, Charset.forName("UTF-8"))) - .build() - - client.dropRowRange(request) - } -} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala index f918df27bb..37f9a9b385 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala @@ -18,13 +18,12 @@ package com.spotify.scio.bigtable.syntax import com.google.bigtable.v2._ -import com.google.cloud.bigtable.config.BigtableOptions import com.google.protobuf.ByteString import com.spotify.scio.io.ClosedTap import com.spotify.scio.values.SCollection import org.joda.time.Duration - import com.spotify.scio.bigtable.BigtableWrite +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} /** * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Bigtable methods. @@ -34,26 +33,34 @@ final class SCollectionMutationOps[T <: Mutation]( ) { /** Save this SCollection as a Bigtable table. Note that elements must be of type `Mutation`. */ - def saveAsBigtable(projectId: String, instanceId: String, tableId: String): ClosedTap[Nothing] = - self.write(BigtableWrite[T](projectId, instanceId, tableId))(BigtableWrite.Default) - - /** Save this SCollection as a Bigtable table. Note that elements must be of type `Mutation`. */ - def saveAsBigtable(bigtableOptions: BigtableOptions, tableId: String): ClosedTap[Nothing] = - self.write(BigtableWrite[T](bigtableOptions, tableId))(BigtableWrite.Default) - - /** - * Save this SCollection as a Bigtable table. This version supports batching. Note that elements - * must be of type `Mutation`. 
- */ def saveAsBigtable( - bigtableOptions: BigtableOptions, + projectId: String, + instanceId: String, tableId: String, - numOfShards: Int, - flushInterval: Duration = BigtableWrite.Bulk.DefaultFlushInterval - ): ClosedTap[Nothing] = - self.write(BigtableWrite[T](bigtableOptions, tableId))( - BigtableWrite.Bulk(numOfShards, flushInterval) + flowControl: Boolean = BigtableWrite.WriteParam.DefaultFlowControl, + errorHandler: ErrorHandler[BadRecord, _] = BigtableWrite.WriteParam.DefaultErrorHandler, + appProfileId: String = BigtableWrite.WriteParam.DefaultAppProfileId, + attemptTimeout: Duration = BigtableWrite.WriteParam.DefaultAttemptTimeout, + operationTimeout: Duration = BigtableWrite.WriteParam.DefaultOperationTimeout, + maxBytesPerBatch: Option[Long] = BigtableWrite.WriteParam.DefaultMaxBytesPerBatch, + maxElementsPerBatch: Option[Long] = BigtableWrite.WriteParam.DefaultMaxElementsPerBatch, + maxOutstandingBytes: Option[Long] = BigtableWrite.WriteParam.DefaultMaxOutstandingBytes, + maxOutstandingElements: Option[Long] = BigtableWrite.WriteParam.DefaultMaxOutstandingElements + ): ClosedTap[Nothing] = { + val param = BigtableWrite.WriteParam( + flowControl, + errorHandler, + appProfileId, + attemptTimeout, + operationTimeout, + maxBytesPerBatch, + maxElementsPerBatch, + maxOutstandingBytes, + maxOutstandingElements ) + + self.write(BigtableWrite[T](projectId, instanceId, tableId))(param) + } } trait SCollectionSyntax { diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala index cd21ee3f90..d473ce7338 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala @@ -17,49 +17,23 @@ package com.spotify.scio.bigtable.syntax -import com.google.bigtable.admin.v2.GcRule -import com.google.bigtable.v2._ -import com.google.cloud.bigtable.config.BigtableOptions +import com.google.bigtable.v2.{Row, RowFilter} +import com.google.cloud.bigtable.admin.v2.models.GCRules.GCRule import com.spotify.scio.ScioContext -import com.spotify.scio.bigtable.BigtableRead -import com.spotify.scio.bigtable.BigtableUtil -import com.spotify.scio.bigtable.TableAdmin +import com.spotify.scio.bigtable.{Admin, BigtableRead} import com.spotify.scio.values.SCollection import org.apache.beam.sdk.io.range.ByteKeyRange import org.joda.time.Duration -import scala.jdk.CollectionConverters._ - object ScioContextOps { - private val DefaultSleepDuration = Duration.standardMinutes(20) - private val DefaultClusterNames: Set[String] = Set.empty + private val DefaultSleepDuration: Duration = Duration.standardMinutes(20) + private val DefaultClusterIds: Set[String] = Set.empty } /** Enhanced version of [[ScioContext]] with Bigtable methods. */ final class ScioContextOps(private val self: ScioContext) extends AnyVal { import ScioContextOps._ - /** Get an SCollection for a Bigtable table. */ - def bigtable( - projectId: String, - instanceId: String, - tableId: String, - keyRange: ByteKeyRange, - rowFilter: RowFilter - ): SCollection[Row] = - bigtable(projectId, instanceId, tableId, Seq(keyRange), rowFilter) - - /** Get an SCollection for a Bigtable table. 
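A minimal usage sketch of the new `saveAsBigtable` signature, in the style of the FAQ examples; the project, instance, and table ids are placeholders and the input collection is assumed to be built upstream:

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.bigtable._
import com.spotify.scio.values.SCollection
import com.google.bigtable.v2.Mutation
import com.google.protobuf.ByteString
import org.joda.time.Duration

val sc = ScioContext()
val mutations: SCollection[(ByteString, Iterable[Mutation])] = ???

// Enable flow control and cap each write attempt at 30 seconds;
// all other parameters keep their defaults.
mutations.saveAsBigtable(
  "my-project",
  "my-instance",
  "my-table",
  flowControl = true,
  attemptTimeout = Duration.standardSeconds(30)
)
```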
*/ - def bigtable( - projectId: String, - instanceId: String, - tableId: String, - keyRange: ByteKeyRange, - rowFilter: RowFilter, - maxBufferElementCount: Option[Int] - ): SCollection[Row] = - bigtable(projectId, instanceId, tableId, Seq(keyRange), rowFilter, maxBufferElementCount) - /** Get an SCollection for a Bigtable table. */ def bigtable( projectId: String, @@ -73,69 +47,25 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { self.read(BigtableRead(projectId, instanceId, tableId))(parameters) } - /** Get an SCollection for a Bigtable table. */ - def bigtable( - bigtableOptions: BigtableOptions, - tableId: String, - keyRange: ByteKeyRange, - rowFilter: RowFilter - ): SCollection[Row] = - bigtable(bigtableOptions, tableId, Seq(keyRange), rowFilter) - - /** Get an SCollection for a Bigtable table. */ - def bigtable( - bigtableOptions: BigtableOptions, - tableId: String, - keyRange: ByteKeyRange, - rowFilter: RowFilter, - maxBufferElementCount: Option[Int] - ): SCollection[Row] = - bigtable(bigtableOptions, tableId, Seq(keyRange), rowFilter, maxBufferElementCount) - - /** Get an SCollection for a Bigtable table. */ - def bigtable( - bigtableOptions: BigtableOptions, - tableId: String, - keyRanges: Seq[ByteKeyRange], - rowFilter: RowFilter - ): SCollection[Row] = { - val parameters = BigtableRead.ReadParam(keyRanges, rowFilter) - self.read(BigtableRead(bigtableOptions, tableId))(parameters) - } - - /** Get an SCollection for a Bigtable table. */ - def bigtable( - bigtableOptions: BigtableOptions, - tableId: String, - keyRanges: Seq[ByteKeyRange], - rowFilter: RowFilter, - maxBufferElementCount: Option[Int] - ): SCollection[Row] = { - val parameters = BigtableRead.ReadParam(keyRanges, rowFilter, maxBufferElementCount) - self.read(BigtableRead(bigtableOptions, tableId))(parameters) - } - /** - * Updates all clusters within the specified Bigtable instance to a specified number of nodes. + * Updates given clusters within the specified Bigtable instance to a specified number of nodes. * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the * end to lower costs yet still get high throughput during bulk ingests/dumps. * - * @param sleepDuration - * How long to sleep after updating the number of nodes. Google recommends at least 20 minutes - * before the new nodes are fully functional + * @param numberOfNodes + * desired number of nodes for the clusters */ def updateNumberOfBigtableNodes( projectId: String, instanceId: String, - numberOfNodes: Int, - sleepDuration: Duration = DefaultSleepDuration + numberOfNodes: Int ): Unit = updateNumberOfBigtableNodes( projectId, instanceId, + DefaultClusterIds, numberOfNodes, - DefaultClusterNames, - sleepDuration + DefaultSleepDuration ) /** @@ -143,45 +73,23 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the * end to lower costs yet still get high throughput during bulk ingests/dumps. * + * @param numberOfNodes + * desired number of nodes for the clusters * @param sleepDuration * How long to sleep after updating the number of nodes. 
Google recommends at least 20 minutes * before the new nodes are fully functional - * @param clusterNames - * Names of clusters to be updated, all if empty */ def updateNumberOfBigtableNodes( projectId: String, instanceId: String, numberOfNodes: Int, - clusterNames: Set[String], - sleepDuration: Duration - ): Unit = { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - updateNumberOfBigtableNodes(bigtableOptions, numberOfNodes, clusterNames, sleepDuration) - } - - /** - * Updates all clusters within the specified Bigtable instance to a specified number of nodes. - * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the - * end to lower costs yet still get high throughput during bulk ingests/dumps. - * - * @param sleepDuration - * How long to sleep after updating the number of nodes. Google recommends at least 20 minutes - * before the new nodes are fully functional - */ - def updateNumberOfBigtableNodes( - bigtableOptions: BigtableOptions, - numberOfNodes: Int, sleepDuration: Duration ): Unit = updateNumberOfBigtableNodes( - bigtableOptions, + projectId, + instanceId, + DefaultClusterIds, numberOfNodes, - DefaultClusterNames, sleepDuration ) @@ -190,25 +98,29 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the * end to lower costs yet still get high throughput during bulk ingests/dumps. * - * @param clusterNames - * Names of clusters to be updated, all if empty + * @param numberOfNodes + * desired number of nodes for the clusters + * @param clusterIds + * clusters ids to be updated, all if empty * @param sleepDuration * How long to sleep after updating the number of nodes. Google recommends at least 20 minutes * before the new nodes are fully functional */ def updateNumberOfBigtableNodes( - bigtableOptions: BigtableOptions, + projectId: String, + instanceId: String, + clusterIds: Set[String], numberOfNodes: Int, - clusterNames: Set[String], - sleepDuration: Duration + sleepDuration: Duration = DefaultSleepDuration ): Unit = if (!self.isTest) { // No need to update the number of nodes in a test - BigtableUtil.updateNumberOfBigtableNodes( - bigtableOptions, + Admin.Instance.updateNumberOfBigtableNodes( + projectId, + instanceId, + clusterIds, numberOfNodes, - sleepDuration, - clusterNames.asJava + sleepDuration ) } @@ -220,12 +132,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { */ def getBigtableClusterSizes(projectId: String, instanceId: String): Map[String, Int] = if (!self.isTest) { - BigtableUtil - .getClusterSizes(projectId, instanceId) - .asScala - .iterator - .map { case (k, v) => k -> v.toInt } - .toMap + Admin.Instance.getClusterSizes(projectId, instanceId) } else { Map.empty } @@ -239,62 +146,23 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { * A map of tables and column families. Keys are table names. Values are a list of column family * names. 
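Under the new API the cluster selection is an explicit `Set[String]` of cluster ids. A sketch of scaling an instance up around a bulk ingest, using the signatures added above (all ids are placeholders):

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.bigtable._
import org.joda.time.Duration

val sc = ScioContext()

// Scale two named clusters before the job; an empty set would target
// every cluster in the instance.
sc.updateNumberOfBigtableNodes(
  "my-project",
  "my-instance",
  Set("my-cluster-a", "my-cluster-b"),
  numberOfNodes = 10,
  sleepDuration = Duration.standardMinutes(20)
)

// ... run the bulk write, then scale all clusters back down.
sc.updateNumberOfBigtableNodes("my-project", "my-instance", numberOfNodes = 3)
```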
*/ - def ensureTables( - projectId: String, - instanceId: String, - tablesAndColumnFamilies: Map[String, Iterable[String]], - createDisposition: TableAdmin.CreateDisposition - ): Unit = - if (!self.isTest) { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - TableAdmin.ensureTables(bigtableOptions, tablesAndColumnFamilies, createDisposition) - } - - def ensureTables( + def ensureTable( projectId: String, instanceId: String, - tablesAndColumnFamilies: Map[String, Iterable[String]] - ): Unit = ensureTables( - projectId, - instanceId, - tablesAndColumnFamilies, - TableAdmin.CreateDisposition.default - ) - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names. - */ - def ensureTables( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[String]], - createDisposition: TableAdmin.CreateDisposition + tableId: String, + columnFamilies: Iterable[String], + createDisposition: Admin.Table.CreateDisposition = Admin.Table.CreateDisposition.default ): Unit = if (!self.isTest) { - TableAdmin.ensureTables(bigtableOptions, tablesAndColumnFamilies, createDisposition) + Admin.Table.ensureTable(projectId, instanceId, tableId, columnFamilies, createDisposition) } - def ensureTables( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[String]] - ): Unit = - ensureTables(bigtableOptions, tablesAndColumnFamilies, TableAdmin.CreateDisposition.default) - /** * Ensure that tables and column families exist. Checks for existence of tables or creates them if * they do not exist. Also checks for existence of column families within each table and creates * them if they do not exist. * - * @param tablesAndColumnFamiliesWithExpiration + * @param columnFamiliesWithExpiration * A map of tables and column families. Keys are table names. Values are a list of column family * names along with the desired cell expiration. Cell expiration is the duration before which * garbage collection of a cell may occur. Note: minimum granularity is second. @@ -302,65 +170,20 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { def ensureTablesWithExpiration( projectId: String, instanceId: String, - tablesAndColumnFamiliesWithExpiration: Map[String, Iterable[(String, Option[Duration])]], - createDisposition: TableAdmin.CreateDisposition - ): Unit = - if (!self.isTest) { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - TableAdmin.ensureTablesWithExpiration( - bigtableOptions, - tablesAndColumnFamiliesWithExpiration, - createDisposition - ) - } - - def ensureTablesWithExpiration( - projectId: String, - instanceId: String, - tablesAndColumnFamiliesWithExpiration: Map[String, Iterable[(String, Option[Duration])]] - ): Unit = ensureTablesWithExpiration( - projectId, - instanceId, - tablesAndColumnFamiliesWithExpiration, - TableAdmin.CreateDisposition.default - ) - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. 
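The table-ensure helpers now take a single table id plus its column families instead of a `BigtableOptions`-keyed map. A short sketch of both variants added here (project, instance, table, and family names are placeholders):

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.bigtable._
import org.joda.time.Duration

val sc = ScioContext()

// Create the table and column family if missing, using the default
// CreateIfNeeded disposition.
sc.ensureTable("my-project", "my-instance", "my-table", List("count"))

// Same, but garbage-collect cells in the family once they are older
// than seven days.
sc.ensureTablesWithExpiration(
  "my-project",
  "my-instance",
  "my-table",
  List("count" -> Some(Duration.standardDays(7)))
)
```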
- * - * @param tablesAndColumnFamiliesWithExpiration - * A map of tables and column families. Keys are table names. Values are a list of column family - * names along with the desired cell expiration. Cell expiration is the duration before which - * garbage collection of a cell may occur. Note: minimum granularity is second. - */ - def ensureTablesWithExpiration( - bigtableOptions: BigtableOptions, - tablesAndColumnFamiliesWithExpiration: Map[String, Iterable[(String, Option[Duration])]], - createDisposition: TableAdmin.CreateDisposition + tableId: String, + columnFamiliesWithExpiration: Iterable[(String, Option[Duration])], + createDisposition: Admin.Table.CreateDisposition = Admin.Table.CreateDisposition.default ): Unit = if (!self.isTest) { - TableAdmin.ensureTablesWithExpiration( - bigtableOptions, - tablesAndColumnFamiliesWithExpiration, + Admin.Table.ensureTablesWithExpiration( + projectId, + instanceId, + tableId, + columnFamiliesWithExpiration, createDisposition ) } - def ensureTablesWithExpiration( - bigtableOptions: BigtableOptions, - tablesAndColumnFamiliesWithExpiration: Map[String, Iterable[(String, Option[Duration])]] - ): Unit = ensureTablesWithExpiration( - bigtableOptions, - tablesAndColumnFamiliesWithExpiration, - TableAdmin.CreateDisposition.default - ) - /** * Ensure that tables and column families exist. Checks for existence of tables or creates them if * they do not exist. Also checks for existence of column families within each table and creates @@ -373,66 +196,19 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { def ensureTablesWithGcRules( projectId: String, instanceId: String, - tablesAndColumnFamiliesWithGcRules: Map[String, Iterable[(String, Option[GcRule])]], - createDisposition: TableAdmin.CreateDisposition - ): Unit = - if (!self.isTest) { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - TableAdmin.ensureTablesWithGcRules( - bigtableOptions, - tablesAndColumnFamiliesWithGcRules, - createDisposition - ) - } - - def ensureTablesWithGcRules( - projectId: String, - instanceId: String, - tablesAndColumnFamiliesWithGcRules: Map[String, Iterable[(String, Option[GcRule])]] - ): Unit = ensureTablesWithGcRules( - projectId, - instanceId, - tablesAndColumnFamiliesWithGcRules, - TableAdmin.CreateDisposition.default - ) - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamiliesWithGcRule - * A map of tables and column families. Keys are table names. Values are a list of column family - * names along with the desired cell expiration. Cell expiration is the duration before which - * garbage collection of a cell may occur. Note: minimum granularity is second. 
- */ - def ensureTablesWithGcRules( - bigtableOptions: BigtableOptions, - tablesAndColumnFamiliesWithGcRule: Map[String, Iterable[(String, Option[GcRule])]], - createDisposition: TableAdmin.CreateDisposition + tableId: String, + columnFamiliesWithGcRules: Iterable[(String, Option[GCRule])], + createDisposition: Admin.Table.CreateDisposition ): Unit = if (!self.isTest) { - TableAdmin.ensureTablesWithGcRules( - bigtableOptions, - tablesAndColumnFamiliesWithGcRule, + Admin.Table.ensureTableWithGcRules( + projectId, + instanceId, + tableId, + columnFamiliesWithGcRules, createDisposition ) } - - def ensureTablesWithGcRules( - bigtableOptions: BigtableOptions, - tablesAndColumnFamiliesWithGcRule: Map[String, Iterable[(String, Option[GcRule])]] - ): Unit = - ensureTablesWithGcRules( - bigtableOptions, - tablesAndColumnFamiliesWithGcRule, - TableAdmin.CreateDisposition.default - ) - } trait ScioContextSyntax { diff --git a/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableBulkWriterTest.java b/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableBulkWriterTest.java deleted file mode 100644 index 8bad910623..0000000000 --- a/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableBulkWriterTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright 2018 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package com.spotify.scio.bigtable; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; - -import com.google.bigtable.v2.Mutation; -import com.google.protobuf.ByteString; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class BigtableBulkWriterTest { - - @Rule public final transient TestPipeline p = TestPipeline.create(); - - private static final Instant baseTime = new Instant(0); - - private static final TypeDescriptor>> BIGTABLE_WRITE_TYPE = - new TypeDescriptor>>() {}; - - @Test - public void testCreateBulkShards() throws Exception { - final List>> expected = new ArrayList<>(); - - final String key1 = "key1"; - final String value1 = "value1"; - final String key2 = "key2"; - final String value2 = "value2"; - final String key3 = "key3"; - final String value3 = "value3"; - final String key4 = "key4"; - final String value4 = "value4"; - - final TimestampedValue>> firstMutation = - makeMutation(key1, value1, Duration.standardMinutes(1)); - - expected.add(firstMutation.getValue()); - - final TimestampedValue>> secondMutation = - makeMutation(key2, value2, Duration.standardMinutes(5)); - expected.add(secondMutation.getValue()); - - final TimestampedValue>> thirdMutation = - makeMutation(key3, value3, Duration.standardMinutes(1)); - expected.add(thirdMutation.getValue()); - - final TimestampedValue>> fourthMutation = - makeMutation(key4, value4, Duration.standardMinutes(4)); - expected.add(fourthMutation.getValue()); - - final Coder>> bigtableCoder = - p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE); - - final TestStream>> kvTestStream = - TestStream.create(bigtableCoder) - .addElements(firstMutation) - .advanceProcessingTime(Duration.standardMinutes(2)) - .addElements(secondMutation) - .advanceProcessingTime(Duration.standardMinutes(11)) - .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(15))) - // Late elements are always observable within the global window - they arrive before - // the window closes, so they will appear in a pane, even if they arrive after the - // allowed lateness, and are taken into account alongside on-time elements - .addElements(thirdMutation) - .addElements(fourthMutation) - .advanceProcessingTime(Duration.standardMinutes(20)) - .advanceWatermarkToInfinity(); - - final Duration flushInterval = Duration.standardSeconds(1); - final int numOfShard = 1; - final PCollection>>> actual = - p.apply(kvTestStream).apply(new TestPTransform(numOfShard, flushInterval)); - - PAssert.that(actual).inEarlyGlobalWindowPanes().satisfies(new VerifyKVStreamFn(expected)); - - p.run(); - } - - private TimestampedValue>> makeMutation( - String key, String value, Duration baseTimeOffset) { - Instant timestamp = baseTime.plus(baseTimeOffset); - 
ByteString rowKey = ByteString.copyFromUtf8(key); - Iterable mutations = - Collections.singletonList( - Mutation.newBuilder() - .setSetCell(Mutation.SetCell.newBuilder().setValue(ByteString.copyFromUtf8(value))) - .build()); - return TimestampedValue.of(KV.of(rowKey, mutations), timestamp); - } - - /** - * Hepler class to verify output of {@link PCollection} by converting {@link ByteString} to {@link - * String} to able to verify values. - */ - private static class VerifyKVStreamFn - implements SerializableFunction< - Iterable>>>, Void> { - - private final Iterable>> expected; - - private VerifyKVStreamFn(Iterable>> expected) { - this.expected = expected; - } - - @Override - public Void apply(Iterable>>> input) { - verify(input, expected); - return null; - } - - private List>> convertExpected( - final Iterable>> input) { - List>> mutations = new ArrayList<>(); - for (KV> kv : input) { - final String key = kv.getKey().toString(StandardCharsets.UTF_8); - mutations.add(KV.of(key, kv.getValue())); - } - return mutations; - } - - private List>> convertActual( - final Iterable>>> input) { - List>> mutations = new ArrayList<>(); - for (Iterable>> kv : input) { - for (KV> value : kv) { - final String key = value.getKey().toString(StandardCharsets.UTF_8); - mutations.add(KV.of(key, value.getValue())); - } - } - return mutations; - } - - private void verify( - final Iterable>>> input, - final Iterable>> expected) { - final List>> actual = convertActual(input); - final List>> expectedValues = convertExpected(expected); - - final KV[] kvs = expectedValues.toArray(new KV[0]); - - assertThat(actual, containsInAnyOrder(kvs)); - } - } - - /** Hepler to test createBulkShards. */ - private static class TestPTransform - extends PTransform< - PCollection>>, - PCollection>>>> { - - private final int numOfShards; - private final Duration flushInterval; - - private TestPTransform(int numOfShards, Duration flushInterval) { - this.numOfShards = numOfShards; - this.flushInterval = flushInterval; - } - - @Override - public PCollection>>> expand( - PCollection>> input) { - return BigtableBulkWriter.createBulkShards(input, numOfShards, flushInterval); - } - } -} diff --git a/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableUtilTest.java b/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableUtilTest.java deleted file mode 100644 index f47ece0330..0000000000 --- a/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableUtilTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2022 Spotify AB - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.spotify.scio.bigtable; - -import org.junit.Assert; -import org.junit.Test; - -public class BigtableUtilTest { - - @Test - public void shorterNameTest() { - Assert.assertEquals( - BigtableUtil.shorterName( - "/projects/scio-test/instances/test-instance/clusters/sample-cluster"), - "sample-cluster"); - - Assert.assertEquals(BigtableUtil.shorterName("simple-name-cluster"), "simple-name-cluster"); - } -} diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala index b785d6fd6c..e598b881b6 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala @@ -17,8 +17,7 @@ package com.spotify.scio.bigtable -import java.util.concurrent.ConcurrentLinkedQueue -import com.google.cloud.bigtable.grpc.BigtableSession +import com.google.cloud.bigtable.data.v2.BigtableDataClient import com.google.common.cache.{Cache, CacheBuilder} import com.google.common.util.concurrent.{Futures, ListenableFuture} import com.spotify.scio.testing._ @@ -26,6 +25,7 @@ import com.spotify.scio.transforms.BaseAsyncLookupDoFn.CacheSupplier import com.spotify.scio.transforms.JavaAsyncConverters._ import com.spotify.scio.util.TransformingCache.SimpleTransformingCache +import java.util.concurrent.ConcurrentLinkedQueue import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success} @@ -38,6 +38,7 @@ class BigtableDoFnTest extends PipelineSpec { } it should "work with cache" in { + BigtableDoFnTest.queue.clear() val fn = new TestCachingBigtableDoFn val output = runWithData((1 to 10) ++ (6 to 15))(_.parDo(fn)) .map(kv => (kv.getKey, kv.getValue.get())) @@ -67,22 +68,22 @@ object BigtableDoFnTest { } class TestBigtableDoFn extends BigtableDoFn[Int, String](null) { - override def newClient(): BigtableSession = null - override def asyncLookup(session: BigtableSession, input: Int): ListenableFuture[String] = + override def newClient(): BigtableDataClient = null + override def asyncLookup(client: BigtableDataClient, input: Int): ListenableFuture[String] = Futures.immediateFuture(input.toString) } class TestCachingBigtableDoFn extends BigtableDoFn[Int, String](null, 100, new TestCacheSupplier) { - override def newClient(): BigtableSession = null - override def asyncLookup(session: BigtableSession, input: Int): ListenableFuture[String] = { + override def newClient(): BigtableDataClient = null + override def asyncLookup(client: BigtableDataClient, input: Int): ListenableFuture[String] = { BigtableDoFnTest.queue.add(input) Futures.immediateFuture(input.toString) } } class TestFailingBigtableDoFn extends BigtableDoFn[Int, String](null) { - override def newClient(): BigtableSession = null - override def asyncLookup(session: BigtableSession, input: Int): ListenableFuture[String] = + override def newClient(): BigtableDataClient = null + override def asyncLookup(client: BigtableDataClient, input: Int): ListenableFuture[String] = if (input % 2 == 0) { Futures.immediateFuture("success" + input) } else { From 5e628ae8da7f04bd5952a46d18d6e0f6e30b4d84 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 13 Aug 2024 11:44:39 +0200 Subject: [PATCH 2/6] Update site --- site/src/main/paradox/FAQ.md | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/site/src/main/paradox/FAQ.md b/site/src/main/paradox/FAQ.md 
index 9db67a0bfe..34b48287d6 100644 --- a/site/src/main/paradox/FAQ.md +++ b/site/src/main/paradox/FAQ.md @@ -365,18 +365,11 @@ Datastore `Entity` class is actually generated from @github[Protobuf](/scio-exam #### How do I throttle Bigtable writes? -Currently, Dataflow autoscaling may not work well with large writes BigtableIO. Specifically It does not take into account Bigtable IO rate limits and may scale up more workers and end up hitting the limit and eventually fail the job. As a workaround, you can enable throttling for Bigtable writes in Scio 0.4.0-alpha2 or later. +To avoid overloading the cluster while still letting autoscaling trigger when available, you can enable flow control for Bigtable writes in Scio 0.15.0 or later. -```scala mdoc:reset:invisible -val btProjectId = "" -val btInstanceId = "" -val btTableId = "" -``` - -```scala mdoc:silent +```scala mdoc:reset:silent import com.spotify.scio.values._ import com.spotify.scio.bigtable._ -import com.google.cloud.bigtable.config.{BigtableOptions, BulkOptions} import com.google.bigtable.v2.Mutation import com.google.protobuf.ByteString @@ -385,16 +378,10 @@ def main(cmdlineArgs: Array[String]): Unit = { val data: SCollection[(ByteString, Iterable[Mutation])] = ??? - val btOptions = - BigtableOptions.builder() - .setProjectId(btProjectId) - .setInstanceId(btInstanceId) - .setBulkOptions(BulkOptions.builder() - .enableBulkMutationThrottling() - .setBulkMutationRpcTargetMs(10) // lower latency threshold, default is 100 - .build()) - .build() - data.saveAsBigtable(btOptions, btTableId) + val btProjectId: String = ??? + val btInstanceId: String = ??? + val btTableId: String = ??? + data.saveAsBigtable(btProjectId, btInstanceId, btTableId, flowControl = true) // ... } From 9617337e8746d76e875b9a9cad0d87892f80cf6b Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 13 Aug 2024 14:18:53 +0200 Subject: [PATCH 3/6] Fix scala 2.12 compilation --- .../com/spotify/scio/bigtable/Admin.scala | 24 +++++++++---------- .../bigtable/syntax/ScioContextSyntax.scala | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala index 4976a1884f..d443e5518c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala @@ -40,7 +40,7 @@ import scala.jdk.CollectionConverters._ */ object Admin { - private val logger: Logger = LoggerFactory.getLogger(classOf[Admin.type]) + private val logger: Logger = LoggerFactory.getLogger(this.getClass) sys.addShutdownHook { logger.info("Shutting down Bigtable clients") Instance.clients.values.foreach(_.close()) @@ -50,13 +50,11 @@ object Admin { object Table { sealed trait CreateDisposition - object CreateDisposition { case object Never extends CreateDisposition - case object CreateIfNeeded extends CreateDisposition - val default: CreateDisposition = CreateIfNeeded + val Default: CreateDisposition = CreateIfNeeded } private[bigtable] val clients: concurrent.Map[(String, String), BigtableTableAdminClient] = @@ -95,7 +93,7 @@ object Admin { instanceId: String, tableId: String, columnFamilies: Iterable[String], - createDisposition: CreateDisposition = CreateDisposition.default + createDisposition: CreateDisposition = CreateDisposition.Default ): Unit = { val tcf = columnFamilies.map(cf => cf -> None) ensureTableImpl(projectId, instanceId, 
tableId, tcf, createDisposition) @@ -116,7 +114,7 @@ object Admin { instanceId: String, tableId: String, columnFamilies: Iterable[(String, Option[Duration])], - createDisposition: CreateDisposition = CreateDisposition.default + createDisposition: CreateDisposition = CreateDisposition.Default ): Unit = { // Convert Duration to GcRule val x = columnFamilies.map { case (columnFamily, duration) => @@ -142,7 +140,7 @@ object Admin { instanceId: String, tableId: String, columnFamilies: Iterable[(String, Option[GCRule])], - createDisposition: CreateDisposition = CreateDisposition.default + createDisposition: CreateDisposition = CreateDisposition.Default ): Unit = ensureTableImpl(projectId, instanceId, tableId, columnFamilies, createDisposition) /** @@ -242,17 +240,19 @@ object Admin { sleepDuration: Duration ): Unit = { val client = getOrCreateClient(projectId) - val ids = - if (clusterIds.isEmpty) client.listClusters(instanceId).asScala.map(_.getId) - else clusterIds + val ids: Iterable[String] = if (clusterIds.isEmpty) { + client.listClusters(instanceId).asScala.map(_.getId) + } else { + clusterIds + } ids.foreach { clusterId => logger.info("Updating number of nodes to {} for cluster {}", numberOfNodes, clusterId) client.resizeCluster(instanceId, clusterId, numberOfNodes) } - if (sleepDuration.getMillis > 0) { - logger.info("Sleeping for {} after update", sleepDuration.toPeriod) + if (sleepDuration.isLongerThan(Duration.ZERO)) { + logger.info("Sleeping for {} after update", sleepDuration) Thread.sleep(sleepDuration.getMillis) } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala index d473ce7338..6596d0491c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala @@ -151,7 +151,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { instanceId: String, tableId: String, columnFamilies: Iterable[String], - createDisposition: Admin.Table.CreateDisposition = Admin.Table.CreateDisposition.default + createDisposition: Admin.Table.CreateDisposition = Admin.Table.CreateDisposition.Default ): Unit = if (!self.isTest) { Admin.Table.ensureTable(projectId, instanceId, tableId, columnFamilies, createDisposition) @@ -172,7 +172,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { instanceId: String, tableId: String, columnFamiliesWithExpiration: Iterable[(String, Option[Duration])], - createDisposition: Admin.Table.CreateDisposition = Admin.Table.CreateDisposition.default + createDisposition: Admin.Table.CreateDisposition = Admin.Table.CreateDisposition.Default ): Unit = if (!self.isTest) { Admin.Table.ensureTablesWithExpiration( From f7140cf06ff4aae2f6034eb3845a84c88f462f52 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 13 Aug 2024 18:14:55 +0200 Subject: [PATCH 4/6] Change future return type for BigtableDoFn --- .../scio/transforms/FutureHandlers.java | 78 +++++++++++++++++++ .../spotify/scio/bigtable/BigtableDoFn.java | 20 ++++- .../scio/bigtable/BigtableDoFnTest.scala | 16 ++-- .../instance/kryo/GcpSerializerTest.scala | 20 +---- 4 files changed, 103 insertions(+), 31 deletions(-) diff --git a/scio-core/src/main/java/com/spotify/scio/transforms/FutureHandlers.java 
b/scio-core/src/main/java/com/spotify/scio/transforms/FutureHandlers.java index 4e5581f031..12eaa72568 100644 --- a/scio-core/src/main/java/com/spotify/scio/transforms/FutureHandlers.java +++ b/scio-core/src/main/java/com/spotify/scio/transforms/FutureHandlers.java @@ -17,6 +17,10 @@ package com.spotify.scio.transforms; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; import com.google.common.util.concurrent.*; import java.util.concurrent.*; import java.util.function.Function; @@ -137,4 +141,78 @@ default CompletableFuture addCallback( }); } } + + /** + * A {@link Base} implementation for Google API {@link ApiFuture}. Similar to Guava's + * ListenableFuture, but redeclared so that Guava could be shaded. + */ + public interface GoogleApi extends Base, V> { + /** + * Executor used for callbacks. Default is {@link ForkJoinPool#commonPool()}. Consider + * overriding this method if callbacks are blocking. + * + * @return Executor for callbacks. + */ + default Executor getCallbackExecutor() { + return ForkJoinPool.commonPool(); + } + + @Override + default void waitForFutures(Iterable> futures) + throws InterruptedException, ExecutionException { + // use Future#successfulAsList instead of Futures#allAsList which only works if all + // futures succeed + ApiFutures.successfulAsList(futures).get(); + } + + @Override + default ApiFuture addCallback( + ApiFuture future, Function onSuccess, Function onFailure) { + // Futures#transform doesn't allow onFailure callback while Futures#addCallback doesn't + // guarantee that callbacks are called before ListenableFuture#get() unblocks + SettableApiFuture f = SettableApiFuture.create(); + // if executor rejects the callback, we have to fail the future + Executor rejectPropagationExecutor = + command -> { + try { + getCallbackExecutor().execute(command); + } catch (RejectedExecutionException e) { + f.setException(e); + } + }; + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + @Override + public void onSuccess(@Nullable V result) { + try { + onSuccess.apply(result); + f.set(result); + } catch (Throwable e) { + f.setException(e); + } + } + + @Override + public void onFailure(Throwable t) { + Throwable callbackException = null; + try { + onFailure.apply(t); + } catch (Throwable e) { + // do not fail executing thread if callback fails + // record exception and propagate as suppressed + callbackException = e; + } finally { + if (callbackException != null) { + t.addSuppressed(callbackException); + } + f.setException(t); + } + } + }, + rejectPropagationExecutor); + + return f; + } + } } diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java index 33b8e7524e..2c9f5f1275 100644 --- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java +++ b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java @@ -17,11 +17,11 @@ package com.spotify.scio.bigtable; +import com.google.api.core.ApiFuture; import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; -import com.google.common.util.concurrent.ListenableFuture; import com.spotify.scio.transforms.BaseAsyncLookupDoFn; -import com.spotify.scio.transforms.GuavaAsyncLookupDoFn; +import com.spotify.scio.transforms.FutureHandlers; import 
java.io.IOException; import java.util.function.Supplier; import org.apache.beam.sdk.transforms.DoFn; @@ -32,12 +32,14 @@ * @param input element type. * @param Bigtable lookup value type. */ -public abstract class BigtableDoFn extends GuavaAsyncLookupDoFn { +public abstract class BigtableDoFn + extends BaseAsyncLookupDoFn, BaseAsyncLookupDoFn.Try> + implements FutureHandlers.GoogleApi { private final Supplier settingsSupplier; /** Perform asynchronous Bigtable lookup. */ - public abstract ListenableFuture asyncLookup(BigtableDataClient client, A input); + public abstract ApiFuture asyncLookup(BigtableDataClient client, A input); /** * Create a {@link BigtableDoFn} instance. @@ -107,4 +109,14 @@ protected BigtableDataClient newClient() { throw new RuntimeException(e); } } + + @Override + public BaseAsyncLookupDoFn.Try success(B output) { + return new Try<>(output); + } + + @Override + public BaseAsyncLookupDoFn.Try failure(Throwable throwable) { + return new Try<>(throwable); + } } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala index e598b881b6..b7c064818e 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala @@ -17,9 +17,9 @@ package com.spotify.scio.bigtable +import com.google.api.core.{ApiFuture, ApiFutures} import com.google.cloud.bigtable.data.v2.BigtableDataClient import com.google.common.cache.{Cache, CacheBuilder} -import com.google.common.util.concurrent.{Futures, ListenableFuture} import com.spotify.scio.testing._ import com.spotify.scio.transforms.BaseAsyncLookupDoFn.CacheSupplier import com.spotify.scio.transforms.JavaAsyncConverters._ @@ -69,25 +69,25 @@ object BigtableDoFnTest { class TestBigtableDoFn extends BigtableDoFn[Int, String](null) { override def newClient(): BigtableDataClient = null - override def asyncLookup(client: BigtableDataClient, input: Int): ListenableFuture[String] = - Futures.immediateFuture(input.toString) + override def asyncLookup(client: BigtableDataClient, input: Int): ApiFuture[String] = + ApiFutures.immediateFuture(input.toString) } class TestCachingBigtableDoFn extends BigtableDoFn[Int, String](null, 100, new TestCacheSupplier) { override def newClient(): BigtableDataClient = null - override def asyncLookup(client: BigtableDataClient, input: Int): ListenableFuture[String] = { + override def asyncLookup(client: BigtableDataClient, input: Int): ApiFuture[String] = { BigtableDoFnTest.queue.add(input) - Futures.immediateFuture(input.toString) + ApiFutures.immediateFuture(input.toString) } } class TestFailingBigtableDoFn extends BigtableDoFn[Int, String](null) { override def newClient(): BigtableDataClient = null - override def asyncLookup(client: BigtableDataClient, input: Int): ListenableFuture[String] = + override def asyncLookup(client: BigtableDataClient, input: Int): ApiFuture[String] = if (input % 2 == 0) { - Futures.immediateFuture("success" + input) + ApiFutures.immediateFuture("success" + input) } else { - Futures.immediateFailedFuture(new RuntimeException("failure" + input)) + ApiFutures.immediateFailedFuture(new RuntimeException("failure" + input)) } } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala 
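With lookups now returning `ApiFuture` (bridged by the new `FutureHandlers.GoogleApi` above), a user-defined `BigtableDoFn` can delegate directly to the v2 data client's async methods. A hedged sketch, assuming the single-argument constructor accepts the `BigtableDataSettings` held by the `settingsSupplier` field, as the tests passing `null` suggest (ids and table name are placeholders):

```scala
import com.google.api.core.ApiFuture
import com.google.cloud.bigtable.data.v2.{BigtableDataClient, BigtableDataSettings}
import com.google.cloud.bigtable.data.v2.models.Row
import com.spotify.scio.bigtable.BigtableDoFn

class RowLookupDoFn
    extends BigtableDoFn[String, Row](
      BigtableDataSettings
        .newBuilder()
        .setProjectId("my-project")   // placeholder ids
        .setInstanceId("my-instance")
        .build()
    ) {
  // newClient() is inherited: it builds a BigtableDataClient from the settings.
  override def asyncLookup(client: BigtableDataClient, key: String): ApiFuture[Row] =
    client.readRowAsync("my-table", key) // resolves to null when the row is absent
}
```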
b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala index ddf4c41360..0184b0a4ed 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala @@ -19,10 +19,9 @@ package com.spotify.scio.coders.instance.kryo import com.google.api.gax.grpc.GrpcStatusCode import com.google.api.gax.rpc.InternalException import com.google.cloud.bigtable.data.v2.models.MutateRowsException -import com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException import com.spotify.scio.coders.instances.kryo.GrpcSerializerTest._ import io.grpc.Status.Code -import io.grpc.{Metadata, Status, StatusRuntimeException} +import io.grpc.{Status, StatusRuntimeException} import org.scalactic.Equality import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -31,13 +30,6 @@ import scala.jdk.CollectionConverters._ object GcpSerializerTest { - implicit val eqBigtableRetriesExhaustedException: Equality[BigtableRetriesExhaustedException] = { - case (a: BigtableRetriesExhaustedException, b: BigtableRetriesExhaustedException) => - a.getMessage == b.getMessage && - eqCause.areEqual(a.getCause, b.getCause) - case _ => false - } - implicit val eqMutateRowsException: Equality[MutateRowsException] = { case (a: MutateRowsException, b: MutateRowsException) => eqCause.areEqual(a.getCause, b.getCause) && @@ -59,16 +51,6 @@ class GcpSerializerTest extends AnyFlatSpec with Matchers { import GcpSerializerTest._ import com.spotify.scio.testing.CoderAssertions._ - "BigtableRetriesExhaustedException" should "roundtrip" in { - val metadata = new Metadata() - metadata.put(Metadata.Key.of[String]("k", Metadata.ASCII_STRING_MARSHALLER), "v") - val cause = new StatusRuntimeException( - Status.OK.withCause(new RuntimeException("bar")).withDescription("bar"), - metadata - ) - new BigtableRetriesExhaustedException("Error", cause) coderShould roundtrip() - } - "MutateRowsExceptionSerializer" should "roundtrip" in { val cause = new StatusRuntimeException(Status.OK) val code = GrpcStatusCode.of(Status.OK.getCode) From 931277db2f4b9b739ec9748c57f923c2d3a9a623 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 16 Aug 2024 14:15:40 +0200 Subject: [PATCH 5/6] Declare explicit dependency --- build.sbt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.sbt b/build.sbt index 682d18f1ce..fa153aeb0b 100644 --- a/build.sbt +++ b/build.sbt @@ -735,6 +735,7 @@ lazy val `scio-core` = project "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion, "com.github.luben" % "zstd-jni" % zstdJniVersion, + "com.google.api" % "api-common" % gcpBom.key.value, "com.google.api" % "gax" % gcpBom.key.value, "com.google.api-client" % "google-api-client" % gcpBom.key.value, "com.google.auto.service" % "auto-service-annotations" % autoServiceVersion, @@ -961,6 +962,7 @@ lazy val `scio-google-cloud-platform` = project libraryDependencies ++= Seq( // compile "com.esotericsoftware" % "kryo-shaded" % kryoVersion, + "com.google.api" % "api-common" % gcpBom.key.value, "com.google.api" % "gax" % gcpBom.key.value, "com.google.api" % "gax-grpc" % gcpBom.key.value, "com.google.api-client" % "google-api-client" % gcpBom.key.value, From fc6db2035e9394124322c42679262712fd300f0b Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 20 Aug 2024 
11:03:59 +0200 Subject: [PATCH 6/6] Remove unused dependency --- build.sbt | 1 - 1 file changed, 1 deletion(-) diff --git a/build.sbt b/build.sbt index fa153aeb0b..11c343b8b4 100644 --- a/build.sbt +++ b/build.sbt @@ -979,7 +979,6 @@ lazy val `scio-google-cloud-platform` = project "com.google.cloud" % "google-cloud-core" % gcpBom.key.value, "com.google.cloud" % "google-cloud-spanner" % gcpBom.key.value, "com.google.cloud.bigdataoss" % "util" % bigdataossVersion, - "com.google.guava" % "guava" % guavaVersion, "com.google.http-client" % "google-http-client" % gcpBom.key.value, "com.google.http-client" % "google-http-client-gson" % gcpBom.key.value, "com.google.protobuf" % "protobuf-java" % gcpBom.key.value,