diff --git a/ibis-server/app/model/__init__.py b/ibis-server/app/model/__init__.py
index 0bd588d2a..03ef9a4f3 100644
--- a/ibis-server/app/model/__init__.py
+++ b/ibis-server/app/model/__init__.py
@@ -47,6 +47,10 @@ class QuerySnowflakeDTO(QueryDTO):
     connection_info: SnowflakeConnectionInfo = connection_info_field
 
 
+class QueryDruidDTO(QueryDTO):
+    connection_info: DruidConnectionInfo = connection_info_field
+
+
 class QueryTrinoDTO(QueryDTO):
     connection_info: ConnectionUrl | TrinoConnectionInfo = connection_info_field
 
@@ -76,6 +80,12 @@ class ClickHouseConnectionInfo(BaseModel):
     password: SecretStr
 
 
+class DruidConnectionInfo(BaseModel):
+    host: SecretStr = Field(examples=["localhost"])
+    port: SecretStr = Field(examples=[8082])
+    path: str
+
+
 class MSSqlConnectionInfo(BaseModel):
     host: SecretStr
     port: SecretStr
diff --git a/ibis-server/app/model/data_source.py b/ibis-server/app/model/data_source.py
index ea271f6aa..848ad5e33 100644
--- a/ibis-server/app/model/data_source.py
+++ b/ibis-server/app/model/data_source.py
@@ -13,12 +13,14 @@
     CannerConnectionInfo,
     ClickHouseConnectionInfo,
     ConnectionInfo,
+    DruidConnectionInfo,
     MSSqlConnectionInfo,
     MySqlConnectionInfo,
     PostgresConnectionInfo,
     QueryBigQueryDTO,
     QueryCannerDTO,
     QueryClickHouseDTO,
+    QueryDruidDTO,
     QueryDTO,
     QueryMSSqlDTO,
     QueryMySqlDTO,
@@ -39,6 +41,7 @@ class DataSource(StrEnum):
     postgres = auto()
     snowflake = auto()
     trino = auto()
+    druid = auto()
 
     def get_connection(self, info: ConnectionInfo) -> BaseBackend:
         try:
@@ -62,6 +65,7 @@ class DataSourceExtension(Enum):
     postgres = QueryPostgresDTO
     snowflake = QuerySnowflakeDTO
     trino = QueryTrinoDTO
+    druid = QueryDruidDTO
 
     def __init__(self, dto: QueryDTO):
         self.dto = dto
@@ -108,6 +112,14 @@ def get_clickhouse_connection(info: ClickHouseConnectionInfo) -> BaseBackend:
             password=info.password.get_secret_value(),
         )
 
+    @staticmethod
+    def get_druid_connection(info: DruidConnectionInfo) -> BaseBackend:
+        return ibis.druid.connect(
+            host=info.host.get_secret_value(),
+            port=int(info.port.get_secret_value()),
+            path=info.path,
+        )
+
     @classmethod
     def get_mssql_connection(cls, info: MSSqlConnectionInfo) -> BaseBackend:
         return ibis.mssql.connect(
diff --git a/ibis-server/pyproject.toml b/ibis-server/pyproject.toml
index 2c85b0e58..55f64b50f 100644
--- a/ibis-server/pyproject.toml
+++ b/ibis-server/pyproject.toml
@@ -51,6 +51,7 @@ markers = [
     "bigquery: mark a test as a bigquery test",
     "canner: mark a test as a canner test",
     "clickhouse: mark a test as a clickhouse test",
+    "druid: mark a test as a druid test",
     "mssql: mark a test as a mssql test",
     "mysql: mark a test as a mysql test",
     "postgres: mark a test as a postgres test",
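For reviewers, a rough sketch of how the new DTO and connection factory are meant to be used together. This is not part of the patch; it assumes the broker port from the compose file below and an "orders" datasource, and the "druid/v2/sql" path is simply the Druid SQL endpoint that ibis's Druid backend connects to.

# Usage sketch (not part of the patch).
from app.model import DruidConnectionInfo
from app.model.data_source import DataSource

info = DruidConnectionInfo(
    host="localhost",
    port="8082",            # broker port mapped in the test docker-compose.yml
    path="druid/v2/sql",    # Druid SQL endpoint passed through to ibis.druid.connect
)

con = DataSource.druid.get_connection(info)   # dispatches to get_druid_connection
orders = con.table("orders")                  # lazy ibis table expression
print(orders.limit(5).to_pandas())
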
diff --git a/ibis-server/tests/resource/druid/docker-compose.yml b/ibis-server/tests/resource/druid/docker-compose.yml
new file mode 100644
index 000000000..0cc0fad0f
--- /dev/null
+++ b/ibis-server/tests/resource/druid/docker-compose.yml
@@ -0,0 +1,116 @@
+version: "2.2"
+
+volumes:
+  metadata_data: {}
+  middle_var: {}
+  historical_var: {}
+  broker_var: {}
+  coordinator_var: {}
+  router_var: {}
+  druid_shared: {}
+
+
+services:
+  postgres:
+    container_name: postgres
+    image: postgres:latest
+    ports:
+      - "5432:5432"
+    volumes:
+      - metadata_data:/var/lib/postgresql/data
+    environment:
+      - POSTGRES_PASSWORD=FoolishPassword
+      - POSTGRES_USER=druid
+      - POSTGRES_DB=druid
+
+  # Need 3.5 or later for container nodes
+  zookeeper:
+    container_name: zookeeper
+    image: zookeeper:3.5.10
+    ports:
+      - "2181:2181"
+    environment:
+      - ZOO_MY_ID=1
+
+  coordinator:
+    image: apache/druid:31.0.0
+    container_name: coordinator
+    volumes:
+      - druid_shared:/opt/shared
+      - coordinator_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+    ports:
+      - "8081:8081"
+    command:
+      - coordinator
+    env_file:
+      - environment
+
+  broker:
+    image: apache/druid:31.0.0
+    container_name: broker
+    volumes:
+      - broker_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+    ports:
+      - "8082:8082"
+    command:
+      - broker
+    env_file:
+      - environment
+
+  historical:
+    image: apache/druid:31.0.0
+    container_name: historical
+    volumes:
+      - druid_shared:/opt/shared
+      - historical_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+    ports:
+      - "8083:8083"
+    command:
+      - historical
+    env_file:
+      - environment
+
+  middlemanager:
+    image: apache/druid:31.0.0
+    container_name: middlemanager
+    volumes:
+      - druid_shared:/opt/shared
+      - middle_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+    ports:
+      - "8091:8091"
+      - "8100-8105:8100-8105"
+    command:
+      - middleManager
+    env_file:
+      - environment
+
+  router:
+    image: apache/druid:31.0.0
+    container_name: router
+    volumes:
+      - router_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+    ports:
+      - "8888:8888"
+    command:
+      - router
+    env_file:
+      - environment
diff --git a/ibis-server/tests/resource/druid/environment b/ibis-server/tests/resource/druid/environment
new file mode 100644
index 000000000..3e1a8cc4e
--- /dev/null
+++ b/ibis-server/tests/resource/druid/environment
@@ -0,0 +1,34 @@
+# Java tuning
+#DRUID_XMX=1g
+#DRUID_XMS=1g
+#DRUID_MAXNEWSIZE=250m
+#DRUID_NEWSIZE=250m
+#DRUID_MAXDIRECTMEMORYSIZE=6172m
+DRUID_SINGLE_NODE_CONF=micro-quickstart
+
+druid_emitter_logging_logLevel=debug
+
+druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-multi-stage-query", "druid-parquet-extensions"]
+
+druid_zk_service_host=zookeeper
+
+druid_metadata_storage_host=
+druid_metadata_storage_type=postgresql
+druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
+druid_metadata_storage_connector_user=druid
+druid_metadata_storage_connector_password=FoolishPassword
+
+druid_coordinator_balancer_strategy=cachingCost
+
+druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
+druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
+
+druid_storage_type=local
+druid_storage_storageDirectory=/opt/shared/segments
+druid_indexer_logs_type=file
+druid_indexer_logs_directory=/opt/shared/indexing-logs
+
+druid_processing_numThreads=2
+druid_processing_numMergeBuffers=2
+
+DRUID_LOG4J=
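As a quick manual sanity check of the stack above (not part of the patch), the coordinator and broker can be probed directly. This assumes the port mappings from docker-compose.yml: coordinator on 8081 and broker, which serves the SQL API, on 8082.

# Manual sanity check (not part of the patch).
import requests

# Coordinator health endpoint returns 200 once the cluster is up.
print(requests.get("http://localhost:8081/status/health").status_code)

# Broker SQL API: a trivial query succeeds only when the broker is ready.
resp = requests.post("http://localhost:8082/druid/v2/sql", json={"query": "SELECT 1"})
print(resp.status_code, resp.json())
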
diff --git a/ibis-server/tests/routers/v2/connector/test_druid.py b/ibis-server/tests/routers/v2/connector/test_druid.py
new file mode 100644
index 000000000..3dc8ca1b0
--- /dev/null
+++ b/ibis-server/tests/routers/v2/connector/test_druid.py
@@ -0,0 +1,92 @@
+import json
+import time
+
+import pytest
+import requests
+from testcontainers.compose import DockerCompose
+
+pytestmark = pytest.mark.druid
+
+base_url = "/v2/connector/druid"
+
+
+def wait_for_druid_service(url, timeout=300, interval=5):
+    """Wait for the Druid service to be ready.
+
+    :param url: The URL to check.
+    :param timeout: The maximum time to wait (in seconds).
+    :param interval: The interval between checks (in seconds).
+    :return: True if the service is ready, False if the timeout is reached.
+    """
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        try:
+            response = requests.get(url)
+            if response.status_code == 200:
+                return True
+        except requests.ConnectionError:
+            pass
+        time.sleep(interval)
+    return False
+
+
+@pytest.fixture(scope="module")
+def druid(request) -> DockerCompose:
+    with DockerCompose(
+        "tests/resource/druid", compose_file_name="docker-compose.yml", wait=True
+    ) as compose:
+        druid_url = "http://localhost:8081/status"
+        if not wait_for_druid_service(druid_url):
+            compose.stop()
+            raise Exception("Druid service did not become ready in time")
+
+        yield compose
+
+
+def test_create_datasource(druid: DockerCompose):
+    url = "http://localhost:8081/druid/indexer/v1/task"
+    payload = json.dumps(
+        {
+            "type": "index_parallel",
+            "spec": {
+                "dataSchema": {
+                    "dataSource": "orders",
+                    "timestampSpec": {"column": "timestamp_column", "format": "auto"},
+                    "dimensionsSpec": {
+                        "dimensions": ["dimension1", "dimension2", "dimension3"]
+                    },
+                    "metricsSpec": [
+                        {"type": "count", "name": "count"},
+                        {
+                            "type": "doubleSum",
+                            "name": "metric1",
+                            "fieldName": "metric1",
+                        },
+                    ],
+                    "granularitySpec": {
+                        "type": "uniform",
+                        "segmentGranularity": "day",
+                        "queryGranularity": "none",
+                    },
+                },
+                "ioConfig": {
+                    "type": "index_parallel",
+                    "inputSource": {
+                        "type": "local",
+                        "baseDir": "tests/resource/tpch/data",
+                        "filter": "orders.parquet",
+                    },
+                    "inputFormat": {"type": "parquet"},
+                },
+                "tuningConfig": {
+                    "type": "index_parallel",
+                    "maxRowsPerSegment": 5000000,
+                    "maxRowsInMemory": 25000,
+                },
+            },
+        }
+    )
+    headers = {"Content-Type": "application/json"}
+
+    response = requests.request("POST", url, headers=headers, data=payload)
+    assert response.status_code == 200
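A natural follow-up once the "orders" datasource is ingested would be to exercise the new route itself via base_url. The sketch below is not part of the patch; the payload shape (connectionInfo / manifestStr / sql) and the client and manifest_str fixtures are mirrored from the sibling connector tests and should be treated as assumptions here rather than a confirmed API.

# Hypothetical follow-up test (not part of the patch).
def test_query(druid: DockerCompose, manifest_str: str, client):
    # Field names match DruidConnectionInfo; port/path follow the compose file above.
    connection_info = {"host": "localhost", "port": "8082", "path": "druid/v2/sql"}
    response = client.post(
        url=f"{base_url}/query",
        json={
            "connectionInfo": connection_info,
            "manifestStr": manifest_str,   # base64-encoded MDL used by the other suites
            "sql": "SELECT * FROM orders LIMIT 1",
        },
    )
    assert response.status_code == 200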