Destination BigQuery: Nuking old remnants #38111

Merged: 1 commit, merged on May 10, 2024
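In brief, the diff below bumps the connector's dockerImageTag from 2.4.17 to 2.4.18, replaces the AbstractBigQueryUploader<?> abstraction with the concrete BigQueryDirectUploader throughout, collapses DefaultBigQueryRecordFormatter and GcsCsvBigQueryRecordFormatter into a single BigQueryRecordFormatter that no longer needs the stream's JSON schema, removes a now-unused BufferedStreamConsumer import and the Javadoc that referenced it, and deletes one file outright.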
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.4.17
+  dockerImageTag: 2.4.18
   dockerRepository: airbyte/destination-bigquery
   documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
   githubIssueLabel: destination-bigquery
@@ -8,7 +8,7 @@
 import com.google.common.util.concurrent.RateLimiter;
 import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
 import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
-import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
+import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
 import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
 import io.airbyte.protocol.models.v0.StreamDescriptor;
 import java.util.concurrent.ConcurrentMap;
@@ -24,18 +24,18 @@ public class BigQueryAsyncStandardFlush implements DestinationFlushFunction {
   private static final RateLimiter rateLimiter = RateLimiter.create(0.07);

   private final BigQuery bigQuery;
-  private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap;
+  private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap;

   public BigQueryAsyncStandardFlush(final BigQuery bigQuery,
-                                    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap) {
+                                    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
     this.bigQuery = bigQuery;
     this.uploaderMap = uploaderMap;
   }

   @Override
   public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessage> stream) throws Exception {
     rateLimiter.acquire();
-    final ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMapSupplied = uploaderMap.get();
+    final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMapSupplied = uploaderMap.get();
     final AtomicInteger recordCount = new AtomicInteger();
     stream.forEach(aibyteMessage -> {
       try {
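A note on the throttling visible (unchanged) in the flush hunk above: Guava's RateLimiter.create(0.07) hands out permits at 0.07 per second, so each rateLimiter.acquire() blocks until roughly 1 / 0.07 ≈ 14.3 seconds have passed since the previous permit. A minimal standalone illustration (hypothetical class name, not part of the PR):

import com.google.common.util.concurrent.RateLimiter;

class RateLimiterSketch {

  public static void main(final String[] args) {
    // 0.07 permits per second is about one permit every ~14.3 seconds, matching the flush throttle above.
    final RateLimiter limiter = RateLimiter.create(0.07);
    System.out.println("waited " + limiter.acquire() + "s for the first permit");  // ~0.0s, the first permit is available immediately
    System.out.println("waited " + limiter.acquire() + "s for the second permit"); // ~14.3s
  }
}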
@@ -38,13 +38,11 @@
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
-import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
-import io.airbyte.integrations.destination.bigquery.formatter.GcsCsvBigQueryRecordFormatter;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
-import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
+import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
 import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
 import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
 import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
@@ -292,14 +290,14 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
         BigQueryUtils.getDatasetId(config));
   }

-  protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> getUploaderMap(
-      final BigQuery bigquery,
-      final JsonNode config,
-      final ConfiguredAirbyteCatalog catalog,
-      final ParsedCatalog parsedCatalog)
+  protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> getUploaderMap(
+      final BigQuery bigquery,
+      final JsonNode config,
+      final ConfiguredAirbyteCatalog catalog,
+      final ParsedCatalog parsedCatalog)
       throws IOException {
     return () -> {
-      final ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap = new ConcurrentHashMap<>();
+      final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap = new ConcurrentHashMap<>();
       for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
         final AirbyteStream stream = configStream.getStream();
         final StreamConfig parsedStream;
@@ -315,7 +313,7 @@ protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQuer
             .configStream(configStream)
             .parsedStream(parsedStream)
             .config(config)
-            .formatterMap(getFormatterMap(stream.getJsonSchema()))
+            .formatterMap(getFormatterMap())
             .targetTableName(targetTableName)
             // This refers to whether this is BQ denormalized or not
             .isDefaultAirbyteTmpSchema(isDefaultAirbyteTmpTableSchema())
@@ -333,7 +331,7 @@ protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQuer

   protected void putStreamIntoUploaderMap(final AirbyteStream stream,
                                           final UploaderConfig uploaderConfig,
-                                          final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
+                                          final Map<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap)
       throws IOException {
     uploaderMap.put(
         AirbyteStreamNameNamespacePair.fromAirbyteStream(stream),
@@ -351,10 +349,10 @@ protected boolean isDefaultAirbyteTmpTableSchema() {
     return true;
   }

-  protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap(final JsonNode jsonSchema) {
+  protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap() {
     return Map.of(
-        UploaderType.STANDARD, new DefaultBigQueryRecordFormatter(jsonSchema, namingResolver),
-        UploaderType.CSV, new GcsCsvBigQueryRecordFormatter(jsonSchema, namingResolver));
+        UploaderType.STANDARD, new BigQueryRecordFormatter(namingResolver),
+        UploaderType.CSV, new BigQueryRecordFormatter(namingResolver));
   }

   private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuery bigquery,
@@ -364,7 +362,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
       final Consumer<AirbyteMessage> outputRecordCollector,
       final TyperDeduper typerDeduper)
       throws Exception {
-    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> writeConfigs = getUploaderMap(
+    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> writeConfigs = getUploaderMap(
         bigquery,
         config,
         catalog,
@@ -390,7 +388,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
         LOGGER.info("Raw table {} not found, continuing with creation", rawTableId);
       }
       LOGGER.info("Creating table {}", rawTableId);
-      BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, DefaultBigQueryRecordFormatter.SCHEMA_V2);
+      BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, BigQueryRecordFormatter.SCHEMA_V2);
     } else {
       uploader.createRawTable();
     }
@@ -415,7 +413,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
   }

   protected Function<JsonNode, BigQueryRecordFormatter> getCsvRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) {
-    return streamSchema -> new GcsCsvBigQueryRecordFormatter(streamSchema, namingResolver);
+    return streamSchema -> new BigQueryRecordFormatter(namingResolver);
   }

   private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, final String namespace) {
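For orientation, here is a small hypothetical sketch (not part of this PR) of how the narrowed types fit together after the change. It is built only from the class names and signatures visible in the hunks above; the package of BigQuerySQLNameTransformer and the surrounding wiring are assumptions.

import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer; // assumed package
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class UploaderWiringSketch {

  // Illustrative only: mirrors the post-PR shapes of getFormatterMap() and getUploaderMap().
  void sketch(final BigQuerySQLNameTransformer namingResolver) {
    // One formatter class now backs both upload paths; it no longer takes the stream's JSON schema.
    final Map<UploaderType, BigQueryRecordFormatter> formatters = Map.of(
        UploaderType.STANDARD, new BigQueryRecordFormatter(namingResolver),
        UploaderType.CSV, new BigQueryRecordFormatter(namingResolver));

    // The uploader map now holds the concrete BigQueryDirectUploader instead of the removed
    // AbstractBigQueryUploader<?> wildcard, so no generic bound is carried around.
    final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap =
        new ConcurrentHashMap<>();
    // Entries would be created via BigQueryUploaderFactory and added in putStreamIntoUploaderMap(),
    // as BigQueryDestination does in the hunks above.
  }
}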

This file was deleted.

@@ -10,7 +10,7 @@
 import io.airbyte.cdk.integrations.destination.async.state.FlushFailure;
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
-import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
+import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
 import io.airbyte.protocol.models.v0.AirbyteMessage;
 import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
@@ -31,7 +31,7 @@ public BigQueryRecordStandardConsumer(Consumer<AirbyteMessage> outputRecordColle
       BigQuery bigQuery,
       ConfiguredAirbyteCatalog catalog,
       Optional<String> defaultNamespace,
-      Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap) {
+      Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
     super(outputRecordCollector,
         onStart,
         onClose,
@@ -13,7 +13,6 @@
 import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
 import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
 import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
-import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
 import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
@@ -117,12 +116,6 @@ private Map<StreamDescriptor, BigQueryWriteConfig> createWriteConfigs(final Json
   }

   /**
-   * Sets up {@link BufferedStreamConsumer} with creation of the destination's raw tables
-   *
-   * <p>
-   * Note: targetTableId is synonymous with airbyte_raw table
-   * </p>
-   *
    * @param bigQueryGcsOperations collection of Google Cloud Storage Operations
    * @param writeConfigs configuration settings used to describe how to write data and where it exists