This repository has been archived by the owner on Sep 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 34
/
biliclear.py
475 lines (408 loc) · 15.4 KB
/
biliclear.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
import json
import sys
import re
import time
from datetime import datetime
from os import chdir, environ
from os.path import exists, dirname, abspath
from threading import Thread
import cv2
import numpy as np
import requests
import biliauth
import gpt
import syscmds
import checker
from compatible_getpass import getpass
sys.excepthook = lambda *args: [print("^C"), exec("raise SystemExit")] if KeyboardInterrupt in args[0].mro() else sys.__excepthook__(*args)
selfdir = dirname(sys.argv[0])
if selfdir == "": selfdir = abspath(".")
chdir(selfdir)
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
loaded = False
def checkRuleUpdate():
try:
new_rules = requests.get(open("./RULE_SOURCE", "r", encoding="utf-8").read(), verify=False).content.decode("utf-8")
with open("./res/rules.yaml", "w", encoding="utf-8") as f:
f.write(new_rules)
except Exception as e:
with open("update_rule_err.txt", "w", encoding="utf-8") as f:
f.write(f"{datetime.now()}\n{e}")
Thread(target=checkRuleUpdate, daemon=True).start()
def saveConfig():
with open("./config.json", "w", encoding="utf-8") as f:
f.write(json.dumps({
"headers": headers,
"bili_report_api": bili_report_api,
"reply_limit": reply_limit,
"enable_gpt": enable_gpt,
"gpt_apibase": gpt.openai.api_base,
"gpt_proxy": gpt.openai.proxy,
"gpt_apikey": gpt.openai.api_key,
"gpt_model": gpt.gpt_model,
"enable_check_lv2avatarat": enable_check_lv2avatarat,
"enable_check_replyimage": enable_check_replyimage,
"enable_check_user": enable_check_user
}, indent=4, ensure_ascii=False))
def putConfigVariables(config: dict):
global headers
global bili_report_api, csrf
global reply_limit, enable_gpt
global enable_check_lv2avatarat
global enable_check_replyimage
global enable_check_user
headers = config["headers"]
bili_report_api = config.get("bili_report_api", True)
csrf = getCsrf(headers["Cookie"])
reply_limit = config.get("reply_limit", 100)
enable_gpt = config.get("enable_gpt", False)
gpt.openai.api_base = config.get("gpt_apibase", gpt.openai.api_base)
gpt.openai.proxy = config.get("gpt_proxy", gpt.openai.proxy)
gpt.openai.api_key = config.get("gpt_apikey", "")
gpt.gpt_model = config.get("gpt_model", "gpt-4o-mini")
enable_check_lv2avatarat = config.get("enable_check_lv2avatarat", False)
enable_check_replyimage = config.get("enable_check_replyimage", False)
enable_check_user = config.get("enable_check_user", False)
if reply_limit <= 20:
reply_limit = 100
def getCsrf(cookie: str):
try:
return re.findall(r"bili_jct=(.*?);", cookie)[0]
except IndexError:
print("警告: 无法获取csrf")
return ""
def getCookieFromUser():
if not environ.get("gui", False):
if "n" in input("是否使用二维码登录B站, 默认为是(y/n): ").lower():
return getpass("Bilibili cookie: ")
else:
return biliauth.bilibiliAuth()
else:
return biliauth.bilibiliAuth()
def checkCookie():
result = requests.get(
"https://passport.bilibili.com/x/passport-login/web/cookie/info",
headers = headers,
data = {
"csrf": csrf
}
).json()
return result["code"] == 0 and not result.get("data", {}).get("refresh", True)
if not exists("./config.json"):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
"Cookie": getCookieFromUser()
}
csrf = getCsrf(headers["Cookie"])
bili_report_api = True
reply_limit = 100
enable_gpt = False
gpt.openai.api_key = ""
gpt.gpt_model = "gpt-4o-mini"
enable_check_lv2avatarat = False
enable_check_replyimage = False
enable_check_user = False
else:
with open("./config.json", "r", encoding="utf-8") as f:
try:
putConfigVariables(json.load(f))
except Exception as e:
print("加载config.json失败, 请删除或修改config.json, 错误:", repr(e))
print("如果你之前更新过BiliClear, 请删除config.json并重新运行")
print("请按回车键退出...")
syscmds.pause()
raise SystemExit
if not checkCookie():
print("bilibili cookie已过期或失效, 请重新登录")
headers["Cookie"] = getCookieFromUser()
csrf = getCsrf(headers["Cookie"])
try:
saveConfig()
except Exception as e:
print("警告: 保存config.json失败, 错误:", e)
text_checker = checker.Checker()
face_detector = cv2.CascadeClassifier("./res/haarcascade_frontalface_default.xml")
def _btyes2cv2im(byte_data):
nparr = np.frombuffer(byte_data, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
return img
def _img_face(img: cv2.typing.MatLike):
return not isinstance(
face_detector.detectMultiScale(
cv2.cvtColor(img, cv2.COLOR_BGR2GRAY),
scaleFactor=1.2, minNeighbors=1
),
tuple
)
def _img_qrcode(img: cv2.typing.MatLike):
return cv2.QRCodeDetector().detect(img)[0]
def getVideos():
"获取推荐视频列表"
return [
i["param"]
for i in requests.get(f"https://app.bilibili.com/x/v2/feed/index", headers=headers).json()["data"]["items"]
if i.get("can_play", 0)
]
def getReplys(avid: str | int):
"获取评论"
maxNum = reply_limit
page = 1
replies = []
while page * 20 <= maxNum:
time.sleep(0.4)
result = requests.get(
f"https://api.bilibili.com/x/v2/reply?type=1&oid={avid}&nohot=1&pn={page}&ps=20",
headers = headers
).json()
try:
if not result["data"]["replies"]:
break
replies += result["data"]["replies"]
except Exception:
break
page += 1
return replies
def _checkUser(uid: int|str):
"检查用户是否需要举报 (用于检测)"
user_crad = requests.get(
f"https://api.bilibili.com/x/web-interface/card?mid={uid}",
headers = headers
).json()["data"]["card"]
if user_crad["spacesta"] == -2:
return False # 封了, 没必要
if user_crad["level_info"]["current_level"] != 2:
return False # 不是 lv.2, 没必要
dynamics = [i["modules"]["module_dynamic"] for i in requests.get(
f"https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/all?host_mid={uid}",
headers = headers
).json()["data"]["items"]]
for dynamic in dynamics:
if dynamic["desc"] is None:
continue
text = dynamic["desc"]["text"]
if isPorn(text):
return True
if dynamic["major"] is None:
continue
elif dynamic["major"]["type"] != "MAJOR_TYPE_DRAW":
continue
ims = [_btyes2cv2im(requests.get(i["src"]).content) for i in dynamic["major"]["draw"]["items"]]
if any([_img_qrcode(img) for img in ims]):
return True
return False
def reqBiliReportUser(uid: int|str):
"调用B站举报用户API"
result = requests.post(
"https://space.bilibili.com/ajax/report/add",
headers = headers,
data = {
"mid": int(uid),
"reason": "1, 2, 3",
"reason_v2": 1,
"csrf": csrf
}
).json()
time.sleep(3.5)
result_code = result["code"]
if result_code not in (0, 12019):
print("b站举报用户API调用失败, 返回体:", result)
elif result_code == 0:
print("Bilibili举报用户API调用成功")
elif result_code == 12019:
print("举报过于频繁, 等待15s")
time.sleep(15)
return reqBiliReportUser(uid)
def processUser(uid: int|str):
"处理用户"
if _checkUser(uid):
print(f"用户{uid}违规")
reqBiliReportUser(uid)
else:
print(f"用户{uid}未违规")
def isPorn(text: str):
"判断评论是否为色情内容 (使用规则, rules.yaml)"
return text_checker.check(text)
def reqBiliReportReply(data: dict, rule: str | None):
"调用B站举报评论API"
result = requests.post(
"https://api.bilibili.com/x/v2/reply/report",
headers=headers,
data={
"type": 1,
"oid": data["oid"],
"rpid": data["rpid"],
"reason": 0,
"csrf": csrf,
"content": f"""
举报原因: 色情, 或...
程序匹配到的规则: {rule}
(此举报信息自动生成, 可能会存在误报)
"""
}
).json()
time.sleep(3.5)
result_code = result["code"]
if result_code not in (0, 12019, -352):
print("b站举报评论API调用失败, 返回体:", result)
elif result_code == 0:
print("Bilibili举报评论API调用成功")
elif result_code == 12019:
print("举报过于频繁, 等待15s")
time.sleep(15)
return reqBiliReportReply(data, rule)
elif result_code == -352:
print(f"举报评论的B站API调用失败, 返回 -352, 请尝试手动举报1次, {avid2bvid(data["oid"])}")
waitRiskControl()
return reqBiliReportReply(data, rule)
def reportReply(data: dict, r: str | None):
print("\n违规评论:", repr(data["content"]["message"]))
print("规则:", r)
if bili_report_api:
reqBiliReportReply(data, r)
print() # next line
def replyIsViolations(reply: dict):
"判断评论是否违规, 返回: (是否违规, 违规原因) 如果没有违规, 返回 (False, None)"
global enable_gpt
reply_msg = reply["content"]["message"]
isp, r = isPorn(reply_msg)
if "doge" in reply_msg:
return False, None
if not isp and enable_gpt:
try:
isp, r = gpt.gpt_porn(reply_msg) or gpt.gpt_ad(reply_msg), f"ChatGpt - {gpt.gpt_model} 检测到违规内容"
print(f"调用GPT进行检测, 结果: {isp}")
except gpt.RateLimitError:
enable_gpt = False
saveConfig()
print("GPT请求达到限制, 已关闭GPT检测")
if not isp and enable_check_lv2avatarat and reply["member"]["level_info"][
"current_level"] == 2 and "@" in reply_msg:
avatar_image = requests.get(
reply["member"]["avatar"],
headers=headers
).content
if _img_face(_btyes2cv2im(avatar_image)):
isp, r = True, "lv.2, 检测到头像中包含人脸,可疑"
print(f"lv.2和人脸检测, 结果: {isp}")
if not isp and enable_check_replyimage and reply["member"]["level_info"]["current_level"] == 2:
try:
images = [requests.get(i["img_src"], headers=headers).content for i in reply["content"].get("pictures", [])]
opencv_images = [_btyes2cv2im(img) for img in images]
have_qrcode = any([_img_qrcode(img) for img in opencv_images])
have_face = any([_img_face(img) for img in opencv_images])
if have_qrcode or have_face:
isp, r = True, "lv.2, 检测到评论中包含二维码或人脸, 可疑"
print(f"lv.2和二维码、人脸检测, 结果: {isp}")
except Exception as e:
print("警告: 二维码或人脸检测时发生错误, 已跳过", repr(e))
return isp, r
def processReply(reply: dict):
"处理评论并举报"
global replyCount, violationsReplyCount
global checkedReplies, violationsReplies
replyCount += 1
isp, r = replyIsViolations(reply)
if isp:
violationsReplyCount += 1
reportReply(reply, r)
violationsReplies.insert(0, (reply["rpid"], reply["content"]["message"], time.time()))
checkedReplies.insert(0, (reply["rpid"], reply["content"]["message"], time.time()))
checkedReplies = checkedReplies[:1500]
violationsReplies = violationsReplies[:1500]
return isp, r
def _setMethod():
global method
method = None
method_choices = {
"1": "自动获取推荐视频评论",
"2": "获取指定视频评论",
"3": "检查指定UID"
}
print("BiliClear - github.com/qaqFei/BiliClear")
print("\n请选择操作: ")
while method not in method_choices.keys():
if method is not None:
print("输入错误")
for k, v in method_choices.items():
print(f"{k}. {v}")
method = input("选择: ")
syscmds.clearScreen()
def bvid2avid(bvid: str):
result = requests.get(
f"https://api.bilibili.com/x/web-interface/view?bvid={bvid}",
headers = headers
).json()
return result["data"]["aid"]
def avid2bvid(avid: str):
result = requests.get(
f"https://api.bilibili.com/x/web-interface/view?aid={avid}",
headers = headers
).json()
return result["data"]["bvid"]
videoCount = 0
replyCount = 0
violationsReplyCount = 0
waitRiskControl_TimeRemaining = float("nan")
waitingRiskControl = False
checkedVideos = []
checkedReplies = []
violationsReplies = []
def _checkVideo(avid: str | int):
for reply in getReplys(avid):
if enable_check_user:
processUser(reply["mid"])
processReply(reply)
def checkNewVideos():
global videoCount, replyCount, violationsReplyCount, checkedVideos
print("".join([("\n" if videoCount != 0 else ""), "开始检查新一轮推荐视频..."]))
print(f"已检查视频: {videoCount}")
print(f"已检查评论: {replyCount}")
print(f"已举报评论: {violationsReplyCount} 评论违规率: {((violationsReplyCount / replyCount * 100) if replyCount != 0 else 0.0):.5f}%")
print() # next line
for avid in getVideos():
print(f"开始检查视频: av{avid}, 现在时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}")
_checkVideo(avid)
videoCount += 1
checkedVideos.insert(0, (avid, time.time()))
checkedVideos = checkedVideos[:1500]
time.sleep(1.25)
def checkVideo(bvid: str):
global videoCount, checkedVideos
avid = bvid2avid(bvid)
_checkVideo(avid)
videoCount += 1
checkedVideos.insert(0, (avid, time.time()))
checkedVideos = checkedVideos[:1500]
time.sleep(1.25)
def waitRiskControl(output: bool = True):
global waitRiskControl_TimeRemaining, waitingRiskControl
stopSt = time.time()
stopMinute = 3
waitRiskControl_TimeRemaining = 60 * stopMinute
waitingRiskControl = True
print(f"警告!!! B站API返回了非JSON格式数据, 大概率被风控, 暂停{stopMinute}分钟...")
while time.time() - stopSt < 60 * stopMinute:
waitRiskControl_TimeRemaining = 60 * stopMinute - (time.time() - stopSt)
if output:
print(f"由于可能被风控, BiliClear暂停{stopMinute}分钟, 还剩余: {waitRiskControl_TimeRemaining:.2f}s")
time.sleep(1.5)
else:
time.sleep(0.005)
waitingRiskControl = False
if __name__ == "__main__":
_setMethod()
while True:
try:
match method:
case "1":
checkNewVideos()
case "2":
checkVideo(input("输入视频bvid: "))
case "3":
processUser(input("输入UID: "))
case _:
assert False, "unknow method"
except Exception as e:
print("错误", repr(e))
if isinstance(e, json.JSONDecodeError):
waitRiskControl()