From 9f6963ccfcddd7a573c4ca0ff94de41ce6175cc0 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 26 Jul 2023 17:36:15 -0700 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8Destination=20Bigquery=201s1t:=20handl?= =?UTF-8?q?e=20cursor=20change=20(#28721)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * handle new cursor column * sync2 is actualy weird, apparently * logistics * Automated Commit - Format and Process Resources Changes --------- Co-authored-by: edgao --- .../BaseTypingDedupingTest.java | 52 ++++++++++++++++++- ...orchange_expectedrecords_dedup_final.jsonl | 3 ++ ...rsorchange_expectedrecords_dedup_raw.jsonl | 3 ++ .../sync1_cursorchange_messages.jsonl | 4 ++ ...ectedrecords_incremental_dedup_final.jsonl | 3 ++ ...xpectedrecords_incremental_dedup_raw.jsonl | 4 ++ .../destination-bigquery/Dockerfile | 2 +- .../destination-bigquery/metadata.yaml | 2 +- .../typing_deduping/BigQuerySqlGenerator.java | 2 +- .../BigQuerySqlGeneratorIntegrationTest.java | 20 ++++++- docs/integrations/destinations/bigquery.md | 1 + 11 files changed, 90 insertions(+), 6 deletions(-) create mode 100644 airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl create mode 100644 airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl create mode 100644 airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl create mode 100644 airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl create mode 100644 airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index cbc13448cceb..f8b03cd568e7 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -71,7 +71,8 @@ public abstract class BaseTypingDedupingTest { private static final RecordDiffer DIFFER = new RecordDiffer( Pair.of("id1", AirbyteProtocolType.INTEGER), Pair.of("id2", AirbyteProtocolType.INTEGER), - Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE)); + Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE), + Pair.of("old_cursor", AirbyteProtocolType.INTEGER)); private String randomSuffix; private JsonNode config; @@ -488,6 +489,55 @@ public void testSyncNotFailsWithNewFields() throws Exception { // evolutions) } + /** + * Change the cursor column in the second sync to a column that doesn't exist in the first sync. + * Verify that we overwrite everything correctly. + *

+ * This essentially verifies that the destination connector correctly recognizes NULL cursors as + * older than non-NULL cursors. + */ + @Test + public void incrementalDedupChangeCursor() throws Exception { + JsonNode mangledSchema = SCHEMA.deepCopy(); + ((ObjectNode) mangledSchema.get("properties")).remove("updated_at"); + ((ObjectNode) mangledSchema.get("properties")).set( + "old_cursor", + Jsons.deserialize( + """ + {"type": "integer"} + """)); + ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(List.of("old_cursor")) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withPrimaryKey(List.of(List.of("id1"), List.of("id2"))) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(mangledSchema)); + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream)); + + // First sync + final List messages1 = readMessages("sync1_cursorchange_messages.jsonl"); + + runSync(catalog, messages1); + + final List expectedRawRecords1 = readRecords("sync1_cursorchange_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_cursorchange_expectedrecords_dedup_final.jsonl"); + verifySyncResult(expectedRawRecords1, expectedFinalRecords1); + + // Second sync + final List messages2 = readMessages("sync2_messages.jsonl"); + configuredStream.getStream().setJsonSchema(SCHEMA); + configuredStream.setCursorField(List.of("updated_at")); + + runSync(catalog, messages2); + + final List expectedRawRecords2 = readRecords("sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl"); + verifySyncResult(expectedRawRecords2, expectedFinalRecords2); + } + @Test @Disabled("Not yet implemented") public void testSyncWithLargeRecordBatch() throws Exception { diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl new file mode 100644 index 000000000000..6ea7612c5abc --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie"} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl new file mode 100644 index 000000000000..a9bf479e4e3e --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl new file mode 100644 index 000000000000..e8262c202587 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl @@ -0,0 +1,4 @@ +{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}} +{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}} +{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}} +{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl new file mode 100644 index 000000000000..7fa0d8339a64 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} +// Charlie wasn't reemitted with updated_at, so it still has a null cursor +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "name": "Charlie"} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl new file mode 100644 index 000000000000..a6bd1aee6e2a --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl @@ -0,0 +1,4 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}} +// Charlie wasn't reemitted in sync2. This record still has an old_cursor value. +{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 5da3a4a4c42e..8723ee664832 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.5.7 +LABEL io.airbyte.version=1.5.8 LABEL io.airbyte.name=airbyte/destination-bigquery ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh" diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index db9015c2a05f..916c43a78686 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 1.5.7 + dockerImageTag: 1.5.8 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 8ce8e4108e35..0940075d21a6 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -503,7 +503,7 @@ String dedupFinalTable(final StreamId id, `_airbyte_raw_id` IN ( SELECT `_airbyte_raw_id` FROM ( SELECT `_airbyte_raw_id`, row_number() OVER ( - PARTITION BY ${pk_list} ORDER BY ${cursor_name} DESC, `_airbyte_extracted_at` DESC + PARTITION BY ${pk_list} ORDER BY ${cursor_name} DESC NULLS LAST, `_airbyte_extracted_at` DESC ) as row_number FROM ${final_table_id} ) WHERE row_number != 1 diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 43d365f7d8ec..3c81bfeb8f86 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -304,12 +304,17 @@ public void testDedupFinalTable() throws InterruptedException { INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}', 'd7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z'), (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', '80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'); + (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'), + (JSON'{"id": 3, "string": "Charlie", "integer": 123}', '22af9e56-7ebb-4f5f-ae6b-6ba53360e41e', '2023-01-01T00:00:00Z'), + (JSON'{"id": 3, "updated_at": "2023-01-01T04:00:00Z", "string": "Charlie", "integer": 456}', '0f2375ac-94c1-4be4-99d8-06db40a8ce3e', '2023-01-01T00:00:00Z'); INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `updated_at`, `string`, `struct`, `integer`) values ('d7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T01:00:00Z', 'Alice', JSON'{"city": "San Francisco", "state": "CA"}', 42), ('80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T02:00:00Z', 'Alice', JSON'{"city": "San Diego", "state": "CA"}', 84), - ('ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z', JSON'{"errors": ["blah blah integer"]}', 2, '2023-01-01T03:00:00Z', 'Bob', NULL, NULL); + ('ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z', JSON'{"errors": ["blah blah integer"]}', 2, '2023-01-01T03:00:00Z', 'Bob', NULL, NULL), + -- cursor=NULL should be discarded in favor of cursor= + ('22af9e56-7ebb-4f5f-ae6b-6ba53360e41e', '2023-01-01T00:00:00Z', JSON'{"errors": []}', 3, NULL, 'Charlie', NULL, 123), + ('0f2375ac-94c1-4be4-99d8-06db40a8ce3e', '2023-01-01T00:00:00Z', JSON'{"errors": []}', 3, '2023-01-01T04:00:00Z', 'Charlie', NULL, 456); """)) .build()); @@ -340,6 +345,17 @@ public void testDedupFinalTable() throws InterruptedException { "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors":["blah blah integer"]} } + """), + Jsons.deserialize( + """ + { + "id": 3, + "updated_at": "2023-01-01T04:00:00Z", + "string": "Charlie", + "integer": 456, + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_meta": {"errors":[]} + } """)), toJsonRecords(result)); } diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 5a1413148f3a..e144d8c30151 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------| +| 1.5.8 | 2023-07-25 | [\#28721](https://github.com/airbytehq/airbyte/pull/28721) | Destinations v2: Handle cursor change across syncs | | 1.5.7 | 2023-07-24 | [\#28625](https://github.com/airbytehq/airbyte/pull/28625) | Destinations v2: Limit Clustering Columns to 4 | | 1.5.6 | 2023-07-21 | [\#28580](https://github.com/airbytehq/airbyte/pull/28580) | Destinations v2: Create dataset in user-specified location | | 1.5.5 | 2023-07-20 | [\#28490](https://github.com/airbytehq/airbyte/pull/28490) | Destinations v2: Fix schema change detection in OVERWRITE mode when existing table is empty; other code refactoring |