diff --git a/src/bilifm/audio.py b/src/bilifm/audio.py index eb569fb..55f25e4 100644 --- a/src/bilifm/audio.py +++ b/src/bilifm/audio.py @@ -1,12 +1,15 @@ import os -import sys import time import requests -import typer +from rich.console import Console +from rich.panel import Panel +from rich.progress import BarColumn, DownloadColumn, Progress, TransferSpeedColumn from .util import AudioQualityEnums, get_signed_params +console = Console() + class Audio: bvid = "" @@ -64,7 +67,13 @@ def download(self): # 如果文件已存在,则跳过下载 if os.path.exists(file_path): - typer.echo(f"{self.title} already exists, skip for now") + console.print( + Panel( + f"{self.title} 已存在,跳过下载", + style="yellow", + expand=False, + ) + ) return params = get_signed_params( @@ -79,15 +88,27 @@ def download(self): ).json() if json["data"] is None: - typer.echo( - f" `data` field is not valid with url: {self.playUrl} and params : {params}" + console.print( + Panel( + f"[bold red]数据字段无效[/bold red]\n" + f"URL: {self.playUrl}\n" + f"参数: {params}", + title="错误", + expand=False, + ) ) return audio = json["data"]["dash"]["audio"] if not audio: - typer.echo( - f" `audio` field is empty with url: {self.playUrl} and params : {params}" + console.print( + Panel( + f"[bold red]音频字段为空[/bold red]\n" + f"URL: {self.playUrl}\n" + f"参数: {params}", + title="错误", + expand=False, + ) ) return @@ -103,38 +124,45 @@ def download(self): response = requests.get(url=base_url, headers=self.headers, stream=True) total_size = int(response.headers.get("content-length", 0)) - temp_size = 0 - - with open(file_path, "wb") as f: - for chunk in response.iter_content(chunk_size=1024): - if chunk: - f.write(chunk) - f.flush() - - temp_size += len(chunk) - done = int(50 * temp_size / total_size) - sys.stdout.write( - "\r[%s%s] %s/%s %s" - % ( - "#" * done, - "-" * (50 - done), - temp_size, - total_size, - self.title, - ) - ) - sys.stdout.flush() - except Exception as e: - typer.echo("Download failed") - typer.echo("Error: " + str(e)) - pass + with Progress( + "[progress.description]{task.description}", + BarColumn(), + "[progress.percentage]{task.percentage:>3.0f}%", + "•", + DownloadColumn(), + "•", + TransferSpeedColumn(), + console=console, + ) as progress: + task = progress.add_task( + f"[cyan]下载 {self.title}", total=total_size + ) - end_time = time.time() + with open(file_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + progress.update(task, advance=len(chunk)) + + # 添加下载完成的提示 + end_time = time.time() + download_time = round(end_time - start_time, 2) + console.print( + Panel( + f"[bold green]下载完成![/bold green]用时 {download_time} 秒", + expand=False, + ) + ) - sys.stdout.write( - " " + str(round(end_time - start_time, 2)) + " seconds download finish\n" - ) + except Exception as e: + console.print( + Panel( + f"[bold red]下载失败[/bold red]\n错误: {str(e)}", + title="异常", + expand=False, + ) + ) def __get_cid_title(self, bvid: str): url = "https://api.bilibili.com/x/web-interface/view" @@ -149,17 +177,21 @@ def __get_cid_title(self, bvid: str): data = response.json().get("data") self.title = self.__title_process(data.get("title")) - # 这里是否也应该也使用get方法? - self.cid_list = [str(page["cid"]) for page in data["pages"]] + self.cid_list = [str(page.get("cid")) for page in data.get("pages", [])] self.part_list = [ - self.__title_process(str(page["part"])) for page in data["pages"] + self.__title_process(str(page.get("part"))) + for page in data.get("pages", []) ] - except ValueError as e: - raise e - except Exception as e: - raise e + console.print( + Panel( + f"[bold red]获取视频信息失败[/bold red]\n错误: {str(e)}", + title="异常", + expand=False, + ) + ) + raise def __title_process(self, title: str): replaceList = ["?", "\\", "*", "|", "<", ">", ":", "/", " "]