-
Notifications
You must be signed in to change notification settings - Fork 3
/
utils.py
131 lines (107 loc) · 3.87 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import config
import logging_config
import logging
from datetime import timedelta
from datetime import datetime
from functools import reduce
from bs4 import BeautifulSoup
from markdown import markdown
import threading
import pytz
from datetime import datetime
import random
import os
def buildCounterReply(user, words, count, countNR):
    """Render the reply text for a word-count request.

    One of the ``config.COUNTER_REPLY_TEMPLATE*`` format strings is selected
    and filled in.

    Args:
        user: Value substituted for ``{user}`` in the template.
        words: Collection of counted words. When it compares equal to
            ``config.N_WORDS`` the dedicated N-word templates are used.
        count: Value substituted for ``{count}``.
        countNR: Value substituted for ``{countNR}``; only the N-word
            template uses it.

    Returns:
        The formatted reply string.
    """
    # Decide on the template family before the words are censored.
    n_word_request = words == config.N_WORDS
    censored = [censor(word) for word in words]
    shown_words = censored[0] if len(censored) == 1 else ", ".join(censored)

    if not n_word_request:
        return config.COUNTER_REPLY_TEMPLATE.format(user=user, count=count, words=shown_words)
    if count == 0 and countNR == 0:
        return config.COUNTER_REPLY_TEMPLATE_NWORD_NONE.format(user=user)
    return config.COUNTER_REPLY_TEMPLATE_NWORD.format(user=user, count=count, countNR=countNR)
def censor(s):
    """Apply every replacement listed in ``config.CENSOR_WORDS_MAP`` to *s*.

    Each entry of the map is unpacked into ``str.replace`` and the
    replacements are applied one after another, in iteration order.

    Returns:
        The string after all replacements.
    """
    censored = s
    for replacement in config.CENSOR_WORDS_MAP:
        censored = censored.replace(*replacement)
    return censored
def markdownToText(text):
    """Convert Markdown *text* to plain text.

    The text is rendered to HTML with ``markdown`` and parsed with
    BeautifulSoup's ``html.parser``; all text nodes of the parsed document
    are concatenated.

    Args:
        text: Markdown source.

    Returns:
        The concatenated text nodes as a single string.
    """
    html = markdown(text)
    soup = BeautifulSoup(html, "html.parser")
    # find_all(string=True) is the supported spelling of the deprecated
    # findAll(text=True); it selects the same text nodes.
    return "".join(soup.find_all(string=True))
def linkify(c):
    """Build a full reddit.com URL.

    Args:
        c: Either an object exposing a ``permalink`` attribute, or a value
            that is appended to the site root directly.

    Returns:
        ``"https://reddit.com"`` followed by the permalink (or by *c*).
    """
    path = getattr(c, 'permalink', c)
    return "https://reddit.com" + path
def redditShortLink(id):
    """Return the ``redd.it`` short link for *id*.

    The parameter name shadows the ``id`` builtin; it is kept so keyword
    callers continue to work.
    """
    return "https://redd.it/{}".format(id)
def apiCommentsJsonLink(ids):
    """Return the pushshift comment-search URL for the given comment ids.

    Args:
        ids: Iterable of id strings; they are joined with commas into the
            ``ids`` query parameter.
    """
    id_list = ",".join(ids)
    return f"http://api.pushshift.io/reddit/search/comment/?ids={id_list}"
def prettyLinks(links, offset=1, maxLength=5000):
    """Format *links* as a numbered list, one ``"N: link"`` entry per line.

    Args:
        links: Iterable of links. It is never modified; when *maxLength* is
            positive a shuffled copy is used, so a truncated list shows a
            random selection.
        offset: Number given to the first entry.
        maxLength: Once the accumulated text is longer than this many
            characters no further entries are added (the entry that crosses
            the limit is still included).

    Returns:
        The entries concatenated, each preceded by a newline; ``""`` when
        nothing was added.
    """
    # Work on a copy: shuffling the caller's list in place is a surprising
    # side effect, and copying also lets tuples and other iterables through.
    ordered = list(links)
    if maxLength > 0:
        random.shuffle(ordered)

    entries = []
    length = 0
    for i, link in enumerate(ordered):
        if length > maxLength:
            break
        entry = f"\n{i + offset}: {link}"
        entries.append(entry)
        length += len(entry)
    return "".join(entries)
