Skip to content

Commit

Permalink
feat: enhance error handling and speed input
Browse files Browse the repository at this point in the history
  • Loading branch information
isHarryh committed Sep 19, 2024
1 parent 7441be0 commit e6fbb7b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 79 deletions.
2 changes: 1 addition & 1 deletion api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def video_progress_log(self, _session, _course, _job, _job_info, _dtoken, _durat
logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...")
return False

def study_video(self, _course, _job, _job_info, _speed: float = 1, _type: str = "Video"):
def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str = "Video"):
if _type == "Video":
_session = init_session(isVideo=True)
else:
Expand Down
15 changes: 5 additions & 10 deletions api/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
from loguru import logger

try:
from requests.exceptions import JSONDecodeError
except: # noqa: E722
from json import JSONDecodeError

class BaseException(Exception):
def __init__(self, _msg: str = None):
if _msg:
logger.error(_msg)


class LoginError(BaseException):
pass
class LoginError(Exception):
def __init__(self, *args: object):
super().__init__(*args)


class FormatError(Exception):
pass
def __init__(self, *args: object):
super().__init__(*args)
142 changes: 74 additions & 68 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def init_config():
parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号")
parser.add_argument("-p", "--password", type=str, default=None, help="登录密码")
parser.add_argument("-l", "--list", type=str, default=None, help="要学习的课程ID列表")
parser.add_argument("-s", "--speed", type=int, default=1, help="视频播放倍速(默认1,最大2)")
parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速(默认1,最大2)")
args = parser.parse_args()
if args.config:
config = configparser.ConfigParser()
Expand All @@ -25,73 +25,79 @@ def init_config():


if __name__ == '__main__':
# 初始化登录信息
username, password, course_list, speed = init_config()
# 强行限制倍速最大为2倍速
speed = 2 if speed > 2 else speed
if (not username) or (not password):
username = input("请输入你的手机号,按回车确认\n手机号:")
password = input("请输入你的密码,按回车确认\n密码:")
account = Account(username, password)
# 实例化超星API
chaoxing = Chaoxing(account=account)
# 检查当前登录状态,并检查账号密码
_login_state = chaoxing.login()
if not _login_state["status"]:
raise LoginError(_login_state["msg"])
# 获取所有的课程列表
all_course = chaoxing.get_course_list()
course_task = []
# 手动输入要学习的课程ID列表
if not course_list:
print("*" * 10 + "课程列表" + "*" * 10)
try:
# 初始化登录信息
username, password, course_list, speed = init_config()
# 规范化播放速度的输入值
speed = min(2.0, max(1.0, speed))
if (not username) or (not password):
username = input("请输入你的手机号,按回车确认\n手机号:")
password = input("请输入你的密码,按回车确认\n密码:")
account = Account(username, password)
# 实例化超星API
chaoxing = Chaoxing(account=account)
# 检查当前登录状态,并检查账号密码
_login_state = chaoxing.login()
if not _login_state["status"]:
raise LoginError(_login_state["msg"])
# 获取所有的课程列表
all_course = chaoxing.get_course_list()
course_task = []
# 手动输入要学习的课程ID列表
if not course_list:
print("*" * 10 + "课程列表" + "*" * 10)
for course in all_course:
print(f"ID: {course['courseId']} 课程名: {course['title']}")
print("*" * 28)
try:
course_list = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").split(",")
except Exception as e:
raise FormatError("输入格式错误") from e
# 筛选需要学习的课程
for course in all_course:
print(f"ID: {course['courseId']} 课程名: {course['title']}")
print("*" * 28)
try:
course_list = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").split(",")
except Exception as e:
raise FormatError("输入格式错误") from e
# 筛选需要学习的课程
for course in all_course:
if course["courseId"] in course_list:
course_task.append(course)
if not course_task:
course_task = all_course
# 开始遍历要学习的课程列表
logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(course_task)}")
for course in course_task:
# 获取当前课程的所有章节
point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"])
for point in point_list["points"]:
# 获取当前章节的所有任务点
jobs = []
job_info = None
jobs, job_info = chaoxing.get_job_list(course["clazzId"], course["courseId"], course["cpi"], point["id"])
# 可能存在章节无任何内容的情况
if not jobs:
continue
# 遍历所有任务点
for job in jobs:
# 视频任务
if job["type"] == "video":
logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}")
# 超星的接口没有返回当前任务是否为Audio音频任务
isAudio = False
try:
chaoxing.study_video(course, job, job_info, _speed=speed, _type="Video")
except JSONDecodeError as e:
logger.warning("当前任务非视频任务,正在尝试音频任务解码")
isAudio = True
if isAudio:
if course["courseId"] in course_list:
course_task.append(course)
if not course_task:
course_task = all_course
# 开始遍历要学习的课程列表
logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(course_task)}")
for course in course_task:
# 获取当前课程的所有章节
point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"])
for point in point_list["points"]:
# 获取当前章节的所有任务点
jobs = []
job_info = None
jobs, job_info = chaoxing.get_job_list(course["clazzId"], course["courseId"], course["cpi"], point["id"])
# 可能存在章节无任何内容的情况
if not jobs:
continue
# 遍历所有任务点
for job in jobs:
# 视频任务
if job["type"] == "video":
logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}")
# 超星的接口没有返回当前任务是否为Audio音频任务
isAudio = False
try:
chaoxing.study_video(course, job, job_info, _speed=speed, _type="Audio")
chaoxing.study_video(course, job, job_info, _speed=speed, _type="Video")
except JSONDecodeError as e:
logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过")
# 文档任务
elif job["type"] == "document":
logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}")
chaoxing.study_document(course, job)
# 测验任务
elif job["type"] == "workid":
logger.trace(f"识别到测验任务, 任务章节: {course['title']}")
logger.warning("当前任务非视频任务,正在尝试音频任务解码")
isAudio = True
if isAudio:
try:
chaoxing.study_video(course, job, job_info, _speed=speed, _type="Audio")
except JSONDecodeError as e:
logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过")
# 文档任务
elif job["type"] == "document":
logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}")
chaoxing.study_document(course, job)
# 测验任务
elif job["type"] == "workid":
logger.trace(f"识别到测验任务, 任务章节: {course['title']}")
except BaseException as e:
import traceback
logger.error(f"错误: {type(e).__name__}: {e}")
logger.error(traceback.format_exc())
raise e

0 comments on commit e6fbb7b

Please sign in to comment.