Skip to content

Commit

Permalink
Destination postgres (dv2): add indexes to raw table (#34236)
Browse files Browse the repository at this point in the history
Signed-off-by: Gireesh Sreepathi <gisripa@gmail.com>
Co-authored-by: Gireesh Sreepathi <gisripa@gmail.com>
  • Loading branch information
edgao and gisripa authored Jan 19, 2024
1 parent 6d73558 commit d610ad1
Show file tree
Hide file tree
Showing 23 changed files with 147 additions and 106 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator |
| 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes |
| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method |
| 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.13.0
version=0.13.1
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ protected Optional<ConfigErrorException> checkForKnownConfigExceptions(final Exc
public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
try {
database.execute(createTableQuery(database, schemaName, tableName));
for (final String postCreateSql : postCreateTableQueries(schemaName, tableName)) {
database.execute(postCreateSql);
}
} catch (final SQLException e) {
throw checkForKnownConfigExceptions(e).orElseThrow(() -> e);
}
Expand All @@ -85,6 +88,15 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
}
}

/**
* Some subclasses may want to execute additional SQL statements after creating the raw table. For
* example, Postgres does not support index definitions within a CREATE TABLE statement, so we need
* to run CREATE INDEX statements after creating the table.
*/
protected List<String> postCreateTableQueries(final String schemaName, final String tableName) {
return List.of();
}

