-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Seed rewrite #618
Seed rewrite #618
Changes from 28 commits
4343697
0bf8489
5b3cf6f
4899e0b
28d2886
dbac3c5
07f283f
935d2a5
e49cab0
434e7b7
23a01ce
584578f
936e68e
224174a
ce25e78
5b97a86
e9b5795
b86159f
797a966
34f5e41
4f67dca
58b98f0
90380d5
04b05a2
122fba7
3688286
256fca0
e60a55c
f15044f
868d338
0b0c423
6d9af2b
4a49afd
f518505
9d92eb6
c597803
c86f086
97b616e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,12 @@ | ||
import psycopg2 | ||
import agate | ||
|
||
from contextlib import contextmanager | ||
|
||
import dbt.adapters.default | ||
import dbt.compat | ||
import dbt.exceptions | ||
from dbt.utils import max_digits | ||
|
||
from dbt.logger import GLOBAL_LOGGER as logger | ||
|
||
|
@@ -165,3 +167,72 @@ def cancel_connection(cls, profile, connection): | |
res = cursor.fetchone() | ||
|
||
logger.debug("Cancel query '{}': {}".format(connection_name, res)) | ||
|
||
@classmethod | ||
def convert_text_type(cls, agate_table, col_idx): | ||
return "text" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why go with this approach here, instead of getting the max length using http://agate.readthedocs.io/en/1.6.0/api/aggregations.html#agate.MaxLength (similar to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PostgreSQL and Snowflake both treat the N portion of |
||
|
||
@classmethod | ||
def convert_number_type(cls, agate_table, col_idx): | ||
column = agate_table.columns[col_idx] | ||
precision = max_digits(column.values_without_nulls()) | ||
# agate uses the term Precision but in this context, it is really the | ||
# scale - ie. the number of decimal places | ||
scale = agate_table.aggregate(agate.MaxPrecision(col_idx)) | ||
if not scale: | ||
return "integer" | ||
return "numeric({}, {})".format(precision, scale) | ||
|
||
@classmethod | ||
def convert_boolean_type(cls, agate_table, col_idx): | ||
return "boolean" | ||
|
||
@classmethod | ||
def convert_datetime_type(cls, agate_table, col_idx): | ||
return "timestamp without time zone" | ||
|
||
@classmethod | ||
def convert_date_type(cls, agate_table, col_idx): | ||
return "date" | ||
|
||
@classmethod | ||
def convert_time_type(cls, agate_table, col_idx): | ||
return "time" | ||
|
||
@classmethod | ||
def create_csv_table(cls, profile, schema, table_name, agate_table): | ||
col_sqls = [] | ||
for idx, col_name in enumerate(agate_table.column_names): | ||
type_ = cls.convert_agate_type(agate_table, idx) | ||
col_sqls.append('{} {}'.format(col_name, type_)) | ||
sql = 'create table "{}"."{}" ({})'.format(schema, table_name, | ||
", ".join(col_sqls)) | ||
return cls.add_query(profile, sql) | ||
|
||
@classmethod | ||
def reset_csv_table(cls, profile, schema, table_name, agate_table, | ||
full_refresh=False): | ||
if full_refresh: | ||
cls.drop_table(profile, schema, table_name, None) | ||
cls.create_csv_table(profile, schema, table_name, agate_table) | ||
else: | ||
cls.truncate(profile, schema, table_name) | ||
|
||
@classmethod | ||
def load_csv_rows(cls, profile, schema, table_name, agate_table): | ||
bindings = [] | ||
placeholders = [] | ||
cols_sql = ", ".join(c for c in agate_table.column_names) | ||
|
||
for row in agate_table.rows: | ||
bindings += row | ||
placeholders.append("({})".format( | ||
", ".join("%s" for _ in agate_table.column_names))) | ||
|
||
sql = ('insert into {}.{} ({}) values {}' | ||
.format(cls.quote(schema), | ||
cls.quote(table_name), | ||
cols_sql, | ||
",\n".join(placeholders))) | ||
|
||
cls.add_query(profile, sql, bindings=bindings) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cmcarthur this failed to run for me on Redshift. If
bindings
is None, then this expression evaluates to an empty tuple. I think we want that to beNone
, right? http://initd.org/psycopg/docs/cursor.html#cursor.execute