Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Introduce StateIteratorProcessor in CDK #33312

Merged
merged 18 commits into from
Jan 3, 2024
Merged

Conversation

xiaohansong
Copy link
Contributor

@xiaohansong xiaohansong commented Dec 11, 2023

What

Add SourceStateIterator and SourceStateIteratorProcessor interface.
Add count into AirbyteStateMessage.

Idea is that all source connector will use the standard SourceStateIterator to emit state. SourceStateIterator will composite with SourceStateIteratorProcessor interface and inside of the interface each connector will have its own implementation to define:

  1. whether it has reached a checkpoint
  2. how to process a record message
  3. how to emit a checkpoint state message
  4. how to emit a final state message.

How

See doc: https://docs.google.com/document/d/14Qg_lXMzMvMO5oP0RYIe11JIwuCzR9QexcEaSiPJKEk/edit#heading=h.fa72b39y2m99 for details.

This PR demonstrate how do we refactor mysql connector.

We need to apply the same refactoring logic towards:

Postgres:

  • CtidStateIterator
  • XminStateIterator

MongoDB:

  • MongoDbStateIterator

Recommended reading order

  1. SourceStateIterator.java and SourceStateIteratorProcessor.java - these two are created in CDK
  2. MySqlInitialSyncStateIteratorProcessor.java - individual processor
  3. Note I've deleted the old MySqlInitialSyncStateIterator.java - logic have been moved to the previous classes.

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user?

For connector PRs, use this section to explain which type of semantic versioning bump occurs as a result of the changes. Refer to our Semantic Versioning for Connectors guidelines for more information. Breaking changes to connectors must be documented by an Airbyte engineer (PR author, or reviewer for community PRs) by using the Breaking Change Release Playbook.

If there are breaking changes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Actions

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Connector version is set to 0.0.1
    • Dockerfile has version 0.0.1
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog with an entry for the initial version. See changelog example
    • docs/integrations/README.md

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Unit & integration tests added

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:generateScaffolds then checking in your changes
  • Documentation which references the generator is updated as needed
Updating the Python CDK

Airbyter

Before merging:

  • Pull Request description explains what problem it is solving
  • Code change is unit tested
  • Build and my-py check pass
  • Smoke test the change on at least one affected connector
    • On Github: Run this workflow, passing --use-local-cdk --name=source-<connector> as options
    • Locally: airbyte-ci connectors --use-local-cdk --name=source-<connector> test
  • PR is reviewed and approved

After merging:

  • Publish the CDK
    • The CDK does not follow proper semantic versioning. Choose minor if this the change has significant user impact or is a breaking change. Choose patch otherwise.
    • Write a thoughtful changelog message so we know what was updated.
  • Merge the platform PR that was auto-created for updating the Connector Builder's CDK version
    • This step is optional if the change does not affect the connector builder or declarative connectors.

Copy link

vercel bot commented Dec 11, 2023

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
airbyte-docs ✅ Ready (Inspect) Visit Preview 💬 Add feedback Jan 3, 2024 11:11pm

@octavia-squidington-iii octavia-squidington-iii added area/connectors Connector related issues CDK Connector Development Kit connectors/source/mysql labels Dec 11, 2023
Copy link
Contributor

github-actions bot commented Dec 11, 2023

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

@xiaohansong xiaohansong changed the title Introduce StateIteratorProcessor in CDK ✨ Introduce StateIteratorProcessor in CDK Dec 11, 2023
@xiaohansong xiaohansong marked this pull request as ready for review December 11, 2023 22:00
@xiaohansong xiaohansong requested a review from a team as a code owner December 11, 2023 22:00
@stephane-airbyte
Copy link
Contributor

This is a weird one. Usually in java, an iterator is a class that allows one to iterate and run business logic. Your change seems to build a class that takes a processor and calls it iteratively behind the scene.
I really don't think the name is right.
I'm not convinced about the abstraction either, but I don't know enough to have a strong opinion about it

import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import java.time.Instant;

public interface SourceStateIteratorProcessor<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the implementation of this is MySqlInitialSyncStateIteratorProcessor - this seems to be a wrapper around StateManager? Perhaps we should rename this to SourceStateManager?

A more stretch goal is to see if there is a way to combine parts of this code and the various StateManagers since there is definitely some duplicate code (move certain fields to generics)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline and we decide to keep this as is-

I didn’t make the stateManager change, because:

  • Abstracting all logic from various statemanager is difficult because each has different function signature to generate state message
  • Merging Processor logic into stateManager within the same connector is problematic; because then we would have to create an abstract class of the stateManager on top of global and PerStream so we don’t duplicate the Processor logic in it, and since we are creating another layer anyway we would prefer composition than inheritance.

Copy link
Contributor

github-actions bot commented Dec 29, 2023

Warning

🚨 Connector code freeze is in effect until 2024-01-02. This PR is changing connector code. Please contact the current OC engineers if you want to merge this change to master.

@xiaohansong
Copy link
Contributor Author

/publish-cdk-java

@xiaohansong
Copy link
Contributor Author

xiaohansong commented Jan 3, 2024

/publish-java-cdk

🕑 https://github.com/airbytehq/airbyte/actions/runs/7403247875
✅ Successfully published Java CDK version=0.10.3!

@octavia-squidington-iii octavia-squidington-iii added the area/documentation Improvements or additions to documentation label Jan 3, 2024
@xiaohansong xiaohansong merged commit 18e0e77 into master Jan 3, 2024
23 checks passed
@xiaohansong xiaohansong deleted the xiaohan/stateiterator branch January 3, 2024 23:29
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation CDK Connector Development Kit connectors/source/mysql
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants