diff --git a/tap_redshift/__init__.py b/tap_redshift/__init__.py index 85305f5..b611abc 100644 --- a/tap_redshift/__init__.py +++ b/tap_redshift/__init__.py @@ -48,18 +48,20 @@ 'password', 'start_date' ] - STRING_TYPES = {'char', 'character', 'nchar', 'bpchar', 'text', 'varchar', 'character varying', 'nvarchar'} BYTES_FOR_INTEGER_TYPE = { 'int2': 2, + 'smallint': 2, 'int': 4, 'int4': 4, - 'int8': 8 + 'integer': 4, + 'int8': 8, + 'bigint': 8 } -FLOAT_TYPES = {'float', 'float4', 'float8'} +FLOAT_TYPES = {'float', 'float4', 'float8', 'double precision', 'real'} DATE_TYPES = {'date'} @@ -76,38 +78,37 @@ def discover_catalog(conn, db_schema): conn, """ SELECT table_name, table_type - FROM INFORMATION_SCHEMA.Tables + FROM SVV_TABLES WHERE table_schema = '{}' """.format(db_schema)) column_specs = select_all( conn, """ - SELECT c.table_name, c.ordinal_position, c.column_name, c.udt_name, - c.is_nullable - FROM INFORMATION_SCHEMA.Tables t - JOIN INFORMATION_SCHEMA.Columns c ON c.table_name = t.table_name - WHERE t.table_schema = '{}' - ORDER BY c.table_name, c.ordinal_position + SELECT table_name, ordinal_position, column_name, data_type, is_nullable + FROM SVV_COLUMNS + WHERE table_schema = '{}' + ORDER BY table_name, ordinal_position """.format(db_schema)) pk_specs = select_all( conn, """ - SELECT kc.table_name, kc.column_name - FROM information_schema.table_constraints tc - JOIN information_schema.key_column_usage kc - ON kc.table_name = tc.table_name AND - kc.table_schema = tc.table_schema AND - kc.constraint_name = tc.constraint_name - WHERE tc.constraint_type = 'PRIMARY KEY' AND - tc.table_schema = '{}' + SELECT + c.relname AS table_name, + a.attname AS column_name + FROM + pg_catalog.pg_constraint AS con + JOIN pg_catalog.pg_class AS c ON c.oid = con.conrelid + JOIN pg_catalog.pg_attribute AS a ON a.attrelid = c.oid AND a.attnum = ANY(con.conkey) + JOIN pg_catalog.pg_namespace AS n ON n.oid = c.relnamespace + WHERE n.nspname = '{}' AND contype IN ('p') ORDER BY - tc.table_schema, - tc.table_name, - kc.ordinal_position + n.nspname, + table_name, + a.attnum; """.format(db_schema)) - + entries = [] table_columns = [{'name': k, 'columns': [ {'pos': t[1], 'name': t[2], 'type': t[3], @@ -277,7 +278,7 @@ def get_stream_version(tap_stream_id, state): def row_to_record(catalog_entry, version, row, columns, time_extracted): row_to_persist = () for idx, elem in enumerate(row): - if isinstance(elem, datetime.date): + if isinstance(elem, datetime.datetime): elem = elem.isoformat('T') + 'Z' row_to_persist += (elem,) return singer.RecordMessage(