@@ -1156,23 +1156,25 @@ def _select_address(self, *, access_mode, database, imp_user, bookmarks):
11561156 from neo4j .api import READ_ACCESS
11571157 """ Selects the address with the fewest in-use connections.
11581158 """
1159- self .ensure_routing_table_is_fresh (
1160- access_mode = access_mode , database = database , imp_user = imp_user ,
1161- bookmarks = bookmarks
1162- )
1163- log .debug ("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r" , self .routing_tables )
1164- if access_mode == READ_ACCESS :
1165- addresses = self .get_or_create_routing_table (database ).readers
1166- else :
1167- addresses = self .get_or_create_routing_table (database ).writers
1168- addresses_by_usage = {}
1169- for address in addresses :
1170- addresses_by_usage .setdefault (self .in_use_connection_count (address ), []).append (address )
1159+ with self .refresh_lock :
1160+ if access_mode == READ_ACCESS :
1161+ addresses = self .routing_tables [database ].readers
1162+ else :
1163+ addresses = self .routing_tables [database ].writers
1164+ addresses_by_usage = {}
1165+ for address in addresses :
1166+ addresses_by_usage .setdefault (
1167+ self .in_use_connection_count (address ), []
1168+ ).append (address )
11711169 if not addresses_by_usage :
11721170 if access_mode == READ_ACCESS :
1173- raise ReadServiceUnavailable ("No read service currently available" )
1171+ raise ReadServiceUnavailable (
1172+ "No read service currently available"
1173+ )
11741174 else :
1175- raise WriteServiceUnavailable ("No write service currently available" )
1175+ raise WriteServiceUnavailable (
1176+ "No write service currently available"
1177+ )
11761178 return choice (addresses_by_usage [min (addresses_by_usage )])
11771179
11781180 def acquire (self , access_mode = None , timeout = None , database = None ,
@@ -1184,17 +1186,25 @@ def acquire(self, access_mode=None, timeout=None, database=None,
11841186
11851187 from neo4j .api import check_access_mode
11861188 access_mode = check_access_mode (access_mode )
1189+ with self .refresh_lock :
1190+ log .debug ("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r" ,
1191+ self .routing_tables )
1192+ self .ensure_routing_table_is_fresh (
1193+ access_mode = access_mode , database = database , imp_user = imp_user ,
1194+ bookmarks = bookmarks
1195+ )
1196+
11871197 while True :
11881198 try :
11891199 # Get an address for a connection that have the fewest in-use connections.
11901200 address = self ._select_address (
11911201 access_mode = access_mode , database = database ,
11921202 imp_user = imp_user , bookmarks = bookmarks
11931203 )
1194- log .debug ("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r" , database , address )
11951204 except (ReadServiceUnavailable , WriteServiceUnavailable ) as err :
11961205 raise SessionExpired ("Failed to obtain connection towards '%s' server." % access_mode ) from err
11971206 try :
1207+ log .debug ("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r" , database , address )
11981208 connection = self ._acquire (address , timeout = timeout ) # should always be a resolved address
11991209 except ServiceUnavailable :
12001210 self .deactivate (address = address )
0 commit comments