From e6fbb7b7749115ea66a337f7d67fc047f3984cf1 Mon Sep 17 00:00:00 2001 From: Harry Huang Date: Thu, 19 Sep 2024 11:27:07 +0800 Subject: [PATCH] feat: enhance error handling and speed input --- api/base.py | 2 +- api/exceptions.py | 15 ++--- main.py | 142 ++++++++++++++++++++++++---------------------- 3 files changed, 80 insertions(+), 79 deletions(-) diff --git a/api/base.py b/api/base.py index 71e8a0f6..cab8bd75 100644 --- a/api/base.py +++ b/api/base.py @@ -177,7 +177,7 @@ def video_progress_log(self, _session, _course, _job, _job_info, _dtoken, _durat logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...") return False - def study_video(self, _course, _job, _job_info, _speed: float = 1, _type: str = "Video"): + def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str = "Video"): if _type == "Video": _session = init_session(isVideo=True) else: diff --git a/api/exceptions.py b/api/exceptions.py index 473c197d..005474ac 100644 --- a/api/exceptions.py +++ b/api/exceptions.py @@ -1,19 +1,14 @@ -from loguru import logger - try: from requests.exceptions import JSONDecodeError except: # noqa: E722 from json import JSONDecodeError -class BaseException(Exception): - def __init__(self, _msg: str = None): - if _msg: - logger.error(_msg) - -class LoginError(BaseException): - pass +class LoginError(Exception): + def __init__(self, *args: object): + super().__init__(*args) class FormatError(Exception): - pass \ No newline at end of file + def __init__(self, *args: object): + super().__init__(*args) \ No newline at end of file diff --git a/main.py b/main.py index 9cb76451..1a4cd696 100644 --- a/main.py +++ b/main.py @@ -11,7 +11,7 @@ def init_config(): parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号") parser.add_argument("-p", "--password", type=str, default=None, help="登录密码") parser.add_argument("-l", "--list", type=str, default=None, help="要学习的课程ID列表") - parser.add_argument("-s", "--speed", type=int, default=1, help="视频播放倍速(默认1,最大2)") + parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速(默认1,最大2)") args = parser.parse_args() if args.config: config = configparser.ConfigParser() @@ -25,73 +25,79 @@ def init_config(): if __name__ == '__main__': - # 初始化登录信息 - username, password, course_list, speed = init_config() - # 强行限制倍速最大为2倍速 - speed = 2 if speed > 2 else speed - if (not username) or (not password): - username = input("请输入你的手机号,按回车确认\n手机号:") - password = input("请输入你的密码,按回车确认\n密码:") - account = Account(username, password) - # 实例化超星API - chaoxing = Chaoxing(account=account) - # 检查当前登录状态,并检查账号密码 - _login_state = chaoxing.login() - if not _login_state["status"]: - raise LoginError(_login_state["msg"]) - # 获取所有的课程列表 - all_course = chaoxing.get_course_list() - course_task = [] - # 手动输入要学习的课程ID列表 - if not course_list: - print("*" * 10 + "课程列表" + "*" * 10) + try: + # 初始化登录信息 + username, password, course_list, speed = init_config() + # 规范化播放速度的输入值 + speed = min(2.0, max(1.0, speed)) + if (not username) or (not password): + username = input("请输入你的手机号,按回车确认\n手机号:") + password = input("请输入你的密码,按回车确认\n密码:") + account = Account(username, password) + # 实例化超星API + chaoxing = Chaoxing(account=account) + # 检查当前登录状态,并检查账号密码 + _login_state = chaoxing.login() + if not _login_state["status"]: + raise LoginError(_login_state["msg"]) + # 获取所有的课程列表 + all_course = chaoxing.get_course_list() + course_task = [] + # 手动输入要学习的课程ID列表 + if not course_list: + print("*" * 10 + "课程列表" + "*" * 10) + for course in all_course: + print(f"ID: {course['courseId']} 课程名: {course['title']}") + print("*" * 28) + try: + course_list = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").split(",") + except Exception as e: + raise FormatError("输入格式错误") from e + # 筛选需要学习的课程 for course in all_course: - print(f"ID: {course['courseId']} 课程名: {course['title']}") - print("*" * 28) - try: - course_list = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").split(",") - except Exception as e: - raise FormatError("输入格式错误") from e - # 筛选需要学习的课程 - for course in all_course: - if course["courseId"] in course_list: - course_task.append(course) - if not course_task: - course_task = all_course - # 开始遍历要学习的课程列表 - logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(course_task)}") - for course in course_task: - # 获取当前课程的所有章节 - point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"]) - for point in point_list["points"]: - # 获取当前章节的所有任务点 - jobs = [] - job_info = None - jobs, job_info = chaoxing.get_job_list(course["clazzId"], course["courseId"], course["cpi"], point["id"]) - # 可能存在章节无任何内容的情况 - if not jobs: - continue - # 遍历所有任务点 - for job in jobs: - # 视频任务 - if job["type"] == "video": - logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") - # 超星的接口没有返回当前任务是否为Audio音频任务 - isAudio = False - try: - chaoxing.study_video(course, job, job_info, _speed=speed, _type="Video") - except JSONDecodeError as e: - logger.warning("当前任务非视频任务,正在尝试音频任务解码") - isAudio = True - if isAudio: + if course["courseId"] in course_list: + course_task.append(course) + if not course_task: + course_task = all_course + # 开始遍历要学习的课程列表 + logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(course_task)}") + for course in course_task: + # 获取当前课程的所有章节 + point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"]) + for point in point_list["points"]: + # 获取当前章节的所有任务点 + jobs = [] + job_info = None + jobs, job_info = chaoxing.get_job_list(course["clazzId"], course["courseId"], course["cpi"], point["id"]) + # 可能存在章节无任何内容的情况 + if not jobs: + continue + # 遍历所有任务点 + for job in jobs: + # 视频任务 + if job["type"] == "video": + logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + # 超星的接口没有返回当前任务是否为Audio音频任务 + isAudio = False try: - chaoxing.study_video(course, job, job_info, _speed=speed, _type="Audio") + chaoxing.study_video(course, job, job_info, _speed=speed, _type="Video") except JSONDecodeError as e: - logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过") - # 文档任务 - elif job["type"] == "document": - logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") - chaoxing.study_document(course, job) - # 测验任务 - elif job["type"] == "workid": - logger.trace(f"识别到测验任务, 任务章节: {course['title']}") \ No newline at end of file + logger.warning("当前任务非视频任务,正在尝试音频任务解码") + isAudio = True + if isAudio: + try: + chaoxing.study_video(course, job, job_info, _speed=speed, _type="Audio") + except JSONDecodeError as e: + logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过") + # 文档任务 + elif job["type"] == "document": + logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + chaoxing.study_document(course, job) + # 测验任务 + elif job["type"] == "workid": + logger.trace(f"识别到测验任务, 任务章节: {course['title']}") + except BaseException as e: + import traceback + logger.error(f"错误: {type(e).__name__}: {e}") + logger.error(traceback.format_exc()) + raise e \ No newline at end of file