Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: apache druid datasource #882

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ibis-server/app/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ class QuerySnowflakeDTO(QueryDTO):
connection_info: SnowflakeConnectionInfo = connection_info_field


# Query DTO variant whose connection details are a DruidConnectionInfo.
# Documented with comments rather than a docstring so the model's generated
# schema description is left untouched.
class QueryDruidDTO(QueryDTO):
    connection_info: DruidConnectionInfo = connection_info_field


# Query DTO variant for Trino: connection details may be supplied either as a
# ConnectionUrl or as a TrinoConnectionInfo.
class QueryTrinoDTO(QueryDTO):
    connection_info: ConnectionUrl | TrinoConnectionInfo = connection_info_field

Expand Down Expand Up @@ -76,6 +80,12 @@ class ClickHouseConnectionInfo(BaseModel):
password: SecretStr


# Connection settings for an Apache Druid endpoint. All three fields are
# forwarded to ibis.druid.connect by get_druid_connection.
class DruidConnectionInfo(BaseModel):
    # Host name, held as a SecretStr and unwrapped by the connector.
    host: SecretStr = Field(examples=["localhost"])
    # Port, also a SecretStr; the connector converts it with int(...).
    # NOTE(review): the example is an int while the field type is SecretStr;
    # confirm that integer input passes validation.
    port: SecretStr = Field(examples=[8082])
    # Path handed to the Druid backend unchanged; required, no default.
    path: str


class MSSqlConnectionInfo(BaseModel):
host: SecretStr
port: SecretStr
Expand Down
12 changes: 12 additions & 0 deletions ibis-server/app/model/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
CannerConnectionInfo,
ClickHouseConnectionInfo,
ConnectionInfo,
DruidConnectionInfo,
MSSqlConnectionInfo,
MySqlConnectionInfo,
PostgresConnectionInfo,
QueryBigQueryDTO,
QueryCannerDTO,
QueryClickHouseDTO,
QueryDruidDTO,
QueryDTO,
QueryMSSqlDTO,
QueryMySqlDTO,
Expand All @@ -39,6 +41,7 @@ class DataSource(StrEnum):
postgres = auto()
snowflake = auto()
trino = auto()
druid = auto()

def get_connection(self, info: ConnectionInfo) -> BaseBackend:
try:
Expand All @@ -62,6 +65,7 @@ class DataSourceExtension(Enum):
postgres = QueryPostgresDTO
snowflake = QuerySnowflakeDTO
trino = QueryTrinoDTO
druid = QueryDruidDTO

def __init__(self, dto: QueryDTO):
self.dto = dto
Expand Down Expand Up @@ -108,6 +112,14 @@ def get_clickhouse_connection(info: ClickHouseConnectionInfo) -> BaseBackend:
password=info.password.get_secret_value(),
)

@staticmethod
def get_druid_connection(info: DruidConnectionInfo) -> BaseBackend:
    """Open an ibis Druid backend from the given connection info.

    Host and port are stored as secrets, so they are unwrapped here; the
    port is converted to ``int`` before being passed on. ``path`` is
    forwarded unchanged.
    """
    host = info.host.get_secret_value()
    port = int(info.port.get_secret_value())
    return ibis.druid.connect(host=host, port=port, path=info.path)

