-
Notifications
You must be signed in to change notification settings - Fork 179
/
adapters.sql
288 lines (235 loc) · 9.58 KB
/
adapters.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
{% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%}
{%- if language == 'sql' -%}
{%- set transient = config.get('transient', default=true) -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}
{%- if cluster_by_keys is not none and cluster_by_keys is string -%}
{%- set cluster_by_keys = [cluster_by_keys] -%}
{%- endif -%}
{%- if cluster_by_keys is not none -%}
{%- set cluster_by_string = cluster_by_keys|join(", ")-%}
{% else %}
{%- set cluster_by_string = none -%}
{%- endif -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
{%- endif %} table {{ relation }} {% if copy_grants and not temporary -%} copy grants {%- endif %} as
(
{%- if cluster_by_string is not none -%}
select * from(
{{ compiled_code }}
) order by ({{ cluster_by_string }})
{%- else -%}
{{ compiled_code }}
{%- endif %}
);
{% if cluster_by_string is not none and not temporary -%}
alter table {{relation}} cluster by ({{cluster_by_string}});
{%- endif -%}
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
alter table {{relation}} resume recluster;
{%- endif -%}
{%- elif language == 'python' -%}
{{ py_write_table(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }}
{%- else -%}
{% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %}
{%- endif -%}
{% endmacro %}
{% macro get_column_comment_sql(column_name, column_dict) -%}
{% if (column_name|upper in column_dict) -%}
{% set matched_column = column_name|upper -%}
{% elif (column_name|lower in column_dict) -%}
{% set matched_column = column_name|lower -%}
{% elif (column_name in column_dict) -%}
{% set matched_column = column_name -%}
{% else -%}
{% set matched_column = None -%}
{% endif -%}
{% if matched_column -%}
{{ adapter.quote(column_name) }} COMMENT $${{ column_dict[matched_column]['description'] | replace('$', '[$]') }}$$
{%- else -%}
{{ adapter.quote(column_name) }} COMMENT $$$$
{%- endif -%}
{% endmacro %}
{% macro get_persist_docs_column_list(model_columns, query_columns) %}
(
{% for column_name in query_columns %}
{{ get_column_comment_sql(column_name, model_columns) }}
{{- ", " if not loop.last else "" }}
{% endfor %}
)
{% endmacro %}
{% macro snowflake__create_view_as(relation, sql) -%}
{%- set secure = config.get('secure', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create or replace {% if secure -%}
secure
{%- endif %} view {{ relation }}
{% if config.persist_column_docs() -%}
{% set model_columns = model.columns %}
{% set query_columns = get_columns_in_query(sql) %}
{{ get_persist_docs_column_list(model_columns, query_columns) }}
{%- endif %}
{% if copy_grants -%} copy grants {%- endif %} as (
{{ sql }}
);
{% endmacro %}
{% macro snowflake__get_columns_in_relation(relation) -%}
{%- set sql -%}
describe table {{ relation }}
{%- endset -%}
{%- set result = run_query(sql) -%}
{% set maximum = 10000 %}
{% if (result | length) >= maximum %}
{% set msg %}
Too many columns in relation {{ relation }}! dbt can only get
information about relations with fewer than {{ maximum }} columns.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}
{% set columns = [] %}
{% for row in result %}
{% do columns.append(api.Column.from_description(row['name'], row['type'])) %}
{% endfor %}
{% do return(columns) %}
{% endmacro %}
{% macro snowflake__list_schemas(database) -%}
{# 10k limit from here: https://docs.snowflake.net/manuals/sql-reference/sql/show-schemas.html#usage-notes #}
{% set maximum = 10000 %}
{% set sql -%}
show terse schemas in database {{ database }}
limit {{ maximum }}
{%- endset %}
{% set result = run_query(sql) %}
{% if (result | length) >= maximum %}
{% set msg %}
Too many schemas in database {{ database }}! dbt can only get
information about databases with fewer than {{ maximum }} schemas.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}
{{ return(result) }}
{% endmacro %}
{% macro snowflake__list_relations_without_caching(schema_relation) %}
{%- set sql -%}
show terse objects in {{ schema_relation }}
{%- endset -%}
{%- set result = run_query(sql) -%}
{% set maximum = 10000 %}
{% if (result | length) >= maximum %}
{% set msg %}
Too many schemas in schema {{ schema_relation }}! dbt can only get
information about schemas with fewer than {{ maximum }} objects.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}
{%- do return(result) -%}
{% endmacro %}
{% macro snowflake__check_schema_exists(information_schema, schema) -%}
{% call statement('check_schema_exists', fetch_result=True) -%}
select count(*)
from {{ information_schema }}.schemata
where upper(schema_name) = upper('{{ schema }}')
and upper(catalog_name) = upper('{{ information_schema.database }}')
{%- endcall %}
{{ return(load_result('check_schema_exists').table) }}
{%- endmacro %}
{% macro snowflake__rename_relation(from_relation, to_relation) -%}
{% call statement('rename_relation') -%}
alter table {{ from_relation }} rename to {{ to_relation }}
{%- endcall %}
{% endmacro %}
{% macro snowflake__alter_column_type(relation, column_name, new_column_type) -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} alter {{ adapter.quote(column_name) }} set data type {{ new_column_type }};
{% endcall %}
{% endmacro %}
{% macro snowflake__alter_relation_comment(relation, relation_comment) -%}
comment on {{ relation.type }} {{ relation }} IS $${{ relation_comment | replace('$', '[$]') }}$$;
{% endmacro %}
{% macro snowflake__alter_column_comment(relation, column_dict) -%}
{% set existing_columns = adapter.get_columns_in_relation(relation) | map(attribute="name") | list %}
alter {{ relation.type }} {{ relation }} alter
{% for column_name in existing_columns if (column_name in existing_columns) or (column_name|lower in existing_columns) %}
{{ get_column_comment_sql(column_name, column_dict) }} {{- ',' if not loop.last else ';' }}
{% endfor %}
{% endmacro %}
{% macro get_current_query_tag() -%}
{{ return(run_query("show parameters like 'query_tag' in session").rows[0]['value']) }}
{% endmacro %}
{% macro set_query_tag() -%}
{{ return(adapter.dispatch('set_query_tag', 'dbt')()) }}
{% endmacro %}
{% macro snowflake__set_query_tag() -%}
{% set new_query_tag = config.get('query_tag') %}
{% if new_query_tag %}
{% set original_query_tag = get_current_query_tag() %}
{{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
{% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
{{ return(original_query_tag)}}
{% endif %}
{{ return(none)}}
{% endmacro %}
{% macro unset_query_tag(original_query_tag) -%}
{{ return(adapter.dispatch('unset_query_tag', 'dbt')(original_query_tag)) }}
{% endmacro %}
{% macro snowflake__unset_query_tag(original_query_tag) -%}
{% set new_query_tag = config.get('query_tag') %}
{% if new_query_tag %}
{% if original_query_tag %}
{{ log("Resetting query_tag to '" ~ original_query_tag ~ "'.") }}
{% do run_query("alter session set query_tag = '{}'".format(original_query_tag)) %}
{% else %}
{{ log("No original query_tag, unsetting parameter.") }}
{% do run_query("alter session unset query_tag") %}
{% endif %}
{% endif %}
{% endmacro %}
{% macro snowflake__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% if add_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} add column
{% for column in add_columns %}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}
{%- endset -%}
{% do run_query(sql) %}
{% endif %}
{% if remove_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} drop column
{% for column in remove_columns %}
{{ column.name }}{{ ',' if not loop.last }}
{% endfor %}
{%- endset -%}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
{% macro snowflake_dml_explicit_transaction(dml) %}
{#
Use this macro to wrap all INSERT, MERGE, UPDATE, DELETE, and TRUNCATE
statements before passing them into run_query(), or calling in the 'main' statement
of a materialization
#}
{% set dml_transaction -%}
begin;
{{ dml }};
commit;
{%- endset %}
{% do return(dml_transaction) %}
{% endmacro %}
{% macro snowflake__truncate_relation(relation) -%}
{% set truncate_dml %}
truncate table {{ relation }}
{% endset %}
{% call statement('truncate_relation') -%}
{{ snowflake_dml_explicit_transaction(truncate_dml) }}
{%- endcall %}
{% endmacro %}