Skip to content

Commit

Permalink
Destination BigQuery: Consolidation of objects to StreamConfig, clean…
Browse files Browse the repository at this point in the history
…up (#38131)

## What
Removing redundant references and duplicate information passed around using `WriteConfig` objects. No functional changes and resurrected all the information needed through `StreamConfig` and adapted changes accordingly. 

This PR should be in a mergeable state with no functional changes after the ones down the stack are published. 


## Review guide
* Removed references of `BigQueryWriteConfig` and reused already built `StreamConfig`
* Removing unnecessary `StagingOperations` interface and made concrete class, this will help for later adding a shim on this and refactoring without large changes 
* Removed other unnecessary references of getting dynamic schema, `WriteDispostion` etc. Probably remnant of bigquery-denormalized bespoke connector. 

## User Impact
<!--
* What is the end result perceived by the user?
* If there are negative side effects, please list them. 
-->

## Can this PR be safely reverted and rolled back?
<!--
* If unsure, leave it blank.
-->
- [x] YES 💚
- [ ] NO ❌
  • Loading branch information
gisripa authored May 13, 2024
1 parent 13e16de commit e0225c1
Show file tree
Hide file tree
Showing 23 changed files with 103 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.34.4'
cdkVersionRequired = '0.35.0'
features = [
'db-destinations',
'datastore-bigquery',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.4.19
dockerImageTag: 2.4.20
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.bigquery;

import com.google.cloud.bigquery.TableId;
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns;
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
Expand All @@ -12,6 +13,9 @@
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Map;
Expand All @@ -25,15 +29,15 @@
@Slf4j
class BigQueryAsyncFlush implements DestinationFlushFunction {

private final Map<StreamDescriptor, BigQueryWriteConfig> streamDescToWriteConfig;
private final BigQueryStagingOperations stagingOperations;
private final Map<StreamDescriptor, StreamConfig> streamConfigMap;
private final BigQueryGcsOperations stagingOperations;
private final ConfiguredAirbyteCatalog catalog;

public BigQueryAsyncFlush(
final Map<StreamDescriptor, BigQueryWriteConfig> streamDescToWriteConfig,
final BigQueryStagingOperations stagingOperations,
final Map<StreamDescriptor, StreamConfig> streamConfigMap,
final BigQueryGcsOperations stagingOperations,
final ConfiguredAirbyteCatalog catalog) {
this.streamDescToWriteConfig = streamDescToWriteConfig;
this.streamConfigMap = streamConfigMap;
this.stagingOperations = stagingOperations;
this.catalog = catalog;
}
Expand All @@ -60,20 +64,20 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag

writer.flush();
log.info("Flushing CSV buffer for stream {} ({}) to staging", decs.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
if (!streamDescToWriteConfig.containsKey(decs)) {
if (!streamConfigMap.containsKey(decs)) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog)));
}

final BigQueryWriteConfig writeConfig = streamDescToWriteConfig.get(decs);
final StreamId streamId = streamConfigMap.get(decs).getId();
try {
final String stagedFileName = stagingOperations.uploadRecordsToStage(writeConfig.datasetId(), writeConfig.streamName(), writer);
final String stagedFileName = stagingOperations.uploadRecordsToStage(streamId.getRawNamespace(), streamId.getOriginalName(), writer);

stagingOperations.copyIntoTableFromStage(
writeConfig.datasetId(),
writeConfig.streamName(),
writeConfig.targetTableId(),
writeConfig.tableSchema(),
streamId.getRawNamespace(),
streamId.getOriginalName(),
TableId.of(streamId.getRawNamespace(), streamId.getRawName()),
BigQueryRecordFormatter.SCHEMA_V2,
stagedFileName);
} catch (final Exception e) {
log.error("Failed to flush and commit buffer data into destination's raw table", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.common.util.concurrent.RateLimiter;
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
Expand All @@ -22,13 +21,9 @@ public class BigQueryAsyncStandardFlush implements DestinationFlushFunction {

// TODO remove this once the async framework supports rate-limiting/backpressuring
private static final RateLimiter rateLimiter = RateLimiter.create(0.07);

private final BigQuery bigQuery;
private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap;

public BigQueryAsyncStandardFlush(final BigQuery bigQuery,
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
this.bigQuery = bigQuery;
public BigQueryAsyncStandardFlush(final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
this.uploaderMap = uploaderMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
Expand Down Expand Up @@ -64,10 +65,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jetbrains.annotations.NotNull;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
Expand Down Expand Up @@ -223,9 +224,9 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,

@Override
@SuppressWarnings("deprecation")
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final @NotNull JsonNode config,
final @NotNull ConfiguredAirbyteCatalog catalog,
final @NotNull Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
final String defaultNamespace = BigQueryUtils.getDatasetId(config);
Expand All @@ -234,7 +235,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET);
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride);
final ParsedCatalog parsedCatalog = parseCatalog(sqlGenerator, defaultNamespace,
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE), catalog);
final BigQuery bigquery = getBigQuery(config);
final TyperDeduper typerDeduper =
buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);
Expand Down Expand Up @@ -269,22 +271,20 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final DateTime syncDatetime = DateTime.now(DateTimeZone.UTC);
final boolean keepStagingFiles = BigQueryUtils.isKeepFilesInGcs(config);
final GcsStorageOperations gcsOperations = new GcsStorageOperations(gcsNameTransformer, gcsConfig.getS3Client(), gcsConfig);
final BigQueryStagingOperations bigQueryGcsOperations = new BigQueryGcsOperations(
final BigQueryGcsOperations bigQueryGcsOperations = new BigQueryGcsOperations(
bigquery,
gcsNameTransformer,
gcsConfig,
gcsOperations,
datasetLocation,
stagingId,
syncDatetime,
keepStagingFiles);

return new BigQueryStagingConsumerFactory().createAsync(
config,
catalog,
outputRecordCollector,
bigQueryGcsOperations,
getCsvRecordFormatterCreator(namingResolver),
namingResolver::getTmpTableName,
typerDeduper,
parsedCatalog,
BigQueryUtils.getDatasetId(config));
Expand Down Expand Up @@ -368,7 +368,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
catalog,
parsedCatalog);

final Optional<String> bqNamespace = Optional.ofNullable(BigQueryUtils.getDatasetId(config));
final String bqNamespace = BigQueryUtils.getDatasetId(config);

return new BigQueryRecordStandardConsumer(
outputRecordCollector,
Expand Down Expand Up @@ -406,18 +406,14 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
throw new RuntimeException(e);
}
},
bigquery,
catalog,
bqNamespace,
writeConfigs);
}

