🐛 Destinations v2: handle streams with no columns #29381

Merged · 6 commits · Aug 17, 2023
@@ -718,20 +718,57 @@ public void weirdColumnNames() throws Exception {
dumpFinalTableRecords(streamId, ""));
}

/**
* A stream with no columns is weird, but we shouldn't treat it specially in any way. It should
* create a final table as usual, and populate it with the relevant metadata columns.
*/
@Test
public void noColumns() throws Exception {
createRawTable(streamId);
insertRawTableRecords(
streamId,
List.of(Jsons.deserialize(
"""
{
"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_data": {}
}
""")));
final StreamConfig stream = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
emptyList(),
Optional.empty(),
new LinkedHashMap<>());

final String createTable = generator.createTable(stream, "");
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
destinationHandler.execute(updateTable);

verifyRecords(
"sqlgenerator/nocolumns_expectedrecords_raw.jsonl",
dumpRawTableRecords(streamId),
"sqlgenerator/nocolumns_expectedrecords_final.jsonl",
dumpFinalTableRecords(streamId, ""));
}

@Test
public void testV1V2migration() throws Exception {
// This is maybe a little hacky, but it avoids having to refactor this entire class and subclasses
// for something that is going away
StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
final StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
createV1RawTable(v1RawTableStreamId);
insertV1RawTableRecords(v1RawTableStreamId, singletonList(Jsons.jsonNode(Map.of(
"_airbyte_ab_id", "v1v2",
"_airbyte_emitted_at", "2023-01-01T00:00:00Z",
"_airbyte_data", "{\"hello\": \"world\"}"))));
final String migration = generator.migrateFromV1toV2(streamId, v1RawTableStreamId.rawNamespace(), v1RawTableStreamId.rawName());
destinationHandler.execute(migration);
List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
final List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
final List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
assertAll(
() -> assertEquals(1, v1RawRecords.size()),
() -> assertEquals(1, v2RawRecords.size()),
@@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.7.6
LABEL io.airbyte.version=1.7.7
LABEL io.airbyte.name=airbyte/destination-bigquery

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 1.7.6
dockerImageTag: 1.7.7
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
@@ -56,7 +56,7 @@ public class BigQuerySqlGenerator implements SqlGenerator<TableDefinition> {
* @param datasetLocation This is technically redundant with {@link BigQueryDestinationHandler} setting the query
* execution location, but let's be explicit since this is typically a compliance requirement.
*/
public BigQuerySqlGenerator(String datasetLocation) {
public BigQuerySqlGenerator(final String datasetLocation) {
this.datasetLocation = datasetLocation;
}

@@ -343,7 +343,7 @@ private String clearLoadedAt(final StreamId streamId) {
public String updateTable(final StreamConfig stream, final String finalSuffix) {
return updateTable(stream, finalSuffix, true);
}
private String updateTable(final StreamConfig stream, final String finalSuffix, boolean verifyPrimaryKeys) {
private String updateTable(final StreamConfig stream, final String finalSuffix, final boolean verifyPrimaryKeys) {
String pkVarDeclaration = "";
String validatePrimaryKeys = "";
if (verifyPrimaryKeys && stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
@@ -424,20 +424,26 @@ String insertNewRecords(final StreamConfig stream, final String finalSuffix, fin
final String columnCasts = streamColumns.entrySet().stream().map(
col -> extractAndCast(col.getKey(), col.getValue()) + " as " + col.getKey().name(QUOTE) + ",")
.collect(joining("\n"));
final String columnErrors = streamColumns.entrySet().stream().map(
col -> new StringSubstitutor(Map.of(
"raw_col_name", col.getKey().originalName(),
"col_type", toDialectType(col.getValue()).name(),
"json_extract", extractAndCast(col.getKey(), col.getValue()))).replace(
"""
CASE
WHEN (JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"') IS NOT NULL)
AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"')) != 'null')
AND (${json_extract} IS NULL)
THEN ["Problem with `${raw_col_name}`"]
ELSE []
END"""))
.collect(joining(",\n"));
final String columnErrors;
if (streamColumns.isEmpty()) {
// ARRAY_CONCAT doesn't like having an empty argument list, so handle that case separately
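// With zero columns we therefore emit a literal empty array instead of a zero-argument ARRAY_CONCAT call.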
columnErrors = "[]";
} else {
columnErrors = "ARRAY_CONCAT(" + streamColumns.entrySet().stream().map(
col -> new StringSubstitutor(Map.of(
"raw_col_name", col.getKey().originalName(),
"col_type", toDialectType(col.getValue()).name(),
"json_extract", extractAndCast(col.getKey(), col.getValue()))).replace(
"""
CASE
WHEN (JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"') IS NOT NULL)
AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"')) != 'null')
AND (${json_extract} IS NULL)
THEN ["Problem with `${raw_col_name}`"]
ELSE []
END"""))
.collect(joining(",\n")) + ")";
}
final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));

String cdcConditionalOrIncludeStatement = "";
@@ -468,9 +474,7 @@ AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
WITH intermediate_data AS (
SELECT
${column_casts}
array_concat(
${column_errors}
) as _airbyte_cast_errors,
${column_errors} as _airbyte_cast_errors,
_airbyte_raw_id,
_airbyte_extracted_at
FROM ${raw_table_id}
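For a stream with no columns, ${column_casts} joins to an empty string and ${column_errors} is the literal [], so the interpolated query starts out roughly like this (a sketch only; the raw table name is a placeholder and the rest of the template is elided):

WITH intermediate_data AS (
  SELECT
    [] as _airbyte_cast_errors,
    _airbyte_raw_id,
    _airbyte_extracted_at
  FROM `dataset`.`users_raw`
  ...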
@@ -598,7 +602,7 @@ private String wrapAndQuote(final String namespace, final String tableName) {
}

@Override
public String migrateFromV1toV2(StreamId streamId, String namespace, String tableName) {
public String migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
return new StringSubstitutor(Map.of(
"v2_raw_table", streamId.rawTableId(QUOTE),
"v1_raw_table", wrapAndQuote(namespace, tableName)
@@ -0,0 +1 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}}
@@ -0,0 +1 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {}}
@@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true


LABEL io.airbyte.version=1.3.1
LABEL io.airbyte.version=1.3.2
LABEL io.airbyte.name=airbyte/destination-snowflake

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 1.3.1
dockerImageTag: 1.3.2
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
@@ -79,8 +79,8 @@ public String toDialectType(final AirbyteProtocolType airbyteProtocolType) {
@Override
public String createTable(final StreamConfig stream, final String suffix) {
final String columnDeclarations = stream.columns().entrySet().stream()
.map(column -> column.getKey().name(QUOTE) + " " + toDialectType(column.getValue()))
.collect(joining(",\n"));
.map(column -> "," + column.getKey().name(QUOTE) + " " + toDialectType(column.getValue()))
.collect(joining("\n"));
// TODO indexes and stuff
return new StringSubstitutor(Map.of(
"final_namespace", stream.id().finalNamespace(QUOTE),
@@ -92,7 +92,7 @@ public String createTable(final StreamConfig stream, final String suffix) {
CREATE TABLE ${final_table_id} (
"_airbyte_raw_id" TEXT NOT NULL,
"_airbyte_extracted_at" TIMESTAMP_TZ NOT NULL,
"_airbyte_meta" VARIANT NOT NULL,
"_airbyte_meta" VARIANT NOT NULL
${column_declarations}
);
""");
@@ -30,7 +30,6 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

@Disabled
public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<SnowflakeTableDefinition> {

private static String databaseName;
@@ -39,7 +38,7 @@ public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegr

@BeforeAll
public static void setupSnowflake() {
JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_internal_staging_config.json")));
final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_internal_staging_config.json")));
databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS);
database = SnowflakeDatabase.getDatabase(dataSource);
@@ -61,12 +60,12 @@ protected SnowflakeDestinationHandler getDestinationHandler() {
}

@Override
protected void createNamespace(String namespace) throws SQLException {
protected void createNamespace(final String namespace) throws SQLException {
database.execute("CREATE SCHEMA \"" + namespace + '"');
}

@Override
protected void createRawTable(StreamId streamId) throws Exception {
protected void createRawTable(final StreamId streamId) throws Exception {
database.execute(new StringSubstitutor(Map.of(
"raw_table_id", streamId.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
"""
@@ -80,8 +79,8 @@ protected void createRawTable(StreamId streamId) throws Exception {
}

@Override
protected void createFinalTable(boolean includeCdcDeletedAt, StreamId streamId, String suffix) throws Exception {
String cdcDeletedAt = includeCdcDeletedAt ? "\"_ab_cdc_deleted_at\" TIMESTAMP_TZ," : "";
protected void createFinalTable(final boolean includeCdcDeletedAt, final StreamId streamId, final String suffix) throws Exception {
final String cdcDeletedAt = includeCdcDeletedAt ? "\"_ab_cdc_deleted_at\" TIMESTAMP_TZ," : "";
database.execute(new StringSubstitutor(Map.of(
"final_table_id", streamId.finalTableId(SnowflakeSqlGenerator.QUOTE, suffix),
"cdc_deleted_at", cdcDeletedAt
@@ -112,12 +111,12 @@ protected void createFinalTable(boolean includeCdcDeletedAt, StreamId streamId,
}

@Override
protected List<JsonNode> dumpRawTableRecords(StreamId streamId) throws Exception {
protected List<JsonNode> dumpRawTableRecords(final StreamId streamId) throws Exception {
return SnowflakeTestUtils.dumpRawTable(database, streamId.rawTableId(SnowflakeSqlGenerator.QUOTE));
}

@Override
protected List<JsonNode> dumpFinalTableRecords(StreamId streamId, String suffix) throws Exception {
protected List<JsonNode> dumpFinalTableRecords(final StreamId streamId, final String suffix) throws Exception {
return SnowflakeTestUtils.dumpFinalTable(
database,
databaseName,
@@ -126,24 +125,24 @@ protected List<JsonNode> dumpFinalTableRecords(StreamId streamId, String suffix)
}

@Override
protected void teardownNamespace(String namespace) throws SQLException {
protected void teardownNamespace(final String namespace) throws SQLException {
database.execute("DROP SCHEMA IF EXISTS \"" + namespace + '"');
}

@Override
protected void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId streamId, String suffix, List<JsonNode> records) throws Exception {
List<String> columnNames = includeCdcDeletedAt ? FINAL_TABLE_COLUMN_NAMES_CDC : FINAL_TABLE_COLUMN_NAMES;
String cdcDeletedAtName = includeCdcDeletedAt ? ",\"_ab_cdc_deleted_at\"" : "";
String cdcDeletedAtExtract = includeCdcDeletedAt ? ",column19" : "";
String recordsText = records.stream()
protected void insertFinalTableRecords(final boolean includeCdcDeletedAt, final StreamId streamId, final String suffix, final List<JsonNode> records) throws Exception {
final List<String> columnNames = includeCdcDeletedAt ? FINAL_TABLE_COLUMN_NAMES_CDC : FINAL_TABLE_COLUMN_NAMES;
final String cdcDeletedAtName = includeCdcDeletedAt ? ",\"_ab_cdc_deleted_at\"" : "";
final String cdcDeletedAtExtract = includeCdcDeletedAt ? ",column19" : "";
final String recordsText = records.stream()
// For each record, convert it to a string like "(rawId, extractedAt, loadedAt, data)"
.map(record -> columnNames.stream()
.map(record::get)
.map(r -> {
if (r == null) {
return "NULL";
}
String stringContents;
final String stringContents;
if (r.isTextual()) {
stringContents = r.asText();
} else {
@@ -214,16 +213,16 @@ protected void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId str
}

@Override
protected void insertRawTableRecords(StreamId streamId, List<JsonNode> records) throws Exception {
String recordsText = records.stream()
protected void insertRawTableRecords(final StreamId streamId, final List<JsonNode> records) throws Exception {
final String recordsText = records.stream()
// For each record, convert it to a string like "(rawId, extractedAt, loadedAt, data)"
.map(record -> JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES.stream()
.map(record::get)
.map(r -> {
if (r == null) {
return "NULL";
}
String stringContents;
final String stringContents;
if (r.isTextual()) {
stringContents = r.asText();
} else {
@@ -266,13 +265,13 @@ protected void insertRawTableRecords(StreamId streamId, List<JsonNode> records)
@Override
@Test
public void testCreateTableIncremental() throws Exception {
String sql = generator.createTable(incrementalDedupStream, "");
final String sql = generator.createTable(incrementalDedupStream, "");
destinationHandler.execute(sql);

Optional<String> tableKind = database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "users_final", namespace))
final Optional<String> tableKind = database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "users_final", namespace))
.stream().map(record -> record.get("kind").asText())
.findFirst();
Map<String, String> columns = database.queryJsons(
final Map<String, String> columns = database.queryJsons(
"""
SELECT column_name, data_type, numeric_precision, numeric_scale
FROM information_schema.columns
@@ -288,7 +287,7 @@ public void testCreateTableIncremental() throws Exception {
.collect(toMap(
record -> record.get("COLUMN_NAME").asText(),
record -> {
String type = record.get("DATA_TYPE").asText();
final String type = record.get("DATA_TYPE").asText();
if (type.equals("NUMBER")) {
return String.format("NUMBER(%s, %s)", record.get("NUMERIC_PRECISION").asText(),
record.get("NUMERIC_SCALE").asText()
@@ -335,8 +334,10 @@ protected void insertV1RawTableRecords(final StreamId streamId, final List<JsonN

}

// TODO delete this after implementing https://github.com/airbytehq/airbyte/issues/28691
@Disabled
@Override
public void testV1V2migration() throws Exception {
super.testV1V2migration();
@Test
public void testV1V2migration() {
}
}
@@ -0,0 +1 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}}
@@ -0,0 +1 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_data": {}}