Refactor bigtable API to use v2 client #5444

Draft · wants to merge 6 commits into main

Changes from 1 commit
4 changes: 0 additions & 4 deletions build.sbt
@@ -36,7 +36,6 @@ val beamVersion = "2.58.0"
 val autoServiceVersion = "1.0.1"
 val autoValueVersion = "1.9"
 val bigdataossVersion = "2.2.16"
-val bigtableClientVersion = "1.28.0"
 val commonsCodecVersion = "1.17.0"
 val commonsCompressVersion = "1.26.2"
 val commonsIoVersion = "2.16.1"
@@ -967,7 +966,6 @@ lazy val `scio-google-cloud-platform` = project
   "com.google.api-client" % "google-api-client" % gcpBom.key.value,
   "com.google.api.grpc" % "grpc-google-cloud-pubsub-v1" % gcpBom.key.value,
   "com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1beta1" % gcpBom.key.value,
-  "com.google.api.grpc" % "proto-google-cloud-bigtable-admin-v2" % gcpBom.key.value,
   "com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % gcpBom.key.value,
   "com.google.api.grpc" % "proto-google-cloud-datastore-v1" % gcpBom.key.value,
   "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % gcpBom.key.value,
@@ -979,8 +977,6 @@
   "com.google.cloud" % "google-cloud-core" % gcpBom.key.value,
   "com.google.cloud" % "google-cloud-spanner" % gcpBom.key.value,
   "com.google.cloud.bigdataoss" % "util" % bigdataossVersion,
-  "com.google.cloud.bigtable" % "bigtable-client-core" % bigtableClientVersion,
-  "com.google.cloud.bigtable" % "bigtable-client-core-config" % bigtableClientVersion,
   "com.google.guava" % "guava" % guavaVersion,
   "com.google.http-client" % "google-http-client" % gcpBom.key.value,
   "com.google.http-client" % "google-http-client-gson" % gcpBom.key.value,
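Note: build.sbt only drops the legacy bigtable-client-core artifacts; no replacement dependency is added in this commit, so the v2 client presumably arrives transitively (e.g. via Beam's Bigtable IO). For context, a minimal sketch of how the unified v2 client would be declared if it were pinned explicitly through the same GCP BOM — this line is an assumption, not part of the diff:

    // Hypothetical addition: the unified v2 data/admin client, versioned by the GCP BOM.
    // Not in this commit; shown only to illustrate the v1 -> v2 dependency swap.
    "com.google.cloud" % "google-cloud-bigtable" % gcpBom.key.value,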
120 changes: 52 additions & 68 deletions integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala
@@ -18,11 +18,8 @@
 package com.spotify.scio.bigtable

 import java.util.UUID
-
-import com.google.bigtable.admin.v2.{DeleteTableRequest, GetTableRequest, ListTablesRequest}
 import com.google.bigtable.v2.{Mutation, Row, RowFilter}
-import com.google.cloud.bigtable.config.BigtableOptions
-import com.google.cloud.bigtable.grpc._
+import com.google.cloud.bigtable.admin.v2.{BigtableInstanceAdminClient, BigtableTableAdminClient}
 import com.google.protobuf.ByteString
 import com.spotify.scio._
 import com.spotify.scio.testing._
@@ -41,12 +38,6 @@ object BigtableIT {
   def testData(id: String): Seq[(String, Long)] =
     Seq((s"$id-key1", 1L), (s"$id-key2", 2L), (s"$id-key3", 3L))

-  val bigtableOptions: BigtableOptions = BigtableOptions
-    .builder()
-    .setProjectId(projectId)
-    .setInstanceId(instanceId)
-    .build
-
   val FAMILY_NAME: String = "count"
   val COLUMN_QUALIFIER: ByteString = ByteString.copyFromUtf8("long")

@@ -67,31 +58,29 @@ object BigtableIT {

   def fromRow(r: Row): (String, Long) =
     (r.getKey.toStringUtf8, r.getValue(FAMILY_NAME, COLUMN_QUALIFIER).get.toStringUtf8.toLong)
-
-  def listTables(client: BigtableTableAdminGrpcClient): Set[String] = {
-    val instancePath = s"projects/$projectId/instances/$instanceId"
-    val tables = client.listTables(ListTablesRequest.newBuilder().setParent(instancePath).build)
-    tables.getTablesList.asScala.map(t => new BigtableTableName(t.getName).getTableId).toSet
-  }
 }

 class BigtableIT extends PipelineSpec {
   import BigtableIT._

   // "Update number of bigtable nodes" should "work" in {
   ignore should "update number of bigtable nodes" in {
-    val bt = new BigtableClusterUtilities(bigtableOptions)
-    val sc = ScioContext()
-    sc.updateNumberOfBigtableNodes(projectId, instanceId, 4, Duration.standardSeconds(10))
-    sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 4
-    bt.getClusterNodeCount(clusterId, zoneId) shouldBe 4
-    sc.updateNumberOfBigtableNodes(projectId, instanceId, 3, Duration.standardSeconds(10))
-    sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 3
-    bt.getClusterNodeCount(clusterId, zoneId) shouldBe 3
+    val client = BigtableInstanceAdminClient.create(projectId)
+    try {
+      val sc = ScioContext()
+      sc.updateNumberOfBigtableNodes(projectId, instanceId, 4, Duration.standardSeconds(10))
+      sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 4
+      client.getCluster(clusterId, zoneId).getServeNodes shouldBe 4
+      sc.updateNumberOfBigtableNodes(projectId, instanceId, 3, Duration.standardSeconds(10))
+      sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 3
+      client.getCluster(clusterId, zoneId).getServeNodes shouldBe 3
+    } finally {
+      client.close()
+    }
   }

   "BigtableIO" should "work in default mode" in {
-    TableAdmin.ensureTables(bigtableOptions, Map(tableId -> List(FAMILY_NAME)))
+    Admin.Table.ensureTable(projectId, instanceId, tableId, List(FAMILY_NAME))
     val id = testId()
     val data = testData(id)
     try {
@@ -126,12 +115,7 @@ class BigtableIT extends PipelineSpec {
   }

   it should "work in bulk mode" in {
-    TableAdmin.ensureTables(bigtableOptions, Map(tableId -> List(FAMILY_NAME)))
-    val options = BigtableOptions
-      .builder()
-      .setProjectId(projectId)
-      .setInstanceId(instanceId)
-      .build()
+    Admin.Table.ensureTable(projectId, instanceId, tableId, List(FAMILY_NAME))
     val id = testId()
     val data = testData(id)

@@ -140,7 +124,7 @@
     runWithRealContext() { sc =>
       sc
         .parallelize(data.map(kv => toWriteMutation(kv._1, kv._2)))
-        .saveAsBigtable(options, tableId, 1)
+        .saveAsBigtable(projectId, instanceId, tableId)
     }.waitUntilDone()

     // Read rows back
@@ -166,48 +150,48 @@ class BigtableIT extends PipelineSpec {
     }.waitUntilFinish()
   }

-  "TableAdmin" should "work" in {
+  "Admin.Table" should "work" in {
     val id = testId()
     val tables = Map(
       s"scio-bigtable-empty-table-$id" -> List(),
       s"scio-bigtable-one-cf-table-$id" -> List("colfam1"),
       s"scio-bigtable-two-cf-table-$id" -> List("colfam1", "colfam2")
     )
-    val channel = ChannelPoolCreator.createPool(bigtableOptions)
-    val executorService = BigtableSessionSharedThreadPools.getInstance().getRetryExecutor
-    val client = new BigtableTableAdminGrpcClient(channel, executorService, bigtableOptions)
-    val instancePath = s"projects/$projectId/instances/$instanceId"
-    val tableIds = tables.keys.toSet
-    def tablePath(table: String): String = s"$instancePath/tables/$table"
-    def deleteTable(table: String): Unit =
-      client.deleteTable(DeleteTableRequest.newBuilder().setName(tablePath(table)).build)
-
-    // Delete any tables that could be left around from previous IT run.
-    val oldTables = listTables(client).intersect(tableIds)
-    oldTables.foreach(deleteTable)
-
-    // Ensure that the tables don't exist now
-    listTables(client).intersect(tableIds) shouldBe empty
-
-    // Run UUT
-    TableAdmin.ensureTables(bigtableOptions, tables)
-
-    // Tables must exist
-    listTables(client).intersect(tableIds) shouldEqual tableIds
-
-    // Assert Column families exist
-    for ((table, columnFamilies) <- tables) {
-      val tableInfo = client.getTable(
-        GetTableRequest
-          .newBuilder()
-          .setName(tablePath(table))
-          .build
-      )
-      val actualColumnFamilies = tableInfo.getColumnFamiliesMap.asScala.keys
-      actualColumnFamilies should contain theSameElementsAs columnFamilies
-    }
-
-    // Clean up and delete
-    tables.keys.foreach(deleteTable)
+    val client = BigtableTableAdminClient.create(projectId, instanceId)
+    try {
+      val tableIds = tables.keys.toSet
+
+      // Delete any tables that could be left around from previous IT run.
+      client
+        .listTables()
+        .asScala
+        .filterNot(tableIds.contains)
+        .foreach(client.deleteTable)
+
+      // Ensure that the tables don't exist now
+      client.listTables().asScala.toSet.intersect(tableIds) shouldBe empty
+
+      // Run UUT
+      tables.foreach { case (tableId, cfs) =>
+        Admin.Table.ensureTable(projectId, instanceId, tableId, cfs)
+      }
+
+      // Tables must exist
+      client.listTables().asScala should contain allElementsOf tableIds
+
+      // Assert Column families exist
+      tables.foreach { case (id, columnFamilies) =>
+        val table = client.getTable(id)
+        val actualFamilies = table.getColumnFamilies.asScala.map(_.getId)
+
+        actualFamilies should contain theSameElementsAs columnFamilies
+      }
+
+      // Clean up and delete
+      tableIds.foreach(client.deleteTable)
+    } finally {
+      client.close()
+    }
   }
 }
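For reviewers unfamiliar with the v2 admin surface: the rewritten test leans on Admin.Table.ensureTable, whose implementation is not part of this file. A minimal sketch of the "create table if missing" pattern it presumably wraps, using only documented calls from com.google.cloud.bigtable.admin.v2 (the helper name ensureTableSketch is ours, not scio's):

    import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient
    import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest

    def ensureTableSketch(
      projectId: String,
      instanceId: String,
      tableId: String,
      columnFamilies: List[String]
    ): Unit = {
      // The v2 admin client is instance-scoped and owns a connection pool,
      // so it must be closed when done.
      val client = BigtableTableAdminClient.create(projectId, instanceId)
      try {
        if (!client.exists(tableId)) {
          // A single CreateTableRequest can carry all requested column families.
          val request = columnFamilies.foldLeft(CreateTableRequest.of(tableId))(_.addFamily(_))
          client.createTable(request)
        }
      } finally {
        client.close()
      }
    }

This also explains the try/finally blocks added throughout BigtableIT above: unlike the old per-call gRPC wrappers, the v2 clients hold long-lived channels that leak if not closed.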
@@ -72,13 +72,12 @@ object BigtableWriteExample {
     // before the ingestion starts.
     sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, 15)

-    // Ensure that destination tables and column families exist
-    sc.ensureTables(
+    // Ensure that destination table and column families exist
+    sc.ensureTable(
       btProjectId,
       btInstanceId,
-      Map(
-        btTableId -> List(BigtableExample.FAMILY_NAME)
-      )
+      btTableId,
+      List(BigtableExample.FAMILY_NAME)
     )

     sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR))
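One caveat on the cluster assertions in BigtableIT: in the v2 instance admin API, getCluster takes (instanceId, clusterId), so the getCluster(clusterId, zoneId) calls above are worth double-checking before this leaves draft. A minimal sketch of the documented inspect/resize pattern, reusing the projectId/instanceId/clusterId names from the test:

    import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient

    val admin = BigtableInstanceAdminClient.create(projectId)
    try {
      // Read the current serve-node count, then resize the cluster.
      val nodes = admin.getCluster(instanceId, clusterId).getServeNodes
      admin.resizeCluster(instanceId, clusterId, nodes + 1)
    } finally {
      admin.close()
    }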