Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination-BigQuery: fixed table already exists error #22497

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void createSchemaIfNotExists(final String datasetId, final String dataset
@Override
public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) {
LOGGER.info("Creating target table {}", tableId);
BigQueryUtils.createPartitionedTable(bigQuery, tableId, tableSchema);
BigQueryUtils.createPartitionedTableIfNotExists(bigQuery, tableId, tableSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static void createSchemaAndTableIfNeeded(final BigQuery bigquery,
getOrCreateDataset(bigquery, schemaName, datasetLocation);
existingSchemas.add(schemaName);
}
BigQueryUtils.createPartitionedTable(bigquery, tmpTableId, schema);
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, tmpTableId, schema);
}

public static Dataset getOrCreateDataset(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
Expand Down Expand Up @@ -202,7 +202,7 @@ public static Table createTable(final BigQuery bigquery, final String datasetNam
* @return Table BigQuery table object to be referenced for deleting, otherwise empty meaning table
* was not successfully created
*/
static void createPartitionedTable(final BigQuery bigquery, final TableId tableId, final Schema schema) {
static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) {
try {
final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
.setField(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
Expand All @@ -220,8 +220,14 @@ static void createPartitionedTable(final BigQuery bigquery, final TableId tableI
.build();
final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

bigquery.create(tableInfo);
LOGGER.info("Partitioned table created successfully: {}", tableId);
final Table table = bigquery.getTable(tableInfo.getTableId());
if (table != null && table.exists()) {
LOGGER.info("Partitioned table ALREADY EXISTS: {}", tableId);
} else {
bigquery.create(tableInfo);
LOGGER.info("Partitioned table created successfully: {}", tableId);
}

} catch (final BigQueryException e) {
LOGGER.error("Partitioned table was not created: " + tableId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -15,13 +16,18 @@
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -314,6 +320,31 @@ void testWriteSuccess(final String configName) throws Exception {
.collect(Collectors.toList()));
}

@Test
void testCreateTableSuccessWhenTableAlreadyExists() throws Exception {
initBigQuery(config);

// Test schema where we will try to re-create existing table
final String tmpTestSchemaName = "test_create_table_when_exists_schema";

final com.google.cloud.bigquery.Schema schema = com.google.cloud.bigquery.Schema.of(
com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING),
com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP),
com.google.cloud.bigquery.Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));

final TableId tableId = TableId.of(tmpTestSchemaName, "test_already_existing_table");

BigQueryUtils.getOrCreateDataset(bigquery, tmpTestSchemaName, BigQueryUtils.getDatasetLocation(config));

assertDoesNotThrow(() -> {
// Create table
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, tableId, schema);

// Try to create it one more time. Shouldn't throw exception
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, tableId, schema);
});
}

@ParameterizedTest
@MethodSource("failWriteTestConfigProvider")
void testWriteFailure(final String configName, final String error) throws Exception {
Expand Down