From 5a76bcee9659cb8c8bbd6da4a8e45d04474b2c1f Mon Sep 17 00:00:00 2001 From: kimerinn Date: Thu, 4 Aug 2022 19:33:40 +0300 Subject: [PATCH 1/4] 15302: Azure blob destination consumer fixed --- .../azure_blob_storage/AzureBlobStorageConsumer.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java index 1989560f45f9..e0f6dd4c88c0 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java @@ -121,7 +121,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob @Override protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception { if (airbyteMessage.getType() == Type.STATE) { - this.lastStateMessage = airbyteMessage; + outputRecordCollector.accept(airbyteMessage); return; } else if (airbyteMessage.getType() != Type.RECORD) { return; @@ -154,10 +154,6 @@ protected void close(final boolean hasFailed) throws Exception { for (final AzureBlobStorageWriter handler : streamNameAndNamespaceToWriters.values()) { handler.close(hasFailed); } - - if (!hasFailed) { - outputRecordCollector.accept(lastStateMessage); - } } private static String getOutputFilename(final Timestamp timestamp) { From fbfe1386eef024312c663263827d8799d2b67413 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Fri, 5 Aug 2022 14:33:33 +0300 Subject: [PATCH 2/4] 15302: Unit test added --- .../build.gradle | 2 + .../AzureBlobStorageConsumer.java | 16 +++---- .../AzureBlobRecordConsumerTest.java | 47 +++++++++++++++++++ 3 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle b/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle index 6163033e0f44..1e1882ebb22e 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle @@ -19,6 +19,8 @@ dependencies { implementation 'com.azure:azure-storage-blob:12.12.0' implementation 'org.apache.commons:commons-csv:1.4' + testImplementation project(':airbyte-integrations:bases:standard-destination-test') + testImplementation 'org.apache.commons:commons-lang3:3.11' integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java index e0f6dd4c88c0..be9d14d28eab 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java @@ -44,13 +44,11 @@ public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsu private final Consumer outputRecordCollector; private final Map streamNameAndNamespaceToWriters; - private AirbyteMessage lastStateMessage = null; - public AzureBlobStorageConsumer( - final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig, - final ConfiguredAirbyteCatalog configuredCatalog, - final AzureBlobStorageWriterFactory writerFactory, - final Consumer outputRecordCollector) { + final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig, + final ConfiguredAirbyteCatalog configuredCatalog, + final AzureBlobStorageWriterFactory writerFactory, + final Consumer outputRecordCollector) { this.azureBlobStorageDestinationConfig = azureBlobStorageDestinationConfig; this.configuredCatalog = configuredCatalog; this.writerFactory = writerFactory; @@ -93,8 +91,8 @@ protected void startTracked() throws Exception { } private void createContainers(final SpecializedBlobClientBuilder specializedBlobClientBuilder, - final AppendBlobClient appendBlobClient, - final ConfiguredAirbyteStream configuredStream) { + final AppendBlobClient appendBlobClient, + final ConfiguredAirbyteStream configuredStream) { // create container if absent (aka SQl Schema) final BlobContainerClient containerClient = appendBlobClient.getContainerClient(); if (!containerClient.exists()) { @@ -103,7 +101,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob if (DestinationSyncMode.OVERWRITE.equals(configuredStream.getDestinationSyncMode())) { LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically" + " created or all data would be overridden (if any) for stream:" + configuredStream - .getStream().getName()); + .getStream().getName()); var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false) .collect(Collectors.toList()); blobItemList.forEach(blob -> { diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java new file mode 100644 index 000000000000..100e8954b40b --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java @@ -0,0 +1,47 @@ +package io.airbyte.integrations.destination.azure_blob_storage; + +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@DisplayName("AzureBlobRecordConsumer") +@ExtendWith(MockitoExtension.class) +public class AzureBlobRecordConsumerTest extends PerStreamStateMessageTest { + @Mock + private Consumer outputRecordCollector; + + @InjectMocks + private AzureBlobStorageConsumer consumer; + + @Mock + private AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig; + + @Mock + private ConfiguredAirbyteCatalog configuredCatalog; + + @Mock + private AzureBlobStorageWriterFactory writerFactory; + + @BeforeEach + public void init() { + consumer = new AzureBlobStorageConsumer(azureBlobStorageDestinationConfig, configuredCatalog, writerFactory, outputRecordCollector); + } + + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } +} From 38f8b3d9a98d8ac666394939c60376afbcc8f6c3 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Mon, 8 Aug 2022 03:35:04 +0300 Subject: [PATCH 3/4] 15302: Unit test added --- .../azure_blob_storage/AzureBlobRecordConsumerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java index 100e8954b40b..81497d8a7f9b 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java @@ -18,7 +18,6 @@ public class AzureBlobRecordConsumerTest extends PerStreamStateMessageTest { @Mock private Consumer outputRecordCollector; - @InjectMocks private AzureBlobStorageConsumer consumer; @Mock From 97c7a3eac5e6a52acfc18bdc9906ed8dfa6010f4 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Mon, 8 Aug 2022 15:01:25 +0300 Subject: [PATCH 4/4] 15318: test fix --- .../azure_blob_storage/AzureBlobRecordConsumerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java index 81497d8a7f9b..bd3c0972bcda 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java @@ -2,6 +2,7 @@ import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.function.Consumer;