protected Function<JsonNode, BigQueryRecordFormatter> getCsvRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) {
return streamSchema -> new BigQueryRecordFormatter(namingResolver);
}

private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, final String namespace) {
// Set the default namespace on streams with null namespace. This means we don't need to repeat this
// Set the default originalNamespace on streams with null originalNamespace. This means we don't
// need to repeat this
// logic in the rest of the connector.
// (record messages still need to handle null namespaces though, which currently happens in e.g.
// AsyncStreamConsumer#accept)
Expand All @@ -429,13 +425,11 @@ private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, f
}
}

private ParsedCatalog parseCatalog(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final String datasetLocation,
final Optional<String> rawNamespaceOverride) {
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
final CatalogParser catalogParser = rawNamespaceOverride.map(s -> new CatalogParser(sqlGenerator, s))
.orElseGet(() -> new CatalogParser(sqlGenerator));
private ParsedCatalog parseCatalog(final BigQuerySqlGenerator sqlGenerator,
final String defaultNamespace,
final String rawNamespaceOverride,
final ConfiguredAirbyteCatalog catalog) {
final CatalogParser catalogParser = new CatalogParser(sqlGenerator, rawNamespaceOverride);

return catalogParser.parseCatalog(catalog);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryGcsOperations implements BigQueryStagingOperations {
public class BigQueryGcsOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsOperations.class);

private final BigQuery bigQuery;
private final StandardNameTransformer gcsNameTransformer;
private final GcsDestinationConfig gcsConfig;
private final GcsStorageOperations gcsStorageOperations;

private final String datasetLocation;
private final UUID randomStagingId;
private final DateTime syncDatetime;
private final boolean keepStagingFiles;
Expand All @@ -44,13 +46,15 @@ public BigQueryGcsOperations(final BigQuery bigQuery,
final StandardNameTransformer gcsNameTransformer,
final GcsDestinationConfig gcsConfig,
final GcsStorageOperations gcsStorageOperations,
final String datasetLocation,
final UUID randomStagingId,
final DateTime syncDatetime,
final boolean keepStagingFiles) {
this.bigQuery = bigQuery;
this.gcsNameTransformer = gcsNameTransformer;
this.gcsConfig = gcsConfig;
this.gcsStorageOperations = gcsStorageOperations;
this.datasetLocation = datasetLocation;
this.randomStagingId = randomStagingId;
this.syncDatetime = syncDatetime;
this.keepStagingFiles = keepStagingFiles;
Expand All @@ -69,7 +73,6 @@ private String getStagingRootPath(final String datasetId, final String stream) {
/**
* @return {@code <bucket-path>/<dataset-id>_<stream-name>/<year>/<month>/<day>/<hour>/<uuid>/}
*/
@Override
public String getStagingFullPath(final String datasetId, final String stream) {
return gcsNameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/",
getStagingRootPath(datasetId, stream),
Expand All @@ -80,8 +83,7 @@ public String getStagingFullPath(final String datasetId, final String stream) {
randomStagingId));
}

@Override
public void createSchemaIfNotExists(final String datasetId, final String datasetLocation) {
public void createSchemaIfNotExists(final String datasetId) {
if (!existingSchemas.contains(datasetId)) {
LOGGER.info("Creating dataset {}", datasetId);
try {
Expand All @@ -97,20 +99,17 @@ public void createSchemaIfNotExists(final String datasetId, final String dataset
}
}

@Override
public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) {
LOGGER.info("Creating target table {}", tableId);
BigQueryUtils.createPartitionedTableIfNotExists(bigQuery, tableId, tableSchema);
}

@Override
public void createStageIfNotExists(final String datasetId, final String stream) {
final String objectPath = getStagingFullPath(datasetId, stream);
LOGGER.info("Creating staging path for stream {} (dataset {}): {}", stream, datasetId, objectPath);
gcsStorageOperations.createBucketIfNotExists();
}

@Override
public String uploadRecordsToStage(final String datasetId, final String stream, final SerializableBuffer writer) {
final String objectPath = getStagingFullPath(datasetId, stream);
LOGGER.info("Uploading records to staging for stream {} (dataset {}): {}", stream, datasetId, objectPath);
Expand All @@ -125,7 +124,6 @@ public String uploadRecordsToStage(final String datasetId, final String stream,
* Reference
* https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
*/
@Override
public void copyIntoTableFromStage(final String datasetId,
final String stream,
final TableId tableId,
Expand Down Expand Up @@ -159,7 +157,6 @@ public void copyIntoTableFromStage(final String datasetId,
}
}

@Override
@Deprecated
public void cleanUpStage(final String datasetId, final String stream, final List<String> stagedFiles) {
if (keepStagingFiles) {
Expand All @@ -170,13 +167,11 @@ public void cleanUpStage(final String datasetId, final String stream, final List
gcsStorageOperations.cleanUpBucketObject(getStagingRootPath(datasetId, stream), stagedFiles);
}

@Override
public void dropTableIfExists(final String datasetId, final TableId tableId) {
LOGGER.info("Deleting target table {} (dataset {})", tableId, datasetId);
bigQuery.delete(tableId);
}

@Override
public void dropStageIfExists(final String datasetId, final String stream) {
if (keepStagingFiles) {
return;
Expand All @@ -200,7 +195,6 @@ public void dropStageIfExists(final String datasetId, final String stream) {
* @param tableId table name
* @param schema schema of the table to be deleted/created
*/
@Override
public void truncateTableIfExists(final String datasetId,
final TableId tableId,
final Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.integrations.destination.bigquery;

import com.google.cloud.bigquery.BigQuery;
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination.async.state.FlushFailure;
Expand All @@ -28,17 +27,16 @@ public class BigQueryRecordStandardConsumer extends AsyncStreamConsumer {
public BigQueryRecordStandardConsumer(Consumer<AirbyteMessage> outputRecordCollector,
OnStartFunction onStart,
OnCloseFunction onClose,
BigQuery bigQuery,
ConfiguredAirbyteCatalog catalog,
Optional<String> defaultNamespace,
String defaultNamespace,
Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
super(outputRecordCollector,
onStart,
onClose,
new BigQueryAsyncStandardFlush(bigQuery, uploaderMap),
new BigQueryAsyncStandardFlush(uploaderMap),
catalog,
new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5)),
defaultNamespace,
Optional.ofNullable(defaultNamespace),
new FlushFailure(),
Executors.newFixedThreadPool(2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public String convertStreamName(final String input) {
}

/**
* BigQuery allows a number to be the first character of a namespace. Datasets that begin with an
* underscore are hidden databases, and we cannot query <hidden-dataset>.INFORMATION_SCHEMA. So we
* append a letter instead of underscore for normalization. Reference:
* BigQuery allows a number to be the first character of a originalNamespace. Datasets that begin
* with an underscore are hidden databases, and we cannot query <hidden-dataset>.INFORMATION_SCHEMA.
* So we append a letter instead of underscore for normalization. Reference:
* https://cloud.google.com/bigquery/docs/datasets#dataset-naming
*/
@Override
Expand Down
Loading

0 comments on commit e0225c1

Please sign in to comment.