Skip to content

Commit

Permalink
fix: replace some with
Browse files Browse the repository at this point in the history
  • Loading branch information
Nemo2011 committed Dec 7, 2024
1 parent 663102f commit 41f356c
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 143 deletions.
2 changes: 1 addition & 1 deletion bilibili_api/article.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from .utils.initial_state import get_initial_state, get_initial_state_sync
from .utils.utils import get_api, raise_for_statement
from .utils.credential import Credential
from .utils.network import Api, get_session
from .utils.network import Api
from .utils import cache_pool
from .exceptions.NetworkException import ApiException, NetworkException
from .video import get_cid_info_sync
Expand Down
9 changes: 5 additions & 4 deletions bilibili_api/ass.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
有关 ASS 文件的操作
"""
import os
import json
from tempfile import gettempdir
from typing import Union, Optional

Expand All @@ -14,7 +15,7 @@
from .utils.json2srt import json2srt
from .utils.credential import Credential
from .utils.danmaku2ass import Danmaku2ASS
from .utils.network import get_session
from .utils.network import Api
from .exceptions.ArgsException import ArgsException


Expand Down Expand Up @@ -132,10 +133,10 @@ async def make_ass_file_subtitle(
url = subtitle["subtitle_url"]
if isinstance(obj, Episode) or "https:" not in url:
url = "https:" + url
req = await get_session().request("GET", url)
req = await Api(url=url, method="GET").request(raw=True)
file_dir = gettempdir() + "/" + "subtitle.json"
with open(file_dir, "wb") as f:
f.write(req.content)
with open(file_dir, "w+") as f:
f.write(json.dumps(req))
export_ass_from_json(file_dir, out)
return
raise ValueError("没有找到指定字幕")
Expand Down
36 changes: 21 additions & 15 deletions bilibili_api/bangumi.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
from .video import Video
from .utils.utils import get_api, raise_for_statement
from .utils.credential import Credential
from .exceptions.ApiException import ApiException
from .utils.network import Api, get_session, HEADERS
from .utils.network import Api
from .utils.initial_state import (
get_initial_state,
get_initial_state_sync,
Expand Down Expand Up @@ -745,6 +744,7 @@ class Anime:
"""
动画
"""

def __init__(
self,
version: IndexFilter.Version = IndexFilter.Version.ALL,
Expand Down Expand Up @@ -793,6 +793,7 @@ class Movie:
"""
电影
"""

def __init__(
self,
area: IndexFilter.Area = IndexFilter.Area.ALL,
Expand Down Expand Up @@ -823,6 +824,7 @@ class Documentary:
"""
纪录片
"""

def __init__(
self,
release_date: str = -1,
Expand Down Expand Up @@ -851,6 +853,7 @@ class TV:
"""
TV
"""

def __init__(
self,
area: IndexFilter.Area = IndexFilter.Area.ALL,
Expand Down Expand Up @@ -879,6 +882,7 @@ class GuoChuang:
"""
国创
"""

def __init__(
self,
version: IndexFilter.Version = IndexFilter.Version.ALL,
Expand Down Expand Up @@ -915,6 +919,7 @@ class Variety:
"""
综艺
"""

def __init__(
self,
style: IndexFilter.Style.Variety = IndexFilter.Style.Variety.ALL,
Expand Down Expand Up @@ -1362,9 +1367,9 @@ def __init__(self, epid: int, credential: Union[Credential, None] = None):
else:
content = episode_data_cache[epid]["bangumi_meta"]
bvid = None
self.__ep_info = content["props"]["pageProps"]["dehydratedState"]["queries"][0][
"state"
]["data"]["result"]["play_view_business_info"]["episode_info"]
self.__ep_info = content["props"]["pageProps"]["dehydratedState"][
"queries"
][0]["state"]["data"]["result"]["play_view_business_info"]["episode_info"]
bvid = self.__ep_info["bvid"]
self.bangumi = episode_data_cache[epid]["bangumi_class"]
self.__ep_info: dict = episode_data_cache[epid]
Expand Down Expand Up @@ -1393,7 +1398,7 @@ async def get_cid(self) -> int:
Returns:
int: cid
"""
return (await self.get_episode_info())["epInfo"]["cid"]
return (await self.get_episode_info())["cid"]

