Skip to content

Commit

Permalink
Merge pull request #1 from PayLead/thomaub/chdb-as-driver
Browse files Browse the repository at this point in the history
feat: first working implementation of chdb as driver
  • Loading branch information
ThomAub authored Oct 18, 2024
2 parents 51bf0e1 + 218abe8 commit 855440d
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 58 deletions.
178 changes: 178 additions & 0 deletions dbt/adapters/clickhouse/chdbclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import json
import uuid
from pathlib import Path
from typing import List

import pkg_resources
from chdb import session, ChdbError
from chdb.dbapi import converters
from dbt.adapters.__about__ import version as dbt_adapters_version
from dbt_common.exceptions import DbtDatabaseError

from dbt.adapters.clickhouse import ClickHouseColumn, ClickHouseCredentials
from dbt.adapters.clickhouse.__version__ import version as dbt_clickhouse_version
from dbt.adapters.clickhouse.dbclient import ChClientWrapper, ChRetryableException
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier

# Version of the installed chdb distribution, resolved once at import time.
# Start from the "unknown" fallback and override it only when pkg_resources can
# resolve the distribution.
driver_version = "unknown"
try:
    driver_version = pkg_resources.get_distribution("chdb").version
except pkg_resources.ResolutionError:
    # Distribution could not be resolved: keep the "unknown" fallback.
    pass


