Skip to content

Commit

Permalink
Langchain destination: Make starter pods work (#29513)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Reuter authored Aug 21, 2023
1 parent 28ecf6e commit dd170e2
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ COPY destination_langchain ./destination_langchain
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.0.6
LABEL io.airbyte.version=0.0.7
LABEL io.airbyte.name=airbyte/destination-langchain
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,33 @@ def __init__(self, config: PineconeIndexingModel, embedder: Embedder):
self.embed_fn = measure_time(self.embedder.langchain_embeddings.embed_documents)

def pre_sync(self, catalog: ConfiguredAirbyteCatalog):
index_description = pinecone.describe_index(self.config.index)
self._pod_type = index_description.pod_type
for stream in catalog.streams:
if stream.destination_sync_mode == DestinationSyncMode.overwrite:
self.pinecone_index.delete(filter={METADATA_STREAM_FIELD: stream.stream.name})
self._delete_vectors({METADATA_STREAM_FIELD: stream.stream.name})

def post_sync(self):
return [AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.WARN, message=self.embed_fn._get_stats()))]

def _delete_vectors(self, filter):
if self._pod_type == "starter":
# Starter pod types have a maximum of 1000000 rows
top_k = 10000
self._delete_by_metadata(filter, top_k)
else:
self.pinecone_index.delete(filter=filter)

def _delete_by_metadata(self, filter, top_k):
zero_vector = [0.0] * self.embedder.embedding_dimensions
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k)
vector_ids = [doc.id for doc in query_result.matches]
if len(vector_ids) > 0:
self.pinecone_index.delete(ids=vector_ids)

def index(self, document_chunks, delete_ids):
if len(delete_ids) > 0:
self.pinecone_index.delete(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}})
self._delete_vectors({METADATA_RECORD_ID_FIELD: {"$in": delete_ids}})
embedding_vectors = self.embed_fn([chunk.page_content for chunk in document_chunks])
pinecone_docs = []
for i in range(len(document_chunks)):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import json
import logging
from time import sleep

import pinecone
from airbyte_cdk.models import DestinationSyncMode, Status
Expand All @@ -17,16 +18,22 @@
class PineconeIntegrationTest(BaseIntegrationTest):
def _init_pinecone(self):
pinecone.init(api_key=self.config["indexing"]["pinecone_key"], environment=self.config["indexing"]["pinecone_environment"])
self._index = pinecone.Index(self.config["indexing"]["index"])

def _clean_index(self):
self._init_pinecone()
zero_vector = [0.0] * OPEN_AI_VECTOR_SIZE
query_result = self._index.query(vector=zero_vector, top_k=10_000)
vector_ids = [doc.id for doc in query_result.matches]
if len(vector_ids) > 0:
self._index.delete(ids=vector_ids)

def setUp(self):
with open("secrets/config.json", "r") as f:
self.config = json.loads(f.read())
self._init_pinecone()

def tearDown(self):
# make sure pinecone is initialized correctly before cleaning up
self._init_pinecone()
pinecone.Index("testdata").delete(delete_all=True)
self._clean_index()

def test_check_valid_config(self):
outcome = DestinationLangchain().check(logging.getLogger("airbyte"), self.config)
Expand All @@ -49,19 +56,27 @@ def test_check_invalid_config(self):
assert outcome.status == Status.FAILED

def test_write(self):
self._init_pinecone()
is_starter_pod = pinecone.describe_index(self.config["indexing"]["index"]).pod_type == "starter"
catalog = self._get_configured_catalog(DestinationSyncMode.overwrite)
first_state_message = self._state({"state": "1"})
first_record_chunk = [self._record("mystream", f"Dogs are number {i}", i) for i in range(5)]

# initial sync
destination = DestinationLangchain()
list(destination.write(self.config, catalog, [*first_record_chunk, first_state_message]))
assert pinecone.Index("testdata").describe_index_stats().total_vector_count == 5
if is_starter_pod:
# Documents might not be available right away because Pinecone is handling them async
sleep(20)
assert self._index.describe_index_stats().total_vector_count == 5

