Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination redshift: async standard inserts #32888

Merged
merged 28 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2a59e85
redshift async standard inserts
edgao Nov 28, 2023
9fe0da3
Automated Commit - Formatting Changes
edgao Nov 28, 2023
00dc110
redshift version bump
edgao Nov 28, 2023
57a9fd4
fix batch size
edgao Nov 28, 2023
b87b935
try lowering memory limit
edgao Nov 28, 2023
b7640d6
even lower
edgao Nov 29, 2023
951e8eb
Merge branch 'master' into edgao/redshift/standard_inserts_async
edgao Nov 29, 2023
3021982
changelog
edgao Nov 29, 2023
89e2b47
undo
edgao Nov 29, 2023
617a75a
also update specmodifyingdestination
edgao Nov 29, 2023
1759186
Automated Commit - Formatting Changes
edgao Nov 29, 2023
cf3171f
stop supporting non-async consumers
edgao Nov 29, 2023
a6e37df
use async + partial message everywhere
edgao Nov 29, 2023
b73a47f
Automated Commit - Formatting Changes
edgao Nov 29, 2023
949be58
2mb batch
edgao Nov 29, 2023
6b74f57
Automated Commit - Formatting Changes
edgao Nov 29, 2023
0067616
actually lets use 5mb
edgao Nov 29, 2023
7128786
Merge branch 'master' into edgao/redshift/standard_inserts_async
edgao Nov 29, 2023
5f70fd0
delete unused code
edgao Nov 29, 2023
422af2e
Automated Commit - Formatting Changes
edgao Nov 29, 2023
90fb108
Fix: Revert airbyte-ci to 2.8.0 from 2.10.0 (#32954)
bnchrch Nov 29, 2023
b42a93e
revert to 25MB batch
edgao Nov 29, 2023
3d1ff9d
Merge branch 'master' into edgao/redshift/standard_inserts_async
edgao Nov 29, 2023
2051da9
Automated Commit - Formatting Changes
edgao Nov 29, 2023
1d2bca9
Merge branch 'master' into edgao/redshift/standard_inserts_async
edgao Nov 30, 2023
f4daa6a
cdk version bump
edgao Nov 30, 2023
b9b246f
switch back to published cdk artifact
edgao Nov 30, 2023
95c21d0
Merge branch 'master' into edgao/redshift/standard_inserts_async
edgao Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -41,4 +42,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
return destination.getConsumer(config, catalog, outputRecordCollector);
}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't used by redshift, just adding it since I'm in this part of the code anyway

final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
return destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@
* {@link io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction}.
* Separately out for easier versioning.
*/
public interface OnCloseFunction extends Consumer<Boolean> {}
public interface OnCloseFunction extends Consumer<Boolean> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ public PartialAirbyteMessage withState(final PartialAirbyteStateMessage state) {
return this;
}

/**
* For record messages, this stores the serialized data blob (i.e.
* {@code Jsons.serialize(message.getRecord().getData())}). For state messages, this stores the
* _entire_ message (i.e. {@code Jsons.serialize(message)}).
* <p>
* See
* {@link io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer#deserializeAirbyteMessage(String)}
* for the exact logic of how this field is populated.
*/
@JsonProperty("serialized")
public String getSerialized() {
return serialized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.util.List;
Expand All @@ -31,6 +33,7 @@
import java.util.UUID;
import java.util.function.Consumer;
import javax.sql.DataSource;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -111,7 +114,6 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch
* @param sqlOps - SqlOperations object
* @param attemptInsert - set true if need to make attempt to insert dummy records to newly created
* table. Set false to skip insert step.
* @throws Exception
*/
public static void attemptTableOperations(final String outputSchema,
final JdbcDatabase database,
Expand Down Expand Up @@ -155,12 +157,13 @@ public static void attemptTableOperations(final String outputSchema,
*
* @return AirbyteRecordMessage object with dummy values that may be used to test insert permission.
*/
private static AirbyteRecordMessage getDummyRecord() {
private static PartialAirbyteMessage getDummyRecord() {
final JsonNode dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }");
return new AirbyteRecordMessage()
.withStream("stream1")
.withData(dummyDataToInsert)
.withEmittedAt(1602637589000L);
return new PartialAirbyteMessage()
.withRecord(new PartialAirbyteRecordMessage()
.withStream("stream1")
.withEmittedAt(1602637589000L))
.withSerialized(dummyDataToInsert.toString());
}

protected DataSource getDataSource(final JsonNode config) {
Expand Down Expand Up @@ -201,8 +204,27 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
return JdbcBufferedConsumerFactory.create(outputRecordCollector, getDatabase(getDataSource(config)), sqlOperations, namingResolver, config,
catalog);
throw new NotImplementedException("Should use the getSerializedMessageConsumer instead");
}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
getDatabase(getDataSource(config)),
sqlOperations,
namingResolver,
config,
catalog,
null,
// TODO populate the DV2 stuff
false,
null,
null,
null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,35 @@

package io.airbyte.cdk.integrations.destination.jdbc;

import static io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter;
import io.airbyte.cdk.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy;
import io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination_async.OnCloseFunction;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -53,21 +57,30 @@ public class JdbcBufferedConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);

public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
public static SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final String defaultNamespace,
final boolean use1s1t,
// TODO these are currently unused, but we'll need it for DV2 things
// specifically creating the dv2 raw tables instead of legacy tables
final ParsedCatalog parsedCatalog,
final TyperDeduper typerDeduper,
// TODO this is only needed if we want to do incremental T+D
final TypeAndDedupeOperationValve typerDeduperValve) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired());

