Skip to content

Commit

Permalink
Add: [Server][metadata] 録画ファイルからメタデータを取得するクラス群を移動/追加
Browse files Browse the repository at this point in the history
かなり量が多くなる予定なので新しくフォルダを作成してまとめた
  • Loading branch information
tsukumijima committed Jul 1, 2023
1 parent 66f7762 commit 647107d
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 49 deletions.
34 changes: 34 additions & 0 deletions server/app/metadata/CMSectionsDetector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

from pathlib import Path

from app.models import RecordedVideo


class CMSectionsDetector:
""" 録画 TS ファイルに含まれる CM 区間を検出するクラス """

def __init__(self, recorded_ts_path: Path) -> None:
"""
録画 TS ファイルに含まれる CM 区間を検出するクラスを初期化する
Args:
recorded_ts_path (Path): 録画 TS ファイルのパス
"""

self.recorded_ts_path = recorded_ts_path


def detect(self, recorded_video: RecordedVideo) -> list[tuple[float, float]]:
"""
CM 区間を検出する
Args:
recorded_video (RecordedVideo): 録画ファイル情報を表すモデル
Returns:
list[tuple[float, float]]: CM 区間 (開始時刻, 終了時刻) のリスト
"""

# TODO: CM 区間を検出する処理を実装する
cm_sections = []
return cm_sections
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,43 @@
from typing import cast

from app.constants import LIBRARY_DIR
from app.metadata import CMSectionsDetector
from app.metadata import TSInfoAnalyzer
from app.models import Channel
from app.models import RecordedProgram
from app.models import RecordedVideo
from app.utils import GetPlatformEnvironment


class TSMetadataAnalyzer:
""" 録画ファイルのメタデータを解析するクラス """
class MetadataAnalyzer:
"""
録画ファイルのメタデータを解析するクラス
app.metadata モジュール内の各クラスを統括し、録画ファイルから取り出せるだけのメタデータを取り出す
"""

def __init__(self, ts_path: Path) -> None:
def __init__(self, recorded_file_path: Path) -> None:
"""
録画ファイルのメタデータを解析するクラスを初期化する
Args:
ts_path (Path): 録画ファイルのパス
recorded_file_path (Path): 録画ファイルのパス
"""

self.ts_path = ts_path
self.recorded_file_path = recorded_file_path


def analyze(self) -> RecordedVideo | None:
def analyze(self) -> tuple[RecordedVideo, RecordedProgram, Channel | None] | None:
"""
録画ファイル内のメタデータを解析し、データベースに格納するモデルを作成する
Returns:
RecordedVideo | None: 録画ファイルを表すモデル (KonomiTV で再生可能なファイルではない場合は None)
tuple[RecordedVideo, RecordedProgram, Channel | None] | None: 録画ファイル・番組情報・チャンネルを表すモデル
(KonomiTV で再生可能なファイルではない場合は None が返される)
"""

# 録画ファイルを表すモデルを作成
recorded_video = RecordedVideo()
recorded_video.file_path = str(self.ts_path)
recorded_video.file_path = str(self.recorded_file_path)

# 録画ファイルのハッシュを計算
try:
Expand All @@ -46,7 +54,7 @@ def analyze(self) -> RecordedVideo | None:

# MediaInfo から録画ファイルのメディア情報を取得
## 取得に失敗した場合は KonomiTV で再生可能なファイルではないと判断し、None を返す
media_info = self.__getMediaInfo()
media_info = self.__analyzeMediaInfo()
if media_info is None:
return None

