Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The supported method of passing ClickHouse server settings is to prefix such arg

### Improvements
- Added `utc_tz_aware` parameter to client and query methods to opt in to returning timezone-aware UTC objects for DateTime/DateTime64 columns. Default behavior remains the same and returns tz naive objects for backward compatibility. Note: this parameter will likely be removed and only return tz-aware dts in some future release. Closes [#566](https://github.com/ClickHouse/clickhouse-connect/issues/566)
- Added `executor` parameter to `AsyncClient` constructor to allow passing a custom executor for async operations. This allows users to control the concurrency and thread pool used by the async client.

## 0.9.2, 2025-09-25

Expand Down
9 changes: 7 additions & 2 deletions clickhouse_connect/driver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from inspect import signature
from typing import Optional, Union, Dict, Any
from urllib.parse import urlparse, parse_qs
Expand All @@ -8,7 +9,7 @@
from clickhouse_connect.driver.common import dict_copy
from clickhouse_connect.driver.exceptions import ProgrammingError
from clickhouse_connect.driver.httpclient import HttpClient
from clickhouse_connect.driver.asyncclient import AsyncClient
from clickhouse_connect.driver.asyncclient import AsyncClient, DefaultThreadPoolExecutor, NEW_THREAD_POOL_EXECUTOR


# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
Expand Down Expand Up @@ -147,6 +148,7 @@ async def create_async_client(*,
settings: Optional[Dict[str, Any]] = None,
generic_args: Optional[Dict[str, Any]] = None,
executor_threads: int = 0,
executor: Union[ThreadPoolExecutor, None, DefaultThreadPoolExecutor] = NEW_THREAD_POOL_EXECUTOR,
**kwargs) -> AsyncClient:
"""
The preferred method to get an async ClickHouse Connect Client instance.
Expand All @@ -170,6 +172,9 @@ async def create_async_client(*,
It is not recommended to use this parameter externally
:param executor_threads: 'max_worker' threads used by the client ThreadPoolExecutor. If not set, the default
of 4 + detected CPU cores will be used
:param executor: Optional `ThreadPoolExecutor` to use for async operations. If not set, a new `ThreadPoolExecutor`
will be created with the number of threads specified by `executor_threads`. If set to `None` it will use the
default executor of the event loop.
:param kwargs -- Recognized keyword arguments (used by the HTTP client), see below

:param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred
Expand Down Expand Up @@ -215,4 +220,4 @@ def _create_client():

loop = asyncio.get_running_loop()
_client = await loop.run_in_executor(None, _create_client)
return AsyncClient(client=_client, executor_threads=executor_threads)
return AsyncClient(client=_client, executor_threads=executor_threads, executor=executor)
31 changes: 28 additions & 3 deletions clickhouse_connect/driver/asyncclient.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import io
import logging
import os
from concurrent.futures.thread import ThreadPoolExecutor
from datetime import tzinfo
Expand All @@ -14,6 +15,16 @@
from clickhouse_connect.datatypes.base import ClickHouseType
from clickhouse_connect.driver.insert import InsertContext

logger = logging.getLogger(__name__)


class DefaultThreadPoolExecutor:
pass


# Sentinel value to preserve default behavior and also allow passing `None`
NEW_THREAD_POOL_EXECUTOR = DefaultThreadPoolExecutor()


# pylint: disable=too-many-public-methods,too-many-instance-attributes,too-many-arguments,too-many-positional-arguments,too-many-locals
class AsyncClient:
Expand All @@ -22,13 +33,25 @@ class AsyncClient:
Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor.
"""

def __init__(self, *, client: Client, executor_threads: int = 0):
def __init__(self,
*,
client: Client,
executor_threads: int = 0,
executor: Union[ThreadPoolExecutor, None, DefaultThreadPoolExecutor] = NEW_THREAD_POOL_EXECUTOR):
if isinstance(client, HttpClient):
client.headers['User-Agent'] = client.headers['User-Agent'].replace('mode:sync;', 'mode:async;')
self.client = client
if executor_threads == 0:
executor_threads = min(32, (os.cpu_count() or 1) + 4) # Mimic the default behavior
self.executor = ThreadPoolExecutor(max_workers=executor_threads)
if executor is NEW_THREAD_POOL_EXECUTOR:
self.new_executor = True
self.executor = ThreadPoolExecutor(max_workers=executor_threads)
else:
if executor_threads != 0:
logger.warning('executor_threads parameter is ignored when passing an executor object')

self.new_executor = False
self.executor = executor

def set_client_setting(self, key, value):
"""
Expand Down Expand Up @@ -69,7 +92,9 @@ async def close(self):
Subclass implementation to close the connection to the server/deallocate the client
"""
self.client.close()
await asyncio.to_thread(self.executor.shutdown, True)

if self.new_executor:
await asyncio.to_thread(self.executor.shutdown, True)

async def query(self,
query: Optional[str] = None,
Expand Down