def get_bangumi(self) -> "Bangumi":
"""
Expand Down Expand Up @@ -1480,13 +1485,7 @@ async def get_danmaku_xml(self) -> str:
"""
cid = await self.get_cid()
url = f"https://comment.bilibili.com/{cid}.xml"
sess = get_session()
config: dict[str, Any] = {"url": url}
# 代理
if settings.proxy:
config["proxies"] = {"all://", settings.proxy}
resp = await sess.get(**config)
return resp.content.decode("utf-8")
return (await Api(url=url, method="GET").request(byte=True)).decode("utf-8")

async def get_danmaku_view(self) -> dict:
"""
Expand All @@ -1498,18 +1497,25 @@ async def get_danmaku_view(self) -> dict:
return await self.video_class.get_danmaku_view(0)

async def get_danmakus(
self, date: Union[datetime.date, None] = None
self,
date: Union[datetime.date, None] = None,
from_seg: Union[int, None] = None,
to_seg: Union[int, None] = None,
) -> List["Danmaku"]:
"""
获取弹幕
Args:
date (datetime.date | None, optional): 指定某一天查询弹幕. Defaults to None. (不指定某一天)
from_seg (int, optional): 从第几段开始(0 开始编号,None 为从第一段开始,一段 6 分钟). Defaults to None.
to_seg (int, optional): 到第几段结束(0 开始编号,None 为到最后一段,包含编号的段,一段 6 分钟). Defaults to None.
Returns:
dict[Danmaku]: 弹幕列表
"""
return await self.video_class.get_danmakus(0, date)
return await self.video_class.get_danmakus(0, date, from_seg=from_seg, to_seg=to_seg)

async def get_history_danmaku_index(
self, date: Union[datetime.date, None] = None
Expand Down
119 changes: 50 additions & 69 deletions bilibili_api/cheese.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .utils.credential import Credential
from .utils.BytesReader import BytesReader
from .exceptions.ArgsException import ArgsException
from .utils.network import Api, get_session
from .utils.network import Api
from .exceptions import NetworkException, ResponseException, DanmakuClosedException

API = get_api("cheese")
Expand Down Expand Up @@ -291,24 +291,14 @@ async def get_danmaku_view(self) -> dict:
dict: 调用 API 返回的结果。
"""
cid = self.get_cid()
session = get_session()
api = API_video["danmaku"]["view"]

config = {}
config["url"] = api["url"]
config["params"] = {"type": 1, "oid": cid, "pid": self.get_aid()}
config["cookies"] = self.credential.get_cookies()
config["headers"] = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0",
}
params = {"type": 1, "oid": cid, "pid": self.get_aid()}

try:
resp = await session.get(**config)
resp_data = await Api(**api, credential=self.credential).update_params(**params).request(byte=True)
except Exception as e:
raise NetworkException(-1, str(e))

resp_data = resp.read()
json_data = {}
reader = BytesReader(resp_data)
# 解析二进制数据流
Expand Down Expand Up @@ -486,65 +476,60 @@ def read_image_danmakus(string: bytes):
continue
return json_data

async def get_danmakus(self, date: Union[datetime.date, None] = None):
async def get_danmakus(
self,
date: Union[datetime.date, None] = None,
from_seg: Union[int, None] = None,
to_seg: Union[int, None] = None,
) -> List[Danmaku]:
"""
获取弹幕。
Args:
date (datetime.Date | None, optional): 指定日期后为获取历史弹幕,精确到年月日。Defaults to None.
date (datetime.Date | None, optional): 指定日期后为获取历史弹幕,精确到年月日。Defaults to None.
from_seg (int, optional): 从第几段开始(0 开始编号,None 为从第一段开始,一段 6 分钟). Defaults to None.
to_seg (int, optional): 到第几段结束(0 开始编号,None 为到最后一段,包含编号的段,一段 6 分钟). Defaults to None.
Returns:
List[Danmaku]: Danmaku 类的列表。
注意:
- 1. 段数可以通过视频时长计算。6分钟为一段。
- 2. `from_seg` 和 `to_seg` 仅对 `date == None` 的时候有效果。
- 3. 例:取前 `12` 分钟的弹幕:`from_seg=0, to_seg=1`
"""
if date is not None:
self.credential.raise_for_no_sessdata()

# self.credential.raise_for_no_sessdata()