class ChDBClient(ChClientWrapper):
    """dbt-clickhouse client that runs SQL through an embedded chdb session.

    ``self._client`` is used as a chdb session throughout this class and
    ``_create_client`` builds one.
    NOTE(review): the assignment of ``self._client`` and the contents of
    ``self._conn_settings`` presumably come from ``ChClientWrapper`` -- confirm.
    """

    @staticmethod
    def _error_text(result) -> str:
        """Return the stripped error text of a chdb result.

        ``error_message`` is accepted both as a method and as a plain attribute,
        so the error path cannot itself fail with an ``AttributeError``.
        """
        message = result.error_message
        if callable(message):
            message = message()
        return str(message).strip()

    def query(self, sql, **kwargs):
        """Run ``sql`` with JSON output and return the parsed ``CHDBResult``.

        :param sql: SQL text to execute.
        :param kwargs: forwarded unchanged to the chdb session ``query`` call.
        :raises DbtDatabaseError: if the query fails or its result cannot be read.
        """
        # TODO: we might need to preprocess `sql`
        try:
            parsed = CHDBResult(result=self._client.query(sql, "JSON", **kwargs))
            parsed.read()
            return parsed
        except CHDBResultError as ex:
            raise DbtDatabaseError(f"reading result from chdb query using json failed: {str(ex).strip()}") from ex
        except ChdbError as ex:
            raise DbtDatabaseError(f"chdb query failed with exception: {str(ex).strip()}") from ex
        except Exception as ex:
            raise DbtDatabaseError(str(ex).strip()) from ex

    def command(self, sql, **kwargs):
        """Run ``sql`` as a command.

        :param sql: SQL text to execute.
        :param kwargs: forwarded unchanged to the chdb session ``query`` call.
        :returns: ``True`` when the result is empty, otherwise the result data
            converted to ``int``.
        :raises DbtDatabaseError: on any failure, including a result that reports
            an error or data that is not an integer.
        """
        try:
            result = self._client.query(sql, **kwargs)
            if result.has_error():
                # Re-wrapped by the handler below, like every other failure here.
                raise DbtDatabaseError(self._error_text(result))
            if result.size() == 0:
                return True
            return int(result.data())
        except Exception as ex:
            raise DbtDatabaseError(f"chdb command failed with exception: {str(ex).strip()}") from ex

    def columns_in_query(self, sql: str, **kwargs) -> List[ClickHouseColumn]:
        """Return the columns produced by ``sql`` without fetching any rows.

        The query is wrapped in ``SELECT * FROM (...) LIMIT 0`` and executed with
        JSON output; names and types are taken from the ``meta`` section parsed
        by ``CHDBResult`` (the same parsing ``query`` relies on).

        :param sql: SQL text whose output columns are wanted.
        :param kwargs: forwarded unchanged to the chdb session ``query`` call.
        :raises DbtDatabaseError: if the query fails or its metadata cannot be read.
        """
        try:
            parsed = CHDBResult(
                result=self._client.query(
                    f"SELECT * FROM ( \n{sql} \n) LIMIT 0",
                    "JSON",
                    **kwargs,
                )
            )
            parsed.read()
            # `description` stays None when chdb handed back no result object.
            return [
                ClickHouseColumn.create(name, ch_type)
                for name, ch_type in (parsed.description or ())
            ]
        except ChdbError as ex:
            raise DbtDatabaseError(f"chdb columns_in_query failed with exception: {str(ex).strip()}") from ex
        except Exception as ex:
            raise DbtDatabaseError(str(ex).strip()) from ex

    def get_ch_setting(self, setting_name):
        """Look up ``setting_name`` in ``system.settings``.

        :returns: ``(value, readonly)`` for a known setting, ``(None, 0)`` when the
            setting does not exist, or ``None`` when the lookup itself fails (the
            failure is logged as a warning, not raised).
        """
        try:
            result = self._client.query(
                f"SELECT value, readonly FROM system.settings WHERE name = '{setting_name}'",
                "JSON",
            )
            if result.has_error():
                raise DbtDatabaseError(self._error_text(result))
            rows = json.loads(result.data())["data"]
            if not rows:
                # Unknown setting: check before indexing so this is not reported
                # as an unexpected error.
                return None, 0
            setting = rows[0]
            return setting["value"], int(setting["readonly"])
        except Exception as ex:
            # Embed the exception in the message so its detail is not lost.
            logger.warning(f"Unexpected error retrieving ClickHouse server setting: {ex}")
            return None

    def close(self):
        """Do nothing; the chdb session is left untouched.

        TODO: decide whether ``self._client.cleanup()`` should be called here.
        """

    def _create_client(self, credentials: ClickHouseCredentials):
        """Create the chdb session and load every ``*.sql`` dump file into it.

        The session directory is ``<chdb_state_dir>/<session_id>``; the state
        directory is created when missing.  Dump files from ``chdb_dump_dir`` are
        executed in sorted order so initialisation is deterministic.

        :returns: the chdb session, also when no dump file was found.
        :raises DbtDatabaseError: if executing a dump file raises ``ChdbError``.
        """
        chdb_state_dir = Path(credentials.chdb_state_dir)
        if not chdb_state_dir.exists():
            logger.debug(f"Provided chdb_state_dir doesn't exist: {chdb_state_dir}")
            chdb_state_dir.mkdir(parents=True, exist_ok=True)

        session_dir = chdb_state_dir / f"{self._conn_settings['session_id']}"
        logger.info(f"Provided session_dir: {session_dir}")
        client = session.Session(path=session_dir.as_posix())

        chdb_dump_files = sorted(Path(credentials.chdb_dump_dir).glob("*.sql"))
        if not chdb_dump_files:
            logger.warning(f"Provided chdb_dump_files is empty: {chdb_dump_files}")
            # Nothing to preload, but the session itself is still usable.
            return client

        for chdb_dump_file in chdb_dump_files:
            try:
                client.query(chdb_dump_file.read_text())
            except ChdbError as ex:
                raise DbtDatabaseError(f"client creation failed with exception: {str(ex).strip()}") from ex
        return client

    def _set_client_database(self):
        """Do nothing; no database is selected on the chdb session."""

    def _server_version(self):
        """Return the output of ``select version()``, stripped of whitespace and double quotes."""
        return self._client.query("select version()").data().strip().replace('"', "")


class CHDBResultError(Exception):
    """Raised when a chdb result reports an error or cannot be decoded."""


