Destination BigQuery: Accept Dataset ID field prefixed by Project ID #8383

Merged: 17 commits, Jan 18, 2022. (Diff below shows changes from 11 commits.)
BigQueryDestination.java

@@ -56,7 +56,7 @@ public BigQueryDestination() {
  @Override
  public AirbyteConnectionStatus check(final JsonNode config) {
    try {
-      final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
+      final String datasetId = BigQueryUtils.getDatasetId(config);
Contributor:
BigQueryUtils.getDatasetId(config) is now used in two places:

  1. BigQueryDestination.check()
  2. BigQueryDestination.getConsumer(), which invokes BigQueryUploaderFactory.getUploader -> BigQueryUtils.getSchema -> BigQueryUtils.getDatasetId(config)

Could you please consider adding an integration test to BigQueryDestinationTest that checks that:

  1. we can create a connection with the provided dataset
  2. we can get a consumer and write some data
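
A minimal sketch of what such a test might look like (hypothetical; the parameterized tests added below end up covering both points, and config, catalog and MESSAGE_USERS1 are the existing fixtures in BigQueryDestinationTest):

  @Test
  void testCheckAndWriteWithProjectPrefixedDatasetId() throws Exception {
    // Rewrite dataset_id into the "project-id:dataset-id" form under test.
    final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
    final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
    ((ObjectNode) config).put(BigQueryConsts.CONFIG_DATASET_ID, projectId + ":" + datasetId);

    // 1. we can create a connection with the provided dataset
    final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
    assertEquals(new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED), actual);

    // 2. we can get a consumer and write some data through it
    final AirbyteMessageConsumer consumer =
        new BigQueryDestination().getConsumer(config, catalog, Destination::defaultOutputRecordCollector);
    // ... write MESSAGE_USERS1 via the consumer and assert it arrives in the dataset
  }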

      final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
      final BigQuery bigquery = getBigQuery(config);
      final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
BigQueryUtils.java

@@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery;

import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage;
import static java.util.Objects.isNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -37,6 +38,8 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -46,6 +49,7 @@ public class BigQueryUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUtils.class);
private static final String BIG_QUERY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";
private static final Pattern DATASET_ID_PATTERN = Pattern.compile("^(([a-z]([a-z0-9\\-]*[a-z0-9])?):)?([a-zA-Z0-9_]+)$");

public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
final JobId jobId = JobId.of(UUID.randomUUID().toString());
@@ -162,6 +166,27 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) {
return gcsJsonNode;
}

  public static String getDatasetId(final JsonNode config) {
    String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
    Matcher matcher = DATASET_ID_PATTERN.matcher(datasetId);
Contributor:
I think the regex makes the logic a little hard to follow; string manipulation feels clearer:

int colonIndex = datasetId.indexOf(":");
if (colonIndex != -1) {
  String projectId = datasetId.substring(0, colonIndex);
  // check equality against config.get(PROJECT_ID)
  // ...
}
// if colonIndex is -1, then this returns the entire string
// otherwise it returns everything after the colon
return datasetId.substring(colonIndex + 1);

This means we would lose the validation aspect (e.g. if someone enters invalid*characters! we'll accept it). But those errors would be detected when the connector runs its check method, so that feels OK to me

Contributor (author):
I see, I agree with your suggestion. We don't need to re-check here what BigQuery itself already checks.
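
For illustration, the suggested string-manipulation version, filled in (a hypothetical sketch only; the diff below, as of these 11 commits, still uses the regex, and the error message mirrors the one in the surrounding code):

  public static String getDatasetId(final JsonNode config) {
    final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
    final int colonIndex = datasetId.indexOf(":");
    if (colonIndex != -1) {
      final String projectIdPart = datasetId.substring(0, colonIndex);
      final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
      if (!projectId.equals(projectIdPart)) {
        throw new IllegalArgumentException(String.format(
            "Project ID included in Dataset ID must match Project ID field's value: Project ID is %s, but you specified %s in Dataset ID",
            projectId, projectIdPart));
      }
    }
    // If colonIndex is -1 this returns the entire string; otherwise everything after the colon.
    return datasetId.substring(colonIndex + 1);
  }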


    if (matcher.matches()) {
      if (!isNull(matcher.group(1))) {
        final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
        if (!(projectId.equals(matcher.group(2)))) {
          throw new IllegalArgumentException(String.format(
              "Project ID included in Dataset ID must match Project ID field's value: Project ID is %s, but you specified %s in Dataset ID",
              projectId,
              matcher.group(2)));
        }
      }
      return matcher.group(4);
    }
    throw new IllegalArgumentException(String.format(
        "BigQuery Dataset ID format must match '[project-id:]dataset_id': %s",
        datasetId));
  }
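
As a quick illustration of how DATASET_ID_PATTERN's capture groups behave (not part of the diff; the inputs come from the unit tests added below):

  Matcher matcher = DATASET_ID_PATTERN.matcher("my-project:my_dataset");
  if (matcher.matches()) {
    matcher.group(1); // "my-project:" - the optional prefix; null when no prefix is given
    matcher.group(2); // "my-project"  - the project id compared against the config value
    matcher.group(4); // "my_dataset"  - the dataset id that is returned
  }
  // "my_dataset" alone also matches, with group(1) and group(2) null and group(4) = "my_dataset".
  // ":my_dataset", "my-project:" and "my-project-:my_dataset" do not match and hit the format error above.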

  public static String getDatasetLocation(final JsonNode config) {
    if (config.has(BigQueryConsts.CONFIG_DATASET_LOCATION)) {
      return config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText();
@@ -214,7 +239,7 @@ public static void transformJsonDateTimeToBigDataFormat(List<String> dateTimeFields,
}

  public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) {
-    final String defaultSchema = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
+    final String defaultSchema = getDatasetId(config);
    final String srcNamespace = stream.getStream().getNamespace();
    if (srcNamespace == null) {
      return defaultSchema;
BigQueryDestinationTest.java

@@ -55,13 +55,18 @@
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -197,16 +202,20 @@ void testSpec() throws Exception {
assertEquals(expected, actual);
}

-  @Test
-  void testCheckSuccess() {
+  @ParameterizedTest
+  @MethodSource("datasetIdResetterProvider")
+  void testCheckSuccess(DatasetIdResetter resetDatasetId) {
+    resetDatasetId.accept(config);
    final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
    final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    assertEquals(expected, actual);
  }

-  @Test
-  void testCheckFailure() {
+  @ParameterizedTest
+  @MethodSource("datasetIdResetterProvider")
+  void testCheckFailure(DatasetIdResetter resetDatasetId) {
    ((ObjectNode) config).put(BigQueryConsts.CONFIG_PROJECT_ID, "fake");
+    resetDatasetId.accept(config);
    final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
    final String actualMessage = actual.getMessage();
    LOGGER.info("Checking expected failure message:" + actualMessage);
@@ -215,8 +224,10 @@ void testCheckFailure() {
assertEquals(expected, actual.withMessage(""));
}

-  @Test
-  void testWriteSuccess() throws Exception {
+  @ParameterizedTest
+  @MethodSource("datasetIdResetterProvider")
+  void testWriteSuccess(DatasetIdResetter resetDatasetId) throws Exception {
+    resetDatasetId.accept(config);
    final BigQueryDestination destination = new BigQueryDestination();
    final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

@@ -244,8 +255,10 @@ void testWriteSuccess() throws Exception {
.collect(Collectors.toList()));
}

-  @Test
-  void testWriteFailure() throws Exception {
+  @ParameterizedTest
+  @MethodSource("datasetIdResetterProvider")
+  void testWriteFailure(DatasetIdResetter resetDatasetId) throws Exception {
+    resetDatasetId.accept(config);
    // hack to force an exception to be thrown from within the consumer.
    final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1);
    doThrow(new RuntimeException()).when(spiedMessage).getRecord();
@@ -305,8 +318,10 @@ private List<JsonNode> retrieveRecords(final String tableName) throws Exception
.collect(Collectors.toList());
}

-  @Test
-  void testWritePartitionOverUnpartitioned() throws Exception {
+  @ParameterizedTest
+  @MethodSource("datasetIdResetterProvider")
+  void testWritePartitionOverUnpartitioned(DatasetIdResetter resetDatasetId) throws Exception {
+    resetDatasetId.accept(config);
    final String raw_table_name = String.format("_airbyte_raw_%s", USERS_STREAM_NAME);
    createUnpartitionedTable(bigquery, dataset, raw_table_name);
    assertFalse(isTablePartitioned(bigquery, dataset, raw_table_name));
@@ -369,4 +384,29 @@ private boolean isTablePartitioned(final BigQuery bigquery, final Dataset datase
return false;
}

  private static class DatasetIdResetter {
    private Consumer<JsonNode> consumer;

    DatasetIdResetter(Consumer<JsonNode> consumer) {
      this.consumer = consumer;
    }

    public void accept(JsonNode config) {
      consumer.accept(config);
    }
  }

  private static Stream<Arguments> datasetIdResetterProvider() {
    return Stream.of(
        Arguments.arguments(new DatasetIdResetter(config -> {})),
        Arguments.arguments(new DatasetIdResetter(
Contributor:

let's add a comment to explain what this is testing

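            // Rewrites dataset_id from "dataset" to "project:dataset", exercising the project-prefixed form this PR accepts.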
            config -> {
              String projectId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
              String datasetId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_DATASET_ID).asText();
              ((ObjectNode) config).put(BigQueryConsts.CONFIG_DATASET_ID,
                  String.format("%s:%s", projectId, datasetId));
            }))
    );
  }
}
BigQueryUtilsTest.java (new file)

@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class BigQueryUtilsTest {

  private ImmutableMap.Builder<Object, Object> configMapBuilder;

  @BeforeEach
  public void init() {
    configMapBuilder = ImmutableMap.builder()
        .put(BigQueryConsts.CONFIG_CREDS, "test_secret")
        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US");
  }

  @ParameterizedTest
  @MethodSource("validBigQueryIdProvider")
  public void testGetDatasetIdSuccess(String projectId, String datasetId, String expected) throws Exception {
    JsonNode config = Jsons.jsonNode(configMapBuilder
        .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
        .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
        .build());

    String actual = BigQueryUtils.getDatasetId(config);

    assertEquals(expected, actual);
  }

  @ParameterizedTest
  @MethodSource("invalidBigQueryIdProvider")
  public void testGetDatasetIdFail(String projectId, String datasetId, String expected) throws Exception {
    JsonNode config = Jsons.jsonNode(configMapBuilder
        .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
        .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
        .build());

    Exception exception = assertThrows(IllegalArgumentException.class, () -> BigQueryUtils.getDatasetId(config));

    assertEquals(expected, exception.getMessage());
  }

  private static Stream<Arguments> validBigQueryIdProvider() {
    return Stream.of(
        Arguments.arguments("my-project", "my_dataset", "my_dataset"),
        Arguments.arguments("my-project", "my-project:my_dataset", "my_dataset"));
  }

  private static Stream<Arguments> invalidBigQueryIdProvider() {
    return Stream.of(
        Arguments.arguments("my-project", ":my_dataset",
            "BigQuery Dataset ID format must match '[project-id:]dataset_id': :my_dataset"),
        Arguments.arguments("my-project", "my-project:my-project:my_dataset",
            "BigQuery Dataset ID format must match '[project-id:]dataset_id': my-project:my-project:my_dataset"),
        Arguments.arguments("my-project", "my-project-:my_dataset",
            "BigQuery Dataset ID format must match '[project-id:]dataset_id': my-project-:my_dataset"),
        Arguments.arguments("my-project", "my-project:",
            "BigQuery Dataset ID format must match '[project-id:]dataset_id': my-project:"),
        Arguments.arguments("my-project", "your-project:my_dataset",
            "Project ID included in Dataset ID must match Project ID field's value: Project ID is my-project, but you specified your-project in Dataset ID"));
  }
}
docs/integrations/destinations/bigquery.md (1 addition, 0 deletions)

@@ -146,6 +146,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.6.2 | 2021-12-22 | [\#8383](https://github.com/airbytehq/airbyte/issues/8383) | Support dataset-id prefixed by project-id |
| 0.6.1 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.6.0 | 2021-12-17 | [\#8788](https://github.com/airbytehq/airbyte/issues/8788) | BigQuery/BiqQuery denorm Destinations : Add possibility to use different types of GCS files |
| 0.5.1 | 2021-12-16 | [\#8816](https://github.com/airbytehq/airbyte/issues/8816) | Update dataset locations |