1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ from __future__ import annotations
1415
1516import collections
1617import functools
@@ -232,7 +233,7 @@ def __init__(
232233 def _init_stream (self ):
233234 rpc_request = self ._get_rpc_request
234235
235- self ._rpc = ResumableBidiRpc (
236+ self ._rpc : ResumableBidiRpc | None = ResumableBidiRpc (
236237 start_rpc = self ._api ._transport .listen ,
237238 should_recover = _should_recover ,
238239 should_terminate = _should_terminate ,
@@ -243,7 +244,9 @@ def _init_stream(self):
243244 self ._rpc .add_done_callback (self ._on_rpc_done )
244245
245246 # The server assigns and updates the resume token.
246- self ._consumer = BackgroundConsumer (self ._rpc , self .on_snapshot )
247+ self ._consumer : BackgroundConsumer | None = BackgroundConsumer (
248+ self ._rpc , self .on_snapshot
249+ )
247250 self ._consumer .start ()
248251
249252 @classmethod
@@ -330,16 +333,18 @@ def close(self, reason=None):
330333 return
331334
332335 # Stop consuming messages.
333- if self .is_active :
334- _LOGGER .debug ("Stopping consumer." )
335- self ._consumer .stop ()
336- self ._consumer ._on_response = None
336+ if self ._consumer :
337+ if self .is_active :
338+ _LOGGER .debug ("Stopping consumer." )
339+ self ._consumer .stop ()
340+ self ._consumer ._on_response = None
337341 self ._consumer = None
338342
339343 self ._snapshot_callback = None
340- self ._rpc .close ()
341- self ._rpc ._initial_request = None
342- self ._rpc ._callbacks = []
344+ if self ._rpc :
345+ self ._rpc .close ()
346+ self ._rpc ._initial_request = None
347+ self ._rpc ._callbacks = []
343348 self ._rpc = None
344349 self ._closed = True
345350 _LOGGER .debug ("Finished stopping manager." )
@@ -460,13 +465,13 @@ def on_snapshot(self, proto):
460465 message = f"Unknown target change type: { target_change_type } "
461466 _LOGGER .info (f"on_snapshot: { message } " )
462467 self .close (reason = ValueError (message ))
463-
464- try :
465- # Use 'proto' vs 'pb' for datetime handling
466- meth (self , proto .target_change )
467- except Exception as exc2 :
468- _LOGGER .debug (f"meth(proto) exc: { exc2 } " )
469- raise
468+ else :
469+ try :
470+ # Use 'proto' vs 'pb' for datetime handling
471+ meth (self , proto .target_change )
472+ except Exception as exc2 :
473+ _LOGGER .debug (f"meth(proto) exc: { exc2 } " )
474+ raise
470475
471476 # NOTE:
472477 # in other implementations, such as node, the backoff is reset here
0 commit comments