Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Primary key, column and table discovery queries #42

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
47 changes: 24 additions & 23 deletions tap_redshift/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,20 @@
'password',
'start_date'
]

STRING_TYPES = {'char', 'character', 'nchar', 'bpchar', 'text', 'varchar',
'character varying', 'nvarchar'}

BYTES_FOR_INTEGER_TYPE = {
'int2': 2,
'smallint': 2,
'int': 4,
'int4': 4,
'int8': 8
'integer': 4,
'int8': 8,
'bigint': 8
}

FLOAT_TYPES = {'float', 'float4', 'float8'}
FLOAT_TYPES = {'float', 'float4', 'float8', 'double precision', 'real'}

DATE_TYPES = {'date'}

Expand All @@ -76,38 +78,37 @@ def discover_catalog(conn, db_schema):
conn,
"""
SELECT table_name, table_type
FROM INFORMATION_SCHEMA.Tables
FROM SVV_TABLES
WHERE table_schema = '{}'
""".format(db_schema))

column_specs = select_all(
conn,
"""
SELECT c.table_name, c.ordinal_position, c.column_name, c.udt_name,
c.is_nullable
FROM INFORMATION_SCHEMA.Tables t
JOIN INFORMATION_SCHEMA.Columns c ON c.table_name = t.table_name
WHERE t.table_schema = '{}'
ORDER BY c.table_name, c.ordinal_position
SELECT table_name, ordinal_position, column_name, data_type, is_nullable
FROM SVV_COLUMNS
WHERE table_schema = '{}'
ORDER BY table_name, ordinal_position
""".format(db_schema))

pk_specs = select_all(
conn,
"""
SELECT kc.table_name, kc.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kc
ON kc.table_name = tc.table_name AND
kc.table_schema = tc.table_schema AND
kc.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'PRIMARY KEY' AND
tc.table_schema = '{}'
SELECT
c.relname AS table_name,
a.attname AS column_name
FROM
pg_catalog.pg_constraint AS con
JOIN pg_catalog.pg_class AS c ON c.oid = con.conrelid
JOIN pg_catalog.pg_attribute AS a ON a.attrelid = c.oid AND a.attnum = ANY(con.conkey)
JOIN pg_catalog.pg_namespace AS n ON n.oid = c.relnamespace
WHERE n.nspname = '{}' AND contype IN ('p')
ORDER BY
tc.table_schema,
tc.table_name,
kc.ordinal_position
n.nspname,
table_name,
a.attnum;
""".format(db_schema))

entries = []
table_columns = [{'name': k, 'columns': [
{'pos': t[1], 'name': t[2], 'type': t[3],
Expand Down Expand Up @@ -277,7 +278,7 @@ def get_stream_version(tap_stream_id, state):
def row_to_record(catalog_entry, version, row, columns, time_extracted):
row_to_persist = ()
for idx, elem in enumerate(row):
if isinstance(elem, datetime.date):
if isinstance(elem, datetime.datetime):
elem = elem.isoformat('T') + 'Z'
row_to_persist += (elem,)
return singer.RecordMessage(
Expand Down