Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

15302: Destination Azure Blob Storage: Handle per-stream state #15318

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ dependencies {
implementation 'com.azure:azure-storage-blob:12.12.0'
implementation 'org.apache.commons:commons-csv:1.4'

testImplementation project(':airbyte-integrations:bases:standard-destination-test')

testImplementation 'org.apache.commons:commons-lang3:3.11'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsu
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Map<AirbyteStreamNameNamespacePair, AzureBlobStorageWriter> streamNameAndNamespaceToWriters;

private AirbyteMessage lastStateMessage = null;

public AzureBlobStorageConsumer(
final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig,
final ConfiguredAirbyteCatalog configuredCatalog,
final AzureBlobStorageWriterFactory writerFactory,
final Consumer<AirbyteMessage> outputRecordCollector) {
final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig,
final ConfiguredAirbyteCatalog configuredCatalog,
final AzureBlobStorageWriterFactory writerFactory,
final Consumer<AirbyteMessage> outputRecordCollector) {
this.azureBlobStorageDestinationConfig = azureBlobStorageDestinationConfig;
this.configuredCatalog = configuredCatalog;
this.writerFactory = writerFactory;
Expand Down Expand Up @@ -93,8 +91,8 @@ protected void startTracked() throws Exception {
}

private void createContainers(final SpecializedBlobClientBuilder specializedBlobClientBuilder,
final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream) {
final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream) {
// create container if absent (aka SQL Schema)
final BlobContainerClient containerClient = appendBlobClient.getContainerClient();
if (!containerClient.exists()) {
Expand All @@ -103,7 +101,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob
if (DestinationSyncMode.OVERWRITE.equals(configuredStream.getDestinationSyncMode())) {
LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically"
+ " created or all data would be overridden (if any) for stream:" + configuredStream
.getStream().getName());
.getStream().getName());
var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false)
.collect(Collectors.toList());
blobItemList.forEach(blob -> {
Expand All @@ -121,7 +119,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob
@Override
protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception {
if (airbyteMessage.getType() == Type.STATE) {
this.lastStateMessage = airbyteMessage;
outputRecordCollector.accept(airbyteMessage);
return;
} else if (airbyteMessage.getType() != Type.RECORD) {
return;
Expand Down Expand Up @@ -154,10 +152,6 @@ protected void close(final boolean hasFailed) throws Exception {
for (final AzureBlobStorageWriter handler : streamNameAndNamespaceToWriters.values()) {
handler.close(hasFailed);
}

if (!hasFailed) {
outputRecordCollector.accept(lastStateMessage);
}
}

private static String getOutputFilename(final Timestamp timestamp) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.airbyte.integrations.destination.azure_blob_storage;

import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@DisplayName("AzureBlobRecordConsumer")
@ExtendWith(MockitoExtension.class)
public class AzureBlobRecordConsumerTest extends PerStreamStateMessageTest {
kimerinn marked this conversation as resolved.
Show resolved Hide resolved
@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

private AzureBlobStorageConsumer consumer;

@Mock
private AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig;

@Mock
private ConfiguredAirbyteCatalog configuredCatalog;

@Mock
private AzureBlobStorageWriterFactory writerFactory;

@BeforeEach
public void init() {
consumer = new AzureBlobStorageConsumer(azureBlobStorageDestinationConfig, configuredCatalog, writerFactory, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}
}