Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow clickhouse connection pool #3187

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,12 @@ class ClickhouseConnectionConfig(ConnectionConfig):
register_comments: bool = True
pre_ping: bool = False

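# Extra keyword arguments for the HTTP connection pool manager. They are merged over
# the defaults (`maxsize`, `block`) and passed to clickhouse-connect's
# `httputil.get_pool_manager`, so both urllib3 and clickhouse-connect options apply.
# See:
# * https://urllib3.readthedocs.io/en/stable/advanced-usage.html
# * https://clickhouse.com/docs/en/integrations/python#customizing-the-http-connection-pool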
connection_pool_options: t.Optional[t.Dict[str, t.Any]] = None

type_: Literal["clickhouse"] = Field(alias="type", default="clickhouse")

@property
Expand All @@ -1455,8 +1461,20 @@ def _engine_adapter(self) -> t.Type[EngineAdapter]:
@property
def _connection_factory(self) -> t.Callable:
    """Return the clickhouse-connect DB-API ``connect`` bound to a pool manager.

    A pool manager is built with ``httputil.get_pool_manager`` and passed to
    ``connect`` through its ``pool_mgr`` keyword argument. By default the pool
    is sized to ``self.concurrent_tasks`` and blocks when no connection is
    free; entries in ``self.connection_pool_options`` override or extend
    those defaults.

    Returns:
        A ``functools.partial`` wrapping ``connect`` with ``pool_mgr`` preset.
    """
    from functools import partial

    from clickhouse_connect.dbapi import connect  # type: ignore
    from clickhouse_connect.driver import httputil  # type: ignore

    pool_manager_options: t.Dict[str, t.Any] = {
        # Match the maxsize to the number of concurrent tasks
        "maxsize": self.concurrent_tasks,
        # Block if there are no free connections
        "block": True,
    }
    # User-supplied options take precedence over the defaults above.
    if self.connection_pool_options:
        pool_manager_options.update(self.connection_pool_options)
    pool_mgr = httputil.get_pool_manager(**pool_manager_options)

    # Return only the pooled factory: a bare `return connect` ahead of this
    # line would discard the pool manager built above.
    return partial(connect, pool_mgr=pool_mgr)

@property
def cloud_mode(self) -> bool:
Expand Down Expand Up @@ -1497,7 +1515,11 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
# - "use majority number (number_of_replicas / 2 + 1) as quorum number"
settings["insert_quorum"] = "auto"

return {"compress": compress, "client_name": f"SQLMesh/{__version__}", **settings}
return {
"compress": compress,
"client_name": f"SQLMesh/{__version__}",
**settings,
}


class AthenaConnectionConfig(ConnectionConfig):
Expand Down Expand Up @@ -1575,7 +1597,9 @@ def _connection_factory(self) -> t.Callable:
# Map all subclasses of ConnectionConfig to the value of their `type_` field.
tpe.all_field_infos()["type_"].default: tpe
for tpe in subclasses(
__name__, ConnectionConfig, exclude=(ConnectionConfig, BaseDuckDBConnectionConfig)
__name__,
ConnectionConfig,
exclude=(ConnectionConfig, BaseDuckDBConnectionConfig),
)
}

Expand Down