diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index 9e318072c496..ea31c7e23832 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -718,11 +718,48 @@ public void weirdColumnNames() throws Exception { dumpFinalTableRecords(streamId, "")); } + /** + * A stream with no columns is weird, but we shouldn't treat it specially in any way. It should + * create a final table as usual, and populate it with the relevant metadata columns. + */ + @Test + public void noColumns() throws Exception { + createRawTable(streamId); + insertRawTableRecords( + streamId, + List.of(Jsons.deserialize( + """ + { + "_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_data": {} + } + """))); + final StreamConfig stream = new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + emptyList(), + Optional.empty(), + new LinkedHashMap<>()); + + final String createTable = generator.createTable(stream, ""); + destinationHandler.execute(createTable); + final String updateTable = generator.updateTable(stream, ""); + destinationHandler.execute(updateTable); + + verifyRecords( + "sqlgenerator/nocolumns_expectedrecords_raw.jsonl", + dumpRawTableRecords(streamId), + "sqlgenerator/nocolumns_expectedrecords_final.jsonl", + dumpFinalTableRecords(streamId, "")); + } + @Test public void testV1V2migration() throws Exception { // This is maybe a little hacky, but it avoids having to refactor this entire class and subclasses // for something that is going away - StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null); + final StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null); createV1RawTable(v1RawTableStreamId); insertV1RawTableRecords(v1RawTableStreamId, singletonList(Jsons.jsonNode(Map.of( "_airbyte_ab_id", "v1v2", @@ -730,8 +767,8 @@ public void testV1V2migration() throws Exception { "_airbyte_data", "{\"hello\": \"world\"}")))); final String migration = generator.migrateFromV1toV2(streamId, v1RawTableStreamId.rawNamespace(), v1RawTableStreamId.rawName()); destinationHandler.execute(migration); - List v1RawRecords = dumpRawTableRecords(v1RawTableStreamId); - List v2RawRecords = dumpRawTableRecords(streamId); + final List v1RawRecords = dumpRawTableRecords(v1RawTableStreamId); + final List v2RawRecords = dumpRawTableRecords(streamId); assertAll( () -> assertEquals(1, v1RawRecords.size()), () -> assertEquals(1, v2RawRecords.size()), diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 77d510f7f6e5..3fe7a8373cbe 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.7.6 +LABEL io.airbyte.version=1.7.7 LABEL io.airbyte.name=airbyte/destination-bigquery ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh" diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 4107a672fa7f..d18c489e1113 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 1.7.6 + dockerImageTag: 1.7.7 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 6469eb858214..205796b35369 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -56,7 +56,7 @@ public class BigQuerySqlGenerator implements SqlGenerator { * @param datasetLocation This is technically redundant with {@link BigQueryDestinationHandler} setting the query * execution location, but let's be explicit since this is typically a compliance requirement. */ - public BigQuerySqlGenerator(String datasetLocation) { + public BigQuerySqlGenerator(final String datasetLocation) { this.datasetLocation = datasetLocation; } @@ -343,7 +343,7 @@ private String clearLoadedAt(final StreamId streamId) { public String updateTable(final StreamConfig stream, final String finalSuffix) { return updateTable(stream, finalSuffix, true); } - private String updateTable(final StreamConfig stream, final String finalSuffix, boolean verifyPrimaryKeys) { + private String updateTable(final StreamConfig stream, final String finalSuffix, final boolean verifyPrimaryKeys) { String pkVarDeclaration = ""; String validatePrimaryKeys = ""; if (verifyPrimaryKeys && stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { @@ -424,20 +424,26 @@ String insertNewRecords(final StreamConfig stream, final String finalSuffix, fin final String columnCasts = streamColumns.entrySet().stream().map( col -> extractAndCast(col.getKey(), col.getValue()) + " as " + col.getKey().name(QUOTE) + ",") .collect(joining("\n")); - final String columnErrors = streamColumns.entrySet().stream().map( - col -> new StringSubstitutor(Map.of( - "raw_col_name", col.getKey().originalName(), - "col_type", toDialectType(col.getValue()).name(), - "json_extract", extractAndCast(col.getKey(), col.getValue()))).replace( - """ - CASE - WHEN (JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"') IS NOT NULL) - AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"')) != 'null') - AND (${json_extract} IS NULL) - THEN ["Problem with `${raw_col_name}`"] - ELSE [] - END""")) - .collect(joining(",\n")); + final String columnErrors; + if (streamColumns.isEmpty()) { + // ARRAY_CONCAT doesn't like having an empty argument list, so handle that case separately + columnErrors = "[]"; + } else { + columnErrors = "ARRAY_CONCAT(" + streamColumns.entrySet().stream().map( + col -> new StringSubstitutor(Map.of( + "raw_col_name", col.getKey().originalName(), + "col_type", toDialectType(col.getValue()).name(), + "json_extract", extractAndCast(col.getKey(), col.getValue()))).replace( + """ + CASE + WHEN (JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"') IS NOT NULL) + AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"')) != 'null') + AND (${json_extract} IS NULL) + THEN ["Problem with `${raw_col_name}`"] + ELSE [] + END""")) + .collect(joining(",\n")) + ")"; + } final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); String cdcConditionalOrIncludeStatement = ""; @@ -468,9 +474,7 @@ AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL WITH intermediate_data AS ( SELECT ${column_casts} - array_concat( - ${column_errors} - ) as _airbyte_cast_errors, + ${column_errors} as _airbyte_cast_errors, _airbyte_raw_id, _airbyte_extracted_at FROM ${raw_table_id} @@ -598,7 +602,7 @@ private String wrapAndQuote(final String namespace, final String tableName) { } @Override - public String migrateFromV1toV2(StreamId streamId, String namespace, String tableName) { + public String migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) { return new StringSubstitutor(Map.of( "v2_raw_table", streamId.rawTableId(QUOTE), "v1_raw_table", wrapAndQuote(namespace, tableName) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/nocolumns_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/nocolumns_expectedrecords_final.jsonl new file mode 100644 index 000000000000..fc5f35381948 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/nocolumns_expectedrecords_final.jsonl @@ -0,0 +1 @@ +{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/nocolumns_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/nocolumns_expectedrecords_raw.jsonl new file mode 100644 index 000000000000..fc7aaebdd9c1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/nocolumns_expectedrecords_raw.jsonl @@ -0,0 +1 @@ +{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {}} diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index a987cc59c453..9d8c830b2792 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=1.3.1 +LABEL io.airbyte.version=1.3.2 LABEL io.airbyte.name=airbyte/destination-snowflake ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh" diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 9085bc107069..4160433a1fc6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 1.3.1 + dockerImageTag: 1.3.2 dockerRepository: airbyte/destination-snowflake githubIssueLabel: destination-snowflake icon: snowflake.svg diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 7c941aa123b1..340ed646e823 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -79,8 +79,8 @@ public String toDialectType(final AirbyteProtocolType airbyteProtocolType) { @Override public String createTable(final StreamConfig stream, final String suffix) { final String columnDeclarations = stream.columns().entrySet().stream() - .map(column -> column.getKey().name(QUOTE) + " " + toDialectType(column.getValue())) - .collect(joining(",\n")); + .map(column -> "," + column.getKey().name(QUOTE) + " " + toDialectType(column.getValue())) + .collect(joining("\n")); // TODO indexes and stuff return new StringSubstitutor(Map.of( "final_namespace", stream.id().finalNamespace(QUOTE), @@ -92,7 +92,7 @@ public String createTable(final StreamConfig stream, final String suffix) { CREATE TABLE ${final_table_id} ( "_airbyte_raw_id" TEXT NOT NULL, "_airbyte_extracted_at" TIMESTAMP_TZ NOT NULL, - "_airbyte_meta" VARIANT NOT NULL, + "_airbyte_meta" VARIANT NOT NULL ${column_declarations} ); """); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index a1828081be19..0ce7b0cca408 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -@Disabled public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { private static String databaseName; @@ -39,7 +38,7 @@ public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegr @BeforeAll public static void setupSnowflake() { - JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_internal_staging_config.json"))); + final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_internal_staging_config.json"))); databaseName = config.get(JdbcUtils.DATABASE_KEY).asText(); dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS); database = SnowflakeDatabase.getDatabase(dataSource); @@ -61,12 +60,12 @@ protected SnowflakeDestinationHandler getDestinationHandler() { } @Override - protected void createNamespace(String namespace) throws SQLException { + protected void createNamespace(final String namespace) throws SQLException { database.execute("CREATE SCHEMA \"" + namespace + '"'); } @Override - protected void createRawTable(StreamId streamId) throws Exception { + protected void createRawTable(final StreamId streamId) throws Exception { database.execute(new StringSubstitutor(Map.of( "raw_table_id", streamId.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace( """ @@ -80,8 +79,8 @@ protected void createRawTable(StreamId streamId) throws Exception { } @Override - protected void createFinalTable(boolean includeCdcDeletedAt, StreamId streamId, String suffix) throws Exception { - String cdcDeletedAt = includeCdcDeletedAt ? "\"_ab_cdc_deleted_at\" TIMESTAMP_TZ," : ""; + protected void createFinalTable(final boolean includeCdcDeletedAt, final StreamId streamId, final String suffix) throws Exception { + final String cdcDeletedAt = includeCdcDeletedAt ? "\"_ab_cdc_deleted_at\" TIMESTAMP_TZ," : ""; database.execute(new StringSubstitutor(Map.of( "final_table_id", streamId.finalTableId(SnowflakeSqlGenerator.QUOTE, suffix), "cdc_deleted_at", cdcDeletedAt @@ -112,12 +111,12 @@ protected void createFinalTable(boolean includeCdcDeletedAt, StreamId streamId, } @Override - protected List dumpRawTableRecords(StreamId streamId) throws Exception { + protected List dumpRawTableRecords(final StreamId streamId) throws Exception { return SnowflakeTestUtils.dumpRawTable(database, streamId.rawTableId(SnowflakeSqlGenerator.QUOTE)); } @Override - protected List dumpFinalTableRecords(StreamId streamId, String suffix) throws Exception { + protected List dumpFinalTableRecords(final StreamId streamId, final String suffix) throws Exception { return SnowflakeTestUtils.dumpFinalTable( database, databaseName, @@ -126,16 +125,16 @@ protected List dumpFinalTableRecords(StreamId streamId, String suffix) } @Override - protected void teardownNamespace(String namespace) throws SQLException { + protected void teardownNamespace(final String namespace) throws SQLException { database.execute("DROP SCHEMA IF EXISTS \"" + namespace + '"'); } @Override - protected void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId streamId, String suffix, List records) throws Exception { - List columnNames = includeCdcDeletedAt ? FINAL_TABLE_COLUMN_NAMES_CDC : FINAL_TABLE_COLUMN_NAMES; - String cdcDeletedAtName = includeCdcDeletedAt ? ",\"_ab_cdc_deleted_at\"" : ""; - String cdcDeletedAtExtract = includeCdcDeletedAt ? ",column19" : ""; - String recordsText = records.stream() + protected void insertFinalTableRecords(final boolean includeCdcDeletedAt, final StreamId streamId, final String suffix, final List records) throws Exception { + final List columnNames = includeCdcDeletedAt ? FINAL_TABLE_COLUMN_NAMES_CDC : FINAL_TABLE_COLUMN_NAMES; + final String cdcDeletedAtName = includeCdcDeletedAt ? ",\"_ab_cdc_deleted_at\"" : ""; + final String cdcDeletedAtExtract = includeCdcDeletedAt ? ",column19" : ""; + final String recordsText = records.stream() // For each record, convert it to a string like "(rawId, extractedAt, loadedAt, data)" .map(record -> columnNames.stream() .map(record::get) @@ -143,7 +142,7 @@ protected void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId str if (r == null) { return "NULL"; } - String stringContents; + final String stringContents; if (r.isTextual()) { stringContents = r.asText(); } else { @@ -214,8 +213,8 @@ protected void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId str } @Override - protected void insertRawTableRecords(StreamId streamId, List records) throws Exception { - String recordsText = records.stream() + protected void insertRawTableRecords(final StreamId streamId, final List records) throws Exception { + final String recordsText = records.stream() // For each record, convert it to a string like "(rawId, extractedAt, loadedAt, data)" .map(record -> JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES.stream() .map(record::get) @@ -223,7 +222,7 @@ protected void insertRawTableRecords(StreamId streamId, List records) if (r == null) { return "NULL"; } - String stringContents; + final String stringContents; if (r.isTextual()) { stringContents = r.asText(); } else { @@ -266,13 +265,13 @@ protected void insertRawTableRecords(StreamId streamId, List records) @Override @Test public void testCreateTableIncremental() throws Exception { - String sql = generator.createTable(incrementalDedupStream, ""); + final String sql = generator.createTable(incrementalDedupStream, ""); destinationHandler.execute(sql); - Optional tableKind = database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "users_final", namespace)) + final Optional tableKind = database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "users_final", namespace)) .stream().map(record -> record.get("kind").asText()) .findFirst(); - Map columns = database.queryJsons( + final Map columns = database.queryJsons( """ SELECT column_name, data_type, numeric_precision, numeric_scale FROM information_schema.columns @@ -288,7 +287,7 @@ public void testCreateTableIncremental() throws Exception { .collect(toMap( record -> record.get("COLUMN_NAME").asText(), record -> { - String type = record.get("DATA_TYPE").asText(); + final String type = record.get("DATA_TYPE").asText(); if (type.equals("NUMBER")) { return String.format("NUMBER(%s, %s)", record.get("NUMERIC_PRECISION").asText(), record.get("NUMERIC_SCALE").asText() @@ -335,8 +334,10 @@ protected void insertV1RawTableRecords(final StreamId streamId, final List v2 migration Logic | | 1.7.5 | 2023-08-04 | [\#29106](https://github.com/airbytehq/airbyte/pull/29106) | Destinations v2: handle unusual CDC deletion edge case | | 1.7.4 | 2023-08-04 | [\#29089](https://github.com/airbytehq/airbyte/pull/29089) | Destinations v2: improve special character handling in column names | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index e780d44bd98c..3a3eb334d598 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.3.2 | 2023-08-11 | [\#29381](https://github.com/airbytehq/airbyte/pull/29381) | Destinations v2: Add support for streams with no columns | | 1.3.1 | 2023-08-04 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Update SqlGenerator | | 1.3.0 | 2023-08-07 | [\#29174](https://github.com/airbytehq/airbyte/pull/29174) | Destinations v2: early access release | | 1.2.10 | 2023-08-07 | [\#29188](https://github.com/airbytehq/airbyte/pull/29188) | Internal code refactoring |