This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Airbyte based readers #428

Merged
merged 15 commits on Aug 21, 2023
Changes from 12 commits
1 change: 1 addition & 0 deletions llama_hub/airbyte_cdk/.gitignore
@@ -0,0 +1 @@
test.py
48 changes: 48 additions & 0 deletions llama_hub/airbyte_cdk/README.md
@@ -0,0 +1,48 @@
# Airbyte CDK Loader

The Airbyte CDK Loader is a shim for sources created using the [Airbyte Python CDK](https://docs.airbyte.com/connector-development/cdk-python/). It allows you to load data from any Airbyte source into LlamaIndex.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install airbyte-cdk: `pip install airbyte-cdk`
* Install a source via git (or implement your own): `pip install git+https://github.com/airbytehq/airbyte.git@master#egg=source_github&subdirectory=airbyte-integrations/connectors/source-github`

## Usage

Implement and import your own source. You can find lots of resources for how to achieve this on the [Airbyte documentation page](https://docs.airbyte.com/connector-development/).

Here's an example usage of the AirbyteCdkReader.

```python
from llama_index import download_loader
from llama_hub.airbyte_cdk.base import AirbyteCDKReader
from source_github.source import SourceGithub  # this is just an example; you can use any source here. This one is installed from the Airbyte GitHub repo via pip install git+https://github.com/airbytehq/airbyte.git@master#egg=source_github&subdirectory=airbyte-integrations/connectors/source-github


github_config = {
# ...
}
reader = AirbyteCDKReader(source_class=SourceGithub, config=github_config)
documents = reader.load_data(stream_name="issues")
```

By default all fields are stored as metadata in the documents and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader:
```python
for doc in documents:
doc.text = doc.extra_info["title"]
```
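When the text should combine several fields, the same loop can assemble it. The field names below (`title`, `body`) are assumptions about the GitHub issues stream, not something the reader guarantees:

```python
# Sketch: build document text from selected metadata fields.
# The field names ("title", "body") are assumed, not part of the reader's API.
def build_text(extra_info, fields=("title", "body")):
    """Join the chosen fields, skipping any that are missing or empty."""
    return "\n\n".join(str(extra_info[f]) for f in fields if extra_info.get(f))

record = {"title": "Fix flaky test", "body": "The test fails on CI.", "state": "open"}
print(build_text(record))  # prints the title and body separated by a blank line
```

Applied to the reader's output, this becomes `doc.text = build_text(doc.extra_info)` for each document.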

## Incremental loads

If a stream supports it, this loader can be used to load data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteCDKReader(source_class=SourceGithub, config=github_config)
documents = reader.load_data(stream_name="issues")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="issues", state=current_state) # only loads documents that were updated since last time
```
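Since `last_state` "can be pickled away", one way to persist it between runs is a small helper pair. The file name here is arbitrary, and the sketch assumes the state object is picklable:

```python
import pickle
from pathlib import Path

STATE_FILE = Path("github_issues.state.pkl")  # arbitrary file name

def save_state(state) -> None:
    """Serialize the reader's last_state to disk."""
    STATE_FILE.write_bytes(pickle.dumps(state))

def load_state():
    """Return the previously saved state, or None on the first run (full sync)."""
    if STATE_FILE.exists():
        return pickle.loads(STATE_FILE.read_bytes())
    return None
```

A scheduled job would then call `reader.load_data(stream_name="issues", state=load_state())` and `save_state(reader.last_state)` afterwards.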

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
33 changes: 33 additions & 0 deletions llama_hub/airbyte_cdk/base.py
@@ -0,0 +1,33 @@
from typing import Any, List, Mapping, Optional

from llama_index.readers.base import BaseReader
from llama_index.readers.schema.base import Document
from airbyte_protocol.models.airbyte_protocol import AirbyteRecordMessage
Collaborator:
so we should figure out a better test system on our side, but currently tests are failing because we assume that third-party imports are lazy imports.

i know specifically for AirbyteRecordMessage it's used in the RecordHandler type used to type record_handler. For this do you think we could type record_handler as Any first, and then within __init__ lazy import AirbyteRecordMessage, import RecordHandler, and do an isinstance() check on record_handler?

A bit hacky but will at least allow tests to pass. thanks

from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration
from airbyte_cdk.sources.embedded.runner import CDKRunner


class AirbyteCDKReader(BaseReader, BaseEmbeddedIntegration):
Collaborator:
would prefer composition vs. multiple inheritance, easier to inspect the specific object (BaseEmbeddedIntegration) you're using

    """AirbyteCDKReader reader.

    Retrieve documents from an Airbyte source implemented using the CDK.

    Args:
        source_class: The Airbyte source class.
        config: The config object for the Airbyte source.
    """

    def __init__(
        self,
        source_class: Any,
        config: Mapping[str, Any],
    ) -> None:
        """Initialize with parameters."""
        super().__init__(
            config=config,
            runner=CDKRunner(source=source_class(), name=source_class.__name__),
        )

    def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Document:
        return Document(doc_id=id, text="", extra_info=record.data)

    def load_data(self, *args: Any, **load_kwargs: Any) -> List[Document]:
        return list(self._load_data(*args, **load_kwargs))

Collaborator:
oof, guessing there's no iterator method available for llamaindex?

Could we implement one anyway / is there any value in doing so?

Contributor Author:
Good question, what do you think, @jerryjliu? Probably something we can split out of the first PR, though.

Collaborator:
by default every loader implements load_data, but you're free to define custom methods too!

Contributor Author:
OK cool, let's add a lazy_load as well that returns an iterable.

Collaborator:
out of curiosity, where is _load_data defined?

Contributor Author:
it's on the base integration class that comes from the CDK. We chose this approach so we can roll out improvements without having to update the loader itself.

Collaborator:
i see, makes sense

2 changes: 2 additions & 0 deletions llama_hub/airbyte_cdk/requirements.txt
@@ -0,0 +1,2 @@
airbyte-cdk
airbyte-protocol-models
56 changes: 56 additions & 0 deletions llama_hub/airbyte_gong/README.md
@@ -0,0 +1,56 @@
# Airbyte Gong Loader

The Airbyte Gong Loader allows you to access different Gong objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the gong source: `pip install airbyte-source-gong`

## Usage

Here's an example usage of the AirbyteGongReader.

```python
from llama_hub.airbyte_gong.base import AirbyteGongReader

gong_config = {
# ...
}
reader = AirbyteGongReader(config=gong_config)
documents = reader.load_data(stream_name="calls")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/gong/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on GitHub: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-gong/source_gong/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-gong/source_gong/spec.yaml).

The general shape looks like this:
```python
{
"access_key": "<access key name>",
"access_key_secret": "<access key secret>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}
```

By default all fields are stored as metadata in the documents and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader:
```python
for doc in documents:
doc.text = doc.extra_info["title"]
```

yisding (Aug 4, 2023):
So this is the first time I've seen this pattern of putting everything in metadata, but it's an interesting idea that we should explore further.

From my understanding of the way LlamaIndex currently works, it won't work properly, because our text splitting doesn't split the metadata but rather attaches it to every split node (which will be an issue if the metadata is large).

@logan-markewich @jerryjliu correct me if I'm wrong.

But maybe we can create a Document subclass that's just a structured dictionary with some kind of list you define of the metadata of the content that you actually want to be chunked.

Collaborator:
yeah i agree, this is a good concern. right now, to keep it simple, i'd almost recommend doing a big text dump (so at least text splitters work), for ease-of-use out of the box

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteGongReader(config=gong_config)
documents = reader.load_data(stream_name="calls")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="calls", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
21 changes: 21 additions & 0 deletions llama_hub/airbyte_gong/base.py
@@ -0,0 +1,21 @@
from typing import Any, Mapping
from llama_hub.airbyte_cdk.base import AirbyteCDKReader


class AirbyteGongReader(AirbyteCDKReader):
    """AirbyteGongReader reader.

    Retrieve documents from Gong.

    Args:
        config: The config object for the Gong source.
    """

    def __init__(
        self,
        config: Mapping[str, Any],
    ) -> None:
        """Initialize with parameters."""
        import source_gong

        super().__init__(source_class=source_gong.SourceGong, config=config)
1 change: 1 addition & 0 deletions llama_hub/airbyte_gong/requirements.txt
@@ -0,0 +1 @@
airbyte-source-gong
58 changes: 58 additions & 0 deletions llama_hub/airbyte_hubspot/README.md
@@ -0,0 +1,58 @@
# Airbyte Hubspot Loader

The Airbyte Hubspot Loader allows you to access different Hubspot objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the hubspot source: `pip install airbyte-source-hubspot`

## Usage

Here's an example usage of the AirbyteHubspotReader.

```python
from llama_hub.airbyte_hubspot.base import AirbyteHubspotReader

hubspot_config = {
# ...
}
reader = AirbyteHubspotReader(config=hubspot_config)
documents = reader.load_data(stream_name="products")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/hubspot/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on GitHub: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml).

The general shape looks like this:
```python
{
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
"credentials": {
"credentials_title": "Private App Credentials",
"access_token": "<access token of your private app>"
}
}
```

By default all fields are stored as metadata in the documents and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader:
```python
for doc in documents:
doc.text = doc.extra_info["title"]
```

yisding (Aug 4, 2023):
So to expand on the other comment further:

If I'm thinking about a Hubspot use case, it may be: let me do some Q&A over the previous notes and emails I've had with a partner.

The content of the notes and emails is probably what we want to be split, embedded, and retrieved over. If we're only embedding each note individually without any additional text splitting, then we're OK, because we embed all of the metadata by default.

However, if we want to start text splitting, then the problem arises, because 1. the splitting will only happen on doc.text, and 2. somewhat even more problematic, every split will have a complete copy of the document's metadata.

3000 words is a lot of text (definitely bigger than most interaction notes and emails), so we may be able to get away with it for now, but I think longer run we'll want a different document class to handle this kind of use case.

There could be a lot of interesting things here, because after the loader loads it in this more structured way, the user or the application designer could say: I want my coworkers' Notion comments to be split alongside the content of the doc, or no, I don't want the comments split alongside the doc, and it's just a matter of adjusting a list of the fields to include in the splitting.

flash1293 (Aug 4, 2023):
Another thing we could do is to allow the user to pass a RecordHandler = Optional[Callable[[AirbyteRecordMessage, Optional[str]], Document]] as part of the constructor arguments. If it's not set, it will do the current "all metadata" thing, but the user can overwrite it and put things into text vs. metadata as they see fit.

Do you think that would be a better fit for the architecture?

Collaborator:
@flash1293 yeah i think that can work! can maybe add a usage example in the README of the main AirbyteCDKReader

Collaborator:
the other alternative is to dump everything into text actually (so naive text dump), and if the user wants more fine-grained metadata specifications they use this argument

Contributor Author:
I like the record handler plus dumping everything in there by default, seems like the easiest way to get started. The text could get really messy for some streams, but that's probably ok
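The record_handler proposal from that thread could be sketched like this. The signature mirrors the type flash1293 suggests, with plain dicts standing in for AirbyteRecordMessage and Document so the sketch stays self-contained; none of these names exist in the PR:

```python
from typing import Any, Callable, Mapping, Optional

# Plain dicts stand in for AirbyteRecordMessage / Document (assumption for the sketch).
RecordHandler = Callable[[Mapping[str, Any], Optional[str]], dict]

def default_handler(record: Mapping[str, Any], doc_id: Optional[str]) -> dict:
    """Current behaviour: empty text, every field kept as metadata."""
    return {"doc_id": doc_id, "text": "", "extra_info": dict(record)}

def title_handler(record: Mapping[str, Any], doc_id: Optional[str]) -> dict:
    """User-supplied variant: promote one field into the text body."""
    return {"doc_id": doc_id, "text": str(record.get("title", "")), "extra_info": dict(record)}

def handle(record: Mapping[str, Any], doc_id: Optional[str],
           record_handler: Optional[RecordHandler] = None) -> dict:
    """Fall back to the default when no handler is passed, as proposed."""
    return (record_handler or default_handler)(record, doc_id)
```

A reader constructor accepting `record_handler` would call `handle` per record, keeping today's "all metadata" behaviour as the default.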

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteHubspotReader(config=hubspot_config)
documents = reader.load_data(stream_name="products")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="products", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
21 changes: 21 additions & 0 deletions llama_hub/airbyte_hubspot/base.py
@@ -0,0 +1,21 @@
from typing import Any, Mapping
from llama_hub.airbyte_cdk.base import AirbyteCDKReader


class AirbyteHubspotReader(AirbyteCDKReader):
    """AirbyteHubspotReader reader.

    Retrieve documents from Hubspot.

    Args:
        config: The config object for the Hubspot source.
    """

    def __init__(
        self,
        config: Mapping[str, Any],
    ) -> None:
        """Initialize with parameters."""
        import source_hubspot

        super().__init__(source_class=source_hubspot.SourceHubspot, config=config)
1 change: 1 addition & 0 deletions llama_hub/airbyte_hubspot/requirements.txt
@@ -0,0 +1 @@
airbyte-source-hubspot
63 changes: 63 additions & 0 deletions llama_hub/airbyte_salesforce/README.md
@@ -0,0 +1,63 @@
# Airbyte Salesforce Loader

The Airbyte Salesforce Loader allows you to access different Salesforce objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the salesforce source: `pip install airbyte-source-salesforce`

## Usage

Here's an example usage of the AirbyteSalesforceReader.

```python
from llama_hub.airbyte_salesforce.base import AirbyteSalesforceReader

salesforce_config = {
# ...
}
reader = AirbyteSalesforceReader(config=salesforce_config)
documents = reader.load_data(stream_name="asset")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/salesforce/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on GitHub: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml).

The general shape looks like this:
```python
{
"client_id": "<oauth client id>",
"client_secret": "<oauth client secret>",
"refresh_token": "<oauth refresh token>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
"is_sandbox": False, # set to True if you're using a sandbox environment
"streams_criteria": [ # Array of filters for salesforce objects that should be loadable
{"criteria": "exacts", "value": "Account"}, # Exact name of salesforce object
{"criteria": "starts with", "value": "Asset"}, # Prefix of the name
# Other allowed criteria: ends with, contains, starts not with, ends not with, not contains, not exacts
],
}
```
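As a rough, local illustration of how such criteria select object names (this is not Airbyte's actual implementation, just a sketch of the semantics listed in the comments above):

```python
def matches(criteria: str, value: str, name: str) -> bool:
    """Return True if the Salesforce object `name` satisfies one filter entry."""
    positive = {
        "exacts": name == value,
        "starts with": name.startswith(value),
        "ends with": name.endswith(value),
        "contains": value in name,
    }
    # Each negated criterion inverts its positive counterpart.
    negated = {
        "not exacts": "exacts",
        "starts not with": "starts with",
        "ends not with": "ends with",
        "not contains": "contains",
    }
    if criteria in negated:
        return not positive[negated[criteria]]
    return positive[criteria]
```

An object would be loadable if any entry in `streams_criteria` matches its name.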

By default all fields are stored as metadata in the documents and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader:
```python
for doc in documents:
doc.text = doc.extra_info["title"]
```

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteSalesforceReader(config=salesforce_config)
documents = reader.load_data(stream_name="asset")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="asset", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
21 changes: 21 additions & 0 deletions llama_hub/airbyte_salesforce/base.py
@@ -0,0 +1,21 @@
from typing import Any, Mapping
from llama_hub.airbyte_cdk.base import AirbyteCDKReader


class AirbyteSalesforceReader(AirbyteCDKReader):
    """AirbyteSalesforceReader reader.

    Retrieve documents from Salesforce.

    Args:
        config: The config object for the Salesforce source.
    """

    def __init__(
        self,
        config: Mapping[str, Any],
    ) -> None:
        """Initialize with parameters."""
        import source_salesforce

        super().__init__(source_class=source_salesforce.SourceSalesforce, config=config)
1 change: 1 addition & 0 deletions llama_hub/airbyte_salesforce/requirements.txt
@@ -0,0 +1 @@
airbyte-source-salesforce