Skip to content

Commit

Permalink
switch to oss jdbc driver
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 4, 2024
1 parent 095d30e commit 0aefd8f
Show file tree
Hide file tree
Showing 17 changed files with 66 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.44.16'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand All @@ -43,8 +43,9 @@ application {

dependencies {
// Databricks JDBC driver
implementation 'com.databricks:databricks-jdbc:2.6.36'
implementation 'com.databricks:databricks-sdk-java:0.23.0'
// We're using the OSS version https://docs.gcp.databricks.com/en/integrations/jdbc/oss.html
// which has support for GCP databricks instances.
implementation 'com.databricks:databricks-jdbc:0.9.2-oss'
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,25 @@ object DatabricksConnectorClientsFactory {
// EnableArrow=0 flag is undocumented and disables ArrowBuf when reading data
// Destinations only reads data for metadata or for comparison of actual data in tests. so
// we don't need it to be optimized.
val jdbcUrl =
"jdbc:databricks://${config.hostname}:${config.port}/${config.database};transportMode=http;httpPath=${config.httpPath};EnableArrow=0"
datasource.host = config.hostname
datasource.port = config.port
datasource.httpPath = config.httpPath
datasource.properties["catalog"] = config.database
datasource.properties["schema"] = config.schema
datasource.properties["EnableArrow"] = 0
// TODO this is supposed to be the default???
datasource.properties["statement_timeout"] = 172800
when (config.authentication) {
is BasicAuthentication -> {
datasource.setURL(
"$jdbcUrl;AuthMech=3;UID=token;PWD=${config.authentication.personalAccessToken}"
)
datasource.properties["AuthMech"] = 3
datasource.username = "token"
datasource.password = config.authentication.personalAccessToken
}
is OAuth2Authentication -> {
datasource.setURL(
"$jdbcUrl;AuthMech=11;Auth_Flow=1;OAuth2ClientId=${config.authentication.clientId};OAuth2Secret=${config.authentication.secret}"
)
datasource.properties["AuthMech"] = 11
datasource.properties["Auth_Flow"] = 1
datasource.properties["OAuth2ClientId"] = config.authentication.clientId
datasource.properties["OAuth2Secret"] = config.authentication.secret
}
}
return datasource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,14 @@ class DatabricksDestinationHandler(
.bufferedResultSetQuery(
{ connection: Connection -> connection.createStatement().executeQuery(query) },
{ resultSet: ResultSet ->
resultSet.getObject("last_loaded_at", LocalDateTime::class.java)
resultSet.getString("last_loaded_at")?.let {
Instant.parse(it)
}
},
)
.stream()
.filter { ts: LocalDateTime? -> Objects.nonNull(ts) }
.filter { ts: Instant? -> Objects.nonNull(ts) }
.findFirst()
.map {
// Databricks doesn't have offset stored, so we always use UTC as the supposed
// timezone
it.toInstant(ZoneOffset.UTC)
}
}

private fun getInitialRawTableState(id: StreamId, suffix: String): InitialRawTableStatus {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": []}}
{"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": []}}
{"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}}
{"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56.000000Z", "timestamp_without_timezone": "2023-01-23T12:34:56.000", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": []}}
{"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": []}}
{"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}}
// In databricks we don't care about struct/array as they are stored as strings and can be safely queried using jsonpath, time&timetz are non-existent and stored as strings too
{"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "array":{}, "struct": [], "time_with_timezone": "{}", "time_without_timezone": "{}", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes":[{"field":"number","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"integer","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"boolean","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"timestamp_with_timezone","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"timestamp_without_timezone","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}}
{"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "array":{}, "struct": [], "time_with_timezone": "{}", "time_without_timezone": "{}", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes":[{"field":"number","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"integer","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"boolean","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"timestamp_with_timezone","change":"NULLED","reason.000000Z":"DESTINATION_TYPECAST_ERROR"},{"field":"timestamp_without_timezone","change":"NULLED","reason.000":"DESTINATION_TYPECAST_ERROR"},{"field":"date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}}
// Note that for numbers where we parse the value to JSON (struct, array, unknown) we lose precision.
// But for numbers where we create a NUMBER column, we do not lose precision (see the `number` column).
{"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": []}}
{"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": []}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}}
{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}}
{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}, "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}}
{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}}
{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}}
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}}
{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}}
{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}, "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}}
{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}}
{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": []}, "id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00.000000", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}
{"_airbyte_raw_id": "b9ac9f01-abc1-4e7c-89e5-eac9223d5726", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": [{"field":"integer","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}, "id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:01", "string": "Bob"}
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": []}, "id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00.000000Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}
{"_airbyte_raw_id": "b9ac9f01-abc1-4e7c-89e5-eac9223d5726", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": [{"field":"integer","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}, "id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:01.000000Z", "string": "Bob"}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}}
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}}
{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": 126}}
{"_airbyte_raw_id": "b9ac9f01-abc1-4e7c-89e5-eac9223d5726", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:01Z", "string": "Bob", "integer": "oops"}}
{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}}
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}}
{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": 126}}
{"_airbyte_raw_id": "b9ac9f01-abc1-4e7c-89e5-eac9223d5726", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:01Z", "string": "Bob", "integer": "oops"}}
Loading

0 comments on commit 0aefd8f

Please sign in to comment.