# TODO: This is from https://github.com/chdb-io/chdb/blob/e326128df44248b187b4f421bf6a5c796791b2dc/chdb/dbapi/connections.py#L175C1-L217C70
# We might want to use the dbApi instead
class CHDBResult:
    """Parsed view of a chdb query result produced with JSON output.

    After ``read()``: ``description`` is a tuple of ``(name, type)`` pairs,
    ``column_names`` a list of names, and ``rows`` / ``result_set`` the same tuple
    of row tuples.  The remaining attributes are only initialised here.
    """

    def __init__(self, result):
        """
        :param result: raw result object returned by a chdb query, or ``None``.
        """
        self.result = result
        self.affected_rows = 0
        self.insert_id = None
        self.warning_count = 0
        self.message = None
        self.field_count = 0
        self.description = None
        self.rows = None
        self.has_next = None
        self.result_set = None
        self.column_names = None

    @staticmethod
    def _error_text(result):
        """Return the stripped error text of ``result``.

        ``error_message`` is accepted both as a method and as a plain attribute,
        so reporting an error cannot itself fail with an ``AttributeError``.
        """
        message = result.error_message
        if callable(message):
            message = message()
        return str(message).strip()

    def read(self):
        """Decode the JSON payload of ``self.result`` into the result attributes.

        Does nothing when ``self.result`` is ``None``.

        :raises CHDBResultError: if the result reports an error, its data is not
            valid JSON, or the decoded document lacks the expected ``meta`` /
            ``data`` structure.
        """
        # Handle empty responses (for instance from CREATE TABLE)
        if self.result is None:
            return

        if self.result.has_error():
            raise CHDBResultError(self._error_text(self.result))

        try:
            data = json.loads(self.result.data())
        except Exception as error:
            raise CHDBResultError("Unexpected error when loading query result in JSON") from error

        try:
            meta = data["meta"]
            self.field_count = len(meta)
            self.description = tuple((column["name"], column["type"]) for column in meta)
            self.column_names = [name for name, _ in self.description]
            # Each data line is keyed by column name; convert every value
            # according to the declared type of its column.
            rows = tuple(
                tuple(
                    converters.convert_column_data(column_type, line[name])
                    for name, column_type in self.description
                )
                for line in data["data"]
            )
            self.rows = rows
            self.result_set = rows
        except Exception as error:
            raise CHDBResultError("Read return data err") from error
2 changes: 2 additions & 0 deletions dbt/adapters/clickhouse/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class ClickHouseCredentials(Credentials):
local_db_prefix: str = ''
allow_automatic_deduplication: bool = False
tcp_keepalive: Union[bool, tuple[int, int, int], list[int]] = False
chdb_state_dir: str = ""
chdb_dump_dir: str = ""

@property
def type(self):
Expand Down
18 changes: 16 additions & 2 deletions dbt/adapters/clickhouse/dbclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ def get_db_client(credentials: ClickHouseCredentials):
elif driver == 'native':
if not port:
port = 9440 if credentials.secure else 9000
elif driver == "chdb":
logger.debug(f"using chdb driver with {credentials}")
else:
raise FailedToConnectError(f'Unrecognized ClickHouse driver {driver}')
raise FailedToConnectError(f"Unrecognized ClickHouse driver {driver}")

credentials.driver = driver
credentials.port = port
Expand All @@ -56,8 +58,20 @@ def get_db_client(credentials: ClickHouseCredentials):
return ChNativeClient(credentials)
except ImportError as ex:
raise FailedToConnectError(
'Native adapter required but package clickhouse-driver is not installed'
"Native adapter required but package clickhouse-driver is not installed"
) from ex
elif driver == "chdb":
try:
import chdb

from dbt.adapters.clickhouse.chdbclient import ChDBClient

return ChDBClient(credentials)
except ImportError as ex:
raise FailedToConnectError(
"chDB adapter required but package chdb is not installed"
) from ex

try:
import clickhouse_connect # noqa

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/logger.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from dbt.adapters.events.logging import AdapterLogger

logger = AdapterLogger('dbt_clickhouse')
logger = AdapterLogger("dbt_clickhouse")
8 changes: 4 additions & 4 deletions dbt/adapters/clickhouse/nativeclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from dbt.adapters.clickhouse.logger import logger

try:
driver_version = pkg_resources.get_distribution('clickhouse-driver').version
driver_version = pkg_resources.get_distribution("clickhouse-driver").version
except pkg_resources.ResolutionError:
driver_version = 'unknown'
driver_version = "unknown"


