-
Notifications
You must be signed in to change notification settings - Fork 224
/
caching.py
48 lines (38 loc) · 1.48 KB
/
caching.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from threading import RLock
from time import perf_counter, time
from typing import List
from cachetools import TTLCache
from pyrogram.enums import ChatMembersFilter
from pyrogram.types import CallbackQuery
from pyrogram.types.messages_and_media.message import Message
THREAD_LOCK = RLock()
# admins stay cached for 30 mins
ADMIN_CACHE = TTLCache(maxsize=512, ttl=(60 * 30), timer=perf_counter)
# Block from refreshing admin list for 10 mins
TEMP_ADMIN_CACHE_BLOCK = TTLCache(
maxsize=512, ttl=(60 * 10), timer=perf_counter)
async def admin_cache_reload(m: Message or CallbackQuery, status=None) -> List[int]:
start = time()
with THREAD_LOCK:
if isinstance(m, CallbackQuery):
m = m.message
if status is not None:
TEMP_ADMIN_CACHE_BLOCK[m.chat.id] = status
try:
if TEMP_ADMIN_CACHE_BLOCK[m.chat.id] in ("autoblock", "manualblock"):
return []
except KeyError:
# Because it might be first time when admn_list is being reloaded
pass
admin_list = [
(
z.user.id,
f"@{z.user.username}" if z.user.username else z.user.first_name,
z.privileges.is_anonymous,
)
async for z in m.chat.get_members(filter=ChatMembersFilter.ADMINISTRATORS)
if not z.user.is_deleted
]
ADMIN_CACHE[m.chat.id] = admin_list
TEMP_ADMIN_CACHE_BLOCK[m.chat.id] = "autoblock"
return admin_list