Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: xhs comments add xsec_token #509

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions media_platform/xhs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,14 @@ async def get_note_by_id(
)
return dict()

async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
async def get_note_comments(
self, note_id: str, xsec_token: str, cursor: str = ""
) -> Dict:
"""
获取一级评论的API
Args:
note_id: 笔记ID
xsec_token: 验证token
cursor: 分页游标

Returns:
Expand All @@ -281,17 +284,24 @@ async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
"cursor": cursor,
"top_comment_id": "",
"image_formats": "jpg,webp,avif",
"xsec_token": xsec_token,
}
return await self.get(uri, params)

async def get_note_sub_comments(
self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""
self,
note_id: str,
root_comment_id: str,
xsec_token: str,
num: int = 10,
cursor: str = "",
):
"""
获取指定父评论下的子评论的API
Args:
note_id: 子评论的帖子ID
root_comment_id: 根评论ID
xsec_token: 验证token
num: 分页数量
cursor: 分页游标

Expand All @@ -304,12 +314,16 @@ async def get_note_sub_comments(
"root_comment_id": root_comment_id,
"num": num,
"cursor": cursor,
"image_formats": "jpg,webp,avif",
"top_comment_id": "",
"xsec_token": xsec_token,
}
return await self.get(uri, params)

async def get_note_all_comments(
self,
note_id: str,
xsec_token: str,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
max_count: int = 10,
Expand All @@ -318,6 +332,7 @@ async def get_note_all_comments(
获取指定笔记下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息
Args:
note_id: 笔记ID
xsec_token: 验证token
crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后
max_count: 一次笔记爬取的最大评论数量
Expand All @@ -328,7 +343,9 @@ async def get_note_all_comments(
comments_has_more = True
comments_cursor = ""
while comments_has_more and len(result) < max_count:
comments_res = await self.get_note_comments(note_id, comments_cursor)
comments_res = await self.get_note_comments(
note_id=note_id, xsec_token=xsec_token, cursor=comments_cursor
)
comments_has_more = comments_res.get("has_more", False)
comments_cursor = comments_res.get("cursor", "")
if "comments" not in comments_res:
Expand All @@ -344,21 +361,26 @@ async def get_note_all_comments(
await asyncio.sleep(crawl_interval)
result.extend(comments)
sub_comments = await self.get_comments_all_sub_comments(
comments, crawl_interval, callback
comments=comments,
xsec_token=xsec_token,
crawl_interval=crawl_interval,
callback=callback,
)
result.extend(sub_comments)
return result

async def get_comments_all_sub_comments(
self,
comments: List[Dict],
xsec_token: str,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
) -> List[Dict]:
"""
获取指定一级评论下的所有二级评论, 该方法会一直查找一级评论下的所有二级评论信息
Args:
comments: 评论列表
xsec_token: 验证token
crawl_interval: 爬取一次评论的延迟单位(秒)
callback: 一次评论爬取结束后

Expand Down Expand Up @@ -387,7 +409,11 @@ async def get_comments_all_sub_comments(

while sub_comment_has_more:
comments_res = await self.get_note_sub_comments(
note_id, root_comment_id, 10, sub_comment_cursor
note_id=note_id,
root_comment_id=root_comment_id,
xsec_token=xsec_token,
num=10,
cursor=sub_comment_cursor,
)
sub_comment_has_more = comments_res.get("has_more", False)
sub_comment_cursor = comments_res.get("cursor", "")
Expand Down
42 changes: 30 additions & 12 deletions media_platform/xhs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ async def search(self) -> None:
utils.logger.info(
f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}"
)
note_id_list: List[str] = []
note_ids: List[str] = []
xsec_tokens: List[str] = []
notes_res = await self.xhs_client.get_note_by_keyword(
keyword=keyword,
search_id=search_id,
Expand Down Expand Up @@ -168,12 +169,13 @@ async def search(self) -> None:
if note_detail:
await xhs_store.update_xhs_note(note_detail)
await self.get_notice_media(note_detail)
note_id_list.append(note_detail.get("note_id"))
note_ids.append(note_detail.get("note_id"))
xsec_tokens.append(note_detail.get("xsec_token"))
page += 1
utils.logger.info(
f"[XiaoHongShuCrawler.search] Note details: {note_details}"
)
await self.batch_get_note_comments(note_id_list)
await self.batch_get_note_comments(note_ids, xsec_tokens)
except DataFetchError:
utils.logger.error(
"[XiaoHongShuCrawler.search] Get note detail error"
Expand All @@ -200,8 +202,12 @@ async def get_creators_and_notes(self) -> None:
callback=self.fetch_creator_notes_detail,
)

note_ids = [note_item.get("note_id") for note_item in all_notes_list]
await self.batch_get_note_comments(note_ids)
note_ids = []
xsec_tokens = []
for note_item in all_notes_list:
note_ids.append(note_item.get("note_id"))
xsec_tokens.append(note_item.get("xsec_token"))
await self.batch_get_note_comments(note_ids, xsec_tokens)

async def fetch_creator_notes_detail(self, note_list: List[Dict]):
"""
Expand Down Expand Up @@ -245,12 +251,14 @@ async def get_specified_notes(self):
get_note_detail_task_list.append(crawler_task)

need_get_comment_note_ids = []
xsec_tokens = []
note_details = await asyncio.gather(*get_note_detail_task_list)
for note_detail in note_details:
if note_detail:
need_get_comment_note_ids.append(note_detail.get("note_id", ""))
xsec_tokens.append(note_detail.get("xsec_token", ""))
await xhs_store.update_xhs_note(note_detail)
await self.batch_get_note_comments(need_get_comment_note_ids)
await self.batch_get_note_comments(need_get_comment_note_ids, xsec_tokens)

async def get_note_detail_async_task(
self,
Expand Down Expand Up @@ -291,8 +299,10 @@ async def get_note_detail_async_task(
)
if not note_detail_from_html:
# 如果网页版笔记详情获取失败,则尝试API获取
note_detail_from_api: Optional[Dict] = await self.xhs_client.get_note_by_id(
note_id, xsec_source, xsec_token
note_detail_from_api: Optional[Dict] = (
await self.xhs_client.get_note_by_id(
note_id, xsec_source, xsec_token
)
)
note_detail = note_detail_from_html or note_detail_from_api
if note_detail:
Expand All @@ -311,7 +321,9 @@ async def get_note_detail_async_task(
)
return None

async def batch_get_note_comments(self, note_list: List[str]):
async def batch_get_note_comments(
self, note_list: List[str], xsec_tokens: List[str]
):
"""Batch get note comments"""
if not config.ENABLE_GET_COMMENTS:
utils.logger.info(
Expand All @@ -324,21 +336,27 @@ async def batch_get_note_comments(self, note_list: List[str]):
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = []
for note_id in note_list:
for index, note_id in enumerate(note_list):
task = asyncio.create_task(
self.get_comments(note_id, semaphore), name=note_id
self.get_comments(
note_id=note_id, xsec_token=xsec_tokens[index], semaphore=semaphore
),
name=note_id,
)
task_list.append(task)
await asyncio.gather(*task_list)

async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore):
async def get_comments(
self, note_id: str, xsec_token: str, semaphore: asyncio.Semaphore
):
"""Get note comments with keyword filtering and quantity limitation"""
async with semaphore:
utils.logger.info(
f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}"
)
await self.xhs_client.get_note_all_comments(
note_id=note_id,
xsec_token=xsec_token,
crawl_interval=random.random(),
callback=xhs_store.batch_update_xhs_note_comments,
max_count=CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,
Expand Down