From d4ac119f52c49be68118874441ef4291182eb2ce Mon Sep 17 00:00:00 2001 From: zhuoyuchen <> Date: Tue, 10 Feb 2026 15:33:33 +0800 Subject: [PATCH 1/3] Add Apache Kyuubi Provider --- providers/apache/kyuubi/README.rst | 47 ++++++++++++ providers/apache/kyuubi/docs/commits.rst | 34 +++++++++ providers/apache/kyuubi/docs/connections.rst | 49 ++++++++++++ providers/apache/kyuubi/docs/index.rst | 54 +++++++++++++ providers/apache/kyuubi/docs/operators.rst | 37 +++++++++ providers/apache/kyuubi/provider.yaml | 46 +++++++++++ providers/apache/kyuubi/pyproject.toml | 69 +++++++++++++++++ .../providers/apache/kyuubi/__init__.py | 16 ++++ .../apache/kyuubi/get_provider_info.py | 50 ++++++++++++ .../providers/apache/kyuubi/hooks/__init__.py | 16 ++++ .../providers/apache/kyuubi/hooks/kyuubi.py | 69 +++++++++++++++++ .../apache/kyuubi/operators/__init__.py | 16 ++++ .../apache/kyuubi/operators/kyuubi.py | 67 ++++++++++++++++ .../apache/kyuubi/hooks/test_kyuubi_hook.py | 76 +++++++++++++++++++ .../kyuubi/operators/test_kyuubi_operator.py | 43 +++++++++++ 15 files changed, 689 insertions(+) create mode 100644 providers/apache/kyuubi/README.rst create mode 100644 providers/apache/kyuubi/docs/commits.rst create mode 100644 providers/apache/kyuubi/docs/connections.rst create mode 100644 providers/apache/kyuubi/docs/index.rst create mode 100644 providers/apache/kyuubi/docs/operators.rst create mode 100644 providers/apache/kyuubi/provider.yaml create mode 100644 providers/apache/kyuubi/pyproject.toml create mode 100644 providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/__init__.py create mode 100644 providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/get_provider_info.py create mode 100644 providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/__init__.py create mode 100644 providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/kyuubi.py create mode 100644 providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/__init__.py create mode 100644 providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/kyuubi.py create mode 100644 providers/apache/kyuubi/tests/unit/apache/kyuubi/hooks/test_kyuubi_hook.py create mode 100644 providers/apache/kyuubi/tests/unit/apache/kyuubi/operators/test_kyuubi_operator.py diff --git a/providers/apache/kyuubi/README.rst b/providers/apache/kyuubi/README.rst new file mode 100644 index 0000000000000..069602595cdcc --- /dev/null +++ b/providers/apache/kyuubi/README.rst @@ -0,0 +1,47 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Package ``apache-airflow-providers-apache-kyuubi`` +================================================== + +Release: ``1.0.0`` + + +`Apache Kyuubi `__ + + +Provider package +---------------- + +This is a provider package for ``apache.kyuubi`` provider. All classes for this provider package +are in ``airflow.providers.apache.kyuubi`` python package. + +Installation +------------ + +You can install this package on top of an existing Airflow installation via +``pip install apache-airflow-providers-apache-kyuubi`` + +Requirements +------------ + +======================================= ================== +PIP package Version required +======================================= ================== +``apache-airflow`` ``>=2.8.0`` +``apache-airflow-providers-apache-hive`` ``>=6.0.0`` +======================================= ================== diff --git a/providers/apache/kyuubi/docs/commits.rst b/providers/apache/kyuubi/docs/commits.rst new file mode 100644 index 0000000000000..85ba7d47b94d3 --- /dev/null +++ b/providers/apache/kyuubi/docs/commits.rst @@ -0,0 +1,34 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN! + +.. IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE + `PROVIDER_COMMITS_TEMPLATE.rst.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY + +.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN! + +Package apache-airflow-providers-apache-kyuubi +------------------------------------------------------ + +`Apache Kyuubi `__ + + +This is detailed commit list of changes for versions provider package: ``apache.kyuubi``. +For high-level changelog, see :doc:`package information including changelog `. + +.. airflow-providers-commits:: diff --git a/providers/apache/kyuubi/docs/connections.rst b/providers/apache/kyuubi/docs/connections.rst new file mode 100644 index 0000000000000..833d176e53b2e --- /dev/null +++ b/providers/apache/kyuubi/docs/connections.rst @@ -0,0 +1,49 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/connection:kyuubi: + +Apache Kyuubi Connection +======================== + +The Apache Kyuubi connection type enables integrations with Apache Kyuubi. + +Default Connection IDs +---------------------- + +Kyuubi Hook uses ``kyuubi_default`` by default. + +Configuring the Connection +-------------------------- + +Host (required) + The host to connect to. + +Port (required) + The port to connect to. Default is 10009. + +Login (optional) + Specify the user name to connect. + +Password (optional) + Specify the password to connect. + +Extra (optional) + Specify any extra parameters (as json dictionary) that can be used in the connection. + + * ``beeline_path``: Path to the Kyuubi Beeline executable. + * ``use_beeline``: Whether to use Beeline or Hive CLI. Default is True. diff --git a/providers/apache/kyuubi/docs/index.rst b/providers/apache/kyuubi/docs/index.rst new file mode 100644 index 0000000000000..36b1ba1016fae --- /dev/null +++ b/providers/apache/kyuubi/docs/index.rst @@ -0,0 +1,54 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-apache-kyuubi`` +============================================ + +Content +------- + +.. toctree:: + :maxdepth: 1 + :caption: Guides + + connections + operators + +.. toctree:: + :maxdepth: 1 + :caption: References + + Python API <_api/airflow/providers/apache/kyuubi/index> + +.. toctree:: + :maxdepth: 1 + :caption: Resources + + Apache Kyuubi Website + +.. toctree:: + :maxdepth: 1 + :caption: Commits + + commits + +Package apache-airflow-providers-apache-kyuubi +------------------------------------------------ + +`Apache Kyuubi `__ is a distributed and multi-tenant gateway to provide serverless SQL on Lakehouses. + +Release: 1.0.0 diff --git a/providers/apache/kyuubi/docs/operators.rst b/providers/apache/kyuubi/docs/operators.rst new file mode 100644 index 0000000000000..0e8465b6a269c --- /dev/null +++ b/providers/apache/kyuubi/docs/operators.rst @@ -0,0 +1,37 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:KyuubiOperator: + +Apache Kyuubi Operator +====================== + +Use the :class:`~airflow.providers.apache.kyuubi.operators.kyuubi.KyuubiOperator` to execute +Hive Query Language (HQL) statements against Kyuubi. + +Using the Operator +^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + from airflow.providers.apache.kyuubi.operators.kyuubi import KyuubiOperator + + t1 = KyuubiOperator( + task_id="kyuubi_task", + hql="SELECT * FROM table", + kyuubi_conn_id="kyuubi_default", + ) diff --git a/providers/apache/kyuubi/provider.yaml b/providers/apache/kyuubi/provider.yaml new file mode 100644 index 0000000000000..55afa1b122948 --- /dev/null +++ b/providers/apache/kyuubi/provider.yaml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-apache-kyuubi +name: Apache Kyuubi +description: | + `Apache Kyuubi `__ + +state: ready +source-date-epoch: 1700000000 +versions: + - 1.0.0 + +integrations: + - integration-name: Apache Kyuubi + external-doc-url: https://kyuubi.apache.org/ + tags: [apache] + +operators: + - integration-name: Apache Kyuubi + python-modules: + - airflow.providers.apache.kyuubi.operators.kyuubi + +hooks: + - integration-name: Apache Kyuubi + python-modules: + - airflow.providers.apache.kyuubi.hooks.kyuubi + +connection-types: + - hook-class-name: airflow.providers.apache.kyuubi.hooks.kyuubi.KyuubiHook + connection-type: kyuubi diff --git a/providers/apache/kyuubi/pyproject.toml b/providers/apache/kyuubi/pyproject.toml new file mode 100644 index 0000000000000..6a8cc4cbb826e --- /dev/null +++ b/providers/apache/kyuubi/pyproject.toml @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[build-system] +requires = ["flit_core==3.12.0"] +build-backend = "flit_core.buildapi" + +[project] +name = "apache-airflow-providers-apache-kyuubi" +version = "1.0.0" +description = "Provider package apache-airflow-providers-apache-kyuubi for Apache Airflow" +readme = "README.rst" +license = "Apache-2.0" +license-files = ['LICENSE', 'NOTICE'] +authors = [ + {name="Apache Software Foundation", email="dev@airflow.apache.org"}, +] +maintainers = [ + {name="Apache Software Foundation", email="dev@airflow.apache.org"}, +] +keywords = [ "airflow-provider", "apache.kyuubi", "airflow", "integration" ] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Environment :: Console", + "Environment :: Web Environment", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "Framework :: Apache Airflow", + "Framework :: Apache Airflow :: Provider", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: System :: Monitoring", +] +requires-python = ">=3.10" + +dependencies = [ + "apache-airflow>=2.8.0", + "apache-airflow-providers-apache-hive>=6.0.0", +] + +[project.urls] +"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-apache-kyuubi/1.0.0" +"Bug Tracker" = "https://github.com/apache/airflow/issues" +"Source Code" = "https://github.com/apache/airflow" +"Slack Chat" = "https://s.apache.org/airflow-slack" +"Mastodon" = "https://fosstodon.org/@airflow" +"YouTube" = "https://www.youtube.com/channel/UCSXwxpWZQ7XZ1WL3wqevChA/" + +[project.entry-points."apache_airflow_provider"] +provider_info = "airflow.providers.apache.kyuubi.get_provider_info:get_provider_info" + +[tool.flit.module] +name = "airflow.providers.apache.kyuubi" diff --git a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/__init__.py b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/__init__.py new file mode 100644 index 0000000000000..d216be4ddc949 --- /dev/null +++ b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. \ No newline at end of file diff --git a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/get_provider_info.py b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/get_provider_info.py new file mode 100644 index 0000000000000..05014f3e9f1ee --- /dev/null +++ b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/get_provider_info.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +def get_provider_info(): + return { + "package-name": "apache-airflow-providers-apache-kyuubi", + "name": "Apache Kyuubi", + "description": "`Apache Kyuubi `__\n", + "integrations": [ + { + "integration-name": "Apache Kyuubi", + "external-doc-url": "https://kyuubi.apache.org/", + "tags": ["apache"], + } + ], + "operators": [ + { + "integration-name": "Apache Kyuubi", + "python-modules": [ + "airflow.providers.apache.kyuubi.operators.kyuubi", + ], + } + ], + "hooks": [ + { + "integration-name": "Apache Kyuubi", + "python-modules": ["airflow.providers.apache.kyuubi.hooks.kyuubi"], + } + ], + "connection-types": [ + { + "hook-class-name": "airflow.providers.apache.kyuubi.hooks.kyuubi.KyuubiHook", + "connection-type": "kyuubi", + }, + ], + } diff --git a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/__init__.py b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/__init__.py new file mode 100644 index 0000000000000..d216be4ddc949 --- /dev/null +++ b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. \ No newline at end of file diff --git a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/kyuubi.py b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/kyuubi.py new file mode 100644 index 0000000000000..49484ce9fe14a --- /dev/null +++ b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/kyuubi.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Any + +from flask_appbuilder.fieldwidgets import BS3TextFieldWidget +from flask_babel import lazy_gettext +from wtforms import StringField + +from airflow.providers.apache.hive.hooks.hive import HiveCliHook + + +class KyuubiHook(HiveCliHook): + """ + Wrapper around the Kyuubi connection. + + Inherits from HiveCliHook to leverage Hive CLI/Beeline functionality, + but allows specifying a custom Beeline path for Kyuubi. + + :param kyuubi_conn_id: Reference to the Kyuubi connection id. + """ + + conn_name_attr = "kyuubi_conn_id" + default_conn_name = "kyuubi_default" + conn_type = "kyuubi" + hook_name = "Kyuubi" + + def __init__(self, kyuubi_conn_id: str = default_conn_name, **kwargs) -> None: + super().__init__(hive_cli_conn_id=kyuubi_conn_id, **kwargs) + + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Return connection widgets to add to Kyuubi connection form.""" + widgets = super().get_connection_form_widgets() + widgets["beeline_path"] = StringField( + lazy_gettext("Beeline Path"), # type: ignore[arg-type] + widget=BS3TextFieldWidget(), # type: ignore[arg-type] + description="Path to the Kyuubi Beeline executable (e.g., /path/to/bin/beeline)", + ) + return widgets + + def _prepare_cli_cmd(self) -> list[Any]: + """Create the command list from available information, using custom beeline path if provided.""" + cmd = super()._prepare_cli_cmd() + + # If use_beeline is enabled (which is default in HiveCliHook widgets, but check logic) + # HiveCliHook sets defaults. + + beeline_path = self.conn.extra_dejson.get("beeline_path") + if self.use_beeline and beeline_path: + self.log.info("Using custom Beeline path: %s", beeline_path) + cmd[0] = beeline_path + + return cmd diff --git a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/__init__.py b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/__init__.py new file mode 100644 index 0000000000000..d216be4ddc949 --- /dev/null +++ b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. \ No newline at end of file diff --git a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/kyuubi.py b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/kyuubi.py new file mode 100644 index 0000000000000..bc0ca3a635629 --- /dev/null +++ b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/kyuubi.py @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import cached_property +from typing import Any, Sequence + +from airflow.providers.apache.hive.operators.hive import HiveOperator +from airflow.providers.apache.kyuubi.hooks.kyuubi import KyuubiHook + + +class KyuubiOperator(HiveOperator): + """ + Executes hql code or hive script using Kyuubi. + + :param hql: the hql to be executed. + :param kyuubi_conn_id: Reference to the Kyuubi connection id. + """ + + template_fields: Sequence[str] = ( + "hql", + "schema", + "hive_cli_conn_id", + "mapred_queue", + "hiveconfs", + "mapred_job_name", + "mapred_queue_priority", + "proxy_user", + ) + ui_color = "#f0ede4" + + def __init__( + self, + *, + kyuubi_conn_id: str = "kyuubi_default", + **kwargs: Any, + ) -> None: + # Pass kyuubi_conn_id as hive_cli_conn_id to parent + super().__init__(hive_cli_conn_id=kyuubi_conn_id, **kwargs) + self.kyuubi_conn_id = kyuubi_conn_id + + @cached_property + def hook(self) -> KyuubiHook: + """Get Kyuubi hook.""" + return KyuubiHook( + kyuubi_conn_id=self.kyuubi_conn_id, + mapred_queue=self.mapred_queue, + mapred_queue_priority=self.mapred_queue_priority, + mapred_job_name=self.mapred_job_name, + hive_cli_params=self.hive_cli_params, + auth=self.auth, + proxy_user=self.proxy_user, + ) diff --git a/providers/apache/kyuubi/tests/unit/apache/kyuubi/hooks/test_kyuubi_hook.py b/providers/apache/kyuubi/tests/unit/apache/kyuubi/hooks/test_kyuubi_hook.py new file mode 100644 index 0000000000000..c51d71e1c0d11 --- /dev/null +++ b/providers/apache/kyuubi/tests/unit/apache/kyuubi/hooks/test_kyuubi_hook.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import unittest +from unittest import mock + +from airflow.providers.apache.kyuubi.hooks.kyuubi import KyuubiHook + + +class TestKyuubiHook(unittest.TestCase): + @mock.patch("airflow.providers.apache.hive.hooks.hive.HiveCliHook.get_connection") + def test_init(self, mock_get_connection): + conn = mock.MagicMock() + conn.extra_dejson = {} + mock_get_connection.return_value = conn + + hook = KyuubiHook() + assert hook.conn_name_attr == "kyuubi_conn_id" + assert hook.default_conn_name == "kyuubi_default" + assert hook.conn_type == "kyuubi" + + @mock.patch("airflow.providers.apache.hive.hooks.hive.HiveCliHook.get_connection") + def test_prepare_cli_cmd_with_custom_beeline(self, mock_get_connection): + # Mock connection with extra beeline_path + conn = mock.MagicMock() + conn.extra_dejson = {"beeline_path": "/custom/path/to/beeline", "use_beeline": True} + conn.host = "localhost" + conn.port = 10000 + conn.schema = "default" + conn.login = "user" + conn.password = "password" + mock_get_connection.return_value = conn + + hook = KyuubiHook() + # Ensure use_beeline is True as it comes from conn.extra_dejson in __init__ + # But we mocked get_connection, so __init__ called it. + # Check if hook.use_beeline is set correctly + assert hook.use_beeline is True + + cmd = hook._prepare_cli_cmd() + assert cmd[0] == "/custom/path/to/beeline" + + @mock.patch("airflow.providers.apache.hive.hooks.hive.HiveCliHook.get_connection") + def test_prepare_cli_cmd_default(self, mock_get_connection): + # Mock connection without beeline_path + conn = mock.MagicMock() + conn.extra_dejson = {"use_beeline": True} + conn.host = "localhost" + conn.port = 10000 + conn.schema = "default" + mock_get_connection.return_value = conn + + hook = KyuubiHook() + + cmd = hook._prepare_cli_cmd() + assert cmd[0] == "beeline" + + def test_get_connection_form_widgets(self): + widgets = KyuubiHook.get_connection_form_widgets() + assert "beeline_path" in widgets + assert "use_beeline" in widgets diff --git a/providers/apache/kyuubi/tests/unit/apache/kyuubi/operators/test_kyuubi_operator.py b/providers/apache/kyuubi/tests/unit/apache/kyuubi/operators/test_kyuubi_operator.py new file mode 100644 index 0000000000000..bb2022353cb0c --- /dev/null +++ b/providers/apache/kyuubi/tests/unit/apache/kyuubi/operators/test_kyuubi_operator.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import unittest + +from unittest import mock + +from airflow.providers.apache.kyuubi.hooks.kyuubi import KyuubiHook +from airflow.providers.apache.kyuubi.operators.kyuubi import KyuubiOperator + + +class TestKyuubiOperator(unittest.TestCase): + def test_init(self): + op = KyuubiOperator(task_id="test", hql="SELECT 1") + assert op.kyuubi_conn_id == "kyuubi_default" + # Check that it passes kyuubi_conn_id as hive_cli_conn_id to base class + assert op.hive_cli_conn_id == "kyuubi_default" + + @mock.patch("airflow.providers.apache.hive.hooks.hive.HiveCliHook.get_connection") + def test_hook_property(self, mock_get_connection): + conn = mock.MagicMock() + conn.extra_dejson = {} + mock_get_connection.return_value = conn + + op = KyuubiOperator(task_id="test", hql="SELECT 1", kyuubi_conn_id="my_kyuubi_conn") + hook = op.hook + assert isinstance(hook, KyuubiHook) + mock_get_connection.assert_called_with("my_kyuubi_conn") From 32ae14c82620660e2a4e5f36018e7d429d016e90 Mon Sep 17 00:00:00 2001 From: zhuoyuchen <> Date: Tue, 10 Feb 2026 15:58:36 +0800 Subject: [PATCH 2/3] Add Apache Kyuubi Provider --- providers/apache/kyuubi/docs/operators.rst | 3 + .../providers/apache/kyuubi/hooks/kyuubi.py | 126 +++++++++++++++++- .../apache/kyuubi/operators/kyuubi.py | 14 +- .../apache/kyuubi/hooks/test_kyuubi_hook.py | 11 +- .../kyuubi/operators/test_kyuubi_operator.py | 12 +- 5 files changed, 159 insertions(+), 7 deletions(-) diff --git a/providers/apache/kyuubi/docs/operators.rst b/providers/apache/kyuubi/docs/operators.rst index 0e8465b6a269c..bb8320ff6ebcf 100644 --- a/providers/apache/kyuubi/docs/operators.rst +++ b/providers/apache/kyuubi/docs/operators.rst @@ -34,4 +34,7 @@ Using the Operator task_id="kyuubi_task", hql="SELECT * FROM table", kyuubi_conn_id="kyuubi_default", + spark_queue="root.default", + spark_app_name="my_kyuubi_app", + spark_conf={"spark.executor.memory": "4g"}, ) diff --git a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/kyuubi.py b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/kyuubi.py index 49484ce9fe14a..00aa9144b3501 100644 --- a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/kyuubi.py +++ b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/hooks/kyuubi.py @@ -16,13 +16,16 @@ # under the License. from __future__ import annotations +from tempfile import NamedTemporaryFile, TemporaryDirectory from typing import Any from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import StringField -from airflow.providers.apache.hive.hooks.hive import HiveCliHook +from airflow.exceptions import AirflowException +from airflow.providers.apache.hive.hooks.hive import HiveCliHook, get_context_from_env_var +import subprocess class KyuubiHook(HiveCliHook): @@ -40,8 +43,125 @@ class KyuubiHook(HiveCliHook): conn_type = "kyuubi" hook_name = "Kyuubi" - def __init__(self, kyuubi_conn_id: str = default_conn_name, **kwargs) -> None: - super().__init__(hive_cli_conn_id=kyuubi_conn_id, **kwargs) + def __init__( + self, + kyuubi_conn_id: str = default_conn_name, + spark_queue: str | None = None, + spark_app_name: str | None = None, + spark_sql_shuffle_partitions: str | None = None, + spark_conf: dict[str, str] | None = None, + **kwargs, + ) -> None: + super().__init__( + hive_cli_conn_id=kyuubi_conn_id, + mapred_queue=spark_queue, + mapred_job_name=spark_app_name, + **kwargs, + ) + self.kyuubi_conn_id = kyuubi_conn_id + self.spark_sql_shuffle_partitions = spark_sql_shuffle_partitions + self.spark_conf = spark_conf or {} + + def run_cli( + self, + hql: str, + schema: str | None = None, + verbose: bool = True, + hive_conf: dict[Any, Any] | None = None, + ) -> Any: + """ + Execute the hql using the hive cli (via beeline) for Kyuubi. + + :param hql: hql to be executed. + :param schema: hive schema to use. + :param verbose: whether to print verbose logs. + :param hive_conf: hive_conf to execute alone with the hql. + """ + conn = self.conn + schema = schema or conn.schema or "" + + if schema: + hql = f"USE {schema};\n{hql}" + + with TemporaryDirectory(prefix="airflow_kyuubi_") as tmp_dir, NamedTemporaryFile(dir=tmp_dir) as f: + hql += "\n" + f.write(hql.encode("UTF-8")) + f.flush() + hive_cmd = self._prepare_cli_cmd() + env_context = get_context_from_env_var() + if hive_conf: + env_context.update(hive_conf) + + # Spark SQL configs + spark_conf_params = self._prepare_hiveconf(env_context) + if self.mapred_queue: + spark_conf_params.extend(["--hiveconf", f"spark.yarn.queue={self.mapred_queue}"]) + if self.mapred_job_name: + spark_conf_params.extend(["--hiveconf", f"spark.app.name={self.mapred_job_name}"]) + if self.spark_sql_shuffle_partitions: + spark_conf_params.extend( + ["--hiveconf", f"spark.sql.shuffle.partitions={self.spark_sql_shuffle_partitions}"] + ) + + # User provided Spark configs + for key, value in self.spark_conf.items(): + spark_conf_params.extend(["--hiveconf", f"{key}={value}"]) + + # Error detection patterns + kyuubi_error = "error='Cannot allocate memory'" + jdbc_error = "Error: Could not open client transport with JDBC Uri:" + custom_err_code = "ERROR111" + err_content_list = [kyuubi_error, jdbc_error, custom_err_code] + + hive_cmd.extend(spark_conf_params) + hive_cmd.extend(["-f", f.name]) + hive_cmd.extend( + [ + f"; if [ $? -ne 0 ];then echo {custom_err_code} ;exit 111; fi ; " + ] + ) + + def __contains__(content_list, total_content): + for content in content_list: + if content in total_content: + self.log.error("Detected error content: %s", content) + return True + return False + + if verbose: + self.log.info("Executing command: %s", " ".join(hive_cmd)) + + sub_process = subprocess.Popen( + hive_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=tmp_dir, close_fds=True + ) + self.sub_process = sub_process + stdout = "" + while True: + if sub_process.stdout: + line = sub_process.stdout.readline() + if not line: + break + line_str = line.decode("UTF-8") + stdout += line_str + if verbose: + self.log.info(line_str.strip()) + + if __contains__(err_content_list, line_str): + self.log.error("Detected Kyuubi-Spark session timeout or memory overflow error") + raise AirflowException(f"Execution failed, detected error: {line_str}") + else: + break + + sub_process.wait() + + if sub_process.returncode: + self.log.error("Command exited with non-zero code: %s", sub_process.returncode) + raise AirflowException( + f"Command execution failed, return code: {sub_process.returncode}\nOutput: {stdout}" + ) + + self.log.info("Successfully executed using Kyuubi connection %s", self.kyuubi_conn_id) + return stdout @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: diff --git a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/kyuubi.py b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/kyuubi.py index bc0ca3a635629..150bb771d5449 100644 --- a/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/kyuubi.py +++ b/providers/apache/kyuubi/src/airflow/providers/apache/kyuubi/operators/kyuubi.py @@ -47,20 +47,30 @@ def __init__( self, *, kyuubi_conn_id: str = "kyuubi_default", + spark_queue: str | None = None, + spark_app_name: str | None = None, + spark_sql_shuffle_partitions: str | None = None, + spark_conf: dict[str, str] | None = None, **kwargs: Any, ) -> None: # Pass kyuubi_conn_id as hive_cli_conn_id to parent super().__init__(hive_cli_conn_id=kyuubi_conn_id, **kwargs) self.kyuubi_conn_id = kyuubi_conn_id + self.spark_queue = spark_queue + self.spark_app_name = spark_app_name + self.spark_sql_shuffle_partitions = spark_sql_shuffle_partitions + self.spark_conf = spark_conf @cached_property def hook(self) -> KyuubiHook: """Get Kyuubi hook.""" return KyuubiHook( kyuubi_conn_id=self.kyuubi_conn_id, - mapred_queue=self.mapred_queue, + spark_queue=self.spark_queue, + spark_app_name=self.spark_app_name, + spark_sql_shuffle_partitions=self.spark_sql_shuffle_partitions, + spark_conf=self.spark_conf, mapred_queue_priority=self.mapred_queue_priority, - mapred_job_name=self.mapred_job_name, hive_cli_params=self.hive_cli_params, auth=self.auth, proxy_user=self.proxy_user, diff --git a/providers/apache/kyuubi/tests/unit/apache/kyuubi/hooks/test_kyuubi_hook.py b/providers/apache/kyuubi/tests/unit/apache/kyuubi/hooks/test_kyuubi_hook.py index c51d71e1c0d11..242853e7e09f5 100644 --- a/providers/apache/kyuubi/tests/unit/apache/kyuubi/hooks/test_kyuubi_hook.py +++ b/providers/apache/kyuubi/tests/unit/apache/kyuubi/hooks/test_kyuubi_hook.py @@ -29,10 +29,19 @@ def test_init(self, mock_get_connection): conn.extra_dejson = {} mock_get_connection.return_value = conn - hook = KyuubiHook() + hook = KyuubiHook( + spark_queue="test_queue", + spark_app_name="test_app", + spark_sql_shuffle_partitions="200", + spark_conf={"spark.executor.memory": "4g"}, + ) assert hook.conn_name_attr == "kyuubi_conn_id" assert hook.default_conn_name == "kyuubi_default" assert hook.conn_type == "kyuubi" + assert hook.mapred_queue == "test_queue" + assert hook.mapred_job_name == "test_app" + assert hook.spark_sql_shuffle_partitions == "200" + assert hook.spark_conf == {"spark.executor.memory": "4g"} @mock.patch("airflow.providers.apache.hive.hooks.hive.HiveCliHook.get_connection") def test_prepare_cli_cmd_with_custom_beeline(self, mock_get_connection): diff --git a/providers/apache/kyuubi/tests/unit/apache/kyuubi/operators/test_kyuubi_operator.py b/providers/apache/kyuubi/tests/unit/apache/kyuubi/operators/test_kyuubi_operator.py index bb2022353cb0c..0fe69883e305c 100644 --- a/providers/apache/kyuubi/tests/unit/apache/kyuubi/operators/test_kyuubi_operator.py +++ b/providers/apache/kyuubi/tests/unit/apache/kyuubi/operators/test_kyuubi_operator.py @@ -37,7 +37,17 @@ def test_hook_property(self, mock_get_connection): conn.extra_dejson = {} mock_get_connection.return_value = conn - op = KyuubiOperator(task_id="test", hql="SELECT 1", kyuubi_conn_id="my_kyuubi_conn") + op = KyuubiOperator( + task_id="test", + hql="SELECT 1", + kyuubi_conn_id="my_kyuubi_conn", + spark_queue="test_queue", + spark_app_name="test_app", + spark_conf={"key": "value"}, + ) hook = op.hook assert isinstance(hook, KyuubiHook) mock_get_connection.assert_called_with("my_kyuubi_conn") + assert hook.mapred_queue == "test_queue" + assert hook.mapred_job_name == "test_app" + assert hook.spark_conf == {"key": "value"} From 1889d8395c8bcb442abbb9186f4028ce189f4051 Mon Sep 17 00:00:00 2001 From: zhuoyuchen <> Date: Tue, 10 Feb 2026 16:24:18 +0800 Subject: [PATCH 3/3] fix --- providers/apache/kyuubi/docs/commits.rst | 2 +- providers/apache/kyuubi/docs/index.rst | 3 +++ providers/apache/kyuubi/provider.yaml | 4 ++++ pyproject.toml | 8 ++++++++ scripts/ci/docker-compose/remove-sources.yml | 1 + scripts/ci/docker-compose/tests-sources.yml | 1 + 6 files changed, 18 insertions(+), 1 deletion(-) diff --git a/providers/apache/kyuubi/docs/commits.rst b/providers/apache/kyuubi/docs/commits.rst index 85ba7d47b94d3..dbcbe6933b93d 100644 --- a/providers/apache/kyuubi/docs/commits.rst +++ b/providers/apache/kyuubi/docs/commits.rst @@ -23,7 +23,7 @@ .. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN! Package apache-airflow-providers-apache-kyuubi ------------------------------------------------------- +---------------------------------------------- `Apache Kyuubi `__ diff --git a/providers/apache/kyuubi/docs/index.rst b/providers/apache/kyuubi/docs/index.rst index 36b1ba1016fae..fcd3f0a48e098 100644 --- a/providers/apache/kyuubi/docs/index.rst +++ b/providers/apache/kyuubi/docs/index.rst @@ -52,3 +52,6 @@ Package apache-airflow-providers-apache-kyuubi `Apache Kyuubi `__ is a distributed and multi-tenant gateway to provide serverless SQL on Lakehouses. Release: 1.0.0 + +:doc:`Detailed list of commits ` + diff --git a/providers/apache/kyuubi/provider.yaml b/providers/apache/kyuubi/provider.yaml index 55afa1b122948..2bdd8e68cd7e2 100644 --- a/providers/apache/kyuubi/provider.yaml +++ b/providers/apache/kyuubi/provider.yaml @@ -26,6 +26,10 @@ source-date-epoch: 1700000000 versions: - 1.0.0 +dependencies: + - apache-airflow>=2.8.0 + - apache-airflow-providers-apache-hive>=6.0.0 + integrations: - integration-name: Apache Kyuubi external-doc-url: https://kyuubi.apache.org/ diff --git a/pyproject.toml b/pyproject.toml index 0d23b828970fe..d653944e55346 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -138,6 +138,9 @@ packages = [] "apache.kylin" = [ "apache-airflow-providers-apache-kylin>=3.8.0" ] +"apache.kyuubi" = [ + "apache-airflow-providers-apache-kyuubi>=1.0.0" # Set from local provider pyproject.toml +] "apache.livy" = [ "apache-airflow-providers-apache-livy>=3.9.2" ] @@ -407,6 +410,7 @@ packages = [] "apache-airflow-providers-apache-impala>=1.5.2", "apache-airflow-providers-apache-kafka>=1.6.1", "apache-airflow-providers-apache-kylin>=3.8.0", + "apache-airflow-providers-apache-kyuubi>=1.0.0", "apache-airflow-providers-apache-livy>=3.9.2", "apache-airflow-providers-apache-pig>=4.6.0", "apache-airflow-providers-apache-pinot>=4.5.1", @@ -1074,6 +1078,8 @@ mypy_path = [ "$MYPY_CONFIG_FILE_DIR/providers/apache/kafka/tests", "$MYPY_CONFIG_FILE_DIR/providers/apache/kylin/src", "$MYPY_CONFIG_FILE_DIR/providers/apache/kylin/tests", + "$MYPY_CONFIG_FILE_DIR/providers/apache/kyuubi/src", + "$MYPY_CONFIG_FILE_DIR/providers/apache/kyuubi/tests", "$MYPY_CONFIG_FILE_DIR/providers/apache/livy/src", "$MYPY_CONFIG_FILE_DIR/providers/apache/livy/tests", "$MYPY_CONFIG_FILE_DIR/providers/apache/pig/src", @@ -1383,6 +1389,7 @@ apache-airflow-providers-apache-iceberg = { workspace = true } apache-airflow-providers-apache-impala = { workspace = true } apache-airflow-providers-apache-kafka = { workspace = true } apache-airflow-providers-apache-kylin = { workspace = true } +apache-airflow-providers-apache-kyuubi = { workspace = true } apache-airflow-providers-apache-livy = { workspace = true } apache-airflow-providers-apache-pig = { workspace = true } apache-airflow-providers-apache-pinot = { workspace = true } @@ -1512,6 +1519,7 @@ members = [ "providers/apache/impala", "providers/apache/kafka", "providers/apache/kylin", + "providers/apache/kyuubi", "providers/apache/livy", "providers/apache/pig", "providers/apache/pinot", diff --git a/scripts/ci/docker-compose/remove-sources.yml b/scripts/ci/docker-compose/remove-sources.yml index 245200fe76e44..e0b03c507eeac 100644 --- a/scripts/ci/docker-compose/remove-sources.yml +++ b/scripts/ci/docker-compose/remove-sources.yml @@ -40,6 +40,7 @@ services: - ../../../empty:/opt/airflow/providers/apache/impala/src - ../../../empty:/opt/airflow/providers/apache/kafka/src - ../../../empty:/opt/airflow/providers/apache/kylin/src + - ../../../empty:/opt/airflow/providers/apache/kyuubi/src - ../../../empty:/opt/airflow/providers/apache/livy/src - ../../../empty:/opt/airflow/providers/apache/pig/src - ../../../empty:/opt/airflow/providers/apache/pinot/src diff --git a/scripts/ci/docker-compose/tests-sources.yml b/scripts/ci/docker-compose/tests-sources.yml index e8f3c9a5fafab..3f2daacb20953 100644 --- a/scripts/ci/docker-compose/tests-sources.yml +++ b/scripts/ci/docker-compose/tests-sources.yml @@ -53,6 +53,7 @@ services: - ../../../providers/apache/impala/tests:/opt/airflow/providers/apache/impala/tests - ../../../providers/apache/kafka/tests:/opt/airflow/providers/apache/kafka/tests - ../../../providers/apache/kylin/tests:/opt/airflow/providers/apache/kylin/tests + - ../../../providers/apache/kyuubi/tests:/opt/airflow/providers/apache/kyuubi/tests - ../../../providers/apache/livy/tests:/opt/airflow/providers/apache/livy/tests - ../../../providers/apache/pig/tests:/opt/airflow/providers/apache/pig/tests - ../../../providers/apache/pinot/tests:/opt/airflow/providers/apache/pinot/tests