Skip to content

Commit

Permalink
Initial dependency updates for 1.7.x
Browse files Browse the repository at this point in the history
  • Loading branch information
genzgd committed Dec 7, 2023
1 parent 223f1e6 commit ed11c6c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 37 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
### Release [1.7.0], TBD
### Release [1.7.0], 2023-12-07
#### Improvements
- Minimal compatibility with dbt 1.7.x. The date_spine macro and additional automated tests have not been implemented,
but are planned for a future patch release.
- DBT 1.7 introduces a (complex) optimization mechanism for retrieving a dbt catalog which is overkill for ClickHouse
(which has no separate schema/database level), so this release includes some internal catalog changes to simplify that process.

### Release [1.6.2], 2023-12-06
#### Bug Fix
Expand Down
72 changes: 36 additions & 36 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import csv
import io
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

import agate
from dbt.adapters.base import AdapterConfig, available
from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport, catch_as_completed
from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support
from dbt.adapters.sql import SQLAdapter
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ConstraintType, ModelLevelConstraint
from dbt.contracts.relation import RelationType
from dbt.contracts.relation import Path, RelationType
from dbt.events.functions import warn_or_error
from dbt.events.types import ConstraintNotSupported
from dbt.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
from dbt.utils import executor, filter_null_values
from dbt.utils import filter_null_values

import dbt
from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
from dbt.adapters.clickhouse.column import ClickHouseColumn
from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
Expand Down Expand Up @@ -56,6 +57,13 @@ class ClickHouseAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
}

_capabilities: CapabilityDict = CapabilityDict(
{
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Unsupported),
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Unsupported),
}
)

def __init__(self, config):
BaseAdapter.__init__(self, config)
self.cache = ClickHouseRelationsCache()
Expand Down Expand Up @@ -295,37 +303,29 @@ def get_ch_database(self, schema: str):
except DbtRuntimeError:
return None

def get_catalog(self, manifest):
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
for schema in schemas:
futures.append(
tpe.submit_connected(
self,
schema,
self._get_one_catalog,
info,
[schema],
manifest,
)
)
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def _get_one_catalog(
self,
information_schema: InformationSchema,
schemas: Set[str],
manifest: Manifest,
) -> agate.Table:
if len(schemas) != 1:
raise DbtRuntimeError(
f"Expected only one schema in clickhouse _get_one_catalog, found ' f'{schemas}'"
)
return super()._get_one_catalog(information_schema, schemas, manifest)
def get_catalog(self, manifest) -> Tuple[agate.Table, List[Exception]]:
relations = self._get_catalog_relations(manifest)
schemas = set(relation.schema for relation in relations)
if schemas:
catalog = self._get_one_catalog(InformationSchema(Path()), schemas, manifest)
else:
catalog = dbt.clients.agate_helper.empty_table()
return catalog, []

def get_filtered_catalog(
self, manifest: Manifest, relations: Optional[Set[BaseRelation]] = None
):
catalog, exceptions = self.get_catalog(manifest)
if relations and catalog:
relation_map = {(r.schema, r.identifier) for r in relations}

def in_map(row: agate.Row):
s = _expect_row_value("table_schema", row)
i = _expect_row_value("table_name", row)
return (s, i) in relation_map

catalog = catalog.where(in_map)
return catalog, exceptions

def get_rows_different_sql(
self,
Expand Down

0 comments on commit ed11c6c

Please sign in to comment.