Expand All @@ -60,19 +68,23 @@ def analyze(self) -> RecordedVideo | None:
if track.track_type == 'General':
recorded_video.duration = float(track.duration) / 1000 # ミリ秒を秒に変換
recorded_video.container_format = track.format # 今のところ MPEG-TS 固定
# 録画開始時刻と録画終了時刻を算出
## 録画開始時刻は MediaInfo から "start_time" として取得できる (ただし小数点以下は省略されている)
## "start_time" は "UTC 2023-06-26 23:59:52" のフォーマットになっているが、実際には JST の時刻が返される
## ちゃんと JST のタイムゾーンが指定された datetime として扱うためには、datetime.fromisoformat() でパースする必要がある
## 一度 ISO8601 に変換してからパースする
start_time_iso8601 = str(track.start_time).replace('UTC ', '').replace(' ', 'T') + '+09:00'
recorded_video.recording_start_time = datetime.fromisoformat(start_time_iso8601)
## duration は小数点以下も含めた値が取得できるので、録画開始時刻を duration のうち小数点以下の部分を2で割った値だけ削る
## これでかなり正確な録画開始時刻が算出できる
duration_miliseconds = (recorded_video.duration * 1000) % 1000
recorded_video.recording_start_time = recorded_video.recording_start_time - timedelta(milliseconds=duration_miliseconds / 2)
## 録画終了時刻は MediaInfo から "end_time" として取得できるが、値が不正確なので、録画開始時刻から録画時間を足したものを使用する
recorded_video.recording_end_time = recorded_video.recording_start_time + timedelta(seconds=recorded_video.duration)
if hasattr(track, 'start_time'):
# 録画開始時刻と録画終了時刻を算出
## 録画開始時刻は MediaInfo から "start_time" として取得できる (ただし小数点以下は省略されている)
## "start_time" は "UTC 2023-06-26 23:59:52" のフォーマットになっているが、実際には JST の時刻が返される
## ちゃんと JST のタイムゾーンが指定された datetime として扱うためには、datetime.fromisoformat() でパースする必要がある
## 一度 ISO8601 に変換してからパースする
start_time_iso8601 = str(track.start_time).replace('UTC ', '').replace(' ', 'T') + '+09:00'
recorded_video.recording_start_time = datetime.fromisoformat(start_time_iso8601)
## duration は小数点以下も含めた値が取得できるので、録画開始時刻を duration のうち小数点以下の部分を2で割った値だけ削る
## これでかなり正確な録画開始時刻が算出できる
duration_miliseconds = (recorded_video.duration * 1000) % 1000
recorded_video.recording_start_time = recorded_video.recording_start_time - timedelta(milliseconds=duration_miliseconds / 2)
## 録画終了時刻は MediaInfo から "end_time" として取得できるが、値が不正確なので、録画開始時刻から録画時間を足したものを使用する
recorded_video.recording_end_time = recorded_video.recording_start_time + timedelta(seconds=recorded_video.duration)
else:
recorded_video.recording_start_time = None
recorded_video.recording_end_time = None

# 映像
elif track.track_type == 'Video' and is_video_track_read is False:
Expand All @@ -88,20 +100,20 @@ def analyze(self) -> RecordedVideo | None:

# 主音声
elif track.track_type == 'Audio' and is_primary_audio_track_read is False:
if track.format == 'AAC' and track.format_additionalfeatures == 'LC':
recorded_video.primary_audio_codec = 'AAC-LC'
elif track.format == 'AAC' and track.format_additionalfeatures == 'HE-AAC':
recorded_video.primary_audio_codec = 'HE-AAC'
elif track.format == 'MPEG Audio':
recorded_video.primary_audio_codec = 'MP2'
if int(track.channel_s) == 1:
recorded_video.primary_audio_channel = 'Monaural'
elif int(track.channel_s) == 2:
recorded_video.primary_audio_channel = 'Stereo'
elif int(track.channel_s) == 6:
recorded_video.primary_audio_channel = '5.1ch'
recorded_video.primary_audio_sampling_rate = int(track.sampling_rate)
is_primary_audio_track_read = True
if track.format == 'AAC' and track.format_additionalfeatures == 'LC':
recorded_video.primary_audio_codec = 'AAC-LC'
elif track.format == 'AAC' and track.format_additionalfeatures == 'HE-AAC':
recorded_video.primary_audio_codec = 'HE-AAC'
elif track.format == 'MPEG Audio':
recorded_video.primary_audio_codec = 'MP2'
if int(track.channel_s) == 1:
recorded_video.primary_audio_channel = 'Monaural'
elif int(track.channel_s) == 2:
recorded_video.primary_audio_channel = 'Stereo'
elif int(track.channel_s) == 6:
recorded_video.primary_audio_channel = '5.1ch'
recorded_video.primary_audio_sampling_rate = int(track.sampling_rate)
is_primary_audio_track_read = True

