-
Notifications
You must be signed in to change notification settings - Fork 262
/
main.py
266 lines (241 loc) · 11.1 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# -*- coding: utf-8 -*-
import argparse
import configparser
import random
from api.logger import logger
from api.base import Chaoxing, Account
from api.exceptions import LoginError, FormatError, JSONDecodeError, MaxRollBackError
from api.answer import Tiku
from urllib3 import disable_warnings, exceptions
import time
import sys
import os
# # 定义全局变量, 用于存储配置文件路径
# textPath = './resource/BookID.txt'
# # 获取文本 -> 用于查看学习过的课程ID
# def getText():
# try:
# if not os.path.exists(textPath):
# with open(textPath, 'x') as file: pass
# return []
# with open(textPath, 'r', encoding='utf-8') as file: content = file.read().split(',')
# content = {int(item.strip()) for item in content if item.strip()}
# return list(content)
# except Exception as e: logger.error(f"获取文本失败: {e}"); return []
# # 追加文本 -> 用于记录学习过的课程ID
# def appendText(text):
# if not os.path.exists(textPath): return
# with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ')
# 关闭警告
disable_warnings(exceptions.InsecureRequestWarning)
def init_config():
parser = argparse.ArgumentParser(
description="Samueli924/chaoxing",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-c", "--config", type=str, default=None, help="使用配置文件运行程序"
)
parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号")
parser.add_argument("-p", "--password", type=str, default=None, help="登录密码")
parser.add_argument(
"-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔"
)
parser.add_argument(
"-s", "--speed", type=float, default=1.0, help="视频播放倍速 (默认1, 最大2)"
)
parser.add_argument(
"-v",
"--verbose",
"--debug",
action="store_true",
help="启用调试模式, 输出DEBUG级别日志",
)
# 在解析之前捕获 -h 的行为
if len(sys.argv) == 2 and sys.argv[1] in {"-h", "--help"}:
parser.print_help()
# 返回一个 SystemExit 异常, 用于退出程序
raise SystemExit
# 提前检查 -h 和 --help 并退出
args = parser.parse_args()
if args.config:
config = configparser.ConfigParser()
config.read(args.config, encoding="utf8")
return (
config.get("common", "username"),
config.get("common", "password"),
(
str(config.get("common", "course_list")).split(",")
if config.get("common", "course_list")
else None
),
int(config.get("common", "speed")),
config["tiku"],
)
else:
return (
args.username,
args.password,
args.list.split(",") if args.list else None,
int(args.speed) if args.speed else 1,
None,
)
class RollBackManager:
def __init__(self) -> None:
self.rollback_times = 0
self.rollback_id = ""
def add_times(self, id: str) -> None:
if id == self.rollback_id and self.rollback_times == 3:
raise MaxRollBackError("回滚次数已达3次, 请手动检查学习通任务点完成情况")
elif id != self.rollback_id:
# 新job
self.rollback_id = id
self.rollback_times = 1
else:
self.rollback_times += 1
if __name__ == "__main__":
try:
# 避免异常的无限回滚
RB = RollBackManager()
# 初始化登录信息
username, password, course_list, speed, tiku_config = init_config()
# 规范化播放速度的输入值
speed = min(2.0, max(1.0, speed))
if (not username) or (not password):
username = input("请输入你的手机号, 按回车确认\n手机号:")
password = input("请输入你的密码, 按回车确认\n密码:")
account = Account(username, password)
# 设置题库
tiku = Tiku()
tiku.config_set(tiku_config) # 载入配置
tiku = tiku.get_tiku_from_config() # 载入题库
tiku.init_tiku() # 初始化题库
# 实例化超星API
chaoxing = Chaoxing(account=account, tiku=tiku)
# 检查当前登录状态, 并检查账号密码
_login_state = chaoxing.login()
if not _login_state["status"]:
raise LoginError(_login_state["msg"])
# 获取所有的课程列表
all_course = chaoxing.get_course_list()
course_task = []
# 手动输入要学习的课程ID列表
if not course_list:
print("*" * 10 + "课程列表" + "*" * 10)
for course in all_course:
print(f"ID: {course['courseId']} 课程名: {course['title']}")
print("*" * 28)
try:
course_list = input(
"请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n"
).split(",")
except Exception as e:
raise FormatError("输入格式错误") from e
# 筛选需要学习的课程
for course in all_course:
if course["courseId"] in course_list:
course_task.append(course)
if not course_task:
course_task = all_course
# 开始遍历要学习的课程列表
logger.info(f"课程列表过滤完毕, 当前课程任务数量: {len(course_task)}")
for course in course_task:
logger.info(f"开始学习课程: {course['title']}")
# 获取当前课程的所有章节
point_list = chaoxing.get_course_point(
course["courseId"], course["clazzId"], course["cpi"]
)
# 为了支持课程任务回滚, 采用下标方式遍历任务点
__point_index = 0
while __point_index < len(point_list["points"]):
point = point_list["points"][__point_index]
logger.info(f'当前章节: {point["title"]}')
logger.debug(f"当前章节 __point_index: {__point_index}") # 触发参数: -v
sleep_duration = random.uniform(1, 3)
logger.debug(f"本次随机等待时间: {sleep_duration}")
time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机sleep
# 获取当前章节的所有任务点
jobs = []
job_info = None
jobs, job_info = chaoxing.get_job_list(
course["clazzId"], course["courseId"], course["cpi"], point["id"]
)
# bookID = job_info["knowledgeid"] # 获取视频ID
# 发现未开放章节, 尝试回滚上一个任务重新完成一次
try:
if job_info.get("notOpen", False):
__point_index -= 1 # 默认第一个任务总是开放的
# 针对题库启用情况
if not tiku or tiku.DISABLE or not tiku.SUBMIT:
# 未启用题库或未开启题库提交, 章节检测未完成会导致无法开始下一章, 直接退出
logger.error(
f"章节未开启, 可能由于上一章节的章节检测未完成, 请手动完成并提交再重试, 或者开启题库并启用提交"
)
break
RB.add_times(point["id"])
continue
except MaxRollBackError as e:
logger.error("回滚次数已达3次, 请手动检查学习通任务点完成情况")
# 跳过该课程, 继续下一课程
break
# 可能存在章节无任何内容的情况
if not jobs:
__point_index += 1
continue
# 遍历所有任务点
for job in jobs:
# 视频任务
if job["type"] == "video":
# TODO: 目前这个记录功能还不够完善, 中途退出的课程ID也会被记录
# TextBookID = getText() # 获取学习过的课程ID
# if TextBookID.count(bookID) > 0:
# logger.info(f"课程: {course['title']} 章节: {point['title']} 任务: {job['title']} 已学习过或在学习中, 跳过") # 如果已经学习过该课程, 则跳过
# break # 如果已经学习过该课程, 则跳过
# appendText(bookID) # 记录正在学习的课程ID
logger.trace(
f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}"
)
# 超星的接口没有返回当前任务是否为Audio音频任务
isAudio = False
try:
chaoxing.study_video(
course, job, job_info, _speed=speed, _type="Video"
)
except JSONDecodeError as e:
logger.warning("当前任务非视频任务, 正在尝试音频任务解码")
isAudio = True
if isAudio:
try:
chaoxing.study_video(
course, job, job_info, _speed=speed, _type="Audio"
)
except JSONDecodeError as e:
logger.warning(
f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过"
)
# 文档任务
elif job["type"] == "document":
logger.trace(
f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}"
)
chaoxing.study_document(course, job)
# 测验任务
elif job["type"] == "workid":
logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}")
chaoxing.study_work(course, job, job_info)
# 阅读任务
elif job["type"] == "read":
logger.trace(f"识别到阅读任务, 任务章节: {course['title']}")
chaoxing.strdy_read(course, job, job_info)
__point_index += 1
logger.info("所有课程学习任务已完成")
except SystemExit as e:
if e.code == 0: # 正常退出
sys.exit(0)
else:
raise
except BaseException as e:
import traceback
logger.error(f"错误: {type(e).__name__}: {e}")
logger.error(traceback.format_exc())
raise e