protected String createTableQueryV1(final String schemaName, final String tableName) {
return String.format(
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.13.0'
cdkVersionRequired = '0.13.1'
features = [
'db-sources', // required for tests
'db-destinations'
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
testExecutionConcurrency=-1
# our testcontainer has issues with too much concurrency.
# 4 threads seems to be the sweet spot.
testExecutionConcurrency=4
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
data:
registries:
cloud:
dockerImageTag: 0.4.0
enabled: false # strict encrypt connectors are deployed to Cloud by their non strict encrypt sibling.
oss:
dockerImageTag: 0.4.0
enabled: false # strict encrypt connectors are not used on OSS.
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.5.4
dockerImageTag: 0.5.5
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
icon: postgresql.svg
license: ELv2
Expand All @@ -19,8 +13,14 @@ data:
normalizationIntegrationType: postgres
normalizationRepository: airbyte/normalization
normalizationTag: 0.4.1
registries:
cloud:
dockerImageTag: 0.4.0
enabled: false
oss:
dockerImageTag: 0.4.0
enabled: false
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
supportsDbt: true
tags:
- language:java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.13.0'
cdkVersionRequired = '0.13.1'
features = [
'db-sources', // required for tests
'db-destinations',
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
testExecutionConcurrency=-1
# our testcontainer has issues with too much concurrency.
# 4 threads seems to be the sweet spot.
testExecutionConcurrency=4
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.5.4
dockerImageTag: 0.5.5
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.postgres;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import java.io.BufferedReader;
Expand All @@ -14,6 +15,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
Expand All @@ -24,6 +26,23 @@ public PostgresSqlOperations() {
super(new PostgresDataAdapter());
}

@Override
protected List<String> postCreateTableQueries(final String schemaName, final String tableName) {
if (TypingAndDedupingFlag.isDestinationV2()) {
return List.of(
// the raw_id index _could_ be unique (since raw_id is a UUID)
// but there's no reason to do that (because it's a UUID :P )
// and it would just slow down inserts.
// also, intentionally don't specify the type of index (btree, hash, etc). Just use the default.
"CREATE INDEX IF NOT EXISTS " + tableName + "_raw_id" + " ON " + schemaName + "." + tableName + "(_airbyte_raw_id)",
"CREATE INDEX IF NOT EXISTS " + tableName + "_extracted_at" + " ON " + schemaName + "." + tableName + "(_airbyte_extracted_at)",
"CREATE INDEX IF NOT EXISTS " + tableName + "_loaded_at" + " ON " + schemaName + "." + tableName
+ "(_airbyte_loaded_at, _airbyte_extracted_at)");
} else {
return Collections.emptyList();
}
}

@Override
protected void insertRecordsInternalV2(final JdbcDatabase database,
final List<PartialAirbyteMessage> records,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA;
import static java.util.Collections.emptyList;
import static org.jooq.impl.DSL.array;
Expand All @@ -28,6 +29,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
Expand All @@ -36,10 +38,13 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.Condition;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.Name;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultDataType;
import org.jooq.impl.SQLDataType;
Expand Down Expand Up @@ -79,6 +84,41 @@ protected SQLDialect getDialect() {
return SQLDialect.POSTGRES;
}

@Override
public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) {
final List<Sql> statements = new ArrayList<>();
final Name finalTableName = name(stream.id().finalNamespace(), stream.id().finalName() + suffix);

statements.add(super.createTable(stream, suffix, force));

if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
// An index for our ROW_NUMBER() PARTITION BY pk ORDER BY cursor, extracted_at function
final List<Name> pkNames = stream.primaryKey().stream()
.map(pk -> quotedName(pk.name()))
.toList();
statements.add(Sql.of(getDslContext().createIndex().on(
finalTableName,
Stream.of(
pkNames.stream(),
// if cursor is present, then a stream containing its name
// but if no cursor, then empty stream
stream.cursor().stream().map(cursor -> quotedName(cursor.name())),
Stream.of(name(COLUMN_NAME_AB_EXTRACTED_AT))).flatMap(Function.identity()).toList())
.getSQL()));
}
statements.add(Sql.of(getDslContext().createIndex().on(
finalTableName,
name(COLUMN_NAME_AB_EXTRACTED_AT))
.getSQL()));

statements.add(Sql.of(getDslContext().createIndex().on(
finalTableName,
name(COLUMN_NAME_AB_RAW_ID))
.getSQL()));

return Sql.concat(statements);
}

@Override
protected List<String> createIndexSql(final StreamConfig stream, final String suffix) {
if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && !stream.primaryKey().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.12.1'
cdkVersionRequired = '0.13.0'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.7.15
dockerImageTag: 0.8.0
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected Map<String, String> getDefaultConnectionProperties(final JsonNode conf
// TODO: Pull common code from RedshiftInsertDestination and RedshiftStagingS3Destination into a
// base class.
// The following properties can be overriden through jdbcUrlParameters in the config.
Map<String, String> connectionOptions = new HashMap<>();
final Map<String, String> connectionOptions = new HashMap<>();
// Redshift properties
// https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-connecttimeout-option
// connectTimeout is different from Hikari pool's connectionTimout, driver defaults to 10seconds so
Expand Down Expand Up @@ -177,7 +177,7 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new RedshiftDestinationHandler(databaseName, database);
}

Expand Down Expand Up @@ -247,7 +247,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
parsedCatalog = catalogParser.parseCatalog(catalog);
final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final int defaultThreadCount = 8;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,17 @@ protected SQLDialect getDialect() {
*/

@Override
protected Field<?> castedField(final Field<?> field, final AirbyteType type, final String alias) {
protected Field<?> castedField(final Field<?> field, final AirbyteType type, final String alias, final boolean useExpensiveSaferCasting) {
if (type instanceof final AirbyteProtocolType airbyteProtocolType) {
switch (airbyteProtocolType) {
case STRING -> {
return field(CASE_STATEMENT_SQL_TEMPLATE,
jsonTypeOf(field).ne("string").and(field.isNotNull()),
jsonSerialize(field),
castedField(field, airbyteProtocolType)).as(quotedName(alias));
castedField(field, airbyteProtocolType, useExpensiveSaferCasting)).as(quotedName(alias));
}
default -> {
return castedField(field, airbyteProtocolType).as(quotedName(alias));
return castedField(field, airbyteProtocolType, useExpensiveSaferCasting).as(quotedName(alias));
}
}

Expand All @@ -129,7 +129,7 @@ protected Field<?> castedField(final Field<?> field, final AirbyteType type, fin
jsonTypeOf(field).eq("array"),
cast(field, getArrayType())).as(quotedName(alias));
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias);
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias, useExpensiveSaferCasting);
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + type);
};
}
Expand All @@ -139,7 +139,11 @@ protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, Airb
return columns
.entrySet()
.stream()
.map(column -> castedField(field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())), column.getValue(), column.getKey().name()))
.map(column -> castedField(
field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())),
column.getValue(),
column.getKey().name(),
useExpensiveSaferCasting))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -180,7 +184,7 @@ Field<?> toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) {
// TODO: Timestamp format issues can result in null values when cast, add regex check if destination
// supports regex functions.
return field(CASE_STATEMENT_SQL_TEMPLATE,
field.isNotNull().and(castedField(field, type, column.name()).isNull()),
field.isNotNull().and(castedField(field, type, column.name(), true).isNull()),
function("ARRAY", getSuperType(), val(COLUMN_ERROR_MESSAGE_FORMAT.formatted(column.name()))), field("ARRAY()"));
}

Expand All @@ -198,6 +202,7 @@ protected Field<?> buildAirbyteMetaColumn(final LinkedHashMap<ColumnId, AirbyteT
@Override
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
// Check that the columns match, with special handling for the metadata columns.
// This is mostly identical to the redshift implementation, but swaps jsonb to super
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
Expand Down Expand Up @@ -227,6 +232,8 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, fina
*/
@Override
protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Optional<ColumnId> cursor) {
// literally identical to postgres's getRowNumber implementation, changes here probably should
// be reflected there
final List<Field<?>> primaryKeyFields =
primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList())
: new ArrayList<>();
Expand Down
Loading

0 comments on commit d610ad1

Please sign in to comment.