# https://dev.to/astagi/rate-limiting-using-python-and-redis-58gk
def rateLimit(key: str, limit: int, period: timedelta):
    """Report whether the action identified by *key* is currently rate limited.

    A single integer timestamp is kept per key in ``config.redis``. Every
    allowed call pushes that timestamp ``round(period / limit)`` seconds
    further; a call is rejected once the stored timestamp is too far ahead
    of the current time (approach taken from the article linked above).

    Args:
        key: Redis key holding the stored timestamp; ``'lock:' + key`` is
            used as the lock name.
        limit: Number of calls to allow per *period*.
        period: Length of the rate-limit window.

    Returns:
        ``False`` when the call is allowed (and has been recorded),
        ``True`` when the caller is limited. Failing to acquire the lock
        within 5 seconds also counts as limited.
    """
    # Function-scope import: LockError was referenced without ever being
    # imported, so a lock timeout surfaced as NameError instead of being
    # handled. Assumes config.redis is a redis-py client.
    from redis.exceptions import LockError

    r = config.redis
    period_in_seconds = int(period.total_seconds())
    now = r.time()[0]  # first element of the TIME reply (seconds)
    separation = round(period_in_seconds / limit)
    r.setnx(key, 0)  # make sure the key exists before it is read
    try:
        with r.lock('lock:' + key, blocking_timeout=5):
            # Never start from a timestamp that lies in the past.
            tat = max(int(r.get(key)), now)
            if tat - now <= period_in_seconds - separation:
                r.set(key, tat + separation)
                return False
            return True
    except LockError:
        return True
def background(f):
    """Decorator that runs *f* in a new thread on every call.

    The wrapper starts a ``threading.Thread`` with the given positional and
    keyword arguments and returns immediately with ``None``; the return
    value of *f* is not available to the caller.

    Args:
        f: The callable to run in the background.

    Returns:
        A wrapper carrying the name and docstring of *f*.
    """
    from functools import wraps

    @wraps(f)  # keep __name__/__doc__ so logs and introspection stay useful
    def wrapper(*a, **kw):
        threading.Thread(target=f, args=a, kwargs=kw).start()

    return wrapper
def datetime_force_utc(date_time):
    """Return *date_time* localized to UTC via ``pytz.utc.localize``."""
    utc = pytz.utc
    return utc.localize(date_time)
def datetime_as_utc(date_time):
    """Return *date_time* converted to UTC with ``astimezone``."""
    utc = pytz.utc
    return date_time.astimezone(utc)
def datetime_from_timestamp(timestamp):
    """Convert a POSIX *timestamp* into a timezone-aware UTC datetime.

    NOTE: this module defines ``datetime_from_timestamp`` a second time
    further down; the later definition is the one in effect at runtime.

    Args:
        timestamp: Seconds since the epoch.

    Returns:
        An aware ``datetime`` whose tzinfo is ``pytz.utc``.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; asking
    # fromtimestamp() for UTC directly yields the same aware value.
    return datetime.fromtimestamp(timestamp, tz=pytz.utc)
def datetime_now():
    """Return the current time as an aware UTC datetime, without microseconds.

    Returns:
        An aware ``datetime`` whose tzinfo is ``pytz.utc`` and whose
        ``microsecond`` field is 0.
    """
    # datetime.utcnow() is deprecated since Python 3.12; request the aware
    # UTC time directly instead of localizing a naive value afterwards.
    return datetime.now(pytz.utc).replace(microsecond=0)
def datetime_from_timestamp(timestamp):
    """Convert a POSIX *timestamp* into a timezone-aware UTC datetime.

    NOTE: this is the second definition of ``datetime_from_timestamp`` in
    this module and therefore the one in effect at runtime.

    Args:
        timestamp: Seconds since the epoch.

    Returns:
        An aware ``datetime`` whose tzinfo is ``pytz.utc``.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; asking
    # fromtimestamp() for UTC directly yields the same aware value.
    return datetime.fromtimestamp(timestamp, tz=pytz.utc)
