Skip to content

Commit

Permalink
Add case sensitive cache
Browse files Browse the repository at this point in the history
  • Loading branch information
genzgd committed Nov 6, 2023
1 parent 685d6eb commit e72b775
Show file tree
Hide file tree
Showing 8 changed files with 552 additions and 21 deletions.
432 changes: 432 additions & 0 deletions dbt/adapters/clickhouse/cache.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions dbt/adapters/clickhouse/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ClickHouseCredentials(Credentials):
port: Optional[int] = None
user: Optional[str] = 'default'
retries: int = 1
database: Optional[str] = None
database: Optional[str] = ''
schema: Optional[str] = 'default'
password: str = ''
cluster: Optional[str] = None
Expand All @@ -43,15 +43,15 @@ def unique_field(self):
return self.host

def __post_init__(self):
if self.database is not None and self.database != self.schema:
if self.database and self.database != self.schema:
raise DbtRuntimeError(
f' schema: {self.schema} \n'
f' database: {self.database} \n'
f' cluster: {self.cluster} \n'
f'On Clickhouse, database must be omitted or have the same value as'
f' schema.'
)
self.database = None
self.database = ''

def _connection_keys(self):
return (
Expand Down
21 changes: 8 additions & 13 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import agate
import dbt.exceptions
from dbt.adapters.base import AdapterConfig, available
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base.impl import BaseAdapter, catch_as_completed
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.sql import SQLAdapter
from dbt.clients.agate_helper import table_from_rows
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.relation import RelationType
from dbt.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
from dbt.utils import executor, filter_null_values

from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
from dbt.adapters.clickhouse.column import ClickHouseColumn
from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
from dbt.adapters.clickhouse.logger import logger
Expand All @@ -39,6 +39,10 @@ class ClickHouseAdapter(SQLAdapter):
ConnectionManager = ClickHouseConnectionManager
AdapterSpecificConfigs = ClickHouseConfig

def __init__(self, config):
BaseAdapter.__init__(self, config)
self.cache = ClickHouseRelationsCache()

@classmethod
def date_function(cls):
return 'now()'
Expand Down Expand Up @@ -218,7 +222,7 @@ def list_relations_without_caching(
)

relation = self.Relation.create(
database=None,
database='',
schema=schema,
identifier=name,
type=rel_type,
Expand All @@ -230,7 +234,7 @@ def list_relations_without_caching(
return relations

def get_relation(self, database: Optional[str], schema: str, identifier: str):
return super().get_relation(None, schema, identifier)
return super().get_relation('', schema, identifier)

@available.parse_none
def get_ch_database(self, schema: str):
Expand Down Expand Up @@ -275,15 +279,6 @@ def _get_one_catalog(

return super()._get_one_catalog(information_schema, schemas, manifest)

@classmethod
def _catalog_filter_table(cls, table: agate.Table, manifest: Manifest) -> agate.Table:
table = table_from_rows(
table.rows,
table.column_names,
text_only_columns=['table_schema', 'table_name'],
)
return table.where(_catalog_filter_schemas(manifest))

def get_rows_different_sql(
self,
relation_a: ClickHouseRelation,
Expand Down
7 changes: 4 additions & 3 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ClickHouseRelation(BaseRelation):
def __post_init__(self):
if self.database != self.schema and self.database:
raise DbtRuntimeError(f'Cannot set database {self.database} in clickhouse!')
self.path.database = ''

def render(self):
if self.include_policy.database and self.include_policy.schema:
Expand All @@ -44,7 +45,7 @@ def render(self):

def matches(
self,
database: Optional[str] = None,
database: Optional[str] = '',
schema: Optional[str] = None,
identifier: Optional[str] = None,
):
Expand Down Expand Up @@ -86,7 +87,7 @@ def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any)
schema = source.database

return cls.create(
database=source.database,
database='',
schema=schema,
identifier=source.identifier,
quote_policy=quote_policy,
Expand All @@ -112,7 +113,7 @@ def create_from_node(
can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)

return cls.create(
database=node.database,
database='',
schema=node.schema,
identifier=node.alias,
quote_policy=quote_policy,
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@


{% macro clickhouse__generate_database_name(custom_database_name=none, node=none) -%}
{% do return(None) %}
{% do return('') %}
{%- endmacro %}

{% macro clickhouse__get_columns_in_query(select_sql) %}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/catalog.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% macro clickhouse__get_catalog(information_schema, schemas) -%}
{%- call statement('catalog', fetch_result=True) -%}
select
null as table_database,
'' as table_database,
columns.database as table_schema,
columns.table as table_name,
if(tables.engine not in ('MaterializedView', 'View'), 'table', 'view') as table_type,
Expand Down
102 changes: 102 additions & 0 deletions tests/integration/adapter/caching/test_caching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import pytest
from dbt.tests.util import run_dbt

model_sql = """
{{
config(
materialized='table'
)
}}
select 1 as id
"""

another_schema_model_sql = """
{{
config(
materialized='table',
schema='another_schema'
)
}}
select 1 as id
"""


class BaseCachingTest:
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"config-version": 2,
"quoting": {
"identifier": False,
"schema": False,
},
}

def run_and_inspect_cache(self, project, run_args=None):
run_dbt(run_args)

# the cache was empty at the start of the run.
# the model materialization returned an unquoted relation and added to the cache.
adapter = project.adapter
assert len(adapter.cache.relations) == 1
relation = list(adapter.cache.relations).pop()
assert relation.schema == project.test_schema

# on the second run, dbt will find a relation in the database during cache population.
# this relation will be quoted, because list_relations_without_caching (by default) uses
# quote_policy = {"database": True, "schema": True, "identifier": True}
# when adding relations to the cache.
run_dbt(run_args)
adapter = project.adapter
assert len(adapter.cache.relations) == 1
second_relation = list(adapter.cache.relations).pop()

for key in ["schema", "identifier"]:
assert getattr(relation, key) == getattr(second_relation, key)

def test_cache(self, project):
self.run_and_inspect_cache(project, run_args=["run"])


class TestNoPopulateCache(BaseCachingTest):
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": model_sql,
}

def test_cache(self, project):
# --no-populate-cache still allows the cache to populate all relations
# under a schema, so the behavior here remains the same as other tests
run_args = ["--no-populate-cache", "run"]
self.run_and_inspect_cache(project, run_args)


class TestCachingLowerCaseModel(BaseCachingTest):
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": model_sql,
}


class TestCachingUppercaseModel(BaseCachingTest):
@pytest.fixture(scope="class")
def models(self):
return {
"MODEL.sql": model_sql,
}


class TestCachingSelectedSchemaOnly(BaseCachingTest):
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": model_sql,
"another_schema_model.sql": another_schema_model_sql,
}

def test_cache(self, project):
# this should only cache the schema containing the selected model
run_args = ["--cache-selected-only", "run", "--select", "model"]
self.run_and_inspect_cache(project, run_args)
1 change: 1 addition & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def test_config(ch_test_users, ch_test_version):
'db_engine': test_db_engine,
'secure': test_secure,
'cluster_mode': test_cluster_mode,
'database': '',
}

if docker:
Expand Down

0 comments on commit e72b775

Please sign in to comment.