Skip to content

Commit

Permalink
Destination postgres: DV2 beta implementation (#34177)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jan 17, 2024
1 parent 63c6961 commit 0063382
Show file tree
Hide file tree
Showing 54 changed files with 972 additions and 111 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes |
| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method |
| 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 |
| 0.11.5 | 2024-01-10 | [\#34119](https://github.com/airbytehq/airbyte/pull/34119) | Remove wal2json support for postgres+debezium. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ public static DataSource create(final String username,
final String password,
final String driverClassName,
final String jdbcConnectionString) {
return new DataSourceBuilder()
.withDriverClassName(driverClassName)
.withJdbcUrl(jdbcConnectionString)
.withPassword(password)
.withUsername(username)
return new DataSourceBuilder(username, password, driverClassName, jdbcConnectionString)
.build();
}

Expand All @@ -56,12 +52,8 @@ public static DataSource create(final String username,
final String jdbcConnectionString,
final Map<String, String> connectionProperties,
final Duration connectionTimeout) {
return new DataSourceBuilder()
return new DataSourceBuilder(username, password, driverClassName, jdbcConnectionString)
.withConnectionProperties(connectionProperties)
.withDriverClassName(driverClassName)
.withJdbcUrl(jdbcConnectionString)
.withPassword(password)
.withUsername(username)
.withConnectionTimeout(connectionTimeout)
.build();
}
Expand All @@ -83,13 +75,7 @@ public static DataSource create(final String username,
final int port,
final String database,
final String driverClassName) {
return new DataSourceBuilder()
.withDatabase(database)
.withDriverClassName(driverClassName)
.withHost(host)
.withPort(port)
.withPassword(password)
.withUsername(username)
return new DataSourceBuilder(username, password, driverClassName, host, port, database)
.build();
}

Expand All @@ -112,14 +98,8 @@ public static DataSource create(final String username,
final String database,
final String driverClassName,
final Map<String, String> connectionProperties) {
return new DataSourceBuilder()
return new DataSourceBuilder(username, password, driverClassName, host, port, database)
.withConnectionProperties(connectionProperties)
.withDatabase(database)
.withDriverClassName(driverClassName)
.withHost(host)
.withPort(port)
.withPassword(password)
.withUsername(username)
.build();
}

Expand All @@ -139,13 +119,7 @@ public static DataSource createPostgres(final String username,
final String host,
final int port,
final String database) {
return new DataSourceBuilder()
.withDatabase(database)
.withDriverClassName("org.postgresql.Driver")
.withHost(host)
.withPort(port)
.withPassword(password)
.withUsername(username)
return new DataSourceBuilder(username, password, "org.postgresql.Driver", host, port, database)
.build();
}

Expand All @@ -158,7 +132,7 @@ public static DataSource createPostgres(final String username,
*/
public static void close(final DataSource dataSource) throws Exception {
if (dataSource != null) {
if (dataSource instanceof AutoCloseable closeable) {
if (dataSource instanceof final AutoCloseable closeable) {
closeable.close();
}
}
Expand All @@ -167,7 +141,7 @@ public static void close(final DataSource dataSource) throws Exception {
/**
* Builder class used to configure and construct {@link DataSource} instances.
*/
private static class DataSourceBuilder {
public static class DataSourceBuilder {

private Map<String, String> connectionProperties = Map.of();
private String database;
Expand All @@ -180,8 +154,35 @@ private static class DataSourceBuilder {
private String password;
private int port = 5432;
private String username;
private String connectionInitSql;

private DataSourceBuilder() {}
private DataSourceBuilder(final String username,
final String password,
final String driverClassName) {
this.username = username;
this.password = password;
this.driverClassName = driverClassName;
}

public DataSourceBuilder(final String username,
final String password,
final String driverClassName,
final String jdbcUrl) {
this(username, password, driverClassName);
this.jdbcUrl = jdbcUrl;
}

public DataSourceBuilder(final String username,
final String password,
final String driverClassName,
final String host,
final int port,
final String database) {
this(username, password, driverClassName);
this.host = host;
this.port = port;
this.database = database;
}

public DataSourceBuilder withConnectionProperties(final Map<String, String> connectionProperties) {
if (connectionProperties != null) {
Expand Down Expand Up @@ -248,6 +249,11 @@ public DataSourceBuilder withUsername(final String username) {
return this;
}

public DataSourceBuilder withConnectionInitSql(final String sql) {
this.connectionInitSql = sql;
return this;
}

public DataSource build() {
final DatabaseDriver databaseDriver = DatabaseDriver.findByDriverClassName(driverClassName);

Expand All @@ -272,6 +278,8 @@ public DataSource build() {
*/
config.setInitializationFailTimeout(Integer.MIN_VALUE);

config.setConnectionInitSql(connectionInitSql);

connectionProperties.forEach(config::addDataSourceProperty);

return new HikariDataSource(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,17 @@ public List<JsonNode> queryJsons(final CheckedFunction<Connection, PreparedState
}

public int queryInt(final String sql, final String... params) throws SQLException {
try (final Stream<Integer> stream = unsafeQuery(c -> {
PreparedStatement statement = c.prepareStatement(sql);
int i = 1;
for (String param : params) {
statement.setString(i, param);
++i;
}
return statement;
}, rs -> rs.getInt(1))) {
try (final Stream<Integer> stream = unsafeQuery(
c -> getPreparedStatement(sql, params, c),
rs -> rs.getInt(1))) {
return stream.findFirst().get();
}
}

public boolean queryBoolean(final String sql, final String... params) throws SQLException {
try (final Stream<Boolean> stream = unsafeQuery(
c -> getPreparedStatement(sql, params, c),
rs -> rs.getBoolean(1))) {
return stream.findFirst().get();
}
}
Expand Down Expand Up @@ -216,20 +218,23 @@ public List<JsonNode> queryJsons(final String sql, final String... params) throw
}

public ResultSetMetaData queryMetadata(final String sql, final String... params) throws SQLException {
try (final Stream<ResultSetMetaData> q = unsafeQuery(c -> {
PreparedStatement statement = c.prepareStatement(sql);
int i = 1;
for (String param : params) {
statement.setString(i, param);
++i;
}
return statement;
},
try (final Stream<ResultSetMetaData> q = unsafeQuery(
c -> getPreparedStatement(sql, params, c),
ResultSet::getMetaData)) {
return q.findFirst().orElse(null);
}
}

public abstract DatabaseMetaData getMetaData() throws SQLException;

private static PreparedStatement getPreparedStatement(String sql, String[] params, Connection c) throws SQLException {
PreparedStatement statement = c.prepareStatement(sql);
int i = 1;
for (String param : params) {
statement.setString(i, param);
i++;
}
return statement;
}

}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.12.1
version=0.13.0
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,26 @@ private static PartialAirbyteMessage getDummyRecord() {
.withSerialized(dummyDataToInsert.toString());
}

/**
* Subclasses which need to modify the DataSource should override
* {@link #modifyDataSourceBuilder(DataSourceFactory.DataSourceBuilder)} rather than this method.
*/
@VisibleForTesting
public DataSource getDataSource(final JsonNode config) {
final JsonNode jdbcConfig = toJdbcConfig(config);
final Map<String, String> connectionProperties = getConnectionProperties(config);
return DataSourceFactory.create(
final DataSourceFactory.DataSourceBuilder builder = new DataSourceFactory.DataSourceBuilder(
jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText(),
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
driverClassName,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(),
connectionProperties,
getConnectionTimeout(connectionProperties));
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText())
.withConnectionProperties(connectionProperties)
.withConnectionTimeout(getConnectionTimeout(connectionProperties));
return modifyDataSourceBuilder(builder).build();
}

protected DataSourceFactory.DataSourceBuilder modifyDataSourceBuilder(final DataSourceFactory.DataSourceBuilder builder) {
return builder;
}

@VisibleForTesting
Expand Down Expand Up @@ -287,7 +296,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler<TableDefinition> destinationHandler = getDestinationHandler(databaseName, database);
boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,26 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN

protected String createTableQueryV1(final String schemaName, final String tableName) {
return String.format(
"CREATE TABLE IF NOT EXISTS %s.%s ( \n"
+ "%s VARCHAR PRIMARY KEY,\n"
+ "%s JSONB,\n"
+ "%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
+ ");\n",
"""
CREATE TABLE IF NOT EXISTS %s.%s (
%s VARCHAR PRIMARY KEY,
%s JSONB,
%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
""",
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

protected String createTableQueryV2(final String schemaName, final String tableName) {
return String.format(
"CREATE TABLE IF NOT EXISTS %s.%s ( \n"
+ "%s VARCHAR PRIMARY KEY,\n"
+ "%s JSONB,\n"
+ "%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
+ "%s TIMESTAMP WITH TIME ZONE DEFAULT NULL\n"
+ ");\n",
"""
CREATE TABLE IF NOT EXISTS %s.%s (
%s VARCHAR PRIMARY KEY,
%s JSONB,
%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
%s TIMESTAMP WITH TIME ZONE DEFAULT NULL
);
""",
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping;

import static org.jooq.impl.DSL.exists;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.select;
import static org.jooq.impl.DSL.selectOne;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.CustomSqlType;
Expand All @@ -26,6 +32,7 @@
import java.util.UUID;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,18 +58,13 @@ public Optional<TableDefinition> findExistingTable(final StreamId id) throws Exc

@Override
public boolean isFinalTableEmpty(final StreamId id) throws Exception {
final int rowCount = jdbcDatabase.queryInt(
"""
SELECT row_count
FROM information_schema.tables
WHERE table_catalog = ?
AND table_schema = ?
AND table_name = ?
""",
databaseName,
id.finalNamespace(),
id.finalName());
return rowCount == 0;
return !jdbcDatabase.queryBoolean(
select(
field(exists(
selectOne()
.from(name(id.finalNamespace(), id.finalName()))
.limit(1))))
.getSQL(ParamType.INLINED));
}

@Override
Expand All @@ -83,8 +85,8 @@ public InitialRawTableState getInitialRawTableState(final StreamId id) throws Ex
// but it's also the only method in the JdbcDatabase interface to return non-string/int types
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
conn -> conn.prepareStatement(
DSL.select(DSL.field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
.from(DSL.name(id.rawNamespace(), id.rawName()))
select(field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
.from(name(id.rawNamespace(), id.rawName()))
.where(DSL.condition("_airbyte_loaded_at IS NULL"))
.getSQL()),
record -> record.getTimestamp("min_timestamp"))) {
Expand All @@ -102,8 +104,8 @@ record -> record.getTimestamp("min_timestamp"))) {
// This second query just finds the newest raw record.
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
conn -> conn.prepareStatement(
DSL.select(DSL.field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
.from(DSL.name(id.rawNamespace(), id.rawName()))
select(field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
.from(name(id.rawNamespace(), id.rawName()))
.getSQL()),
record -> record.getTimestamp("min_timestamp"))) {
// Filter for nonNull values in case the query returned NULL (i.e. no raw records at all).
Expand Down
Loading

0 comments on commit 0063382

Please sign in to comment.