Skip to content

Commit

Permalink
fix compilation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 23, 2024
1 parent ce7a6c2 commit bff0465
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import org.jooq.DSLContext
import org.jooq.impl.DSL

/** Database object for interacting with a Jooq connection. */
open class Database(private val dslContext: DSLContext) {
open class Database(private val dslContext: DSLContext?) {
@Throws(SQLException::class)
open fun <T> query(transform: ContextQueryFunction<T>): T? {
return transform.query(dslContext)
}

@Throws(SQLException::class)
open fun <T> transaction(transform: ContextQueryFunction<T>): T? {
return dslContext.transactionResult { configuration: Configuration? ->
return dslContext!!.transactionResult { configuration: Configuration? ->
transform.query(DSL.using(configuration))
}
}
Expand Down
21 changes: 21 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/datastore-bigquery/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion

compileTestFixturesKotlin {
compilerOptions {
allWarningsAsErrors = false
}
}

compileTestKotlin {
compilerOptions {
allWarningsAsErrors = false
}
}

compileKotlin {
compilerOptions {
allWarningsAsErrors = false
}
}

dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,26 @@ import com.google.common.base.Charsets
import com.google.common.collect.ImmutableMap
import com.google.common.collect.Streams
import io.airbyte.cdk.db.SqlDatabase
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.tuple.ImmutablePair
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.threeten.bp.Duration
import java.io.ByteArrayInputStream
import java.io.IOException
import java.sql.SQLException
import java.util.*
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.tuple.ImmutablePair
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.threeten.bp.Duration

class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds: String?, sourceOperations: BigQuerySourceOperations? = BigQuerySourceOperations()) : SqlDatabase() {
class BigQueryDatabase
@JvmOverloads
constructor(
projectId: String?,
jsonCreds: String?,
sourceOperations: BigQuerySourceOperations? = BigQuerySourceOperations()
) : SqlDatabase() {
var bigQuery: BigQuery? = null
private var sourceOperations: BigQuerySourceOperations? = null

Expand All @@ -34,19 +40,28 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
val bigQueryBuilder = BigQueryOptions.newBuilder()
var credentials: ServiceAccountCredentials? = null
if (jsonCreds != null && !jsonCreds.isEmpty()) {
credentials = ServiceAccountCredentials
.fromStream(ByteArrayInputStream(jsonCreds.toByteArray(Charsets.UTF_8)))
credentials =
ServiceAccountCredentials.fromStream(
ByteArrayInputStream(jsonCreds.toByteArray(Charsets.UTF_8))
)
}
bigQuery = bigQueryBuilder
bigQuery =
bigQueryBuilder
.setProjectId(projectId)
.setCredentials(if (!Objects.isNull(credentials)) credentials else ServiceAccountCredentials.getApplicationDefault())
.setHeaderProvider { ImmutableMap.of("user-agent", getUserAgentHeader(connectorVersion)) }
.setRetrySettings(RetrySettings
.newBuilder()
.setCredentials(
if (!Objects.isNull(credentials)) credentials
else ServiceAccountCredentials.getApplicationDefault()
)
.setHeaderProvider {
ImmutableMap.of("user-agent", getUserAgentHeader(connectorVersion))
}
.setRetrySettings(
RetrySettings.newBuilder()
.setMaxAttempts(10)
.setRetryDelayMultiplier(1.5)
.setTotalTimeout(Duration.ofMinutes(60))
.build())
.build()
)
.build()
.service
} catch (e: IOException) {
Expand All @@ -59,31 +74,41 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
}

private val connectorVersion: String
get() = Optional.ofNullable(System.getenv("WORKER_CONNECTOR_IMAGE"))
get() =
Optional.ofNullable(System.getenv("WORKER_CONNECTOR_IMAGE"))
.orElse(StringUtils.EMPTY)
.replace("airbyte/", StringUtils.EMPTY).replace(":", "/")
.replace("airbyte/", StringUtils.EMPTY)
.replace(":", "/")

@Throws(SQLException::class)
override fun execute(sql: String?) {
val result = executeQuery(bigQuery, getQueryConfig(sql, emptyList()))
if (result.getLeft() == null) {
throw SQLException("BigQuery request is failed with error: " + result.getRight() + ". SQL: " + sql)
throw SQLException(
"BigQuery request is failed with error: " + result.getRight() + ". SQL: " + sql
)
}
LOGGER.info("BigQuery successfully finished execution SQL: $sql")
}

@Throws(Exception::class)
fun query(sql: String?, vararg params: QueryParameterValue?): Stream<JsonNode> {
return query(sql, (if (params == null) emptyList() else Arrays.asList(*params)))
fun query(sql: String?, vararg params: QueryParameterValue): Stream<JsonNode> {
return query(sql, (if (params == null) emptyList() else Arrays.asList(*params).toList()))
}

@Throws(Exception::class)
override fun unsafeQuery(sql: String?, vararg params: String): Stream<JsonNode> {
val parameterValueList = if (params == null) emptyList()
else Arrays.stream(params).map { param: String? ->
QueryParameterValue.newBuilder().setValue(param).setType(
StandardSQLTypeName.STRING).build()
}.collect(Collectors.toList())
override fun unsafeQuery(sql: String?, vararg params: String?): Stream<JsonNode> {
val parameterValueList =
if (params == null) emptyList()
else
Arrays.stream(params)
.map { param: String? ->
QueryParameterValue.newBuilder()
.setValue(param)
.setType(StandardSQLTypeName.STRING)
.build()
}
.collect(Collectors.toList())

return query(sql, parameterValueList)
}
Expand All @@ -95,21 +120,31 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:

if (result.getLeft() != null) {
val fieldList = result.getLeft()!!.getQueryResults().schema.fields
return Streams.stream(result.getLeft()!!.getQueryResults().iterateAll())
.map { fieldValues: FieldValueList -> sourceOperations!!.rowToJson(BigQueryResultSet(fieldValues, fieldList)) }
} else throw Exception(
"Failed to execute query " + sql + (if (params != null && !params.isEmpty()) " with params $params" else "") + ". Error: " + result.getRight())
return Streams.stream(result.getLeft()!!.getQueryResults().iterateAll()).map {
fieldValues: FieldValueList ->
sourceOperations!!.rowToJson(BigQueryResultSet(fieldValues, fieldList))
}
} else
throw Exception(
"Failed to execute query " +
sql +
(if (params != null && !params.isEmpty()) " with params $params" else "") +
". Error: " +
result.getRight()
)
}

fun getQueryConfig(sql: String?, params: List<QueryParameterValue>?): QueryJobConfiguration {
return QueryJobConfiguration
.newBuilder(sql)
.setUseLegacySql(false)
.setPositionalParameters(params)
.build()
return QueryJobConfiguration.newBuilder(sql)
.setUseLegacySql(false)
.setPositionalParameters(params)
.build()
}

fun executeQuery(bigquery: BigQuery?, queryConfig: QueryJobConfiguration?): ImmutablePair<Job?, String?> {
fun executeQuery(
bigquery: BigQuery?,
queryConfig: QueryJobConfiguration?
): ImmutablePair<Job?, String?> {
val jobId = JobId.of(UUID.randomUUID().toString())
val queryJob = bigquery!!.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
return executeQuery(queryJob)
Expand All @@ -123,13 +158,21 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
*/
fun getProjectTables(projectId: String?): List<Table> {
val tableList: MutableList<Table> = ArrayList()
bigQuery!!.listDatasets(projectId)
.iterateAll()
.forEach(Consumer { dataset: Dataset ->
bigQuery!!.listTables(dataset.datasetId)
.iterateAll()
.forEach(Consumer { table: Table -> tableList.add(bigQuery!!.getTable(table.tableId)) })
})
bigQuery!!
.listDatasets(projectId)
.iterateAll()
.forEach(
Consumer { dataset: Dataset ->
bigQuery!!
.listTables(dataset.datasetId)
.iterateAll()
.forEach(
Consumer { table: Table ->
tableList.add(bigQuery!!.getTable(table.tableId))
}
)
}
)
return tableList
}

Expand All @@ -141,9 +184,10 @@ class BigQueryDatabase @JvmOverloads constructor(projectId: String?, jsonCreds:
*/
fun getDatasetTables(datasetId: String?): List<Table> {
val tableList: MutableList<Table> = ArrayList()
bigQuery!!.listTables(datasetId)
.iterateAll()
.forEach(Consumer { table: Table -> tableList.add(bigQuery!!.getTable(table.tableId)) })
bigQuery!!
.listTables(datasetId)
.iterateAll()
.forEach(Consumer { table: Table -> tableList.add(bigQuery!!.getTable(table.tableId)) })
return tableList
}

Expand Down
Loading

0 comments on commit bff0465

Please sign in to comment.