From eb06317b8fae6872c7f33a81044dad0677f6bdae Mon Sep 17 00:00:00 2001 From: Meir Elbaz Date: Thu, 30 Oct 2025 12:03:49 +0200 Subject: [PATCH 1/8] add executor parameter to AsyncClient --- clickhouse_connect/driver/__init__.py | 9 +++++++-- clickhouse_connect/driver/asyncclient.py | 18 +++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/clickhouse_connect/driver/__init__.py b/clickhouse_connect/driver/__init__.py index 250b9573..292aaff7 100644 --- a/clickhouse_connect/driver/__init__.py +++ b/clickhouse_connect/driver/__init__.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from inspect import signature from typing import Optional, Union, Dict, Any from urllib.parse import urlparse, parse_qs @@ -8,7 +9,7 @@ from clickhouse_connect.driver.common import dict_copy from clickhouse_connect.driver.exceptions import ProgrammingError from clickhouse_connect.driver.httpclient import HttpClient -from clickhouse_connect.driver.asyncclient import AsyncClient +from clickhouse_connect.driver.asyncclient import AsyncClient, NEW_THREAD_POOL_EXECUTOR # pylint: disable=too-many-arguments,too-many-locals,too-many-branches @@ -147,6 +148,7 @@ async def create_async_client(*, settings: Optional[Dict[str, Any]] = None, generic_args: Optional[Dict[str, Any]] = None, executor_threads: Optional[int] = None, + executor: Union[Optional[ThreadPoolExecutor], object] = NEW_THREAD_POOL_EXECUTOR, **kwargs) -> AsyncClient: """ The preferred method to get an async ClickHouse Connect Client instance. @@ -170,6 +172,9 @@ async def create_async_client(*, It is not recommended to use this parameter externally :param: executor_threads 'max_worker' threads used by the client ThreadPoolExecutor. If not set, the default of 4 + detected CPU cores will be used + :param executor: Optional `ThreadPoolExecutor` to use for async operations. If not set, a new `ThreadPoolExecutor` + will be created with the number of threads specified by `executor_threads`. If set to `None` it will use the + default executor of the event loop. :param kwargs -- Recognized keyword arguments (used by the HTTP client), see below :param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred @@ -215,4 +220,4 @@ def _create_client(): loop = asyncio.get_running_loop() _client = await loop.run_in_executor(None, _create_client) - return AsyncClient(client=_client, executor_threads=executor_threads) + return AsyncClient(client=_client, executor_threads=executor_threads, executor=executor) diff --git a/clickhouse_connect/driver/asyncclient.py b/clickhouse_connect/driver/asyncclient.py index 9dd46dd3..837e21c7 100644 --- a/clickhouse_connect/driver/asyncclient.py +++ b/clickhouse_connect/driver/asyncclient.py @@ -14,6 +14,9 @@ from clickhouse_connect.datatypes.base import ClickHouseType from clickhouse_connect.driver.insert import InsertContext +# Sentinel value to preserve default behavior and also allow passing `None` +NEW_THREAD_POOL_EXECUTOR = object() + # pylint: disable=too-many-public-methods,too-many-instance-attributes,too-many-arguments,too-many-positional-arguments,too-many-locals class AsyncClient: @@ -22,13 +25,20 @@ class AsyncClient: Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor. """ - def __init__(self, *, client: Client, executor_threads: int = 0): + def __init__(self, + *, + client: Client, + executor_threads: int | None = None, + executor: Union[Optional[ThreadPoolExecutor], object] = NEW_THREAD_POOL_EXECUTOR): if isinstance(client, HttpClient): client.headers['User-Agent'] = client.headers['User-Agent'].replace('mode:sync;', 'mode:async;') self.client = client if executor_threads == 0: executor_threads = min(32, (os.cpu_count() or 1) + 4) # Mimic the default behavior - self.executor = ThreadPoolExecutor(max_workers=executor_threads) + if executor is NEW_THREAD_POOL_EXECUTOR: + self.executor = ThreadPoolExecutor(max_workers=executor_threads) + else: + self.executor = executor def set_client_setting(self, key, value): """ @@ -69,7 +79,9 @@ async def close(self): Subclass implementation to close the connection to the server/deallocate the client """ self.client.close() - await asyncio.to_thread(self.executor.shutdown, True) + + if self.executor is None: + await asyncio.to_thread(self.executor.shutdown, True) async def query(self, query: Optional[str] = None, From fdc82791420086bbf05da66bb59ba7e729569370 Mon Sep 17 00:00:00 2001 From: Meir Elbaz Date: Thu, 30 Oct 2025 12:05:37 +0200 Subject: [PATCH 2/8] add note to change log --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f474c32..aa0c7b2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ The supported method of passing ClickHouse server settings is to prefix such arg ### Improvements - Added `utc_tz_aware` parameter to client and query methods to opt in to returning timezone-aware UTC objects for DateTime/DateTime64 columns. Default behavior remains the same and returns tz naive objects for backward compatibility. Note: this parameter will likely be removed and only return tz-aware dts in some future release. Closes [#566](https://github.com/ClickHouse/clickhouse-connect/issues/566) +- Added `executor` parameter to `AsyncClient` constructor to allow passing a custom executor for async operations. This allows users to control the concurrency and thread pool used by the async client. ## 0.9.2, 2025-09-25 From e665086482eebe109a2e21d34e7b01bd5bf73520 Mon Sep 17 00:00:00 2001 From: Meir Elbaz Date: Thu, 30 Oct 2025 12:35:34 +0200 Subject: [PATCH 3/8] fix: we only want to shutdown the executor if we created it --- clickhouse_connect/driver/asyncclient.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clickhouse_connect/driver/asyncclient.py b/clickhouse_connect/driver/asyncclient.py index 837e21c7..734f9693 100644 --- a/clickhouse_connect/driver/asyncclient.py +++ b/clickhouse_connect/driver/asyncclient.py @@ -36,8 +36,10 @@ def __init__(self, if executor_threads == 0: executor_threads = min(32, (os.cpu_count() or 1) + 4) # Mimic the default behavior if executor is NEW_THREAD_POOL_EXECUTOR: + self.new_executor = True self.executor = ThreadPoolExecutor(max_workers=executor_threads) else: + self.new_executor = False self.executor = executor def set_client_setting(self, key, value): @@ -80,7 +82,7 @@ async def close(self): """ self.client.close() - if self.executor is None: + if self.new_executor: await asyncio.to_thread(self.executor.shutdown, True) async def query(self, From 7f242b25e1313a5a68c706042455e7b694b28caa Mon Sep 17 00:00:00 2001 From: Meir Elbaz Date: Mon, 3 Nov 2025 22:44:12 +0200 Subject: [PATCH 4/8] add a custom object to represent the sentinel --- clickhouse_connect/driver/__init__.py | 4 ++-- clickhouse_connect/driver/asyncclient.py | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/clickhouse_connect/driver/__init__.py b/clickhouse_connect/driver/__init__.py index 292aaff7..4a9783c4 100644 --- a/clickhouse_connect/driver/__init__.py +++ b/clickhouse_connect/driver/__init__.py @@ -9,7 +9,7 @@ from clickhouse_connect.driver.common import dict_copy from clickhouse_connect.driver.exceptions import ProgrammingError from clickhouse_connect.driver.httpclient import HttpClient -from clickhouse_connect.driver.asyncclient import AsyncClient, NEW_THREAD_POOL_EXECUTOR +from clickhouse_connect.driver.asyncclient import AsyncClient, DefaultThreadPoolExecutor, NEW_THREAD_POOL_EXECUTOR # pylint: disable=too-many-arguments,too-many-locals,too-many-branches @@ -148,7 +148,7 @@ async def create_async_client(*, settings: Optional[Dict[str, Any]] = None, generic_args: Optional[Dict[str, Any]] = None, executor_threads: Optional[int] = None, - executor: Union[Optional[ThreadPoolExecutor], object] = NEW_THREAD_POOL_EXECUTOR, + executor: Union[ThreadPoolExecutor, None, DefaultThreadPoolExecutor] = NEW_THREAD_POOL_EXECUTOR, **kwargs) -> AsyncClient: """ The preferred method to get an async ClickHouse Connect Client instance. diff --git a/clickhouse_connect/driver/asyncclient.py b/clickhouse_connect/driver/asyncclient.py index 734f9693..324f608b 100644 --- a/clickhouse_connect/driver/asyncclient.py +++ b/clickhouse_connect/driver/asyncclient.py @@ -14,8 +14,12 @@ from clickhouse_connect.datatypes.base import ClickHouseType from clickhouse_connect.driver.insert import InsertContext +class DefaultThreadPoolExecutor: + pass + + # Sentinel value to preserve default behavior and also allow passing `None` -NEW_THREAD_POOL_EXECUTOR = object() +NEW_THREAD_POOL_EXECUTOR = DefaultThreadPoolExecutor() # pylint: disable=too-many-public-methods,too-many-instance-attributes,too-many-arguments,too-many-positional-arguments,too-many-locals @@ -29,7 +33,7 @@ def __init__(self, *, client: Client, executor_threads: int | None = None, - executor: Union[Optional[ThreadPoolExecutor], object] = NEW_THREAD_POOL_EXECUTOR): + executor: Union[ThreadPoolExecutor, None, DefaultThreadPoolExecutor] = NEW_THREAD_POOL_EXECUTOR): if isinstance(client, HttpClient): client.headers['User-Agent'] = client.headers['User-Agent'].replace('mode:sync;', 'mode:async;') self.client = client From debc71395fb2dc2988785ac1c06fdf6fbcdffa2e Mon Sep 17 00:00:00 2001 From: Meir Elbaz Date: Mon, 3 Nov 2025 22:44:33 +0200 Subject: [PATCH 5/8] fix typing --- clickhouse_connect/driver/asyncclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse_connect/driver/asyncclient.py b/clickhouse_connect/driver/asyncclient.py index 324f608b..8b5b805d 100644 --- a/clickhouse_connect/driver/asyncclient.py +++ b/clickhouse_connect/driver/asyncclient.py @@ -32,7 +32,7 @@ class AsyncClient: def __init__(self, *, client: Client, - executor_threads: int | None = None, + executor_threads: int = 0, executor: Union[ThreadPoolExecutor, None, DefaultThreadPoolExecutor] = NEW_THREAD_POOL_EXECUTOR): if isinstance(client, HttpClient): client.headers['User-Agent'] = client.headers['User-Agent'].replace('mode:sync;', 'mode:async;') From 180be7b20a97119ebff25dd824d7a88533750eda Mon Sep 17 00:00:00 2001 From: Meir Elbaz Date: Mon, 3 Nov 2025 22:46:17 +0200 Subject: [PATCH 6/8] add warning log when executor_threads and executor are used --- clickhouse_connect/driver/asyncclient.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/clickhouse_connect/driver/asyncclient.py b/clickhouse_connect/driver/asyncclient.py index 8b5b805d..41244bbb 100644 --- a/clickhouse_connect/driver/asyncclient.py +++ b/clickhouse_connect/driver/asyncclient.py @@ -1,5 +1,6 @@ import asyncio import io +import logging import os from concurrent.futures.thread import ThreadPoolExecutor from datetime import tzinfo @@ -14,6 +15,9 @@ from clickhouse_connect.datatypes.base import ClickHouseType from clickhouse_connect.driver.insert import InsertContext +logger = logging.getLogger(__name__) + + class DefaultThreadPoolExecutor: pass @@ -43,6 +47,9 @@ def __init__(self, self.new_executor = True self.executor = ThreadPoolExecutor(max_workers=executor_threads) else: + if executor_threads != 0: + logger.warning('executor_threads parameter is ignored when passing an executor object') + self.new_executor = False self.executor = executor From 8432fe822cd133b1ca65c57e5a8c592224a4cea6 Mon Sep 17 00:00:00 2001 From: Joe S Date: Tue, 4 Nov 2025 09:08:16 -0800 Subject: [PATCH 7/8] fix ambiguous time bug (#587) --- CHANGELOG.md | 1 + clickhouse_connect/datatypes/temporal.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f474c32..fe9dfaed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The supported method of passing ClickHouse server settings is to prefix such arg ## UNRELEASED ### Bug Fixes +- Fixed DST fallback bug in DateTime and DateTime64 types caused by passing potentially ambiguous times to pd.DateTimeIndex constructor. - Fixed issue with JSON key dot escaping. Closes [#571](https://github.com/ClickHouse/clickhouse-connect/issues/571) ### Improvements diff --git a/clickhouse_connect/datatypes/temporal.py b/clickhouse_connect/datatypes/temporal.py index 99ffd8c8..1808d857 100644 --- a/clickhouse_connect/datatypes/temporal.py +++ b/clickhouse_connect/datatypes/temporal.py @@ -89,8 +89,8 @@ def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence: if column.tz is None: return column.astype(self.pandas_dtype) - naive = column.tz_localize(None).astype(self.pandas_dtype) - return pd.DatetimeIndex(naive, tz=column.tz) + naive = column.tz_convert("UTC").tz_localize(None).astype(self.pandas_dtype) + return naive.tz_localize("UTC").tz_convert(column.tz) if self.nullable and isinstance(column, list): return np.array([None if pd.isna(s) else s for s in column]).astype( @@ -159,8 +159,8 @@ def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence: result = column.astype(self.pandas_dtype) return pd.array(result) if self.nullable else result - naive_ns = column.tz_localize(None).astype(self.pandas_dtype) - tz_aware_result = pd.DatetimeIndex(naive_ns, tz=column.tz) + naive_ns = column.tz_convert("UTC").tz_localize(None).astype(self.pandas_dtype) + tz_aware_result = naive_ns.tz_localize("UTC").tz_convert(column.tz) return ( pd.array(tz_aware_result) if self.nullable else tz_aware_result ) From eb190c17c8042653384010a757aed3c157fba818 Mon Sep 17 00:00:00 2001 From: Marek Wiese Date: Wed, 5 Nov 2025 00:41:21 +0100 Subject: [PATCH 8/8] fix version import (#583) * fix version import * add missing type * add missing types * fix #586: inconsistent type hint * fix tls test path in contribution docs * revert playtest --- CONTRIBUTING.md | 2 +- clickhouse_connect/common.py | 13 ++++++------- clickhouse_connect/driver/__init__.py | 4 ++-- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 51d84ddc..71d449b7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -116,7 +116,7 @@ These tests require the `CLICKHOUSE_CONNECT_TEST_TLS` environment variable to be Additionally, the TLS ClickHouse instance should be running (see [docker-compose.yml](docker-compose.yml)). ```bash -CLICKHOUSE_CONNECT_TEST_TLS=1 pytest tests/tls +CLICKHOUSE_CONNECT_TEST_TLS=1 pytest tests/integration_tests/test_tls.py ``` ### Running the integration tests with ClickHouse Cloud diff --git a/clickhouse_connect/common.py b/clickhouse_connect/common.py index fd56a266..dc599302 100644 --- a/clickhouse_connect/common.py +++ b/clickhouse_connect/common.py @@ -2,13 +2,12 @@ import sys from dataclasses import dataclass from typing import Any, Sequence, Optional, Dict -from clickhouse_connect import __version__ - +from clickhouse_connect import __version__ from clickhouse_connect.driver.exceptions import ProgrammingError -def version(): +def version() -> str: return __version__.version @@ -30,7 +29,7 @@ class CommonSetting: _common_settings: Dict[str, CommonSetting] = {} -def build_client_name(client_name: str): +def build_client_name(client_name: str) -> str: product_name = get_setting('product_name') product_name = product_name.strip() + ' ' if product_name else '' client_name = client_name.strip() + ' ' if client_name else '' @@ -46,14 +45,14 @@ def build_client_name(client_name: str): return full_name.encode('ascii', 'ignore').decode() -def get_setting(name: str): +def get_setting(name: str) -> Any: setting = _common_settings.get(name) if setting is None: raise ProgrammingError(f'Unrecognized common setting {name}') return setting.value if setting.value is not None else setting.default -def set_setting(name: str, value: Any): +def set_setting(name: str, value: Any) -> None: setting = _common_settings.get(name) if setting is None: raise ProgrammingError(f'Unrecognized common setting {name}') @@ -65,7 +64,7 @@ def set_setting(name: str, value: Any): setting.value = value -def _init_common(name: str, options: Sequence[Any], default: Any): +def _init_common(name: str, options: Sequence[Any], default: Any) -> None: _common_settings[name] = CommonSetting(name, options, default) diff --git a/clickhouse_connect/driver/__init__.py b/clickhouse_connect/driver/__init__.py index 250b9573..13f90fb4 100644 --- a/clickhouse_connect/driver/__init__.py +++ b/clickhouse_connect/driver/__init__.py @@ -146,7 +146,7 @@ async def create_async_client(*, dsn: Optional[str] = None, settings: Optional[Dict[str, Any]] = None, generic_args: Optional[Dict[str, Any]] = None, - executor_threads: Optional[int] = None, + executor_threads: int = 0, **kwargs) -> AsyncClient: """ The preferred method to get an async ClickHouse Connect Client instance. @@ -168,7 +168,7 @@ async def create_async_client(*, :param settings: ClickHouse server settings to be used with the session/every request :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. It is not recommended to use this parameter externally - :param: executor_threads 'max_worker' threads used by the client ThreadPoolExecutor. If not set, the default + :param executor_threads: 'max_worker' threads used by the client ThreadPoolExecutor. If not set, the default of 4 + detected CPU cores will be used :param kwargs -- Recognized keyword arguments (used by the HTTP client), see below