-
Notifications
You must be signed in to change notification settings - Fork 40
/
relation.py
155 lines (122 loc) · 4.46 KB
/
relation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from abc import ABC
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Dict, Optional, Any, Union, List
from dbt_common.contracts.config.materialization import OnConfigurationChangeOption
from dbt_common.contracts.util import Replaceable
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin
from dbt_common.exceptions import CompilationError, DataclassNotDictError
from dbt_common.utils import deep_merge
from typing_extensions import Protocol
class RelationType(StrEnum):
Table = "table"
View = "view"
CTE = "cte"
MaterializedView = "materialized_view"
External = "external"
Ephemeral = "ephemeral"
class MaterializationContract(Protocol):
    """Structural type for an object carrying two boolean contract flags."""

    # Both attributes are declared as plain booleans; no defaults are given.
    enforced: bool
    alias_types: bool
class MaterializationConfig(Mapping, ABC):
    """Mapping-shaped, abstract description of a materialization's settings.

    The class only declares the attributes a config object is expected to
    expose, plus two placeholder dunder methods. It does not implement the
    ``Mapping`` abstract methods (``__getitem__``, ``__iter__``, ``__len__``)
    itself.
    """

    # Declared attributes (annotations only, no default values).
    materialized: str
    incremental_strategy: Optional[str]
    persist_docs: Dict[str, Any]
    column_types: Dict[str, Any]
    full_refresh: Optional[bool]
    quoting: Dict[str, Any]
    unique_key: Union[str, List[str], None]
    on_schema_change: Optional[str]
    on_configuration_change: OnConfigurationChangeOption
    contract: MaterializationContract
    extra: Dict[str, Any]

    def __contains__(self, item):
        # Placeholder body: declares the method's presence and returns None.
        ...

    def __delitem__(self, key):
        # Placeholder body: declares the method's presence and returns None.
        ...
class RelationConfig(Protocol):
    """Structural type for an object that describes a single relation.

    Only attribute names and types are declared here; any object exposing
    these attributes satisfies the protocol.
    """

    name: str
    # The three path components, each a plain string.
    database: str
    schema: str
    identifier: str
    compiled_code: Optional[str]
    quoting_dict: Dict[str, bool]
    # May be None; otherwise a MaterializationConfig.
    config: Optional[MaterializationConfig]
class ComponentName(StrEnum):
Database = "database"
Schema = "schema"
Identifier = "identifier"
class HasQuoting(Protocol):
    """Structural type for any object exposing a ``quoting`` mapping."""

    # Maps string keys to boolean flags.
    quoting: Dict[str, bool]
class FakeAPIObject(dbtClassMixin, Replaceable, Mapping):
    """Base class whose attributes can be read with subscript syntax.

    The class is a ``Mapping`` for lookup purposes only: ``obj[key]`` reads
    the attribute named ``key``, while iterating over the object or asking
    for its length raises ``DataclassNotDictError``.
    """

    def __bool__(self):
        # Always truthy. Without this override, truthiness would fall back
        # to __len__, which raises for these objects.
        return True

    def __getitem__(self, key):
        """Return the attribute named ``key``; raise ``KeyError`` if absent."""
        try:
            attribute = getattr(self, key)
        except AttributeError:
            # Present a mapping-style error and drop the AttributeError context.
            raise KeyError(key) from None
        else:
            return attribute

    def __iter__(self):
        # Iteration is deliberately unsupported.
        raise DataclassNotDictError(self)

    def __len__(self):
        # Length is deliberately unsupported.
        raise DataclassNotDictError(self)

    def incorporate(self, **kwargs):
        """Build a new object from this one's dict form merged with ``kwargs``.

        The current object is serialized with ``to_dict(omit_none=True)``,
        deep-merged with the keyword arguments, and passed to ``from_dict``.
        """
        merged = deep_merge(self.to_dict(omit_none=True), kwargs)
        return self.from_dict(merged)
@dataclass
class Policy(FakeAPIObject):
    """One boolean flag per path component; every flag defaults to True."""

    database: bool = True
    schema: bool = True
    identifier: bool = True

    def get_part(self, key: ComponentName) -> bool:
        """Return the flag stored for the component ``key``.

        Raises:
            ValueError: if ``key`` equals none of the ``ComponentName`` members.
        """
        if key == ComponentName.Database:
            return self.database
        if key == ComponentName.Schema:
            return self.schema
        if key == ComponentName.Identifier:
            return self.identifier
        raise ValueError(
            "Got a key of {}, expected one of {}".format(key, list(ComponentName))
        )

    def replace_dict(self, dct: Dict[ComponentName, bool]):
        """Return ``self.replace(...)`` with the given per-component flags.

        Keys are converted with ``str()`` so they can be passed as keyword
        argument names.
        """
        kwargs: Dict[str, bool] = {str(component): flag for component, flag in dct.items()}
        return self.replace(**kwargs)
@dataclass
class Path(FakeAPIObject):
    """The database, schema and identifier of a relation; each may be None."""

    database: Optional[str] = None
    schema: Optional[str] = None
    identifier: Optional[str] = None

    def __post_init__(self):
        """Reject any component that is neither ``None`` nor a ``str``.

        Raises:
            CompilationError: for the first offending component, checked in
                the order database, schema, identifier.
        """
        # Guards against non-string values such as jinja2.Undefined slipping
        # in and breaking rendering later on.
        if not (self.database is None or isinstance(self.database, str)):
            raise CompilationError("Got an invalid path database: {}".format(self.database))
        if not (self.schema is None or isinstance(self.schema, str)):
            raise CompilationError("Got an invalid path schema: {}".format(self.schema))
        if not (self.identifier is None or isinstance(self.identifier, str)):
            raise CompilationError("Got an invalid path identifier: {}".format(self.identifier))

    def get_lowered_part(self, key: ComponentName) -> Optional[str]:
        """Return the component ``key`` lowercased, or ``None`` if it is unset."""
        part = self.get_part(key)
        return None if part is None else part.lower()

    def get_part(self, key: ComponentName) -> Optional[str]:
        """Return the stored value for the component ``key``.

        Raises:
            ValueError: if ``key`` equals none of the ``ComponentName`` members.
        """
        if key == ComponentName.Database:
            return self.database
        if key == ComponentName.Schema:
            return self.schema
        if key == ComponentName.Identifier:
            return self.identifier
        raise ValueError(
            "Got a key of {}, expected one of {}".format(key, list(ComponentName))
        )

    def replace_dict(self, dct: Dict[ComponentName, str]):
        """Return ``self.replace(...)`` with the given per-component values.

        Keys are converted with ``str()`` so they can be passed as keyword
        argument names.
        """
        kwargs: Dict[str, str] = {str(component): value for component, value in dct.items()}
        return self.replace(**kwargs)