-
Notifications
You must be signed in to change notification settings - Fork 413
/
Copy pathintrospection.py
158 lines (140 loc) · 4.98 KB
/
introspection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
_TYPEINFO = '''\
(
SELECT
t.oid AS oid,
ns.nspname AS ns,
t.typname AS name,
t.typtype AS kind,
(CASE WHEN t.typtype = 'd' THEN
(WITH RECURSIVE typebases(oid, depth) AS (
SELECT
t2.typbasetype AS oid,
0 AS depth
FROM
pg_type t2
WHERE
t2.oid = t.oid
UNION ALL
SELECT
t2.typbasetype AS oid,
tb.depth + 1 AS depth
FROM
pg_type t2,
typebases tb
WHERE
tb.oid = t2.oid
AND t2.typbasetype != 0
) SELECT oid FROM typebases ORDER BY depth DESC LIMIT 1)
ELSE NULL
END) AS basetype,
t.typreceive::oid != 0 AND t.typsend::oid != 0
AS has_bin_io,
t.typelem AS elemtype,
elem_t.typdelim AS elemdelim,
range_t.rngsubtype AS range_subtype,
(CASE WHEN t.typtype = 'r' THEN
(SELECT
range_elem_t.typreceive::oid != 0 AND
range_elem_t.typsend::oid != 0
FROM
pg_catalog.pg_type AS range_elem_t
WHERE
range_elem_t.oid = range_t.rngsubtype)
ELSE
elem_t.typreceive::oid != 0 AND
elem_t.typsend::oid != 0
END) AS elem_has_bin_io,
(CASE WHEN t.typtype = 'c' THEN
(SELECT
array_agg(ia.atttypid ORDER BY ia.attnum)
FROM
pg_attribute ia
INNER JOIN pg_class c
ON (ia.attrelid = c.oid)
WHERE
ia.attnum > 0 AND NOT ia.attisdropped
AND c.reltype = t.oid)
ELSE NULL
END) AS attrtypoids,
(CASE WHEN t.typtype = 'c' THEN
(SELECT
array_agg(ia.attname::text ORDER BY ia.attnum)
FROM
pg_attribute ia
INNER JOIN pg_class c
ON (ia.attrelid = c.oid)
WHERE
ia.attnum > 0 AND NOT ia.attisdropped
AND c.reltype = t.oid)
ELSE NULL
END) AS attrnames
FROM
pg_catalog.pg_type AS t
INNER JOIN pg_catalog.pg_namespace ns ON (
ns.oid = t.typnamespace)
LEFT JOIN pg_type elem_t ON (
t.typlen = -1 AND
t.typelem != 0 AND
t.typelem = elem_t.oid
)
LEFT JOIN pg_range range_t ON (
t.oid = range_t.rngtypid
)
)
'''
INTRO_LOOKUP_TYPES = '''\
WITH RECURSIVE typeinfo_tree(
oid, ns, name, kind, basetype, has_bin_io, elemtype, elemdelim,
range_subtype, elem_has_bin_io, attrtypoids, attrnames, depth)
AS (
SELECT
ti.oid, ti.ns, ti.name, ti.kind, ti.basetype, ti.has_bin_io,
ti.elemtype, ti.elemdelim, ti.range_subtype, ti.elem_has_bin_io,
ti.attrtypoids, ti.attrnames, 0
FROM
{typeinfo} AS ti
WHERE
ti.oid = any($1::oid[])
UNION ALL
SELECT
ti.oid, ti.ns, ti.name, ti.kind, ti.basetype, ti.has_bin_io,
ti.elemtype, ti.elemdelim, ti.range_subtype, ti.elem_has_bin_io,
ti.attrtypoids, ti.attrnames, tt.depth + 1
FROM
{typeinfo} ti,
typeinfo_tree tt
WHERE
(tt.elemtype IS NOT NULL AND ti.oid = tt.elemtype)
OR (tt.attrtypoids IS NOT NULL AND ti.oid = any(tt.attrtypoids))
OR (tt.range_subtype IS NOT NULL AND ti.oid = tt.range_subtype)
)
SELECT DISTINCT
*
FROM
typeinfo_tree
ORDER BY
depth DESC
'''.format(typeinfo=_TYPEINFO)
TYPE_BY_NAME = '''\
SELECT
t.oid,
t.typelem AS elemtype,
t.typtype AS kind
FROM
pg_catalog.pg_type AS t
INNER JOIN pg_catalog.pg_namespace ns ON (ns.oid = t.typnamespace)
WHERE
t.typname = $1 AND ns.nspname = $2
'''
# 'b' for a base type, 'd' for a domain, 'e' for enum.
SCALAR_TYPE_KINDS = (b'b', b'd', b'e')
def is_scalar_type(typeinfo) -> bool:
return (
typeinfo['kind'] in SCALAR_TYPE_KINDS and
not typeinfo['elemtype']
)