return new BufferedStreamConsumer(
return new AsyncStreamConsumer(
outputRecordCollector,
onStartFunction(database, sqlOperations, writeConfigs),
new InMemoryRecordBufferingStrategy(recordWriterFunction(database, sqlOperations, writeConfigs, catalog), DEFAULT_MAX_BATCH_SIZE_BYTES),
onCloseFunction(),
onCloseFunction(use1s1t, typerDeduper),
new JdbcInsertFlushFunction(recordWriterFunction(database, sqlOperations, writeConfigs, catalog)),
catalog,
sqlOperations::isValidData);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every implementation of this method is just return true 🤷 but deleting it looks like a nontrivial refactor so ignoring it for now. It's not used in the async framework at all, and we don't want this functionality to begin with - it's dropping "invalid" records, which we don't want to do.

new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.2)),
defaultNamespace,
Executors.newFixedThreadPool(2));
}

private static List<WriteConfig> createWriteConfigs(final NamingConventionTransformer namingResolver,
Expand All @@ -77,14 +90,12 @@ private static List<WriteConfig> createWriteConfigs(final NamingConventionTransf
if (schemaRequired) {
Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema.");
}
final Instant now = Instant.now();
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now, schemaRequired)).collect(Collectors.toList());
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, schemaRequired)).collect(Collectors.toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
final NamingConventionTransformer namingResolver,
final JsonNode config,
final Instant now,
final boolean schemaRequired) {
return stream -> {
Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
Expand Down Expand Up @@ -133,11 +144,10 @@ private static String getOutputSchema(final AirbyteStream stream,
* @param database JDBC database to connect to
* @param sqlOperations interface for execution SQL queries
* @param writeConfigs settings for each stream
* @return
*/
private static OnStartFunction onStartFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
final Collection<WriteConfig> writeConfigs) {
return () -> {
LOGGER.info("Preparing raw tables in destination started for {} streams", writeConfigs.size());
final List<String> queryList = new ArrayList<>();
Expand Down Expand Up @@ -168,12 +178,11 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
* @param sqlOperations interface of SQL queries to execute
* @param writeConfigs settings for each stream
* @param catalog catalog of all streams to sync
* @return
*/
private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
private static RecordWriter<PartialAirbyteMessage> recordWriterFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig = writeConfigs.stream()
.collect(Collectors.toUnmodifiableMap(JdbcBufferedConsumerFactory::toNameNamespacePair, Function.identity()));

Expand All @@ -190,11 +199,19 @@ private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final Jdb

/**
* Tear down functionality
*
* @return
*/
private static OnCloseFunction onCloseFunction() {
return (hasFailed) -> {};
private static OnCloseFunction onCloseFunction(final boolean use1s1t, final TyperDeduper typerDeduper) {
return (hasFailed) -> {
if (use1s1t) {
try {
typerDeduper.typeAndDedupe();
typerDeduper.commitFinalTables();
typerDeduper.cleanup();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
};
}

private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.jdbc;

import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter;
import io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants;
import io.airbyte.cdk.integrations.destination_async.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.stream.Stream;

public class JdbcInsertFlushFunction implements DestinationFlushFunction {

private final RecordWriter<PartialAirbyteMessage> recordWriter;

public JdbcInsertFlushFunction(final RecordWriter<PartialAirbyteMessage> recordWriter) {
this.recordWriter = recordWriter;
}

@Override
public void flush(final StreamDescriptor desc, final Stream<PartialAirbyteMessage> stream) throws Exception {
recordWriter.accept(
new AirbyteStreamNameNamespacePair(desc.getName(), desc.getNamespace()),
stream.toList());
}

@Override
public long getOptimalBatchSizeBytes() {
// TODO tune this value - currently SqlOperationUtils partitions 10K records per insert statement,
// but we'd like to stop doing that and instead control sql insert statement size via batch size.
return GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
Expand Down Expand Up @@ -148,16 +149,20 @@ public boolean isValidData(final JsonNode data) {

@Override
public final void insertRecords(final JdbcDatabase database,
final List<AirbyteRecordMessage> records,
final List<PartialAirbyteMessage> records,
final String schemaName,
final String tableName)
throws Exception {
dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> adapter.adapt(airbyteRecordMessage.getData())));
dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only DataAdapter in existence is destination-postgres, because of https://airbytehq-team.slack.com/archives/C03C4AVJWG4/p1700605909300539 / #3476

so I don't feel bad about de+reserializing here

final JsonNode data = Jsons.deserializeExact(airbyteRecordMessage.getSerialized());
adapter.adapt(data);
airbyteRecordMessage.setSerialized(Jsons.serialize(data));
}));
insertRecordsInternal(database, records, schemaName, tableName);
}

protected abstract void insertRecordsInternal(JdbcDatabase database,
List<AirbyteRecordMessage> records,
List<PartialAirbyteMessage> records,
String schemaName,
String tableName)
throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -98,7 +98,7 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
* @param tableName Name of table
* @throws Exception exception
*/
void insertRecords(JdbcDatabase database, List<AirbyteRecordMessage> records, String schemaName, String tableName) throws Exception;
void insertRecords(JdbcDatabase database, List<PartialAirbyteMessage> records, String schemaName, String tableName) throws Exception;

/**
* Query to insert all records from source table to destination table. Both tables must be in the
Expand Down
Loading
Loading