Skip to content

Commit

Permalink
✨Destination Bigquery 1s1t: handle cursor change (#28721)
Browse files Browse the repository at this point in the history
* handle new cursor column

* sync2 is actualy weird, apparently

* logistics

* Automated Commit - Format and Process Resources Changes

---------

Co-authored-by: edgao <edgao@users.noreply.github.com>
  • Loading branch information
edgao and edgao authored Jul 27, 2023
1 parent d9f2444 commit 9f6963c
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public abstract class BaseTypingDedupingTest {
private static final RecordDiffer DIFFER = new RecordDiffer(
Pair.of("id1", AirbyteProtocolType.INTEGER),
Pair.of("id2", AirbyteProtocolType.INTEGER),
Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));
Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE),
Pair.of("old_cursor", AirbyteProtocolType.INTEGER));

private String randomSuffix;
private JsonNode config;
Expand Down Expand Up @@ -488,6 +489,55 @@ public void testSyncNotFailsWithNewFields() throws Exception {
// evolutions)
}

/**
* Change the cursor column in the second sync to a column that doesn't exist in the first sync.
* Verify that we overwrite everything correctly.
* <p>
* This essentially verifies that the destination connector correctly recognizes NULL cursors as
* older than non-NULL cursors.
*/
@Test
public void incrementalDedupChangeCursor() throws Exception {
JsonNode mangledSchema = SCHEMA.deepCopy();
((ObjectNode) mangledSchema.get("properties")).remove("updated_at");
((ObjectNode) mangledSchema.get("properties")).set(
"old_cursor",
Jsons.deserialize(
"""
{"type": "integer"}
"""));
ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(List.of("old_cursor"))
.withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.withPrimaryKey(List.of(List.of("id1"), List.of("id2")))
.withStream(new AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(mangledSchema));
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream));

// First sync
final List<AirbyteMessage> messages1 = readMessages("sync1_cursorchange_messages.jsonl");

runSync(catalog, messages1);

final List<JsonNode> expectedRawRecords1 = readRecords("sync1_cursorchange_expectedrecords_dedup_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("sync1_cursorchange_expectedrecords_dedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);

// Second sync
final List<AirbyteMessage> messages2 = readMessages("sync2_messages.jsonl");
configuredStream.getStream().setJsonSchema(SCHEMA);
configuredStream.setCursorField(List.of("updated_at"));

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}

@Test
@Disabled("Not yet implemented")
public void testSyncWithLargeRecordBatch() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}}
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}}
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}}
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
// Charlie wasn't reemitted with updated_at, so it still has a null cursor
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "name": "Charlie"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}
// Charlie wasn't reemitted in sync2. This record still has an old_cursor value.
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.5.7
LABEL io.airbyte.version=1.5.8
LABEL io.airbyte.name=airbyte/destination-bigquery

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 1.5.7
dockerImageTag: 1.5.8
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ String dedupFinalTable(final StreamId id,
`_airbyte_raw_id` IN (
SELECT `_airbyte_raw_id` FROM (
SELECT `_airbyte_raw_id`, row_number() OVER (
PARTITION BY ${pk_list} ORDER BY ${cursor_name} DESC, `_airbyte_extracted_at` DESC
PARTITION BY ${pk_list} ORDER BY ${cursor_name} DESC NULLS LAST, `_airbyte_extracted_at` DESC
) as row_number FROM ${final_table_id}
)
WHERE row_number != 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,17 @@ public void testDedupFinalTable() throws InterruptedException {
INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES
(JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}', 'd7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z'),
(JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', '80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'),
(JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z');
(JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'),
(JSON'{"id": 3, "string": "Charlie", "integer": 123}', '22af9e56-7ebb-4f5f-ae6b-6ba53360e41e', '2023-01-01T00:00:00Z'),
(JSON'{"id": 3, "updated_at": "2023-01-01T04:00:00Z", "string": "Charlie", "integer": 456}', '0f2375ac-94c1-4be4-99d8-06db40a8ce3e', '2023-01-01T00:00:00Z');
INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `updated_at`, `string`, `struct`, `integer`) values
('d7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T01:00:00Z', 'Alice', JSON'{"city": "San Francisco", "state": "CA"}', 42),
('80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T02:00:00Z', 'Alice', JSON'{"city": "San Diego", "state": "CA"}', 84),
('ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z', JSON'{"errors": ["blah blah integer"]}', 2, '2023-01-01T03:00:00Z', 'Bob', NULL, NULL);
('ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z', JSON'{"errors": ["blah blah integer"]}', 2, '2023-01-01T03:00:00Z', 'Bob', NULL, NULL),
-- cursor=NULL should be discarded in favor of cursor=<anything not null>
('22af9e56-7ebb-4f5f-ae6b-6ba53360e41e', '2023-01-01T00:00:00Z', JSON'{"errors": []}', 3, NULL, 'Charlie', NULL, 123),
('0f2375ac-94c1-4be4-99d8-06db40a8ce3e', '2023-01-01T00:00:00Z', JSON'{"errors": []}', 3, '2023-01-01T04:00:00Z', 'Charlie', NULL, 456);
"""))
.build());

Expand Down Expand Up @@ -340,6 +345,17 @@ public void testDedupFinalTable() throws InterruptedException {
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_meta": {"errors":["blah blah integer"]}
}
"""),
Jsons.deserialize(
"""
{
"id": 3,
"updated_at": "2023-01-01T04:00:00Z",
"string": "Charlie",
"integer": 456,
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_meta": {"errors":[]}
}
""")),
toJsonRecords(result));
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.5.8 | 2023-07-25 | [\#28721](https://github.com/airbytehq/airbyte/pull/28721) | Destinations v2: Handle cursor change across syncs |
| 1.5.7 | 2023-07-24 | [\#28625](https://github.com/airbytehq/airbyte/pull/28625) | Destinations v2: Limit Clustering Columns to 4 |
| 1.5.6 | 2023-07-21 | [\#28580](https://github.com/airbytehq/airbyte/pull/28580) | Destinations v2: Create dataset in user-specified location |
| 1.5.5 | 2023-07-20 | [\#28490](https://github.com/airbytehq/airbyte/pull/28490) | Destinations v2: Fix schema change detection in OVERWRITE mode when existing table is empty; other code refactoring |
Expand Down

0 comments on commit 9f6963c

Please sign in to comment.