-
Notifications
You must be signed in to change notification settings - Fork 15.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Airbyte based loaders #8586
Airbyte based loaders #8586
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
@flash1293 feel free to ping when this is ready for review |
@baskaryan Should be ready for a look now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made some comments on some potential UX/ergonomic improvements, but generally looks good!
Thanks Joe 😄
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"#!pip install \"source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github\"" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be updated to reference the PyPI connectors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, for those they should use the other loaders, this is basically the "escape hatch"
return list(self._load_data(stream_name=self._stream_name, state=self._state)) | ||
|
||
def lazy_load(self) -> Iterator[Document]: | ||
return self._load_data(stream_name=self._stream_name, state=self._state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I notice we're passing state through... should the loader also internally update _state
with the latest data, or do we want to make it explicit for users to pass through state themselves?
Just thinking about the ergonomics, someone that wants to implement incremental would need to instantiate a new instance of the loader if state changed. Not sure if instead we should pass state in the load
method rather than on init.
Thinking about this from the Airbyte PoV, state changes on each run of the source, but the other info like configs/catalog stays the same throughout, which is why I'm leaning a bit towards moving state to the load.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following the slack convo, if this is part of the langchain interface please ignore me 😛
from airbyte_protocol.models.airbyte_protocol import AirbyteRecordMessage, AirbyteStateMessage | ||
from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration | ||
from airbyte_cdk.sources.embedded.runner import CDKRunner |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where are the airbyte_protocol
and airbyte_cdk
dependencies defined?
Do we expect users to install them separately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One is a transitive dependency of the other, but airbyte-cdk is actually re-exporting the types, so we can work with just airbyte_protocol here.
super().__init__(config=config, runner=CDKRunner(source=source_class(), name=source_class.__name__)) | ||
self._stream_name = stream_name | ||
self._state = state |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be something on the CDK runner side, but do we want to do any sort of validation on the config? I think if we're not doing so already, a quick win on UX could be to run check
with the config on init
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that running check makes sense, I will add that in a separate PR on the CDK side
docs/extras/integrations/document_loaders/airbyte_zendesk_support.ipynb
Outdated
Show resolved
Hide resolved
" def _handle_record(self, record, id):\n", | ||
" return Document(page_content=record.data[\"title\"], metadata=record.data)\n", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be an ok place to start, but I wonder if we can make this easier when creating the loader so don't have to extend the whole class.
I could see this as either a method passed on init, or exposing things like document_text_field: "title"
and document_metadata_fields: []
directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to a parameter passed into init
Thanks for the review @pedroslopez @eyurtsev @hwchase17 , made some adjustments:
Things I'm not sure about:
|
looks good from our end! @pedroslopez any additional thoughts? |
thanks @flash1293! |
Throws:
|
@homanp - Thanks for reporting. I am looking into this. 👀 |
A fix has been merged on master @aaronsteers |
Great - thanks, @eyurtsev ! 🎉 |
@eyurtsev when will you release the next version? |
This PR adds 8 new loaders:
AirbyteCDKLoader
This reader can wrap and run all python-based Airbyte source connectors.AirbyteGongLoader
AirbyteHubspotLoader
AirbyteSalesforceLoader
AirbyteShopifyLoader
AirbyteStripeLoader
AirbyteTypeformLoader
AirbyteZendeskSupportLoader
Documentation and getting started
I added the basic shape of the config to the notebooks. This increases the maintenance effort a bit, but I think it's worth it to make sure people can get started quickly with these important connectors. This is also why I linked the spec and the documentation page in the readme as these two contain all the information to configure a source correctly (e.g. it won't suggest using oauth if that's avoidable even if the connector supports it).
Document generation
The "documents" produced by these loaders won't have a text part (instead, all the record fields are put into the metadata). If a text is required by the use case, the caller needs to do custom transformation suitable for their use case.
Incremental sync
All loaders support incremental syncs if the underlying streams support it. By storing the
last_state
from the reader instance away and passing it in when loading, it will only load updated records.