Destination redshift: async standard inserts #32888
Conversation
@@ -41,4 +42,12 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
    return destination.getConsumer(config, catalog, outputRecordCollector);
  }

  @Override
  public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
This isn't used by Redshift; I'm just adding it since I'm in this part of the code anyway.
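For context, a minimal sketch of what the serialized variant buys you, using a hypothetical stand-in interface (the real Airbyte CDK `SerializedAirbyteMessageConsumer` carries more lifecycle methods): the destination receives the raw JSON string plus its size, so deserialization can be deferred to whichever component actually needs the parsed message.

```java
public class SerializedConsumerSketch {

  // Hypothetical single-method stand-in; the real CDK interface also has
  // start()/close() and takes Airbyte-specific types.
  interface SerializedMessageConsumer {
    void accept(String rawJson, long sizeInBytes);
  }

  // Byte size of the raw message, useful for memory-bounded async buffering.
  static long countBytes(String rawJson) {
    return rawJson.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
  }

  public static void main(String[] args) {
    // The consumer sees the raw string; no JSON parse happens here.
    SerializedMessageConsumer consumer =
        (raw, size) -> System.out.println("buffered " + size + " bytes");
    String raw = "{\"type\":\"RECORD\"}";
    consumer.accept(raw, countBytes(raw));
  }
}
```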
        catalog,
        sqlOperations::isValidData);
Every implementation of this method is just `return true` 🤷, but deleting it looks like a nontrivial refactor, so I'm ignoring it for now. It's not used in the async framework at all, and we don't want this functionality to begin with: it drops "invalid" records, which we don't want to do.
        final String schemaName,
        final String tableName)
        throws Exception {
    dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> adapter.adapt(airbyteRecordMessage.getData())));
    dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> {
The only DataAdapter in existence is destination-postgres, because of https://airbytehq-team.slack.com/archives/C03C4AVJWG4/p1700605909300539 / #3476, so I don't feel bad about deserializing and reserializing here.
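The trade-off being accepted here can be sketched as follows, with a hypothetical `adaptAll` helper standing in for the real code path (the real DataAdapter mutates parsed Jackson `JsonNode`s; a string transform stands in for the deserialize/adapt/reserialize round trip):

```java
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class AdapterSketch {

  // Hypothetical: with no adapter registered (every destination except
  // postgres), the serialized records pass through untouched -- no parse cost.
  static List<String> adaptAll(List<String> serializedRecords,
                               Optional<UnaryOperator<String>> adapter) {
    if (adapter.isEmpty()) {
      return serializedRecords;
    }
    // Only when an adapter exists does each record pay the
    // deserialize -> adapt -> reserialize round trip (stubbed here).
    return serializedRecords.stream().map(adapter.get()).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> records = List.of("{\"name\":\"a\\u0000b\"}");
    // Illustrative adapter in the spirit of destination-postgres's
    // null-character scrubbing.
    List<String> out = adaptAll(records, Optional.of(s -> s.replace("\\u0000", "")));
    System.out.println(out);
  }
}
```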
        // 1-indexed
        statement.setString(i, uuidSupplier.get().toString());
        statement.setString(i + 1, Jsons.serialize(message.getData()));
        statement.setTimestamp(i + 2, Timestamp.from(Instant.ofEpochMilli(message.getEmittedAt())));
        statement.setString(i + 1, message.getSerialized());
Note that we can skip the Jsons.serialize call here.
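The bind order in the diff can be sketched like this, with a hypothetical `bindValues` helper in place of a live JDBC `PreparedStatement` (no database needed to see the shape): `getSerialized()` already hands back the raw JSON string, so the old `Jsons.serialize(message.getData())` round trip disappears.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.util.UUID;

public class BindSketch {

  // Hypothetical helper mirroring the 1-indexed binds in the diff:
  // i -> record uuid, i + 1 -> raw JSON payload, i + 2 -> emitted_at.
  static Object[] bindValues(String serialized, long emittedAtMillis, UUID id) {
    return new Object[] {
        id.toString(),                                         // i
        serialized,                                            // i + 1: already a String
        Timestamp.from(Instant.ofEpochMilli(emittedAtMillis))  // i + 2
    };
  }

  public static void main(String[] args) {
    Object[] vals = bindValues("{\"x\":1}", 1700000000000L, UUID.randomUUID());
    // vals[1] is the serialized payload, bound as-is with no reserialization.
    System.out.println(vals[1]);
  }
}
```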
LGTM, pending CI tests and the connector checklist.
Can you also paste the link to the workspace where you ran the large sync, for posterity?
Added a link in the PR description. The dest-redshift tests are actually succeeding; we're failing because of source-google-search-console autoformat >.>
Meh. I'll publish this tomorrow. Either someone else will merge master and fix the format, or I'll just approve-and-merge. (I just don't want to release this when I'm about to go on my commute.)
/publish-java-cdk
Closes #32521.
I ran a 20M record sync (3 streams: 10M + 10M + 100 records) here: https://cloud.airbyte.com/workspaces/b61bc266-ef3c-460c-af2f-f70da4a2993c/connections/3a93c4ec-9b61-42f8-b6f2-4ec052129dfc/job-history#6303716::0. It ran successfully and in approximately the same time as previous syncs (2h40min, vs. older syncs ranging from 2:28 to 3:43).