Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster setting and Distributed Table tests #186

Merged
merged 14 commits into from
Oct 26, 2023
16 changes: 6 additions & 10 deletions .github/workflows/test_matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,10 @@ jobs:
echo "TEST_SETTINGS_FILE=22_3" >> $GITHUB_ENV
echo "DBT_CH_TEST_CH_VERSION=22.3" >> $GITHUB_ENV

- name: Run ClickHouse Container
run: docker run
-d
-p 8123:8123
-p 9000:9000
--name clickhouse
-v /var/lib/clickhouse
-v ${{ github.workspace }}/tests/integration/test_settings_$TEST_SETTINGS_FILE.xml:/etc/clickhouse-server/users.d/test_settings.xml
--ulimit nofile=262144:262144
clickhouse/clickhouse-server:${{ matrix.clickhouse-version }}
- name: Run ClickHouse Cluster Containers
env:
PROJECT_ROOT: ${{ github.workspace }}/tests/integration
run: REPLICA_NUM=1 docker-compose -f ${{ github.workspace }}/tests/integration/docker-compose.yml up -d

- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
Expand All @@ -64,6 +58,8 @@ jobs:
run: pip3 install -r dev_requirements.txt

- name: Run HTTP tests
env:
DBT_CH_TEST_CLUSTER: test_shard
run: |
PYTHONPATH="${PYTHONPATH}:dbt"
pytest tests
Expand Down
31 changes: 26 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ your_profile_name:
port: [8123] # If not set, defaults to 8123, 8443, 9000, 9440 depending on the secure and driver settings
user: [default] # User for all database operations
password: [<empty string>] # Password for the user
cluster: [<empty string>] If set, DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster
cluster: [<empty string>] If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster. Distributed materializations require this setting to work. See the following ClickHouse Cluster section for more details.
verify: [True] # Validate TLS certificate if using TLS/SSL
secure: [False] # Use TLS (native protocol) or HTTPS (http protocol)
retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error)
Expand All @@ -75,7 +75,7 @@ your_profile_name:
cluster_mode: [False] # Use specific settings designed to improve operation on Replicated databases (recommended for ClickHouse Cloud)
use_lw_deletes: [False] Use the strategy `delete+insert` as the default incremental strategy.
check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command. (Not needed for most ClickHouse versions)
local_suffix [local] # Table suffix of local tables on shards for distributed materializations
local_suffix [_local] # Table suffix of local tables on shards for distributed materializations.
custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.

# Native (clickhouse-driver) connection settings
Expand All @@ -91,14 +91,35 @@ your_profile_name:
| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
| sharding_key | Sharding key determines the destination server when inserting into distributed engine table. The sharding key can be random or as an output of a hash function | Optional (default: `rand()`) |
| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key |
| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) |
| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to `delete+insert` strategy |
## ClickHouse Cluster

`cluster` setting in profile enables dbt-clickhouse to run against a ClickHouse cluster.

### Effective Scope


if `cluster` is set in profile, `on_cluster_clause` now will return cluster info for:
- Database creation
- View materialization
- Distributed materializations
- Models with Replicated engines

table and incremental materializations with non-replicated engine will not be affected by `cluster` setting (model would be created on the connected node only).

### Compatibility


If a model has been created without a `cluster` setting, dbt-clickhouse will detect the situation and run all DDL/DML without `on cluster` clause for this model.


## Known Limitations

* Replicated tables (combined with the `cluster` profile setting) are available using the `on_cluster_clause` macro but are not included in the test suite and not formally tested.
* Ephemeral models/CTEs don't work if placed before the "INSERT INTO" in a ClickHouse insert statement, see https://github.com/ClickHouse/ClickHouse/issues/30323. This
should not affect most models, but care should be taken where an ephemeral model is placed in model definitions and other SQL statements.

