From 2a59e852e51bff07dd4fd2045d1f4dbf5807ab93 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 28 Nov 2023 10:26:59 -0800 Subject: [PATCH 01/23] redshift async standard inserts --- .../destination_async/OnCloseFunction.java | 12 +++- .../PartialAirbyteMessage.java | 20 ++++++ .../jdbc/AbstractJdbcDestination.java | 23 ++++++- .../jdbc/JdbcBufferedConsumerFactory.java | 62 ++++++++++++++++--- .../jdbc/JdbcInsertFlushFunction.java | 34 ++++++++++ .../destination-redshift/build.gradle | 2 +- 6 files changed, 143 insertions(+), 10 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java index afdbc76498d8..c5c8a8b71313 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java @@ -11,4 +11,14 @@ * {@link io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction}. * Separately out for easier versioning. */ -public interface OnCloseFunction extends Consumer {} +public interface OnCloseFunction extends Consumer { + static OnCloseFunction fromNonAsync(final io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction legacy) { + return (success) -> { + try { + legacy.accept(success); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }; + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java index 0b4a84b991e3..81c57984d3b1 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java @@ -6,7 +6,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.util.Objects; public class PartialAirbyteMessage { @@ -71,6 +73,13 @@ public PartialAirbyteMessage withState(final PartialAirbyteStateMessage state) { return this; } + /** + * For record messages, this stores the serialized data blob (i.e. {@code Jsons.serialize(message.getRecord().getData())}). + * For state messages, this stores the _entire_ message (i.e. {@code Jsons.serialize(message)}). + *

+ * See {@link io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer#deserializeAirbyteMessage(String)} + * for the exact logic of how this field is populated. + */ @JsonProperty("serialized") public String getSerialized() { return serialized; @@ -114,4 +123,15 @@ public String toString() { '}'; } + public AirbyteRecordMessage getFullRecordMessage() { + if (type != AirbyteMessage.Type.RECORD || record == null) { + throw new IllegalStateException("Cannot get full record message for non-record message"); + } + return new AirbyteRecordMessage() + .withNamespace(record.getNamespace()) + .withStream(record.getStream()) + .withData(Jsons.deserializeExact(getSerialized())) + .withEmittedAt(record.getEmittedAt()); + } + } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index d3c71c6951a4..e7142bb67066 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -15,6 +15,7 @@ import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.cdk.integrations.base.Destination; +import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; import io.airbyte.commons.exceptions.ConnectionErrorException; import io.airbyte.commons.json.Jsons; @@ -111,7 +112,6 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch * @param sqlOps - SqlOperations object * @param attemptInsert - set true if need to make attempt to insert dummy records to newly created * table. Set false to skip insert step. - * @throws Exception */ public static void attemptTableOperations(final String outputSchema, final JdbcDatabase database, @@ -205,4 +205,25 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, catalog); } + @Override + public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) + throws Exception { + return JdbcBufferedConsumerFactory.createAsync( + outputRecordCollector, + getDatabase(getDataSource(config)), + sqlOperations, + namingResolver, + config, + catalog, + null, + // TODO populate the DV2 stuff + false, + null, + null, + null + ); + } + } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 7262c7a9f3dd..95bf2674a078 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -11,13 +11,19 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; +import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter; import io.airbyte.cdk.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy; +import io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer; +import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; +import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve; +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStream; @@ -27,11 +33,14 @@ import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +79,32 @@ public static AirbyteMessageConsumer create(final Consumer outpu sqlOperations::isValidData); } + public static SerializedAirbyteMessageConsumer createAsync(final Consumer outputRecordCollector, + final JdbcDatabase database, + final SqlOperations sqlOperations, + final NamingConventionTransformer namingResolver, + final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final String defaultNamespace, + // TODO these are currently unused, but we'll need them for DV2 things + final boolean use1s1t, + final ParsedCatalog parsedCatalog, + final TyperDeduper typerDeduper, + final TypeAndDedupeOperationValve typerDeduperValve) { + final List writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired()); + final Map streamDescToWriteConfig = toMap(writeConfigs); + return new AsyncStreamConsumer( + outputRecordCollector, + onStartFunction(database, sqlOperations, writeConfigs), + io.airbyte.cdk.integrations.destination_async.OnCloseFunction.fromNonAsync(onCloseFunction()), + new JdbcInsertFlushFunction(recordWriterFunction(database, sqlOperations, writeConfigs, catalog)), + catalog, + new BufferManager((long) (Runtime.getRuntime().maxMemory() * BufferManager.MEMORY_LIMIT_RATIO)), + defaultNamespace, + Executors.newFixedThreadPool(2) + ); + } + private static List createWriteConfigs(final NamingConventionTransformer namingResolver, final JsonNode config, final ConfiguredAirbyteCatalog catalog, @@ -133,11 +168,10 @@ private static String getOutputSchema(final AirbyteStream stream, * @param database JDBC database to connect to * @param sqlOperations interface for execution SQL queries * @param writeConfigs settings for each stream - * @return */ private static OnStartFunction onStartFunction(final JdbcDatabase database, final SqlOperations sqlOperations, - final List writeConfigs) { + final Collection writeConfigs) { return () -> { LOGGER.info("Preparing raw tables in destination started for {} streams", writeConfigs.size()); final List queryList = new ArrayList<>(); @@ -168,14 +202,12 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database, * @param sqlOperations interface of SQL queries to execute * @param writeConfigs settings for each stream * @param catalog catalog of all streams to sync - * @return */ private static RecordWriter recordWriterFunction(final JdbcDatabase database, final SqlOperations sqlOperations, final List writeConfigs, final ConfiguredAirbyteCatalog catalog) { - final Map pairToWriteConfig = writeConfigs.stream() - .collect(Collectors.toUnmodifiableMap(JdbcBufferedConsumerFactory::toNameNamespacePair, Function.identity())); + final Map pairToWriteConfig = toMap(writeConfigs); return (pair, records) -> { if (!pairToWriteConfig.containsKey(pair)) { @@ -188,12 +220,28 @@ private static RecordWriter recordWriterFunction(final Jdb }; } + @NotNull + private static Map toMap(final List writeConfigs) { + return writeConfigs.stream() + .collect(Collectors.toUnmodifiableMap(JdbcBufferedConsumerFactory::toNameNamespacePair, Function.identity())); + } + /** * Tear down functionality - * - * @return */ private static OnCloseFunction onCloseFunction() { + // TODO something like this for DV2 +// (hasFailed) -> { +// if (use1s1t) { +// try { +// typerDeduper.typeAndDedupe(); +// typerDeduper.commitFinalTables(); +// typerDeduper.cleanup(); +// } catch (final Exception e) { +// throw new RuntimeException(e); +// } +// } +// } return (hasFailed) -> {}; } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java new file mode 100644 index 000000000000..22885caf0ae7 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -0,0 +1,34 @@ +package io.airbyte.cdk.integrations.destination.jdbc; + +import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter; +import io.airbyte.cdk.integrations.destination_async.DestinationFlushFunction; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.stream.Stream; + +public class JdbcInsertFlushFunction implements DestinationFlushFunction { + + private final RecordWriter recordWriter; + + public JdbcInsertFlushFunction(final RecordWriter recordWriter) { + this.recordWriter = recordWriter; + } + + @Override + public void flush(final StreamDescriptor desc, final Stream stream) throws Exception { + // TODO we can probably implement this better - use the serialized data string directly instead of redeserializing it for insertRecords + recordWriter.accept( + new AirbyteStreamNameNamespacePair(desc.getName(), desc.getNamespace()), + stream.map(PartialAirbyteMessage::getFullRecordMessage).toList() + ); + } + + @Override + public long getOptimalBatchSizeBytes() { + // SQL statements probably shouldn't be very large + // so limit ourselves to 5MB of data per insert statement + return 5 * 1024 * 1024; + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index 8b3bd50ba5fd..bca7f425502e 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.4.1' features = ['db-destinations', 's3-destinations'] - useLocalCdk = false + useLocalCdk = true } //remove once upgrading the CDK version to 0.4.x or later From 9fe0da3517ffd43875c7c8358329d45eb73179f4 Mon Sep 17 00:00:00 2001 From: edgao Date: Tue, 28 Nov 2023 18:33:02 +0000 Subject: [PATCH 02/23] Automated Commit - Formatting Changes --- .../destination_async/OnCloseFunction.java | 2 ++ .../PartialAirbyteMessage.java | 8 +++--- .../jdbc/AbstractJdbcDestination.java | 3 +-- .../jdbc/JdbcBufferedConsumerFactory.java | 25 +++++++++---------- .../jdbc/JdbcInsertFlushFunction.java | 11 +++++--- 5 files changed, 28 insertions(+), 21 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java index c5c8a8b71313..42e4ee0134a8 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java @@ -12,6 +12,7 @@ * Separately out for easier versioning. */ public interface OnCloseFunction extends Consumer { + static OnCloseFunction fromNonAsync(final io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction legacy) { return (success) -> { try { @@ -21,4 +22,5 @@ static OnCloseFunction fromNonAsync(final io.airbyte.cdk.integrations.destinatio } }; } + } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java index 81c57984d3b1..61d282f4c27f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java @@ -74,10 +74,12 @@ public PartialAirbyteMessage withState(final PartialAirbyteStateMessage state) { } /** - * For record messages, this stores the serialized data blob (i.e. {@code Jsons.serialize(message.getRecord().getData())}). - * For state messages, this stores the _entire_ message (i.e. {@code Jsons.serialize(message)}). + * For record messages, this stores the serialized data blob (i.e. + * {@code Jsons.serialize(message.getRecord().getData())}). For state messages, this stores the + * _entire_ message (i.e. {@code Jsons.serialize(message)}). *

- * See {@link io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer#deserializeAirbyteMessage(String)} + * See + * {@link io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer#deserializeAirbyteMessage(String)} * for the exact logic of how this field is populated. */ @JsonProperty("serialized") diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index e7142bb67066..899e2c59b55c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -222,8 +222,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN false, null, null, - null - ); + null); } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 95bf2674a078..78dd7eeee879 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -101,8 +101,7 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer createWriteConfigs(final NamingConventionTransformer namingResolver, @@ -231,17 +230,17 @@ private static Map toMap(final List */ private static OnCloseFunction onCloseFunction() { // TODO something like this for DV2 -// (hasFailed) -> { -// if (use1s1t) { -// try { -// typerDeduper.typeAndDedupe(); -// typerDeduper.commitFinalTables(); -// typerDeduper.cleanup(); -// } catch (final Exception e) { -// throw new RuntimeException(e); -// } -// } -// } + // (hasFailed) -> { + // if (use1s1t) { + // try { + // typerDeduper.typeAndDedupe(); + // typerDeduper.commitFinalTables(); + // typerDeduper.cleanup(); + // } catch (final Exception e) { + // throw new RuntimeException(e); + // } + // } + // } return (hasFailed) -> {}; } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java index 22885caf0ae7..bb063fde5a82 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.cdk.integrations.destination.jdbc; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter; @@ -18,11 +22,11 @@ public JdbcInsertFlushFunction(final RecordWriter recordWr @Override public void flush(final StreamDescriptor desc, final Stream stream) throws Exception { - // TODO we can probably implement this better - use the serialized data string directly instead of redeserializing it for insertRecords + // TODO we can probably implement this better - use the serialized data string directly instead of + // redeserializing it for insertRecords recordWriter.accept( new AirbyteStreamNameNamespacePair(desc.getName(), desc.getNamespace()), - stream.map(PartialAirbyteMessage::getFullRecordMessage).toList() - ); + stream.map(PartialAirbyteMessage::getFullRecordMessage).toList()); } @Override @@ -31,4 +35,5 @@ public long getOptimalBatchSizeBytes() { // so limit ourselves to 5MB of data per insert statement return 5 * 1024 * 1024; } + } From 00dc110679673c8880b51bec612149f3f2644219 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 28 Nov 2023 10:36:27 -0800 Subject: [PATCH 03/23] redshift version bump --- .../connectors/destination-redshift/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index ab22a0b9be36..ff29dd661b59 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 0.6.10 + dockerImageTag: 0.6.11 dockerRepository: airbyte/destination-redshift githubIssueLabel: destination-redshift icon: redshift.svg From 57a9fd41b00f57ff4d6b9834b32d435526595ff3 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 28 Nov 2023 12:02:06 -0800 Subject: [PATCH 04/23] fix batch size --- .../destination/jdbc/JdbcInsertFlushFunction.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java index bb063fde5a82..d9da252f4748 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter; +import io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants; import io.airbyte.cdk.integrations.destination_async.DestinationFlushFunction; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; @@ -31,9 +32,7 @@ public void flush(final StreamDescriptor desc, final Stream Date: Tue, 28 Nov 2023 14:51:38 -0800 Subject: [PATCH 05/23] try lowering memory limit --- .../destination/jdbc/JdbcBufferedConsumerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 78dd7eeee879..5a398da581b7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -99,7 +99,7 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer Date: Tue, 28 Nov 2023 17:11:14 -0800 Subject: [PATCH 06/23] even lower --- .../destination/jdbc/JdbcBufferedConsumerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 5a398da581b7..23098cfa5edc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -99,7 +99,7 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer Date: Wed, 29 Nov 2023 09:04:27 -0800 Subject: [PATCH 07/23] changelog --- docs/integrations/destinations/redshift.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index fdf6ba95b407..5e88848f3f63 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -156,7 +156,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.6.10 | 2023-11-06 | [#32193](https://github.com/airbytehq/airbyte/pull/32193) | Adopt java CDK version 0.4.1. | +| 0.6.11 | 2023-11-29 | [#32888](https://github.com/airbytehq/airbyte/pull/32888) | Use the new async framework. | +| 0.6.10 | 2023-11-06 | [#32193](https://github.com/airbytehq/airbyte/pull/32193) | Adopt java CDK version 0.4.1. | | 0.6.9 | 2023-10-10 | [\#31083](https://github.com/airbytehq/airbyte/pull/31083) | Fix precision of numeric values in async destinations | | 0.6.8 | 2023-10-10 | [\#31218](https://github.com/airbytehq/airbyte/pull/31218) | Clarify configuration groups | | 0.6.7 | 2023-10-06 | [\#31153](https://github.com/airbytehq/airbyte/pull/31153) | Increase jvm GC retries | From 89e2b476ae5ba195158cf800a25539bc471b2ffb Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 29 Nov 2023 09:08:05 -0800 Subject: [PATCH 08/23] undo --- .../destination/jdbc/JdbcBufferedConsumerFactory.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 23098cfa5edc..7493d164bf87 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -92,7 +92,6 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired()); - final Map streamDescToWriteConfig = toMap(writeConfigs); return new AsyncStreamConsumer( outputRecordCollector, onStartFunction(database, sqlOperations, writeConfigs), @@ -206,7 +205,8 @@ private static RecordWriter recordWriterFunction(final Jdb final SqlOperations sqlOperations, final List writeConfigs, final ConfiguredAirbyteCatalog catalog) { - final Map pairToWriteConfig = toMap(writeConfigs); + final Map pairToWriteConfig = writeConfigs.stream() + .collect(Collectors.toUnmodifiableMap(JdbcBufferedConsumerFactory::toNameNamespacePair, Function.identity())); return (pair, records) -> { if (!pairToWriteConfig.containsKey(pair)) { @@ -219,12 +219,6 @@ private static RecordWriter recordWriterFunction(final Jdb }; } - @NotNull - private static Map toMap(final List writeConfigs) { - return writeConfigs.stream() - .collect(Collectors.toUnmodifiableMap(JdbcBufferedConsumerFactory::toNameNamespacePair, Function.identity())); - } - /** * Tear down functionality */ From 617a75aaba8eb0a9e64feec528414a8f018e6b83 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 29 Nov 2023 09:12:34 -0800 Subject: [PATCH 09/23] also update specmodifyingdestination --- .../base/spec_modification/SpecModifyingDestination.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.java index 4e2601e1c208..209c98fdf0ae 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/spec_modification/SpecModifyingDestination.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.Destination; +import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; @@ -41,4 +42,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, return destination.getConsumer(config, catalog, outputRecordCollector); } + @Override + public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) + throws Exception { + return destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector); + } + } From 17591869f55f3484ba790080c41a67d345382de4 Mon Sep 17 00:00:00 2001 From: edgao Date: Wed, 29 Nov 2023 17:21:05 +0000 Subject: [PATCH 10/23] Automated Commit - Formatting Changes --- .../destination/jdbc/JdbcBufferedConsumerFactory.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 7493d164bf87..dea2e2e9764d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -40,7 +40,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From cf3171f953917d2dd66d79212902b7cedcdafdba Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 29 Nov 2023 09:24:56 -0800 Subject: [PATCH 11/23] stop supporting non-async consumers --- .../destination/jdbc/AbstractJdbcDestination.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index 899e2c59b55c..8468f0acb0a9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -32,6 +32,7 @@ import java.util.UUID; import java.util.function.Consumer; import javax.sql.DataSource; +import org.apache.commons.lang3.NotImplementedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,8 +202,7 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map outputRecordCollector) { - return JdbcBufferedConsumerFactory.create(outputRecordCollector, getDatabase(getDataSource(config)), sqlOperations, namingResolver, config, - catalog); + throw new NotImplementedException("Should use the getSerializedMessageConsumer instead"); } @Override From a6e37dff331a5860e0166107f7d35471260ace6d Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 29 Nov 2023 09:35:16 -0800 Subject: [PATCH 12/23] use async + partial message everywhere --- .../jdbc/AbstractJdbcDestination.java | 13 ++-- .../jdbc/JdbcBufferedConsumerFactory.java | 69 +++++++------------ .../jdbc/JdbcInsertFlushFunction.java | 8 +-- .../destination/jdbc/JdbcSqlOperations.java | 11 ++- .../destination/jdbc/SqlOperations.java | 3 +- .../destination/jdbc/SqlOperationsUtils.java | 16 ++--- .../jdbc/TestJdbcSqlOperations.java | 7 +- .../operations/RedshiftSqlOperations.java | 3 +- 8 files changed, 58 insertions(+), 72 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index 8468f0acb0a9..ec986cca31f4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -17,6 +17,8 @@ import io.airbyte.cdk.integrations.base.Destination; import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage; import io.airbyte.commons.exceptions.ConnectionErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.map.MoreMaps; @@ -156,12 +158,13 @@ public static void attemptTableOperations(final String outputSchema, * * @return AirbyteRecordMessage object with dummy values that may be used to test insert permission. */ - private static AirbyteRecordMessage getDummyRecord() { + private static PartialAirbyteMessage getDummyRecord() { final JsonNode dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }"); - return new AirbyteRecordMessage() - .withStream("stream1") - .withData(dummyDataToInsert) - .withEmittedAt(1602637589000L); + return new PartialAirbyteMessage() + .withRecord(new PartialAirbyteRecordMessage() + .withStream("stream1") + .withEmittedAt(1602637589000L)) + .withSerialized(dummyDataToInsert.toString()); } protected DataSource getDataSource(final JsonNode config) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index dea2e2e9764d..fcdbe1c1903e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -4,22 +4,19 @@ package io.airbyte.cdk.integrations.destination.jdbc; -import static io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES; - import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter; -import io.airbyte.cdk.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy; import io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer; +import io.airbyte.cdk.integrations.destination_async.OnCloseFunction; import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve; @@ -31,7 +28,6 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.DestinationSyncMode; -import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -61,23 +57,6 @@ public class JdbcBufferedConsumerFactory { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class); - public static AirbyteMessageConsumer create(final Consumer outputRecordCollector, - final JdbcDatabase database, - final SqlOperations sqlOperations, - final NamingConventionTransformer namingResolver, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog) { - final List writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired()); - - return new BufferedStreamConsumer( - outputRecordCollector, - onStartFunction(database, sqlOperations, writeConfigs), - new InMemoryRecordBufferingStrategy(recordWriterFunction(database, sqlOperations, writeConfigs, catalog), DEFAULT_MAX_BATCH_SIZE_BYTES), - onCloseFunction(), - catalog, - sqlOperations::isValidData); - } - public static SerializedAirbyteMessageConsumer createAsync(final Consumer outputRecordCollector, final JdbcDatabase database, final SqlOperations sqlOperations, @@ -85,16 +64,18 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired()); return new AsyncStreamConsumer( outputRecordCollector, onStartFunction(database, sqlOperations, writeConfigs), - io.airbyte.cdk.integrations.destination_async.OnCloseFunction.fromNonAsync(onCloseFunction()), + onCloseFunction(use1s1t, typerDeduper), new JdbcInsertFlushFunction(recordWriterFunction(database, sqlOperations, writeConfigs, catalog)), catalog, new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.2)), @@ -109,14 +90,12 @@ private static List createWriteConfigs(final NamingConventionTransf if (schemaRequired) { Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema."); } - final Instant now = Instant.now(); - return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now, schemaRequired)).collect(Collectors.toList()); + return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, schemaRequired)).collect(Collectors.toList()); } private static Function toWriteConfig( final NamingConventionTransformer namingResolver, final JsonNode config, - final Instant now, final boolean schemaRequired) { return stream -> { Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode"); @@ -200,10 +179,10 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database, * @param writeConfigs settings for each stream * @param catalog catalog of all streams to sync */ - private static RecordWriter recordWriterFunction(final JdbcDatabase database, - final SqlOperations sqlOperations, - final List writeConfigs, - final ConfiguredAirbyteCatalog catalog) { + private static RecordWriter recordWriterFunction(final JdbcDatabase database, + final SqlOperations sqlOperations, + final List writeConfigs, + final ConfiguredAirbyteCatalog catalog) { final Map pairToWriteConfig = writeConfigs.stream() .collect(Collectors.toUnmodifiableMap(JdbcBufferedConsumerFactory::toNameNamespacePair, Function.identity())); @@ -221,20 +200,18 @@ private static RecordWriter recordWriterFunction(final Jdb /** * Tear down functionality */ - private static OnCloseFunction onCloseFunction() { - // TODO something like this for DV2 - // (hasFailed) -> { - // if (use1s1t) { - // try { - // typerDeduper.typeAndDedupe(); - // typerDeduper.commitFinalTables(); - // typerDeduper.cleanup(); - // } catch (final Exception e) { - // throw new RuntimeException(e); - // } - // } - // } - return (hasFailed) -> {}; + private static OnCloseFunction onCloseFunction(final boolean use1s1t, final TyperDeduper typerDeduper) { + return (hasFailed) -> { + if (use1s1t) { + try { + typerDeduper.typeAndDedupe(); + typerDeduper.commitFinalTables(); + typerDeduper.cleanup(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + }; } private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java index d9da252f4748..84c365dde553 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -15,19 +15,17 @@ public class JdbcInsertFlushFunction implements DestinationFlushFunction { - private final RecordWriter recordWriter; + private final RecordWriter recordWriter; - public JdbcInsertFlushFunction(final RecordWriter recordWriter) { + public JdbcInsertFlushFunction(final RecordWriter recordWriter) { this.recordWriter = recordWriter; } @Override public void flush(final StreamDescriptor desc, final Stream stream) throws Exception { - // TODO we can probably implement this better - use the serialized data string directly instead of - // redeserializing it for insertRecords recordWriter.accept( new AirbyteStreamNameNamespacePair(desc.getName(), desc.getNamespace()), - stream.map(PartialAirbyteMessage::getFullRecordMessage).toList()); + stream.toList()); } @Override diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java index 4b62d3340565..433081ac46ca 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; @@ -148,16 +149,20 @@ public boolean isValidData(final JsonNode data) { @Override public final void insertRecords(final JdbcDatabase database, - final List records, + final List records, final String schemaName, final String tableName) throws Exception { - dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> adapter.adapt(airbyteRecordMessage.getData()))); + dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> { + final JsonNode data = Jsons.deserializeExact(airbyteRecordMessage.getSerialized()); + adapter.adapt(data); + airbyteRecordMessage.setSerialized(Jsons.serialize(data)); + })); insertRecordsInternal(database, records, schemaName, tableName); } protected abstract void insertRecordsInternal(JdbcDatabase database, - List records, + List records, String schemaName, String tableName) throws Exception; diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.java index fab0568d497b..9b2fceaade49 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.util.List; import org.slf4j.Logger; @@ -98,7 +99,7 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN * @param tableName Name of table * @throws Exception exception */ - void insertRecords(JdbcDatabase database, List records, String schemaName, String tableName) throws Exception; + void insertRecords(JdbcDatabase database, List records, String schemaName, String tableName) throws Exception; /** * Query to insert all records from source table to destination table. Both tables must be in the diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java index 1e890647446a..926023f00d48 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java @@ -7,7 +7,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.commons.json.Jsons; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -33,7 +33,7 @@ public class SqlOperationsUtils { public static void insertRawRecordsInSingleQuery(final String insertQueryComponent, final String recordQueryComponent, final JdbcDatabase jdbcDatabase, - final List records) + final List records) throws SQLException { insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, true); } @@ -54,7 +54,7 @@ public static void insertRawRecordsInSingleQuery(final String insertQueryCompone public static void insertRawRecordsInSingleQueryNoSem(final String insertQueryComponent, final String recordQueryComponent, final JdbcDatabase jdbcDatabase, - final List records) + final List records) throws SQLException { insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, false); } @@ -63,7 +63,7 @@ public static void insertRawRecordsInSingleQueryNoSem(final String insertQueryCo static void insertRawRecordsInSingleQuery(final String insertQueryComponent, final String recordQueryComponent, final JdbcDatabase jdbcDatabase, - final List records, + final List records, final Supplier uuidSupplier, final boolean sem) throws SQLException { @@ -83,7 +83,7 @@ static void insertRawRecordsInSingleQuery(final String insertQueryComponent, // how many records can be inserted at once // TODO(sherif) this should use a smarter, destination-aware partitioning scheme instead of 10k by // default - for (List partition : Iterables.partition(records, 10_000)) { + for (final List partition : Iterables.partition(records, 10_000)) { final StringBuilder sql = new StringBuilder(insertQueryComponent); partition.forEach(r -> sql.append(recordQueryComponent)); final String s = sql.toString(); @@ -92,11 +92,11 @@ static void insertRawRecordsInSingleQuery(final String insertQueryComponent, try (final PreparedStatement statement = connection.prepareStatement(s1)) { // second loop: bind values to the SQL string. int i = 1; - for (final AirbyteRecordMessage message : partition) { + for (final PartialAirbyteMessage message : partition) { // 1-indexed statement.setString(i, uuidSupplier.get().toString()); - statement.setString(i + 1, Jsons.serialize(message.getData())); - statement.setTimestamp(i + 2, Timestamp.from(Instant.ofEpochMilli(message.getEmittedAt()))); + statement.setString(i + 1, message.getSerialized()); + statement.setTimestamp(i + 2, Timestamp.from(Instant.ofEpochMilli(message.getRecord().getEmittedAt()))); i += 3; } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java index 0f6e72481ddb..598fd98a35ef 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc; import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.sql.SQLException; import java.util.List; @@ -16,7 +17,7 @@ public class TestJdbcSqlOperations extends JdbcSqlOperations { @Override public void insertRecordsInternal(final JdbcDatabase database, - final List records, + final List records, final String schemaName, final String tableName) throws Exception { @@ -29,11 +30,11 @@ public void testCreateSchemaIfNotExists() { final var schemaName = "foo"; try { Mockito.doThrow(new SQLException("TEST")).when(db).execute(Mockito.anyString()); - } catch (Exception e) { + } catch (final Exception e) { // This would not be expected, but the `execute` method above will flag as an unhandled exception assert false; } - SQLException exception = Assertions.assertThrows(SQLException.class, () -> createSchemaIfNotExists(db, schemaName)); + final SQLException exception = Assertions.assertThrows(SQLException.class, () -> createSchemaIfNotExists(db, schemaName)); Assertions.assertEquals(exception.getMessage(), "TEST"); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java index e158b3621729..704c90a7cf55 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java @@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.sql.SQLException; import java.util.List; @@ -37,7 +38,7 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN @Override public void insertRecordsInternal(final JdbcDatabase database, - final List records, + final List records, final String schemaName, final String tmpTableName) throws SQLException { From b73a47f6393ca66f6882421d415239ab1577dc00 Mon Sep 17 00:00:00 2001 From: edgao Date: Wed, 29 Nov 2023 17:38:08 +0000 Subject: [PATCH 13/23] Automated Commit - Formatting Changes --- .../integrations/destination/jdbc/AbstractJdbcDestination.java | 1 - .../integrations/destination/jdbc/JdbcInsertFlushFunction.java | 1 - .../airbyte/cdk/integrations/destination/jdbc/SqlOperations.java | 1 - .../cdk/integrations/destination/jdbc/SqlOperationsUtils.java | 1 - .../cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java | 1 - .../destination/redshift/operations/RedshiftSqlOperations.java | 1 - 6 files changed, 6 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index ec986cca31f4..5b5841b7720e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -25,7 +25,6 @@ import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import java.sql.SQLException; import java.util.List; diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java index 84c365dde553..87a3205a7f3e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -8,7 +8,6 @@ import io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants; import io.airbyte.cdk.integrations.destination_async.DestinationFlushFunction; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.stream.Stream; diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.java index 9b2fceaade49..0ce39c576d02 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java index 926023f00d48..3e0282cf1f08 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java @@ -8,7 +8,6 @@ import com.google.common.collect.Iterables; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java index 598fd98a35ef..5809f8e13449 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.java @@ -6,7 +6,6 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.sql.SQLException; import java.util.List; import org.junit.jupiter.api.Assertions; diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java index 704c90a7cf55..d2738d08d81a 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java @@ -9,7 +9,6 @@ import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.sql.SQLException; import java.util.List; import org.slf4j.Logger; From 949be583efc5ae34d2ace7144e4a099eb112da5e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 29 Nov 2023 10:09:34 -0800 Subject: [PATCH 14/23] 2mb batch --- .../destination/jdbc/JdbcInsertFlushFunction.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java index 87a3205a7f3e..35caec04c72a 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -29,7 +29,15 @@ public void flush(final StreamDescriptor desc, final Stream Date: Wed, 29 Nov 2023 18:13:12 +0000 Subject: [PATCH 15/23] Automated Commit - Formatting Changes --- .../integrations/destination/jdbc/JdbcInsertFlushFunction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java index 35caec04c72a..df35ef36553d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.jdbc; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter; -import io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants; import io.airbyte.cdk.integrations.destination_async.DestinationFlushFunction; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; From 00676163602f2e0aec0edb6b53540d1975775344 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 29 Nov 2023 10:18:12 -0800 Subject: [PATCH 16/23] actually lets use 5mb --- .../destination/jdbc/JdbcInsertFlushFunction.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java index df35ef36553d..9840e52d85b6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -28,15 +28,15 @@ public void flush(final StreamDescriptor desc, final Stream Date: Wed, 29 Nov 2023 11:36:38 -0800 Subject: [PATCH 17/23] delete unused code --- .../destination_async/OnCloseFunction.java | 10 ---------- .../partial_messages/PartialAirbyteMessage.java | 11 ----------- 2 files changed, 21 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java index 42e4ee0134a8..9b004ac0d451 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/OnCloseFunction.java @@ -13,14 +13,4 @@ */ public interface OnCloseFunction extends Consumer { - static OnCloseFunction fromNonAsync(final io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction legacy) { - return (success) -> { - try { - legacy.accept(success); - } catch (final Exception e) { - throw new RuntimeException(e); - } - }; - } - } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java index 61d282f4c27f..a0b488529322 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java @@ -125,15 +125,4 @@ public String toString() { '}'; } - public AirbyteRecordMessage getFullRecordMessage() { - if (type != AirbyteMessage.Type.RECORD || record == null) { - throw new IllegalStateException("Cannot get full record message for non-record message"); - } - return new AirbyteRecordMessage() - .withNamespace(record.getNamespace()) - .withStream(record.getStream()) - .withData(Jsons.deserializeExact(getSerialized())) - .withEmittedAt(record.getEmittedAt()); - } - } From 422af2eda7b54dc247d724639e2006d262210da4 Mon Sep 17 00:00:00 2001 From: edgao Date: Wed, 29 Nov 2023 19:40:58 +0000 Subject: [PATCH 18/23] Automated Commit - Formatting Changes --- .../partial_messages/PartialAirbyteMessage.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java index a0b488529322..c0d3739b3285 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteMessage.java @@ -6,9 +6,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; -import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.util.Objects; public class PartialAirbyteMessage { From 90fb10837fa4dee3dc4b6c8a8eecb99f5d91661d Mon Sep 17 00:00:00 2001 From: Ben Church Date: Wed, 29 Nov 2023 11:41:40 -0800 Subject: [PATCH 19/23] Fix: Revert airbyte-ci to 2.8.0 from 2.10.0 (#32954) --- .github/workflows/connectors_tests.yml | 5 -- .github/workflows/publish_connectors.yml | 5 -- airbyte-ci/connectors/pipelines/README.md | 48 ++++------ .../connectors/build_image/commands.py | 28 +----- .../connectors/build_image/steps/__init__.py | 18 ++-- .../connectors/build_image/steps/common.py | 51 ++++------- .../build_image/steps/java_connectors.py | 7 +- .../airbyte_ci/connectors/context.py | 8 +- .../airbyte_ci/connectors/publish/pipeline.py | 2 +- .../airbyte_ci/connectors/test/commands.py | 3 +- .../connectors/test/steps/java_connectors.py | 10 +-- .../test/steps/python_connectors.py | 2 +- .../connectors/pipelines/pipelines/consts.py | 6 +- .../pipelines/pipelines/helpers/utils.py | 32 +++---- .../connectors/pipelines/pyproject.toml | 2 +- airbyte-ci/connectors/pipelines/pytest.ini | 2 - .../tests/test_build_image/__init__.py | 3 - .../test_steps/test_common.py | 90 ------------------- .../dummy_build_customization.py | 0 .../test_python_connectors.py | 52 +++++------ .../pipelines/tests/test_helpers/__init__.py | 3 - .../test_tests/test_python_connectors.py | 10 +-- .../tests/{test_helpers => }/test_utils.py | 46 ---------- 23 files changed, 105 insertions(+), 328 deletions(-) delete mode 100644 airbyte-ci/connectors/pipelines/tests/test_build_image/__init__.py delete mode 100644 airbyte-ci/connectors/pipelines/tests/test_build_image/test_steps/test_common.py rename airbyte-ci/connectors/pipelines/tests/{test_build_image => test_builds}/dummy_build_customization.py (100%) rename airbyte-ci/connectors/pipelines/tests/{test_build_image => test_builds}/test_python_connectors.py (80%) delete mode 100644 airbyte-ci/connectors/pipelines/tests/test_helpers/__init__.py rename airbyte-ci/connectors/pipelines/tests/{test_helpers => }/test_utils.py (80%) diff --git a/.github/workflows/connectors_tests.yml b/.github/workflows/connectors_tests.yml index 25acc1617a9b..c371735c35bb 100644 --- a/.github/workflows/connectors_tests.yml +++ b/.github/workflows/connectors_tests.yml @@ -20,10 +20,6 @@ on: runner: description: "The runner to use for this job" default: "ci-runner-connector-test-large-dagger-0-6-4" - airbyte_ci_binary_url: - description: "The URL to download the airbyte-ci binary from" - required: false - default: https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/latest/airbyte-ci pull_request: types: - opened @@ -70,7 +66,6 @@ jobs: s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} subcommand: "connectors ${{ github.event.inputs.test-connectors-options }} test" - airbyte_ci_binary_url: ${{ github.event.inputs.airbyte_ci_binary_url }} - name: Test connectors [PULL REQUESTS] if: github.event_name == 'pull_request' uses: ./.github/actions/run-dagger-pipeline diff --git a/.github/workflows/publish_connectors.yml b/.github/workflows/publish_connectors.yml index 16b84b191cc3..ded1f9fb11ae 100644 --- a/.github/workflows/publish_connectors.yml +++ b/.github/workflows/publish_connectors.yml @@ -18,10 +18,6 @@ on: type: string default: ci-runner-connector-publish-large-dagger-0-6-4 required: true - airbyte-ci-binary-url: - description: "URL to airbyte-ci binary" - required: false - default: https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/latest/airbyte-ci jobs: publish_connectors: name: Publish connectors @@ -66,7 +62,6 @@ jobs: s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} subcommand: "connectors ${{ github.event.inputs.connectors-options }} publish ${{ github.event.inputs.publish-options }}" - airbyte_ci_binary_url: ${{ github.event.inputs.airbyte-ci-binary-url }} set-instatus-incident-on-failure: name: Create Instatus Incident on Failure diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md index 982be483036e..a0adab57d120 100644 --- a/airbyte-ci/connectors/pipelines/README.md +++ b/airbyte-ci/connectors/pipelines/README.md @@ -124,17 +124,18 @@ At this point you can run `airbyte-ci` commands. #### Options -| Option | Default value | Mapped environment variable | Description | -| ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------- | ------------------------------------------------------------------------------------------- | -| `--enable-dagger-run/--disable-dagger-run` | `--enable-dagger-run`` | | Disables the Dagger terminal UI. | | | | | | | -| `--is-local/--is-ci` | `--is-local` | | Determines the environment in which the CLI runs: local environment or CI environment. | -| `--git-branch` | The checked out git branch name | `CI_GIT_BRANCH` | The git branch on which the pipelines will run. | -| `--git-revision` | The current branch head | `CI_GIT_REVISION` | The commit hash on which the pipelines will run. | -| `--diffed-branch` | `origin/master` | | Branch to which the git diff will happen to detect new or modified files. | -| `--gha-workflow-run-id` | | | GHA CI only - The run id of the GitHub action workflow | -| `--ci-context` | `manual` | | The current CI context: `manual` for manual run, `pull_request`, `nightly_builds`, `master` | -| `--pipeline-start-timestamp` | Current epoch time | `CI_PIPELINE_START_TIMESTAMP` | Start time of the pipeline as epoch time. Used for pipeline run duration computation. | -| `--show-dagger-logs/--hide-dagger-logs` | `--hide-dagger-logs` | | Flag to show or hide the dagger logs. | +| Option | Default value | Mapped environment variable | Description | +| ------------------------------------------ | ---------------------------------------------------------------------------------------------- | ----------------------------- | ------------------------------------------------------------------------------------------- | +| `--enable-dagger-run/--disable-dagger-run` | `--enable-dagger-run`` | | Disables the Dagger terminal UI. | | | +| `--enable-auto-update/--disable-auto-update` | `--enable-auto-update`` | | Disables the auto update prompt | | | +| `--is-local/--is-ci` | `--is-local` | | Determines the environment in which the CLI runs: local environment or CI environment. | +| `--git-branch` | The checked out git branch name | `CI_GIT_BRANCH` | The git branch on which the pipelines will run. | +| `--git-revision` | The current branch head | `CI_GIT_REVISION` | The commit hash on which the pipelines will run. | +| `--diffed-branch` | `origin/master` | | Branch to which the git diff will happen to detect new or modified files. | +| `--gha-workflow-run-id` | | | GHA CI only - The run id of the GitHub action workflow | +| `--ci-context` | `manual` | | The current CI context: `manual` for manual run, `pull_request`, `nightly_builds`, `master` | +| `--pipeline-start-timestamp` | Current epoch time | `CI_PIPELINE_START_TIMESTAMP` | Start time of the pipeline as epoch time. Used for pipeline run duration computation. | +| `--show-dagger-logs/--hide-dagger-logs` | `--hide-dagger-logs` | | Flag to show or hide the dagger logs. | ### `connectors` command subgroup @@ -254,12 +255,6 @@ It's mainly purposed for local use. Build a single connector: `airbyte-ci connectors --name=source-pokeapi build` -Build a single connector with a custom image tag: -`airbyte-ci connectors --name=source-pokeapi build --tag=my-custom-tag` - -Build a single connector for multiple architectures: -`airbyte-ci connectors --name=source-pokeapi build --architecture=linux/amd64 --architecture=linux/arm64` - Build multiple connectors: `airbyte-ci connectors --name=source-pokeapi --name=source-bigquery build` @@ -296,19 +291,11 @@ flowchart TD distTar-->connector normalization--"if supports normalization"-->connector - load[Load to docker host with :dev tag] + load[Load to docker host with :dev tag, current platform] spec[Get spec] connector-->spec--"if success"-->load ``` -### Options - -| Option | Multiple | Default value | Description | -| --------------------- | -------- | -------------- | ----------------------------------------------------------------- | -| `--architecture`/`-a` | True | Local platform | Defines for which architecture the connector image will be built. | -| `--tag` | False | `dev` | Image tag for the built image. | - - ### `connectors publish` command Run a publish pipeline for one or multiple connectors. It's mainly purposed for CI use to release a connector update. @@ -447,15 +434,12 @@ This command runs the Python tests for a airbyte-ci poetry package. ## Changelog | Version | PR | Description | | ------- | ---------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- | -| 2.10.1 | [#32928](https://github.com/airbytehq/airbyte/pull/32928) | Fix BuildConnectorImages constructor. | -| 2.10.0 | [#32819](https://github.com/airbytehq/airbyte/pull/32819) | Add `--tag` option to connector build. | -| 2.9.0 | [#32816](https://github.com/airbytehq/airbyte/pull/32816) | Add `--architecture` option to connector build. | | 2.8.0 | [#31930](https://github.com/airbytehq/airbyte/pull/31930) | Move pipx install to `airbyte-ci-dev`, and add auto-update feature targeting binary | -| 2.7.3 | [#32847](https://github.com/airbytehq/airbyte/pull/32847) | Improve --modified behaviour for pull requests. | +| 2.7.3 | [#32847](https://github.com/airbytehq/airbyte/pull/32847) | Improve --modified behaviour for pull requests. | | 2.7.2 | [#32839](https://github.com/airbytehq/airbyte/pull/32839) | Revert changes in v2.7.1. | -| 2.7.1 | [#32806](https://github.com/airbytehq/airbyte/pull/32806) | Improve --modified behaviour for pull requests. | +| 2.7.1 | [#32806](https://github.com/airbytehq/airbyte/pull/32806) | Improve --modified behaviour for pull requests. | | 2.7.0 | [#31930](https://github.com/airbytehq/airbyte/pull/31930) | Merge airbyte-ci-internal into airbyte-ci | -| 2.6.0 | [#31831](https://github.com/airbytehq/airbyte/pull/31831) | Add `airbyte-ci format` commands, remove connector-specific formatting check | +| 2.6.0 | [#31831](https://github.com/airbytehq/airbyte/pull/31831) | Add `airbyte-ci format` commands, remove connector-specific formatting check | | 2.5.9 | [#32427](https://github.com/airbytehq/airbyte/pull/32427) | Re-enable caching for source-postgres | | 2.5.8 | [#32402](https://github.com/airbytehq/airbyte/pull/32402) | Set Dagger Cloud token for airbyters only | | 2.5.7 | [#31628](https://github.com/airbytehq/airbyte/pull/31628) | Add ClickPipelineContext class | diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/commands.py index 6b6624391f47..c15260489392 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/commands.py @@ -2,16 +2,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from typing import List - import asyncclick as click -import dagger -from pipelines import main_logger from pipelines.airbyte_ci.connectors.build_image.steps import run_connector_build_pipeline from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.pipeline import run_connectors_pipelines from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand -from pipelines.consts import BUILD_PLATFORMS, LOCAL_BUILD_PLATFORM @click.command(cls=DaggerPipelineCommand, help="Build all images for the selected connectors.") @@ -22,27 +17,10 @@ default=False, type=bool, ) -@click.option( - "-a", - "--architecture", - "build_architectures", - help="Architecture for which to build the connector image. If not specified, the image will be built for the local architecture.", - multiple=True, - default=[LOCAL_BUILD_PLATFORM], - type=click.Choice(BUILD_PLATFORMS, case_sensitive=True), -) -@click.option( - "-t", - "--tag", - help="The tag to use for the built image.", - default="dev", - type=str, -) @click.pass_context -async def build(ctx: click.Context, use_host_gradle_dist_tar: bool, build_architectures: List[str], tag: str) -> bool: +async def build(ctx: click.Context, use_host_gradle_dist_tar: bool) -> bool: """Runs a build pipeline for the selected connectors.""" - build_platforms = [dagger.Platform(architecture) for architecture in build_architectures] - main_logger.info(f"Building connectors for {build_platforms}, use --architecture to change this.") + connectors_contexts = [ ConnectorContext( pipeline_name=f"Build connector {connector.technical_name}", @@ -63,7 +41,6 @@ async def build(ctx: click.Context, use_host_gradle_dist_tar: bool, build_archit use_host_gradle_dist_tar=use_host_gradle_dist_tar, s3_build_cache_access_key_id=ctx.obj.get("s3_build_cache_access_key_id"), s3_build_cache_secret_key=ctx.obj.get("s3_build_cache_secret_key"), - targeted_platforms=build_platforms, ) for connector in ctx.obj["selected_connectors_with_modified_files"] ] @@ -76,7 +53,6 @@ async def build(ctx: click.Context, use_host_gradle_dist_tar: bool, build_archit ctx.obj["concurrency"], ctx.obj["dagger_logs_path"], ctx.obj["execute_timeout"], - tag, ) return True diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/__init__.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/__init__.py index 279cfe16cc66..5bbc035fe1bd 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/__init__.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/__init__.py @@ -5,13 +5,17 @@ from __future__ import annotations +import platform + import anyio from connector_ops.utils import ConnectorLanguage -from pipelines.airbyte_ci.connectors.build_image.steps import java_connectors, python_connectors +from pipelines.models.steps import StepResult +from pipelines.airbyte_ci.connectors.build_image.steps import python_connectors from pipelines.airbyte_ci.connectors.build_image.steps.common import LoadContainerToLocalDockerHost, StepStatus +from pipelines.consts import LOCAL_BUILD_PLATFORM +from pipelines.airbyte_ci.connectors.build_image.steps import java_connectors from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.reports import ConnectorReport -from pipelines.models.steps import StepResult class NoBuildStepForLanguageError(Exception): @@ -32,15 +36,12 @@ async def run_connector_build(context: ConnectorContext) -> StepResult: return await LANGUAGE_BUILD_CONNECTOR_MAPPING[context.connector.language](context) -async def run_connector_build_pipeline( - context: ConnectorContext, semaphore: anyio.Semaphore, image_tag: str -) -> ConnectorReport: +async def run_connector_build_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport: """Run a build pipeline for a single connector. Args: context (ConnectorContext): The initialized connector context. - semaphore (anyio.Semaphore): The semaphore to use to limit the number of concurrent builds. - image_tag (str): The tag to use for the built image. + Returns: ConnectorReport: The reports holding builds results. """ @@ -48,10 +49,9 @@ async def run_connector_build_pipeline( async with semaphore: async with context: build_result = await run_connector_build(context) - per_platform_built_containers = build_result.output_artifact step_results.append(build_result) if context.is_local and build_result.status is StepStatus.SUCCESS: - load_image_result = await LoadContainerToLocalDockerHost(context, per_platform_built_containers, image_tag).run() + load_image_result = await LoadContainerToLocalDockerHost(context, LOCAL_BUILD_PLATFORM, build_result.output_artifact).run() step_results.append(load_image_result) context.report = ConnectorReport(context, step_results, name="BUILD RESULTS") return context.report diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py index 3930efed620e..267238ee2347 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py @@ -2,14 +2,14 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import json from abc import ABC -from typing import List, Optional, Tuple +from typing import List, Tuple import docker from dagger import Container, ExecError, Platform, QueryError from pipelines.airbyte_ci.connectors.context import ConnectorContext -from pipelines.helpers.utils import export_containers_to_tarball +from pipelines.consts import BUILD_PLATFORMS +from pipelines.helpers.utils import export_container_to_tarball from pipelines.models.steps import Step, StepResult, StepStatus @@ -22,8 +22,8 @@ class BuildConnectorImagesBase(Step, ABC): def title(self): return f"Build {self.context.connector.technical_name} docker image for platform(s) {', '.join(self.build_platforms)}" - def __init__(self, context: ConnectorContext) -> None: - self.build_platforms: List[Platform] = context.targeted_platforms + def __init__(self, context: ConnectorContext, *build_platforms: List[Platform]) -> None: + self.build_platforms = build_platforms if build_platforms else BUILD_PLATFORMS super().__init__(context) async def _run(self, *args) -> StepResult: @@ -56,41 +56,28 @@ async def _build_connector(self, platform: Platform, *args) -> Container: class LoadContainerToLocalDockerHost(Step): - def __init__(self, context: ConnectorContext, containers: dict[Platform, Container], image_tag: Optional[str] = "dev") -> None: + IMAGE_TAG = "dev" + + def __init__(self, context: ConnectorContext, platform: Platform, containers: dict[Platform, Container]) -> None: super().__init__(context) - self.image_tag = image_tag - self.containers = containers + self.platform = platform + self.container = containers[platform] @property def title(self): - return f"Load {self.image_name}:{self.image_tag} to the local docker host." + return f"Load {self.image_name}:{self.IMAGE_TAG} for platform {self.platform} to the local docker host." @property def image_name(self) -> Tuple: return f"airbyte/{self.context.connector.technical_name}" async def _run(self) -> StepResult: - container_variants = list(self.containers.values()) - _, exported_tar_path = await export_containers_to_tarball(self.context, container_variants) - if not exported_tar_path: - return StepResult( - self, - StepStatus.FAILURE, - stderr=f"Failed to export the connector image {self.image_name}:{self.image_tag} to a tarball.", - ) + _, exported_tarball_path = await export_container_to_tarball(self.context, self.container) + client = docker.from_env() try: - client = docker.from_env() - response = client.api.import_image_from_file(str(exported_tar_path), repository=self.image_name, tag=self.image_tag) - try: - image_sha = json.loads(response)["status"] - except (json.JSONDecodeError, KeyError): - return StepResult( - self, - StepStatus.FAILURE, - stderr=f"Failed to import the connector image {self.image_name}:{self.image_tag} to your Docker host: {response}", - ) - return StepResult( - self, StepStatus.SUCCESS, stdout=f"Loaded image {self.image_name}:{self.image_tag} to your Docker host ({image_sha})." - ) - except docker.errors.DockerException as e: - return StepResult(self, StepStatus.FAILURE, stderr=f"Something went wrong while interacting with the local docker client: {e}") + with open(exported_tarball_path, "rb") as tarball_content: + new_image = client.images.load(tarball_content.read())[0] + new_image.tag(self.image_name, tag=self.IMAGE_TAG) + return StepResult(self, StepStatus.SUCCESS) + except ConnectionError: + return StepResult(self, StepStatus.FAILURE, stderr="The connection to the local docker host failed.") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/java_connectors.py index f8a4c7ed0d61..704ce6fa3849 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/java_connectors.py @@ -2,10 +2,13 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from dagger import Container, Directory, File, Platform, QueryError +from typing import List, Optional, Tuple, Union + +from dagger import Container, Directory, ExecError, File, Host, Platform, QueryError from pipelines.airbyte_ci.connectors.build_image.steps.common import BuildConnectorImagesBase from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.steps.gradle import GradleTask +from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.containers import java from pipelines.models.steps import StepResult, StepStatus @@ -53,7 +56,7 @@ async def run_connector_build(context: ConnectorContext) -> StepResult: # Special case: use a local dist tar to speed up local development. dist_dir = await context.dagger_client.host().directory(dist_tar_directory_path(context), include=["*.tar"]) # Speed things up by only building for the local platform. - return await BuildConnectorImages(context).run(dist_dir) + return await BuildConnectorImages(context, LOCAL_BUILD_PLATFORM).run(dist_dir) # Default case: distribution tar is built by the dagger pipeline. build_connector_tar_result = await BuildConnectorDistributionTar(context).run() diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py index 3553cafa8ac1..8a2115a41135 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py @@ -6,15 +6,14 @@ from datetime import datetime from types import TracebackType -from typing import Iterable, Optional +from typing import Optional import yaml from anyio import Path from asyncer import asyncify -from dagger import Directory, Platform, Secret +from dagger import Directory, Secret from github import PullRequest from pipelines.airbyte_ci.connectors.reports import ConnectorReport -from pipelines.consts import BUILD_PLATFORMS from pipelines.dagger.actions import secrets from pipelines.helpers.connectors.modifed import ConnectorWithModifiedFiles from pipelines.helpers.github import update_commit_status_check @@ -61,7 +60,6 @@ def __init__( s3_build_cache_access_key_id: Optional[str] = None, s3_build_cache_secret_key: Optional[str] = None, concurrent_cat: Optional[bool] = False, - targeted_platforms: Optional[Iterable[Platform]] = BUILD_PLATFORMS, ): """Initialize a connector context. @@ -90,7 +88,6 @@ def __init__( s3_build_cache_access_key_id (Optional[str], optional): Gradle S3 Build Cache credentials. Defaults to None. s3_build_cache_secret_key (Optional[str], optional): Gradle S3 Build Cache credentials. Defaults to None. concurrent_cat (bool, optional): Whether to run the CAT tests in parallel. Defaults to False. - targeted_platforms (Optional[Iterable[Platform]], optional): The platforms to build the connector image for. Defaults to BUILD_PLATFORMS. """ self.pipeline_name = pipeline_name @@ -113,7 +110,6 @@ def __init__( self.s3_build_cache_access_key_id = s3_build_cache_access_key_id self.s3_build_cache_secret_key = s3_build_cache_secret_key self.concurrent_cat = concurrent_cat - self.targeted_platforms = targeted_platforms super().__init__( pipeline_name=pipeline_name, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py index 2a5908bdb150..ffb754cf47f7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py @@ -205,7 +205,7 @@ async def _run(self, built_connector: Container) -> StepResult: return StepResult(self, status=StepStatus.SUCCESS, stdout="Uploaded connector spec to spec cache bucket.") -# Pipeline +## Pipeline async def run_connector_publish_pipeline(context: PublishConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport: diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py index b858ca839e00..71e120d92d8f 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py @@ -10,7 +10,7 @@ from pipelines.airbyte_ci.connectors.pipeline import run_connectors_pipelines from pipelines.airbyte_ci.connectors.test.pipeline import run_connector_test_pipeline from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand -from pipelines.consts import LOCAL_BUILD_PLATFORM, ContextState +from pipelines.consts import ContextState from pipelines.helpers.github import update_global_commit_status_check_for_tests from pipelines.helpers.utils import fail_if_missing_docker_hub_creds @@ -95,7 +95,6 @@ async def test( docker_hub_username=ctx.obj.get("docker_hub_username"), docker_hub_password=ctx.obj.get("docker_hub_password"), concurrent_cat=concurrent_cat, - targeted_platforms=[LOCAL_BUILD_PLATFORM], ) for connector in ctx.obj["selected_connectors_with_modified_files"] ] diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py index bee79803637f..ca334022e10f 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py @@ -21,7 +21,7 @@ from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.actions import secrets from pipelines.dagger.actions.system import docker -from pipelines.helpers.utils import export_containers_to_tarball +from pipelines.helpers.utils import export_container_to_tarball from pipelines.models.steps import StepResult, StepStatus @@ -91,7 +91,7 @@ async def run_all_tests(context: ConnectorContext) -> List[StepResult]: async def run_docker_build_dependent_steps(dist_tar_dir: Directory) -> List[StepResult]: step_results = [] - build_connector_image_results = await BuildConnectorImages(context).run(dist_tar_dir) + build_connector_image_results = await BuildConnectorImages(context, LOCAL_BUILD_PLATFORM).run(dist_tar_dir) step_results.append(build_connector_image_results) if build_connector_image_results.status is StepStatus.FAILURE: return step_results @@ -101,15 +101,15 @@ async def run_docker_build_dependent_steps(dist_tar_dir: Directory) -> List[Step context.logger.info(f"This connector supports normalization: will build {normalization_image}.") build_normalization_results = await BuildOrPullNormalization(context, normalization_image, LOCAL_BUILD_PLATFORM).run() normalization_container = build_normalization_results.output_artifact - normalization_tar_file, _ = await export_containers_to_tarball( - context, [normalization_container], tar_file_name=f"{context.connector.normalization_repository}_{context.git_revision}.tar" + normalization_tar_file, _ = await export_container_to_tarball( + context, normalization_container, tar_file_name=f"{context.connector.normalization_repository}_{context.git_revision}.tar" ) step_results.append(build_normalization_results) else: normalization_tar_file = None connector_container = build_connector_image_results.output_artifact[LOCAL_BUILD_PLATFORM] - connector_image_tar_file, _ = await export_containers_to_tarball(context, [connector_container]) + connector_image_tar_file, _ = await export_container_to_tarball(context, connector_container) async with asyncer.create_task_group() as docker_build_dependent_group: soon_integration_tests_results = docker_build_dependent_group.soonify(IntegrationTests(context).run)( diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py index 769eb8575146..cf3ef8eee923 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py @@ -203,7 +203,7 @@ async def run_all_tests(context: ConnectorContext) -> List[StepResult]: List[StepResult]: The results of all the steps that ran or were skipped. """ step_results = [] - build_connector_image_results = await BuildConnectorImages(context).run() + build_connector_image_results = await BuildConnectorImages(context, LOCAL_BUILD_PLATFORM).run() if build_connector_image_results.status is StepStatus.FAILURE: return [build_connector_image_results] step_results.append(build_connector_image_results) diff --git a/airbyte-ci/connectors/pipelines/pipelines/consts.py b/airbyte-ci/connectors/pipelines/pipelines/consts.py index ba1db767c0dd..4ccaf7522644 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/consts.py +++ b/airbyte-ci/connectors/pipelines/pipelines/consts.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import os import platform from enum import Enum @@ -19,7 +20,7 @@ "pytest-custom_exit_code", ] -BUILD_PLATFORMS = (Platform("linux/amd64"), Platform("linux/arm64")) +BUILD_PLATFORMS = [Platform("linux/amd64"), Platform("linux/arm64")] PLATFORM_MACHINE_TO_DAGGER_PLATFORM = { "x86_64": Platform("linux/amd64"), @@ -27,8 +28,7 @@ "aarch64": Platform("linux/amd64"), "amd64": Platform("linux/amd64"), } -LOCAL_MACHINE_TYPE = platform.machine() -LOCAL_BUILD_PLATFORM = PLATFORM_MACHINE_TO_DAGGER_PLATFORM[LOCAL_MACHINE_TYPE] +LOCAL_BUILD_PLATFORM = PLATFORM_MACHINE_TO_DAGGER_PLATFORM[platform.machine()] AMAZONCORRETTO_IMAGE = "amazoncorretto:17.0.8-al2023" NODE_IMAGE = "node:18.18.0-slim" GO_IMAGE = "golang:1.17" diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py index 509ff15e423c..d2709257e449 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py @@ -267,31 +267,31 @@ async def execute_concurrently(steps: List[Callable], concurrency=5): return [task.value for task in tasks] -async def export_containers_to_tarball( - context: ConnectorContext, container_variants: List[Container], tar_file_name: Optional[str] = None +async def export_container_to_tarball( + context: ConnectorContext, container: Container, tar_file_name: Optional[str] = None ) -> Tuple[Optional[File], Optional[Path]]: """Save the container image to the host filesystem as a tar archive. - Exports a list of container variants to a tarball file. - The list of container variants should be platform/os specific variants of the same container image. - The tarball file is saved to the host filesystem in the directory specified by the host_image_export_dir_path attribute of the context. - - Args: - context (ConnectorContext): The current connector context. - container_variants (List[Container]): The list of container variants to export. - tar_file_name (Optional[str], optional): The name of the tar archive file. Defaults to None. + Exporting a container image as a tar archive allows user to have a dagger built container image available on their host filesystem. + They can load this tar file to their main docker host with 'docker load'. + This mechanism is also used to share dagger built containers with other steps like AcceptanceTest that have their own dockerd service. + We 'docker load' this tar file to AcceptanceTest's docker host to make sure the container under test image is available for testing. Returns: Tuple[Optional[File], Optional[Path]]: A tuple with the file object holding the tar archive on the host and its path. """ - tar_file_name = f"{slugify(context.connector.technical_name)}_{context.git_revision}.tar" if tar_file_name is None else tar_file_name + if tar_file_name is None: + tar_file_name = f"{context.connector.technical_name}_{context.git_revision}.tar" + tar_file_name = slugify(tar_file_name) local_path = Path(f"{context.host_image_export_dir_path}/{tar_file_name}") - export_success = await context.dagger_client.container().export( - str(local_path), platform_variants=container_variants, forced_compression=ImageLayerCompression.Gzip - ) + export_success = await container.export(str(local_path), forced_compression=ImageLayerCompression.Gzip) if export_success: - return context.dagger_client.host().file(str(local_path)), local_path - return None, None + exported_file = ( + context.dagger_client.host().directory(context.host_image_export_dir_path, include=[tar_file_name]).file(tar_file_name) + ) + return exported_file, local_path + else: + return None, None def format_duration(time_delta: datetime.timedelta) -> str: diff --git a/airbyte-ci/connectors/pipelines/pyproject.toml b/airbyte-ci/connectors/pipelines/pyproject.toml index dc3cabd2badc..13c7814009a9 100644 --- a/airbyte-ci/connectors/pipelines/pyproject.toml +++ b/airbyte-ci/connectors/pipelines/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipelines" -version = "2.10.1" +version = "2.8.0" description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines" authors = ["Airbyte "] diff --git a/airbyte-ci/connectors/pipelines/pytest.ini b/airbyte-ci/connectors/pipelines/pytest.ini index b228671b5fa2..0bd08b038c23 100644 --- a/airbyte-ci/connectors/pipelines/pytest.ini +++ b/airbyte-ci/connectors/pipelines/pytest.ini @@ -1,4 +1,2 @@ [pytest] addopts = --cov=pipelines -markers = - slow: marks tests as slow (deselect with '-m "not slow"') diff --git a/airbyte-ci/connectors/pipelines/tests/test_build_image/__init__.py b/airbyte-ci/connectors/pipelines/tests/test_build_image/__init__.py deleted file mode 100644 index c941b3045795..000000000000 --- a/airbyte-ci/connectors/pipelines/tests/test_build_image/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte-ci/connectors/pipelines/tests/test_build_image/test_steps/test_common.py b/airbyte-ci/connectors/pipelines/tests/test_build_image/test_steps/test_common.py deleted file mode 100644 index 13c8703caf3e..000000000000 --- a/airbyte-ci/connectors/pipelines/tests/test_build_image/test_steps/test_common.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import os -from typing import Dict - -import dagger -import docker -import pytest -from pipelines.airbyte_ci.connectors.build_image.steps import common -from pipelines.consts import BUILD_PLATFORMS -from pipelines.models.steps import StepStatus - -pytestmark = [ - pytest.mark.anyio, -] - - -@pytest.mark.slow -class TestLoadContainerToLocalDockerHost: - @pytest.fixture(scope="class") - def certified_connector(self, all_connectors): - for connector in all_connectors: - if connector.support_level == "certified": - return connector - pytest.skip("No certified connector found") - - @pytest.fixture - def built_containers(self, dagger_client, certified_connector) -> Dict[dagger.Platform, dagger.Container]: - return { - platform: dagger_client.container(platform=platform).from_(f'{certified_connector.metadata["dockerRepository"]}:latest') - for platform in BUILD_PLATFORMS - } - - @pytest.fixture - def test_context(self, mocker, dagger_client, certified_connector, tmp_path): - return mocker.Mock( - secrets_to_mask=[], dagger_client=dagger_client, connector=certified_connector, host_image_export_dir_path=tmp_path - ) - - @pytest.fixture - def step(self, test_context, built_containers): - return common.LoadContainerToLocalDockerHost(test_context, built_containers) - - @pytest.fixture - def bad_docker_host(self): - original_docker_host = os.environ.get("DOCKER_HOST") - yield "tcp://localhost:9999" - if original_docker_host: - os.environ["DOCKER_HOST"] = original_docker_host - else: - del os.environ["DOCKER_HOST"] - - async def test_run(self, test_context, step): - """Test that the step runs successfully and that the image is loaded in the local docker host.""" - assert step.image_tag == "dev" - docker_client = docker.from_env() - step.image_tag = "test-load-container" - try: - docker_client.images.remove(f"{test_context.connector.metadata['dockerRepository']}:{step.image_tag}") - except docker.errors.ImageNotFound: - pass - result = await step.run() - assert result.status is StepStatus.SUCCESS - docker_client.images.get(f"{test_context.connector.metadata['dockerRepository']}:{step.image_tag}") - docker_client.images.remove(f"{test_context.connector.metadata['dockerRepository']}:{step.image_tag}") - - async def test_run_export_failure(self, step, mocker): - """Test that the step fails if the export of the container fails.""" - mocker.patch.object(common, "export_containers_to_tarball", return_value=(None, None)) - result = await step.run() - assert result.status is StepStatus.FAILURE - assert "Failed to export the connector image" in result.stderr - - async def test_run_connection_error(self, step, bad_docker_host): - """Test that the step fails if the connection to the docker host fails.""" - os.environ["DOCKER_HOST"] = bad_docker_host - result = await step.run() - assert result.status is StepStatus.FAILURE - assert "Something went wrong while interacting with the local docker client" in result.stderr - - async def test_run_import_failure(self, step, mocker): - """Test that the step fails if the docker import of the tar fails.""" - mock_docker_client = mocker.MagicMock() - mock_docker_client.api.import_image_from_file.return_value = "bad response" - mocker.patch.object(common.docker, "from_env", return_value=mock_docker_client) - result = await step.run() - assert result.status is StepStatus.FAILURE - assert "Failed to import the connector image" in result.stderr diff --git a/airbyte-ci/connectors/pipelines/tests/test_build_image/dummy_build_customization.py b/airbyte-ci/connectors/pipelines/tests/test_builds/dummy_build_customization.py similarity index 100% rename from airbyte-ci/connectors/pipelines/tests/test_build_image/dummy_build_customization.py rename to airbyte-ci/connectors/pipelines/tests/test_builds/dummy_build_customization.py diff --git a/airbyte-ci/connectors/pipelines/tests/test_build_image/test_python_connectors.py b/airbyte-ci/connectors/pipelines/tests/test_builds/test_python_connectors.py similarity index 80% rename from airbyte-ci/connectors/pipelines/tests/test_build_image/test_python_connectors.py rename to airbyte-ci/connectors/pipelines/tests/test_builds/test_python_connectors.py index bb8ac23a10ea..96d6fc79807e 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_build_image/test_python_connectors.py +++ b/airbyte-ci/connectors/pipelines/tests/test_builds/test_python_connectors.py @@ -7,7 +7,6 @@ import pytest from pipelines.airbyte_ci.connectors.build_image.steps import build_customization, python_connectors from pipelines.airbyte_ci.connectors.context import ConnectorContext -from pipelines.consts import BUILD_PLATFORMS from pipelines.models.steps import StepStatus pytestmark = [ @@ -16,13 +15,9 @@ class TestBuildConnectorImage: - @pytest.fixture - def all_platforms(self): - return BUILD_PLATFORMS - @pytest.fixture def test_context(self, mocker): - return mocker.Mock(secrets_to_mask=[], targeted_platforms=BUILD_PLATFORMS) + return mocker.Mock(secrets_to_mask=[]) @pytest.fixture def test_context_with_connector_with_base_image(self, test_context): @@ -50,9 +45,7 @@ def connector_with_base_image_with_build_customization(self, connector_with_base (connector_with_base_image_no_build_customization.code_directory / "build_customization.py").unlink() @pytest.fixture - def test_context_with_real_connector_using_base_image( - self, connector_with_base_image_no_build_customization, dagger_client, current_platform - ): + def test_context_with_real_connector_using_base_image(self, connector_with_base_image_no_build_customization, dagger_client): context = ConnectorContext( pipeline_name="test build", connector=connector_with_base_image_no_build_customization, @@ -61,14 +54,13 @@ def test_context_with_real_connector_using_base_image( report_output_prefix="test", is_local=True, use_remote_secrets=True, - targeted_platforms=[current_platform], ) context.dagger_client = dagger_client return context @pytest.fixture def test_context_with_real_connector_using_base_image_with_build_customization( - self, connector_with_base_image_with_build_customization, dagger_client, current_platform + self, connector_with_base_image_with_build_customization, dagger_client ): context = ConnectorContext( pipeline_name="test build", @@ -78,7 +70,6 @@ def test_context_with_real_connector_using_base_image_with_build_customization( report_output_prefix="test", is_local=True, use_remote_secrets=True, - targeted_platforms=[current_platform], ) context.dagger_client = dagger_client return context @@ -91,7 +82,7 @@ def connector_without_base_image(self, all_connectors): pytest.skip("No connector without a connectorBuildOptions.baseImage metadata found") @pytest.fixture - def test_context_with_real_connector_without_base_image(self, connector_without_base_image, dagger_client, current_platform): + def test_context_with_real_connector_without_base_image(self, connector_without_base_image, dagger_client): context = ConnectorContext( pipeline_name="test build", connector=connector_without_base_image, @@ -100,28 +91,25 @@ def test_context_with_real_connector_without_base_image(self, connector_without_ report_output_prefix="test", is_local=True, use_remote_secrets=True, - targeted_platforms=[current_platform], ) context.dagger_client = dagger_client return context - async def test__run_using_base_image_with_mocks(self, mocker, test_context_with_connector_with_base_image, all_platforms): + async def test__run_using_base_image_with_mocks(self, mocker, test_context_with_connector_with_base_image, current_platform): container_built_from_base = mocker.AsyncMock() mocker.patch.object( python_connectors.BuildConnectorImages, "_build_from_base_image", mocker.AsyncMock(return_value=container_built_from_base) ) mocker.patch.object(python_connectors.BuildConnectorImages, "get_step_result", mocker.AsyncMock()) - step = python_connectors.BuildConnectorImages(test_context_with_connector_with_base_image) + step = python_connectors.BuildConnectorImages(test_context_with_connector_with_base_image, current_platform) step_result = await step._run() - assert step._build_from_base_image.call_count == len(all_platforms) - container_built_from_base.with_exec.assert_called_with(["spec"]) + step._build_from_base_image.assert_called_once() + container_built_from_base.with_exec.assert_called_once_with(["spec"]) assert step_result.status is StepStatus.SUCCESS - for platform in all_platforms: - assert step_result.output_artifact[platform] == container_built_from_base + assert step_result.output_artifact[current_platform] == container_built_from_base - @pytest.mark.slow async def test_building_from_base_image_for_real(self, test_context_with_real_connector_using_base_image, current_platform): - step = python_connectors.BuildConnectorImages(test_context_with_real_connector_using_base_image) + step = python_connectors.BuildConnectorImages(test_context_with_real_connector_using_base_image, current_platform) step_result = await step._run() step_result.status is StepStatus.SUCCESS built_container = step_result.output_artifact[current_platform] @@ -139,31 +127,31 @@ async def test_building_from_base_image_for_real(self, test_context_with_real_co == test_context_with_real_connector_using_base_image.connector.metadata["dockerRepository"] ) - @pytest.mark.slow async def test_building_from_base_image_with_customization_for_real( self, test_context_with_real_connector_using_base_image_with_build_customization, current_platform ): - step = python_connectors.BuildConnectorImages(test_context_with_real_connector_using_base_image_with_build_customization) + step = python_connectors.BuildConnectorImages( + test_context_with_real_connector_using_base_image_with_build_customization, current_platform + ) step_result = await step._run() step_result.status is StepStatus.SUCCESS built_container = step_result.output_artifact[current_platform] assert await built_container.env_variable("MY_PRE_BUILD_ENV_VAR") == "my_pre_build_env_var_value" assert await built_container.env_variable("MY_POST_BUILD_ENV_VAR") == "my_post_build_env_var_value" - async def test__run_using_base_dockerfile_with_mocks(self, mocker, test_context_with_connector_without_base_image, all_platforms): + async def test__run_using_base_dockerfile_with_mocks(self, mocker, test_context_with_connector_without_base_image, current_platform): container_built_from_dockerfile = mocker.AsyncMock() mocker.patch.object( python_connectors.BuildConnectorImages, "_build_from_dockerfile", mocker.AsyncMock(return_value=container_built_from_dockerfile) ) - step = python_connectors.BuildConnectorImages(test_context_with_connector_without_base_image) + step = python_connectors.BuildConnectorImages(test_context_with_connector_without_base_image, current_platform) step_result = await step._run() - assert step._build_from_dockerfile.call_count == len(all_platforms) - container_built_from_dockerfile.with_exec.assert_called_with(["spec"]) + step._build_from_dockerfile.assert_called_once() + container_built_from_dockerfile.with_exec.assert_called_once_with(["spec"]) assert step_result.status is StepStatus.SUCCESS - for platform in all_platforms: - assert step_result.output_artifact[platform] == container_built_from_dockerfile + assert step_result.output_artifact[current_platform] == container_built_from_dockerfile - async def test_building_from_dockerfile_for_real(self, test_context_with_real_connector_without_base_image): - step = python_connectors.BuildConnectorImages(test_context_with_real_connector_without_base_image) + async def test_building_from_dockerfile_for_real(self, test_context_with_real_connector_without_base_image, current_platform): + step = python_connectors.BuildConnectorImages(test_context_with_real_connector_without_base_image, current_platform) step_result = await step._run() step_result.status is StepStatus.SUCCESS diff --git a/airbyte-ci/connectors/pipelines/tests/test_helpers/__init__.py b/airbyte-ci/connectors/pipelines/tests/test_helpers/__init__.py deleted file mode 100644 index c941b3045795..000000000000 --- a/airbyte-ci/connectors/pipelines/tests/test_helpers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py b/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py index da63c33fb01c..8bcde17e715e 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py +++ b/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py @@ -28,7 +28,7 @@ def certified_connector_with_setup(self, all_connectors): pytest.skip("No certified connector with setup.py found.") @pytest.fixture - def context_for_certified_connector_with_setup(self, certified_connector_with_setup, dagger_client, current_platform): + def context_for_certified_connector_with_setup(self, certified_connector_with_setup, dagger_client): context = ConnectorContext( pipeline_name="test unit tests", connector=certified_connector_with_setup, @@ -37,7 +37,6 @@ def context_for_certified_connector_with_setup(self, certified_connector_with_se report_output_prefix="test", is_local=True, use_remote_secrets=True, - targeted_platforms=[current_platform], ) context.dagger_client = dagger_client context.connector_secrets = {} @@ -45,11 +44,11 @@ def context_for_certified_connector_with_setup(self, certified_connector_with_se @pytest.fixture async def certified_container_with_setup(self, context_for_certified_connector_with_setup, current_platform): - result = await BuildConnectorImages(context_for_certified_connector_with_setup).run() + result = await BuildConnectorImages(context_for_certified_connector_with_setup, current_platform).run() return result.output_artifact[current_platform] @pytest.fixture - def context_for_connector_with_poetry(self, connector_with_poetry, dagger_client, current_platform): + def context_for_connector_with_poetry(self, connector_with_poetry, dagger_client): context = ConnectorContext( pipeline_name="test unit tests", connector=connector_with_poetry, @@ -58,7 +57,6 @@ def context_for_connector_with_poetry(self, connector_with_poetry, dagger_client report_output_prefix="test", is_local=True, use_remote_secrets=True, - targeted_platforms=[current_platform], ) context.dagger_client = dagger_client context.connector_secrets = {} @@ -66,7 +64,7 @@ def context_for_connector_with_poetry(self, connector_with_poetry, dagger_client @pytest.fixture async def container_with_poetry(self, context_for_connector_with_poetry, current_platform): - result = await BuildConnectorImages(context_for_connector_with_poetry).run() + result = await BuildConnectorImages(context_for_connector_with_poetry, current_platform).run() return result.output_artifact[current_platform] async def test__run_for_setup_py(self, context_for_certified_connector_with_setup, certified_container_with_setup): diff --git a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_utils.py b/airbyte-ci/connectors/pipelines/tests/test_utils.py similarity index 80% rename from airbyte-ci/connectors/pipelines/tests/test_helpers/test_utils.py rename to airbyte-ci/connectors/pipelines/tests/test_utils.py index 31c6434da797..9d9328f38417 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_utils.py +++ b/airbyte-ci/connectors/pipelines/tests/test_utils.py @@ -5,7 +5,6 @@ from pathlib import Path from unittest import mock -import dagger import pytest from connector_ops.utils import Connector, ConnectorLanguage from pipelines import consts @@ -191,48 +190,3 @@ def test_sh_dash_c(): assert utils.sh_dash_c(["foo", "bar"]) == ["sh", "-c", "set -o xtrace && foo && bar"] assert utils.sh_dash_c(["foo"]) == ["sh", "-c", "set -o xtrace && foo"] assert utils.sh_dash_c([]) == ["sh", "-c", "set -o xtrace"] - - -@pytest.mark.anyio -@pytest.mark.parametrize("tar_file_name", [None, "custom_tar_name.tar"]) -async def test_export_containers_to_tarball(mocker, dagger_client, tmp_path, tar_file_name): - context = mocker.Mock( - dagger_client=dagger_client, - connector=mocker.Mock(technical_name="my_connector"), - host_image_export_dir_path=tmp_path, - git_revision="my_git_revision", - ) - container_variants = [ - dagger_client.container(platform=dagger.Platform("linux/arm64")).from_("bash:latest"), - dagger_client.container(platform=dagger.Platform("linux/amd64")).from_("bash:latest"), - ] - expected_tar_file_path = tmp_path / "my_connector_my_git_revision.tar" if tar_file_name is None else tmp_path / tar_file_name - exported_tar_file, exported_tar_file_path = await utils.export_containers_to_tarball( - context, container_variants, tar_file_name=tar_file_name - ) - assert exported_tar_file_path == expected_tar_file_path - assert await exported_tar_file.size() == expected_tar_file_path.stat().st_size - - -@pytest.mark.anyio -async def test_export_containers_to_tarball_failure(mocker, tmp_path): - mock_dagger_client = mocker.Mock() - mock_export = mocker.AsyncMock(return_value=False) - mock_dagger_client.container.return_value.export = mock_export - - context = mocker.Mock( - dagger_client=mock_dagger_client, - connector=mocker.Mock(technical_name="my_connector"), - host_image_export_dir_path=tmp_path, - git_revision="my_git_revision", - ) - - container_variants = mocker.Mock() - exported_tar_file, exported_tar_file_path = await utils.export_containers_to_tarball(context, container_variants) - mock_export.assert_called_once_with( - str(tmp_path / "my_connector_my_git_revision.tar"), - platform_variants=container_variants, - forced_compression=dagger.ImageLayerCompression.Gzip, - ) - assert exported_tar_file is None - assert exported_tar_file_path is None From b42a93ead9819e0b85683c6018bde051851ee9cb Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 29 Nov 2023 13:34:37 -0800 Subject: [PATCH 20/23] revert to 25MB batch --- .../destination/jdbc/JdbcInsertFlushFunction.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java index 9840e52d85b6..6990b73b3e6c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.java @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc; import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter; +import io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants; import io.airbyte.cdk.integrations.destination_async.DestinationFlushFunction; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; @@ -28,15 +29,9 @@ public void flush(final StreamDescriptor desc, final Stream Date: Wed, 29 Nov 2023 21:46:28 +0000 Subject: [PATCH 21/23] Automated Commit - Formatting Changes --- .../source_google_search_console/exceptions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-google-search-console/source_google_search_console/exceptions.py b/airbyte-integrations/connectors/source-google-search-console/source_google_search_console/exceptions.py index 03c2c635cf23..04cc13a6adf8 100644 --- a/airbyte-integrations/connectors/source-google-search-console/source_google_search_console/exceptions.py +++ b/airbyte-integrations/connectors/source-google-search-console/source_google_search_console/exceptions.py @@ -20,7 +20,9 @@ def __init__(self): class UnauthorizedServiceAccountError(Exception): def __init__(self): - message = "Unable to connect with provided Service Account credentials. Make sure the `sevice account credentials` provided are valid." + message = ( + "Unable to connect with provided Service Account credentials. Make sure the `sevice account credentials` provided are valid." + ) super().__init__(message) From f4daa6ac56e39292a273e98577e642985aad8f15 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 30 Nov 2023 08:07:57 -0800 Subject: [PATCH 22/23] cdk version bump --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../java/airbyte-cdk/core/src/main/resources/version.properties | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 5517982b9db8..164dfa230a25 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -156,6 +156,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.6.0 | 2023-11-30 | [\#32888](https://github.com/airbytehq/airbyte/pull/32888) | JDBC destinations now use the async framework | | 0.5.3 | 2023-11-28 | [\#32686](https://github.com/airbytehq/airbyte/pull/32686) | Better attribution of debezium engine shutdown due to heartbeat. | | 0.5.1 | 2023-11-27 | [\#32662](https://github.com/airbytehq/airbyte/pull/32662) | Debezium initialization wait time will now read from initial setup time. | | 0.5.0 | 2023-11-22 | [\#32656](https://github.com/airbytehq/airbyte/pull/32656) | Introduce TestDatabase test fixture, refactor database source test base classes. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 60acae2693fa..af0eb1ce35a2 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.5.3 +version=0.6.0 From b9b246f0c3a6f9f9cddf5cfbd8d219b77613bdb8 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 30 Nov 2023 08:10:38 -0800 Subject: [PATCH 23/23] switch back to published cdk artifact --- .../connectors/destination-redshift/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index bca7f425502e..44683f5ffeeb 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.4.1' + cdkVersionRequired = '0.6.0' features = ['db-destinations', 's3-destinations'] - useLocalCdk = true + useLocalCdk = false } //remove once upgrading the CDK version to 0.4.x or later