1616
1717import datetime
1818import queue
19+ import time
1920
2021from google .cloud .exceptions import NotFound
2122from google .cloud .spanner_v1 import BatchCreateSessionsRequest
2425 _metadata_with_prefix ,
2526 _metadata_with_leader_aware_routing ,
2627)
28+ from google .cloud .spanner_v1 ._opentelemetry_tracing import (
29+ get_current_span ,
30+ )
2731from warnings import warn
2832
2933_NOW = datetime .datetime .utcnow # unit tests may replace
@@ -199,13 +203,32 @@ def bind(self, database):
199203 _metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
200204 )
201205 self ._database_role = self ._database_role or self ._database .database_role
206+ requested_session_count = self .size - self ._sessions .qsize ()
202207 request = BatchCreateSessionsRequest (
203208 database = database .name ,
204- session_count = self . size - self . _sessions . qsize () ,
209+ session_count = requested_session_count ,
205210 session_template = Session (creator_role = self .database_role ),
206211 )
207212
213+ current_span = get_current_span ()
214+ if requested_session_count > 0 :
215+ current_span .add_event (
216+ f"Requesting { requested_session_count } sessions" ,
217+ {"kind" : "fixed_size_pool" },
218+ )
219+
220+ if self ._sessions .full ():
221+ current_span .add_event (
222+ "Session pool is already full" , {"kind" : "fixed_size_pool" }
223+ )
224+ return
225+
226+ returned_session_count = 0
208227 while not self ._sessions .full ():
228+ current_span .add_event (
229+ f"Creating { request .session_count } sessions" ,
230+ {"kind" : "fixed_size_pool" },
231+ )
209232 resp = api .batch_create_sessions (
210233 request = request ,
211234 metadata = metadata ,
@@ -214,6 +237,12 @@ def bind(self, database):
214237 session = self ._new_session ()
215238 session ._session_id = session_pb .name .split ("/" )[- 1 ]
216239 self ._sessions .put (session )
240+ returned_session_count += 1
241+
242+ current_span .add_event (
243+ f"Requested for { requested_session_count } , returned { returned_session_count } " ,
244+ {"kind" : "fixed_size_pool" },
245+ )
217246
218247 def get (self , timeout = None ):
219248 """Check a session out from the pool.
@@ -229,12 +258,23 @@ def get(self, timeout=None):
229258 if timeout is None :
230259 timeout = self .default_timeout
231260
261+ start_time = time .time ()
262+ current_span = get_current_span ()
263+ current_span .add_event ("Acquiring session" , {"kind" : type (self ).__name__ })
232264 session = self ._sessions .get (block = True , timeout = timeout )
233265
234266 if not session .exists ():
235267 session = self ._database .session ()
236268 session .create ()
237269
270+ current_span .add_event (
271+ "Acquired session" ,
272+ {
273+ "time.elapsed" : time .time () - start_time ,
274+ "session.id" : session .session_id ,
275+ "kind" : type (self ).__name__ ,
276+ },
277+ )
238278 return session
239279
240280 def put (self , session ):
@@ -307,6 +347,10 @@ def get(self):
307347 :returns: an existing session from the pool, or a newly-created
308348 session.
309349 """
350+ start_time = time .time ()
351+ current_span = get_current_span ()
352+ current_span .add_event ("Acquiring session" , {"kind" : type (self ).__name__ })
353+
310354 try :
311355 session = self ._sessions .get_nowait ()
312356 except queue .Empty :
@@ -316,6 +360,15 @@ def get(self):
316360 if not session .exists ():
317361 session = self ._new_session ()
318362 session .create ()
363+ else :
364+ current_span .add_event (
365+ "Cache hit: has usable session" ,
366+ {
367+ "id" : session .session_id ,
368+ "kind" : type (self ).__name__ ,
369+ },
370+ )
371+
319372 return session
320373
321374 def put (self , session ):
@@ -422,6 +475,18 @@ def bind(self, database):
422475 session_template = Session (creator_role = self .database_role ),
423476 )
424477
478+ requested_session_count = request .session_count
479+ current_span = get_current_span ()
480+ current_span .add_event (f"Requesting { requested_session_count } sessions" )
481+
482+ if created_session_count >= self .size :
483+ current_span .add_event (
484+ "Created no new sessions as sessionPool is full" ,
485+ {"kind" : type (self ).__name__ },
486+ )
487+ return
488+
489+ returned_session_count = 0
425490 while created_session_count < self .size :
426491 resp = api .batch_create_sessions (
427492 request = request ,
@@ -431,8 +496,17 @@ def bind(self, database):
431496 session = self ._new_session ()
432497 session ._session_id = session_pb .name .split ("/" )[- 1 ]
433498 self .put (session )
499+ returned_session_count += 1
500+
434501 created_session_count += len (resp .session )
435502
503+ current_span .add_event (
504+ "Requested for {requested_session_count} sessions, return {returned_session_count}" ,
505+ {
506+ "kind" : "pinging_pool" ,
507+ },
508+ )
509+
436510 def get (self , timeout = None ):
437511 """Check a session out from the pool.
438512
@@ -447,6 +521,12 @@ def get(self, timeout=None):
447521 if timeout is None :
448522 timeout = self .default_timeout
449523
524+ start_time = time .time ()
525+ current_span = get_current_span ()
526+ current_span .add_event (
527+ "Waiting for a session to become available" , {"kind" : "pinging_pool" }
528+ )
529+
450530 ping_after , session = self ._sessions .get (block = True , timeout = timeout )
451531
452532 if _NOW () > ping_after :
@@ -457,6 +537,14 @@ def get(self, timeout=None):
457537 session = self ._new_session ()
458538 session .create ()
459539
540+ current_span .add_event (
541+ "Acquired session" ,
542+ {
543+ "time.elapsed" : time .time () - start_time ,
544+ "session.id" : session .session_id ,
545+ "kind" : "pinging_pool" ,
546+ },
547+ )
460548 return session
461549
462550 def put (self , session ):
0 commit comments