Expand Down Expand Up @@ -143,7 +164,7 @@ The following macros are included to facilitate creating ClickHouse specific tab
- `partition_cols` -- Uses the `partition_by` model configuration property to assign a ClickHouse partition key. No partition key is assigned by default.
- `order_cols` -- Uses the `order_by` model configuration to assign a ClickHouse order by/sorting key. If not specified ClickHouse will use an empty tuple() and the table will be unsorted
- `primary_key_clause` -- Uses the `primary_key` model configuration property to assign a ClickHouse primary key. By default, primary key is set and ClickHouse will use the order by clause as the primary key.
- `on_cluster_clause` -- Uses the `cluster` profile property to add an `ON CLUSTER` clause to all dbt-operations
- `on_cluster_clause` -- Uses the `cluster` profile property to add an `ON CLUSTER` clause to certain dbt-operations: distributed materializations, views creation, database creation.

### s3Source Helper Macro

Expand All @@ -168,7 +189,6 @@ See the [S3 test file](https://github.com/ClickHouse/dbt-clickhouse/blob/main/te

Notes:

- Distributed materializations are experimental and are not currently included in the automated test suite.
- dbt-clickhouse queries now automatically include the setting `insert_distributed_sync = 1` in order to ensure that downstream incremental
materialization operations execute correctly. This could cause some distributed table inserts to run more slowly than expected.

Expand Down Expand Up @@ -281,6 +301,7 @@ configuration file (this file should not be checked into git). The following en
8. DBT_CH_TEST_CH_VERSION - ClickHouse docker image to use. Defaults to `latest`
9. DBT_CH_TEST_INCLUDE_S3 - Include S3 tests. Default=False since these are currently dependent on a specific ClickHouse S3 bucket/test dataset
10. DBT_CH_TEST_CLUSTER_MODE - Use the profile value
11. DBT_CH_TEST_CLUSTER - ClickHouse cluster name, if DBT_CH_TEST_USE_DOCKER set to true, only `test_replica` and `test_shard` is valid (see tests/test_config.xml for cluster settings)


## Original Author
Expand Down
6 changes: 5 additions & 1 deletion dbt/adapters/clickhouse/dbclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ def _ensure_database(self, database_engine, cluster_name) -> None:
db_exists = self.command(check_db)
if not db_exists:
engine_clause = f' ENGINE {database_engine} ' if database_engine else ''
cluster_clause = f' ON CLUSTER {cluster_name} ' if cluster_name is not None else ''
cluster_clause = (
f' ON CLUSTER {cluster_name} '
if cluster_name is not None and cluster_name.strip() != ''
else ''
)
self.command(f'CREATE DATABASE {self.database}{cluster_clause}{engine_clause}')
db_exists = self.command(check_db)
if not db_exists:
Expand Down
19 changes: 16 additions & 3 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ClickHouseConfig(AdapterConfig):
engine: str = 'MergeTree()'
order_by: Optional[Union[List[str], str]] = 'tuple()'
partition_by: Optional[Union[List[str], str]] = None
sharding_key: Optional[Union[List[str], str]] = 'rand()'


class ClickHouseAdapter(SQLAdapter):
Expand Down Expand Up @@ -77,8 +78,11 @@ def get_clickhouse_cluster_name(self):
@available.parse(lambda *a, **k: {})
def get_clickhouse_local_suffix(self):
conn = self.connections.get_if_exists()
if conn.credentials.local_suffix:
return f'{conn.credentials.local_suffix}'
suffix = conn.credentials.local_suffix
if suffix:
if suffix.startswith('_'):
return f'{suffix}'
return f'_{suffix}'

@available
def clickhouse_db_engine_clause(self):
Expand Down Expand Up @@ -107,6 +111,13 @@ def can_exchange(self, schema: str, rel_type: str) -> bool:
ch_db = self.get_ch_database(schema)
return ch_db and ch_db.engine in ('Atomic', 'Replicated')

@available.parse_none
def should_on_cluster(self, materialized: str = '', engine: str = '') -> bool:
conn = self.connections.get_if_exists()
if conn and conn.credentials.cluster:
return ClickHouseRelation.get_on_cluster(conn.credentials.cluster, materialized, engine)
return ClickHouseRelation.get_on_cluster('', materialized, engine)

@available.parse_none
def calculate_incremental_strategy(self, strategy: str) -> str:
conn = self.connections.get_if_exists()
Expand Down Expand Up @@ -198,19 +209,21 @@ def list_relations_without_caching(

relations = []
for row in results:
name, schema, type_info, db_engine = row
name, schema, type_info, db_engine, on_cluster = row
rel_type = RelationType.View if 'view' in type_info else RelationType.Table
can_exchange = (
conn_supports_exchange
and rel_type == RelationType.Table
and db_engine in ('Atomic', 'Replicated')
)

relation = self.Relation.create(
database=None,
schema=schema,
identifier=name,
type=rel_type,
can_exchange=can_exchange,
can_on_cluster=(on_cluster >= 1),
)
relations.append(relation)

Expand Down
52 changes: 49 additions & 3 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from dataclasses import dataclass, field
from typing import Any, Optional, Type
from typing import Any, Dict, Optional, Type

from dbt.adapters.base.relation import BaseRelation, Policy, Self
from dbt.contracts.graph.nodes import SourceDefinition
from dbt.contracts.graph.nodes import ManifestNode, SourceDefinition
from dbt.contracts.relation import HasQuoting
from dbt.exceptions import DbtRuntimeError
from dbt.utils import deep_merge
from dbt.utils import deep_merge, merge


@dataclass
Expand All @@ -27,6 +28,7 @@ class ClickHouseRelation(BaseRelation):
include_policy: Policy = field(default_factory=lambda: ClickHouseIncludePolicy())
quote_character: str = ''
can_exchange: bool = False
can_on_cluster: bool = False

def __post_init__(self):
if self.database != self.schema and self.database:
Expand All @@ -50,6 +52,23 @@ def matches(
raise DbtRuntimeError(f'Passed unexpected schema value {schema} to Relation.matches')
return self.database == database and self.identifier == identifier

@property
def should_on_cluster(self) -> bool:
if self.include_policy.identifier:
return self.can_on_cluster
else:
# create database/schema on cluster by default
return True

@classmethod
def get_on_cluster(
cls: Type[Self], cluster: str = '', materialized: str = '', engine: str = ''
) -> bool:
if cluster.strip():
return 'view' == materialized or 'distributed' in materialized or 'Replicated' in engine
else:
return False

@classmethod
def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self:
source_quoting = source.quoting.to_dict(omit_none=True)
Expand All @@ -73,3 +92,30 @@ def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any)
quote_policy=quote_policy,
**kwargs,
)

@classmethod
def create_from_node(
cls: Type[Self],
config: HasQuoting,
node: ManifestNode,
quote_policy: Optional[Dict[str, bool]] = None,
**kwargs: Any,
) -> Self:
if quote_policy is None:
quote_policy = {}

quote_policy = merge(config.quoting, quote_policy)

cluster = config.credentials.cluster if config.credentials.cluster else ''
materialized = node.get_materialization() if node.get_materialization() else ''
engine = node.config.get('engine') if node.config.get('engine') else ''
can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)

