diff --git a/scripts/channel_data.py b/scripts/channel_data.py
new file mode 100644
index 0000000..e00b965
--- /dev/null
+++ b/scripts/channel_data.py
@@ -0,0 +1,188 @@
+# pip install youtool[livechat,transcription]
+import argparse
+import csv
+import os
+import json
+import shelve
+from pathlib import Path
+
+from chat_downloader.errors import ChatDisabled, LoginRequired, NoChatReplay
+from tqdm import tqdm
+from youtool import YouTube
+
+
+class CsvLazyDictWriter: # Got and adapted from
+    """Lazy CSV dict writer, so you don't need to specify field names beforehand
+
+    This class is almost the same as `csv.DictWriter` with the following
+    differences:
+
+    - You don't need to pass `fieldnames` (it's extracted on the first
+      `.writerow` call);
+    - You can pass either a filename or a fobj (like `sys.stdout`);
+    """
+
+    def __init__(self, filename_or_fobj, encoding="utf-8", *args, **kwargs):
+        self.writer = None
+        self.filename_or_fobj = filename_or_fobj
+        self.encoding = encoding
+        self._fobj = None
+        self.writer_args = args
+        self.writer_kwargs = kwargs
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+
+    @property
+    def fobj(self):
+        if self._fobj is None:
+            if getattr(self.filename_or_fobj, "read", None) is not None:
+                self._fobj = self.filename_or_fobj
+            else:
+                self._fobj = open(
+                    self.filename_or_fobj, mode="w", encoding=self.encoding
+                )
+
+        return self._fobj
+
+    def writerow(self, row):
+        if self.writer is None:
+            self.writer = csv.DictWriter(
+                self.fobj,
+                fieldnames=list(row.keys()),
+                *self.writer_args,
+                **self.writer_kwargs
+            )
+            self.writer.writeheader()
+
+        self.writerow = self.writer.writerow
+        return self.writerow(row)
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        if self._fobj and not self._fobj.closed:
+            self._fobj.close()
+
+
+# TODO: add options to get only part of the data (not all steps)
+parser = argparse.ArgumentParser()
+parser.add_argument("--api-key",
default=os.environ.get("YOUTUBE_API_KEY"), help="Comma-separated list of YouTube API keys to use")
+parser.add_argument("username_or_channel_url", type=str)
+parser.add_argument("data_path", type=Path)
+parser.add_argument("--language-code", default="pt-orig", help="See the list by running `yt-dlp --list-subs `")
+args = parser.parse_args()
+
+if not args.api_key:
+    import sys
+
+    print("ERROR: API key must be provided either by `--api-key` or `YOUTUBE_API_KEY` environment variable", file=sys.stderr)
+    exit(1)
+api_keys = [key.strip() for key in args.api_key.split(",") if key.strip()]
+
+
+username = args.username_or_channel_url
+if username.startswith("https://"):
+    channel_url = username
+    username = [item for item in username.split("/") if item][-1]
+else:
+    channel_url = f"https://www.youtube.com/@{username}"
+data_path = args.data_path
+channel_csv_filename = data_path / f"{username}-channel.csv"
+playlist_csv_filename = data_path / f"{username}-playlist.csv"
+playlist_video_csv_filename = data_path / f"{username}-playlist-video.csv"
+video_csv_filename = data_path / f"{username}-video.csv"
+comment_csv_filename = data_path / f"{username}-comment.csv"
+livechat_csv_filename = data_path / f"{username}-livechat.csv"
+language_code = args.language_code
+video_transcription_path = data_path / Path(f"{username}-transcriptions")
+
+yt = YouTube(api_keys, disable_ipv6=True)
+video_transcription_path.mkdir(parents=True, exist_ok=True)
+channel_writer = CsvLazyDictWriter(channel_csv_filename)
+playlist_writer = CsvLazyDictWriter(playlist_csv_filename)
+video_writer = CsvLazyDictWriter(video_csv_filename)
+comment_writer = CsvLazyDictWriter(comment_csv_filename)
+livechat_writer = CsvLazyDictWriter(livechat_csv_filename)
+playlist_video_writer = CsvLazyDictWriter(playlist_video_csv_filename)
+
+print("Retrieving channel info")
+channel_id = yt.channel_id_from_url(channel_url)
+channel_info = list(yt.channels_infos([channel_id]))[0]
+channel_writer.writerow(channel_info)
+channel_writer.close() + +main_playlist = { + "id": channel_info["playlist_id"], + "title": "Uploads", + "description": channel_info["description"], + "videos": channel_info["videos"], + "channel_id": channel_id, + "channel_title": channel_info["title"], + "published_at": channel_info["published_at"], + "thumbnail_url": channel_info["thumbnail_url"], +} +playlist_writer.writerow(main_playlist) +playlist_ids = [channel_info["playlist_id"]] +for playlist in tqdm(yt.channel_playlists(channel_id), desc="Retrieving channel playlists"): + playlist_writer.writerow(playlist) + playlist_ids.append(playlist["id"]) +playlist_writer.close() + +video_ids = [] +for playlist_id in tqdm(playlist_ids, desc="Retrieving playlists' videos"): + for video in yt.playlist_videos(playlist_id): + if video["id"] not in video_ids: + video_ids.append(video["id"]) + row = { + "playlist_id": playlist_id, + "video_id": video["id"], + "video_status": video["status"], + "channel_id": video["channel_id"], + "channel_title": video["channel_title"], + "playlist_channel_id": video["playlist_channel_id"], + "playlist_channel_title": video["playlist_channel_title"], + "title": video["title"], + "description": video["description"], + "published_at": video["published_at"], + "added_to_playlist_at": video["added_to_playlist_at"], + "tags": video["tags"], + } + playlist_video_writer.writerow(row) +playlist_video_writer.close() + +videos = [] +for video in tqdm(yt.videos_infos(video_ids), desc="Retrieving detailed video information"): + videos.append(video) + video_writer.writerow(video) +video_writer.close() + +for video_id in tqdm(video_ids, desc="Retrieving video comments"): + try: + for comment in yt.video_comments(video_id): + comment_writer.writerow(comment) + except RuntimeError: # Comments disabled + continue +comment_writer.close() + +print("Retrieving transcriptions") +yt.videos_transcriptions( + video_ids, + language_code=language_code, + path=video_transcription_path, + skip_downloaded=True, + 
batch_size=10, +) + +# TODO: live chat code will freeze if it's not available +for video_id in tqdm(video_ids, desc="Retrieving live chat"): + try: + for comment in yt.video_livechat(video_id): + livechat_writer.writerow(comment) + except (LoginRequired, NoChatReplay, ChatDisabled): + continue +livechat_writer.close() diff --git a/scripts/clean_vtt.py b/scripts/clean_vtt.py new file mode 100644 index 0000000..3412b59 --- /dev/null +++ b/scripts/clean_vtt.py @@ -0,0 +1,43 @@ +# pip install webvtt-py +import argparse +import io +import json +import os +import shelve +import time +from pathlib import Path + +import tiktoken +import webvtt +from openai import APITimeoutError, OpenAI +from rows.utils import CsvLazyDictWriter +from tqdm import tqdm + + +def vtt_clean(vtt_content, same_line=False): + result_lines, last_line = [], None + for caption in webvtt.read_buffer(io.StringIO(vtt_content)): + new_lines = caption.text.strip().splitlines() + for line in new_lines: + line = line.strip() + if not line or line == last_line: + continue + result_lines.append(f"{str(caption.start).split('.')[0]} {line}\n" if not same_line else f"{line} ") + last_line = line + return "".join(result_lines) + + +parser = argparse.ArgumentParser() +parser.add_argument("input_path", type=Path) +parser.add_argument("output_path", type=Path) +args = parser.parse_args() + +for filename in tqdm(args.input_path.glob("*.vtt")): + new_filename = args.output_path / filename.name + if new_filename.exists(): + continue + with filename.open() as fobj: + data = fobj.read() + result = vtt_clean(data) + with new_filename.open(mode="w") as fobj: + fobj.write(result) diff --git a/setup.cfg b/setup.cfg index 77478cb..2cffba5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,10 @@ packages = find: python_requires = >=3.7 install_requires = file: requirements/base.txt +[options.entry_points] +console_scripts = + youtool = youtool:cli + [options.extras_require] cli = file: requirements/cli.txt dev = file: 
requirements/dev.txt diff --git a/tests/commands/__init__.py b/tests/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/commands/conftest.py b/tests/commands/conftest.py new file mode 100644 index 0000000..9970eab --- /dev/null +++ b/tests/commands/conftest.py @@ -0,0 +1,29 @@ +import pytest + + +@pytest.fixture +def channels_urls(): + return [ + "https://www.youtube.com/@Turicas/featured", + "https://www.youtube.com/c/PythonicCaf%C3%A9" + ] + + +@pytest.fixture +def videos_ids(): + return [ + "video_id_1", + "video_id_2" + ] + + +@pytest.fixture +def videos_urls(videos_ids): + return [ + f"https://www.youtube.com/?v={video_id}" for video_id in videos_ids + ] + + +@pytest.fixture +def usernames(): + return ["Turicas", "PythonicCafe"] diff --git a/tests/commands/test_base.py b/tests/commands/test_base.py new file mode 100644 index 0000000..afbcf06 --- /dev/null +++ b/tests/commands/test_base.py @@ -0,0 +1,193 @@ +import csv +import argparse +import pytest + +from pathlib import Path +from unittest.mock import MagicMock, patch, mock_open +from youtool.commands import Command + + +class TestCommand(Command): + name = "command_name" + arguments = [ + {"name": "--test-arg", "help": "Test argument", "default": "default_value", "type": str} + ] + + @classmethod + def execute(cls, **kwargs): + return "executed" + +@pytest.fixture +def subparsers(): + """Fixture to create subparsers for argument parsing.""" + parser = argparse.ArgumentParser() + return parser.add_subparsers() + + +def test_generate_parser(subparsers): + """Test to verify the parser generation. 
+ + This test checks if the `generate_parser` method correctly generates a parser + for the command and sets the appropriate properties + """ + parser = TestCommand.generate_parser(subparsers) + + assert parser is not None, "Parser should not be None" + assert isinstance(parser, argparse.ArgumentParser), "Parser should be an instance of argparse.ArgumentParser" + assert parser.prog.endswith(TestCommand.name), f"Parser prog should end with '{TestCommand.name}'" + + +def test_parse_arguments(subparsers): + """Test to verify argument parsing. + + This test checks if the `parse_arguments` method correctly adds the command's + arguments to the parser and sets the default function to the command's execute method. + """ + subparsers_mock = MagicMock(spec=subparsers) + + TestCommand.parse_arguments(subparsers_mock) + + subparsers_mock.add_parser.assert_called_once_with(TestCommand.name, help=TestCommand.__doc__) + parser_mock = subparsers_mock.add_parser.return_value + parser_mock.add_argument.assert_called_once_with("--test-arg", help="Test argument", default="default_value", type=str) + parser_mock.set_defaults.assert_called_once_with(func=TestCommand.execute) + + +def test_command(): + """Test to verify that the `execute` method is implemented. + + This test ensures that if a command does not implement the `execute` method, + a `NotImplementedError` is raised. + """ + class MyCommand(Command): + pass + + with pytest.raises(NotImplementedError): + MyCommand.execute() + + +@pytest.fixture +def mock_csv_file(): + """Fixture to provide mock CSV content for tests.""" + + csv_content = """URL + http://example.com + http://example2.com + """ + return csv_content + +def test_data_from_csv_valid(mock_csv_file): + """Test to verify reading data from a valid CSV file. + + This test checks if the `data_from_csv` method correctly reads data from a valid CSV file + and returns the expected list of URLs. + + Args: + mock_csv_file (str): The mock CSV file content. 
+ """ + with patch('pathlib.Path.is_file', return_value=True): + with patch('builtins.open', mock_open(read_data=mock_csv_file)): + data_column_name = "URL" + file_path = Path("tests/resources/csv_valid.csv") + result = Command.data_from_csv(file_path, data_column_name) + assert len(result) == 2 + assert result[0] == "http://example.com" + assert result[1] == "http://example2.com" + +def test_data_from_csv_file_not_found(): + """Test to verify behavior when the specified column is not found in the CSV file. + + This test checks if the `data_from_csv` method raises an exception when the specified + column does not exist in the CSV file. + """ + with patch('pathlib.Path.is_file', return_value=False): + file_path = Path("/fake/path/not_found.csv") + with pytest.raises(FileNotFoundError): + Command.data_from_csv(file_path, "URL") + +def test_data_from_csv_column_not_found(mock_csv_file): + with patch('pathlib.Path.is_file', return_value=True): + with patch('builtins.open', mock_open(read_data=mock_csv_file)): + file_path = Path("tests/resources/csv_column_not_found.csv") + with pytest.raises(Exception) as exc_info: + Command.data_from_csv(file_path, "NonExistentColumn") + assert f"Column NonExistentColumn not found on {file_path}" in str(exc_info.value) + + +@pytest.fixture +def sample_data(): + """Fixture to provide sample data for tests.""" + return [ + {"id": "123", "name": "Channel One"}, + {"id": "456", "name": "Channel Two"} + ] + +def test_data_to_csv_with_output_file_path(tmp_path, sample_data): + """Test to verify writing data to a CSV file with an output file path specified. + + This test checks if the `data_to_csv` method correctly writes the sample data to + a CSV file when an output file path is provided. 
+ """ + output_file_path = tmp_path / "output.csv" + + result_path = Command.data_to_csv(sample_data, str(output_file_path)) + + assert result_path == str(output_file_path) + assert output_file_path.exists() + with output_file_path.open('r') as f: + reader = csv.DictReader(f) + rows = list(reader) + assert len(rows) == 2 + assert rows[0]["id"] == "123" and rows[1]["id"] == "456" + +def test_data_to_csv_without_output_file_path(sample_data): + """Test to verify writing data to a CSV format without an output file path specified. + + This test checks if the `data_to_csv` method correctly returns the CSV content + as a string when no output file path is provided. + """ + csv_content = Command.data_to_csv(sample_data) + + assert "id,name" in csv_content + assert "123,Channel One" in csv_content + assert "456,Channel Two" in csv_content + +def test_data_to_csv_output(tmp_path): + """ + Test to verify the content of the output CSV file. + + This test checks if the `data_to_csv` method writes the expected content + to the output CSV file. 
+ """ + output_file_path = tmp_path / "output.csv" + + data = [ + {"id": 1, "name": "Test1"}, + {"id": 2, "name": "Test2"} + ] + + expected_output = "id,name\n1,Test1\n2,Test2\n" + result = Command.data_to_csv(data, str(output_file_path)) + assert Path(output_file_path).is_file() + assert expected_output == Path(output_file_path).read_text() + assert str(output_file_path) == result + +def test_filter_fields(): + channel_info = { + 'channel_id': '123456', + 'channel_name': 'Test Channel', + 'subscribers': 1000, + 'videos': 50, + 'category': 'Tech' + } + + info_columns = ['channel_id', 'channel_name', 'subscribers'] + filtered_info = Command.filter_fields(channel_info, info_columns) + + expected_result = { + 'channel_id': '123456', + 'channel_name': 'Test Channel', + 'subscribers': 1000 + } + + assert filtered_info == expected_result, f"Expected {expected_result}, but got {filtered_info}" diff --git a/tests/commands/test_channel_id.py b/tests/commands/test_channel_id.py new file mode 100644 index 0000000..04400ef --- /dev/null +++ b/tests/commands/test_channel_id.py @@ -0,0 +1,80 @@ +import csv +import pytest + +from io import StringIO + +from unittest.mock import patch, call +from youtool.commands.channel_id import ChannelId + +@pytest.fixture +def csv_file(tmp_path): + """Fixture to create a temporary CSV file with a single YouTube channel URL.""" + + csv_content = "channel_url\nhttps://www.youtube.com/@Turicas/featured\n" + csv_file = tmp_path / "urls.csv" + csv_file.write_text(csv_content) + return csv_file + +@pytest.fixture +def youtube_api_mock(): + """Fixture to mock the YouTube API. + + This fixture mocks the `YouTube` class and its `channel_id_from_url` method + to return a channel ID based on the URL. 
+ """ + with patch("youtool.commands.channel_id.YouTube") as mock: + mock.return_value.channel_id_from_url.side_effect = lambda url: f"channel-{url}" + yield mock + +def test_channels_ids_csv_preparation(youtube_api_mock): + """Fixture to mock the YouTube API. + + This fixture mocks the `YouTube` class and its `channel_id_from_url` method + to return a channel ID based on the URL. + """ + urls = ["https://www.youtube.com/@Turicas/featured", "https://www.youtube.com/c/PythonicCaf%C3%A9"] + api_key = "test_api_key" + id_column_name = "custom_id_column" + expected_result_data = [ + {id_column_name: "channel-https://www.youtube.com/@Turicas/featured"}, + {id_column_name: "channel-https://www.youtube.com/c/PythonicCaf%C3%A9"} + ] + with StringIO() as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=[id_column_name]) + writer.writeheader() + writer.writerows(expected_result_data) + expected_result_csv = csv_file.getvalue() + + result = ChannelId.execute(urls=urls, api_key=api_key, id_column_name=id_column_name) + + youtube_api_mock.return_value.channel_id_from_url.assert_has_calls([call(url) for url in urls], any_order=True) + assert result == expected_result_csv + + +def test_resolve_urls_with_direct_urls(): + """Test to verify resolving URLs when provided directly. + + This test checks if the `resolve_urls` method of the `ChannelId` class correctly + returns the given list of URLs when provided directly. + """ + urls = ["https://www.youtube.com/@Turicas/featured"] + result = ChannelId.resolve_urls(urls, None, None) + assert result == urls + +def test_resolve_urls_with_file_path(csv_file): + """Test to verify resolving URLs from a CSV file. + + This test checks if the `resolve_urls` method of the `ChannelId` class correctly + reads URLs from a given CSV file. 
+ """ + result = ChannelId.resolve_urls(None, csv_file, "channel_url") + assert result == ["https://www.youtube.com/@Turicas/featured"] + +def test_resolve_urls_raises_exception(): + """Test to verify exception raising when no URLs are provided. + + This test checks if the `resolve_urls` method of the `ChannelId` class raises an exception + when neither direct URLs nor a file path are provided. + """ + with pytest.raises(Exception, match="Either 'username' or 'url' must be provided for the channel-id command"): + ChannelId.resolve_urls(None, None, None) diff --git a/tests/commands/test_channel_info.py b/tests/commands/test_channel_info.py new file mode 100644 index 0000000..62623e8 --- /dev/null +++ b/tests/commands/test_channel_info.py @@ -0,0 +1,63 @@ +import pytest + +from unittest.mock import Mock, call + +from youtool.commands.channel_info import ChannelInfo + + +def test_filter_fields(): + """Test to verify the filtering of channel information fields. + + This test checks if the `filter_fields` method of the `ChannelInfo` class correctly + filters out unwanted fields from the channel information dictionary based on the provided columns. + """ + channel_info = { + 'channel_id': '123456', + 'channel_name': 'Test Channel', + 'subscribers': 1000, + 'videos': 50, + 'category': 'Tech' + } + + info_columns = ['channel_id', 'channel_name', 'subscribers'] + filtered_info = ChannelInfo.filter_fields(channel_info, info_columns) + + expected_result = { + 'channel_id': '123456', + 'channel_name': 'Test Channel', + 'subscribers': 1000 + } + + assert filtered_info == expected_result, f"Expected {expected_result}, but got {filtered_info}" + + +def test_channel_ids_from_urls_and_usernames(mocker, channels_urls): + """Test to verify fetching channel IDs from both URLs and usernames. + + This test checks if the `execute` method of the `ChannelInfo` class correctly fetches channel IDs + from a list of URLs and usernames, and then calls the `channels_infos` method with these IDs. 
+ """ + urls = ["https://www.youtube.com/@Turicas/featured", "https://www.youtube.com/c/PythonicCaf%C3%A9"] + usernames = ["Turicas", "PythonicCafe"] + + ids_from_urls_mock = "id_from_url" + ids_from_usernames_mock = "id_from_username" + youtube_mock = mocker.patch("youtool.commands.channel_info.YouTube") + + channel_id_from_url_mock = Mock(return_value=ids_from_urls_mock) + channel_id_from_username_mock = Mock(return_value=ids_from_usernames_mock) + channels_infos_mock = Mock(return_value=[]) + + youtube_mock.return_value.channel_id_from_url = channel_id_from_url_mock + youtube_mock.return_value.channel_id_from_username = channel_id_from_username_mock + youtube_mock.return_value.channels_infos = channels_infos_mock + + ChannelInfo.execute(urls=channels_urls, usernames=usernames) + + channel_id_from_url_mock.assert_has_calls( + [call(url) for url in channels_urls] + ) + channel_id_from_username_mock.assert_has_calls( + [call(username) for username in usernames] + ) + channels_infos_mock.assert_called_once_with([ids_from_urls_mock, ids_from_usernames_mock]) diff --git a/tests/commands/test_video_comments.py b/tests/commands/test_video_comments.py new file mode 100644 index 0000000..386c5de --- /dev/null +++ b/tests/commands/test_video_comments.py @@ -0,0 +1,69 @@ +import csv +import pytest + +from io import StringIO +from datetime import datetime +from unittest.mock import Mock +from youtool.commands import VideoComments + + +def test_video_comments(mocker): + """Test case for fetching video comments and verifying the output. + + This test mocks the YouTube API to simulate fetching comments for a video, + then compares the generated CSV output with expected comments. 
+ """ + youtube_mock = mocker.patch("youtool.commands.video_comments.YouTube") + video_id = "video_id_mock" + + expected_result = [ + {"text": "my_comment", "author": "my_name"} + ] + + csv_file = StringIO() + csv_writer = csv.DictWriter(csv_file, fieldnames=expected_result[0].keys()) + csv_writer.writeheader() + csv_writer.writerows(expected_result) + + videos_comments_mock = Mock(return_value=expected_result) + youtube_mock.return_value.video_comments = videos_comments_mock + result = VideoComments.execute(id=video_id) + + videos_comments_mock.assert_called_once_with(video_id) + + assert result == csv_file.getvalue() + + +def test_video_comments_with_file_output(mocker, tmp_path): + """Test case for fetching video comments and saving them to a CSV file. + + This test mocks the YouTube API to simulate fetching comments for a video, + then saves the comments to a temporary CSV file. + """ + youtube_mock = mocker.patch("youtool.commands.video_comments.YouTube") + video_id = "video_id_mock" + + expected_result = [ + {"text": "my_comment", "author": "my_name"} + ] + + csv_file = StringIO() + csv_writer = csv.DictWriter(csv_file, fieldnames=expected_result[0].keys()) + csv_writer.writeheader() + csv_writer.writerows(expected_result) + + timestamp = datetime.now().strftime("%f") + output_file_name = f"output_{timestamp}.csv" + output_file_path = tmp_path / output_file_name + + videos_comments_mock = Mock(return_value=expected_result) + youtube_mock.return_value.video_comments = videos_comments_mock + + result_file_path = VideoComments.execute(id=video_id, output_file_path=output_file_path) + + with open(result_file_path, "r") as result_csv_file: + result_csv = result_csv_file.read() + + videos_comments_mock.assert_called_once_with(video_id) + + assert result_csv.replace("\r", "") == csv_file.getvalue().replace("\r", "") diff --git a/tests/commands/test_video_info.py b/tests/commands/test_video_info.py new file mode 100644 index 0000000..f4da48f --- /dev/null +++ 
b/tests/commands/test_video_info.py @@ -0,0 +1,106 @@ +import csv +import pytest + +from unittest.mock import Mock +from pathlib import Path +from youtool.commands import VideoInfo + + +@pytest.fixture +def youtube_mock(mocker, mock_video_info): + """Fixture to mock the YouTube instance and its videos_infos method.""" + mock = mocker.patch("youtool.commands.video_info.YouTube") + mock_instance = mock.return_value + mock_instance.videos_infos = Mock(return_value=mock_video_info) + return mock_instance + +@pytest.fixture +def mock_video_info(): + """Fixture to return mock video information.""" + return [ + {"id": "tmrhPou85HQ", "title": "Title 1", "description": "Description 1", "published_at": "2021-01-01", "view_count": 100, "like_count": 10, "comment_count": 5}, + {"id": "qoI_x9fylaw", "title": "Title 2", "description": "Description 2", "published_at": "2021-02-01", "view_count": 200, "like_count": 20, "comment_count": 10} + ] + +def test_execute_with_ids_and_urls(youtube_mock, mocker, tmp_path, mock_video_info): + """Test the execute method with provided video IDs and URLs. + + This test verifies that the execute method can handle both video IDs and URLs, + and correctly writes the video information to the output CSV file. + """ + ids = ["tmrhPou85HQ", "qoI_x9fylaw"] + urls = ["https://www.youtube.com/watch?v=tmrhPou85HQ&ab_channel=Turicas", "https://www.youtube.com/watch?v=qoI_x9fylaw&ab_channel=PythonicCaf%C3%A9"] + output_file_path = tmp_path / "output.csv" + + VideoInfo.execute(ids=ids, urls=urls, output_file_path=str(output_file_path), api_key="test_api_key") + + assert Path(output_file_path).is_file() + with open(output_file_path, 'r') as f: + reader = csv.DictReader(f) + csv_data = list(reader) + + assert csv_data[0]["id"] == "tmrhPou85HQ" + assert csv_data[1]["id"] == "qoI_x9fylaw" + +def test_execute_missing_arguments(): + """Test the execute method raises an exception when missing required arguments. 
+ + This test verifies that the execute method raises an exception if neither + video IDs nor URLs are provided. + + Raises: + Exception: If neither 'ids' nor 'urls' is provided. + """ + with pytest.raises(Exception) as exc_info: + VideoInfo.execute(api_key="test_api_key") + + assert str(exc_info.value) == "Either 'ids' or 'urls' must be provided for the video-info command" + +def test_execute_with_input_file_path(youtube_mock, mocker, tmp_path, mock_video_info): + """Test the execute method with an input CSV file containing video URLs and IDs. + + This test verifies that the execute method can read video URLs and IDs from + an input CSV file and correctly writes the video information to the output CSV file. + """ + input_csv_content = """video_id,video_url + tmrhPou85HQ,https://www.youtube.com/watch?v=tmrhPou85HQ&ab_channel=Turicas + qoI_x9fylaw,https://www.youtube.com/watch?v=qoI_x9fylaw&ab_channel=PythonicCaf%C3%A9 + """ + input_file_path = tmp_path / "input.csv" + output_file_path = tmp_path / "output.csv" + + with open(input_file_path, 'w') as f: + f.write(input_csv_content) + + VideoInfo.execute(input_file_path=str(input_file_path), output_file_path=str(output_file_path), api_key="test_api_key") + + assert Path(output_file_path).is_file() + with open(output_file_path, 'r') as f: + reader = csv.DictReader(f) + csv_data = list(reader) + + assert csv_data[0]["id"] == "tmrhPou85HQ" + assert csv_data[1]["id"] == "qoI_x9fylaw" + + +def test_execute_with_info_columns(youtube_mock, mocker, tmp_path, mock_video_info): + """Test the execute method with specified info columns. + + This test verifies that the execute method can filter the video information + based on specified columns and correctly writes the filtered information + to the output CSV file. 
+ """ + ids = ["tmrhPou85HQ", "qoI_x9fylaw"] + output_file_path = tmp_path / "output.csv" + + VideoInfo.execute(ids=ids, output_file_path=str(output_file_path), api_key="test_api_key", info_columns="id,title") + + assert Path(output_file_path).is_file() + with open(output_file_path, 'r') as f: + reader = csv.DictReader(f) + csv_data = list(reader) + + assert csv_data[0]["id"] == "tmrhPou85HQ" + assert csv_data[0]["title"] == "Title 1" + assert csv_data[1]["id"] == "qoI_x9fylaw" + assert csv_data[1]["title"] == "Title 2" diff --git a/tests/commands/test_video_livechat.py b/tests/commands/test_video_livechat.py new file mode 100644 index 0000000..c91db87 --- /dev/null +++ b/tests/commands/test_video_livechat.py @@ -0,0 +1,67 @@ +import csv +import pytest + +from io import StringIO +from datetime import datetime +from unittest.mock import Mock +from youtool.commands import VideoLiveChat + + +def test_video_livechat(mocker): + """Test case for fetching live chat messages from a YouTube video. + + Mocks the YouTube API to return expected live chat messages and verifies if the execute method correctly formats and returns the data. + """ + youtube_mock = mocker.patch("youtool.commands.video_livechat.YouTube") + video_id = "video_id_mock" + + expected_result = [ + {column: "data" for column in VideoLiveChat.CHAT_MESSAGE_COLUMNS} + ] + + csv_file = StringIO() + csv_writer = csv.DictWriter(csv_file, fieldnames=expected_result[0].keys()) + csv_writer.writeheader() + csv_writer.writerows(expected_result) + + videos_livechat_mock = Mock(return_value=expected_result) + youtube_mock.return_value.video_livechat = videos_livechat_mock + result = VideoLiveChat.execute(id=video_id) + + videos_livechat_mock.assert_called_once_with(video_id) + + assert result == csv_file.getvalue() + + +def test_video_livechat_with_file_output(mocker, tmp_path): + """Test case for fetching live chat messages from a YouTube video and saving them to a CSV file. 
+ + Mocks the YouTube API to return expected live chat messages and verifies if the execute method correctly saves the data to a CSV file. + """ + youtube_mock = mocker.patch("youtool.commands.video_livechat.YouTube") + video_id = "video_id_mock" + + expected_result = [ + {column: "data" for column in VideoLiveChat.CHAT_MESSAGE_COLUMNS} + ] + + csv_file = StringIO() + csv_writer = csv.DictWriter(csv_file, fieldnames=expected_result[0].keys()) + csv_writer.writeheader() + csv_writer.writerows(expected_result) + + timestamp = datetime.now().strftime("%f") + output_file_name = f"output_{timestamp}.csv" + output_file_path = tmp_path / output_file_name + + videos_livechat_mock = Mock(return_value=expected_result) + youtube_mock.return_value.video_livechat = videos_livechat_mock + + result_file_path = VideoLiveChat.execute(id=video_id, output_file_path=output_file_path) + + with open(result_file_path, "r") as result_csv_file: + result_csv = result_csv_file.read() + + videos_livechat_mock.assert_called_once_with(video_id) + + assert result_csv.replace("\r", "") == csv_file.getvalue().replace("\r", "") diff --git a/tests/commands/test_video_search.py b/tests/commands/test_video_search.py new file mode 100644 index 0000000..a30a879 --- /dev/null +++ b/tests/commands/test_video_search.py @@ -0,0 +1,86 @@ +import csv +import pytest + +from io import StringIO +from unittest.mock import Mock + +from datetime import datetime + +from youtool.commands.video_search import VideoSearch + + +def test_video_search_string_output(mocker, videos_ids, videos_urls): + """Test the execution of the video-search command and verify the output as string. + + This test simulates the execution of the `VideoSearch.execute` command with a list of video IDs and URLs, + and checks if the output is correctly formatted as a CSV string. 
+ """ + youtube_mock = mocker.patch("youtool.commands.video_search.YouTube") + expected_videos_infos = [ + { + column: f"v_{index}" for column in VideoSearch.INFO_COLUMNS + } for index, _ in enumerate(videos_ids) + ] + + csv_file = StringIO() + csv_writer = csv.DictWriter(csv_file, fieldnames=VideoSearch.INFO_COLUMNS) + csv_writer.writeheader() + csv_writer.writerows(expected_videos_infos) + + videos_infos_mock = Mock(return_value=expected_videos_infos) + youtube_mock.return_value.videos_infos = videos_infos_mock + + result = VideoSearch.execute(ids=videos_ids, urls=videos_urls) + + videos_infos_mock.assert_called_once_with(list(set(videos_ids))) + assert result == csv_file.getvalue() + + +def test_video_search_file_output(mocker, videos_ids, videos_urls, tmp_path): + """Test the execution of the video-search command and verify the output to a file. + + This test simulates the execution of the `VideoSearch.execute` command with a list of video IDs and URLs, + and checks if the output is correctly written to a CSV file. 
+ """ + youtube_mock = mocker.patch("youtool.commands.video_search.YouTube") + expected_videos_infos = [ + { + column: f"v_{index}" for column in VideoSearch.INFO_COLUMNS + } for index, _ in enumerate(videos_ids) + ] + + expected_csv_file = StringIO() + csv_writer = csv.DictWriter(expected_csv_file, fieldnames=VideoSearch.INFO_COLUMNS) + csv_writer.writeheader() + csv_writer.writerows(expected_videos_infos) + + timestamp = datetime.now().strftime("%f") + output_file_name = f"output_{timestamp}.csv" + output_file_path = tmp_path / output_file_name + + videos_infos_mock = Mock(return_value=expected_videos_infos) + youtube_mock.return_value.videos_infos = videos_infos_mock + + result_file_path = VideoSearch.execute( + ids=videos_ids, urls=videos_urls, output_file_path=output_file_path + ) + + with open(result_file_path, "r") as result_csv_file: + result_csv = result_csv_file.read() + + videos_infos_mock.assert_called_once_with(list(set(videos_ids))) + assert result_csv.replace("\r", "") == expected_csv_file.getvalue().replace("\r", "") + + +def test_video_search_no_id_and_url_error(): + """Test if the video-search command raises an exception when neither IDs nor URLs are provided. + + This test checks if executing the `VideoSearch.execute` command without providing IDs or URLs + raises the expected exception. + + Assertions: + - Assert that the raised exception matches the expected error message. 
+ """ + + with pytest.raises(Exception, match="Either 'ids' or 'urls' must be provided"): + VideoSearch.execute(ids=None, urls=None) diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..9165041 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,25 @@ +import pytest + +from subprocess import run + +from youtool.commands import COMMANDS + +from youtool.commands.base import Command + + +@pytest.mark.parametrize( + "command", COMMANDS +) +def test_missing_api_key(monkeypatch: pytest.MonkeyPatch, command: Command): + """Test to verify behavior when the YouTube API key is missing. + + This test ensures that when the YouTube API key is not set, running any command + from the youtool CLI results in an appropriate error message and exit code. + """ + monkeypatch.delenv('YOUTUBE_API_KEY', raising=False) + cli_path = "youtool/cli.py" + command = ["python", cli_path, command.name] + result = run(command, capture_output=True, text=True, check=False) + + assert result.returncode == 2 + assert "YouTube API Key is required" in result.stderr \ No newline at end of file diff --git a/youtool/cli.py b/youtool/cli.py new file mode 100644 index 0000000..28b055c --- /dev/null +++ b/youtool/cli.py @@ -0,0 +1,49 @@ +import argparse +import os + +from youtool.commands import COMMANDS + + +def main(): + """Main function for the YouTube CLI Tool. + + This function sets up the argument parser for the CLI tool, including options for the YouTube API key and + command-specific subparsers. It then parses the command-line arguments, retrieving the YouTube API key + from either the command-line argument '--api-key' or the environment variable 'YOUTUBE_API_KEY'. If the API + key is not provided through any means, it raises an argparse.ArgumentError. + + Finally, the function executes the appropriate command based on the parsed arguments. If an exception occurs + during the execution of the command, it is caught and raised as an argparse error for proper handling. 
+ + Raises: + argparse.ArgumentError: If the YouTube API key is not provided. + argparse.ArgumentError: If there is an error during the execution of the command. + """ + parser = argparse.ArgumentParser(description="CLI Tool for managing YouTube videos add playlists") + parser.add_argument("--api-key", type=str, help="YouTube API Key", dest="api_key") + parser.add_argument("--debug", default=False, action="store_true", help="Debug mode", dest="debug") + + subparsers = parser.add_subparsers(required=True, dest="command", title="Command", help="Command to be executed") + + # cmd_video_livechat = subparsers.add_parser("video-livechat", help="Get comments from a video ID, generate CSV output (same schema for `chat_message` dicts)") + # cmd_video_transcriptions = subparsers.add_parser("video-transcription", help="Download video transcriptions based on language code, path and list of video IDs or URLs (or CSV filename with URLs/IDs inside), download files to destination and report results") + + for command in COMMANDS: + command.parse_arguments(subparsers) + + args = parser.parse_args() + args.api_key = args.api_key or os.environ.get("YOUTUBE_API_KEY") + + if not args.api_key: + parser.error("YouTube API Key is required") + + try: + print(args.func(**args.__dict__)) + except Exception as error: + if args.debug: + raise error + parser.error(error) + + +if __name__ == "__main__": + main() diff --git a/youtool/commands/__init__.py b/youtool/commands/__init__.py new file mode 100644 index 0000000..72913ce --- /dev/null +++ b/youtool/commands/__init__.py @@ -0,0 +1,18 @@ +from .base import Command +from .channel_id import ChannelId +from .channel_info import ChannelInfo +from .video_info import VideoInfo +from .video_search import VideoSearch +from .video_comments import VideoComments + +COMMANDS = [ + ChannelId, + ChannelInfo, + VideoInfo, + VideoSearch, + VideoComments +] + +__all__ = [ + "Command", "COMMANDS", "ChannelId", "ChannelInfo", "VideoInfo", "VideoSearch", 
"VideoComments" +] diff --git a/youtool/commands/base.py b/youtool/commands/base.py new file mode 100644 index 0000000..20e1708 --- /dev/null +++ b/youtool/commands/base.py @@ -0,0 +1,141 @@ +import csv +import argparse + +from typing import List, Dict, Any, Optional +from io import StringIO +from pathlib import Path +from datetime import datetime +from urllib.parse import urlparse, parse_qsl + + +class Command: + """A base class for commands to inherit from, following a specific structure. + + Attributes: + name (str): The name of the command. + arguments (List[Dict[str, Any]]): A list of dictionaries, each representing an argument for the command. + """ + name: str + arguments: List[Dict[str, Any]] + + @staticmethod + def video_id_from_url(video_url: str) -> Optional[str]: + parsed_url = urlparse(video_url) + parsed_url_query = dict(parse_qsl(parsed_url.query)) + return parsed_url_query.get("v") + + @classmethod + def generate_parser(cls, subparsers: argparse._SubParsersAction): + """Creates a parser for the command and adds it to the subparsers. + + Args: + subparsers (argparse._SubParsersAction): The subparsers action to add the parser to. + + Returns: + argparse.ArgumentParser: The parser for the command. + """ + return subparsers.add_parser(cls.name, help=cls.__doc__) + + @classmethod + def parse_arguments(cls, subparsers: argparse._SubParsersAction) -> None: + """Parses the arguments for the command and sets the command's execute method as the default function to call. + + Args: + subparsers (argparse._SubParsersAction): The subparsers action to add the parser to. 
+ """ + parser = cls.generate_parser(subparsers) + for argument in cls.arguments: + argument_copy = {**argument} + argument_name = argument_copy.pop("name") + parser.add_argument(argument_name, **argument_copy) + parser.set_defaults(func=cls.execute) + + @staticmethod + def filter_fields(video_info: Dict, info_columns: Optional[List] = None) -> Dict: + """Filters the fields of a dictionary containing video information based on specified columns. + + Args: + video_info (Dict): A dictionary containing video information. + info_columns (Optional[List], optional): A list specifying which fields to include in the filtered output. + If None, returns the entire video_info dictionary. Defaults to None. + + Returns: + A dictionary containing only the fields specified in info_columns (if provided) + or the entire video_info dictionary if info_columns is None. + """ + return { + field: value for field, value in video_info.items() if field in info_columns + } if info_columns else video_info + + + @classmethod + def execute(cls, **kwargs) -> str: # noqa: D417 + """Executes the command. + + This method should be overridden by subclasses to define the command's behavior. + + Args: + arguments (argparse.Namespace): The parsed arguments for the command. + """ + raise NotImplementedError() + + @staticmethod + def data_from_csv(file_path: Path, data_column_name: Optional[str] = None) -> List[str]: + """Extracts a list of URLs from a specified CSV file. + + Args: + file_path: The path to the CSV file containing the URLs. + data_column_name: The name of the column in the CSV file that contains the URLs. + If not provided, it defaults to `ChannelId.URL_COLUMN_NAME`. + + Returns: + A list of URLs extracted from the specified CSV file. + + Raises: + Exception: If the file path is invalid or the file cannot be found. 
+ """ + data = [] + + if not file_path.is_file(): + raise FileNotFoundError(f"Invalid file path: {file_path}") + + with file_path.open('r', newline='') as csv_file: + reader = csv.DictReader(csv_file) + fieldnames = reader.fieldnames + + if fieldnames is None: + raise ValueError("Fieldnames is None") + + if data_column_name not in fieldnames: + raise Exception(f"Column {data_column_name} not found on {file_path}") + for row in reader: + value = row.get(data_column_name) + if value is not None: + data.append(str(value)) + return data + + @classmethod + def data_to_csv(cls, data: List[Dict], output_file_path: Optional[str] = None) -> str: + """Converts a list of channel IDs into a CSV file. + + Parameters: + channels_ids (List[str]): List of channel IDs to be written to the CSV. + output_file_path (str, optional): Path to the file where the CSV will be saved. If not provided, the CSV will be returned as a string. + channel_id_column_name (str, optional): Name of the column in the CSV that will contain the channel IDs. + If not provided, the default value defined in ChannelId.CHANNEL_ID_COLUMN_NAME will be used. + + Returns: + str: The path of the created CSV file or, if no path is provided, the contents of the CSV as a string. 
+ """ + if output_file_path: + output_path = Path(output_file_path) + if output_path.is_dir(): + command_name = cls.name.replace("-", "_") + timestamp = datetime.now().strftime("%M%S%f") + output_file_path = output_path / f"{command_name}_{timestamp}.csv" + + with (Path(output_file_path).open('w', newline='') if output_file_path else StringIO()) as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=list(data[0].keys()) if data else []) + writer.writeheader() + writer.writerows(data) + return str(output_file_path) if output_file_path else csv_file.getvalue() diff --git a/youtool/commands/channel_id.py b/youtool/commands/channel_id.py new file mode 100644 index 0000000..d42f311 --- /dev/null +++ b/youtool/commands/channel_id.py @@ -0,0 +1,86 @@ + +from pathlib import Path + +from youtool import YouTube + +from .base import Command + + +class ChannelId(Command): + """Get channel IDs from a list of URLs (or CSV filename with URLs inside), generate CSV output (just the IDs).""" + name = "channel-id" + arguments = [ + {"name": "--urls", "type": str, "help": "Channels urls", "nargs": "*"}, + {"name": "--urls-file-path", "type": str, "help": "Channels urls csv file path"}, + {"name": "--output-file-path", "type": str, "help": "Output csv file path"}, + {"name": "--url-column-name", "type": str, "help": "URL column name on csv input files"}, + {"name": "--id-column-name", "type": str, "help": "Channel ID column name on csv output files"} + ] + + URL_COLUMN_NAME: str = "channel_url" + CHANNEL_ID_COLUMN_NAME: str = "channel_id" + + @classmethod + def execute(cls, **kwargs) -> str: # noqa: D417 + """Execute the channel-id command to fetch YouTube channel IDs from URLs and save them to a CSV file. + + This method retrieves YouTube channel IDs from a list of provided URLs or from a file containing URLs. + It then saves these channel IDs to a CSV file if an output file path is specified. + + Args: + urls (list[str], optional): A list of YouTube channel URLs. 
Either this or urls_file_path must be provided.
+            urls_file_path (str, optional): Path to a CSV file containing YouTube channel URLs.
+                Requires url_column_name to specify the column with URLs.
+            output_file_path (str, optional): Path to the output CSV file where channel IDs will be saved.
+                If not provided, the result will be returned as a string.
+            api_key (str): The API key to authenticate with the YouTube Data API.
+            url_column_name (str, optional): The name of the column in the urls_file_path CSV file that contains the URLs.
+                Default is "channel_url".
+            id_column_name (str, optional): The name of the column for channel IDs in the output CSV file.
+                Default is "channel_id".
+
+        Returns:
+            str: A message indicating the result of the command. If output_file_path is specified, the message will
+                include the path to the generated CSV file. Otherwise, it will return the result as a string.
+
+        Raises:
+            Exception: If neither urls nor urls_file_path is provided.
+        """
+        urls = kwargs.get("urls")
+        urls_file_path = kwargs.get("urls_file_path")
+        output_file_path = kwargs.get("output_file_path")
+        api_key = kwargs.get("api_key")
+
+        url_column_name = kwargs.get("url_column_name")
+        id_column_name = kwargs.get("id_column_name")
+
+        urls = cls.resolve_urls(urls, urls_file_path, url_column_name)
+
+        youtube = YouTube([api_key], disable_ipv6=True)
+
+        channels_ids = [
+            youtube.channel_id_from_url(url) for url in urls if url
+        ]
+
+        result = cls.data_to_csv(
+            data=[
+                {
+                    (id_column_name or cls.CHANNEL_ID_COLUMN_NAME): channel_id
+                } for channel_id in channels_ids
+            ],
+            output_file_path=output_file_path
+        )
+
+        return result
+
+    @classmethod
+    def resolve_urls(cls, urls, urls_file_path, url_column_name):
+        """Return the given URLs, or load them from the CSV at urls_file_path.
+
+        Raises:
+            Exception: If no URLs are given and none can be read from the file.
+        """
+        if urls_file_path and not urls:
+            urls = cls.data_from_csv(
+                file_path=Path(urls_file_path),
+                data_column_name=url_column_name or cls.URL_COLUMN_NAME
+            )
+
+        if not urls:
+            # The error must name the options this command actually accepts
+            # (--urls / --urls-file-path); it has no 'username' argument.
+            raise Exception("Either 'urls' or 'urls_file_path' must be provided for the channel-id command")
+        return
urls diff --git a/youtool/commands/channel_info.py b/youtool/commands/channel_info.py new file mode 100644 index 0000000..09103af --- /dev/null +++ b/youtool/commands/channel_info.py @@ -0,0 +1,122 @@ +import csv + +from typing import List, Dict, Optional, Self + +from youtool import YouTube + +from .base import Command + + +class ChannelInfo(Command): + """Get channel info from a list of IDs (or CSV filename with IDs inside), generate CSV output + (same schema for `channel` dicts) + """ + name = "channel-info" + arguments = [ + {"name": "--urls", "type": str, "help": "Channel URLs", "nargs": "*"}, + {"name": "--usernames", "type": str, "help": "Channel usernames", "nargs": "*"}, + {"name": "--ids", "type": str, "help": "Channel IDs", "nargs": "*"}, + {"name": "--urls-file-path", "type": str, "help": "Channel URLs CSV file path"}, + {"name": "--usernames-file-path", "type": str, "help": "Channel usernames CSV file path"}, + {"name": "--ids-file-path", "type": str, "help": "Channel IDs CSV file path"}, + {"name": "--output-file-path", "type": str, "help": "Output CSV file path"}, + {"name": "--url-column-name", "type": str, "help": "URL column name on CSV input files"}, + {"name": "--username-column-name", "type": str, "help": "Username column name on CSV input files"}, + {"name": "--id-column-name", "type": str, "help": "ID column name on CSV input files"}, + ] + + URL_COLUMN_NAME: str = "channel_url" + USERNAME_COLUMN_NAME: str = "channel_username" + ID_COLUMN_NAME: str = "channel_id" + INFO_COLUMNS: List[str] = [ + "id", "title", "description", "published_at", "view_count", "subscriber_count", "video_count" + ] + + @staticmethod + def filter_fields(channel_info: Dict, info_columns: Optional[List] = None): + """Filters the fields of a dictionary containing channel information based on + specified columns. + + Args: + channel_info (Dict): A dictionary containing channel information. 
+ info_columns (Optional[List], optional): A list specifying which fields + to include in the filtered output. If None, returns the entire + channel_info dictionary. Defaults to None. + + Returns: + Dict: A dictionary containing only the fields specified in info_columns + (if provided) or the entire channel_info dictionary if info_columns is None. + """ + return { + field: value for field, value in channel_info.items() if field in info_columns + } if info_columns else channel_info + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """Execute the channel-info command to fetch YouTube channel information from URLs or + usernames and save them to a CSV file. + + Args: + urls (list[str], optional): A list of YouTube channel URLs. If not provided, `urls_file_path` must be specified. + usernames (list[str], optional): A list of YouTube channel usernames. If not provided, `usernames_file_path` must be specified. + urls_file_path (str, optional): Path to a CSV file containing YouTube channel URLs. + usernames_file_path (str, optional): Path to a CSV file containing YouTube channel usernames. + output_file_path (str, optional): Path to the output CSV file where channel information will be saved. + api_key (str): The API key to authenticate with the YouTube Data API. + url_column_name (str, optional): The name of the column in the `urls_file_path` CSV file that contains the URLs. + Default is "channel_url". + username_column_name (str, optional): The name of the column in the `usernames_file_path` CSV file that contains the usernames. + Default is "channel_username". + info_columns (str, optional): Comma-separated list of columns to include in the output CSV. + Default is the class attribute `INFO_COLUMNS`. + + Returns: + str: A message indicating the result of the command. If `output_file_path` is specified, the message will + include the path to the generated CSV file. Otherwise, it will return the result as a string. 
+
+        Raises:
+            Exception: If neither `urls`, `usernames`, `urls_file_path` nor `usernames_file_path` is provided.
+        """
+
+        urls = kwargs.get("urls")
+        usernames = kwargs.get("usernames")
+        urls_file_path = kwargs.get("urls_file_path")
+        usernames_file_path = kwargs.get("usernames_file_path")
+        output_file_path = kwargs.get("output_file_path")
+        api_key = kwargs.get("api_key")
+
+        url_column_name = kwargs.get("url_column_name")
+        username_column_name = kwargs.get("username_column_name")
+        info_columns = kwargs.get("info_columns")
+
+        info_columns = [
+            column.strip() for column in info_columns.split(",")
+        ] if info_columns else ChannelInfo.INFO_COLUMNS
+
+        # There is no `data_from_file` on Command/ChannelInfo; the CSV reader is
+        # `data_from_csv`, which expects a Path and a non-None column name.
+        # Local import: this module does not import pathlib at the top.
+        from pathlib import Path
+        if urls_file_path and not urls:
+            urls = cls.data_from_csv(Path(urls_file_path), url_column_name or cls.URL_COLUMN_NAME)
+        if usernames_file_path and not usernames:
+            usernames = cls.data_from_csv(Path(usernames_file_path), username_column_name or cls.USERNAME_COLUMN_NAME)
+
+        if not urls and not usernames:
+            raise Exception("Either 'urls' or 'usernames' must be provided for the channel-info command")
+
+        youtube = YouTube([api_key], disable_ipv6=True)
+
+        channels_ids = [
+            youtube.channel_id_from_url(url) for url in (urls or []) if url
+        ] + [
+            youtube.channel_id_from_username(username) for username in (usernames or []) if username
+        ]
+        channel_ids = list(
+            set([channel_id for channel_id in channels_ids if channel_id])
+        )
+
+        return cls.data_to_csv(
+            data=[
+                ChannelInfo.filter_fields(
+                    channel_info, info_columns
+                ) for channel_info in (youtube.channels_infos(channel_ids) or [])
+            ],
+            output_file_path=output_file_path
+        )
diff --git a/youtool/commands/video_comments.py b/youtool/commands/video_comments.py
new file mode 100644
index 0000000..ec07e18
--- /dev/null
+++ b/youtool/commands/video_comments.py
@@ -0,0 +1,47 @@
+import csv
+from typing import List, Dict, Optional, Self
+
+from youtool import YouTube
+from .base import Command
+
+class VideoComments(Command):
+    """Get comments from a video ID, generate CSV output (same schema for comment
dicts)""" + + name = "video-comments" + arguments = [ + {"name": "--id", "type": str, "help": "Video ID", "required": True}, + {"name": "--output-file-path", "type": str, "help": "Output CSV file path"} + ] + + COMMENT_COLUMNS: List[str] = [ + "comment_id", "author_display_name", "text_display", "like_count", "published_at" + ] + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the get-comments command to fetch comments from a YouTube video and save them to a CSV file. + + Args: + id (str): The ID of the YouTube video. + output_file_path (str): Path to the output CSV file where comments will be saved. + api_key (str): The API key to authenticate with the YouTube Data API. + + Returns: + A message indicating the result of the command. If output_file_path is specified, + the message will include the path to the generated CSV file. + Otherwise, it will return the result as a string. + """ + video_id = kwargs.get("id") + output_file_path = kwargs.get("output_file_path") + api_key = kwargs.get("api_key") + + youtube = YouTube([api_key], disable_ipv6=True) + + comments = list(youtube.video_comments(video_id)) + + return cls.data_to_csv( + data=comments, + output_file_path=output_file_path + ) + \ No newline at end of file diff --git a/youtool/commands/video_info.py b/youtool/commands/video_info.py new file mode 100644 index 0000000..bfa6534 --- /dev/null +++ b/youtool/commands/video_info.py @@ -0,0 +1,92 @@ +import csv + +from typing import List, Dict, Optional, Self + +from youtool import YouTube + +from .base import Command + + +class VideoInfo(Command): + """Get video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), generate CSV output (same schema for video dicts)") + """ + name = "video-info" + arguments = [ + {"name": "--ids", "type": str, "help": "Video IDs", "nargs": "*"}, + {"name": "--urls", "type": str, "help": "Video URLs", "nargs": "*"}, + {"name": "--input-file-path", "type": str, "help": "Input CSV file path 
with URLs/IDs"}, + {"name": "--output-file-path", "type": str, "help": "Output CSV file path"} + ] + + ID_COLUMN_NAME: str = "video_id" + URL_COLUMN_NAME: str = "video_url" + INFO_COLUMNS: List[str] = [ + "id", "title", "description", "published_at", "view_count", "like_count", "comment_count" + ] + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the video-info command to fetch YouTube video information from IDs or URLs and save them to a CSV file. + + Args: + ids (list[str], optional): A list of YouTube video IDs. If not provided, input_file_path must be specified. + urls (list[str], optional): A list of YouTube video URLs. If not provided, input_file_path must be specified. + input_file_path (str, optional): Path to a CSV file containing YouTube video URLs or IDs. + output_file_path (str, optional): Path to the output CSV file where video information will be saved. + api_key (str): The API key to authenticate with the YouTube Data API. + url_column_name (str, optional): The name of the column in the input_file_path CSV file that contains the URLs. + Default is "video_url". + id_column_name (str, optional): The name of the column in the input_file_path CSV file that contains the IDs. + Default is "video_id". + info_columns (str, optional): Comma-separated list of columns to include in the output CSV. + Default is the class attribute INFO_COLUMNS. + + Returns: + str: A message indicating the result of the command. If output_file_path is specified, the message will + include the path to the generated CSV file. Otherwise, it will return the result as a string. + + Raises: + Exception: If neither ids, urls, nor input_file_path is provided. 
+ """ + + ids = kwargs.get("ids", []) + urls = kwargs.get("urls", []) + input_file_path = kwargs.get("input_file_path") + output_file_path = kwargs.get("output_file_path") + api_key = kwargs.get("api_key") + + info_columns = kwargs.get("info_columns") + + info_columns = [ + column.strip() for column in info_columns.split(",") + ] if info_columns else VideoInfo.INFO_COLUMNS + + if input_file_path: + with open(input_file_path, mode='r') as infile: + reader = csv.DictReader(infile) + for row in reader: + if cls.ID_COLUMN_NAME in row: + ids.append(row[cls.ID_COLUMN_NAME]) + elif cls.URL_COLUMN_NAME in row: + urls.append(row[cls.URL_COLUMN_NAME]) + + if not ids and not urls: + raise Exception("Either 'ids' or 'urls' must be provided for the video-info command") + + youtube = YouTube([api_key], disable_ipv6=True) + + if urls: + ids += [cls.video_id_from_url(url) for url in urls] + + # Remove duplicated + ids = list(set(ids)) + videos_infos = list(youtube.videos_infos([_id for _id in ids if _id])) + return cls.data_to_csv( + data=[ + VideoInfo.filter_fields( + video_info, info_columns + ) for video_info in videos_infos + ], + output_file_path=output_file_path + ) diff --git a/youtool/commands/video_livechat.py b/youtool/commands/video_livechat.py new file mode 100644 index 0000000..775b857 --- /dev/null +++ b/youtool/commands/video_livechat.py @@ -0,0 +1,81 @@ +import csv +from typing import List, Dict, Optional, Self +from chat_downloader import ChatDownloader +from chat_downloader.errors import ChatDisabled, LoginRequired, NoChatReplay +from .base import Command +from datetime import datetime + +class VideoLiveChat(Command): + """Get live chat comments from a video ID, generate CSV output (same schema for chat_message dicts)""" + name = "video-livechat" + arguments = [ + {"name": "--id", "type": str, "help": "Video ID", "required": True}, + {"name": "--output-file-path", "type": str, "help": "Output CSV file path"}, + {"name": "--expand-emojis", "type": bool, "help": 
"Expand emojis in chat messages", "default": True} + ] + + CHAT_COLUMNS: List[str] = [ + "id", "video_id", "created_at", "type", "action", "video_time", + "author", "author_id", "author_image_url", "text", + "money_currency", "money_amount" + ] + + @staticmethod + def parse_timestamp(timestamp: str) -> str: + return datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S') + + @staticmethod + def parse_decimal(value: Optional[str]) -> Optional[float]: + return float(value.replace(',', '')) if value else None + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the video-livechat command to fetch live chat messages from a YouTube video and save them to a CSV file. + + Args: + id (str): The ID of the YouTube video. + output_file_path (str): Path to the output CSV file where chat messages will be saved. + expand_emojis (bool): Whether to expand emojis in chat messages. Defaults to True. + api_key (str): The API key to authenticate with the YouTube Data API. + + Returns: + A message indicating the result of the command. If output_file_path is specified, + the message will include the path to the generated CSV file. + Otherwise, it will return the result as a string. 
+ """ + video_id = kwargs.get("id") + output_file_path = kwargs.get("output_file_path") + expand_emojis = kwargs.get("expand_emojis", True) + + downloader = ChatDownloader() + video_url = f"https://youtube.com/watch?v={video_id}" + + chat_messages = [] + try: + live = downloader.get_chat(video_url, message_groups=["messages", "superchat"]) + for message in live: + text = message["message"] + if expand_emojis: + for emoji in message.get("emotes", []): + for shortcut in emoji["shortcuts"]: + text = text.replace(shortcut, emoji["id"]) + money = message.get("money", {}) or {} + chat_messages.append({ + "id": message["message_id"], + "video_id": video_id, + "created_at": cls.parse_timestamp(message["timestamp"]), + "type": message["message_type"], + "action": message["action_type"], + "video_time": float(message["time_in_seconds"]), + "author": message["author"]["name"], + "author_id": message["author"]["id"], + "author_image_url": [img for img in message["author"]["images"] if img["id"] == "source"][0]["url"], + "text": text, + "money_currency": money.get("currency"), + "money_amount": cls.parse_decimal(money.get("amount")), + }) + except (LoginRequired, NoChatReplay, ChatDisabled): + raise + + return cls.data_to_csv(chat_messages, output_file_path) diff --git a/youtool/commands/video_search.py b/youtool/commands/video_search.py new file mode 100644 index 0000000..4713a84 --- /dev/null +++ b/youtool/commands/video_search.py @@ -0,0 +1,94 @@ +import csv + +from typing import List, Dict, Optional, Self + +from youtool import YouTube + +from .base import Command + + +class VideoSearch(Command): + """ + Search video info from a list of IDs or URLs (or CSV filename with URLs/IDs inside), + generate CSV output (simplified video dict schema or option to get full video info) + """ + name = "video-search" + arguments = [ + {"name": "--ids", "type": str, "help": "Video IDs", "nargs": "*"}, + {"name": "--urls", "type": str, "help": "Video URLs", "nargs": "*"}, + {"name": 
"--input-file-path", "type": str, "help": "Input CSV file path with URLs/IDs"}, + {"name": "--output-file-path", "type": str, "help": "Output CSV file path"}, + {"name": "--full-info", "type": bool, "help": "Option to get full video info", "default": False}, + {"name": "--url-column-name", "type": str, "help": "URL column name on csv input files"}, + {"name": "--id-column-name", "type": str, "help": "Channel ID column name on csv output files"} + ] + + ID_COLUMN_NAME: str = "video_id" + URL_COLUMN_NAME: str = "video_url" + INFO_COLUMNS: List[str] = [ + "id", "title", "published_at", "view_count" + ] + FULL_INFO_COLUMNS: List[str] = [ + "id", "title", "description", "published_at", "view_count", "like_count", "comment_count" + ] + + @classmethod + def execute(cls: Self, **kwargs) -> str: + """ + Execute the video-search command to fetch YouTube video information from IDs or URLs and save them to a CSV file. + + Args: + ids (list[str], optional): A list of YouTube video IDs. If not provided, input_file_path must be specified. + urls (list[str], optional): A list of YouTube video URLs. If not provided, input_file_path must be specified. + input_file_path (str, optional): Path to a CSV file containing YouTube video URLs or IDs. + output_file_path (str, optional): Path to the output CSV file where video information will be saved. + api_key (str): The API key to authenticate with the YouTube Data API. + full_info (bool, optional): Flag to indicate whether to get full video info. Default is False. + url_column_name (str, optional): The name of the column in the input CSV file that contains the URLs. Default is "video_url". + id_column_name (str, optional): The name of the column in the input CSV file that contains the IDs. Default is "video_id". + + Returns: + str: A message indicating the result of the command. If output_file_path is specified, + the message will include the path to the generated CSV file. + Otherwise, it will return the result as a string. 
+
+        Raises:
+            Exception: If neither ids, urls, nor input_file_path is provided.
+        """
+        # ids/urls may arrive explicitly as None (argparse defaults) — copy into
+        # fresh lists so `+=` never hits None and the caller's list is not mutated.
+        ids = list(kwargs.get("ids") or [])
+        urls = list(kwargs.get("urls") or [])
+        output_file_path = kwargs.get("output_file_path")
+        api_key = kwargs.get("api_key")
+        full_info = kwargs.get("full_info", False)
+
+        # Use `or`, not a .get() default: argparse always supplies the key, so a
+        # present-but-None value must still fall back to the class default.
+        url_column_name = kwargs.get("url_column_name") or cls.URL_COLUMN_NAME
+        id_column_name = kwargs.get("id_column_name") or cls.ID_COLUMN_NAME
+
+        info_columns = VideoSearch.FULL_INFO_COLUMNS if full_info else VideoSearch.INFO_COLUMNS
+
+        if (input_file_path := kwargs.get("input_file_path")):
+            # data_from_csv expects a Path (it calls .is_file()/.open()); passing
+            # the raw CLI string raised AttributeError. Local import: this module
+            # does not import pathlib at the top.
+            from pathlib import Path
+            input_path = Path(input_file_path)
+            # NOTE(review): data_from_csv raises when a column is absent, so a CSV
+            # holding only one of the two columns still aborts — confirm intended.
+            if (urls_from_csv := cls.data_from_csv(input_path, url_column_name)):
+                ids += [cls.video_id_from_url(url) for url in urls_from_csv]
+            if (ids_from_csv := cls.data_from_csv(input_path, id_column_name)):
+                ids += ids_from_csv
+
+        if not ids and not urls:
+            raise Exception("Either 'ids' or 'urls' must be provided for the video-search command")
+
+        youtube = YouTube([api_key], disable_ipv6=True)
+
+        if urls:
+            ids += [cls.video_id_from_url(url) for url in urls]
+
+        # Remove duplicated
+        ids = list(set(ids))
+        videos_infos = list(youtube.videos_infos([_id for _id in ids if _id]))
+
+        return cls.data_to_csv(
+            data=[
+                VideoSearch.filter_fields(
+                    video_info, info_columns
+                ) for video_info in videos_infos
+            ],
+            output_file_path=output_file_path
+        )