fix lint
hsheth2 committed Jun 6, 2024
1 parent 904df2a commit 08777f0
Showing 2 changed files with 20 additions and 13 deletions.
@@ -303,7 +303,7 @@ def _process_db_schemas(
         snowflake_db: SnowflakeDatabase,
         db_tables: Dict[str, List[SnowflakeTable]],
     ) -> Iterable[MetadataWorkUnit]:
-        q = queue.Queue(maxsize=100)
+        q: "queue.Queue[MetadataWorkUnit]" = queue.Queue(maxsize=100)
 
         def _process_schema_worker(snowflake_schema: SnowflakeSchema) -> None:
             for wu in self._process_schema(
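The annotation is quoted because `queue.Queue` only became subscriptable at runtime in Python 3.9; as a string literal, the generic parameter is seen by the type checker but never evaluated. A minimal sketch of the pattern, with a hypothetical `WorkUnit` class standing in for `MetadataWorkUnit`:

```python
import queue


# Hypothetical stand-in for MetadataWorkUnit, to keep the sketch self-contained.
class WorkUnit:
    def __init__(self, name: str) -> None:
        self.name = name


# On Python < 3.9, an unquoted queue.Queue[WorkUnit] raises a TypeError at
# runtime; the quoted form is only evaluated by the type checker.
q: "queue.Queue[WorkUnit]" = queue.Queue(maxsize=100)

q.put(WorkUnit("example"))
item = q.get()    # inferred as WorkUnit rather than Any
print(item.name)  # attribute access now type-checks
```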
31 changes: 19 additions & 12 deletions metadata-ingestion/src/datahub/utilities/serialized_lru_cache.py
@@ -1,16 +1,19 @@
 import functools
 import threading
-from typing import Callable, Dict, TypeVar
+from typing import Callable, Dict, Hashable, Tuple, TypeVar
 
 import cachetools
 import cachetools.keys
 from typing_extensions import ParamSpec
 
-F = ParamSpec("F")
-T = TypeVar("T")
+_Key = Tuple[Hashable, ...]
+_F = ParamSpec("_F")
+_T = TypeVar("_T")
 
 
-def serialized_lru_cache(maxsize: int) -> Callable[[Callable[F, T]], Callable[F, T]]:
+def serialized_lru_cache(
+    maxsize: int,
+) -> Callable[[Callable[_F, _T]], Callable[_F, _T]]:
     """Similar to `lru_cache`, but ensures multiple calls with the same parameters are serialized.
 
     Calls with different parameters are allowed to proceed in parallel.
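Per that docstring, concurrent calls with identical arguments are serialized (the second waits, then reads the cache), while calls with different arguments run in parallel. A usage sketch; `fetch_schema` and its one-second delay are illustrative assumptions, only the decorator and its import path come from this module:

```python
import threading
import time

from datahub.utilities.serialized_lru_cache import serialized_lru_cache


@serialized_lru_cache(maxsize=128)
def fetch_schema(db: str, schema: str) -> str:
    time.sleep(1)  # stand-in for an expensive lookup
    return f"{db}.{schema}"


threads = [
    threading.Thread(target=fetch_schema, args=("db1", "public")),  # computes the value
    threading.Thread(target=fetch_schema, args=("db1", "public")),  # serialized behind the first call
    threading.Thread(target=fetch_schema, args=("db2", "public")),  # different key, proceeds in parallel
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```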
@@ -24,19 +27,21 @@ def serialized_lru_cache(maxsize: int) -> Callable[[Callable[F, T]], Callable[F, T]]:
 
     UNSET = object()
 
-    def decorator(func: Callable[F, T]) -> Callable[F, T]:
+    def decorator(func: Callable[_F, _T]) -> Callable[_F, _T]:
         hits = 0
         misses = 0
 
         cache_lock = threading.Lock()
-        cache = cachetools.LRUCache[str, T](maxsize=maxsize)
+        cache: "cachetools.LRUCache[_Key, _T]" = cachetools.LRUCache(maxsize=maxsize)
 
         key_locks_lock = threading.Lock()
-        key_locks: Dict[str, threading.Lock] = {}
-        key_waiters: Dict[str, int] = {}
+        key_locks: Dict[_Key, threading.Lock] = {}
+        key_waiters: Dict[_Key, int] = {}
 
-        def wrapper(*args: F.args, **kwargs: F.kwargs) -> T:
-            key = cachetools.keys.hashkey(*args, **kwargs)
+        def wrapper(*args: _F.args, **kwargs: _F.kwargs) -> _T:
+            # We need a type ignore here because there's no way for us to require that
+            # the args and kwargs are hashable while using ParamSpec.
+            key: _Key = cachetools.keys.hashkey(*args, **kwargs)  # type: ignore
 
             with cache_lock:
                 if key in cache:
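The `_Key = Tuple[Hashable, ...]` alias matches what `cachetools.keys.hashkey` actually returns: the positional and keyword arguments flattened into a hashable tuple, with equal arguments producing equal keys. That is what lets one tuple index the LRU cache and the `key_locks`/`key_waiters` dicts alike:

```python
import cachetools.keys

k1 = cachetools.keys.hashkey("db1", schema="public")
k2 = cachetools.keys.hashkey("db1", schema="public")

# Equal arguments yield equal, hashable keys, so the same tuple can index
# the LRU cache as well as the key_locks and key_waiters dicts.
assert k1 == k2
assert hash(k1) == hash(k2)
```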
@@ -83,8 +88,10 @@ def cache_info() -> functools._CacheInfo:
                 currsize=len(cache),
             )
 
-        wrapper.cache = cache
-        wrapper.cache_info = cache_info
+        # Add some extra attributes to the wrapper function. This makes it mostly compatible
+        # with functools.lru_cache.
+        wrapper.cache = cache  # type: ignore
+        wrapper.cache_info = cache_info  # type: ignore
 
         return functools.update_wrapper(wrapper, func)
 
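With those attributes attached, the wrapper exposes roughly the same introspection surface as `functools.lru_cache`. Continuing the hypothetical `fetch_schema` sketch from above (the exact counter values are inferred from the `hits`/`misses` bookkeeping in the diff):

```python
fetch_schema("db1", "public")  # miss: computed and stored
fetch_schema("db1", "public")  # hit: served from the cache

info = fetch_schema.cache_info()
print(info.hits, info.misses, info.maxsize, info.currsize)  # expected: 1 1 128 1
```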