return cls.create(
database=node.database,
schema=node.schema,
identifier=node.alias,
quote_policy=quote_policy,
can_on_cluster=can_on_cluster,
**kwargs,
)
40 changes: 26 additions & 14 deletions dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

{{ sql_header if sql_header is not none }}

create view {{ relation.include(database=False) }} {{ on_cluster_clause()}}
create view {{ relation.include(database=False) }} {{ on_cluster_clause(relation)}}
as (
{{ sql }}
)
Expand All @@ -19,14 +19,14 @@
{% macro clickhouse__create_schema(relation) -%}
{%- call statement('create_schema') -%}
create database if not exists {{ relation.without_identifier().include(database=False) }}
{{ on_cluster_clause()}}
{{ on_cluster_clause(relation)}}
{{ adapter.clickhouse_db_engine_clause() }}
{% endcall %}
{% endmacro %}

{% macro clickhouse__drop_schema(relation) -%}
{%- call statement('drop_schema') -%}
drop database if exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause()}}
drop database if exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(relation)}}
{%- endcall -%}
{% endmacro %}

Expand All @@ -36,9 +36,19 @@
t.name as name,
t.database as schema,
if(engine not in ('MaterializedView', 'View'), 'table', 'view') as type,
db.engine as db_engine
from system.tables as t JOIN system.databases as db on t.database = db.name
where schema = '{{ schema_relation.schema }}'
db.engine as db_engine,
{%- if adapter.get_clickhouse_cluster_name() -%}
count(distinct _shard_num) > 1 as is_on_cluster
from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
join system.databases as db on t.database = db.name
where schema = '{{ schema_relation.schema }}'
group by name, schema, type, db_engine
{%- else -%}
0 as is_on_cluster
from system.tables as t join system.databases as db on t.database = db.name
where schema = '{{ schema_relation.schema }}'
{% endif %}

