Skip to content

Commit

Permalink
bigquery cdk bump
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Aug 12, 2024
1 parent f3e781d commit d266507
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ class BigQueryDestination : BaseConnector(), Destination {
}

fun main(args: Array<String>) {
log.info { "dumb change to force CI to actually run" }
addThrowableForDeinterpolation(BigQueryException::class.java)
val destination: Destination = BigQueryDestination()
log.info { "Starting Destination : ${destination.javaClass}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class BigQueryDestinationHandler(private val bq: BigQuery, private val datasetLo
SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
FROM ${'$'}{raw_table}
WHERE _airbyte_loaded_at IS NULL
""".trimIndent()
)
)
Expand Down Expand Up @@ -96,7 +96,7 @@ class BigQueryDestinationHandler(private val bq: BigQuery, private val datasetLo
"""
SELECT MAX(_airbyte_extracted_at)
FROM ${'$'}{raw_table}
""".trimIndent()
)
)
Expand Down Expand Up @@ -125,6 +125,17 @@ class BigQueryDestinationHandler(private val bq: BigQuery, private val datasetLo
return null
}

val tableDef = finalTable.getDefinition<StandardTableDefinition>()
val hasGenerationId: Boolean =
tableDef.schema
?.fields
// Field doesn't have a hasColumn(String) method >.>
?.any { it.name == JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID }
?: false
if (!hasGenerationId) {
return null
}

val result =
bq.query(
QueryJobConfiguration.of(
Expand Down

0 comments on commit d266507

Please sign in to comment.