# 副音声(存在する場合)
elif track.track_type == 'Audio' and is_secondary_audio_track_read is False:
Expand All @@ -120,7 +132,40 @@ def analyze(self) -> RecordedVideo | None:
recorded_video.secondary_audio_sampling_rate = int(track.sampling_rate)
is_secondary_audio_track_read = True

return recorded_video
# 最低でも映像トラックと主音声トラックが含まれている必要がある
# 映像か主音声、あるいは両方のトラックが含まれていない場合は None を返す
if is_video_track_read is False or is_primary_audio_track_read is False:
return None

if recorded_video.container_format == 'MPEG-TS':
# TS ファイル内に含まれる番組情報を解析する
program_analyzer = TSInfoAnalyzer(self.recorded_file_path)
recorded_program, channel = program_analyzer.analyze(recorded_video)
else:
# それ以外のファイルでは番組情報を取得できないので、ファイル名などから最低限の情報を設定する
## チャンネル情報は取得できない
channel = None
## ファイルの作成日時を録画開始時刻として使用する
start_time = datetime.fromtimestamp(self.recorded_file_path.stat().st_ctime)
## 拡張子を除いたファイル名をタイトルとして使用する
title = self.recorded_file_path.stem
recorded_program = RecordedProgram(
title = title,
description = '番組情報を取得できませんでした。',
detail = {},
start_time = start_time,
end_time = start_time + timedelta(seconds=recorded_video.duration),
duration = recorded_video.duration,
is_free = True,
genres = [],
)

# CM 区間を検出する
## 時間がかかるので最後に実行する
cm_sections_detector = CMSectionsDetector(self.recorded_file_path)
recorded_video.cm_sections = cm_sections_detector.detect(recorded_video)

return recorded_video, recorded_program, channel


def __calculateTSFileHash(self, chunk_size: int = 1024 * 1024, num_chunks: int = 3) -> str:
Expand All @@ -140,13 +185,13 @@ def __calculateTSFileHash(self, chunk_size: int = 1024 * 1024, num_chunks: int =
"""

# ファイルのサイズを取得する
file_size = self.ts_path.stat().st_size
file_size = self.recorded_file_path.stat().st_size

# ファイルサイズが`chunk_size * num_chunks`より小さい場合は十分な数のチャンクが取得できないため例外を発生させる
if file_size < chunk_size * num_chunks:
raise ValueError(f'File size must be at least {chunk_size * num_chunks} bytes.')

with self.ts_path.open('rb') as file:
with self.recorded_file_path.open('rb') as file:
hash_obj = hashlib.sha256()

# 指定された数のチャンクを読み込み、ハッシュを計算する
Expand All @@ -164,9 +209,9 @@ def __calculateTSFileHash(self, chunk_size: int = 1024 * 1024, num_chunks: int =
return hash_obj.hexdigest()


def __getMediaInfo(self) -> MediaInfo | None:
def __analyzeMediaInfo(self) -> MediaInfo | None:
"""
MediaInfo から録画ファイルのメディア情報を取得する
録画ファイルのメディア情報を MediaInfo を使って解析する
KonomiTV で再生可能なファイルではない場合は None を返す
Returns:
Expand All @@ -181,7 +226,7 @@ def __getMediaInfo(self) -> MediaInfo | None:

# 録画ファイルのメディア情報を取得する
try:
media_info = cast(MediaInfo, MediaInfo.parse(str(self.ts_path), library_file=libmediainfo_path))
media_info = cast(MediaInfo, MediaInfo.parse(str(self.recorded_file_path), library_file=libmediainfo_path))
except Exception:
return None

Expand Down Expand Up @@ -220,10 +265,13 @@ def __getMediaInfo(self) -> MediaInfo | None:


if __name__ == '__main__':
def main(path: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True)):
analyzer = TSMetadataAnalyzer(path)
result = analyzer.analyze()
def main(recorded_file_path: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True)):
metadata_analyzer = MetadataAnalyzer(recorded_file_path)
result = metadata_analyzer.analyze()
if result:
from pprint import pprint
pprint(dict(result))
pprint(dict(result[0]))
pprint(dict(result[1]))
else:
typer.echo('Not a KonomiTV playable TS file.')
typer.run(main)
45 changes: 45 additions & 0 deletions server/app/metadata/TSInfoAnalyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@