{% endcall %}
{{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}
Expand All @@ -56,22 +66,23 @@

{% macro clickhouse__drop_relation(relation, obj_type='table') -%}
{% call statement('drop_relation', auto_begin=False) -%}
drop {{ obj_type }} if exists {{ relation }} {{ on_cluster_clause()}}
{# drop relation on cluster by default if cluster is set #}
drop {{ obj_type }} if exists {{ relation }} {{ on_cluster_clause(relation.without_identifier())}}
{%- endcall %}
{% endmacro %}

{% macro clickhouse__rename_relation(from_relation, to_relation, obj_type='table') -%}
{% call statement('drop_relation') %}
drop {{ obj_type }} if exists {{ to_relation }} {{ on_cluster_clause()}}
drop {{ obj_type }} if exists {{ to_relation }} {{ on_cluster_clause(to_relation.without_identifier())}}
{% endcall %}
{% call statement('rename_relation') %}
rename {{ obj_type }} {{ from_relation }} to {{ to_relation }} {{ on_cluster_clause()}}
rename {{ obj_type }} {{ from_relation }} to {{ to_relation }} {{ on_cluster_clause(from_relation)}}
{% endcall %}
{% endmacro %}

{% macro clickhouse__truncate_relation(relation) -%}
{% call statement('truncate_relation') -%}
truncate table {{ relation }} {{ on_cluster_clause()}}
truncate table {{ relation }} {{ on_cluster_clause(relation)}}
{%- endcall %}
{% endmacro %}

Expand Down Expand Up @@ -100,17 +111,18 @@

{% macro clickhouse__alter_column_type(relation, column_name, new_column_type) -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} {{ on_cluster_clause()}} modify column {{ adapter.quote(column_name) }} {{ new_column_type }}
alter table {{ relation }} {{ on_cluster_clause(relation)}} modify column {{ adapter.quote(column_name) }} {{ new_column_type }}
{% endcall %}
{% endmacro %}

{% macro exchange_tables_atomic(old_relation, target_relation, obj_types='TABLES') %}

{%- if adapter.get_clickhouse_cluster_name() is not none and obj_types == 'TABLES' and 'Replicated' in engine_clause() %}
{% do run_query("SYSTEM SYNC REPLICA " + on_cluster_clause() + target_relation.schema + '.' + target_relation.identifier) %}
{%- call statement('exchange_table_sync_replica') -%}
SYSTEM SYNC REPLICA {{ on_cluster_clause(target_relation) }} {{ target_relation.schema }}.{{ target_relation.identifier }}
{% endcall %}
{%- endif %}

{%- call statement('exchange_tables_atomic') -%}
EXCHANGE {{ obj_types }} {{ old_relation }} AND {{ target_relation }} {{ on_cluster_clause()}}
EXCHANGE {{ obj_types }} {{ old_relation }} AND {{ target_relation }} {{ on_cluster_clause(target_relation)}}
{% endcall %}
{% endmacro %}
Loading