# incrementalally update a doc
incremental_catalog = self._get_configured_catalog(DestinationSyncMode.append_dedup)
list(destination.write(self.config, incremental_catalog, [self._record("mystream", "Cats are nice", 2), first_state_message]))
result = pinecone.Index("testdata").query(
if is_starter_pod:
# Documents might not be available right away because Pinecone is handling them async
sleep(20)
result = self._index.query(
vector=[0] * OPEN_AI_VECTOR_SIZE, top_k=10, filter={"_record_id": "2"}, include_metadata=True
)
assert len(result.matches) == 1
Expand All @@ -70,7 +85,6 @@ def test_write(self):
# test langchain integration
embeddings = OpenAIEmbeddings(openai_api_key=self.config["embedding"]["openai_key"])
pinecone.init(api_key=self.config["indexing"]["pinecone_key"], environment=self.config["indexing"]["pinecone_environment"])
index = pinecone.Index("testdata")
vector_store = Pinecone(index, embeddings.embed_query, "text")
vector_store = Pinecone(self._index, embeddings.embed_query, "text")
result = vector_store.similarity_search("feline animals", 1)
assert result[0].metadata["_record_id"] == "2"
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: cf98d52c-ba5a-4dfd-8ada-c1baebfa6e73
dockerImageTag: 0.0.6
dockerImageTag: 0.0.7
dockerRepository: airbyte/destination-langchain
githubIssueLabel: destination-langchain
icon: langchain.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,41 @@
def create_pinecone_indexer():
config = PineconeIndexingModel(mode="pinecone", pinecone_environment="myenv", pinecone_key="mykey", index="myindex")
embedder = MagicMock()
embedder.embedding_dimensions = 3
indexer = PineconeIndexer(config, embedder)

indexer.pinecone_index.delete = MagicMock()
indexer.embed_fn = MagicMock(return_value=[[1, 2, 3], [4, 5, 6]])
indexer.pinecone_index.upsert = MagicMock()
indexer.pinecone_index.query = MagicMock()
return indexer


def test_pinecone_index_upsert_and_delete():
def create_index_description(dimensions=3, pod_type="p1"):
return IndexDescription(
name="",
metric="",
replicas=1,
dimension=dimensions,
shards=1,
pods=1,
pod_type=pod_type,
status=None,
metadata_config=None,
source_collection=None,
)


@pytest.fixture(scope="module", autouse=True)
def mock_describe_index():
with patch('pinecone.describe_index') as mock:
mock.return_value = create_index_description()
yield mock


def test_pinecone_index_upsert_and_delete(mock_describe_index):
indexer = create_pinecone_indexer()
indexer._pod_type = "p1"
indexer.index(
[
Document(page_content="test", metadata={"_airbyte_stream": "abc"}),
Expand All @@ -43,6 +68,29 @@ def test_pinecone_index_upsert_and_delete():
)


def test_pinecone_index_upsert_and_delete_starter(mock_describe_index):
indexer = create_pinecone_indexer()
indexer._pod_type = "starter"
indexer.pinecone_index.query.return_value = MagicMock(matches=[MagicMock(id="doc_id1"), MagicMock(id="doc_id2")])
indexer.index(
[
Document(page_content="test", metadata={"_airbyte_stream": "abc"}),
Document(page_content="test2", metadata={"_airbyte_stream": "abc"}),
],
["delete_id1", "delete_id2"],
)
indexer.pinecone_index.query.assert_called_with(vector=[0,0,0],filter={"_record_id": {"$in": ["delete_id1", "delete_id2"]}}, top_k=10_000)
indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"])
indexer.pinecone_index.upsert.assert_called_with(
vectors=(
(ANY, [1, 2, 3], {"_airbyte_stream": "abc", "text": "test"}),
(ANY, [4, 5, 6], {"_airbyte_stream": "abc", "text": "test2"}),
),
async_req=True,
show_progress=False,
)


def test_pinecone_index_empty_batch():
indexer = create_pinecone_indexer()
indexer.index(
Expand Down Expand Up @@ -75,43 +123,54 @@ def test_pinecone_index_upsert_batching():
)


def test_pinecone_pre_sync():
indexer = create_pinecone_indexer()
indexer.pre_sync(
ConfiguredAirbyteCatalog.parse_obj(
{
"streams": [
{
"stream": {
"name": "example_stream",
"json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": False,
"default_cursor_field": ["column_name"],
},
"primary_key": [["id"]],
"sync_mode": "incremental",
"destination_sync_mode": "append_dedup",
def generate_catalog():
return ConfiguredAirbyteCatalog.parse_obj(
{
"streams": [
{
"stream": {
"name": "example_stream",
"json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": False,
"default_cursor_field": ["column_name"],
},
{
"stream": {
"name": "example_stream2",
"json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": False,
"default_cursor_field": ["column_name"],
},
"primary_key": [["id"]],
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["id"]],
"sync_mode": "incremental",
"destination_sync_mode": "append_dedup",
},
{
"stream": {
"name": "example_stream2",
"json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": False,
"default_cursor_field": ["column_name"],
},
]
}
)
"primary_key": [["id"]],
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
},
]
}
)


def test_pinecone_pre_sync(mock_describe_index):
indexer = create_pinecone_indexer()
indexer.pre_sync(generate_catalog())
indexer.pinecone_index.delete.assert_called_with(filter={"_airbyte_stream": "example_stream2"})


def test_pinecone_pre_sync_starter(mock_describe_index):
mock_describe_index.return_value = create_index_description(pod_type="starter")
indexer = create_pinecone_indexer()
indexer.pinecone_index.query.return_value = MagicMock(matches=[MagicMock(id="doc_id1"), MagicMock(id="doc_id2")])
indexer.pre_sync(generate_catalog())
indexer.pinecone_index.query.assert_called_with(vector=[0,0,0],filter={"_airbyte_stream": "example_stream2"}, top_k=10_000)
indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"])


@pytest.mark.parametrize(
"describe_throws,reported_dimensions,check_succeeds",
[
Expand All @@ -127,18 +186,7 @@ def test_pinecone_check(describe_mock, describe_throws, reported_dimensions, che
indexer.embedder.embedding_dimensions = 3
if describe_throws:
describe_mock.side_effect = Exception("describe failed")
describe_mock.return_value = IndexDescription(
name="",
metric="",
replicas=1,
dimension=reported_dimensions,
shards=1,
pods=1,
pod_type="p1",
status=None,
metadata_config=None,
source_collection=None,
)
describe_mock.return_value = create_index_description(dimensions=reported_dimensions)
result = indexer.check()
if check_succeeds:
assert result is None
Expand Down
7 changes: 7 additions & 0 deletions docs/integrations/destinations/langchain.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ vector_store = Pinecone(index, embeddings.embed_query, "text")
qa = RetrievalQA.from_chain_type(llm=OpenAI(temperature=0), chain_type="stuff", retriever=vector_store.as_retriever())
```

:::caution

For Pinecone pods of type starter, only up to 10,000 chunks can be indexed. For production use, please use a higher tier.

:::

#### Chroma vector store

The [Chroma vector store](https://trychroma.com) is running the Chroma embedding database as persistent client and stores the vectors in a local file.
Expand Down Expand Up @@ -133,6 +139,7 @@ Please make sure that Docker Desktop has access to `/tmp` (and `/private` on a M

| Version | Date | Pull Request | Subject |
|:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.0.7 | 2023-08-18 | [#29513](https://github.com/airbytehq/airbyte/pull/29513) | Fix for starter pods |
| 0.0.6 | 2023-08-02 | [#28977](https://github.com/airbytehq/airbyte/pull/28977) | Validate pinecone index dimensions during check |
| 0.0.5 | 2023-07-25 | [#28605](https://github.com/airbytehq/airbyte/pull/28605) | Add Chroma support |
| 0.0.4 | 2023-07-21 | [#28556](https://github.com/airbytehq/airbyte/pull/28556) | Correctly dedupe records with composite and nested primary keys |
Expand Down

0 comments on commit dd170e2

Please sign in to comment.