-
Notifications
You must be signed in to change notification settings - Fork 514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor bigtable API to use v2 client #5444
base: main
Are you sure you want to change the base?
Conversation
|
||
private final BigtableOptions options; | ||
private final Supplier<BigtableDataSettings> settingsSupplier; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunatelly BigtableDataSettings
is not serializable.
val sideOutput = PCollectionTuple.of(BigtableWrite.BigtableWriteResult.tupleTag, result) | ||
(tap(()), SideOutputCollections(sideOutput, data.context)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leverage WriteResultIO
to expose the BigtableWriteResult
as side output
projectId: String, | ||
instanceId: String, | ||
tablesAndColumnFamilies: Map[String, Iterable[String]], | ||
createDisposition: TableAdmin.CreateDisposition | ||
): Unit = | ||
if (!self.isTest) { | ||
val bigtableOptions = BigtableOptions | ||
.builder() | ||
.setProjectId(projectId) | ||
.setInstanceId(instanceId) | ||
.build | ||
TableAdmin.ensureTables(bigtableOptions, tablesAndColumnFamilies, createDisposition) | ||
} | ||
|
||
def ensureTables( | ||
def ensureTable( | ||
projectId: String, | ||
instanceId: String, | ||
tablesAndColumnFamilies: Map[String, Iterable[String]] | ||
): Unit = ensureTables( | ||
projectId, | ||
instanceId, | ||
tablesAndColumnFamilies, | ||
TableAdmin.CreateDisposition.default | ||
) | ||
|
||
/** | ||
* Ensure that tables and column families exist. Checks for existence of tables or creates them if | ||
* they do not exist. Also checks for existence of column families within each table and creates | ||
* them if they do not exist. | ||
* | ||
* @param tablesAndColumnFamilies | ||
* A map of tables and column families. Keys are table names. Values are a list of column family | ||
* names. | ||
*/ | ||
def ensureTables( | ||
bigtableOptions: BigtableOptions, | ||
tablesAndColumnFamilies: Map[String, Iterable[String]], | ||
createDisposition: TableAdmin.CreateDisposition | ||
tableId: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As underlying client is cached, simplify the API an not worry if this is called multiple times.
} | ||
final case class WriteParam private ( | ||
flowControl: Boolean = WriteParam.DefaultFlowControl, | ||
errorHandler: ErrorHandler[BadRecord, _] = WriteParam.DefaultErrorHandler, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix #5440
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5444 +/- ##
==========================================
+ Coverage 61.29% 61.59% +0.29%
==========================================
Files 311 312 +1
Lines 11062 11031 -31
Branches 774 780 +6
==========================================
+ Hits 6780 6794 +14
+ Misses 4282 4237 -45 ☔ View full report in Codecov by Sentry. |
a10d6ae
to
f7140cf
Compare
28b4139
to
931277d
Compare
Update all bigtable APIs to use cloud v2 client.
Drop
BigtableBulkWriter
. This is TBD, but it looks to me can manually shard/groupBy and flatten to normal IO if they require this feature.Change bigtable
ScioContext
api to useAdmin.Table
andAdmin.Instance
objects that cache clients instances.