Merge branch 'master' into bnchrch/remove-approve-and-merge
bnchrch authored Nov 30, 2023
2 parents 6f94d92 + 7aaaa06 commit 9ce9a24
Showing 79 changed files with 815 additions and 277 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -156,6 +156,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.6.0 | 2023-11-30 | [\#32888](https://github.com/airbytehq/airbyte/pull/32888) | JDBC destinations now use the async framework |
| 0.5.3 | 2023-11-28 | [\#32686](https://github.com/airbytehq/airbyte/pull/32686) | Better attribution of debezium engine shutdown due to heartbeat. |
| 0.5.1 | 2023-11-27 | [\#32662](https://github.com/airbytehq/airbyte/pull/32662) | Debezium initialization wait time will now read from initial setup time. |
| 0.5.0 | 2023-11-22 | [\#32656](https://github.com/airbytehq/airbyte/pull/32656) | Introduce TestDatabase test fixture, refactor database source test base classes. |
@@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
@@ -41,4 +42,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
return destination.getConsumer(config, catalog, outputRecordCollector);
}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
return destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector);
}

}
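
For context on the override above: unlike AirbyteMessageConsumer, a SerializedAirbyteMessageConsumer receives each raw message string together with its byte size, which lets the async framework defer JSON parsing and do memory accounting. A minimal sketch of that contract, written from the shape of the CDK interface rather than quoted from it:

// Sketch only; the real interface is io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer.
public interface SerializedMessageConsumerSketch extends AutoCloseable {

  // One-time setup before any messages arrive (e.g. create raw tables).
  void start() throws Exception;

  // Accepts the raw serialized line plus its size in bytes, so buffering can
  // track memory without re-serializing the message.
  void accept(String message, Integer sizeInBytes) throws Exception;

  // Flush remaining buffers and finalize the sync.
  @Override
  void close() throws Exception;
}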
@@ -11,4 +11,6 @@
* {@link io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction}.
* Separated out for easier versioning.
*/
public interface OnCloseFunction extends Consumer<Boolean> {}
public interface OnCloseFunction extends Consumer<Boolean> {

}
@@ -71,6 +71,15 @@ public PartialAirbyteMessage withState(final PartialAirbyteStateMessage state) {
return this;
}

/**
* For record messages, this stores the serialized data blob (i.e.
* {@code Jsons.serialize(message.getRecord().getData())}). For state messages, this stores the
* _entire_ message (i.e. {@code Jsons.serialize(message)}).
* <p>
* See
* {@link io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer#deserializeAirbyteMessage(String)}
* for the exact logic of how this field is populated.
*/
@JsonProperty("serialized")
public String getSerialized() {
return serialized;
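
To make the Javadoc above concrete, two hypothetical messages (illustrative values only; deserializeAirbyteMessage, referenced above, is the authoritative population logic):

// Hypothetical record message: `serialized` holds only the data blob.
final PartialAirbyteMessage recordMessage = new PartialAirbyteMessage()
    .withType(AirbyteMessage.Type.RECORD)
    .withRecord(new PartialAirbyteRecordMessage().withStream("users"))
    .withSerialized("{\"id\": 1, \"name\": \"ada\"}");

// Hypothetical state message: `serialized` holds the entire message verbatim.
final PartialAirbyteMessage stateMessage = new PartialAirbyteMessage()
    .withType(AirbyteMessage.Type.STATE)
    .withSerialized("{\"type\": \"STATE\", \"state\": {\"data\": {}}}");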
@@ -1 +1 @@
version=0.5.3
version=0.6.0
@@ -15,14 +15,16 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.util.List;
@@ -31,6 +33,7 @@
import java.util.UUID;
import java.util.function.Consumer;
import javax.sql.DataSource;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -111,7 +114,6 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch
* @param sqlOps - SqlOperations object
* @param attemptInsert - set true to attempt inserting dummy records into the newly created
* table; set false to skip the insert step.
* @throws Exception
*/
public static void attemptTableOperations(final String outputSchema,
final JdbcDatabase database,
@@ -155,12 +157,13 @@ public static void attemptTableOperations(final String outputSchema,
*
* @return AirbyteRecordMessage object with dummy values that may be used to test insert permission.
*/
private static AirbyteRecordMessage getDummyRecord() {
private static PartialAirbyteMessage getDummyRecord() {
final JsonNode dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }");
return new AirbyteRecordMessage()
.withStream("stream1")
.withData(dummyDataToInsert)
.withEmittedAt(1602637589000L);
return new PartialAirbyteMessage()
.withRecord(new PartialAirbyteRecordMessage()
.withStream("stream1")
.withEmittedAt(1602637589000L))
.withSerialized(dummyDataToInsert.toString());
}

protected DataSource getDataSource(final JsonNode config) {
@@ -201,8 +204,27 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
return JdbcBufferedConsumerFactory.create(outputRecordCollector, getDatabase(getDataSource(config)), sqlOperations, namingResolver, config,
catalog);
throw new NotImplementedException("Should use the getSerializedMessageConsumer instead");
}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
getDatabase(getDataSource(config)),
sqlOperations,
namingResolver,
config,
catalog,
null,
// TODO populate the DV2 stuff
false,
null,
null,
null);
}

}
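
The pattern in this file is worth noting: getConsumer now throws NotImplementedException, and getSerializedMessageConsumer becomes the only supported entry point. That is safe because the CDK runner drives destinations through the serialized path. Roughly, and assuming the caller's shape rather than quoting it (stdinReader and the other variables are stand-ins):

// Assumed driver loop, not CDK source. getConsumer is never called on this
// path, so the NotImplementedException above cannot be hit at runtime.
try (final SerializedAirbyteMessageConsumer consumer =
    destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
  consumer.start();
  String line;
  while ((line = stdinReader.readLine()) != null) {
    // The byte size travels with the raw line for buffer memory accounting.
    consumer.accept(line, line.getBytes(StandardCharsets.UTF_8).length);
  }
} // close() flushes buffers and runs the onClose hook (typing/deduping).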
@@ -4,31 +4,35 @@

package io.airbyte.cdk.integrations.destination.jdbc;

import static io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter;
import io.airbyte.cdk.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy;
import io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination_async.OnCloseFunction;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -53,21 +57,30 @@ public class JdbcBufferedConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);

public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
public static SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final String defaultNamespace,
final boolean use1s1t,
// TODO these are currently unused, but we'll need it for DV2 things
// specifically creating the dv2 raw tables instead of legacy tables
final ParsedCatalog parsedCatalog,
final TyperDeduper typerDeduper,
// TODO this is only needed if we want to do incremental T+D
final TypeAndDedupeOperationValve typerDeduperValve) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired());

return new BufferedStreamConsumer(
return new AsyncStreamConsumer(
outputRecordCollector,
onStartFunction(database, sqlOperations, writeConfigs),
new InMemoryRecordBufferingStrategy(recordWriterFunction(database, sqlOperations, writeConfigs, catalog), DEFAULT_MAX_BATCH_SIZE_BYTES),
onCloseFunction(),
onCloseFunction(use1s1t, typerDeduper),
new JdbcInsertFlushFunction(recordWriterFunction(database, sqlOperations, writeConfigs, catalog)),
catalog,
sqlOperations::isValidData);
new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.2)),
defaultNamespace,
Executors.newFixedThreadPool(2));
}
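
// Illustrative arithmetic for the BufferManager cap above, not code from this
// commit: the async buffer is capped at 20% of the JVM's max heap. With
// -Xmx2g, Runtime.getRuntime().maxMemory() reports about 2_147_483_648 bytes,
// so records buffer up to roughly 429_496_729 bytes (~410 MiB) before the
// framework forces flushes to relieve memory pressure.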

private static List<WriteConfig> createWriteConfigs(final NamingConventionTransformer namingResolver,
@@ -77,14 +90,12 @@ private static List<WriteConfig> createWriteConfigs(final NamingConventionTransf
if (schemaRequired) {
Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema.");
}
final Instant now = Instant.now();
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now, schemaRequired)).collect(Collectors.toList());
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, schemaRequired)).collect(Collectors.toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
final NamingConventionTransformer namingResolver,
final JsonNode config,
final Instant now,
final boolean schemaRequired) {
return stream -> {
Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
@@ -133,11 +144,10 @@ private static String getOutputSchema(final AirbyteStream stream,
* @param database JDBC database to connect to
* @param sqlOperations interface for executing SQL queries
* @param writeConfigs settings for each stream
* @return
*/
private static OnStartFunction onStartFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
final Collection<WriteConfig> writeConfigs) {
return () -> {
LOGGER.info("Preparing raw tables in destination started for {} streams", writeConfigs.size());
final List<String> queryList = new ArrayList<>();
@@ -168,12 +178,11 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
* @param sqlOperations interface for executing SQL queries
* @param writeConfigs settings for each stream
* @param catalog catalog of all streams to sync
* @return
*/
private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
private static RecordWriter<PartialAirbyteMessage> recordWriterFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig = writeConfigs.stream()
.collect(Collectors.toUnmodifiableMap(JdbcBufferedConsumerFactory::toNameNamespacePair, Function.identity()));

@@ -190,11 +199,19 @@ private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final Jdb

/**
* Tear down functionality
*
* @return
*/
private static OnCloseFunction onCloseFunction() {
return (hasFailed) -> {};
private static OnCloseFunction onCloseFunction(final boolean use1s1t, final TyperDeduper typerDeduper) {
return (hasFailed) -> {
if (use1s1t) {
try {
typerDeduper.typeAndDedupe();
typerDeduper.commitFinalTables();
typerDeduper.cleanup();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
};
}

private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.jdbc;

import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter;
import io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants;
import io.airbyte.cdk.integrations.destination_async.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.stream.Stream;

public class JdbcInsertFlushFunction implements DestinationFlushFunction {

private final RecordWriter<PartialAirbyteMessage> recordWriter;

public JdbcInsertFlushFunction(final RecordWriter<PartialAirbyteMessage> recordWriter) {
this.recordWriter = recordWriter;
}

@Override
public void flush(final StreamDescriptor desc, final Stream<PartialAirbyteMessage> stream) throws Exception {
recordWriter.accept(
new AirbyteStreamNameNamespacePair(desc.getName(), desc.getNamespace()),
stream.toList());
}

@Override
public long getOptimalBatchSizeBytes() {
// TODO tune this value - currently SqlOperationUtils partitions 10K records per insert statement,
// but we'd like to stop doing that and instead control sql insert statement size via batch size.
return GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;
}

}
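
A hedged usage sketch for the new flush function (variable names are assumptions; in practice the async framework's flush workers call flush with batches drawn from the buffer, sized by getOptimalBatchSizeBytes()):

// Hypothetical caller; `recordWriter` and `batch` are assumed to be in scope.
final JdbcInsertFlushFunction flushFn = new JdbcInsertFlushFunction(recordWriter);
final StreamDescriptor desc = new StreamDescriptor()
    .withName("users")
    .withNamespace("public");
flushFn.flush(desc, batch.stream()); // issues the JDBC INSERTs via recordWriter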