From dd170e2e9dd0aa09110ae115657da1c83761f87b Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 21 Aug 2023 12:25:30 +0200 Subject: [PATCH] Langchain destination: Make starter pods work (#29513) --- .../destination-langchain/Dockerfile | 2 +- .../destination_langchain/indexer.py | 21 ++- .../pinecone_integration_test.py | 30 ++-- .../destination-langchain/metadata.yaml | 2 +- .../unit_tests/pinecone_indexer_test.py | 136 ++++++++++++------ docs/integrations/destinations/langchain.md | 7 + 6 files changed, 142 insertions(+), 56 deletions(-) diff --git a/airbyte-integrations/connectors/destination-langchain/Dockerfile b/airbyte-integrations/connectors/destination-langchain/Dockerfile index f83e69630335..67e7824b5803 100644 --- a/airbyte-integrations/connectors/destination-langchain/Dockerfile +++ b/airbyte-integrations/connectors/destination-langchain/Dockerfile @@ -42,5 +42,5 @@ COPY destination_langchain ./destination_langchain ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.0.6 +LABEL io.airbyte.version=0.0.7 LABEL io.airbyte.name=airbyte/destination-langchain diff --git a/airbyte-integrations/connectors/destination-langchain/destination_langchain/indexer.py b/airbyte-integrations/connectors/destination-langchain/destination_langchain/indexer.py index e3ca599ab3c0..6f8fbaeafb0c 100644 --- a/airbyte-integrations/connectors/destination-langchain/destination_langchain/indexer.py +++ b/airbyte-integrations/connectors/destination-langchain/destination_langchain/indexer.py @@ -69,16 +69,33 @@ def __init__(self, config: PineconeIndexingModel, embedder: Embedder): self.embed_fn = measure_time(self.embedder.langchain_embeddings.embed_documents) def pre_sync(self, catalog: ConfiguredAirbyteCatalog): + index_description = pinecone.describe_index(self.config.index) + self._pod_type = index_description.pod_type for stream in catalog.streams: if stream.destination_sync_mode == DestinationSyncMode.overwrite: - self.pinecone_index.delete(filter={METADATA_STREAM_FIELD: stream.stream.name}) + self._delete_vectors({METADATA_STREAM_FIELD: stream.stream.name}) def post_sync(self): return [AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.WARN, message=self.embed_fn._get_stats()))] + def _delete_vectors(self, filter): + if self._pod_type == "starter": + # Starter pod types have a maximum of 1000000 rows + top_k = 10000 + self._delete_by_metadata(filter, top_k) + else: + self.pinecone_index.delete(filter=filter) + + def _delete_by_metadata(self, filter, top_k): + zero_vector = [0.0] * self.embedder.embedding_dimensions + query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k) + vector_ids = [doc.id for doc in query_result.matches] + if len(vector_ids) > 0: + self.pinecone_index.delete(ids=vector_ids) + def index(self, document_chunks, delete_ids): if len(delete_ids) > 0: - self.pinecone_index.delete(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}) + self._delete_vectors({METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}) embedding_vectors = self.embed_fn([chunk.page_content for chunk in document_chunks]) pinecone_docs = [] for i in range(len(document_chunks)): diff --git a/airbyte-integrations/connectors/destination-langchain/integration_tests/pinecone_integration_test.py b/airbyte-integrations/connectors/destination-langchain/integration_tests/pinecone_integration_test.py index ac29954e1d68..55d6b01a98ee 100644 --- a/airbyte-integrations/connectors/destination-langchain/integration_tests/pinecone_integration_test.py +++ b/airbyte-integrations/connectors/destination-langchain/integration_tests/pinecone_integration_test.py @@ -4,6 +4,7 @@ import json import logging +from time import sleep import pinecone from airbyte_cdk.models import DestinationSyncMode, Status @@ -17,16 +18,22 @@ class PineconeIntegrationTest(BaseIntegrationTest): def _init_pinecone(self): pinecone.init(api_key=self.config["indexing"]["pinecone_key"], environment=self.config["indexing"]["pinecone_environment"]) + self._index = pinecone.Index(self.config["indexing"]["index"]) + + def _clean_index(self): + self._init_pinecone() + zero_vector = [0.0] * OPEN_AI_VECTOR_SIZE + query_result = self._index.query(vector=zero_vector, top_k=10_000) + vector_ids = [doc.id for doc in query_result.matches] + if len(vector_ids) > 0: + self._index.delete(ids=vector_ids) def setUp(self): with open("secrets/config.json", "r") as f: self.config = json.loads(f.read()) - self._init_pinecone() def tearDown(self): - # make sure pinecone is initialized correctly before cleaning up - self._init_pinecone() - pinecone.Index("testdata").delete(delete_all=True) + self._clean_index() def test_check_valid_config(self): outcome = DestinationLangchain().check(logging.getLogger("airbyte"), self.config) @@ -49,6 +56,8 @@ def test_check_invalid_config(self): assert outcome.status == Status.FAILED def test_write(self): + self._init_pinecone() + is_starter_pod = pinecone.describe_index(self.config["indexing"]["index"]).pod_type == "starter" catalog = self._get_configured_catalog(DestinationSyncMode.overwrite) first_state_message = self._state({"state": "1"}) first_record_chunk = [self._record("mystream", f"Dogs are number {i}", i) for i in range(5)] @@ -56,12 +65,18 @@ def test_write(self): # initial sync destination = DestinationLangchain() list(destination.write(self.config, catalog, [*first_record_chunk, first_state_message])) - assert pinecone.Index("testdata").describe_index_stats().total_vector_count == 5 + if is_starter_pod: + # Documents might not be available right away because Pinecone is handling them async + sleep(20) + assert self._index.describe_index_stats().total_vector_count == 5 # incrementalally update a doc incremental_catalog = self._get_configured_catalog(DestinationSyncMode.append_dedup) list(destination.write(self.config, incremental_catalog, [self._record("mystream", "Cats are nice", 2), first_state_message])) - result = pinecone.Index("testdata").query( + if is_starter_pod: + # Documents might not be available right away because Pinecone is handling them async + sleep(20) + result = self._index.query( vector=[0] * OPEN_AI_VECTOR_SIZE, top_k=10, filter={"_record_id": "2"}, include_metadata=True ) assert len(result.matches) == 1 @@ -70,7 +85,6 @@ def test_write(self): # test langchain integration embeddings = OpenAIEmbeddings(openai_api_key=self.config["embedding"]["openai_key"]) pinecone.init(api_key=self.config["indexing"]["pinecone_key"], environment=self.config["indexing"]["pinecone_environment"]) - index = pinecone.Index("testdata") - vector_store = Pinecone(index, embeddings.embed_query, "text") + vector_store = Pinecone(self._index, embeddings.embed_query, "text") result = vector_store.similarity_search("feline animals", 1) assert result[0].metadata["_record_id"] == "2" diff --git a/airbyte-integrations/connectors/destination-langchain/metadata.yaml b/airbyte-integrations/connectors/destination-langchain/metadata.yaml index c8a09175bc73..3d083fb5aa5c 100644 --- a/airbyte-integrations/connectors/destination-langchain/metadata.yaml +++ b/airbyte-integrations/connectors/destination-langchain/metadata.yaml @@ -7,7 +7,7 @@ data: connectorSubtype: database connectorType: destination definitionId: cf98d52c-ba5a-4dfd-8ada-c1baebfa6e73 - dockerImageTag: 0.0.6 + dockerImageTag: 0.0.7 dockerRepository: airbyte/destination-langchain githubIssueLabel: destination-langchain icon: langchain.svg diff --git a/airbyte-integrations/connectors/destination-langchain/unit_tests/pinecone_indexer_test.py b/airbyte-integrations/connectors/destination-langchain/unit_tests/pinecone_indexer_test.py index 553cfd33434f..b7e430277dc2 100644 --- a/airbyte-integrations/connectors/destination-langchain/unit_tests/pinecone_indexer_test.py +++ b/airbyte-integrations/connectors/destination-langchain/unit_tests/pinecone_indexer_test.py @@ -15,16 +15,41 @@ def create_pinecone_indexer(): config = PineconeIndexingModel(mode="pinecone", pinecone_environment="myenv", pinecone_key="mykey", index="myindex") embedder = MagicMock() + embedder.embedding_dimensions = 3 indexer = PineconeIndexer(config, embedder) indexer.pinecone_index.delete = MagicMock() indexer.embed_fn = MagicMock(return_value=[[1, 2, 3], [4, 5, 6]]) indexer.pinecone_index.upsert = MagicMock() + indexer.pinecone_index.query = MagicMock() return indexer -def test_pinecone_index_upsert_and_delete(): +def create_index_description(dimensions=3, pod_type="p1"): + return IndexDescription( + name="", + metric="", + replicas=1, + dimension=dimensions, + shards=1, + pods=1, + pod_type=pod_type, + status=None, + metadata_config=None, + source_collection=None, + ) + + +@pytest.fixture(scope="module", autouse=True) +def mock_describe_index(): + with patch('pinecone.describe_index') as mock: + mock.return_value = create_index_description() + yield mock + + +def test_pinecone_index_upsert_and_delete(mock_describe_index): indexer = create_pinecone_indexer() + indexer._pod_type = "p1" indexer.index( [ Document(page_content="test", metadata={"_airbyte_stream": "abc"}), @@ -43,6 +68,29 @@ def test_pinecone_index_upsert_and_delete(): ) +def test_pinecone_index_upsert_and_delete_starter(mock_describe_index): + indexer = create_pinecone_indexer() + indexer._pod_type = "starter" + indexer.pinecone_index.query.return_value = MagicMock(matches=[MagicMock(id="doc_id1"), MagicMock(id="doc_id2")]) + indexer.index( + [ + Document(page_content="test", metadata={"_airbyte_stream": "abc"}), + Document(page_content="test2", metadata={"_airbyte_stream": "abc"}), + ], + ["delete_id1", "delete_id2"], + ) + indexer.pinecone_index.query.assert_called_with(vector=[0,0,0],filter={"_record_id": {"$in": ["delete_id1", "delete_id2"]}}, top_k=10_000) + indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"]) + indexer.pinecone_index.upsert.assert_called_with( + vectors=( + (ANY, [1, 2, 3], {"_airbyte_stream": "abc", "text": "test"}), + (ANY, [4, 5, 6], {"_airbyte_stream": "abc", "text": "test2"}), + ), + async_req=True, + show_progress=False, + ) + + def test_pinecone_index_empty_batch(): indexer = create_pinecone_indexer() indexer.index( @@ -75,43 +123,54 @@ def test_pinecone_index_upsert_batching(): ) -def test_pinecone_pre_sync(): - indexer = create_pinecone_indexer() - indexer.pre_sync( - ConfiguredAirbyteCatalog.parse_obj( - { - "streams": [ - { - "stream": { - "name": "example_stream", - "json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": False, - "default_cursor_field": ["column_name"], - }, - "primary_key": [["id"]], - "sync_mode": "incremental", - "destination_sync_mode": "append_dedup", +def generate_catalog(): + return ConfiguredAirbyteCatalog.parse_obj( + { + "streams": [ + { + "stream": { + "name": "example_stream", + "json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": False, + "default_cursor_field": ["column_name"], }, - { - "stream": { - "name": "example_stream2", - "json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": False, - "default_cursor_field": ["column_name"], - }, - "primary_key": [["id"]], - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite", + "primary_key": [["id"]], + "sync_mode": "incremental", + "destination_sync_mode": "append_dedup", + }, + { + "stream": { + "name": "example_stream2", + "json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": False, + "default_cursor_field": ["column_name"], }, - ] - } - ) + "primary_key": [["id"]], + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, + ] + } ) + + +def test_pinecone_pre_sync(mock_describe_index): + indexer = create_pinecone_indexer() + indexer.pre_sync(generate_catalog()) indexer.pinecone_index.delete.assert_called_with(filter={"_airbyte_stream": "example_stream2"}) +def test_pinecone_pre_sync_starter(mock_describe_index): + mock_describe_index.return_value = create_index_description(pod_type="starter") + indexer = create_pinecone_indexer() + indexer.pinecone_index.query.return_value = MagicMock(matches=[MagicMock(id="doc_id1"), MagicMock(id="doc_id2")]) + indexer.pre_sync(generate_catalog()) + indexer.pinecone_index.query.assert_called_with(vector=[0,0,0],filter={"_airbyte_stream": "example_stream2"}, top_k=10_000) + indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"]) + + @pytest.mark.parametrize( "describe_throws,reported_dimensions,check_succeeds", [ @@ -127,18 +186,7 @@ def test_pinecone_check(describe_mock, describe_throws, reported_dimensions, che indexer.embedder.embedding_dimensions = 3 if describe_throws: describe_mock.side_effect = Exception("describe failed") - describe_mock.return_value = IndexDescription( - name="", - metric="", - replicas=1, - dimension=reported_dimensions, - shards=1, - pods=1, - pod_type="p1", - status=None, - metadata_config=None, - source_collection=None, - ) + describe_mock.return_value = create_index_description(dimensions=reported_dimensions) result = indexer.check() if check_succeeds: assert result is None diff --git a/docs/integrations/destinations/langchain.md b/docs/integrations/destinations/langchain.md index 2b4b068991dc..0b03bcc86846 100644 --- a/docs/integrations/destinations/langchain.md +++ b/docs/integrations/destinations/langchain.md @@ -53,6 +53,12 @@ vector_store = Pinecone(index, embeddings.embed_query, "text") qa = RetrievalQA.from_chain_type(llm=OpenAI(temperature=0), chain_type="stuff", retriever=vector_store.as_retriever()) ``` +:::caution + +For Pinecone pods of type starter, only up to 10,000 chunks can be indexed. For production use, please use a higher tier. + +::: + #### Chroma vector store The [Chroma vector store](https://trychroma.com) is running the Chroma embedding database as persistent client and stores the vectors in a local file. @@ -133,6 +139,7 @@ Please make sure that Docker Desktop has access to `/tmp` (and `/private` on a M | Version | Date | Pull Request | Subject | |:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.0.7 | 2023-08-18 | [#29513](https://github.com/airbytehq/airbyte/pull/29513) | Fix for starter pods | | 0.0.6 | 2023-08-02 | [#28977](https://github.com/airbytehq/airbyte/pull/28977) | Validate pinecone index dimensions during check | | 0.0.5 | 2023-07-25 | [#28605](https://github.com/airbytehq/airbyte/pull/28605) | Add Chroma support | | 0.0.4 | 2023-07-21 | [#28556](https://github.com/airbytehq/airbyte/pull/28556) | Correctly dedupe records with composite and nested primary keys |