class ChNativeClient(ChClientWrapper):
Expand Down Expand Up @@ -48,7 +48,7 @@ def get_ch_setting(self, setting_name):
f"SELECT value, readonly FROM system.settings WHERE name = '{setting_name}'"
)
except clickhouse_driver.errors.Error as ex:
logger.warn('Unexpected error retrieving ClickHouse server setting', ex)
logger.warning("Unexpected error retrieving ClickHouse server setting", ex)
return None
return (result[0][0], result[0][1]) if result else (None, 0)

Expand Down Expand Up @@ -88,7 +88,7 @@ def _set_client_database(self):
def _server_version(self):
server_info = self._client.connection.server_info
return (
f'{server_info.version_major}.{server_info.version_minor}.{server_info.version_patch}'
f"{server_info.version_major}.{server_info.version_minor}.{server_info.version_patch}"
)


Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.utils import deep_merge

from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier

NODE_TYPE_SOURCE = 'source'
Expand Down Expand Up @@ -127,6 +128,11 @@ def create_from(
relation_config.config.get('engine') if relation_config.config.get('engine') else ''
)
can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)
if quoting.credentials.driver == "chdb":
logger.debug("Driver is chDB, forcing engine to be MergeTree")
engine = "MergeTree"
relation_config.config.engine = engine
can_on_cluster = False

return cls.create(
database='',
Expand Down
15 changes: 7 additions & 8 deletions examples/taxis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ CREATE TABLE taxis.trips (
)
ENGINE = MergeTree
ORDER BY trip_id;

SET input_format_skip_unknown_fields = 1;

INSERT INTO taxis.trips
Expand Down Expand Up @@ -62,26 +62,25 @@ FROM s3(

## Create a dbt profile entry

Use the following profile to create the associated dbt profile in the dbt_profiles.yml in ~/.dbt
Use the following profile to create the associated dbt profile in the profiles.yml file in ~/.dbt

```yml
taxis:
outputs:

dev:
type: clickhouse
threads: 4
threads: 4
host: localhost
port: 8123
port: 8123
user: dbt_test
password: dbt_password
use_lw_deletes: true
schema: taxis_dbt
schema: taxis_dbt

target: dev

```
## Run the model
`dbt run` in this directory should execute the model. Each run will create a somewhat larger dataset (by adding
`dbt run` in this directory should execute the model. Each run will create a somewhat larger dataset (by adding
additional random trip_ids).
23 changes: 23 additions & 0 deletions examples/taxis/dump/taxi.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
CREATE DATABASE taxis;

CREATE TABLE taxis.trips (
trip_id UInt32,
pickup_datetime DateTime,
dropoff_datetime DateTime,
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count UInt8,
trip_distance Float32,
fare_amount Float32,
extra Float32,
tip_amount Float32,
tolls_amount Float32,
total_amount Float32,
payment_type LowCardinality(String),
pickup_ntaname LowCardinality(String),
dropoff_ntaname LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY trip_id;
16 changes: 8 additions & 8 deletions examples/taxis/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ models:
- name: trips_inc
description: NY Taxi dataset from S3
config:
materialized: incremental
materialized: table
order_by: rand_trip_id
unique_key: rand_trip_id

- name: trips_rand
description: Random indexes to apply to incremental materialization
config:
materialized: incremental
order_by: date_time
uniq_id: date_time
incremental_strategy: append
# - name: trips_rand
# description: Random indexes to apply to incremental materialization
# config:
# materialized: incremental
# order_by: date_time
# uniq_id: date_time
# incremental_strategy: append
13 changes: 5 additions & 8 deletions examples/taxis/models/trips_inc.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
with (select start, end from {{ ref('trips_rand') }} ORDER BY date_time DESC LIMIT 1) as range,
(select count() from {{ ref('trips_rand') }}) as run_num

select rand() as rand_trip_id, * EXCEPT trip_id, run_num, trip_id as orig_id from {{ source('taxis_source', 'trips') }}
LEFT JOIN numbers(24) as sysnum ON 1 = 1
where bitAnd(orig_id, 1023) between range.1 and range.2


select
rand() as rand_trip_id,
*,
trip_id as orig_id
from {{ source('taxis_source', 'trips') }}
1 change: 0 additions & 1 deletion examples/taxis/models/trips_rand.sql

This file was deleted.

Loading

0 comments on commit 855440d

Please sign in to comment.