Destination Redshift: (Performance) Add parallel loading to Redshift via manifest file #6583

Merged — 5 commits merged into airbytehq:master on Oct 13, 2021

Conversation

@archaean (Contributor) commented on Sep 30, 2021:

What

Issue: 4871
Add parallelization for copying multiple staged files into Redshift.

How

  • Leverage the existing change that buckets batches into separate files in S3.
  • Write code to build a manifest listing all of the staged S3 files to load.
  • Change the COPY command to use the manifest so Redshift loads the data in parallel (a rough sketch follows below).
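
As a rough illustration of the approach (not the connector's actual code — class, bucket, and credential names below are placeholders), the copier writes one manifest entry per staged S3 object and then issues a single COPY ... MANIFEST statement instead of one COPY per file:

// Hypothetical sketch; see RedshiftStreamCopier.java and Manifest.java for the real implementation.
import java.util.List;
import java.util.stream.Collectors;

public class ManifestCopySketch {

  // One manifest entry per staged S3 object.
  static String buildManifestJson(String bucket, List<String> stagedKeys) {
    final String entries = stagedKeys.stream()
        .map(key -> String.format("{\"url\": \"s3://%s/%s\", \"mandatory\": true}", bucket, key))
        .collect(Collectors.joining(", "));
    return "{\"entries\": [" + entries + "]}";
  }

  // A single COPY pointing at the manifest; Redshift loads the listed files in parallel.
  static String buildCopyQuery(String schema, String tmpTable, String manifestS3Path) {
    return String.format(
        "COPY %s.%s FROM '%s'\n"
            + "CREDENTIALS 'aws_access_key_id=<key>;aws_secret_access_key=<secret>'\n"
            + "CSV REGION '<region>' TIMEFORMAT 'auto'\n"
            + "STATUPDATE OFF\n"
            + "MANIFEST;",
        schema, tmpTable, manifestS3Path);
  }

  public static void main(String[] args) {
    final List<String> keys = List.of("staging/part-0.csv", "staging/part-1.csv");
    System.out.println(buildManifestJson("my-bucket", keys));
    System.out.println(buildCopyQuery("my_schema", "_airbyte_tmp_users", "s3://my-bucket/staging/manifest.json"));
  }
}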

Recommended reading order

  1. RedshiftStreamCopier.java
  2. Manifest.java
  3. Entry.java
  4. S3StreamCopier.java

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of local success, e.g. a screenshot or copy-pasted unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow the instructions in the README. For Java connectors, run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/SUMMARY.md
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions
  • Connector added to the connector index as described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here

Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of local success, e.g. a screenshot or copy-pasted unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow the instructions in the README. For Java connectors, run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions
  • Connector version bumped as described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here

Connector Generator

  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed.

@CLAassistant commented on Sep 30, 2021:

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added the area/connectors Connector related issues label Sep 30, 2021
@archaean changed the title from "4871/add redshift manifest" to "Redshift Destination Connector performance improvement parallel loading via manifest file" on Sep 30, 2021
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

 RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.3.5
+LABEL io.airbyte.version=0.3.6
@archaean (Contributor Author) commented:

I wasn't sure whether I needed to bump this, since the changes here are passive for this connector.

Comment on lines +47 to +59
public void copyStagingFileToTemporaryTable() {
var possibleManifest = Optional.ofNullable(createManifest());
LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName);
possibleManifest.stream()
.map(this::putManifest)
.forEach(this::executeCopy);
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
}

  @Override
- public void copyS3CsvFileIntoTable(JdbcDatabase database, String s3FileLocation, String schema, String tableName, S3Config s3Config)
-     throws SQLException {
+ public void copyS3CsvFileIntoTable(JdbcDatabase database, String s3FileLocation, String schema, String tableName, S3Config s3Config) {
    throw new RuntimeException("Redshift Stream Copier should not copy individual files without use of a manifest");
  }
@archaean (Contributor Author) commented:

I know this breaks the original abstraction, but it was necessary, and I wanted to keep this change surgical rather than making structural changes to all of the related S3 connectors.

archaean commented Sep 30, 2021

$ ./gradlew airbyte-integrations:connectors:destination-redshift:integrationTest
...
> Task :airbyte-integrations:connectors:destination-redshift:integrationTestJava

RedshiftDestination > When not given S3 credentials should use INSERT PASSED

RedshiftDestination > When given S3 credentials should use COPY PASSED

RedshiftInsertDestinationAcceptanceTest > testEntrypointEnvVar() PASSED

RedshiftInsertDestinationAcceptanceTest > testCheckConnectionInvalidCredentials() PASSED

RedshiftInsertDestinationAcceptanceTest > testSyncVeryBigRecords() PASSED

RedshiftInsertDestinationAcceptanceTest > [1] exchange_rate_messages.txt, exchange_rate_catalog.json PASSED

RedshiftInsertDestinationAcceptanceTest > [2] edge_case_messages.txt, edge_case_catalog.json PASSED

RedshiftInsertDestinationAcceptanceTest > [1] exchange_rate_messages.txt, exchange_rate_catalog.json PASSED

RedshiftInsertDestinationAcceptanceTest > [2] edge_case_messages.txt, edge_case_catalog.json PASSED

RedshiftInsertDestinationAcceptanceTest > testIncrementalDedupeSync() PASSED

RedshiftInsertDestinationAcceptanceTest > testGetSpec() PASSED

RedshiftInsertDestinationAcceptanceTest > testSecondSync() PASSED

RedshiftInsertDestinationAcceptanceTest > testCustomDbtTransformationsFailure() PASSED

RedshiftInsertDestinationAcceptanceTest > testIncrementalSync() PASSED

RedshiftInsertDestinationAcceptanceTest > testCheckConnection() PASSED

RedshiftInsertDestinationAcceptanceTest > specDBTValueShouldBeCorrect() PASSED

RedshiftInsertDestinationAcceptanceTest > [1] exchange_rate_messages.txt, exchange_rate_catalog.json PASSED

RedshiftInsertDestinationAcceptanceTest > [2] edge_case_messages.txt, edge_case_catalog.json PASSED

RedshiftInsertDestinationAcceptanceTest > testSyncWriteSameTableNameDifferentNamespace() PASSED

RedshiftInsertDestinationAcceptanceTest > specNormalizationValueShouldBeCorrect() PASSED

RedshiftInsertDestinationAcceptanceTest > testLineBreakCharacters() PASSED

RedshiftInsertDestinationAcceptanceTest > testSyncUsesAirbyteStreamNamespaceIfNotNull() PASSED

RedshiftInsertDestinationAcceptanceTest > testCustomDbtTransformations() PASSED

RedshiftCopyDestinationAcceptanceTest > testEntrypointEnvVar() PASSED

RedshiftCopyDestinationAcceptanceTest > testCheckConnectionInvalidCredentials() PASSED

RedshiftCopyDestinationAcceptanceTest > testSyncVeryBigRecords() PASSED

RedshiftCopyDestinationAcceptanceTest > [1] exchange_rate_messages.txt, exchange_rate_catalog.json PASSED

RedshiftCopyDestinationAcceptanceTest > [2] edge_case_messages.txt, edge_case_catalog.json PASSED

RedshiftCopyDestinationAcceptanceTest > [1] exchange_rate_messages.txt, exchange_rate_catalog.json PASSED

RedshiftCopyDestinationAcceptanceTest > [2] edge_case_messages.txt, edge_case_catalog.json PASSED

RedshiftCopyDestinationAcceptanceTest > testIncrementalDedupeSync() PASSED

RedshiftCopyDestinationAcceptanceTest > testGetSpec() PASSED

RedshiftCopyDestinationAcceptanceTest > testSecondSync() PASSED

RedshiftCopyDestinationAcceptanceTest > testCustomDbtTransformationsFailure() PASSED

RedshiftCopyDestinationAcceptanceTest > testIncrementalSync() PASSED

RedshiftCopyDestinationAcceptanceTest > testCheckConnection() PASSED

RedshiftCopyDestinationAcceptanceTest > specDBTValueShouldBeCorrect() PASSED

RedshiftCopyDestinationAcceptanceTest > [1] exchange_rate_messages.txt, exchange_rate_catalog.json PASSED

RedshiftCopyDestinationAcceptanceTest > [2] edge_case_messages.txt, edge_case_catalog.json PASSED

RedshiftCopyDestinationAcceptanceTest > testSyncWriteSameTableNameDifferentNamespace() PASSED

RedshiftCopyDestinationAcceptanceTest > specNormalizationValueShouldBeCorrect() PASSED

RedshiftCopyDestinationAcceptanceTest > testLineBreakCharacters() PASSED

RedshiftCopyDestinationAcceptanceTest > testSyncUsesAirbyteStreamNamespaceIfNotNull() PASSED

RedshiftCopyDestinationAcceptanceTest > testCustomDbtTransformations() PASSED

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.7.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 32m 49s
101 actionable tasks: 22 executed, 79 up-to-date

private final ExtendedNameTransformer nameTransformer;
private final SqlOperations sqlOperations;
- private final Set<String> s3StagingFiles = new HashSet<>();
+ protected final Set<String> s3StagingFiles = new HashSet<>();
@archaean (Contributor Author) commented:

This was probably the only variable that definitely needed to be changed to protected for child-class access, but I changed the others as well because it seemed cleaner than redefining them all in RedshiftStreamCopier.java.
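
For illustration, a minimal sketch of the visibility change (class names here are hypothetical, not the actual connector classes): the Redshift copier subclass has to read the staged-file set collected by its parent in order to build the manifest, which is why the field can no longer be private:

import java.util.HashSet;
import java.util.Set;

class ParentCopierSketch {
  // Previously private; protected lets the Redshift-specific subclass read it.
  protected final Set<String> s3StagingFiles = new HashSet<>();
}

class RedshiftCopierSketch extends ParentCopierSketch {
  int stagedFileCount() {
    // Direct access to the inherited field, without redeclaring it in the subclass.
    return s3StagingFiles.size();
  }
}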

@archaean changed the title from "Redshift Destination Connector performance improvement parallel loading via manifest file" to "Destination Redshift: (Performance) Add parallel loading to Redshift via manifest file" on Oct 1, 2021
@marcosmarxm (Member) commented:

Thanks @archaean, we're going to review and give you feedback on your implementation. Did you deploy it as a dev build and test it with any dataset you have?

tableName,
s3FileLocation,
+ "STATUPDATE OFF\n"
+ "MANIFEST;",
@andriikorotkov (Contributor) commented:

Hi, @archaean. I only have one question. I deliberately split the single temporary file on S3 or GCS for each stream into several smaller files to avoid a timeout exception in the copy query from a file to a temp table. This happened when processing a large amount of data. Are you sure using the manifest won't bring us back to this issue?

@archaean (Contributor Author) replied on Oct 1, 2021:

@andriikorotkov, I guess there are edge cases where it could, but given the parallelism, the data would likely need to be an order of magnitude larger than whatever caused the timeout in your case.

I don't know exactly which timeout exception you are referring to, but if it is from the Redshift connection, it is likely ERROR: Query (150) canceled on user's request, which is generally raised when a query reaches Redshift's timeout limit. I assume the COPY command is also subject to this query limitation, but the timeout is configurable.

Other timeouts are possible for long-running COPY commands due to firewall configurations.

One possible implementation would be to further batch the files into an arbitrary number of groups (ideally close to the number of shards on the Redshift cluster), build a manifest file for each batch, and run serial COPY commands that way (see the sketch below).

So to answer your question, it won't be the same issue, because the manifest-based COPY should complete an order of magnitude faster than serial COPY commands, depending on the configuration of the Redshift cluster. The query timeout limit can also be configured to avoid this issue. And while batching the files into multiple manifests and COPY commands is an option, at this point it seems to me like a premature optimization.
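
A minimal sketch of that batching idea (not implemented in this PR; class and method names are hypothetical): split the staged file keys into a configurable number of groups, each of which would get its own manifest and its own serial COPY:

import java.util.ArrayList;
import java.util.List;

public class BatchedManifestSketch {

  // Round-robin the staged keys into `batches` groups, e.g. sized to the cluster.
  static List<List<String>> partition(List<String> stagedKeys, int batches) {
    final List<List<String>> groups = new ArrayList<>();
    for (int i = 0; i < batches; i++) {
      groups.add(new ArrayList<>());
    }
    for (int i = 0; i < stagedKeys.size(); i++) {
      groups.get(i % batches).add(stagedKeys.get(i));
    }
    return groups;
  }

  public static void main(String[] args) {
    final List<String> keys = List.of("part-0.csv", "part-1.csv", "part-2.csv", "part-3.csv");
    // Each group would become one manifest file followed by one COPY ... MANIFEST.
    partition(keys, 2).forEach(group -> System.out.println("manifest for: " + group));
  }
}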

archaean commented Oct 1, 2021

Thanks @archaean we're going to review and give you feedback about your implementation. Did you deploy as dev and tests with any dataset you have?

@marcosmarxm I have not tested it with any datasets locally; I just ran the acceptance tests. I plan to run it against a medium-sized table today and benchmark it against the old implementation.

archaean commented Oct 1, 2021

I started some small load tests with a 1M-row table and began by benchmarking the current Redshift destination (0.3.14).
It ran fine (790.41 MB | 1,063,022 records | 8m 23s | Sync),
with a 47-second copy to Redshift (log below):

2021-10-01 19:22:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-01 19:22:00 INFO i.a.i.d.j.c.s.S3StreamCopier(copyStagingFileToTemporaryTable):189 - {} - Starting copy to tmp table: _airbyte_tmp_bfo_capfin_invoice in destination for stream: capfin_invoice, schema: airbyte_dev_test, .
2021-10-01 19:22:47 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-01 19:22:47 INFO i.a.i.d.j.c.s.S3StreamCopier(copyStagingFileToTemporaryTable):191 - {} - Copy to tmp table _airbyte_tmp_bfo_capfin_invoice in destination for stream capfin_invoice complete.

However, against master (not even my code), it kept failing after about 200K records (two separate failure logs below):

(master)$ git log
commit 153c003e4a32014f8701c82336ed84452df8e7bb (HEAD -> master, origin/master, origin/HEAD, airbyte/master)
2021-10-01 20:29:32 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-01 20:29:32 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):323 - {} - Reading stream capfin_invoice. Records read: 200000
2021-10-01 20:29:32 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 200000
2021-10-01 20:29:32 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-01 20:29:32 INFO i.a.i.d.j.c.s.S3StreamCopier(prepareStagingFile):98 - {} - S3 upload part size: 10 MB
2021-10-01 20:29:33 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-01 20:29:33 INFO a.m.s.StreamTransferManager(getMultiPartOutputStreams):329 - {} - Initiated multipart upload to c2fo-atlas-rs-temp/airbyte-dev-test/11924164-db11-499c-aa22-03ff202e2acd/airbyte_dev_test/ezg_ycj_capfin_invoice with full ID tIktl57OH8T_Wo6sAOG_v6._UZk.00wYs9RrAD8bAwwBVRXD1oCsOQD0BqQQIPe62oMRsMbg5FVR5q312IrEC.oFJ2gydQgWfgngVoqKOj9VJDR2kVseT8nXi05fW7gh
2021-10-01 20:29:33 ERROR () LineGobbler(voidCall):65 - /airbyte/javabase.sh: line 12:    11 Killed                  /airbyte/bin/"$APPLICATION" "$@"
2021-10-01 20:17:29 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-01 20:17:29 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):323 - {} - Reading stream capfin_invoice. Records read: 220000
2021-10-01 20:17:29 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 220000
2021-10-01 20:17:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-01 20:17:30 INFO i.a.i.d.j.c.s.S3StreamCopier(prepareStagingFile):98 - {} - S3 upload part size: 10 MB
2021-10-01 20:17:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-01 20:17:30 INFO a.m.s.StreamTransferManager(getMultiPartOutputStreams):329 - {} - Initiated multipart upload to c2fo-atlas-rs-temp/airbyte-dev-test/2f456b2e-09ff-4851-a5fa-ffe501668b15/airbyte_dev_test/ymt_ale_capfin_invoice with full ID MQ.0wjQBOrtESDK_oIN1LZXSgoU65sujnj_I1xlvlu8LbgT0JI6U_vrz_YguQASvpvAEWnq1SiBNOJscgF6a9OuGgEotQeZoIKS01Tdm.saczTluAD4eG575SjD2TdNI
2021-10-01 20:17:30 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 221000
2021-10-01 20:17:30 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 222000
2021-10-01 20:17:30 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 223000
2021-10-01 20:17:31 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 224000
2021-10-01 20:17:31 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 225000
2021-10-01 20:17:32 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 226000
2021-10-01 20:17:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 227000
2021-10-01 20:17:33 ERROR () LineGobbler(voidCall):65 - /airbyte/javabase.sh: line 12:    10 Killed                  /airbyte/bin/"$APPLICATION" "$@"
2021-10-01 20:22:31 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$2):179 - Running sync worker cancellation...

I didn't have time to dig in further, but this seems like a blocking issue.

archaean commented Oct 4, 2021

@andriikorotkov @marcosmarxm, I am not going to have much time to debug this issue on my end this week. I am curious whether this problem can be recreated on master with a medium-sized dataset on your end. I am pretty sure my code has nothing to do with it (per my tests against master) and that it may be related to the recent multi-file S3 changes.

@marcosmarxm (Member) commented:

Hello @archaean and @andriikorotkov, the tests are passing in our CI. Should we merge this, or wait to correct and improve the tests?

@andriikorotkov (Contributor) commented:

@marcosmarxm, I think we can merge this pull request.

@marcosmarxm marcosmarxm merged commit 22e21b7 into airbytehq:master Oct 13, 2021
schlattk pushed a commit to schlattk/airbyte that referenced this pull request Jan 4, 2022
…via manifest file (airbytehq#6583)

* 4871 - Modified RedshiftStreamCopier to use manifest and a single copy command
 - rather than a series of copy commands per AWS Redshift best practices

* 4871 - Add manifest keyword to COPY Statement

* 4871 - added fault tolerance for empty entries in the manifest
 - Also made sure to clean up the manifest file on cleanup

* 4871 - Fixed erroneous documentation in RedshiftStreamCopier

* 4781 - Update outdated `RedshiftCopyS3Destination` documentation.
Labels: area/connectors (Connector related issues)