import ariblib
from pathlib import Path

from app.models import Channel
from app.models import RecordedProgram
from app.models import RecordedVideo


class TSInfoAnalyzer:
""" 録画 TS ファイル内に含まれる番組情報を解析するクラス """

def __init__(self, recorded_ts_path: Path) -> None:
"""
録画 TS ファイル内に含まれる番組情報を解析するクラスを初期化する
Args:
recorded_ts_path (Path): 録画 TS ファイルのパス
"""

# TS ファイルを開く
# チャンクは 1000(だいたい 0.1 ~ 0.2 秒間隔)に設定
self.ts = ariblib.tsopen(recorded_ts_path, chunk=1000)


def analyze(self, recorded_video: RecordedVideo) -> tuple[RecordedProgram, Channel | None]:
"""
録画 TS ファイル内に含まれる番組情報を解析し、データベースに格納するモデルを作成する
Args:
recorded_video (RecordedVideo): 録画ファイル情報を表すモデル
Returns:
tuple[RecordedProgram, Channel | None]: 録画番組情報とチャンネル情報を表すモデルのタプル
"""

# TODO!!!!!

# 録画番組情報のモデルを作成
recorded_program = RecordedProgram()

# チャンネル情報のモデルを作成
channel = Channel()

return recorded_program, channel
5 changes: 5 additions & 0 deletions server/app/metadata/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

# モデルをモジュールとして登録
from .CMSectionsDetector import CMSectionsDetector # type: ignore
from .MetadataAnalyzer import MetadataAnalyzer # type: ignore
from .TSInfoAnalyzer import TSInfoAnalyzer # type: ignore
2 changes: 1 addition & 1 deletion server/app/migrations/models/1_20230621210000_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def upgrade(db: BaseDBAsyncClient) -> str:
"secondary_audio_codec" VARCHAR(255),
"secondary_audio_channel" VARCHAR(255),
"secondary_audio_sampling_rate" INT,
"cm_intervals" JSON NOT NULL
"cm_sections" JSON NOT NULL
);
CREATE TABLE IF NOT EXISTS "series" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion server/app/models/RecordedVideo.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ class Meta: # type: ignore
secondary_audio_codec: Literal['AAC-LC', 'HE-AAC', 'MP2'] | None = fields.CharField(255, null=True) # type: ignore
secondary_audio_channel: Literal['Monaural', 'Stereo', '5.1ch'] | None = fields.CharField(255, null=True) # type: ignore
secondary_audio_sampling_rate: int | None = fields.IntField(null=True) # type: ignore
cm_intervals: list[tuple[float, float]] = \
cm_sections: list[tuple[float, float]] = \
fields.JSONField(default=[], encoder=lambda x: json.dumps(x, ensure_ascii=False)) # type: ignore
2 changes: 1 addition & 1 deletion server/app/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class RecordedVideo(PydanticModel):
secondary_audio_codec: Literal['AAC-LC', 'HE-AAC', 'MP2'] | None
secondary_audio_channel: Literal['Monaural', 'Stereo', '5.1ch'] | None
secondary_audio_sampling_rate: int | None
cm_intervals: list[tuple[float, float]]
cm_sections: list[tuple[float, float]]

class RecordedProgram(PydanticModel):
id: int
Expand Down

0 comments on commit 647107d

Please sign in to comment.