diff --git a/src/apis/scaffold/claimer.py b/src/apis/scaffold/claimer.py index 8e38655402..d0c6db747f 100644 --- a/src/apis/scaffold/claimer.py +++ b/src/apis/scaffold/claimer.py @@ -5,7 +5,7 @@ # Description: from typing import Optional -from services.deploy import ClaimerScheduler +from services.deploy import ClaimerScheduler, ClaimerInstance from services.settings import logger @@ -16,6 +16,7 @@ def deploy(platform: Optional[str] = None): @logger.catch() -def run(silence: Optional[bool] = None): +def run(silence: Optional[bool] = None, log_ignore: Optional[bool] = None): """运行 `claim` 单步子任务,认领周免游戏""" - ClaimerScheduler(silence=silence).job_loop_claim() + with ClaimerInstance(silence=silence, log_ignore=log_ignore) as claimer: + claimer.just_do_it() diff --git a/src/services/bricklayer/bricklayer.py b/src/services/bricklayer/bricklayer.py index dc4e74374d..4e412c7a76 100644 --- a/src/services/bricklayer/bricklayer.py +++ b/src/services/bricklayer/bricklayer.py @@ -235,9 +235,6 @@ def __init__(self, silence: bool = None): self.cookie_manager = CookieManager() - # 游戏获取结果的状态 - self.result = "" - def get_free_game( self, page_link: str, diff --git a/src/services/bricklayer/core.py b/src/services/bricklayer/core.py index ff3cae7b42..e767ed972c 100644 --- a/src/services/bricklayer/core.py +++ b/src/services/bricklayer/core.py @@ -12,6 +12,7 @@ from typing import List, Optional, NoReturn, Dict import cloudscraper +from requests.exceptions import RequestException from selenium.common.exceptions import ( TimeoutException, ElementNotVisibleException, @@ -87,11 +88,16 @@ def _ajax_cookie_check_need_login(beat_dance: int = 0) -> Optional[bool]: scraper = cloudscraper.create_scraper() try: response = scraper.get( - _api, headers={"cookie": ToolBox.transfer_cookies(ctx.get_cookies())} + _api, + headers={"cookie": ToolBox.transfer_cookies(ctx.get_cookies())}, + timeout=2, ) return response.json()["needLogin"] except (json.decoder.JSONDecodeError, KeyError): return True + # Timeout/ConnectionError + except RequestException: + return True except Exception as err: # noqa logger.warning(err) return None @@ -131,11 +137,11 @@ def fall_in_captcha_runtime(ctx: Chrome) -> Optional[bool]: """ 判断在下单时是否遇到人机挑战 + # "//div[@id='talon_frame_checkout_free_prod']" :param ctx: :return: """ try: - # "//div[@id='talon_frame_checkout_free_prod']" WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException).until( EC.presence_of_element_located( (By.XPATH, "//iframe[contains(@title,'content')]") @@ -368,6 +374,7 @@ class AssertUtils: GAME_OK = "🛴 已在库" GAME_PENDING = "👀 待认领" GAME_CLAIM = "💰 领取成功" + GAME_NOT_FREE = "🦽 付费游戏" @staticmethod def login_error(ctx: Chrome) -> bool: @@ -645,7 +652,7 @@ def purchase_status( game=f"『{game_name}』", ) ) - return AssertUtils.ASSERT_OBJECT_EXCEPTION + return AssertUtils.GAME_NOT_FREE return AssertUtils.ASSERT_OBJECT_EXCEPTION @@ -687,6 +694,9 @@ def __init__(self): self.path_ctx_cookies = os.path.join(DIR_COOKIES, "ctx_cookies.yaml") self.loop_timeout = 300 + # 游戏获取结果的状态 + self.result = "" + # 注册拦截机 self._armor = ArmorUtils() self.assert_ = AssertUtils() @@ -880,10 +890,10 @@ def _get_free_game( # 当游戏不处于<待认领>状态时跳过后续业务 if self.result != self.assert_.GAME_PENDING: # <游戏状态断言超时>或<检测到异常的实体对象> - # # 在超时阈值内尝试重新拉起服务 + # 在超时阈值内尝试重新拉起服务 if self.result == self.assert_.ASSERT_OBJECT_EXCEPTION: continue - # 否则游戏状态处于<领取成功>或<已在库> + # 否则游戏状态处于<领取成功>或<已在库>或<付费游戏> break # [🚀] 激活游戏订单 diff --git a/src/services/deploy.py b/src/services/deploy.py index d1a46963d7..1d43889cae 100644 --- a/src/services/deploy.py +++ b/src/services/deploy.py @@ -5,7 +5,7 @@ # Description: import random from datetime import datetime, timedelta -from typing import Optional +from typing import Optional, List import apprise import pytz @@ -22,8 +22,6 @@ class ClaimerScheduler: """系统任务调度器""" - SPAWN_TIME = "spawn_time" - def __init__(self, silence: Optional[bool] = None): self.action_name = "AwesomeScheduler" self.end_date = datetime.now(pytz.timezone("Asia/Shanghai")) + timedelta(days=180) @@ -31,9 +29,6 @@ def __init__(self, silence: Optional[bool] = None): # 服务注册 self.scheduler = BlockingScheduler() - self.bricklayer = Bricklayer(silence=silence) - self.explorer = Explorer(silence=silence) - self.task_queue = Queue() self.logger = logger def deploy_on_vps(self): @@ -82,6 +77,104 @@ def deploy_on_vps(self): ) ) + def deploy_jobs(self, platform: Optional[str] = None): + """ + 部署系统任务 + + :param platform: within [vps serverless qing-long] + :return: + """ + platform = "vps" if platform is None else platform + if platform not in ["vps", "serverless", "qing-long"]: + raise NotImplementedError + + self.logger.debug( + ToolBox.runtime_report( + motive="JOB", + action_name=self.action_name, + message="部署任务调度器", + platform=platform.upper(), + ) + ) + + # [⚔] Distribute common state machine patterns + if platform == "vps": + self.deploy_on_vps() + elif platform == "serverless": + raise NotImplementedError + elif platform == "qing-long": + return self.job_loop_claim() + + def job_loop_claim(self): + """wrap function for claimer instance""" + with ClaimerInstance(silence=self.silence) as claimer: + claimer.just_do_it() + + +class ClaimerInstance: + """单步子任务 认领周免游戏""" + + def __init__(self, silence: bool, log_ignore: Optional[bool] = False): + """ + + :param silence: + :param log_ignore: 过滤掉已在库的资源实体的推送信息。 + """ + self.action_name = "ClaimerInstance" + self.silence = silence + self.logger = logger + self.log_ignore = log_ignore + + # 服务注册 + self.bricklayer = Bricklayer(silence=silence) + self.explorer = Explorer(silence=silence) + + # 任务队列 按顺缓存周免游戏及其免费附加内容的认领任务 + self.task_queue = Queue() + # 消息队列 按序缓存认领任务的执行状态 + self.message_queue = Queue() + # 内联数据容器 编排推送模版 + self.inline_docker = [] + + def __enter__(self): + # 集成统一的驱动上下文,减少内存占用 + self.challenger = get_challenge_ctx(silence=self.silence) + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # 消息推送 + self._pusher_wrapper() + + # 缓存卸载 + if hasattr(self, "challenger"): + self.challenger.quit() + + def _pusher_wrapper(self): + while not self.message_queue.empty(): + context = self.message_queue.get() + # 过滤已在库的游戏资源的推送数据 + if ( + self.log_ignore is True + and context["status"] == self.bricklayer.assert_.GAME_OK + ): + continue + self.inline_docker.append(context) + + # 在 `ignore` 模式下当所有资源实体都已在库时不推送消息 + if self.inline_docker: + self._push(inline_docker=self.inline_docker) + # 在 `ignore` 模式下追加 DEBUG 标签日志 + elif self.log_ignore: + self.logger.debug( + ToolBox.runtime_report( + motive="Notify", + action_name=self.action_name, + message="忽略已在库的资源实体推送信息", + ignore=self.log_ignore, + ) + ) + def _push(self, inline_docker: list, pusher_settings: Optional[dict] = None): """ 推送追踪日志 @@ -90,7 +183,6 @@ def _push(self, inline_docker: list, pusher_settings: Optional[dict] = None): :param pusher_settings: :return: """ - # ------------------------- # [♻]参数过滤 # ------------------------- @@ -102,7 +194,7 @@ def _push(self, inline_docker: list, pusher_settings: Optional[dict] = None): # [📧]消息推送 # ------------------------- _inline_textbox = [f"当前玩家:{ToolBox.secret_email(self.bricklayer.email)}"] - _inline_textbox += ["运行日志".center(20, "-")] + _inline_textbox += ["周免游戏".center(20, "-")] if not inline_docker: _inline_textbox += [f"[{ToolBox.date_format_now()}] 🛴 暂无待认领的周免游戏"] else: @@ -137,140 +229,93 @@ def _push(self, inline_docker: list, pusher_settings: Optional[dict] = None): ) ) - def deploy_jobs(self, platform: Optional[str] = None): - """ - 部署系统任务 - - :param platform: within [vps serverless qing-long] - :return: - """ - platform = "vps" if platform is None else platform - if platform not in ["vps", "serverless", "qing-long"]: - raise NotImplementedError - - self.logger.debug( - ToolBox.runtime_report( - motive="JOB", - action_name=self.action_name, - message="部署任务调度器", - platform=platform.upper(), - ) - ) - - # [⚔] Distribute common state machine patterns - if platform == "vps": - self.deploy_on_vps() - elif platform == "serverless": - raise NotImplementedError - elif platform == "qing-long": - return self.job_loop_claim() - - def job_loop_claim(self): - """单步子任务 认领周免游戏""" - - def _release_power(urls: Optional[list] = None): - if not urls: - self.logger.debug( - ToolBox.runtime_report( - motive="SKIP", - action_name=self.action_name, - message="🛴 当前玩家暂无待认领的周免游戏。", - ) - ) - return - - # 优先处理常规情况 urls.__len__() == 1 - for url in urls: - self.logger.debug( - ToolBox.runtime_report( - motive="STARTUP", - action_name="ScaffoldClaim", - message="🍜 正在为玩家领取周免游戏", - game=f"『{limited_free_game_objs[url]}』", - ) - ) - - # 更新任务队列 - challenger.switch_to.new_window("tab") - self.task_queue.put({"game": challenger.current_window_handle}) - - # 反复生产挑战者领取周免游戏 - self.bricklayer.get_free_game( - page_link=url, ctx_cookies=ctx_cookies, _ctx_session=challenger + def claim_free_game( + self, + challenger, + ctx_cookies: List[dict], + game_objs: dict, + urls: Optional[List[str]] = None, + ): + """认领周免游戏""" + if not urls: + self.logger.debug( + ToolBox.runtime_report( + motive="SKIP", + action_name=self.action_name, + message="🛴 当前玩家暂无待认领的周免游戏。", ) + ) + return - # 编制运行缓存 用于生成业务报告 - _runtime = { - self.SPAWN_TIME: ToolBox.date_format_now(), - "status": self.bricklayer.result, - "name": limited_free_game_objs[url], - } - inline_docker.append(_runtime) - - def _release_follower(): - while not self.task_queue.empty(): - context = self.task_queue.get() - - # {"game": WebDriver Window} - if isinstance(context, dict) and context.get("game"): - challenger.switch_to.window(context["game"]) - dlc_details = self.bricklayer.get_free_dlc_details( - _ctx_session=challenger - ) - for dlc in dlc_details: - self.task_queue.put(dlc) - # {"url": link of dlc , "name": alia-label of dlc} - elif isinstance(context, dict) and context.get("url"): - result = self.bricklayer.get_free_dlc( - dlc_page_link=context["url"], - ctx_cookies=ctx_cookies, - _ctx_session=challenger, - ) - _runtime = { - self.SPAWN_TIME: ToolBox.date_format_now(), - "status": result, - "name": context["name"], - "dlc": True, - } - inline_docker.append(_runtime) - - # 标记运行时刻 - if self.scheduler.running: + # 优先处理常规情况 urls.__len__() == 1 + for url in urls: self.logger.debug( ToolBox.runtime_report( - motive="JOB", - action_name=self.action_name, - message="定时任务启动", - job="claim", + motive="STARTUP", + action_name="ScaffoldClaim", + message="🍜 正在为玩家领取周免游戏", + game=f"『{game_objs[url]}』", ) ) - # 初始化内联数据容器 临时存储运行缓存 - inline_docker = [] + # 更新任务队列 + challenger.switch_to.new_window("tab") + self.task_queue.put({"game": challenger.current_window_handle}) - # 集成统一的驱动上下文,减少内存占用 - challenger = get_challenge_ctx(silence=self.silence) + # 反复生产挑战者领取周免游戏 + self.bricklayer.get_free_game( + page_link=url, ctx_cookies=ctx_cookies, _ctx_session=challenger + ) - try: - # 检查并更新身份令牌 - if self.bricklayer.cookie_manager.refresh_ctx_cookies( - _ctx_session=challenger - ): - # 读取有效的身份令牌 - ctx_cookies = self.bricklayer.cookie_manager.load_ctx_cookies() + # 编制运行缓存 用于生成业务报告 + _runtime = {"status": self.bricklayer.result, "name": game_objs[url]} + self.message_queue.put_nowait(_runtime) + + def claim_free_dlc(self, challenger, ctx_cookies): + """认领周免游戏的免费附加内容""" + while not self.task_queue.empty(): + context = self.task_queue.get() - # 扫描商城促销活动,返回“0折”商品的名称与商城链接 - limited_free_game_objs = self.explorer.get_the_absolute_free_game( - ctx_cookies, _ctx_session=challenger + # {"game": WebDriver Window} + if isinstance(context, dict) and context.get("game"): + challenger.switch_to.window(context["game"]) + dlc_details = self.bricklayer.get_free_dlc_details( + _ctx_session=challenger + ) + for dlc in dlc_details: + self.task_queue.put(dlc) + # {"url": link of dlc , "name": alia-label of dlc} + elif isinstance(context, dict) and context.get("url"): + result = self.bricklayer.get_free_dlc( + dlc_page_link=context["url"], + ctx_cookies=ctx_cookies, + _ctx_session=challenger, ) + _runtime = {"status": result, "name": context["name"], "dlc": True} + self.message_queue.put_nowait(_runtime) - # 释放 Claimer 认领周免游戏 - _release_power(limited_free_game_objs["urls"]) - self._push(inline_docker) + def just_do_it(self): + """单步子任务 认领周免游戏""" + # 检查并更新身份令牌 + if self.bricklayer.cookie_manager.refresh_ctx_cookies( + _ctx_session=self.challenger + ): + # 读取有效的身份令牌 + ctx_cookies = self.bricklayer.cookie_manager.load_ctx_cookies() + + # 扫描商城促销活动,返回“0折”商品的名称与商城链接 + limited_free_game_objs = self.explorer.get_the_absolute_free_game( + ctx_cookies, _ctx_session=self.challenger + ) - # 释放 Claimer 认领游戏DLC - _release_follower() - self._push(inline_docker) + # 释放 Claimer 认领周免游戏 + urls = limited_free_game_objs["urls"] + self.claim_free_game( + challenger=self.challenger, + ctx_cookies=ctx_cookies, + game_objs=limited_free_game_objs, + urls=urls, + ) - finally: - challenger.quit() + # 释放 Claimer 认领游戏DLC + self.claim_free_dlc(challenger=self.challenger, ctx_cookies=ctx_cookies) diff --git a/src/services/explorer/core.py b/src/services/explorer/core.py index 7eea76d1b4..901bc635d5 100644 --- a/src/services/explorer/core.py +++ b/src/services/explorer/core.py @@ -30,7 +30,7 @@ class AwesomeFreeGirl: URL_STORE_FREE_GAME = ( f"{URL_STORE_PREFIX}sortBy=releaseDate&sortDir=DESC&priceTier=tierFree&count=40" ) - URL_STORE_FREE_DLC = f"{URL_STORE_PREFIX}sortBy=releaseDate&sortDir=DESC&priceTier=tierFree&category=GameAddOn&count=40&start=0" + URL_STORE_FREE_DLC = f"{URL_STORE_PREFIX}sortBy=releaseDate&sortDir=DESC&priceTier=tierFree&category=GameAddOn&count=40&start=0" # noqa URL_PROMOTIONS = "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions?locale=zh-CN" URL_PRODUCT_PAGE = "https://store.epicgames.com/zh-CN/p/" diff --git a/src/services/scaffold.py b/src/services/scaffold.py index a4c3924141..8e59fa7bf1 100644 --- a/src/services/scaffold.py +++ b/src/services/scaffold.py @@ -82,7 +82,7 @@ def get( get.join(trace=debug, cache=cache, category=category) @staticmethod - def claim(silence: Optional[bool] = True): + def claim(silence: Optional[bool] = True, ignore: Optional[bool] = False): """ 认领周免游戏。 @@ -92,9 +92,11 @@ def claim(silence: Optional[bool] = True): `claim` 是系统级指令 `deploy` 的单步子任务,在上述业务结束后,会根据你配置的 `pusher` 推送追踪日志(若配置无效则不发)。 + :param silence: + :param ignore: 忽略已在库的推送数据。 :return: """ - claimer.run(silence=silence) + claimer.run(silence=silence, log_ignore=ignore) @staticmethod def deploy(platform: Optional[str] = None): diff --git a/src/services/utils/toolbox/toolbox.py b/src/services/utils/toolbox/toolbox.py index 8296a75814..9b566f8142 100644 --- a/src/services/utils/toolbox/toolbox.py +++ b/src/services/utils/toolbox/toolbox.py @@ -12,6 +12,7 @@ import pytz import undetected_chromedriver as uc import yaml +from gevent.queue import Queue from loguru import logger from selenium.webdriver import Chrome from selenium.webdriver import ChromeOptions @@ -21,6 +22,8 @@ class ToolBox: """可移植的工具箱""" + logger_tracer = Queue() + @staticmethod def check_sample_yaml(path_output: str, path_sample: str) -> Optional[Dict[str, Any]]: """ @@ -61,7 +64,11 @@ def check_sample_yaml(path_output: str, path_sample: str) -> Optional[Dict[str, @staticmethod def runtime_report( - action_name: str, motive: str = "RUN", message: str = "", **params + action_name: str, + motive: str = "RUN", + message: str = "", + record: bool = False, + **params, ) -> str: """格式化输出""" flag_ = f">> {motive} [{action_name}]" @@ -70,6 +77,11 @@ def runtime_report( if params: flag_ += " - " flag_ += " ".join([f"{i[0]}={i[1]}" for i in params.items()]) + + # 将系统级日志按序插入消息队列 + if bool(record) is True: + ToolBox.logger_tracer.put_nowait(flag_) + return flag_ @staticmethod