session = get_session()
cid = self.get_cid()
aid = self.get_aid()
params: dict[str, Any] = {"oid": self.get_cid(), "type": 1, "pid": aid}
params: dict[str, Any] = {"oid": cid, "type": 1, "pid": aid}
if date is not None:
# 获取历史弹幕
api = API_video["danmaku"]["get_history_danmaku"]
params["date"] = date.strftime("%Y-%m-%d")
params["type"] = 1
all_seg = 1
from_seg = to_seg = 0
else:
api = API_video["danmaku"]["get_danmaku"]
view = await self.get_danmaku_view()
all_seg = view["dm_seg"]["total"]
if from_seg == None:
from_seg = 0
if to_seg == None:
to_seg = self.get_meta()["duration"] // 360 + 1

danmakus = []

for seg in range(all_seg):
for seg in range(from_seg, to_seg + 1):
if date is None:
# 仅当获取当前弹幕时需要该参数
params["segment_index"] = seg + 1

config = {}
config["url"] = api["url"]
config["params"] = params
config["headers"] = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0",
}
config["cookies"] = self.credential.get_cookies()

try:
req = await session.get(**config)
data = await Api(**api, credential=self.credential).update_params(**params).request(byte=True)
except Exception as e:
raise NetworkException(-1, str(e))

if "content-type" not in req.headers.keys():
break
else:
content_type = req.headers["content-type"]
if content_type != "application/octet-stream":
raise ResponseException("返回数据类型错误:")

# 解析二进制流数据
data = req.read()
if data == b"\x10\x01":
# 视频弹幕被关闭
raise DanmakuClosedException()
Expand Down Expand Up @@ -584,7 +569,12 @@ async def get_danmakus(self, date: Union[datetime.date, None] = None):
elif data_type == 4:
dm.font_size = dm_reader.varint()
elif data_type == 5:
dm.color = hex(dm_reader.varint())[2:]
color = dm_reader.varint()
if color != 60001:
color = hex(color)[2:]
else:
color = "special"
dm.color = color
elif data_type == 6:
dm.crc32_id = dm_reader.string()
elif data_type == 7:
Expand All @@ -594,7 +584,7 @@ async def get_danmakus(self, date: Union[datetime.date, None] = None):
elif data_type == 9:
dm.weight = dm_reader.varint()
elif data_type == 10:
dm.action = str(dm_reader.varint())
dm.action = str(dm_reader.string())
elif data_type == 11:
dm.pool = dm_reader.varint()
elif data_type == 12:
Expand All @@ -609,33 +599,29 @@ async def get_danmakus(self, date: Union[datetime.date, None] = None):
dm_reader.bytes_string()
elif data_type == 21:
dm_reader.bytes_string()
elif data_type == 22:
dm_reader.bytes_string()
elif data_type == 25:
dm_reader.varint()
elif data_type == 26:
dm_reader.varint()
else:
break
danmakus.append(dm)
return danmakus

async def get_pbp(self):
async def get_pbp(self) -> dict:
"""
获取高能进度条
Returns:
调用 API 返回的结果
"""
cid = self.get_cid()

api = API_video["info"]["pbp"]

params = {"cid": cid}
return await Api(**api, credential=self.credential).update_params(**params).request(raw=True)

session = get_session()

return json.loads(
(
await session.get(
api["url"], params=params, cookies=self.credential.get_cookies()
)
).text
)

async def send_danmaku(self, danmaku: Union[Danmaku, None] = None):
"""
Expand Down Expand Up @@ -791,18 +777,13 @@ async def set_favorite(
}
return await Api(**api, credential=self.credential).update_data(**data).result

async def get_danmaku_xml(self):
async def get_danmaku_xml(self) -> str:
"""
获取弹幕(xml 源)
获取所有弹幕的 xml 源文件(非装填)
Returns:
str: xml 文件源
"""
url = f"https://comment.bilibili.com/{self.get_cid()}.xml"
sess = get_session()
config: dict[str, Any] = {"url": url}
# 代理
if settings.proxy:
config["proxies"] = {"all://", settings.proxy}
resp = await sess.get(**config)
return resp.content.decode("utf-8")
str: 文件源
"""
cid = self.get_cid()
url = f"https://comment.bilibili.com/{cid}.xml"
return (await Api(url=url, method="GET").request(byte=True)).decode("utf-8")
Loading

0 comments on commit 41f356c

Please sign in to comment.