forked from qaqFei/BiliClear
-
Notifications
You must be signed in to change notification settings - Fork 0
/
robot3.py
174 lines (156 loc) · 5.46 KB
/
robot3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import smtplib
import json
import time
import sys
import re
from email.mime.text import MIMEText
from email.header import Header
from os import system, chdir
from os.path import exists, dirname, abspath
from getpass import getpass
import requests
from threading import Thread
from queue import Queue
from functools import lru_cache
selfdir = dirname(sys.argv[0])
if selfdir == "": selfdir = abspath(".")
chdir(selfdir)
if not exists("./config.json"):
sender_email = input("report sender email: ")
sender_password = getpass("report sender password: ")
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
"Cookie": getpass("bilibili cookie: ")
}
smtps = {
"@mali.aliyun.com": "server = smtp.aliyun.com, port = 465",
"@google.com": "server = smtp.gmail.com, port = 587",
"@sina.com": "server = smtp.sina.com.cn, port = 25",
"@top.com": "server = smtp.tom.com, port = 25",
"@163.com": "server = smtp.163.com, port = 465",
"@126.com": "server = smtp.126.com, port = 25",
"@yahoo.com.cn": "server = smtp.mail.yahoo.com.cn, port = 587",
"@foxmail.com": "server = smtp.foxmail.com, port = 25",
"@sohu.com": "server = smtp.sohu.com, port = 25"
}
print("
smtp servers:")
for k, v in smtps.items():
print(f" {k}: {v}")
smtp_server = input("
smtp server: ")
smtp_port = int(input("smtp port: "))
with open("./config.json", "w", encoding="utf-8") as f:
f.write(json.dumps({
"sender_email": sender_email,
"sender_password": sender_password,
"headers": headers,
"smtp_server": smtp_server,
"smtp_port": smtp_port
}, indent=4, ensure_ascii=False))
else:
with open("./config.txt", "r", encoding="utf-8") as f:
rules = list(filter(lambda x: x and "eval" not in x and "exec" not in x, f.read().splitlines()))
compiled_rules = [re.compile(rule) for rule in rules]
system("cls")
def getVideos():
return [
i["param"]
for i in requests.get(f"https://app.bilibili.com/x/v2/feed/index", headers=headers).json()["data"]["items"]
if i.get("can_play", 0)
]
@lru_cache(maxsize=100) # 使用LRU缓存来缓存API响应
def getReplys(avid: str|int):
maxNum = 100
page = 1
replies = []
while page * 20 <= maxNum:
result = requests.get(
f"https://api.bilibili.com/x/v2/reply?type=1&oid={avid}&nohot=1&pn={page}&ps=20",
headers=headers
).json()
try:
replies += result["data"]["replies"]
except Exception:
break
page += 1
return replies
def isPorn(text: str):
for rule in compiled_rules:
if rule.search(text): # 使用预编译的正则表达式进行匹配
return True, rule.pattern
return False, None
def report(data: dict, r: str):
report_text = f"""
违规用户UID:{data["mid"]}
违规类型:色情
违规信息发布形式:评论, (动态)
问题描述:该评论疑似发布色情信息,破坏了B站的和谐环境
诉求:移除违规内容,封禁账号
评论数据内容(B站API返回, x/v2/reply):
{json.dumps(data, ensure_ascii=False, indent=4)}
"""
# 发送邮件报告的逻辑在这里添加(省略)
def processReply(reply: dict):
isp, r = isPorn(reply["content"]["message"])
if isp:
print("porn", repr(reply["content"]["message"]), "\nrule:", r, "\n")
report(reply, r)
else:
print(f" not porn, {time.time()}\r", end="")
def setMethod():
global method
method = None
method_choices = {
"1": "自动获取推荐视频评论",
"2": "获取指定视频评论"
}
def bvid2avid(bvid: str):
result = requests.get(
f"https://api.bilibili.com/x/web-interface/view?bvid={bvid}",
headers=headers
).json()
return result["data"]["aid"]
setMethod()
def worker(queue):
while True:
avid = queue.get()
if avid is None:
break
try:
for reply in getReplys(avid):
processReply(reply)
except Exception as e:
print("err", e)
finally:
queue.task_done()
num_threads = 4 # 设置线程数量为4个,可以根据需要调整
work_queue = Queue()
threads = []
for _ in range(num_threads):
t = Thread(target=worker, args=(work_queue,))
t.start()
threads.append(t)
while True:
try:
match method:
case "1":
for avid in getVideos():
work_queue.put(avid)
work_queue.join() # 等待所有任务完成
case "2":
system("cls")
link = input("输入视频bvid: ")
work_queue.put(bvid2avid(link))
work_queue.join() # 等待所有任务完成
case _:
print("链接格式错误")
except (Exception, KeyboardInterrupt) as e:
print("err", e)
if isinstance(e, KeyboardInterrupt):
break
finally:
for _ in range(num_threads): # 向队列中添加None来通知线程退出
work_queue.put(None)
for t in threads: # 等待所有线程结束
t.join()