Skip to content

Commit

Permalink
Destination-BigQuery: fixed table already exists error (#22497)
Browse files Browse the repository at this point in the history
* [22178] Destination-bigquery: fixed table already exists error

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
etsybaev and octavia-squidington-iii authored Feb 9, 2023
1 parent fcd3b03 commit a70d6e8
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 1.2.13
dockerImageTag: 1.2.14
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationConfig:
Expand All @@ -58,7 +58,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 1.2.11
dockerImageTag: 1.2.14
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.2.13"
- dockerImage: "airbyte/destination-bigquery:1.2.14"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
Expand Down Expand Up @@ -831,7 +831,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.11"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.14"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.2.12
LABEL io.airbyte.version=1.2.14
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.2.13
LABEL io.airbyte.version=1.2.14
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void createSchemaIfNotExists(final String datasetId, final String dataset
@Override
public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) {
LOGGER.info("Creating target table {}", tableId);
BigQueryUtils.createPartitionedTable(bigQuery, tableId, tableSchema);
BigQueryUtils.createPartitionedTableIfNotExists(bigQuery, tableId, tableSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static void createSchemaAndTableIfNeeded(final BigQuery bigquery,
getOrCreateDataset(bigquery, schemaName, datasetLocation);
existingSchemas.add(schemaName);
}
BigQueryUtils.createPartitionedTable(bigquery, tmpTableId, schema);
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, tmpTableId, schema);
}

public static Dataset getOrCreateDataset(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
Expand Down Expand Up @@ -202,7 +202,7 @@ public static Table createTable(final BigQuery bigquery, final String datasetNam
* @return Table BigQuery table object to be referenced for deleting, otherwise empty meaning table
* was not successfully created
*/
static void createPartitionedTable(final BigQuery bigquery, final TableId tableId, final Schema schema) {
static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) {
try {
final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
.setField(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
Expand All @@ -220,8 +220,14 @@ static void createPartitionedTable(final BigQuery bigquery, final TableId tableI
.build();
final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

bigquery.create(tableInfo);
LOGGER.info("Partitioned table created successfully: {}", tableId);
final Table table = bigquery.getTable(tableInfo.getTableId());
if (table != null && table.exists()) {
LOGGER.info("Partitioned table ALREADY EXISTS: {}", tableId);
} else {
bigquery.create(tableInfo);
LOGGER.info("Partitioned table created successfully: {}", tableId);
}

} catch (final BigQueryException e) {
LOGGER.error("Partitioned table was not created: " + tableId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -15,13 +16,18 @@
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -314,6 +320,31 @@ void testWriteSuccess(final String configName) throws Exception {
.collect(Collectors.toList()));
}

@Test
void testCreateTableSuccessWhenTableAlreadyExists() throws Exception {
initBigQuery(config);

// Test schema where we will try to re-create existing table
final String tmpTestSchemaName = "test_create_table_when_exists_schema";

final com.google.cloud.bigquery.Schema schema = com.google.cloud.bigquery.Schema.of(
com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING),
com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP),
com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));

final TableId tableId = TableId.of(tmpTestSchemaName, "test_already_existing_table");

BigQueryUtils.getOrCreateDataset(bigquery, tmpTestSchemaName, BigQueryUtils.getDatasetLocation(config));

assertDoesNotThrow(() -> {
// Create table
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, tableId, schema);

// Try to create it one more time. Shouldn't throw exception
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, tableId, schema);
});
}

@ParameterizedTest
@MethodSource("failWriteTestConfigProvider")
void testWriteFailure(final String configName, final String error) throws Exception {
Expand Down
67 changes: 67 additions & 0 deletions docs/integrations/destinations/bigquery-denormalized.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,70 @@
# Bigquery Denormalized

See [destinations/bigquery](/integrations/destinations/bigquery)

## Changelog

### bigquery-denormalized

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.2.14 | 2023-02-08 | [#22497](https://github.com/airbytehq/airbyte/pull/22497) | Fixed table already exists error |
| 1.2.13 | 2023-01-26 | [#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging |
| 1.2.12 | 2023-01-18 | [#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
| 1.2.11 | 2023-01-18 | [#21144](https://github.com/airbytehq/airbyte/pull/21144) | Added explicit error message if sync fails due to a config issue |
| 1.2.10 | 2023-01-04 | [#20730](https://github.com/airbytehq/airbyte/pull/20730) | An incoming source Number type will create a big query integer rather than a float. |
| 1.2.9 | 2022-12-14 | [#20501](https://github.com/airbytehq/airbyte/pull/20501) | Report GCS staging failures that occur during connection check |
| 1.2.8 | 2022-11-22 | [#19489](https://github.com/airbytehq/airbyte/pull/19489) | Added non-billable projects handle to check connection stage |
| 1.2.7 | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | Fixed check method to capture mismatch dataset location |
| 1.2.6 | 2022-11-10 | [#18554](https://github.com/airbytehq/airbyte/pull/18554) | Improve check connection method to handle more errors |
| 1.2.5 | 2022-10-19 | [#18162](https://github.com/airbytehq/airbyte/pull/18162) | Improve error logs |
| 1.2.4 | 2022-09-26 | [#16890](https://github.com/airbytehq/airbyte/pull/16890) | Add user-agent header |
| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace |
| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage |
| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | (bugged, do not use) Wrapping string objects with TextNode |
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested |
| 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
| 1.1.15 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials |
| 1.1.14 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |
| 1.1.13 | 2022-08-02 | [15180](https://github.com/airbytehq/airbyte/pull/15180) | Fix standard loading mode |
| 1.1.12 | 2022-06-29 | [14079](https://github.com/airbytehq/airbyte/pull/14079) | Map "airbyte_type": "big_integer" to INT64 |
| 1.1.11 | 2022-06-24 | [14114](https://github.com/airbytehq/airbyte/pull/14114) | Remove "additionalProperties": false from specs for connectors with staging |
| 1.1.10 | 2022-06-16 | [13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors |
| 1.1.9 | 2022-06-17 | [13753](https://github.com/airbytehq/airbyte/pull/13753) | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
| 1.1.8 | 2022-06-07 | [13579](https://github.com/airbytehq/airbyte/pull/13579) | Always check GCS bucket for GCS loading method to catch invalid HMAC keys. |
| 1.1.7 | 2022-06-07 | [13424](https://github.com/airbytehq/airbyte/pull/13424) | Reordered fields for specification. |
| 1.1.6 | 2022-05-15 | [12768](https://github.com/airbytehq/airbyte/pull/12768) | Clarify that the service account key json field is required on cloud. |
| 0.3.5 | 2022-05-12 | [12805](https://github.com/airbytehq/airbyte/pull/12805) | Updated to latest base-java to emit AirbyteTraceMessage on error. |
| 0.3.4 | 2022-05-04 | [12578](https://github.com/airbytehq/airbyte/pull/12578) | In JSON to Avro conversion, log JSON field values that do not follow Avro schema for debugging. |
| 0.3.3 | 2022-05-02 | [12528](https://github.com/airbytehq/airbyte/pull/12528) | Update Dataset location field description |
| 0.3.2 | 2022-04-29 | [12477](https://github.com/airbytehq/airbyte/pull/12477) | Dataset location is a required field |
| 0.3.1 | 2022-04-15 | [11978](https://github.com/airbytehq/airbyte/pull/11978) | Fixed emittedAt timestamp. |
| 0.3.0 | 2022-04-06 | [11776](https://github.com/airbytehq/airbyte/pull/11776) | Use serialized buffering strategy to reduce memory consumption. |
| 0.2.15 | 2022-04-05 | [11166](https://github.com/airbytehq/airbyte/pull/11166) | Fixed handling of anyOf and allOf fields |
| 0.2.14 | 2022-04-02 | [11620](https://github.com/airbytehq/airbyte/pull/11620) | Updated spec |
| 0.2.13 | 2022-04-01 | [11636](https://github.com/airbytehq/airbyte/pull/11636) | Added new unit tests |
| 0.2.12 | 2022-03-28 | [11454](https://github.com/airbytehq/airbyte/pull/11454) | Integration test enhancement for picking test-data and schemas |
| 0.2.11 | 2022-03-18 | [10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters |
| 0.2.10 | 2022-03-03 | [10755](https://github.com/airbytehq/airbyte/pull/10755) | Make sure to kill children threads and stop JVM |
| 0.2.8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.2.7 | 2022-02-01 | [9959](https://github.com/airbytehq/airbyte/pull/9959) | Fix null pointer exception from buffered stream consumer. |
| 0.2.6 | 2022-01-29 | [9745](https://github.com/airbytehq/airbyte/pull/9745) | Integrate with Sentry. |
| 0.2.5 | 2022-01-18 | [9573](https://github.com/airbytehq/airbyte/pull/9573) | BigQuery Destination : update description for some input fields |
| 0.2.4 | 2022-01-17 | [8383](https://github.com/airbytehq/airbyte/issues/8383) | BigQuery/BiqQuery denorm Destinations : Support dataset-id prefixed by project-id |
| 0.2.3 | 2022-01-12 | [9415](https://github.com/airbytehq/airbyte/pull/9415) | BigQuery Destination : Fix GCS processing of Facebook data |
| 0.2.2 | 2021-12-22 | [9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.2.1 | 2021-12-21 | [8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.2.0 | 2021-12-17 | [8788](https://github.com/airbytehq/airbyte/pull/8788) | BigQuery/BiqQuery denorm Destinations : Add possibility to use different types of GCS files |
| 0.1.11 | 2021-12-16 | [8816](https://github.com/airbytehq/airbyte/issues/8816) | Update dataset locations |
| 0.1.10 | 2021-11-09 | [7804](https://github.com/airbytehq/airbyte/pull/7804) | handle null values in fields described by a $ref definition |
| 0.1.9 | 2021-11-08 | [7736](https://github.com/airbytehq/airbyte/issues/7736) | Fixed the handling of ObjectNodes with $ref definition key |
| 0.1.8 | 2021-10-27 | [7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery |
| 0.1.7 | 2021-10-26 | [7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |
| 0.1.6 | 2021-09-16 | [6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & timestamp types through the json "format" key |
| 0.1.5 | 2021-09-07 | [5881](https://github.com/airbytehq/airbyte/pull/5881) | BigQuery Denormalized NPE fix |
| 0.1.4 | 2021-09-04 | [5813](https://github.com/airbytehq/airbyte/pull/5813) | fix Stackoverflow error when receive a schema from source where "Array" type doesn't contain a required "items" element |
| 0.1.3 | 2021-08-07 | [5261](https://github.com/airbytehq/airbyte/pull/5261) | 🐛 Destination BigQuery\(Denormalized\): Fix processing arrays of records |
| 0.1.2 | 2021-07-30 | [5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
| 0.1.1 | 2021-06-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer |
| 0.1.0 | 2021-06-21 | [4176](https://github.com/airbytehq/airbyte/pull/4176) | Destination using Typed Struct and Repeated fields |

Loading

0 comments on commit a70d6e8

Please sign in to comment.