-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Source-Postgres] : Implement WASS algorithm #41649
Conversation
…airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java Co-authored-by: Evan Tahler <evan@airbyte.io>
airbyte-integrations/connectors/source-postgres/acceptance-test-config.yml
Outdated
Show resolved
Hide resolved
final var propertiesManager = new RelationalDbDebeziumPropertiesManager( | ||
PostgresCdcProperties.getDebeziumDefaultProperties(database), sourceConfig, catalog, cdcStreamList); | ||
final var eventConverter = new RelationalDbDebeziumEventConverter(new PostgresCdcConnectorMetadataInjector(), emittedAt); | ||
// Debezium is started for streams that have been started - that is they have been partially or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit and optional: can you also note this is for incremental streams? (better yet I think you can put following logic into a separate function so it's pure wass operation (incremental streams only) so it doesn't confuse readers with full refresh)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added the comment. However, we need to figure out the started cdc stream list here so that we can figure out in line 288 which of the scenarios the sync falls under :
- First sync (no prev state) : in which case we do initial sync + CDC
- Purely incremental sync
- WASS case
...gres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java
Show resolved
Hide resolved
...gres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java
Outdated
Show resolved
Hide resolved
...es/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java
Outdated
Show resolved
Hide resolved
...gres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, thanks for the changes!
Port of #38240 for Postgres