13
13
import struct
14
14
import urllib .parse
15
15
16
+ from . import capabilities
16
17
from . import cursor
17
18
from . import introspection
18
19
from . import prepared_stmt
@@ -31,7 +32,8 @@ class Connection:
31
32
'_type_by_name_stmt' , '_top_xact' , '_uid' , '_aborted' ,
32
33
'_stmt_cache_max_size' , '_stmt_cache' , '_stmts_to_close' ,
33
34
'_addr' , '_opts' , '_command_timeout' , '_listeners' ,
34
- '_server_version' , '_intro_query' )
35
+ '_server_version' , '_server_caps' , '_intro_query' ,
36
+ '_reset_query' )
35
37
36
38
def __init__ (self , protocol , transport , loop , addr , opts , * ,
37
39
statement_cache_size , command_timeout ):
@@ -55,15 +57,40 @@ def __init__(self, protocol, transport, loop, addr, opts, *,
55
57
56
58
self ._listeners = {}
57
59
58
- ver_string = self ._protocol .get_settings ().server_version
60
+ settings = self ._protocol .get_settings ()
61
+ ver_string = settings .server_version
59
62
self ._server_version = \
60
63
serverversion .split_server_version_string (ver_string )
61
64
65
+ self ._server_caps = caps = capabilities .detect_server_capabilities (
66
+ self ._server_version , settings )
67
+
62
68
if self ._server_version < (9 , 2 ):
63
69
self ._intro_query = introspection .INTRO_LOOKUP_TYPES_91
64
70
else :
65
71
self ._intro_query = introspection .INTRO_LOOKUP_TYPES
66
72
73
+ _reset_query = ''
74
+ if caps .advisory_locks :
75
+ _reset_query += 'SELECT pg_advisory_unlock_all();\n '
76
+ if caps .cursors :
77
+ _reset_query += 'CLOSE ALL;\n '
78
+ if caps .notifications and caps .plpgsql :
79
+ _reset_query += '''
80
+ DO $$
81
+ BEGIN
82
+ PERFORM * FROM pg_listening_channels() LIMIT 1;
83
+ IF FOUND THEN
84
+ UNLISTEN *;
85
+ END IF;
86
+ END;
87
+ $$;
88
+ '''
89
+ if caps .sql_reset :
90
+ _reset_query += 'RESET ALL;\n '
91
+
92
+ self ._reset_query = _reset_query
93
+
67
94
async def add_listener (self , channel , callback ):
68
95
"""Add a listener for Postgres notifications.
69
96
@@ -107,9 +134,26 @@ def get_server_version(self):
107
134
ServerVersion(major=9, minor=6, micro=1,
108
135
releaselevel='final', serial=0)
109
136
137
+ .. versionadded:: 0.8.0
110
138
"""
111
139
return self ._server_version
112
140
141
+ def get_server_capabilities (self ):
142
+ """Return the capabilities supported by the server as detected.
143
+
144
+ The returned value is a named tuple:
145
+
146
+ .. code-block:: pycon
147
+
148
+ >>> con.get_server_capabilities()
149
+ ServerCapabilities(advisory_locks=True, cursors=True,
150
+ notifications=True, plpgsql=True,
151
+ sql_reset=True)
152
+
153
+ .. versionadded:: 0.10.0
154
+ """
155
+ return self ._server_caps
156
+
113
157
def get_settings (self ):
114
158
"""Return connection settings.
115
159
@@ -394,22 +438,9 @@ def terminate(self):
394
438
self ._protocol .abort ()
395
439
396
440
async def reset (self ):
397
- self ._listeners = {}
398
-
399
- await self .execute ('''
400
- DO $$
401
- BEGIN
402
- PERFORM * FROM pg_listening_channels() LIMIT 1;
403
- IF FOUND THEN
404
- UNLISTEN *;
405
- END IF;
406
- END;
407
- $$;
408
- SET SESSION AUTHORIZATION DEFAULT;
409
- RESET ALL;
410
- CLOSE ALL;
411
- SELECT pg_advisory_unlock_all();
412
- ''' )
441
+ self ._listeners .clear ()
442
+ if self ._reset_query :
443
+ await self .execute (self ._reset_query )
413
444
414
445
def _get_unique_id (self ):
415
446
self ._uid += 1
0 commit comments