Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update: [Server][EDCBTuner] ProactorEventLoop.create_pipe_connection() の使用をやめる #44

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions server/app/streams/LiveEncodingTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from app.models import Channel
from app.models import LiveStream
from app.utils import Logging
from app.utils.EDCB import EDCBTuner
from app.utils.EDCB import EDCBTuner, PipeStreamReader


class LiveEncodingTask:
Expand Down Expand Up @@ -529,7 +529,7 @@ async def run(self) -> None:
is_running: bool = True

# 放送波の MPEG2-TS を受信する StreamReader
stream_reader: asyncio.StreamReader | aiohttp.StreamReader
stream_reader: asyncio.StreamReader | PipeStreamReader | aiohttp.StreamReader

# EDCB のチューナーインスタンス (Mirakurun バックエンド利用時は常に None)
tuner: EDCBTuner | None = None
Expand Down Expand Up @@ -672,7 +672,7 @@ async def Reader():

# 受信した放送波が入るイテレータを作成
# R/W バッファ: 188B (TS Packet Size) * 256 = 48128B
async def GetIterator(stream_reader: asyncio.StreamReader | aiohttp.StreamReader, chunk_size: int = 48128) -> AsyncIterator[bytes]:
async def GetIterator(stream_reader: asyncio.StreamReader | PipeStreamReader | aiohttp.StreamReader, chunk_size: int = 48128) -> AsyncIterator[bytes]:
while True:
try:
yield await stream_reader.readexactly(chunk_size)
Expand Down
59 changes: 38 additions & 21 deletions server/app/utils/EDCB.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,35 @@
import sys
import time
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, cast, ClassVar

from app.constants import CONFIG


class PipeStreamReader:
"""
パイプのファイルオブジェクトを非同期で読み込むクラス
ProactorEventLoop のパイプサポートは未だ不十分で、ドキュメントされていない create_pipe_connection メソッドも
内部で Win32API の CreateFile に渡すフラグが不適切で使い物にならないためつなぎとして用意したもの
"""

def __init__(self, pipe: Any, executor: ThreadPoolExecutor, loop: Any):
self.__pipe = pipe
self.__executor = executor
self.__loop = loop
self.__buffer = bytearray()

async def readexactly(self, n: int) -> bytes:
self.__buffer.clear()
while len(self.__buffer) < n:
data = await self.__loop.run_in_executor(self.__executor, lambda: self.__pipe.read(n - len(self.__buffer)))
if len(data) == 0:
raise asyncio.IncompleteReadError(bytes(self.__buffer), None)
self.__buffer += data
return bytes(self.__buffer)


class EDCBTuner:
""" EDCB バックエンドのチューナーを制御するクラス """

Expand Down Expand Up @@ -173,34 +197,37 @@ async def open(self) -> bool:
return True


async def connect(self) -> asyncio.StreamReader | None:
async def connect(self) -> asyncio.StreamReader | PipeStreamReader | None:
"""
チューナーに接続し、放送波を受け取るための TCP ソケットまたは名前付きパイプを返す

Returns:
asyncio.StreamReader | None: TCP ソケットまたは名前付きパイプの StreamReader (取得できなかった場合は None を返す)
asyncio.StreamReader | PipeStreamReader | None: TCP ソケットまたは名前付きパイプの StreamReader (取得できなかった場合は None を返す)
"""

# プロセス ID が取得できている(チューナーが起動している)ことが前提
if self._edcb_process_id is None:
return None

stream_reader: asyncio.StreamReader | PipeStreamReader | None = None

# チューナーに接続する
if EDCBUtil.getEDCBHost() != 'edcb-namedpipe':
## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプの出力を、
## EpgTimerSrv の CtrlCmd インターフェイス (TCP API) 経由で受信するための TCP ソケット (StreamReader / StreamWriter)
result = await EDCBUtil.openViewStream(self._edcb_process_id)
stream_reader, stream_writer = (None, None) if result is None else result
else:
## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプ (StreamReader / StreamWriter)
result = await EDCBUtil.openPipeStream(self._edcb_process_id)
## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプ (PipeStreamReader)
stream_reader = await EDCBUtil.openPipeStream(self._edcb_process_id)
stream_writer = None

# チューナーへの接続に失敗した
## チューナーを閉じてからエラーを返す
if result is None:
if stream_reader is None:
await self.close() # チューナーを閉じる
return None

stream_reader, stream_writer = result
self._edcb_stream_writer = stream_writer

return stream_reader
Expand Down Expand Up @@ -402,8 +429,8 @@ async def openViewStream(process_id: int, timeout_sec: float = 10.0) -> tuple[as
return None

@staticmethod
async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> tuple[asyncio.StreamReader, asyncio.StreamWriter] | None:
""" システムに存在する SrvPipe ストリームを開き、StreamReader / StreamWriter を返す """
async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> PipeStreamReader | None:
""" システムに存在する SrvPipe ストリームを開く """
if sys.platform != 'win32':
raise NotImplementedError('Windows Only')

Expand All @@ -413,21 +440,11 @@ async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> tuple[as
while time.monotonic() < to:
# ポートは必ず 0 から 29 まで
for port in range(30):
# asyncio.ProactorEventLoop.create_pipe_connection() を使う (Windows 専用のプライベート API)
# ref: https://github.com/qwertyquerty/pypresence/blob/4.2.1/pypresence/baseclient.py#L105-L123
path = '\\\\.\\pipe\\SendTSTCP_' + str(port) + '_' + str(process_id)
reader = asyncio.StreamReader(loop=loop)
reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
try:
transport, _ = await cast(asyncio.ProactorEventLoop, loop).create_pipe_connection(lambda: reader_protocol, path)
writer = asyncio.StreamWriter(transport, reader_protocol, reader, loop)
return (reader, writer)
path = '\\\\.\\pipe\\SendTSTCP_' + str(port) + '_' + str(process_id)
pipe = open(path, mode = 'rb')
return PipeStreamReader(pipe, ThreadPoolExecutor(), loop)
except:
# TODO: エラーを解消できたら削除
import traceback
from app.utils import Logging
Logging.error('openPipeStream: failed to connect to ' + path)
Logging.error(traceback.format_exc())
pass
await asyncio.sleep(wait)
# 初期に成功しなければ見込みは薄いので問い合わせを疎にしていく
Expand Down