From e1d407f05d75a5803ffc80a64adb6b034c6873cf Mon Sep 17 00:00:00 2001 From: Anush008 Date: Tue, 16 Jan 2024 11:54:58 +0530 Subject: [PATCH 01/17] feat: Qdrant Provider --- CONTRIBUTING.rst | 7 +- INSTALL | 7 +- airflow/providers/qdrant/CHANGELOG.rst | 26 ++ airflow/providers/qdrant/__init__.py | 16 + airflow/providers/qdrant/hooks/__init__.py | 16 + airflow/providers/qdrant/hooks/qdrant.py | 320 ++++++++++++++++++ .../providers/qdrant/operators/__init__.py | 16 + airflow/providers/qdrant/operators/qdrant.py | 109 ++++++ airflow/providers/qdrant/provider.yaml | 54 +++ .../changelog.rst | 18 + .../commits.rst | 19 ++ .../connections.rst | 58 ++++ .../apache-airflow-providers-qdrant/index.rst | 98 ++++++ .../installing-providers-from-sources.rst | 18 + .../operators/qdrant.rst | 40 +++ .../security.rst | 18 + docs/apache-airflow/extra-packages-ref.rst | 2 + docs/spelling_wordlist.txt | 3 + generated/provider_dependencies.json | 9 + images/breeze/output_build-docs.svg | 6 +- images/breeze/output_build-docs.txt | 2 +- ...release-management_add-back-references.svg | 6 +- ...release-management_add-back-references.txt | 2 +- ...output_release-management_publish-docs.svg | 6 +- ...output_release-management_publish-docs.txt | 2 +- ...t_sbom_generate-providers-requirements.svg | 6 +- ...t_sbom_generate-providers-requirements.txt | 2 +- pyproject.toml | 5 + tests/providers/qdrant/__init__.py | 16 + tests/providers/qdrant/hooks/__init__.py | 16 + tests/providers/qdrant/hooks/test_qdrant.py | 118 +++++++ tests/providers/qdrant/operators/__init__.py | 16 + .../providers/qdrant/operators/test_qdrant.py | 84 +++++ tests/system/providers/qdrant/__init__.py | 16 + .../providers/qdrant/example_dag_qdrant.py | 49 +++ 35 files changed, 1184 insertions(+), 22 deletions(-) create mode 100644 airflow/providers/qdrant/CHANGELOG.rst create mode 100644 airflow/providers/qdrant/__init__.py create mode 100644 airflow/providers/qdrant/hooks/__init__.py create mode 100644 
airflow/providers/qdrant/hooks/qdrant.py create mode 100644 airflow/providers/qdrant/operators/__init__.py create mode 100644 airflow/providers/qdrant/operators/qdrant.py create mode 100644 airflow/providers/qdrant/provider.yaml create mode 100644 docs/apache-airflow-providers-qdrant/changelog.rst create mode 100644 docs/apache-airflow-providers-qdrant/commits.rst create mode 100644 docs/apache-airflow-providers-qdrant/connections.rst create mode 100644 docs/apache-airflow-providers-qdrant/index.rst create mode 100644 docs/apache-airflow-providers-qdrant/installing-providers-from-sources.rst create mode 100644 docs/apache-airflow-providers-qdrant/operators/qdrant.rst create mode 100644 docs/apache-airflow-providers-qdrant/security.rst create mode 100644 tests/providers/qdrant/__init__.py create mode 100644 tests/providers/qdrant/hooks/__init__.py create mode 100644 tests/providers/qdrant/hooks/test_qdrant.py create mode 100644 tests/providers/qdrant/operators/__init__.py create mode 100644 tests/providers/qdrant/operators/test_qdrant.py create mode 100644 tests/system/providers/qdrant/__init__.py create mode 100644 tests/system/providers/qdrant/example_dag_qdrant.py diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index ed70c3a2c22b..701e58f55e83 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -897,9 +897,10 @@ gcp_api, github, github-enterprise, google, google-auth, graphviz, grpc, hashico http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft-azure, microsoft-mssql, microsoft-psrp, microsoft-winrm, mongo, mssql, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, -pgvector, pinecone, pinot, postgres, presto, rabbitmq, redis, s3, s3fs, salesforce, samba, saml, -segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, -tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex, zendesk 
+pgvector, pinecone, pinot, postgres, presto, qdrant, rabbitmq, redis, s3, s3fs, salesforce, samba, +saml, segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, +statsd, tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex, +zendesk .. END REGULAR EXTRAS HERE Provider packages diff --git a/INSTALL b/INSTALL index ab0ff03ef517..b94962f54c43 100644 --- a/INSTALL +++ b/INSTALL @@ -252,9 +252,10 @@ gcp_api, github, github-enterprise, google, google-auth, graphviz, grpc, hashico http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft-azure, microsoft-mssql, microsoft-psrp, microsoft-winrm, mongo, mssql, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, -pgvector, pinecone, pinot, postgres, presto, rabbitmq, redis, s3, s3fs, salesforce, samba, saml, -segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, -tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex, zendesk +pgvector, pinecone, pinot, postgres, presto, qdrant, rabbitmq, redis, s3, s3fs, salesforce, samba, +saml, segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, +statsd, tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex, +zendesk # END REGULAR EXTRAS HERE Devel extras - used to install development-related tools. Only available during editable install. diff --git a/airflow/providers/qdrant/CHANGELOG.rst b/airflow/providers/qdrant/CHANGELOG.rst new file mode 100644 index 000000000000..5adfb1c9e7e5 --- /dev/null +++ b/airflow/providers/qdrant/CHANGELOG.rst @@ -0,0 +1,26 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-qdrant`` + +Changelog +--------- + +1.0.0 +..... + +Initial version of the provider. diff --git a/airflow/providers/qdrant/__init__.py b/airflow/providers/qdrant/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/qdrant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/airflow/providers/qdrant/hooks/__init__.py b/airflow/providers/qdrant/hooks/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/qdrant/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/qdrant/hooks/qdrant.py b/airflow/providers/qdrant/hooks/qdrant.py new file mode 100644 index 000000000000..a40126b0c14f --- /dev/null +++ b/airflow/providers/qdrant/hooks/qdrant.py @@ -0,0 +1,320 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from functools import cached_property +from typing import Any, Iterable, Mapping, Sequence + +from grpc import RpcError +from qdrant_client import QdrantClient, models +from qdrant_client.http.exceptions import UnexpectedResponse + +from airflow.hooks.base import BaseHook + + +class QdrantHook(BaseHook): + """ + Hook for interfacing with a Qdrant instance. + + :param conn_id: The connection id to use when connecting to Qdrant. Defaults to `qdrant_default`. + """ + + conn_name_attr = "conn_id" + conn_type = "qdrant" + default_conn_name = "qdrant_default" + hook_name = "Qdrant" + + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Returns connection widgets to add to connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import BooleanField, IntegerField, StringField + + return { + "url": StringField( + lazy_gettext("URL"), + widget=BS3TextFieldWidget(), + description="Optional. Qualified URL of the Qdrant instance." + "Example: https://xyz-example.eu-central.aws.cloud.qdrant.io:6333", + ), + "grpc_port": IntegerField( + lazy_gettext("GPRC Port"), + widget=BS3TextFieldWidget(), + description="Optional. Port of the gRPC interface.", + default=None, + ), + "prefer_gprc": BooleanField( + lazy_gettext("Prefer GRPC"), + widget=BS3TextFieldWidget(), + description="Optional. Whether to use gPRC interface whenever possible in custom methods.", + default=True, + ), + "https": BooleanField( + lazy_gettext("HTTPS"), + widget=BS3TextFieldWidget(), + description="Optional. Whether to use HTTPS(SSL) protocol.", + ), + "prefix": StringField( + lazy_gettext("Prefix"), + widget=BS3TextFieldWidget(), + description="Optional. Prefix to the REST URL path." 
+ "Example: `service/v1` will result in http://localhost:6333/service/v1/{qdrant-endpoint} for REST API.", + ), + "timeout": IntegerField( + lazy_gettext("Timeout"), + widget=BS3TextFieldWidget(), + description="Optional. Timeout for REST and gRPC API requests.", + default=None, + ), + } + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Returns custom field behaviour.""" + return { + "hidden_fields": ["schema", "login", "extra"], + "relabeling": {"password": "API Key"}, + } + + def __init__(self, conn_id: str = default_conn_name, **kwargs) -> None: + super().__init__(**kwargs) + self.conn_id = conn_id + self.get_conn() + + def get_conn(self) -> QdrantClient: + """Get a Qdrant client instance.""" + connection = self.get_connection(self.conn_id) + host = connection.host or None + port = connection.port or 6333 + api_key = connection.password + extra = connection.extra_dejson + url = extra.get("url", None) + grpc_port = extra.get("grpc_port", 6334) + prefer_gprc = extra.get("prefer_gprc", False) + https = extra.get("https", False) + prefix = extra.get("prefix", "") + timeout = extra.get("timeout", None) + + return QdrantClient( + host=host, + port=port, + url=url, + api_key=api_key, + grpc_port=grpc_port, + prefer_grpc=prefer_gprc, + https=https, + prefix=prefix, + timeout=timeout, + ) + + @cached_property + def conn(self) -> QdrantClient: + """Get a Qdrant client instance.""" + return self.get_conn() + + def verify_connection(self) -> tuple[bool, str]: + """Check the connection to the Qdrant instance.""" + try: + self.conn.get_collections() + return True, "Connection established!" 
+ except (UnexpectedResponse, RpcError, ValueError) as e: + return False, str(e) + + def list_collections(self) -> list[str]: + """Get a list of collections in the Qdrant instance.""" + return [collection.name for collection in self.conn.get_collections().collections] + + def upsert( + self, + collection_name: str, + vectors: Iterable[models.VectorStruct], + payload: Iterable[dict[str, Any]] | None = None, + ids: Iterable[str | int] | None = None, + batch_size: int = 64, + parallel: int = 1, + method: str | None = None, + max_retries: int = 3, + wait: bool = True, + ) -> None: + """ + Upload points to a Qdrant collection. + + :param collection_name: Name of the collection to upload points to. + :param vectors: An iterable over vectors to upload. + :param payload: Iterable of vectors payload, Optional. Defaults to None. + :param ids: Iterable of custom vectors ids, Optional. Defaults to None. + :param batch_size: Number of points to upload per-request. Defaults to 64. + :param parallel: Number of parallel upload processes. Defaults to 1. + :param method: Start method for parallel processes. Defaults to forkserver. + :param max_retries: Number of retries for failed requests. Defaults to 3. + :param wait: Await for the results to be applied on the server side. Defaults to True. + """ + return self.conn.upload_collection( + collection_name=collection_name, + vectors=vectors, + payload=payload, + ids=ids, + batch_size=batch_size, + parallel=parallel, + method=method, + max_retries=max_retries, + wait=wait, + ) + + def delete( + self, + collection_name: str, + points_selector: models.PointsSelector, + wait: bool = True, + ordering: models.WriteOrdering | None = None, + shard_key_selector: models.ShardKeySelector | None = None, + ) -> None: + """ + Delete points from a Qdrant collection. + + :param collection_name: Name of the collection to delete points from. + :param points_selector: Selector for points to delete. 
+ :param wait: Await for the results to be applied on the server side. Defaults to True. + :param ordering: Ordering of the write operation. Defaults to None. + :param shard_key_selector: Selector for the shard key. Defaults to None. + """ + self.conn.delete( + collection_name=collection_name, + points_selector=points_selector, + wait=wait, + ordering=ordering, + shard_key_selector=shard_key_selector, + ) + + def search( + self, + collection_name: str, + query_vector: Sequence[float] + | tuple[str, list[float]] + | models.NamedVector + | models.NamedSparseVector, + query_filter: models.Filter | None = None, + search_params: models.SearchParams | None = None, + limit: int = 10, + offset: int | None = None, + with_payload: bool | Sequence[str] | models.PayloadSelector = True, + with_vectors: bool | Sequence[str] = False, + score_threshold: float | None = None, + consistency: models.ReadConsistency | None = None, + shard_key_selector: models.ShardKeySelector | None = None, + timeout: int | None = None, + ): + """ + Search for the closest points in a Qdrant collection. + + :param collection_name: Name of the collection to search in. + :param query_vector: Query vector to search for. + :param query_filter: Filter for the query. Defaults to None. + :param search_params: Additional search parameters. Defaults to None. + :param limit: Number of results to return. Defaults to 10. + :param offset: Offset of the first results to return. Defaults to None. + :param with_payload: To specify which stored payload should be attached to the result. Defaults to True. + :param with_vectors: To specify whether vectors should be attached to the result. Defaults to False. + :param score_threshold: To specify the minimum score threshold of the results. Defaults to None. + :param consistency: Defines how many replicas should be queried before returning the result. Defaults to None. + :param shard_key_selector: To specify which shards should be queried. Defaults to None. 
+ :param timeout: Timeout for the request. Defaults to None. + """ + return self.conn.search( + collection_name=collection_name, + query_vector=query_vector, + query_filter=query_filter, + search_params=search_params, + limit=limit, + offset=offset, + with_payload=with_payload, + with_vectors=with_vectors, + score_threshold=score_threshold, + consistency=consistency, + shard_key_selector=shard_key_selector, + timeout=timeout, + ) + + def create_collection( + self, + collection_name: str, + vectors_config: models.VectorParams | Mapping[str, models.VectorParams], + sparse_vectors_config: Mapping[str, models.SparseVectorParams] | None = None, + shard_number: int | None = None, + sharding_method: models.ShardingMethod | None = None, + replication_factor: int | None = None, + write_consistency_factor: int | None = None, + on_disk_payload: bool | None = None, + hnsw_config: models.HnswConfigDiff | None = None, + optimizers_config: models.OptimizersConfigDiff | None = None, + wal_config: models.WalConfigDiff | None = None, + quantization_config: models.QuantizationConfig | None = None, + init_from: models.InitFrom | None = None, + timeout: int | None = None, + ) -> bool: + """ + Create a new Qdrant collection. + + :param collection_name: Name of the collection to create. + :param vectors_config: Configuration of the vector storage contains size and distance for the vectors. + :param sparse_vectors_config: Configuration of the sparse vector storage. Defaults to None. + :param shard_number: Number of shards in collection. Default is 1, minimum is 1. + :param sharding_method: Defines strategy for shard creation. Defaults to auto. + :param replication_factor: Replication factor for collection. Default is 1, minimum is 1. + :param write_consistency_factor: Write consistency factor for collection. Default is 1, minimum is 1. + :param on_disk_payload: If true - point's payload will not be stored in memory. 
+ :param hnsw_config: Parameters for HNSW index. + :param optimizers_config: Parameters for optimizer. + :param wal_config: Parameters for Write-Ahead-Log. + :param quantization_config: Parameters for quantization, if None - quantization will be disabled. + :param init_from: Whether to use data stored in another collection to initialize this collection. + :param timeout: Timeout for the request. Defaults to None. + """ + return self.conn.create_collection( + collection_name=collection_name, + vectors_config=vectors_config, + sparse_vectors_config=sparse_vectors_config, + shard_number=shard_number, + sharding_method=sharding_method, + replication_factor=replication_factor, + write_consistency_factor=write_consistency_factor, + on_disk_payload=on_disk_payload, + hnsw_config=hnsw_config, + optimizers_config=optimizers_config, + wal_config=wal_config, + quantization_config=quantization_config, + init_from=init_from, + timeout=timeout, + ) + + def get_collection(self, collection_name: str) -> models.CollectionInfo: + """ + Get information about a Qdrant collection. + + :param collection_name: Name of the collection to get information about. + """ + return self.conn.get_collection(collection_name=collection_name) + + def delete_collection(self, collection_name: str, timeout: int | None) -> bool: + """ + Delete a Qdrant collection. + + :param collection_name: Name of the collection to delete. + """ + return self.conn.delete_collection(collection_name=collection_name, timeout=timeout) diff --git a/airflow/providers/qdrant/operators/__init__.py b/airflow/providers/qdrant/operators/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/qdrant/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/qdrant/operators/qdrant.py b/airflow/providers/qdrant/operators/qdrant.py new file mode 100644 index 000000000000..d4e0d7afa139 --- /dev/null +++ b/airflow/providers/qdrant/operators/qdrant.py @@ -0,0 +1,109 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any, Iterable, Sequence + +from airflow.models import BaseOperator +from airflow.providers.qdrant.hooks.qdrant import QdrantHook + +if TYPE_CHECKING: + from qdrant_client.models import VectorStruct + + from airflow.utils.context import Context + + +class QdrantIngestOperator(BaseOperator): + """ + Upload points to a Qdrant collection. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:QdrantIngestOperator` + + :param conn_id: The connection id to connect to a Qdrant instance. + :param collection_name: The name of the collection to ingest data into. + :param vectors: An iterable over vectors to upload. + :param payload: Iterable of vectors payload, Optional. Defaults to None. + :param ids: Iterable of custom vectors ids, Optional. Defaults to None. + :param batch_size: Number of points to upload per-request. Defaults to 64. + :param parallel: Number of parallel upload processes. Defaults to 1. + :param method: Start method for parallel processes. Defaults to 'forkserver'. + :param max_retries: Number of retries for failed requests. Defaults to 3. + :param wait: Await for the results to be applied on the server side. Defaults to True. + :param kwargs: Additional keyword arguments passed to the BaseOperator constructor. 
+ """ + + template_fields: Sequence[str] = ( + "collection_name", + "vectors", + "payload", + "ids", + "batch_size", + "parallel", + "method", + "max_retries", + "wait", + ) + + def __init__( + self, + *, + conn_id: str = QdrantHook.default_conn_name, + collection_name: str, + vectors: Iterable[VectorStruct], + payload: Iterable[dict[str, Any]] | None = None, + ids: Iterable[int | str] | None = None, + batch_size: int = 64, + parallel: int = 1, + method: str | None = None, + max_retries: int = 3, + wait: bool = True, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.conn_id = conn_id + self.collection_name = collection_name + self.vectors = vectors + self.payload = payload + self.ids = ids + self.batch_size = batch_size + self.parallel = parallel + self.method = method + self.max_retries = max_retries + self.wait = wait + + @cached_property + def hook(self) -> QdrantHook: + """Return an instance of QdrantHook.""" + return QdrantHook(conn_id=self.conn_id) + + def execute(self, context: Context) -> None: + """Upload points to a Qdrant collection.""" + self.hook.upsert( + collection_name=self.collection_name, + vectors=self.vectors, + payload=self.payload, + ids=self.ids, + batch_size=self.batch_size, + parallel=self.parallel, + method=self.method, + max_retries=self.max_retries, + wait=self.wait, + ) diff --git a/airflow/providers/qdrant/provider.yaml b/airflow/providers/qdrant/provider.yaml new file mode 100644 index 000000000000..9ff6b4b240c4 --- /dev/null +++ b/airflow/providers/qdrant/provider.yaml @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-qdrant + +name: Qdrant + +description: | + `Qdrant `__ + +state: not-ready +source-date-epoch: 1705379899 + +versions: + - 1.0.0 + +integrations: + - integration-name: Qdrant + external-doc-url: https://qdrant.tech/documentation + how-to-guide: + - /docs/apache-airflow-providers-qdrant/operators/qdrant.rst + tags: [software] + +dependencies: + - qdrant_client>=1.7.0 + +hooks: + - integration-name: Qdrant + python-modules: + - airflow.providers.qdrant.hooks.qdrant + +connection-types: + - hook-class-name: airflow.providers.qdrant.hooks.qdrant.QdrantHook + connection-type: qdrant + +operators: + - integration-name: Qdrant + python-modules: + - airflow.providers.qdrant.operators.qdrant diff --git a/docs/apache-airflow-providers-qdrant/changelog.rst b/docs/apache-airflow-providers-qdrant/changelog.rst new file mode 100644 index 000000000000..b4d67b06a076 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/changelog.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../../airflow/providers/qdrant/CHANGELOG.rst diff --git a/docs/apache-airflow-providers-qdrant/commits.rst b/docs/apache-airflow-providers-qdrant/commits.rst new file mode 100644 index 000000000000..c5206dab50a1 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/commits.rst @@ -0,0 +1,19 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Package apache-airflow-providers-qdrant +------------------------------------------- diff --git a/docs/apache-airflow-providers-qdrant/connections.rst b/docs/apache-airflow-providers-qdrant/connections.rst new file mode 100644 index 000000000000..37c70eff68f4 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/connections.rst @@ -0,0 +1,58 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/connection:qdrant: + +Qdrant Connection +=================== + +The `Qdrant <https://qdrant.tech/>`__ connection type enables access to Qdrant clusters. + +Default Connection IDs +---------------------- + +The Qdrant hook uses the ``qdrant_default`` connection ID by default. + +Configuring the Connection +-------------------------- + +Host (optional) + Host of the Qdrant instance to connect to. + +API key (optional) + Qdrant API Key for authentication. + +Port (optional) + REST port of the Qdrant instance to connect to. Defaults to ``6333``. + +URL (optional) + URL of the Qdrant instance to connect to. If specified, it overrides the ``host`` and ``port`` parameters. + +GRPC Port (optional) + GRPC port of the Qdrant instance to connect to. Defaults to ``6334``. + +Prefer GRPC (optional) + Whether to use GRPC for custom methods. Defaults to ``False``. + +HTTPS (optional) + Whether to use HTTPS for requests. Defaults to ``False``. + +Prefix (optional) + Prefix to add to the REST URL endpoints. Defaults to ``None``. + +Timeout (optional) + Timeout for REST and gRPC API requests. Defaults to ``5`` seconds for REST. Unlimited for GRPC. diff --git a/docs/apache-airflow-providers-qdrant/index.rst b/docs/apache-airflow-providers-qdrant/index.rst new file mode 100644 index 000000000000..57d361c0775d --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/index.rst @@ -0,0 +1,98 @@ + + .. 
Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-qdrant`` +====================================== + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Basics + + Home <self> + Changelog <changelog> + Security <security> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Guides + + Connection types <connections> + Operators <operators/qdrant> + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Commits + + Detailed list of commits <commits> + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Resources + + Python API <_api/airflow/providers/qdrant/index> + PyPI Repository <https://pypi.org/project/apache-airflow-providers-qdrant/> + Installing from sources <installing-providers-from-sources> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: System tests + + System Tests <_api/tests/system/providers/qdrant/index> + +Package apache-airflow-providers-qdrant +----------------------------------------- + +`Qdrant <https://qdrant.tech/documentation>`__ + + +Release: 1.0.0 + +Provider package +---------------- + +This is a provider package for ``Qdrant`` APIs. All classes for this provider package +are in ``airflow.providers.qdrant`` python module. 
+ +Installation +------------ + +You can install this package on top of an existing Airflow 2 installation (see ``Requirements`` below +for the minimum Airflow version supported) via +``pip install apache-airflow-providers-qdrant`` + + +Requirements +------------ + +The minimum Apache Airflow version supported by this provider package is ``2.5.0``. + + +=================== ================== +PIP package Version required +=================== ================== +``apache-airflow`` ``>=2.5.0`` +``qdrant_client`` ``>=1.7.0`` +=================== ================== diff --git a/docs/apache-airflow-providers-qdrant/installing-providers-from-sources.rst b/docs/apache-airflow-providers-qdrant/installing-providers-from-sources.rst new file mode 100644 index 000000000000..b4e730f4ff21 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/installing-providers-from-sources.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +..
include:: ../exts/includes/installing-providers-from-sources.rst diff --git a/docs/apache-airflow-providers-qdrant/operators/qdrant.rst b/docs/apache-airflow-providers-qdrant/operators/qdrant.rst new file mode 100644 index 000000000000..22d82d5958f1 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/operators/qdrant.rst @@ -0,0 +1,40 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:QdrantIngestOperator: + +QdrantIngestOperator +====================== + +Use the :class:`~airflow.providers.qdrant.operators.qdrant.QdrantIngestOperator` to +ingest data into a Qdrant instance. + + +Using the Operator +^^^^^^^^^^^^^^^^^^ + +The QdrantIngestOperator requires the ``vectors`` as an input to ingest into Qdrant. Use the ``conn_id`` parameter to +specify the Qdrant connection to connect to the Qdrant instance. The vectors could also contain metadata referencing +the original text corresponding to the vectors that could be ingested into the database. + +An example using the operator in this way: + +..
exampleinclude:: /../../tests/system/providers/qdrant/example_dag_qdrant.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_qdrant_ingest] + :end-before: [END howto_operator_qdrant_ingest] diff --git a/docs/apache-airflow-providers-qdrant/security.rst b/docs/apache-airflow-providers-qdrant/security.rst new file mode 100644 index 000000000000..afa13dac6fc9 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/security.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. 
include:: ../exts/includes/security.rst diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index 8c1c1143c531..ea813aeb499f 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -210,6 +210,8 @@ These are extras that add dependencies needed for integration with external serv +---------------------+-----------------------------------------------------+-----------------------------------------------------+ | pinecone | ``pip install 'apache-airflow[pinecone]'`` | Pinecone Operators and Hooks | +---------------------+-----------------------------------------------------+-----------------------------------------------------+ +| qdrant | ``pip install 'apache-airflow[qdrant]'`` | Qdrant Operators and Hooks | ++---------------------+-----------------------------------------------------+-----------------------------------------------------+ | salesforce | ``pip install 'apache-airflow[salesforce]'`` | Salesforce hook | +---------------------+-----------------------------------------------------+-----------------------------------------------------+ | sendgrid | ``pip install 'apache-airflow[sendgrid]'`` | Send email using sendgrid | diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 6e5eaa07fd01..51b3eb22be2b 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -631,6 +631,7 @@ fn fo followsa forecasted +forkserver formatter formatters Formaturas @@ -1265,6 +1266,8 @@ pythonic PythonOperator pythonpath pywinrm +Qdrant +qdrant qds Qingping Qplum diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 8e69ded6578e..9dca11651e59 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -934,6 +934,15 @@ "excluded-python-versions": [], "state": "ready" }, + "qdrant": { + "deps": [ + "qdrant_client>=1.7.0" + ], + "devel-deps": [], + 
"cross-providers-deps": [], + "excluded-python-versions": [], + "state": "not-ready" + }, "redis": { "deps": [ "apache-airflow>=2.6.0", diff --git a/images/breeze/output_build-docs.svg b/images/breeze/output_build-docs.svg index dabcb6685415..bab81e920a35 100644 --- a/images/breeze/output_build-docs.svg +++ b/images/breeze/output_build-docs.svg @@ -182,9 +182,9 @@ dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Build documents. 
diff --git a/images/breeze/output_build-docs.txt b/images/breeze/output_build-docs.txt index b580d9d01c1f..1a93849cc2fb 100644 --- a/images/breeze/output_build-docs.txt +++ b/images/breeze/output_build-docs.txt @@ -1 +1 @@ -b4bc09e22159b362651e9dd299f795c2 +8f5e4f90611d600cbd02ae9e79d60df2 diff --git a/images/breeze/output_release-management_add-back-references.svg b/images/breeze/output_release-management_add-back-references.svg index a13066ce0d94..7492e1d03f88 100644 --- a/images/breeze/output_release-management_add-back-references.svg +++ b/images/breeze/output_release-management_add-back-references.svg @@ -144,9 +144,9 @@ dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Command to add back references for documentation to make it backward compatible. 
diff --git a/images/breeze/output_release-management_add-back-references.txt b/images/breeze/output_release-management_add-back-references.txt index c7d57a09d377..bd8679b924e6 100644 --- a/images/breeze/output_release-management_add-back-references.txt +++ b/images/breeze/output_release-management_add-back-references.txt @@ -1 +1 @@ -6ceda71ff8edfe80c678c2a6c844f22d +af0db4105f4aec228083f240d550bda3 diff --git a/images/breeze/output_release-management_publish-docs.svg b/images/breeze/output_release-management_publish-docs.svg index 1448a8a59448..0f8a6216c494 100644 --- a/images/breeze/output_release-management_publish-docs.svg +++ b/images/breeze/output_release-management_publish-docs.svg @@ -190,9 +190,9 @@ dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           
Command to publish generated documentation to airflow-site diff --git a/images/breeze/output_release-management_publish-docs.txt b/images/breeze/output_release-management_publish-docs.txt index f07d4889c61a..d3a965a4bbc2 100644 --- a/images/breeze/output_release-management_publish-docs.txt +++ b/images/breeze/output_release-management_publish-docs.txt @@ -1 +1 @@ -babcac730a3ede766b87ba14ab3484e1 +30bde47bb7c648532bcadd4c53ff3d1e diff --git a/images/breeze/output_sbom_generate-providers-requirements.svg b/images/breeze/output_sbom_generate-providers-requirements.svg index dbc947ad26b4..6505980e8712 100644 --- a/images/breeze/output_sbom_generate-providers-requirements.svg +++ b/images/breeze/output_sbom_generate-providers-requirements.svg @@ -191,9 +191,9 @@ │| facebook | ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc |       â”‚ │jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql│ │| neo4j | odbc | openai | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty â”‚ -│| papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba | segment |│ -│sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | â”‚ -│telegram | trino | vertica | weaviate | yandex | zendesk)                                     â”‚ +│| papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce | samba | â”‚ +│segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | â”‚ +│tabular | telegram | trino | vertica | weaviate | yandex | zendesk)                           â”‚ │--provider-versionProvider version to generate the requirements for i.e `2.1.0`. 
`latest` is also a supported   â”‚ │value to account for the most recent version of the provider                                  â”‚ │(TEXT)                                                                                        â”‚ diff --git a/images/breeze/output_sbom_generate-providers-requirements.txt b/images/breeze/output_sbom_generate-providers-requirements.txt index 56eadeec24b7..6eab2584f87c 100644 --- a/images/breeze/output_sbom_generate-providers-requirements.txt +++ b/images/breeze/output_sbom_generate-providers-requirements.txt @@ -1 +1 @@ -84d46887b3f47bc209014ec5cb26406c +75d28480ee1900ffd878862190585efc diff --git a/pyproject.toml b/pyproject.toml index 7256bb1ea36f..f54568e0dab6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -874,6 +874,9 @@ presto = [ "pandas>=1.2.5", "presto-python-client>=0.8.4", ] +qdrant = [ + "qdrant_client>=1.7.0", +] redis = [ "redis>=4.5.2,<5.0.0,!=4.5.5", ] @@ -1037,6 +1040,7 @@ all = [ "apache-airflow[pinecone]", "apache-airflow[postgres]", "apache-airflow[presto]", + "apache-airflow[qdrant]", "apache-airflow[redis]", "apache-airflow[salesforce]", "apache-airflow[samba]", @@ -1136,6 +1140,7 @@ devel-all = [ "apache-airflow[pinecone]", "apache-airflow[postgres]", "apache-airflow[presto]", + "apache-airflow[qdrant]", "apache-airflow[redis]", "apache-airflow[salesforce]", "apache-airflow[samba]", diff --git a/tests/providers/qdrant/__init__.py b/tests/providers/qdrant/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/providers/qdrant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/qdrant/hooks/__init__.py b/tests/providers/qdrant/hooks/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/providers/qdrant/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/qdrant/hooks/test_qdrant.py b/tests/providers/qdrant/hooks/test_qdrant.py new file mode 100644 index 000000000000..90ffd811602e --- /dev/null +++ b/tests/providers/qdrant/hooks/test_qdrant.py @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import Mock, patch + +from airflow.providers.qdrant.hooks.qdrant import QdrantHook + + +class TestQdrantHook: + def setup_method(self): + """Set up the test connection for the QdrantHook.""" + with patch("airflow.models.Connection.get_connection_from_secrets") as mock_get_connection: + mock_conn = Mock() + mock_conn.host = "localhost" + mock_conn.port = 6333 + mock_conn.password = "some_test_api_key" + mock_get_connection.return_value = mock_conn + self.qdrant_hook = QdrantHook + self.collection_name = "test_collection" + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.upsert") + def test_upsert(self, mock_upsert): + """Test the upsert method of the QdrantHook with appropriate arguments.""" + vectors = [[0.732, 0.611, 0.289], [0.217, 0.526, 0.416], [0.326, 0.483, 0.376]] + ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] + payloads = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] + parallel = 2 + self.qdrant_hook.upsert( + collection_name=self.collection_name, + vectors=vectors, + ids=ids, + payloads=payloads, + parallel=parallel, + ) + mock_upsert.assert_called_once_with( + collection_name=self.collection_name, + vectors=vectors, + ids=ids, + payloads=payloads, + parallel=parallel, + ) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.list_collections") + def 
test_list_collections(self, mock_list_collections): + """Test that the list_collections is called correctly.""" + self.qdrant_hook.list_collections() + mock_list_collections.assert_called_once() + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.create_collection") + def test_create_collection(self, mock_create_collection): + """Test that the create_collection is called with correct arguments.""" + + from qdrant_client.models import Distance, VectorParams + + self.qdrant_hook.create_collection( + collection_name=self.collection_name, + vectors_config=VectorParams(size=384, distance=Distance.COSINE), + ) + mock_create_collection.assert_called_once_with( + collection_name=self.collection_name, + vectors_config=VectorParams(size=384, distance=Distance.COSINE), + ) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.delete") + def test_delete(self, mock_delete): + """Test that the delete is called with correct arguments.""" + + self.qdrant_hook.delete(collection_name=self.collection_name, points_selector=[32, 21], wait=False) + + mock_delete.assert_called_once_with( + collection_name=self.collection_name, points_selector=[32, 21], wait=False + ) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.search") + def test_search(self, mock_search): + """Test that the search is called with correct arguments.""" + + self.qdrant_hook.search( + collection_name=self.collection_name, + query_vector=[1.0, 2.0, 3.0], + limit=10, + with_vectors=True, + ) + + mock_search.assert_called_once_with( + collection_name=self.collection_name, query_vector=[1.0, 2.0, 3.0], limit=10, with_vectors=True + ) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.get_collection") + def test_get_collection(self, mock_get_collection): + """Test that the get_collection is called with correct arguments.""" + + self.qdrant_hook.get_collection(collection_name=self.collection_name) + + mock_get_collection.assert_called_once_with(collection_name=self.collection_name) + + 
@patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.delete_collection") + def test_delete_collection(self, mock_delete_collection): + """Test that the delete_collection is called with correct arguments.""" + + self.qdrant_hook.delete_collection(collection_name=self.collection_name) + + mock_delete_collection.assert_called_once_with(collection_name=self.collection_name) diff --git a/tests/providers/qdrant/operators/__init__.py b/tests/providers/qdrant/operators/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/providers/qdrant/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/qdrant/operators/test_qdrant.py b/tests/providers/qdrant/operators/test_qdrant.py new file mode 100644 index 000000000000..3088bd203777 --- /dev/null +++ b/tests/providers/qdrant/operators/test_qdrant.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime +from unittest.mock import Mock, patch + +import pytest + +from airflow.models import DAG +from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator + + +class MockQdrantHook: + """Mocking QdrantHook to avoid actual external calls""" + + def create_collection(self, *args, **kwargs): + pass + + @staticmethod + def upsert(*args, **kwargs): + return Mock() + + +@pytest.fixture +def dummy_dag(): + """Fixture to provide a dummy Airflow DAG for testing.""" + return DAG(dag_id="test_dag", start_date=datetime(2023, 9, 29)) + + +class TestQdrantIngestOperator: + def test_operator_execution(self, dummy_dag): + """ + Test the execution of the QdrantIngestOperator. + Ensures that the upsert method on the hook is correctly called. 
+ """ + vectors = [[0.732, 0.611, 0.289], [0.217, 0.526, 0.416], [0.326, 0.483, 0.376]] + ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] + payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] + + task = QdrantIngestOperator( + task_id="ingest_vectors", + collection_name="test_collection", + vectors=vectors, + ids=ids, + payload=payload, + wait=False, + max_retries=1, + parallel=3, + dag=dummy_dag, + ) + + with patch( + "airflow.providers.qdrant.operators.qdrant.QdrantIngestOperator.hook", + new_callable=MockQdrantHook, + ) as mock_hook_instance: + mock_hook_instance.upsert = Mock() + + task.execute(context={}) + mock_hook_instance.upsert.assert_called_once_with( + collection_name="test_collection", + vectors=vectors, + ids=ids, + payload=payload, + batch_size=64, + wait=False, + max_retries=1, + parallel=3, + method=None, + ) diff --git a/tests/system/providers/qdrant/__init__.py b/tests/system/providers/qdrant/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/system/providers/qdrant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/system/providers/qdrant/example_dag_qdrant.py b/tests/system/providers/qdrant/example_dag_qdrant.py new file mode 100644 index 000000000000..a0e2449fd2f6 --- /dev/null +++ b/tests/system/providers/qdrant/example_dag_qdrant.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator + +with DAG( + "example_qdrant_ingest", + schedule=None, + start_date=datetime(2024, 1, 1), + catchup=True, +) as dag: + # [START howto_operator_qdrant_ingest] + vectors = [[0.732, 0.611, 0.289, 0.421], [0.217, 0.526, 0.416, 0.981], [0.326, 0.483, 0.376, 0.136]] + ids: list[str | int] = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] + payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] + + QdrantIngestOperator( + task_id="qdrant_ingest", + collection_name="test_collection", + vectors=vectors, + ids=ids, + payload=payload, + batch_size=1, + ) + # [END howto_operator_qdrant_ingest] + + +from tests.system.utils import get_test_run + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From 13fc647f54de4b754d97908d024ee2c7941e9cac Mon Sep 17 00:00:00 2001 From: Anush008 Date: Tue, 16 Jan 2024 12:57:10 +0530 Subject: [PATCH 02/17] ci: qdrant to providers_bug_report_yml --- .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml index 3213bf8b3c2d..eeb201cbb05a 100644 --- a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml +++ b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml @@ -90,6 +90,7 @@ body: - pinecone - postgres - presto + - qdrant - redis - salesforce - samba From bc880993c8afe7914dc17a84d06e5f8613f0cbae Mon Sep 17 00:00:00 2001 From: Anush008 Date: Thu, 18 Jan 2024 11:38:39 +0530 Subject: [PATCH 03/17] refactor: remove redundant methods, only @property conn --- airflow/providers/qdrant/hooks/qdrant.py | 194 +----------------- airflow/providers/qdrant/operators/qdrant.py | 2 +- 
tests/providers/qdrant/hooks/test_qdrant.py | 69 ++++--- .../providers/qdrant/operators/test_qdrant.py | 20 +- 4 files changed, 50 insertions(+), 235 deletions(-) diff --git a/airflow/providers/qdrant/hooks/qdrant.py b/airflow/providers/qdrant/hooks/qdrant.py index a40126b0c14f..42b473d77aa3 100644 --- a/airflow/providers/qdrant/hooks/qdrant.py +++ b/airflow/providers/qdrant/hooks/qdrant.py @@ -18,10 +18,10 @@ from __future__ import annotations from functools import cached_property -from typing import Any, Iterable, Mapping, Sequence +from typing import Any from grpc import RpcError -from qdrant_client import QdrantClient, models +from qdrant_client import QdrantClient from qdrant_client.http.exceptions import UnexpectedResponse from airflow.hooks.base import BaseHook @@ -52,6 +52,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: widget=BS3TextFieldWidget(), description="Optional. Qualified URL of the Qdrant instance." "Example: https://xyz-example.eu-central.aws.cloud.qdrant.io:6333", + default=None, ), "grpc_port": IntegerField( lazy_gettext("GPRC Port"), @@ -98,7 +99,7 @@ def __init__(self, conn_id: str = default_conn_name, **kwargs) -> None: self.get_conn() def get_conn(self) -> QdrantClient: - """Get a Qdrant client instance.""" + """Get a Qdrant client instance for interfacing with the database.""" connection = self.get_connection(self.conn_id) host = connection.host or None port = connection.port or 6333 @@ -108,7 +109,7 @@ def get_conn(self) -> QdrantClient: grpc_port = extra.get("grpc_port", 6334) prefer_gprc = extra.get("prefer_gprc", False) https = extra.get("https", False) - prefix = extra.get("prefix", "") + prefix = extra.get("prefix", None) timeout = extra.get("timeout", None) return QdrantClient( @@ -125,7 +126,7 @@ def get_conn(self) -> QdrantClient: @cached_property def conn(self) -> QdrantClient: - """Get a Qdrant client instance.""" + """Get a Qdrant client instance for interfacing with the database.""" return self.get_conn() def 
verify_connection(self) -> tuple[bool, str]: @@ -135,186 +136,3 @@ def verify_connection(self) -> tuple[bool, str]: return True, "Connection established!" except (UnexpectedResponse, RpcError, ValueError) as e: return False, str(e) - - def list_collections(self) -> list[str]: - """Get a list of collections in the Qdrant instance.""" - return [collection.name for collection in self.conn.get_collections().collections] - - def upsert( - self, - collection_name: str, - vectors: Iterable[models.VectorStruct], - payload: Iterable[dict[str, Any]] | None = None, - ids: Iterable[str | int] | None = None, - batch_size: int = 64, - parallel: int = 1, - method: str | None = None, - max_retries: int = 3, - wait: bool = True, - ) -> None: - """ - Upload points to a Qdrant collection. - - :param collection_name: Name of the collection to upload points to. - :param vectors: An iterable over vectors to upload. - :param payload: Iterable of vectors payload, Optional. Defaults to None. - :param ids: Iterable of custom vectors ids, Optional. Defaults to None. - :param batch_size: Number of points to upload per-request. Defaults to 64. - :param parallel: Number of parallel upload processes. Defaults to 1. - :param method: Start method for parallel processes. Defaults to forkserver. - :param max_retries: Number of retries for failed requests. Defaults to 3. - :param wait: Await for the results to be applied on the server side. Defaults to True. - """ - return self.conn.upload_collection( - collection_name=collection_name, - vectors=vectors, - payload=payload, - ids=ids, - batch_size=batch_size, - parallel=parallel, - method=method, - max_retries=max_retries, - wait=wait, - ) - - def delete( - self, - collection_name: str, - points_selector: models.PointsSelector, - wait: bool = True, - ordering: models.WriteOrdering | None = None, - shard_key_selector: models.ShardKeySelector | None = None, - ) -> None: - """ - Delete points from a Qdrant collection. 
- - :param collection_name: Name of the collection to delete points from. - :param points_selector: Selector for points to delete. - :param wait: Await for the results to be applied on the server side. Defaults to True. - :param ordering: Ordering of the write operation. Defaults to None. - :param shard_key_selector: Selector for the shard key. Defaults to None. - """ - self.conn.delete( - collection_name=collection_name, - points_selector=points_selector, - wait=wait, - ordering=ordering, - shard_key_selector=shard_key_selector, - ) - - def search( - self, - collection_name: str, - query_vector: Sequence[float] - | tuple[str, list[float]] - | models.NamedVector - | models.NamedSparseVector, - query_filter: models.Filter | None = None, - search_params: models.SearchParams | None = None, - limit: int = 10, - offset: int | None = None, - with_payload: bool | Sequence[str] | models.PayloadSelector = True, - with_vectors: bool | Sequence[str] = False, - score_threshold: float | None = None, - consistency: models.ReadConsistency | None = None, - shard_key_selector: models.ShardKeySelector | None = None, - timeout: int | None = None, - ): - """ - Search for the closest points in a Qdrant collection. - - :param collection_name: Name of the collection to upload points to. - :param quey_vector: Query vector to search for. - :param query_filter: Filter for the query. Defaults to None. - :param search_params: Additional search parameters. Defaults to None. - :param limit: Number of results to return. Defaults to 10. - :param offset: Offset of the first results to return. Defaults to None. - :param with_payload: To specify which stored payload should be attached to the result. Defaults to True. - :param with_vectors: To specify whether vectors should be attached to the result. Defaults to False. - :param score_threshold: To specify the minimum score threshold of the results. Defaults to None. 
- :param consistency: Defines how many replicas should be queried before returning the result. Defaults to None. - :param shard_key_selector: To specify which shards should be queried.. Defaults to None. - :param wait: Await for the results to be applied on the server side. Defaults to True. - """ - return self.conn.search( - collection_name=collection_name, - query_vector=query_vector, - query_filter=query_filter, - search_params=search_params, - limit=limit, - offset=offset, - with_payload=with_payload, - with_vectors=with_vectors, - score_threshold=score_threshold, - consistency=consistency, - shard_key_selector=shard_key_selector, - timeout=timeout, - ) - - def create_collection( - self, - collection_name: str, - vectors_config: models.VectorParams | Mapping[str, models.VectorParams], - sparse_vectors_config: Mapping[str, models.SparseVectorParams] | None = None, - shard_number: int | None = None, - sharding_method: models.ShardingMethod | None = None, - replication_factor: int | None = None, - write_consistency_factor: int | None = None, - on_disk_payload: bool | None = None, - hnsw_config: models.HnswConfigDiff | None = None, - optimizers_config: models.OptimizersConfigDiff | None = None, - wal_config: models.WalConfigDiff | None = None, - quantization_config: models.QuantizationConfig | None = None, - init_from: models.InitFrom | None = None, - timeout: int | None = None, - ) -> bool: - """ - Create a new Qdrant collection. - - :param collection_name: Name of the collection to upload points to. - :param vectors_config: Configuration of the vector storage contains size and distance for the vectors. - :param sparse_vectors_config: Configuration of the sparse vector storage. Defaults to None. - :param shard_number: Number of shards in collection. Default is 1, minimum is 1. - :param sharding_method: Defines strategy for shard creation. Defaults to auto. - :param replication_factor: Replication factor for collection. Default is 1, minimum is 1. 
- :param write_consistency_factor: Write consistency factor for collection. Default is 1, minimum is 1. - :param on_disk_payload: If true - point`s payload will not be stored in memory. - :param hnsw_config: Parameters for HNSW index. - :param optimizers_config: Parameters for optimizer. - :param wal_config: Parameters for Write-Ahead-Log. - :param quantization_config: Parameters for quantization, if None - quantization will be disabled. - :param init_from: Whether to use data stored in another collection to initialize this collection. - :param timeout: Timeout for the request. Defaults to None. - """ - return self.conn.create_collection( - collection_name=collection_name, - vectors_config=vectors_config, - sparse_vectors_config=sparse_vectors_config, - shard_number=shard_number, - sharding_method=sharding_method, - replication_factor=replication_factor, - write_consistency_factor=write_consistency_factor, - on_disk_payload=on_disk_payload, - hnsw_config=hnsw_config, - optimizers_config=optimizers_config, - wal_config=wal_config, - quantization_config=quantization_config, - init_from=init_from, - timeout=timeout, - ) - - def get_collection(self, collection_name: str) -> models.CollectionInfo: - """ - Get information about a Qdrant collection. - - :param collection_name: Name of the collection to get information about. - """ - return self.conn.get_collection(collection_name=collection_name) - - def delete_collection(self, collection_name: str, timeout: int | None) -> bool: - """ - Delete a Qdrant collection. - - :param collection_name: Name of the collection to delete. 
- """ - return self.conn.delete_collection(collection_name=collection_name, timeout=timeout) diff --git a/airflow/providers/qdrant/operators/qdrant.py b/airflow/providers/qdrant/operators/qdrant.py index d4e0d7afa139..403bf12ff9e3 100644 --- a/airflow/providers/qdrant/operators/qdrant.py +++ b/airflow/providers/qdrant/operators/qdrant.py @@ -96,7 +96,7 @@ def hook(self) -> QdrantHook: def execute(self, context: Context) -> None: """Upload points to a Qdrant collection.""" - self.hook.upsert( + self.hook.conn.upload_collection( collection_name=self.collection_name, vectors=self.vectors, payload=self.payload, diff --git a/tests/providers/qdrant/hooks/test_qdrant.py b/tests/providers/qdrant/hooks/test_qdrant.py index 90ffd811602e..ab66aaf7ad96 100644 --- a/tests/providers/qdrant/hooks/test_qdrant.py +++ b/tests/providers/qdrant/hooks/test_qdrant.py @@ -28,26 +28,35 @@ def setup_method(self): mock_conn = Mock() mock_conn.host = "localhost" mock_conn.port = 6333 + mock_conn.extra_dejson = {} mock_conn.password = "some_test_api_key" mock_get_connection.return_value = mock_conn - self.qdrant_hook = QdrantHook + self.qdrant_hook = QdrantHook() + self.collection_name = "test_collection" - @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.upsert") - def test_upsert(self, mock_upsert): + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_verify_connection(self, mock_conn): + """Test the verify_connection of the QdrantHook.""" + self.qdrant_hook.verify_connection() + + mock_conn.get_collections.assert_called_once() + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_upsert(self, conn): """Test the upsert method of the QdrantHook with appropriate arguments.""" vectors = [[0.732, 0.611, 0.289], [0.217, 0.526, 0.416], [0.326, 0.483, 0.376]] ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] payloads = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] parallel = 2 - self.qdrant_hook.upsert( + 
self.qdrant_hook.conn.upsert( collection_name=self.collection_name, vectors=vectors, ids=ids, payloads=payloads, parallel=parallel, ) - mock_upsert.assert_called_once_with( + conn.upsert.assert_called_once_with( collection_name=self.collection_name, vectors=vectors, ids=ids, @@ -55,64 +64,66 @@ def test_upsert(self, mock_upsert): parallel=parallel, ) - @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.list_collections") - def test_list_collections(self, mock_list_collections): + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_list_collections(self, conn): """Test that the list_collections is called correctly.""" - self.qdrant_hook.list_collections() - mock_list_collections.assert_called_once() + self.qdrant_hook.conn.list_collections() + conn.list_collections.assert_called_once() - @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.create_collection") - def test_create_collection(self, mock_create_collection): + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_create_collection(self, conn): """Test that the create_collection is called with correct arguments.""" from qdrant_client.models import Distance, VectorParams - self.qdrant_hook.create_collection( + self.qdrant_hook.conn.create_collection( collection_name=self.collection_name, vectors_config=VectorParams(size=384, distance=Distance.COSINE), ) - mock_create_collection.assert_called_once_with( + conn.create_collection.assert_called_once_with( collection_name=self.collection_name, vectors_config=VectorParams(size=384, distance=Distance.COSINE), ) - @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.delete") - def test_delete(self, mock_delete): + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_delete(self, conn): """Test that the delete is called with correct arguments.""" - self.qdrant_hook.delete(collection_name=self.collection_name, points_selector=[32, 21], wait=False) + self.qdrant_hook.conn.delete( + 
collection_name=self.collection_name, points_selector=[32, 21], wait=False + ) - mock_delete.assert_called_once_with( + conn.delete.assert_called_once_with( collection_name=self.collection_name, points_selector=[32, 21], wait=False ) - @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.search") - def test_search(self, mock_search): + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_search(self, conn): """Test that the search is called with correct arguments.""" - self.qdrant_hook.search( + self.qdrant_hook.conn.search( collection_name=self.collection_name, query_vector=[1.0, 2.0, 3.0], limit=10, with_vectors=True, ) - mock_search.assert_called_once_with( + conn.search.assert_called_once_with( collection_name=self.collection_name, query_vector=[1.0, 2.0, 3.0], limit=10, with_vectors=True ) - @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.get_collection") - def test_get_collection(self, mock_get_collection): + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_get_collection(self, conn): """Test that the get_collection is called with correct arguments.""" - self.qdrant_hook.get_collection(collection_name=self.collection_name) + self.qdrant_hook.conn.get_collection(collection_name=self.collection_name) - mock_get_collection.assert_called_once_with(collection_name=self.collection_name) + conn.get_collection.assert_called_once_with(collection_name=self.collection_name) - @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.delete_collection") - def test_delete_collection(self, mock_delete_collection): + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_delete_collection(self, conn): """Test that the delete_collection is called with correct arguments.""" - self.qdrant_hook.delete_collection(collection_name=self.collection_name) + self.qdrant_hook.conn.delete_collection(collection_name=self.collection_name) - 
mock_delete_collection.assert_called_once_with(collection_name=self.collection_name) + conn.delete_collection.assert_called_once_with(collection_name=self.collection_name) diff --git a/tests/providers/qdrant/operators/test_qdrant.py b/tests/providers/qdrant/operators/test_qdrant.py index 3088bd203777..4bf1db749bb3 100644 --- a/tests/providers/qdrant/operators/test_qdrant.py +++ b/tests/providers/qdrant/operators/test_qdrant.py @@ -17,7 +17,7 @@ from __future__ import annotations from datetime import datetime -from unittest.mock import Mock, patch +from unittest.mock import patch import pytest @@ -25,17 +25,6 @@ from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator -class MockQdrantHook: - """Mocking QdrantHook to avoid actual external calls""" - - def create_collection(self, *args, **kwargs): - pass - - @staticmethod - def upsert(*args, **kwargs): - return Mock() - - @pytest.fixture def dummy_dag(): """Fixture to provide a dummy Airflow DAG for testing.""" @@ -65,13 +54,10 @@ def test_operator_execution(self, dummy_dag): ) with patch( - "airflow.providers.qdrant.operators.qdrant.QdrantIngestOperator.hook", - new_callable=MockQdrantHook, + "airflow.providers.qdrant.operators.qdrant.QdrantIngestOperator.hook" ) as mock_hook_instance: - mock_hook_instance.upsert = Mock() - task.execute(context={}) - mock_hook_instance.upsert.assert_called_once_with( + mock_hook_instance.conn.upload_collection.assert_called_once_with( collection_name="test_collection", vectors=vectors, ids=ids, From 9245f2447a6383793d3f1afbd914db54620945af Mon Sep 17 00:00:00 2001 From: Anush008 Date: Thu, 18 Jan 2024 11:56:41 +0530 Subject: [PATCH 04/17] test: Qdrant integration tests and CI --- .github/boring-cyborg.yml | 6 ++ .github/workflows/ci.yml | 3 + TESTING.rst | 2 + airflow/utils/db.py | 9 +++ .../src/airflow_breeze/global_constants.py | 12 +--- images/breeze/output-commands.svg | 2 +- images/breeze/output_shell.svg | 2 +- images/breeze/output_shell.txt | 2 +- 
images/breeze/output_start-airflow.svg | 2 +- images/breeze/output_start-airflow.txt | 2 +- .../output_testing_integration-tests.svg | 2 +- .../output_testing_integration-tests.txt | 2 +- images/breeze/output_testing_tests.svg | 2 +- images/breeze/output_testing_tests.txt | 2 +- .../ci/docker-compose/integration-qdrant.yml | 34 +++++++++ scripts/in_container/check_environment.sh | 7 ++ .../integration/providers/qdrant/__init__.py | 16 +++++ .../providers/qdrant/hooks/__init__.py | 16 +++++ .../providers/qdrant/hooks/test_qdrant.py | 69 ++++++++++++++++++ .../providers/qdrant/operators/__init__.py | 16 +++++ .../qdrant/operators/test_qdrant_ingest.py | 71 +++++++++++++++++++ 21 files changed, 260 insertions(+), 19 deletions(-) create mode 100644 scripts/ci/docker-compose/integration-qdrant.yml create mode 100644 tests/integration/providers/qdrant/__init__.py create mode 100644 tests/integration/providers/qdrant/hooks/__init__.py create mode 100644 tests/integration/providers/qdrant/hooks/test_qdrant.py create mode 100644 tests/integration/providers/qdrant/operators/__init__.py create mode 100644 tests/integration/providers/qdrant/operators/test_qdrant_ingest.py diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml index 287cad5157e6..335364aa56aa 100644 --- a/.github/boring-cyborg.yml +++ b/.github/boring-cyborg.yml @@ -403,6 +403,12 @@ labelPRBasedOnFilePath: - tests/providers/presto/**/* - tests/system/providers/presto/**/* + provider:qdrant: + - airflow/providers/qdrant/**/* + - docs/apache-airflow-providers-qdrant/**/* + - tests/providers/qdrant/**/* + - tests/system/providers/qdrant/**/* + provider:redis: - airflow/providers/redis/**/* - docs/apache-airflow-providers-redis/**/* diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 19b6d16471ad..305997bf5ff6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1469,6 +1469,9 @@ jobs: breeze testing integration-tests --integration kafka breeze down if: 
needs.build-info.outputs.is-airflow-runner != 'true' + - name: "Integration Tests Postgres: Qdrant" + run: breeze testing integration-tests --integration qdrant + if: needs.build-info.outputs.is-airflow-runner == 'true' - name: "Integration Tests Postgres: all-testable" run: breeze testing integration-tests --integration all-testable if: needs.build-info.outputs.is-airflow-runner == 'true' diff --git a/TESTING.rst b/TESTING.rst index 9d2a3716a17a..1a6a5d0a7e01 100644 --- a/TESTING.rst +++ b/TESTING.rst @@ -1232,6 +1232,8 @@ The following integrations are available: +--------------+----------------------------------------------------+ | pinot | Integration required for Apache Pinot hooks. | +--------------+----------------------------------------------------+ +| qdrant | Integration required for Qdrant tests. | ++--------------+----------------------------------------------------+ | statsd | Integration required for Satsd hooks. | +--------------+----------------------------------------------------+ | trino | Integration required for Trino hooks. 
| diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b9ee8323611c..e2de7666d105 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -541,6 +541,15 @@ def create_default_connections(session: Session = NEW_SESSION): ), session, ) + merge_conn( + Connection( + conn_id="qdrant_default", + conn_type="qdrant", + host="qdrant", + port=6333, + ), + session, + ) merge_conn( Connection( conn_id="redis_default", diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index ef1217acd78f..cb483eb4cdbf 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -50,7 +50,7 @@ ALLOWED_BACKENDS = ["sqlite", "mysql", "postgres", "none"] ALLOWED_PROD_BACKENDS = ["mysql", "postgres"] DEFAULT_BACKEND = ALLOWED_BACKENDS[0] -TESTABLE_INTEGRATIONS = ["cassandra", "celery", "kerberos", "mongo", "pinot", "trino", "kafka"] +TESTABLE_INTEGRATIONS = ["cassandra", "celery", "kerberos", "mongo", "pinot", "trino", "kafka", "qdrant"] OTHER_INTEGRATIONS = ["statsd", "otel", "openlineage"] ALLOWED_DEBIAN_VERSIONS = ["bookworm", "bullseye"] ALL_INTEGRATIONS = sorted( @@ -365,15 +365,7 @@ def get_airflow_extras(): # Initialize integrations -AVAILABLE_INTEGRATIONS = [ - "cassandra", - "kerberos", - "mongo", - "pinot", - "celery", - "statsd", - "trino", -] +AVAILABLE_INTEGRATIONS = ["cassandra", "kerberos", "mongo", "pinot", "celery", "statsd", "trino", "qdrant"] ALL_PROVIDER_YAML_FILES = Path(AIRFLOW_SOURCES_ROOT, "airflow", "providers").rglob("provider.yaml") PROVIDER_RUNTIME_DATA_SCHEMA_PATH = AIRFLOW_SOURCES_ROOT / "airflow" / "provider_info.schema.json" diff --git a/images/breeze/output-commands.svg b/images/breeze/output-commands.svg index 506044e2d76b..633e7c9353cd 100644 --- a/images/breeze/output-commands.svg +++ b/images/breeze/output-commands.svg @@ -285,7 +285,7 @@ │[default: 3.8]                                              â”‚ 
│--integrationIntegration(s) to enable when running (can be more than one).                       â”‚ │(all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage | â”‚ -│otel | pinot | statsd | trino)                                                      â”‚ +│otel | pinot | qdrant | statsd | trino)                                             â”‚ │--standalone-dag-processorRun standalone dag processor for start-airflow.│ │--database-isolationRun airflow in database isolation mode.│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/images/breeze/output_shell.svg b/images/breeze/output_shell.svg index a8edc07feb78..d72fd4a6e891 100644 --- a/images/breeze/output_shell.svg +++ b/images/breeze/output_shell.svg @@ -431,7 +431,7 @@ │[default: 3.8]                                              â”‚ │--integrationIntegration(s) to enable when running (can be more than one).                       â”‚ │(all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage | â”‚ -│otel | pinot | statsd | trino)                                                      â”‚ +│otel | pinot | qdrant | statsd | trino)                                             â”‚ │--standalone-dag-processorRun standalone dag processor for start-airflow.│ │--database-isolationRun airflow in database isolation mode.│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/images/breeze/output_shell.txt b/images/breeze/output_shell.txt index 0f3e91992caf..7284de07b984 100644 --- a/images/breeze/output_shell.txt +++ b/images/breeze/output_shell.txt @@ -1 +1 @@ -d0b4fa36683eaa51744c631715f9ffe5 +027f3033da6265d62d91bfc37aa477a3 diff --git a/images/breeze/output_start-airflow.svg b/images/breeze/output_start-airflow.svg index e3f259df4cdb..574eab9dc4b6 100644 --- a/images/breeze/output_start-airflow.svg +++ 
b/images/breeze/output_start-airflow.svg @@ -388,7 +388,7 @@ │--platformPlatform for Airflow image.(linux/amd64 | linux/arm64)│ │--integrationIntegration(s) to enable when running (can be more than one).                       â”‚ │(all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage | â”‚ -│otel | pinot | statsd | trino)                                                      â”‚ +│otel | pinot | qdrant | statsd | trino)                                             â”‚ │--standalone-dag-processorRun standalone dag processor for start-airflow.│ │--database-isolationRun airflow in database isolation mode.│ │--load-example-dags-eEnable configuration to load example DAGs when starting Airflow.│ diff --git a/images/breeze/output_start-airflow.txt b/images/breeze/output_start-airflow.txt index a2a850044e1b..f9810dcbda6a 100644 --- a/images/breeze/output_start-airflow.txt +++ b/images/breeze/output_start-airflow.txt @@ -1 +1 @@ -4ad796e7308801391d76089841fb3283 +723abd3f39e7fb9102bff29a75aa5824 diff --git a/images/breeze/output_testing_integration-tests.svg b/images/breeze/output_testing_integration-tests.svg index 547642729363..329cb40251e6 100644 --- a/images/breeze/output_testing_integration-tests.svg +++ b/images/breeze/output_testing_integration-tests.svg @@ -197,7 +197,7 @@ ╭─ Test environment â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â•® │--integrationIntegration(s) to enable when running (can be more than one).                            
â”‚ │(all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage | otel â”‚ -│| pinot | statsd | trino)                                                                â”‚ +│| pinot | qdrant | statsd | trino)                                                       â”‚ │--backend-bDatabase backend to use. If 'none' is selected, breeze starts with invalid DB            â”‚ │configuration and no database and any attempts to connect to Airflow DB will fail.       â”‚ │(>sqlite< | mysql | postgres | none)                                                     â”‚ diff --git a/images/breeze/output_testing_integration-tests.txt b/images/breeze/output_testing_integration-tests.txt index 4ca2b2569b9e..55171927c831 100644 --- a/images/breeze/output_testing_integration-tests.txt +++ b/images/breeze/output_testing_integration-tests.txt @@ -1 +1 @@ -6b3936410fc1ef583fc241616ee8252d +115fdc525ff1089d338656403bf8cf25 diff --git a/images/breeze/output_testing_tests.svg b/images/breeze/output_testing_tests.svg index 9c40f203e330..7cd1d927da9d 100644 --- a/images/breeze/output_testing_tests.svg +++ b/images/breeze/output_testing_tests.svg @@ -368,7 +368,7 @@ ╭─ Test environment â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â•® │--integrationIntegration(s) to enable when running (can be more than one).                            â”‚ │(all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage | otel â”‚ -│| pinot | statsd | trino)                                                                â”‚ +│| pinot | qdrant | statsd | trino)                                                       â”‚ │--backend-bDatabase backend to use. 
If 'none' is selected, breeze starts with invalid DB            â”‚ │configuration and no database and any attempts to connect to Airflow DB will fail.       â”‚ │(>sqlite< | mysql | postgres | none)                                                     â”‚ diff --git a/images/breeze/output_testing_tests.txt b/images/breeze/output_testing_tests.txt index 9bf63945be86..e2ccd26dfe73 100644 --- a/images/breeze/output_testing_tests.txt +++ b/images/breeze/output_testing_tests.txt @@ -1 +1 @@ -93dd0156785e1dd5261b555a278a5ebd +7847d28a1237471aeed4c97f613d7ea7 diff --git a/scripts/ci/docker-compose/integration-qdrant.yml b/scripts/ci/docker-compose/integration-qdrant.yml new file mode 100644 index 000000000000..724c1e92cfb4 --- /dev/null +++ b/scripts/ci/docker-compose/integration-qdrant.yml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +version: "3.8" +services: + qdrant: + image: qdrant/qdrant:latest + labels: + breeze.description: "Integration required for Qdrant tests." 
+ ports: + - "6333:6333" + - "6334:6334" + restart: "on-failure" + + airflow: + environment: + - INTEGRATION_QDRANT=true + depends_on: + qdrant: + condition: service_started diff --git a/scripts/in_container/check_environment.sh b/scripts/in_container/check_environment.sh index 187490dee8ca..73686e1368b7 100755 --- a/scripts/in_container/check_environment.sh +++ b/scripts/in_container/check_environment.sh @@ -177,6 +177,13 @@ if [[ ${INTEGRATION_PINOT} == "true" ]]; then CMD="curl --max-time 1 -X GET 'http://pinot:8000/health' -H 'accept: text/plain' | grep OK" check_service "Pinot (Broker API)" "${CMD}" 50 fi + +if [[ ${INTEGRATION_QDRANT} == "true" ]]; then + check_service "Qdrant" "run_nc qdrant 6333" 50 + CMD="curl -f -X GET 'http://qdrant:6333/collections'" + check_service "Qdrant (Collections API)" "${CMD}" 50 +fi + if [[ ${INTEGRATION_KAFKA} == "true" ]]; then check_service "Kafka Cluster" "run_nc broker 9092" 50 fi diff --git a/tests/integration/providers/qdrant/__init__.py b/tests/integration/providers/qdrant/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/integration/providers/qdrant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/integration/providers/qdrant/hooks/__init__.py b/tests/integration/providers/qdrant/hooks/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/integration/providers/qdrant/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/providers/qdrant/hooks/test_qdrant.py b/tests/integration/providers/qdrant/hooks/test_qdrant.py new file mode 100644 index 000000000000..296df3bed596 --- /dev/null +++ b/tests/integration/providers/qdrant/hooks/test_qdrant.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import random + +import numpy as np +import pytest +from qdrant_client import models + +from airflow.providers.qdrant.hooks.qdrant import QdrantHook + + +@pytest.mark.integration("qdrant") +class TestQdrant: + def setup_method(self): + self.test_collection_name = "test-hook-collection" + self.test_collection_dimension = random.randint(100, 2000) + self.hook = QdrantHook() + + self.hook.conn.recreate_collection( + self.test_collection_name, + vectors_config=models.VectorParams( + size=self.test_collection_dimension, distance=models.Distance.MANHATTAN + ), + ) + + def test_connection(self): + response, message = self.hook.verify_connection() + assert response and message == "Connection established!", "Successfully connected to Qdrant." 
+ + def test_upsert_points(self): + vectors = np.random.rand(100, self.test_collection_dimension) + self.hook.conn.upsert( + self.test_collection_name, + points=[ + models.PointStruct( + id=idx, vector=vector.tolist(), payload={"color": "red", "rand_number": idx % 10} + ) + for idx, vector in enumerate(vectors) + ], + ) + + assert self.hook.conn.count(self.test_collection_name).count == 100 + + def test_delete_points(self): + self.hook.conn.delete( + self.test_collection_name, + points_selector=models.Filter( + must=[models.FieldCondition(key="color", match=models.MatchValue(value="red"))] + ), + ) + + assert self.hook.conn.count(self.test_collection_name).count == 0 diff --git a/tests/integration/providers/qdrant/operators/__init__.py b/tests/integration/providers/qdrant/operators/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/integration/providers/qdrant/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/integration/providers/qdrant/operators/test_qdrant_ingest.py b/tests/integration/providers/qdrant/operators/test_qdrant_ingest.py new file mode 100644 index 000000000000..eed4bd8e78db --- /dev/null +++ b/tests/integration/providers/qdrant/operators/test_qdrant_ingest.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import random +from unittest.mock import MagicMock + +import pytest +from qdrant_client.models import Distance, VectorParams + +from airflow.models.dag import DAG +from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2024, 1, 1) + + +@pytest.mark.integration("qdrant") +class TestQdrantIngestOperator: + def setup_method(self): + args = {"owner": "airflow", "start_date": DEFAULT_DATE} + + self.dag = DAG("test_qdrant_dag_id", default_args=args) + + self.mock_context = MagicMock() + self.channel = "test" + + def test_execute_hello(self): + collection_name = "test-operator-collection" + dimensions = 384 + points_count = 100 + vectors = [[random.uniform(0, 1) for _ in range(dimensions)] for _ in range(points_count)] + ids = random.sample(range(100, 10000), points_count) + payload = [{"some_number": i % 10} for i in range(points_count)] + + operator = QdrantIngestOperator( + task_id="qdrant_ingest", + conn_id="qdrant_default", + collection_name=collection_name, + vectors=vectors, + ids=ids, + payload=payload, + batch_size=1, + ) + + hook = operator.hook + + hook.conn.recreate_collection( + collection_name, vectors_config=VectorParams(size=dimensions, distance=Distance.COSINE) + ) + + operator.execute(self.mock_context) + + assert ( + hook.conn.count(collection_name=collection_name).count == points_count + ), f"Added {points_count} points to the Qdrant collection" From b97418434021ca335ae8ddf8c71f374c2a78a78a Mon Sep 17 00:00:00 2001 From: Anush Date: Sat, 20 Jan 2024 15:26:04 +0530 Subject: [PATCH 05/17] chore: remove redundant call --- airflow/providers/qdrant/hooks/qdrant.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/providers/qdrant/hooks/qdrant.py b/airflow/providers/qdrant/hooks/qdrant.py index 42b473d77aa3..045410160725 100644 --- a/airflow/providers/qdrant/hooks/qdrant.py +++ b/airflow/providers/qdrant/hooks/qdrant.py @@ 
-96,7 +96,6 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: def __init__(self, conn_id: str = default_conn_name, **kwargs) -> None: super().__init__(**kwargs) self.conn_id = conn_id - self.get_conn() def get_conn(self) -> QdrantClient: """Get a Qdrant client instance for interfacing with the database.""" From afef677c9d39681071c59c267c0caefaea1ca248 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 24 Jan 2024 09:07:23 +0530 Subject: [PATCH 06/17] chore: remove timeout param --- airflow/providers/qdrant/hooks/qdrant.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/airflow/providers/qdrant/hooks/qdrant.py b/airflow/providers/qdrant/hooks/qdrant.py index 045410160725..aef423905aea 100644 --- a/airflow/providers/qdrant/hooks/qdrant.py +++ b/airflow/providers/qdrant/hooks/qdrant.py @@ -52,19 +52,18 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: widget=BS3TextFieldWidget(), description="Optional. Qualified URL of the Qdrant instance." "Example: https://xyz-example.eu-central.aws.cloud.qdrant.io:6333", - default=None, ), "grpc_port": IntegerField( lazy_gettext("GPRC Port"), widget=BS3TextFieldWidget(), description="Optional. Port of the gRPC interface.", - default=None, + default=6334, ), "prefer_gprc": BooleanField( lazy_gettext("Prefer GRPC"), widget=BS3TextFieldWidget(), description="Optional. Whether to use gPRC interface whenever possible in custom methods.", - default=True, + default=False, ), "https": BooleanField( lazy_gettext("HTTPS"), @@ -77,12 +76,6 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: description="Optional. Prefix to the REST URL path." "Example: `service/v1` will result in http://localhost:6333/service/v1/{qdrant-endpoint} for REST API.", ), - "timeout": IntegerField( - lazy_gettext("Timeout"), - widget=BS3TextFieldWidget(), - description="Optional. 
Timeout for REST and gRPC API requests.", - default=None, - ), } @classmethod @@ -109,7 +102,6 @@ def get_conn(self) -> QdrantClient: prefer_gprc = extra.get("prefer_gprc", False) https = extra.get("https", False) prefix = extra.get("prefix", None) - timeout = extra.get("timeout", None) return QdrantClient( host=host, @@ -120,7 +112,6 @@ def get_conn(self) -> QdrantClient: prefer_grpc=prefer_gprc, https=https, prefix=prefix, - timeout=timeout, ) @cached_property From cdf713ee0e71a05d8e263b0d0f04ce02657e44e9 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 24 Jan 2024 09:09:12 +0530 Subject: [PATCH 07/17] chore: ready the provider --- airflow/providers/qdrant/provider.yaml | 2 +- ..._release-management_generate-issue-content-providers.svg | 6 +++--- ..._release-management_generate-issue-content-providers.txt | 2 +- ...ut_release-management_prepare-provider-documentation.svg | 6 +++--- ...ut_release-management_prepare-provider-documentation.txt | 2 +- .../output_release-management_prepare-provider-packages.svg | 6 +++--- .../output_release-management_prepare-provider-packages.txt | 2 +- dev/breeze/doc/images/output_shell.txt | 2 +- dev/breeze/doc/images/output_start-airflow.txt | 2 +- dev/breeze/doc/images/output_testing_integration-tests.txt | 2 +- dev/breeze/doc/images/output_testing_tests.txt | 2 +- generated/provider_dependencies.json | 2 +- pyproject.toml | 2 +- 13 files changed, 19 insertions(+), 19 deletions(-) diff --git a/airflow/providers/qdrant/provider.yaml b/airflow/providers/qdrant/provider.yaml index 9ff6b4b240c4..a6ccbbe9f777 100644 --- a/airflow/providers/qdrant/provider.yaml +++ b/airflow/providers/qdrant/provider.yaml @@ -23,7 +23,7 @@ name: Qdrant description: | `Qdrant `__ -state: not-ready +state: ready source-date-epoch: 1705379899 versions: diff --git a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg index 
e8e193e1d1e6..0646eb587117 100644 --- a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg +++ b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg @@ -144,9 +144,9 @@ common.sql | databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook |    ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |     -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Generates content for issue to test the release. 
diff --git a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt index aa42535eacb9..36d7be3c9969 100644 --- a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt +++ b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt @@ -1 +1 @@ -8109ff3d327a5e6921b15402f9c15bb6 +97f88e5ddbf7bd0f8de4fb734c8a2386 diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg index 561b4a627d67..34893298dbee 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg @@ -180,9 +180,9 @@ common.sql | databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook |    ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |     -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           
Prepare CHANGELOG, README and COMMITS information for providers. diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt index 6708878a1863..83b69b7ea721 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt @@ -1 +1 @@ -f4dbf1109bcdcca01230e5eb5331fa26 +663614748d86a8e2e8df08417e9b9307 diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg index 970ef4092429..0104899650f5 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg @@ -165,9 +165,9 @@ common.sql | databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook |    ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |     -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                           
                Prepare sdist/whl packages of Airflow Providers. diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt index 50a99b308f9b..a65dfccd0b6f 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt @@ -1 +1 @@ -e82f390815da62d3927de3f6cb9704f1 +c233e9c9a308ce97422dfb26a4125ada diff --git a/dev/breeze/doc/images/output_shell.txt b/dev/breeze/doc/images/output_shell.txt index ca43ac6ab733..2a342786afc4 100644 --- a/dev/breeze/doc/images/output_shell.txt +++ b/dev/breeze/doc/images/output_shell.txt @@ -1 +1 @@ -7658b53f007e54deb8da7c69edd878c3 +a5cb9724e8240f658d00282c8d30f125 diff --git a/dev/breeze/doc/images/output_start-airflow.txt b/dev/breeze/doc/images/output_start-airflow.txt index d8b5dd0f7010..c2dfd51ff79d 100644 --- a/dev/breeze/doc/images/output_start-airflow.txt +++ b/dev/breeze/doc/images/output_start-airflow.txt @@ -1 +1 @@ -1664c1bda5204995062f50c4e97b087d +e40d63b96551e328aee3fe8f1a3aedf2 diff --git a/dev/breeze/doc/images/output_testing_integration-tests.txt b/dev/breeze/doc/images/output_testing_integration-tests.txt index 2270869eb9d3..bab4263ef621 100644 --- a/dev/breeze/doc/images/output_testing_integration-tests.txt +++ b/dev/breeze/doc/images/output_testing_integration-tests.txt @@ -1 +1 @@ -ae82cfad12e55c5f0f847234992d3877 +d2fca4bbac3f1d53f1b56a47652997eb diff --git a/dev/breeze/doc/images/output_testing_tests.txt b/dev/breeze/doc/images/output_testing_tests.txt index 0b631fb981fb..f4864ad5156e 100644 --- a/dev/breeze/doc/images/output_testing_tests.txt +++ b/dev/breeze/doc/images/output_testing_tests.txt @@ -1 +1 @@ -ebfb809a4ef0e366f21b39495a20c7dd +5b4d64d2d8331c5fb9bd803ad9af53ef diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 
b6ae6b12be0e..4af61a20b9cf 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -943,7 +943,7 @@ "devel-deps": [], "cross-providers-deps": [], "excluded-python-versions": [], - "state": "not-ready" + "state": "ready" }, "redis": { "deps": [ diff --git a/pyproject.toml b/pyproject.toml index 8e6f239328f1..b9100f51b99c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -533,6 +533,7 @@ alibaba = [ "oss2>=2.14.0", ] amazon = [ + "PyAthena>=3.0.10", "apache-airflow[common_sql]", "apache-airflow[http]", "asgiref", @@ -552,7 +553,6 @@ amazon = [ "mypy-boto3-redshift-data>=1.33.0", "mypy-boto3-s3>=1.33.0", "s3fs>=2023.10.0", - "PyAthena>=3.0.10", ] apache-beam = [ "apache-beam>=2.53.0", From b1a941636468319b820d2fde75b668e8426e64e3 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 24 Jan 2024 14:05:56 +0530 Subject: [PATCH 08/17] Update airflow/providers/qdrant/provider.yaml Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com> --- airflow/providers/qdrant/provider.yaml | 1 + generated/provider_dependencies.json | 1 + 2 files changed, 2 insertions(+) diff --git a/airflow/providers/qdrant/provider.yaml b/airflow/providers/qdrant/provider.yaml index a6ccbbe9f777..a78f55fd67cd 100644 --- a/airflow/providers/qdrant/provider.yaml +++ b/airflow/providers/qdrant/provider.yaml @@ -38,6 +38,7 @@ integrations: dependencies: - qdrant_client>=1.7.0 + - apache-airflow>=2.6.0 hooks: - integration-name: Qdrant diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 4af61a20b9cf..ab2325ab4e8b 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -938,6 +938,7 @@ }, "qdrant": { "deps": [ + "apache-airflow>=2.6.0", "qdrant_client>=1.7.0" ], "devel-deps": [], From 38ce62d3a5ab4a92a5d66b5f1561ab64383cffc6 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 24 Jan 2024 14:13:29 +0530 Subject: [PATCH 09/17] chore: default to None https --- 
airflow/providers/qdrant/hooks/qdrant.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/qdrant/hooks/qdrant.py b/airflow/providers/qdrant/hooks/qdrant.py index aef423905aea..494e4d57ed9d 100644 --- a/airflow/providers/qdrant/hooks/qdrant.py +++ b/airflow/providers/qdrant/hooks/qdrant.py @@ -100,7 +100,7 @@ def get_conn(self) -> QdrantClient: url = extra.get("url", None) grpc_port = extra.get("grpc_port", 6334) prefer_gprc = extra.get("prefer_gprc", False) - https = extra.get("https", False) + https = extra.get("https", None) prefix = extra.get("prefix", None) return QdrantClient( From 41b201f608297f1f4d381e757ab47c9d84e9c057 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Thu, 25 Jan 2024 12:34:07 +0530 Subject: [PATCH 10/17] docs: removed timeout param --- docs/apache-airflow-providers-qdrant/connections.rst | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/apache-airflow-providers-qdrant/connections.rst b/docs/apache-airflow-providers-qdrant/connections.rst index 37c70eff68f4..56d649d39cc6 100644 --- a/docs/apache-airflow-providers-qdrant/connections.rst +++ b/docs/apache-airflow-providers-qdrant/connections.rst @@ -53,6 +53,3 @@ HTTPS (optional) Prefix (optional) Prefix to add to the REST URL endpoints. Defaults to ``None``. - -Timeout (optional) - Timeout for REST and gRPC API requests.. Defaults to ``5`` seconds for REST. Unlimited for GRPC. 
From 08d78845531a3aedd1eb91c8efa45e040dec3c1e Mon Sep 17 00:00:00 2001 From: Anush008 Date: Thu, 25 Jan 2024 12:34:42 +0530 Subject: [PATCH 11/17] docs: improved https param description --- docs/apache-airflow-providers-qdrant/connections.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-qdrant/connections.rst b/docs/apache-airflow-providers-qdrant/connections.rst index 56d649d39cc6..2eadc0146db1 100644 --- a/docs/apache-airflow-providers-qdrant/connections.rst +++ b/docs/apache-airflow-providers-qdrant/connections.rst @@ -49,7 +49,7 @@ Prefer GRPC (optional) Whether to use GRPC for custom methods. Defaults to ``False``. HTTPS (optional) - Whether to use HTTPS for requests. Defaults to ``False``. + Whether to use HTTPS for requests. Defaults to ``True`` if an API key is provided. ``False`` otherwise. Prefix (optional) Prefix to add to the REST URL endpoints. Defaults to ``None``. From 86c50b2e7f362e75b58a461ef5d3aa74e4ce8ae7 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Tue, 30 Jan 2024 12:09:43 +0530 Subject: [PATCH 12/17] Apply suggestions from code review Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- .../apache-airflow-providers-qdrant/index.rst | 2 +- .../providers/qdrant/operators/test_qdrant.py | 41 ++++++++----------- .../providers/qdrant/example_dag_qdrant.py | 2 +- 3 files changed, 18 insertions(+), 27 deletions(-) diff --git a/docs/apache-airflow-providers-qdrant/index.rst b/docs/apache-airflow-providers-qdrant/index.rst index 57d361c0775d..0152cba5910f 100644 --- a/docs/apache-airflow-providers-qdrant/index.rst +++ b/docs/apache-airflow-providers-qdrant/index.rst @@ -93,6 +93,6 @@ The minimum Apache Airflow version supported by this provider package is ``2.5.0 =================== ================== PIP package Version required =================== ================== -``apache-airflow`` ``>=2.5.0`` +``apache-airflow`` ``>=2.6.0`` ``qdrant_client`` ``>=1.7.0`` 
=================== ================== diff --git a/tests/providers/qdrant/operators/test_qdrant.py b/tests/providers/qdrant/operators/test_qdrant.py index 4bf1db749bb3..0b6f49de074b 100644 --- a/tests/providers/qdrant/operators/test_qdrant.py +++ b/tests/providers/qdrant/operators/test_qdrant.py @@ -16,42 +16,33 @@ # under the License. from __future__ import annotations -from datetime import datetime from unittest.mock import patch -import pytest - -from airflow.models import DAG from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator -@pytest.fixture -def dummy_dag(): - """Fixture to provide a dummy Airflow DAG for testing.""" - return DAG(dag_id="test_dag", start_date=datetime(2023, 9, 29)) - - class TestQdrantIngestOperator: - def test_operator_execution(self, dummy_dag): + def test_operator_execution(self, dag_maker): """ Test the execution of the QdrantIngestOperator. Ensures that the upsert method on the hook is correctly called. """ - vectors = [[0.732, 0.611, 0.289], [0.217, 0.526, 0.416], [0.326, 0.483, 0.376]] - ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] - payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] + with dag_maker(dag_id="test_dag") as dummy_dag: + vectors = [[0.732, 0.611, 0.289], [0.217, 0.526, 0.416], [0.326, 0.483, 0.376]] + ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] + payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] - task = QdrantIngestOperator( - task_id="ingest_vectors", - collection_name="test_collection", - vectors=vectors, - ids=ids, - payload=payload, - wait=False, - max_retries=1, - parallel=3, - dag=dummy_dag, - ) + task = QdrantIngestOperator( + task_id="ingest_vectors", + collection_name="test_collection", + vectors=vectors, + ids=ids, + payload=payload, + wait=False, + max_retries=1, + parallel=3, + dag=dummy_dag, + ) with patch( "airflow.providers.qdrant.operators.qdrant.QdrantIngestOperator.hook" diff --git 
a/tests/system/providers/qdrant/example_dag_qdrant.py b/tests/system/providers/qdrant/example_dag_qdrant.py index a0e2449fd2f6..8f55d2d72c55 100644 --- a/tests/system/providers/qdrant/example_dag_qdrant.py +++ b/tests/system/providers/qdrant/example_dag_qdrant.py @@ -25,7 +25,7 @@ "example_qdrant_ingest", schedule=None, start_date=datetime(2024, 1, 1), - catchup=True, + catchup=False, ) as dag: # [START howto_operator_qdrant_ingest] vectors = [[0.732, 0.611, 0.289, 0.421], [0.217, 0.526, 0.416, 0.981], [0.326, 0.483, 0.376, 0.136]] From 1c4e48d3756ab69c44de6d74e12233017816d420 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Tue, 30 Jan 2024 13:21:43 +0530 Subject: [PATCH 13/17] chore: new pre-commit run --- airflow/providers/qdrant/operators/qdrant.py | 4 ++-- pyproject.toml | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airflow/providers/qdrant/operators/qdrant.py b/airflow/providers/qdrant/operators/qdrant.py index 403bf12ff9e3..6cc71d864b5a 100644 --- a/airflow/providers/qdrant/operators/qdrant.py +++ b/airflow/providers/qdrant/operators/qdrant.py @@ -40,8 +40,8 @@ class QdrantIngestOperator(BaseOperator): :param conn_id: The connection id to connect to a Qdrant instance. :param collection_name: The name of the collection to ingest data into. :param vectors: An iterable over vectors to upload. - :param payload: Iterable of vectors payload, Optional. Defaults to None. - :param ids: Iterable of custom vectors ids, Optional. Defaults to None. + :param payload: Iterable of vector payloads, Optional. Defaults to None. + :param ids: Iterable of custom vector ids, Optional. Defaults to None. :param batch_size: Number of points to upload per-request. Defaults to 64. :param parallel: Number of parallel upload processes. Defaults to 1. :param method: Start method for parallel processes. Defaults to 'forkserver'. 
diff --git a/pyproject.toml b/pyproject.toml index e1e69c245b51..7558d52caed8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -524,7 +524,7 @@ winrm = [ # If you want to modify these - modify the corresponding provider.yaml instead. ############################################################################################################# # START OF GENERATED DEPENDENCIES -# Hash of dependencies: ee123f98f7863f010cb52314e9c76927 +# Hash of dependencies: c9383270c1203f9654cb69b133ee8213 airbyte = [ # source: airflow/providers/airbyte/provider.yaml "apache-airflow[http]", ] @@ -880,6 +880,9 @@ presto = [ # source: airflow/providers/presto/provider.yaml "pandas>=1.2.5", "presto-python-client>=0.8.4", ] +qdrant = [ # source: airflow/providers/qdrant/provider.yaml + "qdrant_client>=1.7.0", +] redis = [ # source: airflow/providers/redis/provider.yaml "redis>=4.5.2,<5.0.0,!=4.5.5", ] From dfdfbdba1d8c1c61ad838654cc2410b0b6a34cdf Mon Sep 17 00:00:00 2001 From: Anush008 Date: Thu, 1 Feb 2024 13:39:42 +0530 Subject: [PATCH 14/17] chore: update pre-commit --- contributing-docs/testing/integration_tests.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/contributing-docs/testing/integration_tests.rst b/contributing-docs/testing/integration_tests.rst index df1a69b881c0..15f6cc9f1819 100644 --- a/contributing-docs/testing/integration_tests.rst +++ b/contributing-docs/testing/integration_tests.rst @@ -64,6 +64,8 @@ The following integrations are available: +--------------+----------------------------------------------------+ | pinot | Integration required for Apache Pinot hooks. | +--------------+----------------------------------------------------+ +| qdrant | Integration required for Qdrant tests. | ++--------------+----------------------------------------------------+ | statsd | Integration required for Satsd hooks. | +--------------+----------------------------------------------------+ | trino | Integration required for Trino hooks. 
| From a29dbd570ffc6f66ac6cb1c47a0d8f8508337b9b Mon Sep 17 00:00:00 2001 From: Elad Kalif <45845474+eladkal@users.noreply.github.com> Date: Fri, 2 Feb 2024 10:01:03 +0100 Subject: [PATCH 15/17] Update airflow/providers/qdrant/CHANGELOG.rst --- airflow/providers/qdrant/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/qdrant/CHANGELOG.rst b/airflow/providers/qdrant/CHANGELOG.rst index 5adfb1c9e7e5..460738e87f11 100644 --- a/airflow/providers/qdrant/CHANGELOG.rst +++ b/airflow/providers/qdrant/CHANGELOG.rst @@ -23,4 +23,4 @@ Changelog 1.0.0 ..... -Initial version of the provider. +* ``Initial version of the provider. (#36805)`` From 16688d309c6e665fd41b069f13fc5dfc2565462a Mon Sep 17 00:00:00 2001 From: Anush008 Date: Sat, 3 Feb 2024 23:27:19 +0530 Subject: [PATCH 16/17] chore: pin Airflow >= 2.7.0 --- airflow/providers/qdrant/provider.yaml | 2 +- dev/breeze/src/airflow_breeze/global_constants.py | 4 ++-- docs/apache-airflow-providers-qdrant/index.rst | 2 +- generated/provider_dependencies.json | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/providers/qdrant/provider.yaml b/airflow/providers/qdrant/provider.yaml index a78f55fd67cd..433fe08f9cec 100644 --- a/airflow/providers/qdrant/provider.yaml +++ b/airflow/providers/qdrant/provider.yaml @@ -38,7 +38,7 @@ integrations: dependencies: - qdrant_client>=1.7.0 - - apache-airflow>=2.6.0 + - apache-airflow>=2.7.0 hooks: - integration-name: Qdrant diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index 4519717aac3e..b21d72124ae9 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -460,12 +460,12 @@ def _exclusion(providers: Iterable[str]) -> str: { "python-version": "3.8", "airflow-version": "2.6.0", - "remove-providers": _exclusion(["openlineage", "common.io", "cohere", "fab"]), + "remove-providers": 
_exclusion(["openlineage", "common.io", "cohere", "fab", "qdrant"]), }, { "python-version": "3.9", "airflow-version": "2.6.0", - "remove-providers": _exclusion(["openlineage", "common.io", "fab"]), + "remove-providers": _exclusion(["openlineage", "common.io", "fab", "qdrant"]), }, { "python-version": "3.8", diff --git a/docs/apache-airflow-providers-qdrant/index.rst b/docs/apache-airflow-providers-qdrant/index.rst index 0152cba5910f..c2f8fc473abd 100644 --- a/docs/apache-airflow-providers-qdrant/index.rst +++ b/docs/apache-airflow-providers-qdrant/index.rst @@ -93,6 +93,6 @@ The minimum Apache Airflow version supported by this provider package is ``2.5.0 =================== ================== PIP package Version required =================== ================== -``apache-airflow`` ``>=2.6.0`` +``apache-airflow`` ``>=2.7.0`` ``qdrant_client`` ``>=1.7.0`` =================== ================== diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 372e55538cc4..100f2d5672b9 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -938,7 +938,7 @@ }, "qdrant": { "deps": [ - "apache-airflow>=2.6.0", + "apache-airflow>=2.7.0", "qdrant_client>=1.7.0" ], "devel-deps": [], From d6b3cf4588569c215e108128740d20bafb026bbc Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 7 Feb 2024 22:50:09 +0530 Subject: [PATCH 17/17] test: @pytest.mark.db_test --- tests/providers/qdrant/operators/test_qdrant.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/providers/qdrant/operators/test_qdrant.py b/tests/providers/qdrant/operators/test_qdrant.py index 0b6f49de074b..54b2bc1b3aed 100644 --- a/tests/providers/qdrant/operators/test_qdrant.py +++ b/tests/providers/qdrant/operators/test_qdrant.py @@ -18,10 +18,13 @@ from unittest.mock import patch +import pytest + from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator class TestQdrantIngestOperator: + @pytest.mark.db_test def 
test_operator_execution(self, dag_maker): """ Test the execution of the QdrantIngestOperator.