5858)
5959from threading import (
6060 Condition ,
61- Lock ,
6261 RLock ,
6362)
6463from time import perf_counter
@@ -880,7 +879,7 @@ def __init__(self, opener, pool_config, workspace_config, address):
880879 log .debug ("[#0000] C: <NEO4J POOL> routing address %r" , address )
881880 self .address = address
882881 self .routing_tables = {workspace_config .database : RoutingTable (database = workspace_config .database , routers = [address ])}
883- self .refresh_lock = Lock ()
882+ self .refresh_lock = RLock ()
884883
885884 def __repr__ (self ):
886885 """ The representation shows the initial routing addresses.
@@ -914,12 +913,13 @@ def get_routing_table_for_default_database(self):
914913 return self .routing_tables [self .workspace_config .database ]
915914
916915 def get_or_create_routing_table (self , database ):
917- if database not in self .routing_tables :
918- self .routing_tables [database ] = RoutingTable (
919- database = database ,
920- routers = self .get_default_database_initial_router_addresses ()
921- )
922- return self .routing_tables [database ]
916+ with self .refresh_lock :
917+ if database not in self .routing_tables :
918+ self .routing_tables [database ] = RoutingTable (
919+ database = database ,
920+ routers = self .get_default_database_initial_router_addresses ()
921+ )
922+ return self .routing_tables [database ]
923923
924924 def fetch_routing_info (self , address , database , imp_user , bookmarks ,
925925 timeout ):
@@ -1024,8 +1024,8 @@ def fetch_routing_table(self, *, address, timeout, database, imp_user,
10241024 # At least one of each is fine, so return this table
10251025 return new_routing_table
10261026
1027- def update_routing_table_from (self , * routers , database = None , imp_user = None ,
1028- bookmarks = None , database_callback = None ):
1027+ def _update_routing_table_from (self , * routers , database = None , imp_user = None ,
1028+ bookmarks = None , database_callback = None ):
10291029 """ Try to update routing tables with the given routers.
10301030
10311031 :return: True if the routing table is successfully updated,
@@ -1071,42 +1071,43 @@ def update_routing_table(self, *, database, imp_user, bookmarks,
10711071
10721072 :raise neo4j.exceptions.ServiceUnavailable:
10731073 """
1074- # copied because it can be modified
1075- existing_routers = set (
1076- self .get_or_create_routing_table (database ).routers
1077- )
1078-
1079- prefer_initial_routing_address = \
1080- self .routing_tables [database ].missing_fresh_writer ()
1074+ with self .refresh_lock :
1075+ # copied because it can be modified
1076+ existing_routers = set (
1077+ self .get_or_create_routing_table (database ).routers
1078+ )
10811079
1082- if prefer_initial_routing_address :
1083- # TODO: Test this state
1084- if self .update_routing_table_from (
1085- self .first_initial_routing_address , database = database ,
1086- imp_user = imp_user , bookmarks = bookmarks ,
1080+ prefer_initial_routing_address = \
1081+ self .routing_tables [database ].missing_fresh_writer ()
1082+
1083+ if prefer_initial_routing_address :
1084+ # TODO: Test this state
1085+ if self ._update_routing_table_from (
1086+ self .first_initial_routing_address , database = database ,
1087+ imp_user = imp_user , bookmarks = bookmarks ,
1088+ database_callback = database_callback
1089+ ):
1090+ # Why is only the first initial routing address used?
1091+ return
1092+ if self ._update_routing_table_from (
1093+ * (existing_routers - {self .first_initial_routing_address }),
1094+ database = database , imp_user = imp_user , bookmarks = bookmarks ,
10871095 database_callback = database_callback
10881096 ):
1089- # Why is only the first initial routing address used?
10901097 return
1091- if self .update_routing_table_from (
1092- * (existing_routers - {self .first_initial_routing_address }),
1093- database = database , imp_user = imp_user , bookmarks = bookmarks ,
1094- database_callback = database_callback
1095- ):
1096- return
10971098
1098- if not prefer_initial_routing_address :
1099- if self .update_routing_table_from (
1100- self .first_initial_routing_address , database = database ,
1101- imp_user = imp_user , bookmarks = bookmarks ,
1102- database_callback = database_callback
1103- ):
1104- # Why is only the first initial routing address used?
1105- return
1099+ if not prefer_initial_routing_address :
1100+ if self ._update_routing_table_from (
1101+ self .first_initial_routing_address , database = database ,
1102+ imp_user = imp_user , bookmarks = bookmarks ,
1103+ database_callback = database_callback
1104+ ):
1105+ # Why is only the first initial routing address used?
1106+ return
11061107
1107- # None of the routers have been successful, so just fail
1108- log .error ("Unable to retrieve routing information" )
1109- raise ServiceUnavailable ("Unable to retrieve routing information" )
1108+ # None of the routers have been successful, so just fail
1109+ log .error ("Unable to retrieve routing information" )
1110+ raise ServiceUnavailable ("Unable to retrieve routing information" )
11101111
11111112 def update_connection_pool (self , * , database ):
11121113 servers = self .get_or_create_routing_table (database ).servers ()
@@ -1129,11 +1130,11 @@ def ensure_routing_table_is_fresh(self, *, access_mode, database, imp_user,
11291130 :return: `True` if an update was required, `False` otherwise.
11301131 """
11311132 from neo4j .api import READ_ACCESS
1132- if self .get_or_create_routing_table (database )\
1133- .is_fresh (readonly = (access_mode == READ_ACCESS )):
1134- # Readers are fresh.
1135- return False
11361133 with self .refresh_lock :
1134+ if self .get_or_create_routing_table (database )\
1135+ .is_fresh (readonly = (access_mode == READ_ACCESS )):
1136+ # Readers are fresh.
1137+ return False
11371138
11381139 self .update_routing_table (
11391140 database = database , imp_user = imp_user , bookmarks = bookmarks ,
0 commit comments