From 2074959c65a235ea8686014b1c5dd1883f2678a3 Mon Sep 17 00:00:00 2001 From: Stanley Opara Date: Wed, 5 Jun 2024 17:00:53 -0400 Subject: [PATCH] make refresh code a daemon thread Signed-off-by: Stanley Opara --- sdk/python/feast/infra/registry/caching_registry.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 33306a4266..3fb9f45810 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -1,3 +1,4 @@ +import atexit import logging import threading from abc import abstractmethod @@ -33,7 +34,7 @@ def __init__( ) self.allow_async_cache = allow_async_cache if allow_async_cache: - threading.Timer(cache_ttl_seconds, self.refresh).start() + self._start_async_refresh(cache_ttl_seconds) @abstractmethod def _get_data_source(self, name: str, project: str) -> DataSource: @@ -312,3 +313,12 @@ def _refresh_cached_registry_if_necessary(self): if expired: logger.info("Registry cache expired, so refreshing") self.refresh() + + def _start_async_refresh(self, cache_ttl_seconds): + self.registry_refresh_thread = threading.Timer(cache_ttl_seconds, self.refresh) + self.registry_refresh_thread.setDaemon(True) + self.registry_refresh_thread.start() + atexit.register(self._exit_handler) + + def _exit_handler(self): + self.registry_refresh_thread.cancel()