diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py index e31b86a5..a3428ff6 100644 --- a/dbt/adapters/clickhouse/__version__.py +++ b/dbt/adapters/clickhouse/__version__.py @@ -1 +1 @@ -version = '1.7.7' +version = '1.8.0' diff --git a/dbt/adapters/clickhouse/cache.py b/dbt/adapters/clickhouse/cache.py index 28d9fa21..59588b95 100644 --- a/dbt/adapters/clickhouse/cache.py +++ b/dbt/adapters/clickhouse/cache.py @@ -3,14 +3,13 @@ from copy import deepcopy from typing import Any, Dict, Iterable, List, Optional, Set, Tuple -from dbt.events.functions import fire_event, fire_event_if -from dbt.events.types import CacheAction, CacheDumpGraph -from dbt.exceptions import ( +from dbt.adapters.events.types import CacheAction, CacheDumpGraph +from dbt.adapters.exceptions import ( NewNameAlreadyInCacheError, NoneRelationFoundError, TruncatedModelNameCausedCollisionError, ) -from dbt.flags import get_flags +from dbt_common.events.functions import fire_event, fire_event_if ReferenceKey = namedtuple("ReferenceKey", "schema identifier") @@ -152,10 +151,11 @@ class ClickHouseRelationsCache: :attr Set[str] schemas: The set of known/cached schemas """ - def __init__(self) -> None: + def __init__(self, log_cache_events: bool = False) -> None: self.relations: Dict[ReferenceKey, CachedRelation] = {} self.lock = threading.RLock() self.schemas: Set[Optional[str]] = set() + self.log_cache_events = log_cache_events def add_schema( self, @@ -233,10 +233,9 @@ def add(self, relation): :param BaseRelation relation: The underlying relation. """ - flags = get_flags() cached = CachedRelation(relation) fire_event_if( - flags.LOG_CACHE_EVENTS, + self.log_cache_events, lambda: CacheDumpGraph(before_after="before", action="adding", dump=self.dump_graph()), ) fire_event(CacheAction(action="add_relation", ref_key=_make_ref_key_dict(cached))) @@ -244,7 +243,7 @@ def add(self, relation): with self.lock: self._setdefault(cached) fire_event_if( - flags.LOG_CACHE_EVENTS, + self.log_cache_events, lambda: CacheDumpGraph(before_after="after", action="adding", dump=self.dump_graph()), ) @@ -368,9 +367,8 @@ def rename(self, old, new): ref_key_2=new_key._asdict(), ) ) - flags = get_flags() fire_event_if( - flags.LOG_CACHE_EVENTS, + self.log_cache_events, lambda: CacheDumpGraph(before_after="before", action="rename", dump=self.dump_graph()), ) @@ -381,7 +379,7 @@ def rename(self, old, new): self._setdefault(CachedRelation(new)) fire_event_if( - flags.LOG_CACHE_EVENTS, + self.log_cache_events, lambda: CacheDumpGraph(before_after="after", action="rename", dump=self.dump_graph()), ) diff --git a/dbt/adapters/clickhouse/column.py b/dbt/adapters/clickhouse/column.py index 393d0eaa..835b0f0d 100644 --- a/dbt/adapters/clickhouse/column.py +++ b/dbt/adapters/clickhouse/column.py @@ -3,7 +3,7 @@ from typing import Any, TypeVar from dbt.adapters.base.column import Column -from dbt.exceptions import DbtRuntimeError +from dbt_common.exceptions import DbtRuntimeError Self = TypeVar('Self', bound='ClickHouseColumn') diff --git a/dbt/adapters/clickhouse/connections.py b/dbt/adapters/clickhouse/connections.py index 17f18e1f..db8d9a13 100644 --- a/dbt/adapters/clickhouse/connections.py +++ b/dbt/adapters/clickhouse/connections.py @@ -4,8 +4,8 @@ from typing import TYPE_CHECKING, Any, Optional, Tuple, Union import dbt.exceptions +from dbt.adapters.contracts.connection import AdapterResponse, Connection from dbt.adapters.sql import SQLConnectionManager -from dbt.contracts.connection import AdapterResponse, Connection from dbt.adapters.clickhouse.dbclient import ChRetryableException, get_db_client from dbt.adapters.clickhouse.logger import logger @@ -68,7 +68,7 @@ def get_table_from_response(cls, response, column_names) -> "agate.Table": :param response: ClickHouse query result :param column_names: Table column names """ - from dbt.clients.agate_helper import table_from_data_flat + from dbt_common.clients.agate_helper import table_from_data_flat data = [] for row in response: @@ -101,7 +101,7 @@ def execute( query_result.result_set, query_result.column_names ) else: - from dbt.clients.agate_helper import empty_table + from dbt_common.clients.agate_helper import empty_table table = empty_table() return AdapterResponse(_message=status), table diff --git a/dbt/adapters/clickhouse/credentials.py b/dbt/adapters/clickhouse/credentials.py index d0775c6a..d4756e78 100644 --- a/dbt/adapters/clickhouse/credentials.py +++ b/dbt/adapters/clickhouse/credentials.py @@ -1,8 +1,8 @@ from dataclasses import dataclass from typing import Any, Dict, Optional -from dbt.contracts.connection import Credentials -from dbt.exceptions import DbtRuntimeError +from dbt.adapters.contracts.connection import Credentials +from dbt_common.exceptions import DbtRuntimeError @dataclass diff --git a/dbt/adapters/clickhouse/dbclient.py b/dbt/adapters/clickhouse/dbclient.py index 7bb78785..ba252d03 100644 --- a/dbt/adapters/clickhouse/dbclient.py +++ b/dbt/adapters/clickhouse/dbclient.py @@ -3,7 +3,8 @@ from abc import ABC, abstractmethod from typing import Dict -from dbt.exceptions import DbtConfigError, DbtDatabaseError, FailedToConnectError +from dbt.adapters.exceptions import FailedToConnectError +from dbt_common.exceptions import DbtConfigError, DbtDatabaseError from dbt.adapters.clickhouse.credentials import ClickHouseCredentials from dbt.adapters.clickhouse.errors import ( diff --git a/dbt/adapters/clickhouse/httpclient.py b/dbt/adapters/clickhouse/httpclient.py index a34e5a64..e809bca7 100644 --- a/dbt/adapters/clickhouse/httpclient.py +++ b/dbt/adapters/clickhouse/httpclient.py @@ -2,8 +2,8 @@ import clickhouse_connect from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError -from dbt.exceptions import DbtDatabaseError -from dbt.version import __version__ as dbt_version +from dbt.adapters.__about__ import version as dbt_adapters_version +from dbt_common.exceptions import DbtDatabaseError from dbt.adapters.clickhouse import ClickHouseColumn from dbt.adapters.clickhouse.__version__ import version as dbt_clickhouse_version @@ -60,7 +60,7 @@ def _create_client(self, credentials): compress=False if credentials.compression == '' else bool(credentials.compression), connect_timeout=credentials.connect_timeout, send_receive_timeout=credentials.send_receive_timeout, - client_name=f'dbt/{dbt_version} dbt-clickhouse/{dbt_clickhouse_version}', + client_name=f'dbt-adapters/{dbt_adapters_version} dbt-clickhouse/{dbt_clickhouse_version}', verify=credentials.verify, query_limit=0, settings=self._conn_settings, diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index e48229f6..8d200262 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -1,22 +1,33 @@ import csv import io from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Tuple, Union +from multiprocessing.context import SpawnContext +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) from dbt.adapters.base import AdapterConfig, available from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport from dbt.adapters.base.relation import BaseRelation, InformationSchema from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support +from dbt.adapters.contracts.relation import Path, RelationConfig +from dbt.adapters.events.types import ConstraintNotSupported from dbt.adapters.sql import SQLAdapter -from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.graph.nodes import ConstraintType, ModelLevelConstraint -from dbt.contracts.relation import Path -from dbt.events.functions import warn_or_error -from dbt.events.types import ConstraintNotSupported -from dbt.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError -from dbt.utils import filter_null_values - -import dbt +from dbt_common.contracts.constraints import ConstraintType, ModelLevelConstraint +from dbt_common.events.functions import warn_or_error +from dbt_common.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError +from dbt_common.utils import filter_null_values + from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache from dbt.adapters.clickhouse.column import ClickHouseColumn from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager @@ -67,8 +78,8 @@ class ClickHouseAdapter(SQLAdapter): } ) - def __init__(self, config): - BaseAdapter.__init__(self, config) + def __init__(self, config, mp_context: SpawnContext): + BaseAdapter.__init__(self, config, mp_context) self.cache = ClickHouseRelationsCache() @classmethod @@ -313,21 +324,28 @@ def get_ch_database(self, schema: str): except DbtRuntimeError: return None - def get_catalog(self, manifest) -> Tuple["agate.Table", List[Exception]]: - from dbt.clients.agate_helper import empty_table + def get_catalog( + self, + relation_configs: Iterable[RelationConfig], + used_schemas: FrozenSet[Tuple[str, str]], + ) -> Tuple["agate.Table", List[Exception]]: + from dbt_common.clients.agate_helper import empty_table - relations = self._get_catalog_relations(manifest) + relations = self._get_catalog_relations(relation_configs) schemas = set(relation.schema for relation in relations) if schemas: - catalog = self._get_one_catalog(InformationSchema(Path()), schemas, manifest) + catalog = self._get_one_catalog(InformationSchema(Path()), schemas, used_schemas) else: catalog = empty_table() return catalog, [] def get_filtered_catalog( - self, manifest: Manifest, relations: Optional[Set[BaseRelation]] = None + self, + relation_configs: Iterable[RelationConfig], + used_schemas: FrozenSet[Tuple[str, str]], + relations: Optional[Set[BaseRelation]] = None, ): - catalog, exceptions = self.get_catalog(manifest) + catalog, exceptions = self.get_catalog(relation_configs, used_schemas) if relations and catalog: relation_map = {(r.schema, r.identifier) for r in relations} @@ -512,8 +530,13 @@ def _expect_row_value(key: str, row: "agate.Row"): return row[key] -def _catalog_filter_schemas(manifest: Manifest) -> Callable[["agate.Row"], bool]: - schemas = frozenset((None, s) for d, s in manifest.get_used_schemas()) +def _catalog_filter_schemas( + used_schemas: FrozenSet[Tuple[str, str]] +) -> Callable[["agate.Row"], bool]: + """Return a function that takes a row and decides if the row should be + included in the catalog output. + """ + schemas = frozenset((d.lower(), s.lower()) for d, s in used_schemas) def test(row: "agate.Row") -> bool: table_database = _expect_row_value('table_database', row) diff --git a/dbt/adapters/clickhouse/logger.py b/dbt/adapters/clickhouse/logger.py index abbea143..5ea74245 100644 --- a/dbt/adapters/clickhouse/logger.py +++ b/dbt/adapters/clickhouse/logger.py @@ -1,3 +1,3 @@ -from dbt.events import AdapterLogger +from dbt.adapters.events.logging import AdapterLogger logger = AdapterLogger('dbt_clickhouse') diff --git a/dbt/adapters/clickhouse/nativeclient.py b/dbt/adapters/clickhouse/nativeclient.py index 772c8e96..e2864cff 100644 --- a/dbt/adapters/clickhouse/nativeclient.py +++ b/dbt/adapters/clickhouse/nativeclient.py @@ -3,8 +3,8 @@ import clickhouse_driver import pkg_resources from clickhouse_driver.errors import NetworkError, SocketTimeoutError -from dbt.exceptions import DbtDatabaseError -from dbt.version import __version__ as dbt_version +from dbt.adapters.__about__ import version as dbt_adapters_version +from dbt_common.exceptions import DbtDatabaseError from dbt.adapters.clickhouse import ClickHouseColumn, ClickHouseCredentials from dbt.adapters.clickhouse.__version__ import version as dbt_clickhouse_version @@ -61,7 +61,7 @@ def _create_client(self, credentials: ClickHouseCredentials): port=credentials.port, user=credentials.user, password=credentials.password, - client_name=f'dbt/{dbt_version} dbt-clickhouse/{dbt_clickhouse_version} clickhouse-driver/{driver_version}', + client_name=f'dbt-adapters/{dbt_adapters_version} dbt-clickhouse/{dbt_clickhouse_version} clickhouse-driver/{driver_version}', secure=credentials.secure, verify=credentials.verify, connect_timeout=credentials.connect_timeout, diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py index 90e88a63..42db9818 100644 --- a/dbt/adapters/clickhouse/relation.py +++ b/dbt/adapters/clickhouse/relation.py @@ -1,15 +1,16 @@ from dataclasses import dataclass, field from typing import Any, Dict, Optional, Type -from dbt.adapters.base.relation import BaseRelation, Policy, Self -from dbt.contracts.graph.nodes import ManifestNode, SourceDefinition -from dbt.contracts.relation import HasQuoting, Path -from dbt.dataclass_schema import StrEnum -from dbt.exceptions import DbtRuntimeError -from dbt.utils import deep_merge, merge +from dbt.adapters.base.relation import BaseRelation, Path, Policy, Self +from dbt.adapters.contracts.relation import HasQuoting, RelationConfig +from dbt_common.dataclass_schema import StrEnum +from dbt_common.exceptions import DbtRuntimeError +from dbt_common.utils import deep_merge from dbt.adapters.clickhouse.query import quote_identifier +NODE_TYPE_SOURCE = 'source' + @dataclass class ClickHouseQuotePolicy(Policy): @@ -85,51 +86,47 @@ def get_on_cluster( return False @classmethod - def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self: - source_quoting = source.quoting.to_dict(omit_none=True) - source_quoting.pop("column", None) + def create_from( + cls: Type[Self], + quoting: HasQuoting, + relation_config: RelationConfig, + **kwargs: Any, + ) -> Self: + quote_policy = kwargs.pop("quote_policy", {}) + + config_quoting = relation_config.quoting_dict + config_quoting.pop("column", None) + # precedence: kwargs quoting > relation config quoting > base quoting > default quoting quote_policy = deep_merge( cls.get_default_quote_policy().to_dict(omit_none=True), - source_quoting, - kwargs.get("quote_policy", {}), + quoting.quoting, + config_quoting, + quote_policy, ) # If the database is set, and the source schema is "defaulted" to the source.name, override the # schema with the database instead, since that's presumably what's intended for clickhouse - schema = source.schema - if schema == source.source_name and source.database: - schema = source.database - - return cls.create( - database='', - schema=schema, - identifier=source.identifier, - quote_policy=quote_policy, - **kwargs, - ) - - @classmethod - def create_from_node( - cls: Type[Self], - config: HasQuoting, - node: ManifestNode, - quote_policy: Optional[Dict[str, bool]] = None, - **kwargs: Any, - ) -> Self: - if quote_policy is None: - quote_policy = {} - - quote_policy = merge(config.quoting, quote_policy) + schema = relation_config.schema + can_on_cluster = None + # We placed a hardcoded const (instead of importing it from dbt-core) in order to decouple the packages + if relation_config.resource_type == NODE_TYPE_SOURCE: + if schema == relation_config.source_name and relation_config.database: + schema = relation_config.database - cluster = config.credentials.cluster if config.credentials.cluster else '' - materialized = node.get_materialization() if node.get_materialization() else '' - engine = node.config.get('engine') if node.config.get('engine') else '' - can_on_cluster = cls.get_on_cluster(cluster, materialized, engine) + else: + cluster = quoting.credentials.cluster if quoting.credentials.cluster else '' + materialized = ( + relation_config.config.materialized if relation_config.config.materialized else '' + ) + engine = ( + relation_config.config.get('engine') if relation_config.config.get('engine') else '' + ) + can_on_cluster = cls.get_on_cluster(cluster, materialized, engine) return cls.create( database='', - schema=node.schema, - identifier=node.alias, + schema=schema, + identifier=relation_config.identifier, quote_policy=quote_policy, can_on_cluster=can_on_cluster, **kwargs, diff --git a/dbt/adapters/clickhouse/util.py b/dbt/adapters/clickhouse/util.py index 7114dbde..b730b9fd 100644 --- a/dbt/adapters/clickhouse/util.py +++ b/dbt/adapters/clickhouse/util.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from dbt.exceptions import DbtRuntimeError +from dbt_common.exceptions import DbtRuntimeError def compare_versions(v1: str, v2: str) -> int: diff --git a/dev_requirements.txt b/dev_requirements.txt index 9c72b0c5..abbd0573 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,9 +1,10 @@ -dbt-core~=1.7.11 +dbt-core~=1.8.0 +dbt-adapters~=1.1.1 +dbt-tests-adapter==1.8.0 clickhouse-connect>=0.7.6 clickhouse-driver>=0.2.7 pytest>=7.2.0 pytest-dotenv==0.5.2 -dbt-tests-adapter~=1.7.11 black==24.3.0 isort==5.10.1 mypy==0.991 diff --git a/setup.py b/setup.py index 1958f8c5..43b2151e 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ def _dbt_clickhouse_version(): package_version = _dbt_clickhouse_version() description = '''The Clickhouse plugin for dbt (data build tool)''' -dbt_version = '1.7.0' +dbt_version = '1.8.0' dbt_minor = '.'.join(dbt_version.split('.')[0:2]) if not package_version.startswith(dbt_minor):