@classmethod
def get_mssql_connection(cls, info: MSSqlConnectionInfo) -> BaseBackend:
return ibis.mssql.connect(
Expand Down
1 change: 1 addition & 0 deletions ibis-server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ markers = [
"bigquery: mark a test as a bigquery test",
"canner: mark a test as a canner test",
"clickhouse: mark a test as a clickhouse test",
"druid: mark a test as a druid test",
"mssql: mark a test as a mssql test",
"mysql: mark a test as a mysql test",
"postgres: mark a test as a postgres test",
Expand Down
116 changes: 116 additions & 0 deletions ibis-server/tests/resource/druid/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
version: "2.2"

# Named volumes used by the services below:
#   metadata_data -> PostgreSQL data directory
#   *_var         -> per-process /opt/druid/var directories
#   druid_shared  -> /opt/shared, where the environment file places segment
#                    storage and indexing logs
volumes:
metadata_data: {}
middle_var: {}
historical_var: {}
broker_var: {}
coordinator_var: {}
router_var: {}
druid_shared: {}


services:
# PostgreSQL instance that backs Druid's metadata storage. The database name,
# user and password here must stay in sync with the
# druid_metadata_storage_connector_* entries in the environment file.
# NOTE(review): the image tag is "latest" (unpinned) and 5432 is published on
# the host; confirm neither clashes with other test fixtures.
postgres:
container_name: postgres
image: postgres:latest
ports:
- "5432:5432"
volumes:
- metadata_data:/var/lib/postgresql/data
environment:
- POSTGRES_PASSWORD=FoolishPassword
- POSTGRES_USER=druid
- POSTGRES_DB=druid

# Need 3.5 or later for container nodes
# The service name "zookeeper" is what druid_zk_service_host in the
# environment file refers to.
zookeeper:
container_name: zookeeper
image: zookeeper:3.5.10
ports:
- "2181:2181"
environment:
- ZOO_MY_ID=1

# Druid coordinator process. Port 8081 is the one the test module polls
# (/status) and posts the ingestion task to (/druid/indexer/v1/task).
# Mounts the shared volume at /opt/shared alongside its own var volume.
coordinator:
image: apache/druid:31.0.0
container_name: coordinator
volumes:
- druid_shared:/opt/shared
- coordinator_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
ports:
- "8081:8081"
command:
- coordinator
# Settings come from the "environment" env file shipped with this compose file.
env_file:
- environment

# Druid broker process, published on 8082 (the example port given on
# DruidConnectionInfo). Presumably the endpoint the connector queries; confirm.
# Unlike coordinator/historical/middlemanager it does not mount druid_shared.
broker:
image: apache/druid:31.0.0
container_name: broker
volumes:
- broker_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
- coordinator
ports:
- "8082:8082"
command:
- broker
env_file:
- environment

# Druid historical process, published on 8083. Mounts the shared volume at
# /opt/shared, the location the environment file uses for segment storage.
historical:
image: apache/druid:31.0.0
container_name: historical
volumes:
- druid_shared:/opt/shared
- historical_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
- coordinator
ports:
- "8083:8083"
command:
- historical
env_file:
- environment

# Druid middleManager process, published on 8091. Mounts the shared volume at
# /opt/shared, where the environment file places segments and indexing logs.
# NOTE(review): the 8100-8105 range is presumably for the tasks it spawns; confirm.
middlemanager:
image: apache/druid:31.0.0
container_name: middlemanager
volumes:
- druid_shared:/opt/shared
- middle_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
- coordinator
ports:
- "8091:8091"
- "8100-8105:8100-8105"
command:
- middleManager
env_file:
- environment

# Druid router process, published on 8888. Only its own var volume is mounted;
# the shared /opt/shared volume is not.
router:
image: apache/druid:31.0.0
container_name: router
volumes:
- router_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
- coordinator
ports:
- "8888:8888"
command:
- router
env_file:
- environment
34 changes: 34 additions & 0 deletions ibis-server/tests/resource/druid/environment
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Shared env file loaded by every Druid service in docker-compose.yml.

# Java tuning
# The explicit heap/direct-memory overrides below are left commented out.
# NOTE(review): DRUID_SINGLE_NODE_CONF presumably selects a bundled sizing
# profile instead; confirm against the apache/druid image documentation.
#DRUID_XMX=1g
#DRUID_XMS=1g
#DRUID_MAXNEWSIZE=250m
#DRUID_NEWSIZE=250m
#DRUID_MAXDIRECTMEMORYSIZE=6172m
DRUID_SINGLE_NODE_CONF=micro-quickstart

druid_emitter_logging_logLevel=debug

# postgresql-metadata-storage goes with the metadata storage type set below;
# druid-parquet-extensions is listed because the test ingests a parquet file.
druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-multi-stage-query", "druid-parquet-extensions"]

# "zookeeper" is the compose service name.
druid_zk_service_host=zookeeper

# Metadata store: user, password and database match the postgres service in
# docker-compose.yml.
# NOTE(review): druid_metadata_storage_host is left empty; the host appears
# only in the connectURI. Confirm the empty entry is intentional.
druid_metadata_storage_host=
druid_metadata_storage_type=postgresql
druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
druid_metadata_storage_connector_user=druid
druid_metadata_storage_connector_password=FoolishPassword

druid_coordinator_balancer_strategy=cachingCost

druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB

# Segments and indexing logs live under /opt/shared, the mount point of the
# druid_shared volume in docker-compose.yml.
druid_storage_type=local
druid_storage_storageDirectory=/opt/shared/segments
druid_indexer_logs_type=file
druid_indexer_logs_directory=/opt/shared/indexing-logs

druid_processing_numThreads=2
druid_processing_numMergeBuffers=2

# Inline log4j2 configuration: root logger at info to the console, with the
# Jetty request log logger at DEBUG.
DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>
92 changes: 92 additions & 0 deletions ibis-server/tests/routers/v2/connector/test_druid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import json
import time

import pytest
import requests
from testcontainers.compose import DockerCompose

pytestmark = pytest.mark.druid

base_url = "/v2/connector/druid"


def wait_for_druid_service(url, timeout=300, interval=5):
    """Poll ``url`` until it answers with HTTP 200 or ``timeout`` elapses.

    Connection failures and per-request timeouts are treated as "not ready
    yet" and retried; any other request error propagates to the caller.

    :param url: The URL to check.
    :param timeout: The maximum time to wait (in seconds).
    :param interval: The interval between checks (in seconds).
    :return: True if the service is ready, False if the timeout is reached.
    """
    # A monotonic clock is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout
    # Bound every probe so a hung socket cannot outlive the overall timeout.
    # requests rejects a zero timeout, hence the floor of one second.
    probe_timeout = max(interval, 1)

    while time.monotonic() < deadline:
        try:
            response = requests.get(url, timeout=probe_timeout)
            if response.status_code == 200:
                return True
        except (requests.ConnectionError, requests.Timeout):
            # Service not reachable yet; keep polling.
            pass

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))
    return False


