
🐛 Destinations v2: handle streams with no columns #29381

Merged Aug 17, 2023 · 6 commits · diff below shows changes from 2 commits
BaseSqlGeneratorIntegrationTest.java (file name inferred):
@@ -718,20 +718,58 @@ public void weirdColumnNames() throws Exception {
dumpFinalTableRecords(streamId, ""));
}

/**
* A stream with no columns is weird, but we shouldn't treat it specially in any way.
* It should create a final table as usual, and populate it with the relevant metadata columns.
*/
@Test
public void noColumns() throws Exception {
createRawTable(streamId);
insertRawTableRecords(
streamId,
List.of(Jsons.deserialize(
"""
{
"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_data": {}
}
"""
)));
final StreamConfig stream = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
emptyList(),
Optional.empty(),
new LinkedHashMap<>());

final String createTable = generator.createTable(stream, "");
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
destinationHandler.execute(updateTable);

verifyRecords(
"sqlgenerator/nocolumns_expectedrecords_raw.jsonl",
dumpRawTableRecords(streamId),
"sqlgenerator/nocolumns_expectedrecords_final.jsonl",
dumpFinalTableRecords(streamId, ""));
}
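For reference, a rough sketch of the final table this test expects for a no-column stream — only the Destinations v2 metadata columns, with dataset and table names purely illustrative:

```sql
-- Illustrative sketch, not taken from this diff: a no-column stream still
-- gets a final table, holding only the standard metadata columns.
CREATE TABLE `my_dataset`.`my_stream` (
  _airbyte_raw_id STRING NOT NULL,
  _airbyte_extracted_at TIMESTAMP NOT NULL,
  _airbyte_meta JSON NOT NULL
);
```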

@Test
public void testV1V2migration() throws Exception {
// This is maybe a little hacky, but it avoids having to refactor this entire class and subclasses
// for something that is going away
-StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
+final StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
createV1RawTable(v1RawTableStreamId);
insertV1RawTableRecords(v1RawTableStreamId, singletonList(Jsons.jsonNode(Map.of(
"_airbyte_ab_id", "v1v2",
"_airbyte_emitted_at", "2023-01-01T00:00:00Z",
"_airbyte_data", "{\"hello\": \"world\"}"))));
final String migration = generator.migrateFromV1toV2(streamId, v1RawTableStreamId.rawNamespace(), v1RawTableStreamId.rawName());
destinationHandler.execute(migration);
-List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
-List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
+final List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
+final List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
assertAll(
() -> assertEquals(1, v1RawRecords.size()),
() -> assertEquals(1, v2RawRecords.size()),
destination-bigquery Dockerfile:
@@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.7.6
+LABEL io.airbyte.version=1.7.7
LABEL io.airbyte.name=airbyte/destination-bigquery

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
destination-bigquery metadata.yaml:
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-dockerImageTag: 1.7.6
+dockerImageTag: 1.7.7
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
BigQuerySqlGenerator.java:
@@ -56,7 +56,7 @@ public class BigQuerySqlGenerator implements SqlGenerator<TableDefinition> {
* @param datasetLocation This is technically redundant with {@link BigQueryDestinationHandler} setting the query
* execution location, but let's be explicit since this is typically a compliance requirement.
*/
-public BigQuerySqlGenerator(String datasetLocation) {
+public BigQuerySqlGenerator(final String datasetLocation) {
this.datasetLocation = datasetLocation;
}

@@ -343,7 +343,7 @@ private String clearLoadedAt(final StreamId streamId) {
public String updateTable(final StreamConfig stream, final String finalSuffix) {
return updateTable(stream, finalSuffix, true);
}
-private String updateTable(final StreamConfig stream, final String finalSuffix, boolean verifyPrimaryKeys) {
+private String updateTable(final StreamConfig stream, final String finalSuffix, final boolean verifyPrimaryKeys) {
String pkVarDeclaration = "";
String validatePrimaryKeys = "";
if (verifyPrimaryKeys && stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
@@ -424,20 +424,26 @@ String insertNewRecords(final StreamConfig stream, final String finalSuffix, fin
final String columnCasts = streamColumns.entrySet().stream().map(
col -> extractAndCast(col.getKey(), col.getValue()) + " as " + col.getKey().name(QUOTE) + ",")
.collect(joining("\n"));
-final String columnErrors = streamColumns.entrySet().stream().map(
-col -> new StringSubstitutor(Map.of(
-"raw_col_name", col.getKey().originalName(),
-"col_type", toDialectType(col.getValue()).name(),
-"json_extract", extractAndCast(col.getKey(), col.getValue()))).replace(
-"""
-CASE
-WHEN (JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"') IS NOT NULL)
-AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"')) != 'null')
-AND (${json_extract} IS NULL)
-THEN ["Problem with `${raw_col_name}`"]
-ELSE []
-END"""))
-.collect(joining(",\n"));
+final String columnErrors;
+if (streamColumns.isEmpty()) {
+// ARRAY_CONCAT doesn't like having an empty argument list, so handle that case separately
+columnErrors = "[]";
+} else {
+columnErrors = "ARRAY_CONCAT(" + streamColumns.entrySet().stream().map(
+col -> new StringSubstitutor(Map.of(
+"raw_col_name", col.getKey().originalName(),
+"col_type", toDialectType(col.getValue()).name(),
+"json_extract", extractAndCast(col.getKey(), col.getValue()))).replace(
+"""
+CASE
+WHEN (JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"') IS NOT NULL)
+AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"')) != 'null')
+AND (${json_extract} IS NULL)
+THEN ["Problem with `${raw_col_name}`"]
+ELSE []
+END"""))
+.collect(joining(",\n")) + ")";
+}
final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));

String cdcConditionalOrIncludeStatement = "";
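To make the per-column error template concrete, here is a hypothetical rendering of one CASE arm for a single column `age` of type INT64; the `${json_extract}` expansion is an assumption, since `extractAndCast` is not shown in this diff:

```sql
-- Hypothetical rendering for one column. The SAFE_CAST expression stands in
-- for whatever extractAndCast() actually generates.
CASE
  WHEN (JSON_QUERY(`_airbyte_data`, '$."age"') IS NOT NULL)
    AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."age"')) != 'null')
    AND (SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$."age"') AS INT64) IS NULL)
    THEN ["Problem with `age`"]
  ELSE []
END
```

With one or more columns these arms are joined with commas and wrapped in ARRAY_CONCAT(...); with zero columns the generator now emits a literal [] instead, since ARRAY_CONCAT with an empty argument list is invalid.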
@@ -468,9 +474,7 @@ AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
WITH intermediate_data AS (
SELECT
${column_casts}
-array_concat(
-${column_errors}
-) as _airbyte_cast_errors,
+${column_errors} as _airbyte_cast_errors,
_airbyte_raw_id,
_airbyte_extracted_at
FROM ${raw_table_id}
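Putting the two changes together, a hedged sketch of how this CTE could render for a no-column stream (raw table name illustrative; the remainder of the generated statement is truncated in this diff):

```sql
-- Illustrative rendering only: no column casts, and a literal empty array
-- instead of an empty ARRAY_CONCAT() call. The real generated statement
-- continues on to write into the final table.
WITH intermediate_data AS (
  SELECT
    [] as _airbyte_cast_errors,
    _airbyte_raw_id,
    _airbyte_extracted_at
  FROM `my_dataset`.`my_stream_raw`
)
SELECT _airbyte_raw_id, _airbyte_extracted_at, _airbyte_cast_errors
FROM intermediate_data
```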
@@ -598,7 +602,7 @@ private String wrapAndQuote(final String namespace, final String tableName) {
}

@Override
-public String migrateFromV1toV2(StreamId streamId, String namespace, String tableName) {
+public String migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
return new StringSubstitutor(Map.of(
"v2_raw_table", streamId.rawTableId(QUOTE),
"v1_raw_table", wrapAndQuote(namespace, tableName)
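The body of the v1-to-v2 migration template is truncated in this diff, so as a loosely hedged sketch only, the generated statement plausibly has a shape like the following, mapping the v1 raw columns seen in the test above onto the v2 layout:

```sql
-- Hypothetical sketch, not the actual template. Column mapping inferred from
-- the v1 (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data) and v2
-- (_airbyte_raw_id, _airbyte_extracted_at, ...) raw schemas; names illustrative.
CREATE OR REPLACE TABLE `my_dataset`.`my_stream_raw` AS
SELECT
  _airbyte_ab_id AS _airbyte_raw_id,
  _airbyte_emitted_at AS _airbyte_extracted_at,
  CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at,
  PARSE_JSON(_airbyte_data) AS _airbyte_data
FROM `my_dataset`.`v1_my_stream_raw`;
```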
sqlgenerator/nocolumns_expectedrecords_final.jsonl (BigQuery, new file):
@@ -0,0 +1 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}}
sqlgenerator/nocolumns_expectedrecords_raw.jsonl (BigQuery, new file):
@@ -0,0 +1 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {}}
destination-snowflake Dockerfile:
@@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true


-LABEL io.airbyte.version=1.3.1
+LABEL io.airbyte.version=1.3.2
LABEL io.airbyte.name=airbyte/destination-snowflake

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
destination-snowflake metadata.yaml:
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-dockerImageTag: 1.3.1
+dockerImageTag: 1.3.2
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
SnowflakeSqlGenerator.java (file name inferred):
@@ -79,8 +79,8 @@ public String toDialectType(final AirbyteProtocolType airbyteProtocolType) {
@Override
public String createTable(final StreamConfig stream, final String suffix) {
final String columnDeclarations = stream.columns().entrySet().stream()
-.map(column -> column.getKey().name(QUOTE) + " " + toDialectType(column.getValue()))
-.collect(joining(",\n"));
+.map(column -> "," + column.getKey().name(QUOTE) + " " + toDialectType(column.getValue()))
+.collect(joining("\n"));
// TODO indexes and stuff
return new StringSubstitutor(Map.of(
"final_namespace", stream.id().finalNamespace(QUOTE),
@@ -92,7 +92,7 @@ public String createTable(final StreamConfig stream, final String suffix) {
CREATE TABLE ${final_table_id} (
"_airbyte_raw_id" TEXT NOT NULL,
"_airbyte_extracted_at" TIMESTAMP_TZ NOT NULL,
-"_airbyte_meta" VARIANT NOT NULL,
+"_airbyte_meta" VARIANT NOT NULL
${column_declarations}
);
""");
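The comma move above is exactly what makes the no-column case work; a sketch of the DDL this template generates before and after the change (schema and table names illustrative):

```sql
-- Before: an empty ${column_declarations} left a dangling comma after
-- "_airbyte_meta", producing invalid DDL:
CREATE TABLE "MY_SCHEMA"."MY_STREAM" (
  "_airbyte_raw_id" TEXT NOT NULL,
  "_airbyte_extracted_at" TIMESTAMP_TZ NOT NULL,
  "_airbyte_meta" VARIANT NOT NULL,
);

-- After: each declaration carries its own leading comma, so zero columns
-- yields a clean statement:
CREATE TABLE "MY_SCHEMA"."MY_STREAM" (
  "_airbyte_raw_id" TEXT NOT NULL,
  "_airbyte_extracted_at" TIMESTAMP_TZ NOT NULL,
  "_airbyte_meta" VARIANT NOT NULL
);
```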
sqlgenerator/nocolumns_expectedrecords_final.jsonl (Snowflake, new file):
@@ -0,0 +1 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}}
sqlgenerator/nocolumns_expectedrecords_raw.jsonl (Snowflake, new file):
@@ -0,0 +1 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {}}
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
@@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.7.7 | 2023-08-11 | [\#29381](https://github.com/airbytehq/airbyte/pull/29381) | Destinations v2: Add support for streams with no columns |
| 1.7.6 | 2023-08-04 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Add v1 -> v2 migration Logic |
| 1.7.5 | 2023-08-04 | [\#29106](https://github.com/airbytehq/airbyte/pull/29106) | Destinations v2: handle unusual CDC deletion edge case |
| 1.7.4 | 2023-08-04 | [\#29089](https://github.com/airbytehq/airbyte/pull/29089) | Destinations v2: improve special character handling in column names |
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
@@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.3.2 | 2023-08-11 | [\#29381](https://github.com/airbytehq/airbyte/pull/29381) | Destinations v2: Add support for streams with no columns |
| 1.3.1 | 2023-08-04 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Update SqlGenerator |
| 1.3.0 | 2023-08-07 | [\#29174](https://github.com/airbytehq/airbyte/pull/29174) | Destinations v2: early access release |
| 1.2.10 | 2023-08-07 | [\#29188](https://github.com/airbytehq/airbyte/pull/29188) | Internal code refactoring |