Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination BigQuery: Accept Dataset ID field prefixed by Project ID #8383

Merged
merged 17 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496",
"name": "BigQuery (denormalized typed struct)",
"dockerRepository": "airbyte/destination-bigquery-denormalized",
"dockerImageTag": "0.2.3",
"dockerImageTag": "0.2.4",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery",
"icon": "bigquery.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.6.3",
"dockerImageTag": "0.6.4",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery",
"icon": "bigquery.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.6.3
dockerImageTag: 0.6.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
- name: Cassandra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-bigquery:0.6.3"
- dockerImage: "airbyte/destination-bigquery:0.6.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down Expand Up @@ -378,7 +378,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.3"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery-denormalized

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.3
LABEL io.airbyte.version=0.6.4
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public BigQueryDestination() {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
final String datasetId = BigQueryUtils.getDatasetId(config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BigQueryUtils.getDatasetId(config) used in 2 cases

  1. BigQueryDestination.check()
  2. BigQueryDestination.getConsumer() invoke inside BigQueryUploaderFactory.getUploader then BigQueryUtils.getSchema then BigQueryUtils.getDatasetId(config)

Could you pls consider to add integration test to BigQueryDestinationTest to check that:

  1. we can create connection with provided dataset
  2. we can get consumer and write some data

final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuery bigquery = getBigQuery(config);
final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery;

import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage;
import static java.util.Objects.isNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -162,6 +163,25 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) {
return gcsJsonNode;
}

public static String getDatasetId(final JsonNode config) {
String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();

int colonIndex = datasetId.indexOf(":");
if (colonIndex != -1) {
String projectIdPart = datasetId.substring(0, colonIndex);
String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
if (!(projectId.equals(projectIdPart))) {
throw new IllegalArgumentException(String.format(
"Project ID included in Dataset ID must match Project ID field's value: Project ID is `%s`, but you specified `%s` in Dataset ID",
projectId,
projectIdPart));
}
}
// if colonIndex is -1, then this returns the entire string
// otherwise it returns everything after the colon
return datasetId.substring(colonIndex + 1);
}

public static String getDatasetLocation(final JsonNode config) {
if (config.has(BigQueryConsts.CONFIG_DATASET_LOCATION)) {
return config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText();
Expand Down Expand Up @@ -214,7 +234,7 @@ public static void transformJsonDateTimeToBigDataFormat(List<String> dateTimeFie
}

public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) {
final String defaultSchema = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
final String defaultSchema = getDatasetId(config);
final String srcNamespace = stream.getStream().getNamespace();
if (srcNamespace == null) {
return defaultSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,18 @@
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -197,16 +202,20 @@ void testSpec() throws Exception {
assertEquals(expected, actual);
}

@Test
void testCheckSuccess() {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testCheckSuccess(DatasetIdResetter resetDatasetId) {
resetDatasetId.accept(config);
final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
assertEquals(expected, actual);
}

@Test
void testCheckFailure() {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testCheckFailure(DatasetIdResetter resetDatasetId) {
((ObjectNode) config).put(BigQueryConsts.CONFIG_PROJECT_ID, "fake");
resetDatasetId.accept(config);
final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
final String actualMessage = actual.getMessage();
LOGGER.info("Checking expected failure message:" + actualMessage);
Expand All @@ -215,8 +224,10 @@ void testCheckFailure() {
assertEquals(expected, actual.withMessage(""));
}

@Test
void testWriteSuccess() throws Exception {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testWriteSuccess(DatasetIdResetter resetDatasetId) throws Exception {
resetDatasetId.accept(config);
final BigQueryDestination destination = new BigQueryDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

Expand Down Expand Up @@ -244,8 +255,10 @@ void testWriteSuccess() throws Exception {
.collect(Collectors.toList()));
}

@Test
void testWriteFailure() throws Exception {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testWriteFailure(DatasetIdResetter resetDatasetId) throws Exception {
resetDatasetId.accept(config);
// hack to force an exception to be thrown from within the consumer.
final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1);
doThrow(new RuntimeException()).when(spiedMessage).getRecord();
Expand Down Expand Up @@ -305,8 +318,10 @@ private List<JsonNode> retrieveRecords(final String tableName) throws Exception
.collect(Collectors.toList());
}

@Test
void testWritePartitionOverUnpartitioned() throws Exception {
@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testWritePartitionOverUnpartitioned(DatasetIdResetter resetDatasetId) throws Exception {
resetDatasetId.accept(config);
final String raw_table_name = String.format("_airbyte_raw_%s", USERS_STREAM_NAME);
createUnpartitionedTable(bigquery, dataset, raw_table_name);
assertFalse(isTablePartitioned(bigquery, dataset, raw_table_name));
Expand Down Expand Up @@ -369,4 +384,30 @@ private boolean isTablePartitioned(final BigQuery bigquery, final Dataset datase
return false;
}

private static class DatasetIdResetter {
private Consumer<JsonNode> consumer;

DatasetIdResetter(Consumer<JsonNode> consumer) {
this.consumer = consumer;
}

public void accept(JsonNode config) {
consumer.accept(config);
}
}

private static Stream<Arguments> datasetIdResetterProvider() {
// parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id`
return Stream.of(
Arguments.arguments(new DatasetIdResetter(config -> {})),
Arguments.arguments(new DatasetIdResetter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a comment to explain what this is testing

config -> {
String projectId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
String datasetId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_DATASET_ID).asText();
((ObjectNode) config).put(BigQueryConsts.CONFIG_DATASET_ID,
String.format("%s:%s", projectId, datasetId));
}
))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class BigQueryUtilsTest {

private ImmutableMap.Builder<Object, Object> configMapBuilder;

@BeforeEach
public void init() {
configMapBuilder = ImmutableMap.builder()
.put(BigQueryConsts.CONFIG_CREDS, "test_secret")
.put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US");
}

@ParameterizedTest
@MethodSource("validBigQueryIdProvider")
public void testGetDatasetIdSuccess(String projectId, String datasetId, String expected) throws Exception {
JsonNode config = Jsons.jsonNode(configMapBuilder
.put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
.build());

String actual = BigQueryUtils.getDatasetId(config);

assertEquals(expected, actual);
}

@ParameterizedTest
@MethodSource("invalidBigQueryIdProvider")
public void testGetDatasetIdFail(String projectId, String datasetId, String expected) throws Exception {
JsonNode config = Jsons.jsonNode(configMapBuilder
.put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
.build());

Exception exception = assertThrows(IllegalArgumentException.class, () -> BigQueryUtils.getDatasetId(config));

assertEquals(expected, exception.getMessage());
}

private static Stream<Arguments> validBigQueryIdProvider() {
return Stream.of(
Arguments.arguments("my-project", "my_dataset", "my_dataset"),
Arguments.arguments("my-project", "my-project:my_dataset", "my_dataset"));
}

private static Stream<Arguments> invalidBigQueryIdProvider() {
return Stream.of(
Arguments.arguments("my-project", ":my_dataset",
"Project ID included in Dataset ID must match Project ID field's value: Project ID is `my-project`, but you specified `` in Dataset ID"),
Arguments.arguments("my-project", "your-project:my_dataset",
"Project ID included in Dataset ID must match Project ID field's value: Project ID is `my-project`, but you specified `your-project` in Dataset ID"));
}
}
2 changes: 2 additions & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.6.4 | 2022-01-17 | [\#8383](https://github.com/airbytehq/airbyte/issues/8383) | Support dataset-id prefixed by project-id |
| 0.6.3 | 2022-01-12 | [\#9415](https://github.com/airbytehq/airbyte/pull/9415) | BigQuery Destination : Fix GCS processing of Facebook data |
| 0.6.2 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user |
| 0.6.1 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
Expand All @@ -172,6 +173,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------| :--- |
| 0.2.4 | 2022-01-17 | [\#8383](https://github.com/airbytehq/airbyte/issues/8383) | BigQuery/BiqQuery denorm Destinations : Support dataset-id prefixed by project-id |
| 0.2.3 | 2022-01-12 | [\#9415](https://github.com/airbytehq/airbyte/pull/9415) | BigQuery Destination : Fix GCS processing of Facebook data |
| 0.2.2 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.2.1 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
Expand Down