@pytest.fixture(scope="module")
def druid(request) -> DockerCompose:
    """Start the Druid compose stack once per module and yield it to the tests.

    After the stack is up, the coordinator's ``/status`` endpoint on port
    8081 is polled via ``wait_for_druid_service``. If it never becomes
    ready, the fixture fails with ``RuntimeError``.

    :raises RuntimeError: if the Druid service is not ready in time.
    """
    with DockerCompose(
        "tests/resource/druid", compose_file_name="docker-compose.yml", wait=True
    ) as compose:
        druid_url = "http://localhost:8081/status"
        if not wait_for_druid_service(druid_url):
            # No explicit compose.stop() here: leaving the ``with`` block
            # tears the stack down, so stopping first would do it twice.
            raise RuntimeError("Druid service did not become ready in time")

        yield compose


def test_create_datasource(druid: DockerCompose):
    """Submit an ``index_parallel`` ingestion task for the ``orders`` datasource.

    Only the submission is checked: the test passes when the task endpoint
    answers with HTTP 200. Whether the task later succeeds is not verified.
    """
    url = "http://localhost:8081/druid/indexer/v1/task"
    # NOTE(review): the timestamp column, dimensions and metric names look like
    # placeholders; confirm they match the columns of orders.parquet.
    data_schema = {
        "dataSource": "orders",
        "timestampSpec": {"column": "timestamp_column", "format": "auto"},
        "dimensionsSpec": {
            "dimensions": ["dimension1", "dimension2", "dimension3"]
        },
        "metricsSpec": [
            {"type": "count", "name": "count"},
            {
                "type": "doubleSum",
                "name": "metric1",
                "fieldName": "metric1",
            },
        ],
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "day",
            "queryGranularity": "none",
        },
    }
    # NOTE(review): baseDir is a relative path and the compose file only
    # declares named volumes; confirm the Druid containers can read this file.
    io_config = {
        "type": "index_parallel",
        "inputSource": {
            "type": "local",
            "baseDir": "tests/resource/tpch/data",
            "filter": "orders.parquet",
        },
        "inputFormat": {"type": "parquet"},
    }
    tuning_config = {
        "type": "index_parallel",
        "maxRowsPerSegment": 5000000,
        "maxRowsInMemory": 25000,
    }
    payload = json.dumps(
        {
            "type": "index_parallel",
            "spec": {
                "dataSchema": data_schema,
                "ioConfig": io_config,
                "tuningConfig": tuning_config,
            },
        }
    )
    headers = {"Content-Type": "application/json"}

    # A timeout keeps the test from hanging forever on an unresponsive service.
    response = requests.post(url, headers=headers, data=payload, timeout=30)
    # Include the response body so a rejected spec is diagnosable from the log.
    assert response.status_code == 200, response.text
Loading