diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index 43f47e9d..f3f0828c 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -265,11 +265,14 @@ async def get_note_by_id( ) return dict() - async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict: + async def get_note_comments( + self, note_id: str, xsec_token: str, cursor: str = "" + ) -> Dict: """ 获取一级评论的API Args: note_id: 笔记ID + xsec_token: 验证token cursor: 分页游标 Returns: @@ -281,17 +284,24 @@ async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict: "cursor": cursor, "top_comment_id": "", "image_formats": "jpg,webp,avif", + "xsec_token": xsec_token, } return await self.get(uri, params) async def get_note_sub_comments( - self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = "" + self, + note_id: str, + root_comment_id: str, + xsec_token: str, + num: int = 10, + cursor: str = "", ): """ 获取指定父评论下的子评论的API Args: note_id: 子评论的帖子ID root_comment_id: 根评论ID + xsec_token: 验证token num: 分页数量 cursor: 分页游标 @@ -304,12 +314,16 @@ async def get_note_sub_comments( "root_comment_id": root_comment_id, "num": num, "cursor": cursor, + "image_formats": "jpg,webp,avif", + "top_comment_id": "", + "xsec_token": xsec_token, } return await self.get(uri, params) async def get_note_all_comments( self, note_id: str, + xsec_token: str, crawl_interval: float = 1.0, callback: Optional[Callable] = None, max_count: int = 10, @@ -318,6 +332,7 @@ async def get_note_all_comments( 获取指定笔记下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息 Args: note_id: 笔记ID + xsec_token: 验证token crawl_interval: 爬取一次笔记的延迟单位(秒) callback: 一次笔记爬取结束后 max_count: 一次笔记爬取的最大评论数量 @@ -328,7 +343,9 @@ async def get_note_all_comments( comments_has_more = True comments_cursor = "" while comments_has_more and len(result) < max_count: - comments_res = await self.get_note_comments(note_id, comments_cursor) + comments_res = await self.get_note_comments( + note_id=note_id, xsec_token=xsec_token, cursor=comments_cursor + ) comments_has_more = comments_res.get("has_more", False) comments_cursor = comments_res.get("cursor", "") if "comments" not in comments_res: @@ -344,7 +361,10 @@ async def get_note_all_comments( await asyncio.sleep(crawl_interval) result.extend(comments) sub_comments = await self.get_comments_all_sub_comments( - comments, crawl_interval, callback + comments=comments, + xsec_token=xsec_token, + crawl_interval=crawl_interval, + callback=callback, ) result.extend(sub_comments) return result @@ -352,6 +372,7 @@ async def get_note_all_comments( async def get_comments_all_sub_comments( self, comments: List[Dict], + xsec_token: str, crawl_interval: float = 1.0, callback: Optional[Callable] = None, ) -> List[Dict]: @@ -359,6 +380,7 @@ async def get_comments_all_sub_comments( 获取指定一级评论下的所有二级评论, 该方法会一直查找一级评论下的所有二级评论信息 Args: comments: 评论列表 + xsec_token: 验证token crawl_interval: 爬取一次评论的延迟单位(秒) callback: 一次评论爬取结束后 @@ -387,7 +409,11 @@ async def get_comments_all_sub_comments( while sub_comment_has_more: comments_res = await self.get_note_sub_comments( - note_id, root_comment_id, 10, sub_comment_cursor + note_id=note_id, + root_comment_id=root_comment_id, + xsec_token=xsec_token, + num=10, + cursor=sub_comment_cursor, ) sub_comment_has_more = comments_res.get("has_more", False) sub_comment_cursor = comments_res.get("cursor", "") diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index 88152b9f..0061aa94 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -135,7 +135,8 @@ async def search(self) -> None: utils.logger.info( f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}" ) - note_id_list: List[str] = [] + note_ids: List[str] = [] + xsec_tokens: List[str] = [] notes_res = await self.xhs_client.get_note_by_keyword( keyword=keyword, search_id=search_id, @@ -168,12 +169,13 @@ async def search(self) -> None: if note_detail: await xhs_store.update_xhs_note(note_detail) await self.get_notice_media(note_detail) - note_id_list.append(note_detail.get("note_id")) + note_ids.append(note_detail.get("note_id")) + xsec_tokens.append(note_detail.get("xsec_token")) page += 1 utils.logger.info( f"[XiaoHongShuCrawler.search] Note details: {note_details}" ) - await self.batch_get_note_comments(note_id_list) + await self.batch_get_note_comments(note_ids, xsec_tokens) except DataFetchError: utils.logger.error( "[XiaoHongShuCrawler.search] Get note detail error" @@ -200,8 +202,12 @@ async def get_creators_and_notes(self) -> None: callback=self.fetch_creator_notes_detail, ) - note_ids = [note_item.get("note_id") for note_item in all_notes_list] - await self.batch_get_note_comments(note_ids) + note_ids = [] + xsec_tokens = [] + for note_item in all_notes_list: + note_ids.append(note_item.get("note_id")) + xsec_tokens.append(note_item.get("xsec_token")) + await self.batch_get_note_comments(note_ids, xsec_tokens) async def fetch_creator_notes_detail(self, note_list: List[Dict]): """ @@ -245,12 +251,14 @@ async def get_specified_notes(self): get_note_detail_task_list.append(crawler_task) need_get_comment_note_ids = [] + xsec_tokens = [] note_details = await asyncio.gather(*get_note_detail_task_list) for note_detail in note_details: if note_detail: need_get_comment_note_ids.append(note_detail.get("note_id", "")) + xsec_tokens.append(note_detail.get("xsec_token", "")) await xhs_store.update_xhs_note(note_detail) - await self.batch_get_note_comments(need_get_comment_note_ids) + await self.batch_get_note_comments(need_get_comment_note_ids, xsec_tokens) async def get_note_detail_async_task( self, @@ -291,8 +299,10 @@ async def get_note_detail_async_task( ) if not note_detail_from_html: # 如果网页版笔记详情获取失败,则尝试API获取 - note_detail_from_api: Optional[Dict] = await self.xhs_client.get_note_by_id( - note_id, xsec_source, xsec_token + note_detail_from_api: Optional[Dict] = ( + await self.xhs_client.get_note_by_id( + note_id, xsec_source, xsec_token + ) ) note_detail = note_detail_from_html or note_detail_from_api if note_detail: @@ -311,7 +321,9 @@ async def get_note_detail_async_task( ) return None - async def batch_get_note_comments(self, note_list: List[str]): + async def batch_get_note_comments( + self, note_list: List[str], xsec_tokens: List[str] + ): """Batch get note comments""" if not config.ENABLE_GET_COMMENTS: utils.logger.info( @@ -324,14 +336,19 @@ async def batch_get_note_comments(self, note_list: List[str]): ) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list: List[Task] = [] - for note_id in note_list: + for index, note_id in enumerate(note_list): task = asyncio.create_task( - self.get_comments(note_id, semaphore), name=note_id + self.get_comments( + note_id=note_id, xsec_token=xsec_tokens[index], semaphore=semaphore + ), + name=note_id, ) task_list.append(task) await asyncio.gather(*task_list) - async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore): + async def get_comments( + self, note_id: str, xsec_token: str, semaphore: asyncio.Semaphore + ): """Get note comments with keyword filtering and quantity limitation""" async with semaphore: utils.logger.info( @@ -339,6 +356,7 @@ async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore): ) await self.xhs_client.get_note_all_comments( note_id=note_id, + xsec_token=xsec_token, crawl_interval=random.random(), callback=xhs_store.batch_update_xhs_note_comments, max_count=CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,