Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT_PR_FOR_TESTING_IGNORE_IT] 5980 azure blob buffering #9393

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "b4c5d105-31fd-4817-96b6-cb923bfc04cb",
"name": "Azure Blob Storage",
"dockerRepository": "airbyte/destination-azure-blob-storage",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage",
"icon": "azureblobstorage.svg"
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
- name: Azure Blob Storage
destinationDefinitionId: b4c5d105-31fd-4817-96b6-cb923bfc04cb
dockerRepository: airbyte/destination-azure-blob-storage
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/azureblobstorage
icon: azureblobstorage.svg
- name: Amazon SQS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-azure-blob-storage

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/destination-azure-blob-storage
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ public class AzureBlobStorageDestinationConfig {
private final String accountName;
private final String accountKey;
private final String containerName;
private final int outputStreamBufferSize;
private final AzureBlobStorageFormatConfig formatConfig;

public AzureBlobStorageDestinationConfig(
final String endpointUrl,
final String accountName,
final String accountKey,
final String containerName,
final int outputStreamBufferSize,
final AzureBlobStorageFormatConfig formatConfig) {
this.endpointUrl = endpointUrl;
this.accountName = accountName;
this.accountKey = accountKey;
this.containerName = containerName;
this.outputStreamBufferSize = outputStreamBufferSize;
this.formatConfig = formatConfig;
}

Expand All @@ -50,12 +53,22 @@ public AzureBlobStorageFormatConfig getFormatConfig() {
return formatConfig;
}

public int getOutputStreamBufferSize() {
// Convert from MB to Bytes
return outputStreamBufferSize * 1024 * 1024;
}

public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final JsonNode config) {
final String accountNameFomConfig = config.get("azure_blob_storage_account_name").asText();
final String accountKeyFromConfig = config.get("azure_blob_storage_account_key").asText();
final JsonNode endpointFromConfig = config
.get("azure_blob_storage_endpoint_domain_name");
final JsonNode containerName = config.get("azure_blob_storage_container_name");
final int outputStreamBufferSizeFromConfig =
config.get("azure_blob_storage_output_buffer_size") != null
? config.get("azure_blob_storage_output_buffer_size").asInt(DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE)
: DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE;

final JsonNode blobName = config.get("azure_blob_storage_blob_name"); // streamId

final String endpointComputed = String.format(Locale.ROOT, DEFAULT_STORAGE_ENDPOINT_FORMAT,
Expand All @@ -72,6 +85,7 @@ public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final
accountNameFomConfig,
accountKeyFromConfig,
containerNameComputed,
outputStreamBufferSizeFromConfig,
AzureBlobStorageFormatConfigs.getAzureBlobStorageFormatConfig(config));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public final class AzureBlobStorageDestinationConstants {
public static final String DEFAULT_STORAGE_ENDPOINT_HTTP_PROTOCOL = "https";
public static final String DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME = "blob.core.windows.net";
public static final String DEFAULT_STORAGE_ENDPOINT_FORMAT = "%s://%s.%s";
public static final int DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE = 1024 * 1024 * 100; // 100MB

private AzureBlobStorageDestinationConstants() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
Expand All @@ -28,7 +29,7 @@ public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implem

private final CsvSheetGenerator csvSheetGenerator;
private final CSVPrinter csvPrinter;
private final BlobOutputStream blobOutputStream;
private final BufferedOutputStream blobOutputStream;

public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config,
final AppendBlobClient appendBlobClient,
Expand All @@ -44,17 +45,17 @@ public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config,
.create(configuredStream.getStream().getJsonSchema(),
formatConfig);

this.blobOutputStream = appendBlobClient.getBlobOutputStream();
this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize());

if (isNewlyCreatedBlob) {
this.csvPrinter = new CSVPrinter(
new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8),
new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8),
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)
.withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0])));
} else {
// no header required for append
this.csvPrinter = new CSVPrinter(
new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8),
new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8),
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
Expand All @@ -32,7 +33,7 @@ public class AzureBlobStorageJsonlWriter extends BaseAzureBlobStorageWriter impl
private static final ObjectMapper MAPPER = MoreMappers.initMapper();
private static final ObjectWriter WRITER = MAPPER.writer();

private final BlobOutputStream blobOutputStream;
private final BufferedOutputStream blobOutputStream;
private final PrintWriter printWriter;

public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig config,
Expand All @@ -41,8 +42,8 @@ public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig confi
final boolean isNewlyCreatedBlob) {
super(config, appendBlobClient, configuredStream);
// at this moment we already receive appendBlobClient initialized
this.blobOutputStream = appendBlobClient.getBlobOutputStream();
this.printWriter = new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8);
this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize());
this.printWriter = new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@
"examples": ["airbyte5storage"]
},
"azure_blob_storage_account_key": {
"title": "Azure Blob Storage account key",
"description": "The Azure blob storage account key.",
"airbyte_secret": true,
"type": "string",
"examples": [
"Z8ZkZpteggFx394vm+PJHnGTvdRncaYS+JhLKdj789YNmD+iyGTnG+PV+POiuYNhBg/ACS+LKjd%4FG3FHGN12Nd=="
]
},
"azure_blob_storage_output_buffer_size": {
"title": "Azure Blob Storage output buffer size",
"type": "integer",
"description": "The amount of megabytes to buffer for the output stream to Azure. This will impact memory footprint on workers, but may need adjustment for performance and appropriate block size in Azure.",
"minimum": 1,
"maximum": 2047,
"default": 5,
"examples": [5]
},
"format": {
"title": "Output Format",
"type": "object",
Expand Down
3 changes: 3 additions & 0 deletions docs/integrations/destinations/azureblobstorage.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The Airbyte Azure Blob Storage destination allows you to sync data to Azure Blob
| Azure blob storage container \(Bucket\) Name | string | A name of the Azure blob storage container. If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp. |
| Azure Blob Storage account name | string | The account's name of the Azure Blob Storage. |
| The Azure blob storage account key | string | Azure blob storage account key. Example: `abcdefghijklmnopqrstuvwxyz/0123456789+ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789%++sampleKey==`. |
| Azure Blob Storage output buffer size | integer | Azure Blob Storage output buffer size, in megabytes. Example: 5 |
| Format | object | Format specific configuration. See below for details. |

⚠️ Please note that under "Full Refresh Sync" mode, data in the configured blob will be wiped out before each sync. We recommend you to provision a dedicated Azure Blob Storage Container resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️
Expand Down Expand Up @@ -136,5 +137,7 @@ They will be like this in the output file:

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.1 | 2021-12-29 | [\#5332](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. Also disabled autoflush on PrintWriter. |
| 0.1.0 | 2021-08-30 | [\#5332](https://github.com/airbytehq/airbyte/pull/5332) | Initial release with JSONL and CSV output. |