def get_datetime_string(date_time, convert_utc=True, format_string="%Y-%m-%d %H:%M:%S"):
    """Format *date_time* as text.

    Args:
        date_time: The datetime to format, or ``None``.
        convert_utc: When true, the value is passed through
            ``datetime_as_utc`` before formatting.
        format_string: ``strftime`` pattern to use.

    Returns:
        The formatted string, or ``""`` when *date_time* is ``None``.
    """
    if date_time is None:
        return ""
    target = datetime_as_utc(date_time) if convert_utc else date_time
    return target.strftime(format_string)
def parse_datetime_string(date_time_string, force_utc=True, format_string="%Y-%m-%d %H:%M:%S"):
    """Parse text produced with *format_string* back into a datetime.

    Args:
        date_time_string: The text to parse. ``None``, ``"None"`` and ``""``
            are treated as "no value".
        force_utc: When true, the parsed value is passed through
            ``datetime_force_utc``.
        format_string: ``strptime`` pattern to use.

    Returns:
        The parsed datetime, or ``None`` for the "no value" inputs.
    """
    if date_time_string in (None, "None", ""):
        return None
    parsed = datetime.strptime(date_time_string, format_string)
    return datetime_force_utc(parsed) if force_utc else parsed
def get_last_seen(keyword, raw=False):
    """Read the stored "last seen" timestamp for *keyword*.

    The value is read from ``config.redis`` under ``last_seen_<keyword>``;
    a missing or empty value counts as 0.

    Args:
        keyword: Suffix of the key to read.
        raw: When true, return the integer timestamp itself.

    Returns:
        The integer timestamp if *raw*, otherwise the result of
        ``datetime_from_timestamp`` for it.
    """
    stored = config.redis.get(f"last_seen_{keyword}")
    last_seen = int(stored or 0)
    if raw:
        return last_seen
    return datetime_from_timestamp(last_seen)
def set_last_seen(keyword, seen):
    """Store *seen* as the "last seen" timestamp for *keyword*.

    The value is truncated with ``int`` and written as a string to
    ``config.redis`` under ``last_seen_<keyword>``.
    """
    key = f"last_seen_{keyword}"
    value = str(int(seen))
    config.redis.set(key, value)
def is_processed(id, prefix="processed_comment_"):
    """Check whether a marker key for *id* exists in ``config.redis``.

    Args:
        id: Identifier appended to *prefix* to form the key.
        prefix: Key prefix.

    Returns:
        Whatever ``config.redis.exists`` returns for the key.
    """
    key = f"{prefix}{id}"
    return config.redis.exists(key)
def set_processed(id, prefix="processed_comment_"):
    """Write the marker key for *id* to ``config.redis`` with value 1.

    Args:
        id: Identifier appended to *prefix* to form the key.
        prefix: Key prefix.
    """
    key = f"{prefix}{id}"
    config.redis.set(key, 1)
def setup_proxy(botname):
    """Export the proxy configured for *botname* to the process environment.

    The proxy is looked up with ``config.env("<botname>_proxy", None)``.
    When a value is set it is logged and written to the lower- and
    upper-case ``http_proxy`` / ``https_proxy`` environment variables;
    otherwise nothing happens.
    """
    proxy = config.env(f"{botname}_proxy", None)
    if not proxy:
        return
    logging.info(f"Using {botname} with proxy: {proxy}")
    for variable in ('http_proxy', 'HTTP_PROXY', 'https_proxy', 'HTTPS_PROXY'):
        os.environ[variable] = proxy
def timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"