diff --git a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml index 3213bf8b3c2d..eeb201cbb05a 100644 --- a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml +++ b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml @@ -90,6 +90,7 @@ body: - pinecone - postgres - presto + - qdrant - redis - salesforce - samba diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml index 4fbcfcf85e0f..c18afde719dd 100644 --- a/.github/boring-cyborg.yml +++ b/.github/boring-cyborg.yml @@ -403,6 +403,12 @@ labelPRBasedOnFilePath: - tests/providers/presto/**/* - tests/system/providers/presto/**/* + provider:qdrant: + - airflow/providers/qdrant/**/* + - docs/apache-airflow-providers-qdrant/**/* + - tests/providers/qdrant/**/* + - tests/system/providers/qdrant/**/* + provider:redis: - airflow/providers/redis/**/* - docs/apache-airflow-providers-redis/**/* diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 153b00fed1fc..8147ee77ef64 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1540,6 +1540,9 @@ jobs: breeze testing integration-tests --integration kafka breeze down if: needs.build-info.outputs.is-airflow-runner != 'true' + - name: "Integration Tests Postgres: Qdrant" + run: breeze testing integration-tests --integration qdrant + if: needs.build-info.outputs.is-airflow-runner == 'true' - name: "Integration Tests Postgres: all-testable" run: breeze testing integration-tests --integration all-testable if: needs.build-info.outputs.is-airflow-runner == 'true' diff --git a/INSTALL b/INSTALL index 173442e4857b..082858264c58 100644 --- a/INSTALL +++ b/INSTALL @@ -253,9 +253,10 @@ gcp_api, github, github-enterprise, google, google-auth, graphviz, grpc, hashico http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft-azure, microsoft-mssql, microsoft-psrp, microsoft-winrm, mongo, mssql, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, -pgvector, pinecone, pinot, postgres, presto, rabbitmq, redis, s3, s3fs, salesforce, samba, saml, -segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, -tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex, zendesk +pgvector, pinecone, pinot, postgres, presto, qdrant, rabbitmq, redis, s3, s3fs, salesforce, samba, +saml, segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, +statsd, tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex, +zendesk # END REGULAR EXTRAS HERE diff --git a/airflow/providers/qdrant/CHANGELOG.rst b/airflow/providers/qdrant/CHANGELOG.rst new file mode 100644 index 000000000000..460738e87f11 --- /dev/null +++ b/airflow/providers/qdrant/CHANGELOG.rst @@ -0,0 +1,26 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-qdrant`` + +Changelog +--------- + +1.0.0 +..... + +* ``Initial version of the provider. (#36805)`` diff --git a/airflow/providers/qdrant/__init__.py b/airflow/providers/qdrant/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/qdrant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/qdrant/hooks/__init__.py b/airflow/providers/qdrant/hooks/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/qdrant/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/qdrant/hooks/qdrant.py b/airflow/providers/qdrant/hooks/qdrant.py new file mode 100644 index 000000000000..494e4d57ed9d --- /dev/null +++ b/airflow/providers/qdrant/hooks/qdrant.py @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+from __future__ import annotations
+
+from functools import cached_property
+from typing import Any
+
+from grpc import RpcError
+from qdrant_client import QdrantClient
+from qdrant_client.http.exceptions import UnexpectedResponse
+
+from airflow.hooks.base import BaseHook
+
+
+class QdrantHook(BaseHook):
+    """
+    Hook for interfacing with a Qdrant instance.
+
+    :param conn_id: The connection id to use when connecting to Qdrant. Defaults to `qdrant_default`.
+    """
+
+    conn_name_attr = "conn_id"
+    conn_type = "qdrant"
+    default_conn_name = "qdrant_default"
+    hook_name = "Qdrant"
+
+    @classmethod
+    def get_connection_form_widgets(cls) -> dict[str, Any]:
+        """Return connection widgets to add to the connection form."""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import BooleanField, IntegerField, StringField
+
+        return {
+            "url": StringField(
+                lazy_gettext("URL"),
+                widget=BS3TextFieldWidget(),
+                description="Optional. Qualified URL of the Qdrant instance. "
+                "Example: https://xyz-example.eu-central.aws.cloud.qdrant.io:6333",
+            ),
+            "grpc_port": IntegerField(
+                lazy_gettext("gRPC Port"),
+                widget=BS3TextFieldWidget(),
+                description="Optional. Port of the gRPC interface.",
+                default=6334,
+            ),
+            "prefer_grpc": BooleanField(
+                lazy_gettext("Prefer gRPC"),
+                widget=BS3TextFieldWidget(),
+                description="Optional. Whether to use the gRPC interface whenever possible in custom methods.",
+                default=False,
+            ),
+            "https": BooleanField(
+                lazy_gettext("HTTPS"),
+                widget=BS3TextFieldWidget(),
+                description="Optional. Whether to use HTTPS(SSL) protocol.",
+            ),
+            "prefix": StringField(
+                lazy_gettext("Prefix"),
+                widget=BS3TextFieldWidget(),
+                description="Optional. Prefix to the REST URL path. "
+                "Example: `service/v1` will result in http://localhost:6333/service/v1/{qdrant-endpoint} for REST API.",
+            ),
+        }
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Return custom field behaviour."""
+        return {
+            "hidden_fields": ["schema", "login", "extra"],
+            "relabeling": {"password": "API Key"},
+        }
+
+    def __init__(self, conn_id: str = default_conn_name, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.conn_id = conn_id
+
+    def get_conn(self) -> QdrantClient:
+        """Get a Qdrant client instance for interfacing with the database."""
+        connection = self.get_connection(self.conn_id)
+        host = connection.host or None
+        port = connection.port or 6333
+        api_key = connection.password
+        extra = connection.extra_dejson
+        url = extra.get("url", None)
+        grpc_port = extra.get("grpc_port", 6334)
+        prefer_grpc = extra.get("prefer_grpc", False)
+        https = extra.get("https", None)
+        prefix = extra.get("prefix", None)
+
+        return QdrantClient(
+            host=host,
+            port=port,
+            url=url,
+            api_key=api_key,
+            grpc_port=grpc_port,
+            prefer_grpc=prefer_grpc,
+            https=https,
+            prefix=prefix,
+        )
+
+    @cached_property
+    def conn(self) -> QdrantClient:
+        """Get a Qdrant client instance for interfacing with the database."""
+        return self.get_conn()
+
+    def verify_connection(self) -> tuple[bool, str]:
+        """Check the connection to the Qdrant instance."""
+        try:
+            self.conn.get_collections()
+            return True, "Connection established!"
+ except (UnexpectedResponse, RpcError, ValueError) as e: + return False, str(e) diff --git a/airflow/providers/qdrant/operators/__init__.py b/airflow/providers/qdrant/operators/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/qdrant/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/qdrant/operators/qdrant.py b/airflow/providers/qdrant/operators/qdrant.py new file mode 100644 index 000000000000..6cc71d864b5a --- /dev/null +++ b/airflow/providers/qdrant/operators/qdrant.py @@ -0,0 +1,109 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any, Iterable, Sequence + +from airflow.models import BaseOperator +from airflow.providers.qdrant.hooks.qdrant import QdrantHook + +if TYPE_CHECKING: + from qdrant_client.models import VectorStruct + + from airflow.utils.context import Context + + +class QdrantIngestOperator(BaseOperator): + """ + Upload points to a Qdrant collection. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:QdrantIngestOperator` + + :param conn_id: The connection id to connect to a Qdrant instance. + :param collection_name: The name of the collection to ingest data into. + :param vectors: An iterable over vectors to upload. + :param payload: Iterable of vector payloads, Optional. Defaults to None. + :param ids: Iterable of custom vector ids, Optional. Defaults to None. + :param batch_size: Number of points to upload per-request. Defaults to 64. + :param parallel: Number of parallel upload processes. Defaults to 1. + :param method: Start method for parallel processes. Defaults to 'forkserver'. + :param max_retries: Number of retries for failed requests. Defaults to 3. + :param wait: Await for the results to be applied on the server side. Defaults to True. 
+ :param kwargs: Additional keyword arguments passed to the BaseOperator constructor. + """ + + template_fields: Sequence[str] = ( + "collection_name", + "vectors", + "payload", + "ids", + "batch_size", + "parallel", + "method", + "max_retries", + "wait", + ) + + def __init__( + self, + *, + conn_id: str = QdrantHook.default_conn_name, + collection_name: str, + vectors: Iterable[VectorStruct], + payload: Iterable[dict[str, Any]] | None = None, + ids: Iterable[int | str] | None = None, + batch_size: int = 64, + parallel: int = 1, + method: str | None = None, + max_retries: int = 3, + wait: bool = True, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.conn_id = conn_id + self.collection_name = collection_name + self.vectors = vectors + self.payload = payload + self.ids = ids + self.batch_size = batch_size + self.parallel = parallel + self.method = method + self.max_retries = max_retries + self.wait = wait + + @cached_property + def hook(self) -> QdrantHook: + """Return an instance of QdrantHook.""" + return QdrantHook(conn_id=self.conn_id) + + def execute(self, context: Context) -> None: + """Upload points to a Qdrant collection.""" + self.hook.conn.upload_collection( + collection_name=self.collection_name, + vectors=self.vectors, + payload=self.payload, + ids=self.ids, + batch_size=self.batch_size, + parallel=self.parallel, + method=self.method, + max_retries=self.max_retries, + wait=self.wait, + ) diff --git a/airflow/providers/qdrant/provider.yaml b/airflow/providers/qdrant/provider.yaml new file mode 100644 index 000000000000..433fe08f9cec --- /dev/null +++ b/airflow/providers/qdrant/provider.yaml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
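Editor's note: for orientation, a hedged sketch of how the ``QdrantIngestOperator`` defined above might be wired into a DAG. This snippet is not part of the patch; the DAG id, collection name, vectors, ids and payloads are made-up toy data, and it assumes the target collection already exists on the instance behind the ``qdrant_default`` connection.

.. code-block:: python

    from __future__ import annotations

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator

    with DAG(
        dag_id="qdrant_ingest_example",
        start_date=datetime(2024, 1, 1),
        schedule=None,
        catchup=False,
    ):
        # Uploads two toy vectors with ids and payload metadata into an
        # existing collection, using the default qdrant_default connection.
        QdrantIngestOperator(
            task_id="ingest_vectors",
            collection_name="my_collection",
            vectors=[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],
            ids=[1, 2],
            payload=[{"color": "red"}, {"color": "green"}],
            batch_size=2,
        )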
+ +--- +package-name: apache-airflow-providers-qdrant + +name: Qdrant + +description: | + `Qdrant `__ + +state: ready +source-date-epoch: 1705379899 + +versions: + - 1.0.0 + +integrations: + - integration-name: Qdrant + external-doc-url: https://qdrant.tech/documentation + how-to-guide: + - /docs/apache-airflow-providers-qdrant/operators/qdrant.rst + tags: [software] + +dependencies: + - qdrant_client>=1.7.0 + - apache-airflow>=2.7.0 + +hooks: + - integration-name: Qdrant + python-modules: + - airflow.providers.qdrant.hooks.qdrant + +connection-types: + - hook-class-name: airflow.providers.qdrant.hooks.qdrant.QdrantHook + connection-type: qdrant + +operators: + - integration-name: Qdrant + python-modules: + - airflow.providers.qdrant.operators.qdrant diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b9ee8323611c..e2de7666d105 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -541,6 +541,15 @@ def create_default_connections(session: Session = NEW_SESSION): ), session, ) + merge_conn( + Connection( + conn_id="qdrant_default", + conn_type="qdrant", + host="qdrant", + port=6333, + ), + session, + ) merge_conn( Connection( conn_id="redis_default", diff --git a/contributing-docs/12_airflow_dependencies_and_extras.rst b/contributing-docs/12_airflow_dependencies_and_extras.rst index c9124d1ccbd6..270f567afd06 100644 --- a/contributing-docs/12_airflow_dependencies_and_extras.rst +++ b/contributing-docs/12_airflow_dependencies_and_extras.rst @@ -209,9 +209,10 @@ gcp_api, github, github-enterprise, google, google-auth, graphviz, grpc, hashico http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft-azure, microsoft-mssql, microsoft-psrp, microsoft-winrm, mongo, mssql, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, -pgvector, pinecone, pinot, postgres, presto, rabbitmq, redis, s3, s3fs, salesforce, samba, saml, -segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, -tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex, zendesk +pgvector, pinecone, pinot, postgres, presto, qdrant, rabbitmq, redis, s3, s3fs, salesforce, samba, +saml, segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, +statsd, tableau, tabular, telegram, trino, vertica, virtualenv, weaviate, webhdfs, winrm, yandex, +zendesk .. END REGULAR EXTRAS HERE diff --git a/contributing-docs/testing/integration_tests.rst b/contributing-docs/testing/integration_tests.rst index 2d698f6cb8f6..8650df1b508c 100644 --- a/contributing-docs/testing/integration_tests.rst +++ b/contributing-docs/testing/integration_tests.rst @@ -64,6 +64,8 @@ The following integrations are available: +--------------+----------------------------------------------------+ | pinot | Integration required for Apache Pinot hooks. | +--------------+----------------------------------------------------+ +| qdrant | Integration required for Qdrant tests. | ++--------------+----------------------------------------------------+ | statsd | Integration required for Statsd hooks. | +--------------+----------------------------------------------------+ | trino | Integration required for Trino hooks. 
| diff --git a/dev/breeze/doc/images/output-commands.svg b/dev/breeze/doc/images/output-commands.svg index 3837045fc384..f0886818f254 100644 --- a/dev/breeze/doc/images/output-commands.svg +++ b/dev/breeze/doc/images/output-commands.svg @@ -285,7 +285,7 @@ [default: 3.8]                                               --integrationIntegration(s) to enable when running (can be more than one).                        (all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage |  -otel | pinot | statsd | trino)                                                       +otel | pinot | qdrant | statsd | trino)                                              --standalone-dag-processorRun standalone dag processor for start-airflow. --database-isolationRun airflow in database isolation mode. ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_build-docs.svg b/dev/breeze/doc/images/output_build-docs.svg index dabcb6685415..bab81e920a35 100644 --- a/dev/breeze/doc/images/output_build-docs.svg +++ b/dev/breeze/doc/images/output_build-docs.svg @@ -182,9 +182,9 @@ dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Build documents. 
diff --git a/dev/breeze/doc/images/output_build-docs.txt b/dev/breeze/doc/images/output_build-docs.txt index b580d9d01c1f..1a93849cc2fb 100644 --- a/dev/breeze/doc/images/output_build-docs.txt +++ b/dev/breeze/doc/images/output_build-docs.txt @@ -1 +1 @@ -b4bc09e22159b362651e9dd299f795c2 +8f5e4f90611d600cbd02ae9e79d60df2 diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.svg b/dev/breeze/doc/images/output_release-management_add-back-references.svg index a13066ce0d94..7492e1d03f88 100644 --- a/dev/breeze/doc/images/output_release-management_add-back-references.svg +++ b/dev/breeze/doc/images/output_release-management_add-back-references.svg @@ -144,9 +144,9 @@ dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Command to add back references for documentation to make it backward compatible. diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.txt b/dev/breeze/doc/images/output_release-management_add-back-references.txt index c7d57a09d377..bd8679b924e6 100644 --- a/dev/breeze/doc/images/output_release-management_add-back-references.txt +++ b/dev/breeze/doc/images/output_release-management_add-back-references.txt @@ -1 +1 @@ -6ceda71ff8edfe80c678c2a6c844f22d +af0db4105f4aec228083f240d550bda3 diff --git a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg index e8e193e1d1e6..0646eb587117 100644 --- a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg +++ b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg @@ -144,9 +144,9 @@ common.sql | databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook |    ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |     -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      
+opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Generates content for issue to test the release. diff --git a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt index aa42535eacb9..36d7be3c9969 100644 --- a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt +++ b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt @@ -1 +1 @@ -8109ff3d327a5e6921b15402f9c15bb6 +97f88e5ddbf7bd0f8de4fb734c8a2386 diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg index 561b4a627d67..34893298dbee 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg @@ -180,9 +180,9 @@ common.sql | databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook |    ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |     -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Prepare CHANGELOG, README and COMMITS information for providers. 
diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt index 6708878a1863..83b69b7ea721 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt @@ -1 +1 @@ -f4dbf1109bcdcca01230e5eb5331fa26 +663614748d86a8e2e8df08417e9b9307 diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg index 970ef4092429..0104899650f5 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg @@ -165,9 +165,9 @@ common.sql | databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook |    ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |     -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      +opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Prepare sdist/whl packages of Airflow Providers. diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt index 50a99b308f9b..a65dfccd0b6f 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt @@ -1 +1 @@ -e82f390815da62d3927de3f6cb9704f1 +c233e9c9a308ce97422dfb26a4125ada diff --git a/dev/breeze/doc/images/output_release-management_publish-docs.svg b/dev/breeze/doc/images/output_release-management_publish-docs.svg index 1448a8a59448..0f8a6216c494 100644 --- a/dev/breeze/doc/images/output_release-management_publish-docs.svg +++ b/dev/breeze/doc/images/output_release-management_publish-docs.svg @@ -190,9 +190,9 @@ dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba |     -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram |     -trino | vertica | weaviate | yandex | zendesk]...                                                                      
+opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    +samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        +telegram | trino | vertica | weaviate | yandex | zendesk]...                                                           Command to publish generated documentation to airflow-site diff --git a/dev/breeze/doc/images/output_release-management_publish-docs.txt b/dev/breeze/doc/images/output_release-management_publish-docs.txt index f07d4889c61a..d3a965a4bbc2 100644 --- a/dev/breeze/doc/images/output_release-management_publish-docs.txt +++ b/dev/breeze/doc/images/output_release-management_publish-docs.txt @@ -1 +1 @@ -babcac730a3ede766b87ba14ab3484e1 +30bde47bb7c648532bcadd4c53ff3d1e diff --git a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg index dbc947ad26b4..6505980e8712 100644 --- a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg +++ b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg @@ -191,9 +191,9 @@ | facebook | ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc |        jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty  -| papermill | pgvector | pinecone | postgres | presto | redis | salesforce | samba | segment | -sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |  -telegram | trino | vertica | weaviate | yandex | zendesk)                                      +| papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce | samba |  +segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau |  +tabular | telegram | trino | vertica | weaviate | yandex | zendesk)                            --provider-versionProvider version to generate the requirements for i.e `2.1.0`. `latest` is also a supported    value to account for the most recent version of the provider                                   (TEXT)                                                                                         diff --git a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt index 56eadeec24b7..6eab2584f87c 100644 --- a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt +++ b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt @@ -1 +1 @@ -84d46887b3f47bc209014ec5cb26406c +75d28480ee1900ffd878862190585efc diff --git a/dev/breeze/doc/images/output_shell.svg b/dev/breeze/doc/images/output_shell.svg index ff0289a6c73a..22111e4033d1 100644 --- a/dev/breeze/doc/images/output_shell.svg +++ b/dev/breeze/doc/images/output_shell.svg @@ -431,7 +431,7 @@ [default: 3.8]                                               --integrationIntegration(s) to enable when running (can be more than one).                        (all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage |  -otel | pinot | statsd | trino)                                                       +otel | pinot | qdrant | statsd | trino)                                              --standalone-dag-processorRun standalone dag processor for start-airflow. 
--database-isolationRun airflow in database isolation mode. ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_shell.txt b/dev/breeze/doc/images/output_shell.txt index ca43ac6ab733..2a342786afc4 100644 --- a/dev/breeze/doc/images/output_shell.txt +++ b/dev/breeze/doc/images/output_shell.txt @@ -1 +1 @@ -7658b53f007e54deb8da7c69edd878c3 +a5cb9724e8240f658d00282c8d30f125 diff --git a/dev/breeze/doc/images/output_start-airflow.svg b/dev/breeze/doc/images/output_start-airflow.svg index 18922ae8f7b0..eab034f663f5 100644 --- a/dev/breeze/doc/images/output_start-airflow.svg +++ b/dev/breeze/doc/images/output_start-airflow.svg @@ -388,7 +388,7 @@ --platformPlatform for Airflow image.(linux/amd64 | linux/arm64) --integrationIntegration(s) to enable when running (can be more than one).                        (all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage |  -otel | pinot | statsd | trino)                                                       +otel | pinot | qdrant | statsd | trino)                                              --standalone-dag-processorRun standalone dag processor for start-airflow. --database-isolationRun airflow in database isolation mode. --load-example-dags-eEnable configuration to load example DAGs when starting Airflow. diff --git a/dev/breeze/doc/images/output_start-airflow.txt b/dev/breeze/doc/images/output_start-airflow.txt index d8b5dd0f7010..c2dfd51ff79d 100644 --- a/dev/breeze/doc/images/output_start-airflow.txt +++ b/dev/breeze/doc/images/output_start-airflow.txt @@ -1 +1 @@ -1664c1bda5204995062f50c4e97b087d +e40d63b96551e328aee3fe8f1a3aedf2 diff --git a/dev/breeze/doc/images/output_testing_integration-tests.svg b/dev/breeze/doc/images/output_testing_integration-tests.svg index 35beec897b69..b4e8af9aff9b 100644 --- a/dev/breeze/doc/images/output_testing_integration-tests.svg +++ b/dev/breeze/doc/images/output_testing_integration-tests.svg @@ -197,7 +197,7 @@ ╭─ Test environment ───────────────────────────────────────────────────────────────────────────────────────────────────╮ --integrationIntegration(s) to enable when running (can be more than one).                             (all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage | otel  -| pinot | statsd | trino)                                                                 +| pinot | qdrant | statsd | trino)                                                        --backend-bDatabase backend to use. If 'none' is selected, breeze starts with invalid DB             configuration and no database and any attempts to connect to Airflow DB will fail.        
(>sqlite< | mysql | postgres | none)                                                      diff --git a/dev/breeze/doc/images/output_testing_integration-tests.txt b/dev/breeze/doc/images/output_testing_integration-tests.txt index 2270869eb9d3..bab4263ef621 100644 --- a/dev/breeze/doc/images/output_testing_integration-tests.txt +++ b/dev/breeze/doc/images/output_testing_integration-tests.txt @@ -1 +1 @@ -ae82cfad12e55c5f0f847234992d3877 +d2fca4bbac3f1d53f1b56a47652997eb diff --git a/dev/breeze/doc/images/output_testing_tests.svg b/dev/breeze/doc/images/output_testing_tests.svg index ff52a515a7ae..2732943b907a 100644 --- a/dev/breeze/doc/images/output_testing_tests.svg +++ b/dev/breeze/doc/images/output_testing_tests.svg @@ -368,7 +368,7 @@ ╭─ Test environment ───────────────────────────────────────────────────────────────────────────────────────────────────╮ --integrationIntegration(s) to enable when running (can be more than one).                             (all | all-testable | cassandra | celery | kafka | kerberos | mongo | openlineage | otel  -| pinot | statsd | trino)                                                                 +| pinot | qdrant | statsd | trino)                                                        --backend-bDatabase backend to use. If 'none' is selected, breeze starts with invalid DB             configuration and no database and any attempts to connect to Airflow DB will fail.        (>sqlite< | mysql | postgres | none)                                                      diff --git a/dev/breeze/doc/images/output_testing_tests.txt b/dev/breeze/doc/images/output_testing_tests.txt index 0b631fb981fb..f4864ad5156e 100644 --- a/dev/breeze/doc/images/output_testing_tests.txt +++ b/dev/breeze/doc/images/output_testing_tests.txt @@ -1 +1 @@ -ebfb809a4ef0e366f21b39495a20c7dd +5b4d64d2d8331c5fb9bd803ad9af53ef diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index c867a86a60b4..324176c44609 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -50,7 +50,7 @@ ALLOWED_BACKENDS = ["sqlite", "mysql", "postgres", "none"] ALLOWED_PROD_BACKENDS = ["mysql", "postgres"] DEFAULT_BACKEND = ALLOWED_BACKENDS[0] -TESTABLE_INTEGRATIONS = ["cassandra", "celery", "kerberos", "mongo", "pinot", "trino", "kafka"] +TESTABLE_INTEGRATIONS = ["cassandra", "celery", "kerberos", "mongo", "pinot", "trino", "kafka", "qdrant"] OTHER_INTEGRATIONS = ["statsd", "otel", "openlineage"] ALLOWED_DEBIAN_VERSIONS = ["bookworm", "bullseye"] ALL_INTEGRATIONS = sorted( @@ -365,15 +365,7 @@ def get_airflow_extras(): # Initialize integrations -AVAILABLE_INTEGRATIONS = [ - "cassandra", - "kerberos", - "mongo", - "pinot", - "celery", - "statsd", - "trino", -] +AVAILABLE_INTEGRATIONS = ["cassandra", "kerberos", "mongo", "pinot", "celery", "statsd", "trino", "qdrant"] ALL_PROVIDER_YAML_FILES = Path(AIRFLOW_SOURCES_ROOT, "airflow", "providers").rglob("provider.yaml") PROVIDER_RUNTIME_DATA_SCHEMA_PATH = AIRFLOW_SOURCES_ROOT / "airflow" / "provider_info.schema.json" @@ -468,12 +460,12 @@ def _exclusion(providers: Iterable[str]) -> str: { "python-version": "3.8", "airflow-version": "2.6.0", - "remove-providers": _exclusion(["openlineage", "common.io", "cohere", "fab"]), + "remove-providers": _exclusion(["openlineage", "common.io", "cohere", "fab", "qdrant"]), }, { "python-version": "3.9", "airflow-version": "2.6.0", - "remove-providers": _exclusion(["openlineage", "common.io", "fab"]), + 
"remove-providers": _exclusion(["openlineage", "common.io", "fab", "qdrant"]), }, { "python-version": "3.8", diff --git a/docs/apache-airflow-providers-qdrant/changelog.rst b/docs/apache-airflow-providers-qdrant/changelog.rst new file mode 100644 index 000000000000..b4d67b06a076 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/changelog.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../../airflow/providers/qdrant/CHANGELOG.rst diff --git a/docs/apache-airflow-providers-qdrant/commits.rst b/docs/apache-airflow-providers-qdrant/commits.rst new file mode 100644 index 000000000000..c5206dab50a1 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/commits.rst @@ -0,0 +1,19 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Package apache-airflow-providers-qdrant +------------------------------------------- diff --git a/docs/apache-airflow-providers-qdrant/connections.rst b/docs/apache-airflow-providers-qdrant/connections.rst new file mode 100644 index 000000000000..2eadc0146db1 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/connections.rst @@ -0,0 +1,55 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/connection:qdrant: + +Qdrant Connection +=================== + +The `Qdrant `__ connection type enables access to Qdrant clusters. 
+
+Default Connection IDs
+----------------------
+
+The Qdrant hook uses the ``qdrant_default`` connection ID by default.
+
+Configuring the Connection
+--------------------------
+
+Host (optional)
+    Host of the Qdrant instance to connect to.
+
+API key (optional)
+    Qdrant API Key for authentication.
+
+Port (optional)
+    REST port of the Qdrant instance to connect to. Defaults to ``6333``.
+
+URL (optional)
+    URL of the Qdrant instance to connect to. If specified, it overrides the ``host`` and ``port`` parameters.
+
+gRPC Port (optional)
+    gRPC port of the Qdrant instance to connect to. Defaults to ``6334``.
+
+Prefer gRPC (optional)
+    Whether to use gRPC for custom methods. Defaults to ``False``.
+
+HTTPS (optional)
+    Whether to use HTTPS for requests. Defaults to ``True`` if an API key is provided, ``False`` otherwise.
+
+Prefix (optional)
+    Prefix to add to the REST URL endpoints. Defaults to ``None``.
diff --git a/docs/apache-airflow-providers-qdrant/index.rst b/docs/apache-airflow-providers-qdrant/index.rst
new file mode 100644
index 000000000000..c2f8fc473abd
--- /dev/null
+++ b/docs/apache-airflow-providers-qdrant/index.rst
@@ -0,0 +1,98 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+``apache-airflow-providers-qdrant``
+====================================
+
+
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: Basics
+
+    Home <self>
+    Changelog <changelog>
+    Security <security>
+
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: Guides
+
+    Connection types <connections>
+    Operators <operators>
+
+
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: Commits
+
+    Detailed list of commits <commits>
+
+
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: Resources
+
+    Python API <_api/airflow/providers/qdrant/index>
+    PyPI Repository <https://pypi.org/project/apache-airflow-providers-qdrant/>
+    Installing from sources <installing-providers-from-sources>
+
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: System tests
+
+    System Tests <_api/tests/system/providers/qdrant/index>
+
+Package apache-airflow-providers-qdrant
+---------------------------------------
+
+`Qdrant <https://qdrant.tech/documentation>`__
+
+
+Release: 1.0.0
+
+Provider package
+----------------
+
+This is a provider package for ``Qdrant`` APIs. All classes for this provider package
+are in the ``airflow.providers.qdrant`` python module.
+
+Installation
+------------
+
+You can install this package on top of an existing Airflow 2 installation (see ``Requirements`` below
+for the minimum Airflow version supported) via
+``pip install apache-airflow-providers-qdrant``
+
+
+Requirements
+------------
+
+The minimum Apache Airflow version supported by this provider package is ``2.7.0``.
+
+
+===================  ==================
+PIP package          Version required
+===================  ==================
+``apache-airflow``   ``>=2.7.0``
+``qdrant_client``    ``>=1.7.0``
+===================  ==================
diff --git a/docs/apache-airflow-providers-qdrant/installing-providers-from-sources.rst b/docs/apache-airflow-providers-qdrant/installing-providers-from-sources.rst
new file mode 100644
index 000000000000..b4e730f4ff21
--- /dev/null
+++ b/docs/apache-airflow-providers-qdrant/installing-providers-from-sources.rst
@@ -0,0 +1,18 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. include:: ../exts/includes/installing-providers-from-sources.rst
diff --git a/docs/apache-airflow-providers-qdrant/operators/qdrant.rst b/docs/apache-airflow-providers-qdrant/operators/qdrant.rst
new file mode 100644
index 000000000000..22d82d5958f1
--- /dev/null
+++ b/docs/apache-airflow-providers-qdrant/operators/qdrant.rst
@@ -0,0 +1,40 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:QdrantIngestOperator:
+
+QdrantIngestOperator
+====================
+
+Use the :class:`~airflow.providers.qdrant.operators.qdrant.QdrantIngestOperator` to
+ingest data into a Qdrant instance.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The QdrantIngestOperator requires the ``vectors`` as an input to ingest into Qdrant. Use the ``conn_id`` parameter to
+specify the Qdrant connection to use when connecting to the Qdrant instance. The vectors can also be accompanied by
+payload metadata, for example the original text the vectors were generated from, which is ingested into the database
+alongside them.
+
+An example using the operator in this way:
+
+..
exampleinclude:: /../../tests/system/providers/qdrant/example_dag_qdrant.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_qdrant_ingest] + :end-before: [END howto_operator_qdrant_ingest] diff --git a/docs/apache-airflow-providers-qdrant/security.rst b/docs/apache-airflow-providers-qdrant/security.rst new file mode 100644 index 000000000000..afa13dac6fc9 --- /dev/null +++ b/docs/apache-airflow-providers-qdrant/security.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../exts/includes/security.rst diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index 8c1c1143c531..ea813aeb499f 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -210,6 +210,8 @@ These are extras that add dependencies needed for integration with external serv +---------------------+-----------------------------------------------------+-----------------------------------------------------+ | pinecone | ``pip install 'apache-airflow[pinecone]'`` | Pinecone Operators and Hooks | +---------------------+-----------------------------------------------------+-----------------------------------------------------+ +| qdrant | ``pip install 'apache-airflow[qdrant]'`` | Qdrant Operators and Hooks | ++---------------------+-----------------------------------------------------+-----------------------------------------------------+ | salesforce | ``pip install 'apache-airflow[salesforce]'`` | Salesforce hook | +---------------------+-----------------------------------------------------+-----------------------------------------------------+ | sendgrid | ``pip install 'apache-airflow[sendgrid]'`` | Send email using sendgrid | diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 343c09a57cf1..2d168708a8e0 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -634,6 +634,7 @@ fn fo followsa forecasted +forkserver formatter formatters Formaturas @@ -1269,6 +1270,8 @@ pythonic PythonOperator pythonpath pywinrm +Qdrant +qdrant qds Qingping Qplum diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index f916a2d2abc5..5991796ee599 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -936,6 +936,16 @@ "excluded-python-versions": [], "state": "ready" }, + "qdrant": { + "deps": [ + "apache-airflow>=2.7.0", + "qdrant_client>=1.7.0" + ], + "devel-deps": [], + "cross-providers-deps": [], + "excluded-python-versions": [], + "state": "ready" + }, "redis": { "deps": [ "apache-airflow>=2.6.0", diff --git a/pyproject.toml b/pyproject.toml index 0f7652dd24ac..68ea02694ae8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -873,6 +873,9 @@ presto 
= [ # source: airflow/providers/presto/provider.yaml "pandas>=1.2.5", "presto-python-client>=0.8.4", ] +qdrant = [ # source: airflow/providers/qdrant/provider.yaml + "qdrant_client>=1.7.0", +] redis = [ # source: airflow/providers/redis/provider.yaml "redis>=4.5.2,<5.0.0,!=4.5.5", ] @@ -1036,6 +1039,7 @@ all = [ "apache-airflow[pinecone]", "apache-airflow[postgres]", "apache-airflow[presto]", + "apache-airflow[qdrant]", "apache-airflow[redis]", "apache-airflow[salesforce]", "apache-airflow[samba]", @@ -1135,6 +1139,7 @@ devel-all = [ "apache-airflow[pinecone]", "apache-airflow[postgres]", "apache-airflow[presto]", + "apache-airflow[qdrant]", "apache-airflow[redis]", "apache-airflow[salesforce]", "apache-airflow[samba]", diff --git a/scripts/ci/docker-compose/integration-qdrant.yml b/scripts/ci/docker-compose/integration-qdrant.yml new file mode 100644 index 000000000000..724c1e92cfb4 --- /dev/null +++ b/scripts/ci/docker-compose/integration-qdrant.yml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +version: "3.8" +services: + qdrant: + image: qdrant/qdrant:latest + labels: + breeze.description: "Integration required for Qdrant tests." + ports: + - "6333:6333" + - "6334:6334" + restart: "on-failure" + + airflow: + environment: + - INTEGRATION_QDRANT=true + depends_on: + qdrant: + condition: service_started diff --git a/scripts/in_container/check_environment.sh b/scripts/in_container/check_environment.sh index 187490dee8ca..73686e1368b7 100755 --- a/scripts/in_container/check_environment.sh +++ b/scripts/in_container/check_environment.sh @@ -177,6 +177,13 @@ if [[ ${INTEGRATION_PINOT} == "true" ]]; then CMD="curl --max-time 1 -X GET 'http://pinot:8000/health' -H 'accept: text/plain' | grep OK" check_service "Pinot (Broker API)" "${CMD}" 50 fi + +if [[ ${INTEGRATION_QDRANT} == "true" ]]; then + check_service "Qdrant" "run_nc qdrant 6333" 50 + CMD="curl -f -X GET 'http://qdrant:6333/collections'" + check_service "Qdrant (Collections API)" "${CMD}" 50 +fi + if [[ ${INTEGRATION_KAFKA} == "true" ]]; then check_service "Kafka Cluster" "run_nc broker 9092" 50 fi diff --git a/tests/integration/providers/qdrant/__init__.py b/tests/integration/providers/qdrant/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/integration/providers/qdrant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/providers/qdrant/hooks/__init__.py b/tests/integration/providers/qdrant/hooks/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/integration/providers/qdrant/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/providers/qdrant/hooks/test_qdrant.py b/tests/integration/providers/qdrant/hooks/test_qdrant.py new file mode 100644 index 000000000000..296df3bed596 --- /dev/null +++ b/tests/integration/providers/qdrant/hooks/test_qdrant.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import random + +import numpy as np +import pytest +from qdrant_client import models + +from airflow.providers.qdrant.hooks.qdrant import QdrantHook + + +@pytest.mark.integration("qdrant") +class TestQdrant: + def setup_method(self): + self.test_collection_name = "test-hook-collection" + self.test_collection_dimension = random.randint(100, 2000) + self.hook = QdrantHook() + + self.hook.conn.recreate_collection( + self.test_collection_name, + vectors_config=models.VectorParams( + size=self.test_collection_dimension, distance=models.Distance.MANHATTAN + ), + ) + + def test_connection(self): + response, message = self.hook.verify_connection() + assert response and message == "Connection established!", "Successfully connected to Qdrant." 
+ + def test_upsert_points(self): + vectors = np.random.rand(100, self.test_collection_dimension) + self.hook.conn.upsert( + self.test_collection_name, + points=[ + models.PointStruct( + id=idx, vector=vector.tolist(), payload={"color": "red", "rand_number": idx % 10} + ) + for idx, vector in enumerate(vectors) + ], + ) + + assert self.hook.conn.count(self.test_collection_name).count == 100 + + def test_delete_points(self): + self.hook.conn.delete( + self.test_collection_name, + points_selector=models.Filter( + must=[models.FieldCondition(key="color", match=models.MatchValue(value="red"))] + ), + ) + + assert self.hook.conn.count(self.test_collection_name).count == 0 diff --git a/tests/integration/providers/qdrant/operators/__init__.py b/tests/integration/providers/qdrant/operators/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/integration/providers/qdrant/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/providers/qdrant/operators/test_qdrant_ingest.py b/tests/integration/providers/qdrant/operators/test_qdrant_ingest.py new file mode 100644 index 000000000000..eed4bd8e78db --- /dev/null +++ b/tests/integration/providers/qdrant/operators/test_qdrant_ingest.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
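+# Integration tests for QdrantIngestOperator against a live Qdrant instance
+# (started via scripts/ci/docker-compose/integration-qdrant.yml above).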
+from __future__ import annotations + +import random +from unittest.mock import MagicMock + +import pytest +from qdrant_client.models import Distance, VectorParams + +from airflow.models.dag import DAG +from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2024, 1, 1) + + +@pytest.mark.integration("qdrant") +class TestQdrantIngestOperator: + def setup_method(self): + args = {"owner": "airflow", "start_date": DEFAULT_DATE} + + self.dag = DAG("test_qdrant_dag_id", default_args=args) + + self.mock_context = MagicMock() + self.channel = "test" + + def test_execute_hello(self): + collection_name = "test-operator-collection" + dimensions = 384 + points_count = 100 + vectors = [[random.uniform(0, 1) for _ in range(dimensions)] for _ in range(points_count)] + ids = random.sample(range(100, 10000), points_count) + payload = [{"some_number": i % 10} for i in range(points_count)] + + operator = QdrantIngestOperator( + task_id="qdrant_ingest", + conn_id="qdrant_default", + collection_name=collection_name, + vectors=vectors, + ids=ids, + payload=payload, + batch_size=1, + ) + + hook = operator.hook + + hook.conn.recreate_collection( + collection_name, vectors_config=VectorParams(size=dimensions, distance=Distance.COSINE) + ) + + operator.execute(self.mock_context) + + assert ( + hook.conn.count(collection_name=collection_name).count == points_count + ), f"Added {points_count} points to the Qdrant collection" diff --git a/tests/providers/qdrant/__init__.py b/tests/providers/qdrant/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/providers/qdrant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/qdrant/hooks/__init__.py b/tests/providers/qdrant/hooks/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/providers/qdrant/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
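The integration tests above drive the hook's underlying client (`QdrantHook.conn`, a `qdrant_client.QdrantClient`) directly. As a quick orientation before the mocked unit tests below, here is a minimal standalone sketch of the same pattern; it assumes the hook's default Airflow connection (`qdrant_default`, as used by the operator tests) points at a reachable Qdrant server, and the collection name and vector size are illustrative only:

```python
from qdrant_client import models

from airflow.providers.qdrant.hooks.qdrant import QdrantHook

# Assumes a "qdrant_default" Airflow connection pointing at a running Qdrant server,
# e.g. the service defined in scripts/ci/docker-compose/integration-qdrant.yml.
hook = QdrantHook()

# hook.conn is the underlying qdrant_client.QdrantClient, as exercised in the tests above.
hook.conn.recreate_collection(
    "demo-collection",
    vectors_config=models.VectorParams(size=4, distance=models.Distance.COSINE),
)

hook.conn.upsert(
    "demo-collection",
    points=[
        models.PointStruct(id=1, vector=[0.732, 0.611, 0.289, 0.421], payload={"color": "red"}),
        models.PointStruct(id=2, vector=[0.217, 0.526, 0.416, 0.981], payload={"color": "blue"}),
    ],
)

# The count should now reflect the two points inserted above.
assert hook.conn.count("demo-collection").count == 2
```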
diff --git a/tests/providers/qdrant/hooks/test_qdrant.py b/tests/providers/qdrant/hooks/test_qdrant.py new file mode 100644 index 000000000000..ab66aaf7ad96 --- /dev/null +++ b/tests/providers/qdrant/hooks/test_qdrant.py @@ -0,0 +1,129 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import Mock, patch + +from airflow.providers.qdrant.hooks.qdrant import QdrantHook + + +class TestQdrantHook: + def setup_method(self): + """Set up the test connection for the QdrantHook.""" + with patch("airflow.models.Connection.get_connection_from_secrets") as mock_get_connection: + mock_conn = Mock() + mock_conn.host = "localhost" + mock_conn.port = 6333 + mock_conn.extra_dejson = {} + mock_conn.password = "some_test_api_key" + mock_get_connection.return_value = mock_conn + self.qdrant_hook = QdrantHook() + + self.collection_name = "test_collection" + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_verify_connection(self, mock_conn): + """Test the verify_connection of the QdrantHook.""" + self.qdrant_hook.verify_connection() + + mock_conn.get_collections.assert_called_once() + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_upsert(self, conn): + """Test the upsert method of the QdrantHook with appropriate arguments.""" + vectors = [[0.732, 0.611, 0.289], [0.217, 0.526, 0.416], [0.326, 0.483, 0.376]] + ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] + payloads = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] + parallel = 2 + self.qdrant_hook.conn.upsert( + collection_name=self.collection_name, + vectors=vectors, + ids=ids, + payloads=payloads, + parallel=parallel, + ) + conn.upsert.assert_called_once_with( + collection_name=self.collection_name, + vectors=vectors, + ids=ids, + payloads=payloads, + parallel=parallel, + ) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_list_collections(self, conn): + """Test that the list_collections is called correctly.""" + self.qdrant_hook.conn.list_collections() + conn.list_collections.assert_called_once() + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_create_collection(self, conn): + """Test that the create_collection is called with correct arguments.""" + + from qdrant_client.models import Distance, VectorParams + + self.qdrant_hook.conn.create_collection( + collection_name=self.collection_name, + vectors_config=VectorParams(size=384, distance=Distance.COSINE), + ) + conn.create_collection.assert_called_once_with( + collection_name=self.collection_name, + vectors_config=VectorParams(size=384, distance=Distance.COSINE), + ) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_delete(self, conn): + 
"""Test that the delete is called with correct arguments.""" + + self.qdrant_hook.conn.delete( + collection_name=self.collection_name, points_selector=[32, 21], wait=False + ) + + conn.delete.assert_called_once_with( + collection_name=self.collection_name, points_selector=[32, 21], wait=False + ) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_search(self, conn): + """Test that the search is called with correct arguments.""" + + self.qdrant_hook.conn.search( + collection_name=self.collection_name, + query_vector=[1.0, 2.0, 3.0], + limit=10, + with_vectors=True, + ) + + conn.search.assert_called_once_with( + collection_name=self.collection_name, query_vector=[1.0, 2.0, 3.0], limit=10, with_vectors=True + ) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_get_collection(self, conn): + """Test that the get_collection is called with correct arguments.""" + + self.qdrant_hook.conn.get_collection(collection_name=self.collection_name) + + conn.get_collection.assert_called_once_with(collection_name=self.collection_name) + + @patch("airflow.providers.qdrant.hooks.qdrant.QdrantHook.conn") + def test_delete_collection(self, conn): + """Test that the delete_collection is called with correct arguments.""" + + self.qdrant_hook.conn.delete_collection(collection_name=self.collection_name) + + conn.delete_collection.assert_called_once_with(collection_name=self.collection_name) diff --git a/tests/providers/qdrant/operators/__init__.py b/tests/providers/qdrant/operators/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/providers/qdrant/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/qdrant/operators/test_qdrant.py b/tests/providers/qdrant/operators/test_qdrant.py new file mode 100644 index 000000000000..54b2bc1b3aed --- /dev/null +++ b/tests/providers/qdrant/operators/test_qdrant.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator + + +class TestQdrantIngestOperator: + @pytest.mark.db_test + def test_operator_execution(self, dag_maker): + """ + Test the execution of the QdrantIngestOperator. + Ensures that the upsert method on the hook is correctly called. + """ + with dag_maker(dag_id="test_dag") as dummy_dag: + vectors = [[0.732, 0.611, 0.289], [0.217, 0.526, 0.416], [0.326, 0.483, 0.376]] + ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] + payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] + + task = QdrantIngestOperator( + task_id="ingest_vectors", + collection_name="test_collection", + vectors=vectors, + ids=ids, + payload=payload, + wait=False, + max_retries=1, + parallel=3, + dag=dummy_dag, + ) + + with patch( + "airflow.providers.qdrant.operators.qdrant.QdrantIngestOperator.hook" + ) as mock_hook_instance: + task.execute(context={}) + mock_hook_instance.conn.upload_collection.assert_called_once_with( + collection_name="test_collection", + vectors=vectors, + ids=ids, + payload=payload, + batch_size=64, + wait=False, + max_retries=1, + parallel=3, + method=None, + ) diff --git a/tests/system/providers/qdrant/__init__.py b/tests/system/providers/qdrant/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/system/providers/qdrant/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/qdrant/example_dag_qdrant.py b/tests/system/providers/qdrant/example_dag_qdrant.py new file mode 100644 index 000000000000..8f55d2d72c55 --- /dev/null +++ b/tests/system/providers/qdrant/example_dag_qdrant.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
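+# Example DAG for QdrantIngestOperator; the block between the START/END markers below
+# is pulled into the provider documentation via the exampleinclude directive above.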
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator + +with DAG( + "example_qdrant_ingest", + schedule=None, + start_date=datetime(2024, 1, 1), + catchup=False, +) as dag: + # [START howto_operator_qdrant_ingest] + vectors = [[0.732, 0.611, 0.289, 0.421], [0.217, 0.526, 0.416, 0.981], [0.326, 0.483, 0.376, 0.136]] + ids: list[str | int] = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"] + payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}] + + QdrantIngestOperator( + task_id="qdrant_ingest", + collection_name="test_collection", + vectors=vectors, + ids=ids, + payload=payload, + batch_size=1, + ) + # [END howto_operator_qdrant_ingest] + + +from tests.system.utils import get_test_run + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)
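For reference, the unit test earlier in this change pins down what `QdrantIngestOperator.execute` delegates to: a single `upload_collection` call on the hook's client. A rough direct-client equivalent of the example DAG's ingest task is sketched below; the host and port are illustrative and assume a locally running Qdrant instance, with the target collection created up front:

```python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

# Illustrative connection details; the operator resolves these from the
# "qdrant_default" Airflow connection instead of hard-coding them.
client = QdrantClient(host="localhost", port=6333)

client.recreate_collection(
    "test_collection",
    vectors_config=VectorParams(size=4, distance=Distance.COSINE),
)

# Equivalent of the QdrantIngestOperator task in the example DAG above, matching the
# upload_collection call asserted in tests/providers/qdrant/operators/test_qdrant.py.
client.upload_collection(
    collection_name="test_collection",
    vectors=[
        [0.732, 0.611, 0.289, 0.421],
        [0.217, 0.526, 0.416, 0.981],
        [0.326, 0.483, 0.376, 0.136],
    ],
    ids=[32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"],
    payload=[{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}],
    batch_size=1,
)
```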