✨ Source MongoDB Internal POC: CDC State Handling #29763
source-mssql test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mssql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mssql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mssql test
Coverage report for source-postgres
|
source-mysql test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-postgres test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-postgres docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml | ✅ |
Connector version semver check | ❌ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-postgres test
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-mysql test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
source-postgres test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-postgres docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml | ✅ |
Connector version semver check | ❌ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-postgres test
source-mssql test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mssql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mssql/metadata.yaml | ✅ |
Connector version semver check | ❌ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mssql test
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-mssql test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mssql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mssql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mssql test
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-mysql test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
source-postgres test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-postgres docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-postgres test
I think I understand what's going on here. I'm going to read it again tomorrow morning, but feel free to merge
A few questions/concerns:
- Is swallowing the exception in the hasNext method of MongoDbStateIterator the right thing to do?
- How would we identify that the initial sync for a given stream is complete? I don't see any special state emitted for streams that complete the initial sync to denote that we now only need to process the change streams for them. That leads to the next question: I don't see any specific logic to identify which streams need to go through the initial sync. Right now, all the streams present in the catalog go through the initial sync every time.
- Other DBs emit state every 15 minutes or once more than 10K records are emitted. Maybe we could do the same thing?
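To make the third point concrete, here is a minimal sketch of a count-or-time checkpoint rule. The class name `CheckpointPolicy`, its constructor parameters, and the exact thresholds are assumptions for illustration only; this is not how the other connectors implement it.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: decides when a sync should emit an intermediate state message. */
final class CheckpointPolicy {

  private final long maxRecords;
  private final Duration maxInterval;
  private long recordsSinceLastState = 0;
  private Instant lastStateAt;

  CheckpointPolicy(final long maxRecords, final Duration maxInterval, final Instant start) {
    this.maxRecords = maxRecords;
    this.maxInterval = maxInterval;
    this.lastStateAt = start;
  }

  /** Call once per emitted record; returns true when a state message is due. */
  boolean recordEmitted(final Instant now) {
    recordsSinceLastState++;
    final boolean countReached = recordsSinceLastState >= maxRecords;
    final boolean intervalElapsed = Duration.between(lastStateAt, now).compareTo(maxInterval) >= 0;
    if (countReached || intervalElapsed) {
      // Reset both counters so the next checkpoint is measured from this one.
      recordsSinceLastState = 0;
      lastStateAt = now;
      return true;
    }
    return false;
  }
}
```

An iterator could call `recordEmitted(Instant.now())` after each record and emit a state message whenever it returns true, for example with `new CheckpointPolicy(10_000, Duration.ofMinutes(15), Instant.now())`.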
if (!stateMessages.isEmpty()) {
  if (stateMessages.size() == 1) {
    final AirbyteStateMessage stateMessage = stateMessages.get(0);
    stateManager.updateCdcState(Jsons.object(stateMessage.getGlobal().getSharedState(), MongoDbCdcState.class));
Isn't the CDC state supposed to be the Debezium state? How will it be deserialized to MongoDbCdcState?
The MongoDbCdcState holds the data extracted from the Debezium offset map. There is code to handle the translation to/from the offset map and MongoDbCdcState that is not in this PR, but it will be in the PR that integrates all the bits together.
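Since the translation code is not part of this PR, the following is only a rough sketch of what a round trip between an offset map and a state object could look like. `CdcStateSketch` is a stand-in for MongoDbCdcState, and the single `resume_token` key is an assumed name; the real fields and the keys Debezium actually writes are not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for MongoDbCdcState; the real class's fields are not shown in this thread. */
final class CdcStateSketch {

  // Hypothetical offset key, used only for illustration.
  static final String RESUME_TOKEN_KEY = "resume_token";

  private final String resumeToken;

  CdcStateSketch(final String resumeToken) {
    this.resumeToken = resumeToken;
  }

  String getResumeToken() {
    return resumeToken;
  }

  /** Extracts the persisted fields from an offset map. */
  static CdcStateSketch fromOffset(final Map<String, String> offset) {
    return new CdcStateSketch(offset.get(RESUME_TOKEN_KEY));
  }

  /** Rebuilds an offset map from the persisted fields. */
  Map<String, String> toOffset() {
    final Map<String, String> offset = new HashMap<>();
    offset.put(RESUME_TOKEN_KEY, resumeToken);
    return offset;
  }
}
```

Keeping the state object small and deriving the offset map from it on demand means only the extracted fields need to be serialized into the shared state.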
...-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIterator.java
@subodh1810 I think the answer to #1 is that it is the correct thing to do, but I will defer to @colesnodgrass, as he wrote that code. As for #2 and #3, those are out of scope for this PR. The intent here is just to set up the initial structure. We will need to handle those two cases when we integrate this with the iterator, which itself will probably need to move or change to become part of the code that sets up the iterators for the CDC sync given the state.
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-mysql test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-postgres test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-postgres docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-postgres test
source-mssql test report (commit
|
Step | Result |
---|
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mssql test
Looks good. Again, let's make sure that the tests for all the other CDC connectors are green.
Also, as discussed over chat, I pushed a commit to disable the following methods:
@Override
public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() {
  throw new RuntimeException("Debezium is not used to carry out the snapshot of tables.");
}

@Override
public boolean isCdcCheckpointEnabled() {
  return false;
}
The method saveStateAfterCompletionOfSnapshotOfNewStreams is not required because we won't be using Debezium for the initial snapshot of streams.
The method isCdcCheckpointEnabled requires the following methods in CdcTargetPosition to be implemented as well. So there is an item on the TODO list to implement these methods in a future PR, after which we can set isCdcCheckpointEnabled to true.
default boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
  return false;
}
and
default boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
  return true;
}
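As a rough idea of what non-default versions of these two checks might do, here is a sketch that compares offsets by a single numeric position. Everything in it is an assumption: the `ts` key is hypothetical, and the event is reduced to a plain `long` instead of a ChangeEventWithMetadata, so this is not the CdcTargetPosition API itself.

```java
import java.util.Map;

/** Hypothetical offset comparison; not the real CdcTargetPosition implementation. */
final class OffsetComparisonSketch {

  // Assumed key holding a numeric position in the offset map.
  static final String TIMESTAMP_KEY = "ts";

  private OffsetComparisonSketch() {}

  /** Two offsets are considered the same when they point at the same position. */
  static boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
    return positionOf(offsetA) == positionOf(offsetB);
  }

  /** An event is ahead of an offset when its position is strictly greater. */
  static boolean isEventAheadOffset(final Map<String, String> offset, final long eventPosition) {
    return eventPosition > positionOf(offset);
  }

  // A missing key is treated as position 0 in this sketch.
  private static long positionOf(final Map<String, String> offset) {
    return Long.parseLong(offset.getOrDefault(TIMESTAMP_KEY, "0"));
  }
}
```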
import java.util.concurrent.atomic.AtomicLong;
import org.bson.BsonTimestamp;
Not required, right?
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ❌ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
source-mssql test report (commit
|
Step | Result |
---|
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mssql test
source-mysql test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
source-postgres test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-postgres docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-postgres test
source-mongodb-internal-poc test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ❌ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
What
How
CdcStateHandler interface so that state is updated as part of the CDC incremental sync
Recommended reading order
MongoDbStateManager.java
MongoDbCdcStateHandler.java
MongoDbCdcState.java
This PR does not completely hook in the state management for CDC. That will come in a later PR when we integrate all the CDC bits together to actually facilitate a CDC-based sync. For now, I confirmed that all unit tests that validate state pass, as do the acceptance tests that perform incremental syncs and validate state messages. I also put some TODOs in place where we need to come back and make fixes once we integrate this.
I opted not to depend on the relational DB package where the current state management code lives. That code is tightly coupled to relational database concepts, which do not really apply to MongoDB.
Finally, this PR is based off of #29730 (you can ignore the metadata injection changes -- I needed some changes from that PR to implement this code) and supersedes #29678, which will be closed in favor of this PR.