✨Switch redshift staging to async mode (#28619)
* Async snowflake

* Use async in destination implementation

* Format

* Switch redshift to async mode

* Remove old unused consumer creation

* Add new version

* Fix non staging mode

* Change switching to use the getSerializedMessageConsumer

* Automated Commit - Format and Process Resources Changes

* Test

* Automated Commit - Format and Process Resources Changes

* Use method

* Test smaller buffer

* Test smaller buffer for redshift

* Automated Commit - Format and Process Resources Changes

* Bigger ratio

* Remove snowflake changes

* Implement the new interface

* Automated Commit - Format and Process Resources Changes

* push ratio to 0.8

* Smaller Optimal buffer size

* Automated Commit - Format and Process Resources Changes

* Bigger buffer

* Use a buffer of 10 MB

* Use a buffer of 75 MB

* Test reduce lib thread

* Add flags for remote profiler.

* Part size to match the async part size

* Part size to 100 MB

* restore default

* Try with 1 thread

* Go back to default

* Clean up

* Bump version

* Restore gradle

* Re-add vm capture

* Test reduce allowed buffer size

* Use all the memory available

* only 3 threads for the lib

* Automated Commit - Format and Process Resources Changes

* test with 1

* Automated Commit - Format and Process Resources Changes

* Add local log line.

* Do not use all RAM for heap.

* Fix build

* Clean up

* Clean up

* Update airbyte-integrations/bases/bases-destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/AsyncFlush.java

Co-authored-by: Davin Chia <davinchia@gmail.com>

* Automated Commit - Format and Process Resources Changes

---------

Co-authored-by: Davin Chia <davinchia@gmail.com>
Co-authored-by: benmoriceau <benmoriceau@users.noreply.github.com>
3 people authored Aug 14, 2023
1 parent 4aff2b2 commit 8d19017
Showing 5 changed files with 34 additions and 14 deletions.
@@ -34,19 +34,36 @@ class AsyncFlush implements DestinationFlushFunction {
private final ConfiguredAirbyteCatalog catalog;
private final TypeAndDedupeOperationValve typerDeduperValve;
private final TyperDeduper typerDeduper;
private final long optimalBatchSizeBytes;

public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
final StagingOperations stagingOperations,
final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper) {
this(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper, 50 * 1024 * 1024);
}

public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
final StagingOperations stagingOperations,
final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
// In general, this size is chosen to improve the performance of lower-memory connectors. With
// 1 Gi of resource, the connector will usually fill at most around 150 MB in a single queue.
// By lowering the batch size, the AsyncFlusher will flush in smaller batches, which allows
// memory to be freed earlier, similar to a sliding-window effect.
long optimalBatchSizeBytes) {
this.streamDescToWriteConfig = streamDescToWriteConfig;
this.stagingOperations = stagingOperations;
this.database = database;
this.catalog = catalog;
this.typerDeduperValve = typerDeduperValve;
this.typerDeduper = typerDeduper;
this.optimalBatchSizeBytes = optimalBatchSizeBytes;
}

@Override
@@ -110,12 +127,7 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag

@Override
public long getOptimalBatchSizeBytes() {
// todo(ryankfu): this should be per-destination specific. currently this is for Snowflake.
// The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of
// resource the connector will usually at most fill up around 150 MB in a single queue. By lowering
// the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
// freed earlier similar to a sliding window effect
return 50 * 1024 * 1024;
return optimalBatchSizeBytes;
}

}
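The batch-size comment in the hunk above describes a sliding-window effect: a smaller `optimalBatchSizeBytes` makes the flusher emit batches sooner, so each batch's memory can be reclaimed earlier. A minimal, self-contained sketch of that accumulate-and-flush loop (hypothetical class and method names, not the actual AsyncFlusher):

```java
public class SlidingWindowFlush {

    // Count how many flushes occur when records of the given sizes are
    // accumulated against a byte threshold: flush whenever adding the next
    // record would exceed the threshold, then once more for the remainder.
    static int countFlushes(long optimalBatchSizeBytes, long[] recordSizes) {
        long buffered = 0;
        int flushes = 0;
        for (long size : recordSizes) {
            if (buffered + size > optimalBatchSizeBytes && buffered > 0) {
                flushes++;      // flush the current batch, freeing its memory
                buffered = 0;
            }
            buffered += size;
        }
        if (buffered > 0) {
            flushes++;          // flush the final partial batch
        }
        return flushes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // A 50 MiB threshold (the default above) against six 20 MiB records
        // yields three batches of two records each.
        System.out.println(countFlushes(50 * mib,
            new long[] {20 * mib, 20 * mib, 20 * mib, 20 * mib, 20 * mib, 20 * mib})); // prints 3
    }

}
```

Lowering the threshold trades fewer, larger uploads for more frequent, smaller ones, which is the memory/throughput trade-off the constructor parameter exposes.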
@@ -46,7 +46,7 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.3
LABEL io.airbyte.version=0.6.4
LABEL io.airbyte.name=airbyte/destination-redshift

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.6.3
dockerImageTag: 0.6.4
dockerRepository: airbyte/destination-redshift
githubIssueLabel: destination-redshift
icon: redshift.svg
@@ -22,6 +22,7 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
@@ -46,6 +47,8 @@
import java.util.Map;
import java.util.function.Consumer;
import javax.sql.DataSource;

import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -136,15 +139,20 @@ public JsonNode toJdbcConfig(final JsonNode config) {
}

@Override
@Deprecated
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
throw new NotImplementedException("Should use the getSerializedMessageConsumer instead");
}

@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) throws Exception {
final EncryptionConfig encryptionConfig =
config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
final JsonNode s3Options = findS3Options(config);
final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);

if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) {
LOGGER.warn("""
Increasing the number of file buffers past {} can lead to increased performance but
@@ -153,12 +161,11 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
""", FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size());
}

return new StagingConsumerFactory().create(
return new StagingConsumerFactory().createAsync(
outputRecordCollector,
getDatabase(getDataSource(config)),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX, numberOfFileBuffers)),
config,
catalog,
isPurgeStagingData(s3Options),
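The hunk above logs a warning when the configured number of file buffers exceeds `FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER`, without rejecting the request. A standalone sketch of that advisory check (the cap value and the warning's full wording here are assumptions, as is every name except the constant's):

```java
public class FileBufferSoftCap {

    // Assumed stand-in value for FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER.
    static final int SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER = 10;

    // Returns a warning message when the requested buffer count exceeds the
    // soft cap, or null when it is within bounds. The request is honored
    // either way: the cap is advisory, trading memory headroom for throughput.
    static String softCapWarning(int requestedBuffers, int streamCount) {
        if (requestedBuffers > SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) {
            return String.format(
                "Increasing the number of file buffers past %d can lead to increased performance but "
                    + "can also lead to out-of-memory errors (catalog has %d streams)",
                SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, streamCount);
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(softCapWarning(15, 4)); // over the cap: prints the warning
        System.out.println(softCapWarning(5, 4));  // within bounds
    }

}
```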
5 changes: 3 additions & 2 deletions docs/integrations/destinations/redshift.md
@@ -155,7 +155,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c
## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.6.4 | 2023-08-10 | [\#28619](https://github.com/airbytehq/airbyte/pull/28619) | Use async method for staging |
| 0.6.3 | 2023-08-07 | [\#29188](https://github.com/airbytehq/airbyte/pull/29188) | Internal code refactoring |
| 0.6.2 | 2023-07-24 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Add hooks in preparation for destinations v2 implementation |
| 0.6.1 | 2023-07-14 | [\#28345](https://github.com/airbytehq/airbyte/pull/28345) | Increment patch to trigger a rebuild |
@@ -175,7 +176,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
| 0.3.55 | 2023-01-26 | [\#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging |
| 0.3.54 | 2023-01-18 | [\#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
| 0.3.53 | 2023-01-03 | [\#17273](https://github.com/airbytehq/airbyte/pull/17273) | Flatten JSON arrays to fix maximum size check for SUPER field |
| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) |
| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
| 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
