Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The supported method of passing ClickHouse server settings is to prefix such arg
- Fixed issue with JSON key dot escaping. Closes [#571](https://github.com/ClickHouse/clickhouse-connect/issues/571)

### Improvements
- Added SQLAlchemy core API support for `ARRAY JOIN` and `FINAL` modifier. Closes [#579](https://github.com/ClickHouse/clickhouse-connect/issues/579)
- Added `utc_tz_aware` parameter to client and query methods to opt in to returning timezone-aware UTC objects for DateTime/DateTime64 columns. Default behavior remains the same and returns tz naive objects for backward compatibility. Note: this parameter will likely be removed and only return tz-aware dts in some future release. Closes [#566](https://github.com/ClickHouse/clickhouse-connect/issues/566)

## 0.9.2, 2025-09-25
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ and **SQLAlchemy Core**.

Supported features include:
- Basic query execution via SQLAlchemy Core
- `SELECT` queries with `JOIN`s
- `SELECT` queries with `JOIN`s, `ARRAY JOIN`, and `FINAL` modifier
- Lightweight `DELETE` statements

The implementation does not include ORM support and is not intended as a full SQLAlchemy dialect. While it can support
Expand Down
4 changes: 4 additions & 0 deletions clickhouse_connect/cc_sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from clickhouse_connect import driver_name
from clickhouse_connect.cc_sqlalchemy.datatypes.base import schema_types
from clickhouse_connect.cc_sqlalchemy.sql import final
from clickhouse_connect.cc_sqlalchemy.sql.clauses import array_join, ArrayJoin

# pylint: disable=invalid-name
dialect_name = driver_name
ischema_names = schema_types

__all__ = ['dialect_name', 'ischema_names', 'array_join', 'ArrayJoin', 'final']
44 changes: 44 additions & 0 deletions clickhouse_connect/cc_sqlalchemy/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

from sqlalchemy import Table
from sqlalchemy.sql.selectable import FromClause, Select

from clickhouse_connect.driver.binding import quote_identifier

Expand All @@ -13,3 +14,46 @@ def full_table(table_name: str, schema: Optional[str] = None) -> str:

def format_table(table: Table):
return full_table(table.name, table.schema)


def final(select_stmt: Select, table: Optional[FromClause] = None) -> Select:
"""
Apply the ClickHouse FINAL modifier to a select statement.

Args:
select_stmt: The SQLAlchemy Select statement to modify.
table: Optional explicit table/alias to apply FINAL to. When omitted the
method will use the single FROM element present on the select. A
ValueError is raised if the statement has no FROMs or more than one
FROM element and table is not provided.

Returns:
A new Select that renders the FINAL modifier for the target table.
"""
if not isinstance(select_stmt, Select):
raise TypeError("final() expects a SQLAlchemy Select instance")

target = table
if target is None:
froms = select_stmt.get_final_froms()
if not froms:
raise ValueError("final() requires a table to apply the FINAL modifier.")
if len(froms) > 1:
raise ValueError("final() is ambiguous for statements with multiple FROM clauses. Specify the table explicitly.")
target = froms[0]

if not isinstance(target, FromClause):
raise TypeError("table must be a SQLAlchemy FromClause when provided")

return select_stmt.with_hint(target, "FINAL")


def _select_final(self: Select, table: Optional[FromClause] = None) -> Select:
"""
Select.final() convenience wrapper around the module-level final() helper.
"""
return final(self, table=table)


# Monkey-patch the Select class to add the .final() convenience method
Select.final = _select_final
93 changes: 93 additions & 0 deletions clickhouse_connect/cc_sqlalchemy/sql/clauses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from sqlalchemy.sql.base import Immutable
from sqlalchemy.sql.selectable import FromClause


# pylint: disable=protected-access,too-many-ancestors,abstract-method,unused-argument
class ArrayJoin(Immutable, FromClause):
"""Represents ClickHouse ARRAY JOIN clause"""

__visit_name__ = "array_join"
_is_from_container = True
named_with_column = False
_is_join = True

def __init__(self, left, array_column, alias=None, is_left=False):
"""Initialize ARRAY JOIN clause

Args:
left: The left side (table or subquery)
array_column: The array column to join
alias: Optional alias for the joined array elements
is_left: If True, use LEFT ARRAY JOIN instead of ARRAY JOIN
"""
super().__init__()
self.left = left
self.array_column = array_column
self.alias = alias
self.is_left = is_left
self._is_clone_of = None

@property
def selectable(self):
"""Return the selectable for this clause"""
return self.left

@property
def _hide_froms(self):
"""Hide the left table from the FROM clause since it's part of the ARRAY JOIN"""
return [self.left]

@property
def _from_objects(self):
"""Return all FROM objects referenced by this construct"""
return self.left._from_objects

def _clone(self, **kw):
"""Return a copy of this ArrayJoin"""
c = self.__class__.__new__(self.__class__)
c.__dict__ = self.__dict__.copy()
c._is_clone_of = self
return c

def _copy_internals(self, clone=None, **kw):
"""Copy internal state for cloning

This ensures that when queries are cloned (e.g., for subqueries, unions, or CTEs),
the left FromClause and array_column references are properly deep-cloned.
"""
def _default_clone(elem, **kwargs):
return elem

if clone is None:
clone = _default_clone

# Clone the left FromClause and array column to ensure proper
# reference handling in complex query scenarios
self.left = clone(self.left, **kw)
self.array_column = clone(self.array_column, **kw)


def array_join(left, array_column, alias=None, is_left=False):
"""Create an ARRAY JOIN clause

Args:
left: The left side (table or subquery)
array_column: The array column to join
alias: Optional alias for the joined array elements
is_left: If True, use LEFT ARRAY JOIN instead of ARRAY JOIN

Returns:
ArrayJoin: An ArrayJoin clause element

Example:
from clickhouse_connect.cc_sqlalchemy.sql.clauses import array_join

# Basic ARRAY JOIN
query = select(table).select_from(array_join(table, table.c.tags))

# LEFT ARRAY JOIN with alias
query = select(table).select_from(
array_join(table, table.c.tags, alias='tag', is_left=True)
)
"""
return ArrayJoin(left, array_column, alias, is_left)
22 changes: 19 additions & 3 deletions clickhouse_connect/cc_sqlalchemy/sql/compiler.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from sqlalchemy.exc import CompileError
from sqlalchemy.sql.compiler import SQLCompiler

from clickhouse_connect.cc_sqlalchemy import ArrayJoin
from clickhouse_connect.cc_sqlalchemy.sql import format_table


# pylint: disable=arguments-differ
class ChStatementCompiler(SQLCompiler):

# pylint: disable=attribute-defined-outside-init
# pylint: disable=attribute-defined-outside-init,unused-argument
def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
table = delete_stmt.table
text = f"DELETE FROM {format_table(table)}"
Expand All @@ -23,10 +24,20 @@ def visit_delete(self, delete_stmt, visiting_cte=None, **kw):

return text

def visit_select(self, select_stmt, **kw):
return super().visit_select(select_stmt, **kw)
def visit_array_join(self, array_join_clause, asfrom=False, from_linter=None, **kw):
left = self.process(array_join_clause.left, asfrom=True, from_linter=from_linter, **kw)
array_col = self.process(array_join_clause.array_column, **kw)
join_type = "LEFT ARRAY JOIN" if array_join_clause.is_left else "ARRAY JOIN"
text = f"{left} {join_type} {array_col}"
if array_join_clause.alias:
text += f" AS {self.preparer.quote(array_join_clause.alias)}"

return text

def visit_join(self, join, **kw):
if isinstance(join, ArrayJoin):
return self.visit_array_join(join, **kw)

left = self.process(join.left, **kw)
right = self.process(join.right, **kw)
onclause = join.onclause
Expand Down Expand Up @@ -72,3 +83,8 @@ def visit_empty_set_expr(self, element_types, **kw):

def visit_sequence(self, sequence, **kw):
raise NotImplementedError("ClickHouse doesn't support sequences")

def get_from_hint_text(self, table, text):
if text == "FINAL":
return "FINAL"
return super().get_from_hint_text(table, text)
18 changes: 17 additions & 1 deletion tests/integration_tests/test_sqlalchemy/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from contextlib import contextmanager
from typing import Iterator
from pytest import fixture

from sqlalchemy import MetaData, Table
from sqlalchemy.engine import create_engine
from sqlalchemy.engine.base import Engine

Expand All @@ -17,3 +18,18 @@ def test_engine_fixture(test_config: TestConfig) -> Iterator[Engine]:

yield test_engine
test_engine.dispose()


def create_test_table(conn, metadata, table_name, columns, engine_params):
test_table = Table(table_name, metadata, *columns, engine_params)
test_table.drop(conn, checkfirst=True)
test_table.create(conn)
return test_table


@contextmanager
def table_context(engine, test_db, table_name, columns, engine_params):
with engine.begin() as conn:
metadata = MetaData(schema=test_db)
test_table = create_test_table(conn, metadata, table_name, columns, engine_params)
yield conn, test_table
114 changes: 114 additions & 0 deletions tests/integration_tests/test_sqlalchemy/test_array_join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from sqlalchemy import Column, literal_column, select
from sqlalchemy.engine.base import Engine
from sqlalchemy.types import Integer, String

from clickhouse_connect.cc_sqlalchemy.datatypes.sqltypes import Array
from clickhouse_connect.cc_sqlalchemy.datatypes.sqltypes import String as ChString
from clickhouse_connect.cc_sqlalchemy.ddl.tableengine import MergeTree
from clickhouse_connect.cc_sqlalchemy.sql.clauses import array_join
from tests.integration_tests.test_sqlalchemy.conftest import table_context


def test_array_join(test_engine: Engine, test_db: str):
"""Test ARRAY JOIN clause"""
with table_context(
test_engine,
test_db,
"test_array_join",
[
Column("id", Integer),
Column("name", String),
Column("tags", Array(ChString)),
],
MergeTree(order_by="id"),
) as (conn, test_table):

conn.execute(
test_table.insert(),
[
{"id": 1, "name": "Alice", "tags": ["python", "sql", "clickhouse"]},
{"id": 2, "name": "Bob", "tags": ["java", "sql"]},
{"id": 3, "name": "Joe", "tags": ["python", "javascript"]},
{"id": 4, "name": "Charlie", "tags": []},
],
)

query = (
select(test_table.c.id, test_table.c.name, test_table.c.tags)
.select_from(array_join(test_table, test_table.c.tags))
.order_by(test_table.c.id)
.order_by(test_table.c.tags)
)

compiled = query.compile(dialect=test_engine.dialect)
assert "ARRAY JOIN" in str(compiled).upper()

result = conn.execute(query)
rows = result.fetchall()
assert len(rows) == 7
assert rows[0].id == 1
assert rows[0].name == "Alice"
assert rows[0].tags == "clickhouse"
# ARRAY JOIN should not contain items with empty lists
assert "Charlie" not in [row.name for row in rows]

test_table.drop(conn)


def test_left_array_join_with_alias(test_engine: Engine, test_db: str):
"""Test LEFT ARRAY JOIN with alias"""
with table_context(
test_engine,
test_db,
"test_left_array_join",
[
Column("id", Integer),
Column("name", String),
Column("tags", Array(ChString)),
],
MergeTree(order_by="id"),
) as (conn, test_table):

conn.execute(
test_table.insert(),
[
{"id": 1, "name": "Alice", "tags": ["python", "sql", "clickhouse"]},
{"id": 2, "name": "Bob", "tags": ["java", "sql"]},
{"id": 3, "name": "Joe", "tags": ["python", "javascript"]},
{"id": 4, "name": "Charlie", "tags": []},
],
)

query = (
select(
test_table.c.id,
test_table.c.name,
literal_column("tag"), # Needed when using alias
)
.select_from(array_join(test_table, test_table.c.tags, alias="tag", is_left=True))
.order_by(test_table.c.id)
.order_by(literal_column("tag"))
)

compiled = query.compile(dialect=test_engine.dialect)
compiled_str = str(compiled).upper()
assert "LEFT ARRAY JOIN" in compiled_str
assert "AS" in compiled_str

result = conn.execute(query)
rows = result.fetchall()
assert len(rows) == 8

alice_tags = [row.tag for row in rows if row.name == "Alice"]
assert len(alice_tags) == 3
assert alice_tags == sorted(["python", "sql", "clickhouse"])

bob_tags = [row.tag for row in rows if row.name == "Bob"]
assert len(bob_tags) == 2
assert bob_tags == sorted(["java", "sql"])

charlie_rows = [row for row in rows if row.name == "Charlie"]
assert len(charlie_rows) == 1
assert charlie_rows[0].tag == ""

test_table.drop(conn)
Loading