-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CDK: Embedded reader utils #28873
CDK: Embedded reader utils #28873
Conversation
|
||
|
||
class CDKRunner(SourceRunner[TConfig]): | ||
def __init__(self, source: AbstractSource, name: str): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this have to be an AbstractSource / why not accept any Source?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be any source, fixed this
|
||
|
||
def get_defined_id(stream: AirbyteStream, data: Dict[str, Any]) -> Optional[str]: | ||
import dpath |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is dpath imported from the method instead of at the top of the file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
…yte into flash1293/embedded-reader
output = self._handle_record(message.record, get_defined_id(stream, message.record.data)) | ||
if output: | ||
yield output | ||
elif message.type is Type.STATE and message.state: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it matters for embedded, but we generally need to output state messages to destination so they can checkpoint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it's just in-memory I think it's OK - if they want the guarantees from checkpointing, they can use hosted Airbyte (not trying to re-build the whole platform here :) )
""" | ||
pass | ||
|
||
def _load_data(self, stream_name: str, state: Optional[AirbyteStateMessage] = None) -> Iterable[TOutput]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method probably shouldn't be private since it's used by child classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understood the underscore as meaning protected, not private (https://jellis18.github.io/post/2022-01-15-access-modifiers-python/#:~:text=Private%20vs%20Protected,and%20not%20a%20private%20value.) - ChatGPT agrees with me as well ;) I'm going to leave it like this for now as you shouldn't call it on the integration from the outside.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting. I guess we don't have many private fields in our codebase 🤷
* relax pydantic dep * Automated Commit - Format and Process Resources Changes * wip * wrap up base integration * add init file * introduce CDK runner and improve error message * make state param optional * update protocol models * review comments * always run incremental if possible * fix --------- Co-authored-by: flash1293 <flash1293@users.noreply.github.com>
* Add everything for BQ but migrate, refactor interface after practical work * Make new default methods, refactor to single implemented method * MigrationInterface and BQ impl created * Trying to integrate with standard inserts * remove unnecessary NameAndNamespacePair class * Shimmed in * Java Docs * Initial Testing Setup * Tests! * Move Migrator into TyperDeduper * Functional Migration * Add Integration Test * Pr updates * bump version * bump version * version bump * Update to airbyte-ci-internal (#29026) * 🐛 Source Github, Instagram, Zendesk-support, Zendesk-talk: fix CAT tests fail on `spec` (#28910) * connectors-ci: better modified connectors detection logic (#28855) * connectors-ci: report path should always start with `airbyte-ci/` (#29030) * make report path always start with airbyte-ci * revert report path in orchestrator * add more test cases * bump version * Updated docs (#29019) * CDK: Embedded reader utils (#28873) * relax pydantic dep * Automated Commit - Format and Process Resources Changes * wip * wrap up base integration * add init file * introduce CDK runner and improve error message * make state param optional * update protocol models * review comments * always run incremental if possible * fix --------- Co-authored-by: flash1293 <flash1293@users.noreply.github.com> * 🤖 Bump minor version of Airbyte CDK * 🚨🚨 Low code CDK: Decouple SimpleRetriever and HttpStream (#28657) * fix tests * format * review comments * Automated Commit - Formatting Changes * review comments * review comments * review comments * log all messages * log all message * review comments * review comments * Automated Commit - Formatting Changes * add comment --------- Co-authored-by: flash1293 <flash1293@users.noreply.github.com> * 🤖 Bump minor version of Airbyte CDK * 🐛 Source Github, Instagram, Zendesk Support / Talk - revert `spec` changes and improve (#29031) * Source oauth0: new streams and fix incremental (#29001) * Add new streams Organizations,OrganizationMembers,OrganizationMemberRoles * relax schema definition to allow additional fields * Bump image tag version * revert some changes to the old schemas * Format python so gradle can pass * update incremental * remove unused print * fix unit test --------- Co-authored-by: Vasilis Gavriilidis <vasilis.gavriilidis@orfium.com> * 🐛 Source Mongo: Fix failing acceptance tests (#28816) * Fix failing acceptance tests * Fix failing strict acceptance tests * Source-Greenhouse: Fix unit tests for new CDK version (#28969) Fix unit tests * Add CSV options to the CSV parser (#28491) * remove invalid legacy option * remove unused option * the tests pass but this is quite messy * very slight clean up * Add skip options to csv format * fix some of the typing issues * fixme comment * remove extra log message * fix typing issues * skip before header * skip after header * format * add another test * Automated Commit - Formatting Changes * auto generate column names * delete dead code * update title and description * true and false values * Update the tests * Add comment * missing test * rename * update expected spec * move to method * Update comment * fix typo * remove unused import * Add a comment * None records do not pass the WaitForDiscoverPolicy * format * remove second branch to ensure we always go through the same processing * Raise an exception if the record is None * reset * Update tests * handle unquoted newlines * Automated Commit - Formatting Changes * Update test case so the quoting is explicit * Update comment * Automated Commit - Formatting Changes * Fail validation if skipping rows before header and header is autogenerated * always fail if a record cannot be parsed * format * set write line_no in error message * remove none check * Automated Commit - Formatting Changes * enable autogenerate test * remove duplicate test * missing unit tests * Update * remove branching * remove unused none check * Update tests * remove branching * format * extract to function * comment * missing type * type annotation * use set * Document that the strings are case-sensitive * public -> private * add unit test * newline --------- Co-authored-by: girarda <girarda@users.noreply.github.com> * Dagster: Add sentry logging (#28822) * Add sentry * add sentry decorator * Add traces * Use sentry trace * Improve duplicate logging * Add comments * DNC * Fix up issues * Move to scopes * Remove breadcrumb * Update lock * ✨Source Shortio: Migrate Python CDK to Low-code CDK (#28950) * Migrate Shortio to Low-Code * Update abnormal state * Format * Update Docs * Fix metadata.yaml * Add pagination * Add incremental sync * add incremental parameters * update metadata * rollback update version * release date --------- Co-authored-by: marcosmarxm <marcosmarxm@gmail.com> * Update to new verbiage (#29051) * [skip ci] Metadata: Remove leading underscore (#29024) * DNC * Add test models * Add model test * Remove underscore from metadata files * Regenerate models * Add test to check for key transformation * Allow additional fields on metadata * Delete transform * Proof of concept parallel source stream reading implementation for MySQL (#26580) * Proof of concept parallel source stream reading implementation for MySQL * Automated Change * Add read method that supports concurrent execution to Source interface * Remove parallel iterator * Ensure that executor service is stopped * Automated Commit - Format and Process Resources Changes * Expose method to fix compilation issue * Use concurrent map to avoid access issues * Automated Commit - Format and Process Resources Changes * Ensure concurrent streams finish before closing source * Fix compile issue * Formatting * Exclude concurrent stream threads from orphan thread watcher * Automated Commit - Format and Process Resources Changes * Refactor orphaned thread logic to account for concurrent execution * PR feedback * Implement readStreams in wrapper source * Automated Commit - Format and Process Resources Changes * Add readStream override * Automated Commit - Format and Process Resources Changes * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * Debug logging * Reduce logging level * Replace synchronized calls to System.out.println when concurrent * Close consumer * Flush before close * Automated Commit - Format and Process Resources Changes * Remove charset * Use ASCII and flush periodically for parallel streams * Test performance harness patch * Automated Commit - Format and Process Resources Changes * Cleanup * Logging to identify concurrent read enabled * Mark parameter as final --------- Co-authored-by: jdpgrailsdev <jdpgrailsdev@users.noreply.github.com> Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Co-authored-by: rodireich <rodireich@users.noreply.github.com> * connectors-ci: disable dependency scanning (#29033) * updates (#29059) * Metadata: skip breaking change validation on prerelease (#29017) * skip breaking change validation * Move ValidatorOpts higher in call * Add prerelease test * Fix test * ✨ Source MongoDB Internal POC: Generate Test Data (#29049) * Add script to generate test data * Fix prose * Update credentials example * PR feedback * Bump Airbyte version from 0.50.12 to 0.50.13 * Bump versions for mssql strict-encrypt (#28964) * Bump versions for mssql strict-encrypt * Fix failing test * Fix failing test * 🎨 Improve replication method selection UX (#28882) * update replication method in MySQL source * bump version * update expected specs * update registries * bump strict encrypt version * make password always_show * change url * update registries * 🐛 Avoid writing records to log (#29047) * Avoid writing records to log * Update version * Rollout ctid cdc (#28708) * source-postgres: enable ctid+cdc implementation * 100% ctid rollout for cdc * remove CtidFeatureFlags * fix CdcPostgresSourceAcceptanceTest * Bump versions and release notes * Fix compilation error due to previous merge --------- Co-authored-by: subodh <subodh1810@gmail.com> * connectors-ci: fix `unhashable type 'set'` (#29064) * Add Slack Alert lifecycle to Dagster for Metadata publish (#28759) * DNC * Add slack lifecycle logging * Update to use slack * Update slack to use resource and bot * Improve markdown * Improve log * Add sensor logging * Extend sensor time * merge conflict * PR Refactoring * Make the tests work * remove unnecessary classes, pr feedback * more merging * Update airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java Co-authored-by: Edward Gao <edward.gao@airbyte.io> * snowflake updates --------- Co-authored-by: Ben Church <ben@airbyte.io> Co-authored-by: Baz <oleksandr.bazarnov@globallogic.com> Co-authored-by: Augustin <augustin@airbyte.io> Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Co-authored-by: Joe Reuter <joe@airbyte.io> Co-authored-by: flash1293 <flash1293@users.noreply.github.com> Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com> Co-authored-by: Vasilis Gavriilidis <vasilis.gavriilidis@orfium.com> Co-authored-by: Jonathan Pearlin <jonathan@airbyte.io> Co-authored-by: Alexandre Girard <alexandre@airbyte.io> Co-authored-by: girarda <girarda@users.noreply.github.com> Co-authored-by: btkcodedev <btk.codedev@gmail.com> Co-authored-by: marcosmarxm <marcosmarxm@gmail.com> Co-authored-by: Natalie Kwong <38087517+nataliekwong@users.noreply.github.com> Co-authored-by: jdpgrailsdev <jdpgrailsdev@users.noreply.github.com> Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Co-authored-by: rodireich <rodireich@users.noreply.github.com> Co-authored-by: Alexandre Cuoci <Hesperide@users.noreply.github.com> Co-authored-by: terencecho <terencecho@users.noreply.github.com> Co-authored-by: Lake Mossman <lake@airbyte.io> Co-authored-by: Benoit Moriceau <benoit@airbyte.io> Co-authored-by: subodh <subodh1810@gmail.com> Co-authored-by: Edward Gao <edward.gao@airbyte.io>
* Add everything for BQ but migrate, refactor interface after practical work * Make new default methods, refactor to single implemented method * MigrationInterface and BQ impl created * Trying to integrate with standard inserts * remove unnecessary NameAndNamespacePair class * Shimmed in * Java Docs * Initial Testing Setup * Tests! * Move Migrator into TyperDeduper * Functional Migration * Add Integration Test * Pr updates * bump version * bump version * version bump * Update to airbyte-ci-internal (airbytehq#29026) * 🐛 Source Github, Instagram, Zendesk-support, Zendesk-talk: fix CAT tests fail on `spec` (airbytehq#28910) * connectors-ci: better modified connectors detection logic (airbytehq#28855) * connectors-ci: report path should always start with `airbyte-ci/` (airbytehq#29030) * make report path always start with airbyte-ci * revert report path in orchestrator * add more test cases * bump version * Updated docs (airbytehq#29019) * CDK: Embedded reader utils (airbytehq#28873) * relax pydantic dep * Automated Commit - Format and Process Resources Changes * wip * wrap up base integration * add init file * introduce CDK runner and improve error message * make state param optional * update protocol models * review comments * always run incremental if possible * fix --------- Co-authored-by: flash1293 <flash1293@users.noreply.github.com> * 🤖 Bump minor version of Airbyte CDK * 🚨🚨 Low code CDK: Decouple SimpleRetriever and HttpStream (airbytehq#28657) * fix tests * format * review comments * Automated Commit - Formatting Changes * review comments * review comments * review comments * log all messages * log all message * review comments * review comments * Automated Commit - Formatting Changes * add comment --------- Co-authored-by: flash1293 <flash1293@users.noreply.github.com> * 🤖 Bump minor version of Airbyte CDK * 🐛 Source Github, Instagram, Zendesk Support / Talk - revert `spec` changes and improve (airbytehq#29031) * Source oauth0: new streams and fix incremental (airbytehq#29001) * Add new streams Organizations,OrganizationMembers,OrganizationMemberRoles * relax schema definition to allow additional fields * Bump image tag version * revert some changes to the old schemas * Format python so gradle can pass * update incremental * remove unused print * fix unit test --------- Co-authored-by: Vasilis Gavriilidis <vasilis.gavriilidis@orfium.com> * 🐛 Source Mongo: Fix failing acceptance tests (airbytehq#28816) * Fix failing acceptance tests * Fix failing strict acceptance tests * Source-Greenhouse: Fix unit tests for new CDK version (airbytehq#28969) Fix unit tests * Add CSV options to the CSV parser (airbytehq#28491) * remove invalid legacy option * remove unused option * the tests pass but this is quite messy * very slight clean up * Add skip options to csv format * fix some of the typing issues * fixme comment * remove extra log message * fix typing issues * skip before header * skip after header * format * add another test * Automated Commit - Formatting Changes * auto generate column names * delete dead code * update title and description * true and false values * Update the tests * Add comment * missing test * rename * update expected spec * move to method * Update comment * fix typo * remove unused import * Add a comment * None records do not pass the WaitForDiscoverPolicy * format * remove second branch to ensure we always go through the same processing * Raise an exception if the record is None * reset * Update tests * handle unquoted newlines * Automated Commit - Formatting Changes * Update test case so the quoting is explicit * Update comment * Automated Commit - Formatting Changes * Fail validation if skipping rows before header and header is autogenerated * always fail if a record cannot be parsed * format * set write line_no in error message * remove none check * Automated Commit - Formatting Changes * enable autogenerate test * remove duplicate test * missing unit tests * Update * remove branching * remove unused none check * Update tests * remove branching * format * extract to function * comment * missing type * type annotation * use set * Document that the strings are case-sensitive * public -> private * add unit test * newline --------- Co-authored-by: girarda <girarda@users.noreply.github.com> * Dagster: Add sentry logging (airbytehq#28822) * Add sentry * add sentry decorator * Add traces * Use sentry trace * Improve duplicate logging * Add comments * DNC * Fix up issues * Move to scopes * Remove breadcrumb * Update lock * ✨Source Shortio: Migrate Python CDK to Low-code CDK (airbytehq#28950) * Migrate Shortio to Low-Code * Update abnormal state * Format * Update Docs * Fix metadata.yaml * Add pagination * Add incremental sync * add incremental parameters * update metadata * rollback update version * release date --------- Co-authored-by: marcosmarxm <marcosmarxm@gmail.com> * Update to new verbiage (airbytehq#29051) * [skip ci] Metadata: Remove leading underscore (airbytehq#29024) * DNC * Add test models * Add model test * Remove underscore from metadata files * Regenerate models * Add test to check for key transformation * Allow additional fields on metadata * Delete transform * Proof of concept parallel source stream reading implementation for MySQL (airbytehq#26580) * Proof of concept parallel source stream reading implementation for MySQL * Automated Change * Add read method that supports concurrent execution to Source interface * Remove parallel iterator * Ensure that executor service is stopped * Automated Commit - Format and Process Resources Changes * Expose method to fix compilation issue * Use concurrent map to avoid access issues * Automated Commit - Format and Process Resources Changes * Ensure concurrent streams finish before closing source * Fix compile issue * Formatting * Exclude concurrent stream threads from orphan thread watcher * Automated Commit - Format and Process Resources Changes * Refactor orphaned thread logic to account for concurrent execution * PR feedback * Implement readStreams in wrapper source * Automated Commit - Format and Process Resources Changes * Add readStream override * Automated Commit - Format and Process Resources Changes * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * Debug logging * Reduce logging level * Replace synchronized calls to System.out.println when concurrent * Close consumer * Flush before close * Automated Commit - Format and Process Resources Changes * Remove charset * Use ASCII and flush periodically for parallel streams * Test performance harness patch * Automated Commit - Format and Process Resources Changes * Cleanup * Logging to identify concurrent read enabled * Mark parameter as final --------- Co-authored-by: jdpgrailsdev <jdpgrailsdev@users.noreply.github.com> Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Co-authored-by: rodireich <rodireich@users.noreply.github.com> * connectors-ci: disable dependency scanning (airbytehq#29033) * updates (airbytehq#29059) * Metadata: skip breaking change validation on prerelease (airbytehq#29017) * skip breaking change validation * Move ValidatorOpts higher in call * Add prerelease test * Fix test * ✨ Source MongoDB Internal POC: Generate Test Data (airbytehq#29049) * Add script to generate test data * Fix prose * Update credentials example * PR feedback * Bump Airbyte version from 0.50.12 to 0.50.13 * Bump versions for mssql strict-encrypt (airbytehq#28964) * Bump versions for mssql strict-encrypt * Fix failing test * Fix failing test * 🎨 Improve replication method selection UX (airbytehq#28882) * update replication method in MySQL source * bump version * update expected specs * update registries * bump strict encrypt version * make password always_show * change url * update registries * 🐛 Avoid writing records to log (airbytehq#29047) * Avoid writing records to log * Update version * Rollout ctid cdc (airbytehq#28708) * source-postgres: enable ctid+cdc implementation * 100% ctid rollout for cdc * remove CtidFeatureFlags * fix CdcPostgresSourceAcceptanceTest * Bump versions and release notes * Fix compilation error due to previous merge --------- Co-authored-by: subodh <subodh1810@gmail.com> * connectors-ci: fix `unhashable type 'set'` (airbytehq#29064) * Add Slack Alert lifecycle to Dagster for Metadata publish (airbytehq#28759) * DNC * Add slack lifecycle logging * Update to use slack * Update slack to use resource and bot * Improve markdown * Improve log * Add sensor logging * Extend sensor time * merge conflict * PR Refactoring * Make the tests work * remove unnecessary classes, pr feedback * more merging * Update airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java Co-authored-by: Edward Gao <edward.gao@airbyte.io> * snowflake updates --------- Co-authored-by: Ben Church <ben@airbyte.io> Co-authored-by: Baz <oleksandr.bazarnov@globallogic.com> Co-authored-by: Augustin <augustin@airbyte.io> Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Co-authored-by: Joe Reuter <joe@airbyte.io> Co-authored-by: flash1293 <flash1293@users.noreply.github.com> Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com> Co-authored-by: Vasilis Gavriilidis <vasilis.gavriilidis@orfium.com> Co-authored-by: Jonathan Pearlin <jonathan@airbyte.io> Co-authored-by: Alexandre Girard <alexandre@airbyte.io> Co-authored-by: girarda <girarda@users.noreply.github.com> Co-authored-by: btkcodedev <btk.codedev@gmail.com> Co-authored-by: marcosmarxm <marcosmarxm@gmail.com> Co-authored-by: Natalie Kwong <38087517+nataliekwong@users.noreply.github.com> Co-authored-by: jdpgrailsdev <jdpgrailsdev@users.noreply.github.com> Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Co-authored-by: rodireich <rodireich@users.noreply.github.com> Co-authored-by: Alexandre Cuoci <Hesperide@users.noreply.github.com> Co-authored-by: terencecho <terencecho@users.noreply.github.com> Co-authored-by: Lake Mossman <lake@airbyte.io> Co-authored-by: Benoit Moriceau <benoit@airbyte.io> Co-authored-by: subodh <subodh1810@gmail.com> Co-authored-by: Edward Gao <edward.gao@airbyte.io>
Based on #28854
Related to #28016
This PR adds basic helpers to build embedded loaders in other python programs so they can leverage existing python sources.
The main part of this PR is adding the
BaseEmbeddedIntegration
class that wraps a source into a more generic interface that can easily be adapted for langchain and llama index loaders. Once it's merged, it can be used to make run-llama/llama-hub#428 ready for review.Feature set:
last_state
member)AbstractSource
at the top level)This PR is kept slim on purpose to be able to ship some loaders quickly.
Things that can be implemented in follow-up PRs: