Destination postgres (dv2): add indexes to raw table #34236

Merged: 121 commits, Jan 19, 2024

Commits
7ec0e4a
make methods public
edgao Dec 21, 2023
b0f6035
allow overriding timestamptz type
edgao Dec 21, 2023
e65e315
better?
edgao Dec 21, 2023
e9c6e34
move jdbc base test into db-destinations testFixtures
edgao Dec 21, 2023
6ee0add
override annotation
edgao Dec 21, 2023
aade64a
fixes
edgao Dec 21, 2023
d8fde89
typing_deduping core code uses Sql struct
edgao Dec 22, 2023
d7949a0
update bigquery
edgao Dec 22, 2023
30f8a4d
update snowflake
edgao Dec 22, 2023
cd6b03b
update redshift
edgao Dec 22, 2023
803c758
autoappend semicolon
edgao Dec 22, 2023
af0d77a
move insert statement to base class
edgao Dec 22, 2023
f9a4193
Merge branch 'dv2_cdk_changes' into edgao/dv2_sql_refactor
edgao Dec 22, 2023
3f31078
propagate safe_cast option to subclass
edgao Dec 22, 2023
d952aab
Merge branch 'dv2_cdk_changes' into edgao/dv2_sql_refactor
edgao Dec 22, 2023
d25cb85
fix redshift sqlgenerator
edgao Jan 2, 2024
ef2ddd4
Merge branch 'dv2_cdk_changes' into edgao/dv2_sql_refactor
edgao Jan 2, 2024
b21a539
make methods public
edgao Dec 21, 2023
8d55bac
allow overriding timestamptz type
edgao Dec 21, 2023
a37d096
better?
edgao Dec 21, 2023
0f7cf74
move jdbc base test into db-destinations testFixtures
edgao Dec 21, 2023
2583a70
override annotation
edgao Dec 21, 2023
2e6e931
fixes
edgao Dec 21, 2023
b4ffdba
move insert statement to base class
edgao Dec 22, 2023
4d045ca
propagate safe_cast option to subclass
edgao Dec 22, 2023
3c53c37
fix redshift sqlgenerator
edgao Jan 2, 2024
54350e9
Merge branch 'dv2_cdk_changes' into edgao/dv2_sql_refactor
edgao Jan 3, 2024
b0a667d
fix redshift test
edgao Jan 3, 2024
df833f9
Merge branch 'dv2_cdk_changes' into edgao/dv2_sql_refactor
edgao Jan 3, 2024
9463896
use local cdk
edgao Jan 3, 2024
92e9cf3
skeletons
edgao Jan 3, 2024
78c09c8
rough test impl
edgao Jan 3, 2024
3b7e6e6
stuff
edgao Jan 3, 2024
22d952f
better base implementation
edgao Jan 3, 2024
d07724c
implement extract+cast?
edgao Jan 3, 2024
5858164
I am the dumb
edgao Jan 3, 2024
116618b
I am still the dumb
edgao Jan 3, 2024
a32c67b
placeholder airbyte_meta impl to make tests run
edgao Jan 3, 2024
a5f1d2f
placeholder row_number impl to make tests run
edgao Jan 3, 2024
e8daf9c
copy expectedrecords files from redshift
edgao Jan 3, 2024
e0a3928
fix raw table fetch
edgao Jan 3, 2024
a2c45a4
fetch all rows for now
edgao Jan 3, 2024
b2b70f9
copy over rowNumber impl
edgao Jan 3, 2024
009d5f6
unused imports
edgao Jan 3, 2024
ad81d3f
cdc deletion condition
edgao Jan 3, 2024
63d2fa0
implement test
edgao Jan 3, 2024
00ce755
stuff
edgao Jan 4, 2024
03622d6
fix test insert statement
edgao Jan 4, 2024
fe6499b
fix json typeof check
edgao Jan 4, 2024
7bf1633
extract to method
edgao Jan 4, 2024
b5ee826
fix test fixtures
edgao Jan 4, 2024
e408c33
extract JdbcTypingDedupingTest
edgao Jan 4, 2024
98d38b4
add containerized tests
edgao Jan 4, 2024
2c2debb
plug in raw table stuff
edgao Jan 4, 2024
fd147ad
schema change detection
edgao Jan 4, 2024
2a05c63
build airbyte_meta.errors?
edgao Jan 5, 2024
c0c7568
implement safe_cast
edgao Jan 5, 2024
65f8490
better cast implementation
edgao Jan 5, 2024
127017b
fix error detection
edgao Jan 5, 2024
fbe8267
minor refactor in jsonTypeof
edgao Jan 5, 2024
fa01aeb
threadsafe
edgao Jan 5, 2024
dcf1c8c
format
edgao Jan 5, 2024
e93a566
Merge branch 'master' into dv2/postgres
edgao Jan 5, 2024
7987413
test raw dataset override
edgao Jan 5, 2024
c541d4c
spec
edgao Jan 5, 2024
7bafc06
derp
edgao Jan 5, 2024
aa5cbdc
allow multiple setup
edgao Jan 5, 2024
bcb150f
derpderp
edgao Jan 5, 2024
5b10081
use sql.of for single statement
edgao Jan 5, 2024
fa6cf78
add test for unsafe casting
edgao Jan 5, 2024
8ae16fa
explicit limit 1; static imports
edgao Jan 5, 2024
2bcac6e
fix params
edgao Jan 5, 2024
8969cf9
create safe_cast in airbyte_internal schema
edgao Jan 5, 2024
97617fc
fix test
edgao Jan 5, 2024
c3703fd
fix build
edgao Jan 5, 2024
bb43a17
fix build+format
edgao Jan 5, 2024
27f8bdb
dumb but it works
edgao Jan 5, 2024
5e2c6b1
cleanup
edgao Jan 5, 2024
bf29a0b
create index on pk
edgao Jan 5, 2024
c63c922
format
edgao Jan 5, 2024
4eac520
create index on correct table
edgao Jan 8, 2024
5351a85
Merge branch 'master' into dv2/postgres
edgao Jan 8, 2024
1a203c3
fix tests
edgao Jan 9, 2024
21a0a86
use pg_temp schema for udf
edgao Jan 9, 2024
a5406a1
try this???
edgao Jan 9, 2024
0974b04
Merge branch 'master' into dv2/postgres
edgao Jan 9, 2024
961c8f8
format
edgao Jan 9, 2024
d10a1e7
remove config param from sqlgenerator
edgao Jan 9, 2024
c7ca903
fix snowflake build
edgao Jan 9, 2024
b763a75
fix test class
edgao Jan 9, 2024
241a9f8
testing
edgao Jan 10, 2024
c4b03ae
format
edgao Jan 10, 2024
2dd5357
testing 2
edgao Jan 10, 2024
4084e63
Merge branch 'master' into dv2/postgres_take2
edgao Jan 11, 2024
6cb5d49
revert spec changes
edgao Jan 11, 2024
eca204b
big pile of logistics
edgao Jan 11, 2024
86626a8
Merge branch 'master' into dv2/postgres_take2
edgao Jan 11, 2024
8e8c461
revert postgres to local cdk for prerelease publish
edgao Jan 11, 2024
b6ed088
Revert "revert postgres to local cdk for prerelease publish"
edgao Jan 11, 2024
a859acc
Merge branch 'master' into dv2/postgres_take2
edgao Jan 12, 2024
1b759d4
Merge branch 'master' into dv2/postgres_take2
edgao Jan 12, 2024
125a9da
once more into the breach
edgao Jan 12, 2024
85b8355
just override the method
edgao Jan 12, 2024
4298916
add more indexes
edgao Jan 12, 2024
cb17380
create raw table indexes
edgao Jan 12, 2024
f011896
derp
edgao Jan 12, 2024
17d2b5b
format
edgao Jan 12, 2024
bcecacc
fix create index call
edgao Jan 12, 2024
19add4e
limit test concurrency
edgao Jan 12, 2024
4e9caff
comments
edgao Jan 12, 2024
80713a7
more comment
edgao Jan 12, 2024
b5bbd9d
Merge branch 'master' into edgao/dv2/postgres/indexes_take2
gisripa Jan 17, 2024
4be9a32
redshift cdk update
gisripa Jan 18, 2024
ddd719e
revert bigquery and snowflake cdk breaking changes
gisripa Jan 18, 2024
8f32e8b
Merge branch 'master' into edgao/dv2/postgres/indexes_take2
gisripa Jan 18, 2024
1daf383
format
gisripa Jan 18, 2024
ac45f4f
bump connector versions and cdk version
gisripa Jan 18, 2024
ae1ec15
bump redshift connector
gisripa Jan 18, 2024
2a11dcc
fix readme for cdk
gisripa Jan 18, 2024
9797031
turnoff local cdk for postgres
gisripa Jan 18, 2024
5ae13c0
Merge branch 'master' into edgao/dv2/postgres/indexes_take2
gisripa Jan 18, 2024
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator |
| 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes |
| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method |
| 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 |
@@ -1 +1 @@
version=0.13.0
version=0.13.1
@@ -71,6 +71,9 @@ protected Optional<ConfigErrorException> checkForKnownConfigExceptions(final Exc
public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
try {
database.execute(createTableQuery(database, schemaName, tableName));
for (final String postCreateSql : postCreateTableQueries(schemaName, tableName)) {
database.execute(postCreateSql);
}
} catch (final SQLException e) {
throw checkForKnownConfigExceptions(e).orElseThrow(() -> e);
}
@@ -85,6 +88,15 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
}
}

/**
* Some subclasses may want to execute additional SQL statements after creating the raw table. For
* example, Postgres does not support index definitions within a CREATE TABLE statement, so we need
* to run CREATE INDEX statements after creating the table.
*/
protected List<String> postCreateTableQueries(final String schemaName, final String tableName) {
return List.of();
}

protected String createTableQueryV1(final String schemaName, final String tableName) {
return String.format(
"""
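For illustration only (none of this is in the diff): assuming a DV2 raw table named `airbyte_internal.users_raw` with the standard raw-table columns, the hook results in roughly this statement sequence, with the CREATE INDEX statements supplied by `postCreateTableQueries()` and executed one at a time after the CREATE TABLE:

```sql
-- Approximate raw-table DDL; Postgres cannot declare these indexes inline,
-- so they are issued as separate statements after the table is created.
CREATE TABLE IF NOT EXISTS airbyte_internal.users_raw (
    _airbyte_raw_id       VARCHAR(36) NOT NULL,
    _airbyte_extracted_at TIMESTAMP WITH TIME ZONE NOT NULL,
    _airbyte_loaded_at    TIMESTAMP WITH TIME ZONE,
    _airbyte_data         JSONB
);

-- One extra statement per entry returned by postCreateTableQueries()
CREATE INDEX IF NOT EXISTS users_raw_raw_id ON airbyte_internal.users_raw (_airbyte_raw_id);
CREATE INDEX IF NOT EXISTS users_raw_extracted_at ON airbyte_internal.users_raw (_airbyte_extracted_at);
CREATE INDEX IF NOT EXISTS users_raw_loaded_at ON airbyte_internal.users_raw (_airbyte_loaded_at, _airbyte_extracted_at);
```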
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.13.0'
cdkVersionRequired = '0.13.1'
features = [
'db-sources', // required for tests
'db-destinations'
@@ -1 +1,3 @@
testExecutionConcurrency=-1
# our testcontainer has issues with too much concurrency.
# 4 threads seems to be the sweet spot.
testExecutionConcurrency=4
@@ -1,16 +1,10 @@
data:
registries:
cloud:
dockerImageTag: 0.4.0
enabled: false # strict encrypt connectors are deployed to Cloud by their non strict encrypt sibling.
oss:
dockerImageTag: 0.4.0
enabled: false # strict encrypt connectors are not used on OSS.
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.5.4
dockerImageTag: 0.5.5
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
icon: postgresql.svg
license: ELv2
@@ -19,8 +13,14 @@ data:
normalizationIntegrationType: postgres
normalizationRepository: airbyte/normalization
normalizationTag: 0.4.1
registries:
cloud:
dockerImageTag: 0.4.0
[Review comment from the PR author]: I think we should remove this, even if it's ignored

enabled: false
oss:
dockerImageTag: 0.4.0
enabled: false
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
supportsDbt: true
tags:
- language:java
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.13.0'
cdkVersionRequired = '0.13.1'
features = [
'db-sources', // required for tests
'db-destinations',
@@ -1 +1,3 @@
testExecutionConcurrency=-1
# our testcontainer has issues with too much concurrency.
# 4 threads seems to be the sweet spot.
testExecutionConcurrency=4
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.5.4
dockerImageTag: 0.5.5
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
@@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.postgres;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import java.io.BufferedReader;
@@ -14,6 +15,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
@@ -24,6 +26,23 @@ public PostgresSqlOperations() {
super(new PostgresDataAdapter());
}

@Override
protected List<String> postCreateTableQueries(final String schemaName, final String tableName) {
if (TypingAndDedupingFlag.isDestinationV2()) {
return List.of(
// the raw_id index _could_ be unique (since raw_id is a UUID)
// but there's no reason to do that (because it's a UUID :P )
// and it would just slow down inserts.
// also, intentionally don't specify the type of index (btree, hash, etc). Just use the default.
"CREATE INDEX IF NOT EXISTS " + tableName + "_raw_id" + " ON " + schemaName + "." + tableName + "(_airbyte_raw_id)",
"CREATE INDEX IF NOT EXISTS " + tableName + "_extracted_at" + " ON " + schemaName + "." + tableName + "(_airbyte_extracted_at)",
"CREATE INDEX IF NOT EXISTS " + tableName + "_loaded_at" + " ON " + schemaName + "." + tableName
+ "(_airbyte_loaded_at, _airbyte_extracted_at)");
} else {
return Collections.emptyList();
}
}

@Override
protected void insertRecordsInternalV2(final JdbcDatabase database,
final List<PartialAirbyteMessage> records,
@@ -7,6 +7,7 @@
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA;
import static java.util.Collections.emptyList;
import static org.jooq.impl.DSL.array;
@@ -28,6 +29,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
@@ -36,10 +38,13 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.Condition;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.Name;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultDataType;
import org.jooq.impl.SQLDataType;
@@ -79,6 +84,41 @@ protected SQLDialect getDialect() {
return SQLDialect.POSTGRES;
}

@Override
public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) {
final List<Sql> statements = new ArrayList<>();
final Name finalTableName = name(stream.id().finalNamespace(), stream.id().finalName() + suffix);

statements.add(super.createTable(stream, suffix, force));

if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
// An index for our ROW_NUMBER() PARTITION BY pk ORDER BY cursor, extracted_at function
final List<Name> pkNames = stream.primaryKey().stream()
.map(pk -> quotedName(pk.name()))
.toList();
statements.add(Sql.of(getDslContext().createIndex().on(
finalTableName,
Stream.of(
pkNames.stream(),
// if cursor is present, then a stream containing its name
// but if no cursor, then empty stream
stream.cursor().stream().map(cursor -> quotedName(cursor.name())),
Stream.of(name(COLUMN_NAME_AB_EXTRACTED_AT))).flatMap(Function.identity()).toList())
.getSQL()));
}
statements.add(Sql.of(getDslContext().createIndex().on(
finalTableName,
name(COLUMN_NAME_AB_EXTRACTED_AT))
.getSQL()));

statements.add(Sql.of(getDslContext().createIndex().on(
finalTableName,
name(COLUMN_NAME_AB_RAW_ID))
.getSQL()));

return Sql.concat(statements);
}

@Override
protected List<String> createIndexSql(final StreamConfig stream, final String suffix) {
if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP && !stream.primaryKey().isEmpty()) {
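As a rough illustration (not part of the diff), for an APPEND_DEDUP stream landing in a final table `public.users` with primary key `id` and cursor `updated_at` (hypothetical names), the index portion of the generated Sql would look approximately like this; the CREATE TABLE itself still comes from `super.createTable(...)`:

```sql
-- index backing the ROW_NUMBER() PARTITION BY pk ORDER BY cursor, extracted_at dedup query
CREATE INDEX ON "public"."users" ("id", "updated_at", "_airbyte_extracted_at");
-- indexes created for every sync mode
CREATE INDEX ON "public"."users" ("_airbyte_extracted_at");
CREATE INDEX ON "public"."users" ("_airbyte_raw_id");
```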
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.12.1'
cdkVersionRequired = '0.13.0'
[Review comment from the PR author]: might as well bump to 0.13.1?

features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.7.15
dockerImageTag: 0.8.0
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
@@ -149,7 +149,7 @@ protected Map<String, String> getDefaultConnectionProperties(final JsonNode conf
// TODO: Pull common code from RedshiftInsertDestination and RedshiftStagingS3Destination into a
// base class.
// The following properties can be overriden through jdbcUrlParameters in the config.
Map<String, String> connectionOptions = new HashMap<>();
final Map<String, String> connectionOptions = new HashMap<>();
// Redshift properties
// https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-connecttimeout-option
// connectTimeout is different from Hikari pool's connectionTimout, driver defaults to 10seconds so
@@ -177,7 +177,7 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new RedshiftDestinationHandler(databaseName, database);
}

@@ -247,7 +247,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
parsedCatalog = catalogParser.parseCatalog(catalog);
final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final int defaultThreadCount = 8;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator,
@@ -105,17 +105,17 @@ protected SQLDialect getDialect() {
*/

@Override
protected Field<?> castedField(final Field<?> field, final AirbyteType type, final String alias) {
protected Field<?> castedField(final Field<?> field, final AirbyteType type, final String alias, final boolean useExpensiveSaferCasting) {
if (type instanceof final AirbyteProtocolType airbyteProtocolType) {
switch (airbyteProtocolType) {
case STRING -> {
return field(CASE_STATEMENT_SQL_TEMPLATE,
jsonTypeOf(field).ne("string").and(field.isNotNull()),
jsonSerialize(field),
castedField(field, airbyteProtocolType)).as(quotedName(alias));
castedField(field, airbyteProtocolType, useExpensiveSaferCasting)).as(quotedName(alias));
}
default -> {
return castedField(field, airbyteProtocolType).as(quotedName(alias));
return castedField(field, airbyteProtocolType, useExpensiveSaferCasting).as(quotedName(alias));
}
}

@@ -129,7 +129,7 @@ protected Field<?> castedField(final Field<?> field, final AirbyteType type, fin
jsonTypeOf(field).eq("array"),
cast(field, getArrayType())).as(quotedName(alias));
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias);
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias, useExpensiveSaferCasting);
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + type);
};
}
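A sketch of what the STRING branch is meant to produce (the column name `name`, the SUPER path syntax, and the exact template expansion are assumptions, not taken from this diff): values that are not JSON strings get serialized instead of cast, so they type successfully rather than erroring:

```sql
CASE
  WHEN JSON_TYPEOF("_airbyte_data"."name") <> 'string' AND "_airbyte_data"."name" IS NOT NULL
    THEN JSON_SERIALIZE("_airbyte_data"."name")
  ELSE CAST("_airbyte_data"."name" AS VARCHAR)  -- plain cast path; the useExpensiveSaferCasting variant differs
END AS "name"
```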
@@ -139,7 +139,11 @@ protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, Airb
return columns
.entrySet()
.stream()
.map(column -> castedField(field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())), column.getValue(), column.getKey().name()))
.map(column -> castedField(
field(quotedName(COLUMN_NAME_DATA, column.getKey().originalName())),
column.getValue(),
column.getKey().name(),
useExpensiveSaferCasting))
.collect(Collectors.toList());
}

@@ -180,7 +184,7 @@ Field<?> toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) {
// TODO: Timestamp format issues can result in null values when cast, add regex check if destination
// supports regex functions.
return field(CASE_STATEMENT_SQL_TEMPLATE,
field.isNotNull().and(castedField(field, type, column.name()).isNull()),
field.isNotNull().and(castedField(field, type, column.name(), true).isNull()),
function("ARRAY", getSuperType(), val(COLUMN_ERROR_MESSAGE_FORMAT.formatted(column.name()))), field("ARRAY()"));
}

@@ -198,6 +202,7 @@ protected Field<?> buildAirbyteMetaColumn(final LinkedHashMap<ColumnId, AirbyteT
@Override
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
// Check that the columns match, with special handling for the metadata columns.
// This is mostly identical to the postgres implementation, but swaps jsonb to super
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
@@ -227,6 +232,8 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, fina
*/
@Override
protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Optional<ColumnId> cursor) {
// literally identical to postgres's getRowNumber implementation, changes here probably should
// be reflected there
final List<Field<?>> primaryKeyFields =
primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList())
: new ArrayList<>();
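For reference (not part of the diff), the row-numbering both generators build is the usual DV2 dedup window function; a sketch for a stream with primary key `id` and cursor `updated_at` (hypothetical names):

```sql
-- rows with row_number = 1 are the latest version of each primary key
ROW_NUMBER() OVER (
  PARTITION BY "id"
  ORDER BY "updated_at" DESC, "_airbyte_extracted_at" DESC
) AS row_number
```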