diff --git a/FEATURES.md b/FEATURES.md new file mode 100644 index 0000000..a2672ea --- /dev/null +++ b/FEATURES.md @@ -0,0 +1,226 @@ +# Pomice Advanced Features Guide + +## 🎉 Overview + +Pomice now comes with built-in advanced features to help you build powerful music bots. These features are **integrated directly into the Player and Queue classes**, providing a "batteries-included" experience. + +### Key Enhancements + +- **Integrated Queue & History**: Every `Player` now has its own `queue` and `history` automatically. +- **Auto-History**: Tracks are automatically added to history when they finish playing. +- **Advanced Analytics**: Detailed statistics available directly via `player.get_stats()` or `queue.get_stats()`. +- **Integrated Utilities**: Filtering, sorting, and playlist management. + +--- + +## 📚 Table of Contents + +1. [Integrated Features](#-integrated-features) +2. [Track History](#-track-history) +3. [Queue Statistics](#-queue-statistics) +4. [Playlist Manager](#-playlist-manager) +5. [Track Utilities](#-track-utilities) +6. [Complete Examples](#-complete-examples) + +--- + +## 🚀 Integrated Features + +Since these features are now part of the core classes, usage is extremely simple: + +```python +# Every player now has a queue and history by default +player = ctx.voice_client + +# Access the queue +player.queue.put(track) + +# Play the next track from the queue +await player.do_next() + +# Access the history (automatically updated) +last_song = player.history.current + +# Get real-time statistics +stats = player.get_stats() +print(f"Queue Duration: {stats.format_duration(stats.total_duration)}") +``` + +--- + +## 🕐 Track History + +The `player.history` object automatically tracks every song that finishes playing. + +### Features +- Configurable maximum history size (default: 100) +- Navigation: `get_previous()`, `get_next()` +- Search: `history.search("query")` +- Filter: `get_by_requester(user_id)` +- Unique tracks: `get_unique_tracks()` + +### Usage +```python +# Show last 10 songs +recent = player.history.get_last(10) + +# Search history +results = player.history.search("Imagine Dragons") + +# Play previous track +prev = player.history.get_previous() +if prev: + await player.play(prev) +``` + +--- + +## 📊 Queue Statistics + +Access advanced analytics via `player.get_stats()` or `player.queue.get_stats()`. + +### Features +- Total/Average duration +- Longest/Shortest tracks +- Requester analytics (who added what) +- Author distribution +- Duration breakdown (short/medium/long) + +### Usage +```python +stats = player.get_stats() +summary = stats.get_summary() + +print(f"Total Tracks: {summary['total_tracks']}") +print(f"Total Duration: {summary['total_duration_formatted']}") + +# Who added the most songs? +top = stats.get_top_requesters(3) +for user, count in top: + print(f"{user.display_name}: {count} tracks") +``` + +--- + +## 💾 Playlist Manager + +Export and import playlists to/from JSON and M3U formats. + +### Usage +```python +import pomice + +# Export current queue to file +pomice.PlaylistManager.export_queue( + player.queue, + filepath='playlists/party.json', + name='Party Mix' +) + +# Import a playlist +data = pomice.PlaylistManager.import_playlist('playlists/rock.json') +uris = pomice.PlaylistManager.get_track_uris('playlists/rock.json') + +for uri in uris: + results = await player.get_tracks(query=uri) + if results: + player.queue.put(results[0]) +``` + +--- + +## 🔧 Track Utilities + +Advanced filtering and sorting. + +### Filtering +```python +import pomice + +tracks = list(player.queue) + +# Get tracks under 5 minutes +short = pomice.TrackFilter.by_duration(tracks, max_duration=300000) + +# Get tracks by a specific artist +artist_songs = pomice.TrackFilter.by_author(tracks, "Artist Name") +``` + +### Sorting +```python +# Sort queue by title +sorted_tracks = pomice.SearchHelper.sort_by_title(list(player.queue)) + +# Clear and refill with sorted tracks +player.queue.clear() +player.queue.extend(sorted_tracks) +``` + +--- + +## 🎯 Complete Examples + +### Integrated Music Cog + +```python +import pomice +from discord.ext import commands + +class Music(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @commands.command() + async def play(self, ctx, *, search: str): + if not ctx.voice_client: + await ctx.author.voice.channel.connect(cls=pomice.Player) + + player = ctx.voice_client + results = await player.get_tracks(query=search, ctx=ctx) + + if not results: + return await ctx.send("No results.") + + track = results[0] + player.queue.put(track) + await ctx.send(f"Added **{track.title}** to queue.") + + if not player.is_playing: + await player.do_next() + + @commands.command() + async def history(self, ctx): + """Show recently played songs.""" + player = ctx.voice_client + recent = player.history.get_last(5) + + msg = "\n".join(f"{i}. {t.title}" for i, t in enumerate(recent, 1)) + await ctx.send(f"**Recently Played:**\n{msg}") + + @commands.command() + async def stats(self, ctx): + """Show queue analytics.""" + stats = ctx.voice_client.get_stats() + summary = stats.get_summary() + + await ctx.send( + f"**Queue Stats**\n" + f"Tracks: {summary['total_tracks']}\n" + f"Duration: {summary['total_duration_formatted']}" + ) +``` + +--- + +## 📖 Quick Reference + +| Feature | Integrated Access | +| :--- | :--- | +| **Queue** | `player.queue` | +| **History** | `player.history` | +| **Statistics** | `player.get_stats()` | +| **Next Track** | `await player.do_next()` | + +--- + +**Happy coding! 🎵** diff --git a/examples/advanced.py b/examples/advanced.py index b8d2d3a..c835c0c 100644 --- a/examples/advanced.py +++ b/examples/advanced.py @@ -301,6 +301,85 @@ async def skip(self, ctx: commands.Context): delete_after=15, ) + @commands.command() + async def loop(self, ctx: commands.Context, mode: str = "off"): + """Sets the loop mode: off, track, queue.""" + player: Player = ctx.voice_client + if not player: + return + + mode = mode.lower() + if mode == "track": + player.loop_mode = pomice.LoopMode.TRACK + elif mode == "queue": + player.loop_mode = pomice.LoopMode.QUEUE + else: + player.loop_mode = None + + await ctx.send(f"Loop mode set to **{mode}**") + + @commands.command() + async def autoplay(self, ctx: commands.Context): + """Toggles autoplay to keep the music going with recommendations when the queue is empty.""" + player: Player = ctx.voice_client + if not player: + return + + player.autoplay = not player.autoplay + await ctx.send(f"Autoplay is now **{'on' if player.autoplay else 'off'}**") + + @commands.command() + async def move(self, ctx: commands.Context, from_index: int, to_index: int): + """Moves a track's position in the queue (e.g., !move 5 1).""" + player: Player = ctx.voice_client + if not player or player.queue.is_empty: + return await ctx.send("The queue is empty.") + + try: + player.queue.move(from_index - 1, to_index - 1) + await ctx.send(f"Moved track from #{from_index} to #{to_index}.") + except IndexError: + await ctx.send("Sorry, I couldn't find a track at that position.") + + @commands.command(aliases=["clean"]) + async def deduplicate(self, ctx: commands.Context): + """Removes any double-posted songs from your queue.""" + player: Player = ctx.voice_client + if not player: + return + + removed = player.queue.remove_duplicates() + await ctx.send(f"All cleaned up! Removed **{removed}** duplicate tracks.") + + @commands.command() + async def filter(self, ctx: commands.Context, preset: str = "off"): + """Apply a sound preset: pop, soft, metal, boost, nightcore, vaporwave, off.""" + player: Player = ctx.voice_client + if not player: + return + + preset = preset.lower() + await player.reset_filters() + + if preset == "off": + return await ctx.send("Filters cleared.") + + presets = { + "pop": pomice.Equalizer.pop(), + "soft": pomice.Equalizer.soft(), + "metal": pomice.Equalizer.metal(), + "boost": pomice.Equalizer.boost(), + "nightcore": pomice.Timescale.nightcore(), + "vaporwave": pomice.Timescale.vaporwave(), + "bass": pomice.Equalizer.bass_boost_light(), + } + + if preset not in presets: + return await ctx.send(f"Available presets: {', '.join(presets.keys())}") + + await player.add_filter(presets[preset]) + await ctx.send(f"Applied the **{preset}** sound preset!") + @commands.command() async def stop(self, ctx: commands.Context): """Stop the player and clear all internal states.""" diff --git a/examples/advanced_features.py b/examples/advanced_features.py new file mode 100644 index 0000000..9b3a5b9 --- /dev/null +++ b/examples/advanced_features.py @@ -0,0 +1,134 @@ +""" +Example usage of Pomice's integrated advanced features. + +This example shows how easy it is to use: +- Integrated Track History (auto-tracking) +- Integrated Player Queue +- Integrated Analytics with player.get_stats() +- Playlist Import/Export +""" +import discord +from discord.ext import commands + +import pomice + +# Initialize bot +bot = commands.Bot(command_prefix="!", intents=discord.Intents.all()) + + +class IntegratedMusic(commands.Cog): + """Music cog with integrated advanced features.""" + + def __init__(self, bot): + self.bot = bot + self.pomice = pomice.NodePool() + + async def start_nodes(self): + """Start Lavalink nodes.""" + await self.pomice.create_node( + bot=self.bot, + host="127.0.0.1", + port="3030", + password="youshallnotpass", + identifier="MAIN", + ) + + @commands.command(name="play") + async def play(self, ctx, *, search: str): + """Play a track using the integrated queue.""" + if not ctx.voice_client: + await ctx.author.voice.channel.connect(cls=pomice.Player) + + player: pomice.Player = ctx.voice_client + results = await player.get_tracks(query=search, ctx=ctx) + + if not results: + return await ctx.send("No results found.") + + if isinstance(results, pomice.Playlist): + player.queue.extend(results.tracks) + await ctx.send(f"Added playlist **{results.name}** ({len(results.tracks)} tracks).") + else: + track = results[0] + player.queue.put(track) + await ctx.send(f"Added **{track.title}** to queue.") + + if not player.is_playing: + await player.do_next() + + @commands.command(name="history") + async def history(self, ctx, limit: int = 10): + """Show recently played tracks (tracked automatically!).""" + player: pomice.Player = ctx.voice_client + if not player: + return await ctx.send("Not connected.") + + if player.history.is_empty: + return await ctx.send("No tracks in history.") + + tracks = player.history.get_last(limit) + + embed = discord.Embed(title="🎵 Recently Played", color=discord.Color.blue()) + for i, track in enumerate(tracks, 1): + embed.add_field(name=f"{i}. {track.title}", value=f"by {track.author}", inline=False) + + await ctx.send(embed=embed) + + @commands.command(name="stats") + async def queue_stats(self, ctx): + """Show detailed queue statistics via integrated get_stats().""" + player: pomice.Player = ctx.voice_client + if not player: + return await ctx.send("Not connected.") + + stats = player.get_stats() + summary = stats.get_summary() + + embed = discord.Embed(title="📊 Queue Statistics", color=discord.Color.green()) + embed.add_field(name="Tracks", value=summary["total_tracks"], inline=True) + embed.add_field(name="Duration", value=summary["total_duration_formatted"], inline=True) + + # Who added the most? + top_requesters = stats.get_top_requesters(3) + if top_requesters: + text = "\n".join(f"{u.display_name}: {c} tracks" for u, c in top_requesters) + embed.add_field(name="Top Requesters", value=text, inline=False) + + await ctx.send(embed=embed) + + @commands.command(name="export") + async def export_queue(self, ctx, filename: str = "my_playlist.json"): + """Export current integrated queue.""" + player: pomice.Player = ctx.voice_client + if not player or player.queue.is_empty: + return await ctx.send("Queue is empty.") + + pomice.PlaylistManager.export_queue( + player.queue, + f"playlists/{filename}", + name=f"{ctx.guild.name}'s Playlist", + ) + await ctx.send(f"✅ Queue exported to `playlists/{filename}`") + + @commands.command(name="sort") + async def sort_queue(self, ctx): + """Sort the queue using integrated utilities.""" + player: pomice.Player = ctx.voice_client + if not player or player.queue.is_empty: + return await ctx.send("Queue is empty.") + + # Use SearchHelper to sort the queue list + sorted_tracks = pomice.SearchHelper.sort_by_title(list(player.queue)) + + player.queue.clear() + player.queue.extend(sorted_tracks) + await ctx.send("✅ Queue sorted alphabetically.") + + +@bot.event +async def on_ready(): + print(f"{bot.user} is ready!") + + +if __name__ == "__main__": + print("Example script ready for use!") diff --git a/pomice/__init__.py b/pomice/__init__.py index a2ae70c..18dc2b4 100644 --- a/pomice/__init__.py +++ b/pomice/__init__.py @@ -35,3 +35,7 @@ class DiscordPyOutdated(Exception): from .player import * from .pool import * from .routeplanner import * +from .history import * +from .queue_stats import * +from .playlist_manager import * +from .track_utils import * diff --git a/pomice/exceptions.py b/pomice/exceptions.py index 4019e3b..8de6fc7 100644 --- a/pomice/exceptions.py +++ b/pomice/exceptions.py @@ -70,7 +70,8 @@ class TrackInvalidPosition(PomiceException): class TrackLoadError(PomiceException): """There was an error while loading a track.""" - pass + def __init__(self, message: str = "Sorry, I ran into trouble trying to load that track."): + super().__init__(message) class FilterInvalidArgument(PomiceException): @@ -112,13 +113,17 @@ class QueueException(Exception): class QueueFull(QueueException): """Exception raised when attempting to add to a full Queue.""" - pass + def __init__(self, message: str = "Whoops! The queue is completely full right now."): + super().__init__(message) class QueueEmpty(QueueException): """Exception raised when attempting to retrieve from an empty Queue.""" - pass + def __init__( + self, message: str = "It looks like the queue is empty. There's no more music to play!", + ): + super().__init__(message) class LavalinkVersionIncompatible(PomiceException): diff --git a/pomice/filters.py b/pomice/filters.py index f0df953..6d143f7 100644 --- a/pomice/filters.py +++ b/pomice/filters.py @@ -110,10 +110,7 @@ def flat(cls) -> "Equalizer": @classmethod def boost(cls) -> "Equalizer": - """Equalizer preset which boosts the sound of a track, - making it sound fun and energetic by increasing the bass - and the highs. - """ + """A lively preset that boosts both bass and highs, making the music feel more energetic and fun.""" levels = [ (0, -0.075), @@ -134,11 +131,16 @@ def boost(cls) -> "Equalizer": ] return cls(tag="boost", levels=levels) + @classmethod + def bass_boost_light(cls) -> "Equalizer": + """A light touch for people who want a bit more bass without it becoming overwhelming.""" + levels = [(0, 0.15), (1, 0.1), (2, 0.05)] + return cls(tag="bass_boost_light", levels=levels) + @classmethod def metal(cls) -> "Equalizer": - """Equalizer preset which increases the mids of a track, - preferably one of the metal genre, to make it sound - more full and concert-like. + """A heavy preset designed to bring out the intensity of metal and rock. + It boosts the mids and highs to create a fuller, stage-like sound experience. """ levels = [ @@ -161,6 +163,54 @@ def metal(cls) -> "Equalizer": return cls(tag="metal", levels=levels) + @classmethod + def pop(cls) -> "Equalizer": + """A balanced preset that enhances vocals and adds a bit of 'pop' to the rhythm. + Perfect for mainstream hits. + """ + levels = [ + (0, -0.02), + (1, -0.01), + (2, 0.08), + (3, 0.1), + (4, 0.15), + (5, 0.1), + (6, 0.05), + (7, 0.0), + (8, 0.0), + (9, 0.0), + (10, 0.05), + (11, 0.1), + (12, 0.15), + (13, 0.1), + (14, 0.05), + ] + return cls(tag="pop", levels=levels) + + @classmethod + def soft(cls) -> "Equalizer": + """A gentle preset that smooths out harsh frequencies. + Ideal for acoustic tracks or when you just want a more relaxed listening experience. + """ + levels = [ + (0, 0.0), + (1, 0.0), + (2, 0.0), + (3, -0.05), + (4, -0.1), + (5, -0.1), + (6, -0.05), + (7, 0.0), + (8, 0.05), + (9, 0.1), + (10, 0.1), + (11, 0.05), + (12, 0.0), + (13, 0.0), + (14, 0.0), + ] + return cls(tag="soft", levels=levels) + @classmethod def piano(cls) -> "Equalizer": """Equalizer preset which increases the mids and highs diff --git a/pomice/history.py b/pomice/history.py new file mode 100644 index 0000000..23a9e3f --- /dev/null +++ b/pomice/history.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +from collections import deque +from typing import Deque +from typing import Iterator +from typing import List +from typing import Optional + +from .objects import Track + +__all__ = ("TrackHistory",) + + +class TrackHistory: + """Track history manager for Pomice. + + Keeps track of previously played tracks with a configurable maximum size. + Useful for implementing 'previous track' functionality and viewing play history. + """ + + __slots__ = ("_history", "max_size", "_current_index") + + def __init__(self, max_size: int = 100) -> None: + """Initialize the track history. + + Parameters + ---------- + max_size: int + Maximum number of tracks to keep in history. Defaults to 100. + """ + self.max_size = max_size + self._history: Deque[Track] = deque(maxlen=max_size) + self._current_index: int = -1 + + def __len__(self) -> int: + """Return the number of tracks in history.""" + return len(self._history) + + def __bool__(self) -> bool: + """Return True if history contains tracks.""" + return bool(self._history) + + def __iter__(self) -> Iterator[Track]: + """Iterate over tracks in history (newest to oldest).""" + return reversed(self._history) + + def __getitem__(self, index: int) -> Track: + """Get a track at the given index in history. + + Parameters + ---------- + index: int + Index of the track (0 = most recent) + """ + return self._history[-(index + 1)] + + def __repr__(self) -> str: + return f"" + + def add(self, track: Track) -> None: + """Add a track to the history. + + Parameters + ---------- + track: Track + The track to add to history + """ + self._history.append(track) + self._current_index = len(self._history) - 1 + + def get_last(self, count: int = 1) -> List[Track]: + """Get the last N tracks from history. + + Parameters + ---------- + count: int + Number of tracks to retrieve. Defaults to 1. + + Returns + ------- + List[Track] + List of the last N tracks (most recent first) + """ + if count <= 0: + return [] + return list(reversed(list(self._history)[-count:])) + + def get_previous(self) -> Optional[Track]: + """Get the previous track in history. + + Returns + ------- + Optional[Track] + The previous track, or None if at the beginning + """ + if not self._history or self._current_index <= 0: + return None + + self._current_index -= 1 + return self._history[self._current_index] + + def get_next(self) -> Optional[Track]: + """Get the next track in history (when navigating backwards). + + Returns + ------- + Optional[Track] + The next track, or None if at the end + """ + if not self._history or self._current_index >= len(self._history) - 1: + return None + + self._current_index += 1 + return self._history[self._current_index] + + def clear(self) -> None: + """Clear all tracks from history.""" + self._history.clear() + self._current_index = -1 + + def get_all(self) -> List[Track]: + """Get all tracks in history. + + Returns + ------- + List[Track] + All tracks in history (most recent first) + """ + return list(reversed(self._history)) + + def search(self, query: str) -> List[Track]: + """Search for tracks in history by title or author. + + Parameters + ---------- + query: str + Search query (case-insensitive) + + Returns + ------- + List[Track] + Matching tracks (most recent first) + """ + query_lower = query.lower() + return [ + track + for track in reversed(self._history) + if query_lower in track.title.lower() or query_lower in track.author.lower() + ] + + def get_unique_tracks(self) -> List[Track]: + """Get unique tracks from history (removes duplicates). + + Returns + ------- + List[Track] + Unique tracks (most recent occurrence kept) + """ + seen = set() + unique = [] + for track in reversed(self._history): + if track.track_id not in seen: + seen.add(track.track_id) + unique.append(track) + return unique + + def get_by_requester(self, requester_id: int) -> List[Track]: + """Get all tracks requested by a specific user. + + Parameters + ---------- + requester_id: int + Discord user ID + + Returns + ------- + List[Track] + Tracks requested by the user (most recent first) + """ + return [ + track + for track in reversed(self._history) + if track.requester and track.requester.id == requester_id + ] + + @property + def is_empty(self) -> bool: + """Check if history is empty.""" + return len(self._history) == 0 + + @property + def current(self) -> Optional[Track]: + """Get the current track in navigation.""" + if not self._history or self._current_index < 0: + return None + return self._history[self._current_index] diff --git a/pomice/player.py b/pomice/player.py index da8d24d..dead4d9 100644 --- a/pomice/player.py +++ b/pomice/player.py @@ -26,11 +26,11 @@ from .exceptions import TrackLoadError from .filters import Filter from .filters import Timescale +from .history import TrackHistory from .objects import Playlist from .objects import Track -from .pool import Node -from .pool import NodePool -from pomice.utils import LavalinkVersion +from .queue import Queue +from .queue_stats import QueueStats if TYPE_CHECKING: from discord.types.voice import VoiceServerUpdate @@ -154,6 +154,9 @@ class Player(VoiceProtocol): "_log", "_voice_state", "_player_endpoint_uri", + "queue", + "history", + "autoplay", ) def __call__(self, client: Client, channel: VoiceChannel) -> Player: @@ -191,6 +194,10 @@ def __init__( self._player_endpoint_uri: str = f"sessions/{self._node._session_id}/players" + self.queue: Queue = Queue() + self.history: TrackHistory = TrackHistory() + self.autoplay: bool = False + def __repr__(self) -> str: return ( f" bool: @property def is_paused(self) -> bool: - """Property which returns whether or not the player has a track which is paused or not.""" + """Returns True if the music is currently paused.""" return self._is_connected and self._paused @property @@ -358,6 +365,8 @@ async def _dispatch_event(self, data: dict) -> None: event: PomiceEvent = getattr(events, event_type)(data, self) if isinstance(event, TrackEndEvent) and event.reason not in ("REPLACED", "replaced"): + if self._current: + self.history.add(self._current) self._current = None event.dispatch(self._bot) @@ -763,3 +772,39 @@ async def reset_filters(self, *, fast_apply: bool = False) -> None: if self._log: self._log.debug(f"Fast apply passed, now removing all filters instantly.") await self.seek(self.position) + + async def do_next(self) -> Optional[Track]: + """Automatically picks the next track from the queue and plays it. + If the queue is empty and autoplay is on, it will search for recommended tracks. + + Returns + ------- + Optional[Track] + The track that's now playing, or None if we've run out of music. + """ + if self.queue.is_empty: + if self.autoplay and self._current: + recommendations = await self.get_recommendations(track=self._current) + if recommendations: + if isinstance(recommendations, Playlist): + track = recommendations.tracks[0] + else: + track = recommendations[0] + + await self.play(track) + return track + return None + + track = self.queue.get() + await self.play(track) + return track + + def get_stats(self) -> QueueStats: + """Get detailed statistics for the current player and queue. + + Returns + ------- + QueueStats + A QueueStats object containing detailed analytics. + """ + return QueueStats(self.queue) diff --git a/pomice/playlist_manager.py b/pomice/playlist_manager.py new file mode 100644 index 0000000..930842d --- /dev/null +++ b/pomice/playlist_manager.py @@ -0,0 +1,304 @@ +from __future__ import annotations + +import json +from datetime import datetime +from pathlib import Path +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .objects import Track + from .queue import Queue + +__all__ = ("PlaylistManager",) + + +class PlaylistManager: + """Manager for exporting and importing playlists. + + Allows saving queue contents to JSON files and loading them back, + useful for persistent playlists and sharing. + """ + + @staticmethod + def export_queue( + queue: Queue, + filepath: str, + *, + name: Optional[str] = None, + description: Optional[str] = None, + include_metadata: bool = True, + ) -> None: + """Export a queue to a JSON file. + + Parameters + ---------- + queue: Queue + The queue to export + filepath: str + Path to save the JSON file + name: Optional[str] + Name for the playlist. Defaults to filename. + description: Optional[str] + Description for the playlist + include_metadata: bool + Whether to include requester and timestamp metadata. Defaults to True. + """ + path = Path(filepath) + + if name is None: + name = path.stem + + tracks_data = [] + for track in queue: + track_dict = { + "title": track.title, + "author": track.author, + "uri": track.uri, + "identifier": track.identifier, + "length": track.length, + "is_stream": track.is_stream, + } + + if include_metadata: + track_dict["requester_id"] = track.requester.id if track.requester else None + track_dict["requester_name"] = str(track.requester) if track.requester else None + track_dict["timestamp"] = track.timestamp + + if track.thumbnail: + track_dict["thumbnail"] = track.thumbnail + + if track.isrc: + track_dict["isrc"] = track.isrc + + if track.playlist: + track_dict["playlist_name"] = track.playlist.name + + tracks_data.append(track_dict) + + playlist_data = { + "name": name, + "description": description, + "created_at": datetime.utcnow().isoformat(), + "track_count": len(tracks_data), + "total_duration": sum(t["length"] for t in tracks_data), + "tracks": tracks_data, + "version": "1.0", + } + + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump(playlist_data, f, indent=2, ensure_ascii=False) + + @staticmethod + def import_playlist(filepath: str) -> Dict[str, Any]: + """Import a playlist from a JSON file. + + Parameters + ---------- + filepath: str + Path to the JSON file + + Returns + ------- + Dict[str, Any] + Dictionary containing playlist data: + - 'name': Playlist name + - 'description': Playlist description + - 'tracks': List of track data dictionaries + - 'track_count': Number of tracks + - 'total_duration': Total duration in milliseconds + - 'created_at': Creation timestamp + """ + path = Path(filepath) + + with open(path, encoding="utf-8") as f: + data = json.load(f) + + return data + + @staticmethod + def export_track_list( + tracks: List[Track], + filepath: str, + *, + name: Optional[str] = None, + description: Optional[str] = None, + ) -> None: + """Export a list of tracks to a JSON file. + + Parameters + ---------- + tracks: List[Track] + List of tracks to export + filepath: str + Path to save the JSON file + name: Optional[str] + Name for the playlist + description: Optional[str] + Description for the playlist + """ + path = Path(filepath) + + if name is None: + name = path.stem + + tracks_data = [ + { + "title": track.title, + "author": track.author, + "uri": track.uri, + "identifier": track.identifier, + "length": track.length, + "thumbnail": track.thumbnail, + "isrc": track.isrc, + } + for track in tracks + ] + + playlist_data = { + "name": name, + "description": description, + "created_at": datetime.utcnow().isoformat(), + "track_count": len(tracks_data), + "total_duration": sum(t["length"] for t in tracks_data), + "tracks": tracks_data, + "version": "1.0", + } + + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump(playlist_data, f, indent=2, ensure_ascii=False) + + @staticmethod + def get_track_uris(filepath: str) -> List[str]: + """Get list of track URIs from a saved playlist. + + Parameters + ---------- + filepath: str + Path to the JSON file + + Returns + ------- + List[str] + List of track URIs + """ + data = PlaylistManager.import_playlist(filepath) + return [track["uri"] for track in data["tracks"] if track.get("uri")] + + @staticmethod + def merge_playlists( + filepaths: List[str], + output_path: str, + *, + name: Optional[str] = None, + description: Optional[str] = None, + remove_duplicates: bool = True, + ) -> None: + """Merge multiple playlists into one. + + Parameters + ---------- + filepaths: List[str] + List of playlist file paths to merge + output_path: str + Path to save the merged playlist + name: Optional[str] + Name for the merged playlist + description: Optional[str] + Description for the merged playlist + remove_duplicates: bool + Whether to remove duplicate tracks (by URI). Defaults to True. + """ + all_tracks = [] + seen_uris = set() + + for filepath in filepaths: + data = PlaylistManager.import_playlist(filepath) + + for track in data["tracks"]: + uri = track.get("uri", "") + + if remove_duplicates: + if uri and uri in seen_uris: + continue + if uri: + seen_uris.add(uri) + + all_tracks.append(track) + + merged_data = { + "name": name or "Merged Playlist", + "description": description or f"Merged from {len(filepaths)} playlists", + "created_at": datetime.utcnow().isoformat(), + "track_count": len(all_tracks), + "total_duration": sum(t["length"] for t in all_tracks), + "tracks": all_tracks, + "version": "1.0", + } + + output = Path(output_path) + output.parent.mkdir(parents=True, exist_ok=True) + with open(output, "w", encoding="utf-8") as f: + json.dump(merged_data, f, indent=2, ensure_ascii=False) + + @staticmethod + def export_to_m3u( + tracks: List[Track], + filepath: str, + *, + name: Optional[str] = None, + ) -> None: + """Export tracks to M3U playlist format. + + Parameters + ---------- + tracks: List[Track] + List of tracks to export + filepath: str + Path to save the M3U file + name: Optional[str] + Playlist name for the header + """ + path = Path(filepath) + path.parent.mkdir(parents=True, exist_ok=True) + + with open(path, "w", encoding="utf-8") as f: + f.write("#EXTM3U\n") + if name: + f.write(f"#PLAYLIST:{name}\n") + + for track in tracks: + # Duration in seconds + duration = track.length // 1000 + f.write(f"#EXTINF:{duration},{track.author} - {track.title}\n") + f.write(f"{track.uri}\n") + + @staticmethod + def get_playlist_info(filepath: str) -> Dict[str, Any]: + """Get basic information about a saved playlist without loading all tracks. + + Parameters + ---------- + filepath: str + Path to the JSON file + + Returns + ------- + Dict[str, Any] + Dictionary with playlist metadata (name, track_count, duration, etc.) + """ + data = PlaylistManager.import_playlist(filepath) + + return { + "name": data.get("name"), + "description": data.get("description"), + "track_count": data.get("track_count"), + "total_duration": data.get("total_duration"), + "created_at": data.get("created_at"), + "version": data.get("version"), + } diff --git a/pomice/queue.py b/pomice/queue.py index 7ff2679..47123b0 100644 --- a/pomice/queue.py +++ b/pomice/queue.py @@ -310,8 +310,8 @@ def copy(self) -> Queue: return new_queue def clear(self) -> None: - """Remove all items from the queue.""" - self._queue.clear() + """Wipes the entire queue clean, removing all tracks.""" + self._queue = [] def set_loop_mode(self, mode: LoopMode) -> None: """ @@ -343,8 +343,40 @@ def disable_loop(self) -> None: self._loop_mode = None def shuffle(self) -> None: - """Shuffles the queue.""" - return random.shuffle(self._queue) + """Mixes up the entire queue in a random order.""" + random.shuffle(self._queue) + + def move(self, from_index: int, to_index: int) -> None: + """Moves a track from one spot in the queue to another. + + Parameters + ---------- + from_index: int + The current position of the track (0-indexed). + to_index: int + Where you want to put the track. + """ + if from_index == to_index: + return + + track = self._queue.pop(from_index) + self._queue.insert(to_index, track) + + def remove_duplicates(self) -> int: + """Looks through the queue and removes any tracks that appear more than once. + Returns the number of duplicate tracks removed. + """ + initial_count = len(self._queue) + seen_ids = set() + unique_queue = [] + + for track in self._queue: + if track.track_id not in seen_ids: + unique_queue.append(track) + seen_ids.add(track.track_id) + + self._queue = unique_queue + return initial_count - len(self._queue) def clear_track_filters(self) -> None: """Clears all filters applied to tracks""" @@ -372,3 +404,15 @@ def jump(self, item: Track) -> None: else: new_queue = self._queue[index : self.size] self._queue = new_queue + + def get_stats(self) -> pomice.QueueStats: + """Get detailed statistics for this queue. + + Returns + ------- + QueueStats + A QueueStats object containing detailed analytics. + """ + from .queue_stats import QueueStats + + return QueueStats(self) diff --git a/pomice/queue_stats.py b/pomice/queue_stats.py new file mode 100644 index 0000000..8ef852d --- /dev/null +++ b/pomice/queue_stats.py @@ -0,0 +1,272 @@ +from __future__ import annotations + +from collections import Counter +from typing import Dict +from typing import List +from typing import Optional +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .objects import Track + from .queue import Queue + +__all__ = ("QueueStats",) + + +class QueueStats: + """Advanced statistics for a Pomice Queue. + + Provides detailed analytics about queue contents including duration, + requester statistics, and track distribution. + """ + + def __init__(self, queue: Queue) -> None: + """Initialize queue statistics. + + Parameters + ---------- + queue: Queue + The queue to analyze + """ + self._queue = queue + + @property + def total_duration(self) -> int: + """Get total duration of all tracks in queue (milliseconds). + + Returns + ------- + int + Total duration in milliseconds + """ + return sum(track.length for track in self._queue) + + @property + def average_duration(self) -> float: + """Get average track duration in queue (milliseconds). + + Returns + ------- + float + Average duration in milliseconds, or 0.0 if queue is empty + """ + if self._queue.is_empty: + return 0.0 + return self.total_duration / len(self._queue) + + @property + def longest_track(self) -> Optional[Track]: + """Get the longest track in the queue. + + Returns + ------- + Optional[Track] + The longest track, or None if queue is empty + """ + if self._queue.is_empty: + return None + return max(self._queue, key=lambda t: t.length) + + @property + def shortest_track(self) -> Optional[Track]: + """Get the shortest track in the queue. + + Returns + ------- + Optional[Track] + The shortest track, or None if queue is empty + """ + if self._queue.is_empty: + return None + return min(self._queue, key=lambda t: t.length) + + def get_requester_stats(self) -> Dict[int, Dict[str, any]]: + """Get statistics grouped by requester. + + Returns + ------- + Dict[int, Dict[str, any]] + Dictionary mapping user IDs to their stats: + - 'count': Number of tracks requested + - 'total_duration': Total duration of their tracks (ms) + - 'tracks': List of their tracks + """ + stats: Dict[int, Dict] = {} + + for track in self._queue: + if not track.requester: + continue + + user_id = track.requester.id + if user_id not in stats: + stats[user_id] = { + "count": 0, + "total_duration": 0, + "tracks": [], + "requester": track.requester, + } + + stats[user_id]["count"] += 1 + stats[user_id]["total_duration"] += track.length + stats[user_id]["tracks"].append(track) + + return stats + + def get_top_requesters(self, limit: int = 5) -> List[tuple]: + """Get top requesters by track count. + + Parameters + ---------- + limit: int + Maximum number of requesters to return. Defaults to 5. + + Returns + ------- + List[tuple] + List of (requester, count) tuples sorted by count (descending) + """ + requester_counts = Counter(track.requester.id for track in self._queue if track.requester) + + # Get requester objects + stats = self.get_requester_stats() + return [ + (stats[user_id]["requester"], count) + for user_id, count in requester_counts.most_common(limit) + ] + + def get_author_distribution(self) -> Dict[str, int]: + """Get distribution of tracks by author. + + Returns + ------- + Dict[str, int] + Dictionary mapping author names to track counts + """ + return dict(Counter(track.author for track in self._queue)) + + def get_top_authors(self, limit: int = 10) -> List[tuple]: + """Get most common authors in the queue. + + Parameters + ---------- + limit: int + Maximum number of authors to return. Defaults to 10. + + Returns + ------- + List[tuple] + List of (author, count) tuples sorted by count (descending) + """ + author_counts = Counter(track.author for track in self._queue) + return author_counts.most_common(limit) + + def get_stream_count(self) -> int: + """Get count of streams in the queue. + + Returns + ------- + int + Number of streams + """ + return sum(1 for track in self._queue if track.is_stream) + + def get_playlist_distribution(self) -> Dict[str, int]: + """Get distribution of tracks by playlist. + + Returns + ------- + Dict[str, int] + Dictionary mapping playlist names to track counts + """ + distribution: Dict[str, int] = {} + + for track in self._queue: + if track.playlist: + playlist_name = track.playlist.name + distribution[playlist_name] = distribution.get(playlist_name, 0) + 1 + + return distribution + + def get_duration_breakdown(self) -> Dict[str, int]: + """Get breakdown of tracks by duration categories. + + Returns + ------- + Dict[str, int] + Dictionary with counts for different duration ranges: + - 'short' (< 3 min) + - 'medium' (3-6 min) + - 'long' (6-10 min) + - 'very_long' (> 10 min) + """ + breakdown = { + "short": 0, # < 3 minutes + "medium": 0, # 3-6 minutes + "long": 0, # 6-10 minutes + "very_long": 0, # > 10 minutes + } + + for track in self._queue: + duration_minutes = track.length / 60000 # Convert ms to minutes + + if duration_minutes < 3: + breakdown["short"] += 1 + elif duration_minutes < 6: + breakdown["medium"] += 1 + elif duration_minutes < 10: + breakdown["long"] += 1 + else: + breakdown["very_long"] += 1 + + return breakdown + + def format_duration(self, milliseconds: int) -> str: + """Format duration in milliseconds to human-readable string. + + Parameters + ---------- + milliseconds: int + Duration in milliseconds + + Returns + ------- + str + Formatted duration (e.g., "1:23:45" or "5:30") + """ + seconds = milliseconds // 1000 + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + + if hours > 0: + return f"{hours}:{minutes:02d}:{seconds:02d}" + return f"{minutes}:{seconds:02d}" + + def get_summary(self) -> Dict[str, any]: + """Get a comprehensive summary of queue statistics. + + Returns + ------- + Dict[str, any] + Dictionary containing various queue statistics + """ + return { + "total_tracks": len(self._queue), + "total_duration": self.total_duration, + "total_duration_formatted": self.format_duration(self.total_duration), + "average_duration": self.average_duration, + "average_duration_formatted": self.format_duration(int(self.average_duration)), + "longest_track": self.longest_track, + "shortest_track": self.shortest_track, + "stream_count": self.get_stream_count(), + "unique_authors": len(self.get_author_distribution()), + "unique_requesters": len(self.get_requester_stats()), + "duration_breakdown": self.get_duration_breakdown(), + "loop_mode": self._queue.loop_mode, + "is_looping": self._queue.is_looping, + } + + def __repr__(self) -> str: + return ( + f"" + ) diff --git a/pomice/spotify/client.py b/pomice/spotify/client.py index b822ab3..472aa3e 100644 --- a/pomice/spotify/client.py +++ b/pomice/spotify/client.py @@ -286,25 +286,16 @@ async def fetch(offset: int) -> List[Track]: # Fetch pages in rolling waves; yield promptly as soon as a wave completes. wave_size = self._playlist_concurrency * 2 - for i, offset in enumerate(remaining_offsets): - # Build wave - if i % wave_size == 0: - wave_offsets = list( - o for o in remaining_offsets if o >= offset and o < offset + wave_size - ) - results = await asyncio.gather(*[fetch(o) for o in wave_offsets]) - for page_tracks in results: - if not page_tracks: - continue - for j in range(0, len(page_tracks), batch_size): - yield page_tracks[j : j + batch_size] - # Skip ahead in iterator by adjusting enumerate drive (consume extras) - # Fast-forward the generator manually - for _ in range(len(wave_offsets) - 1): - try: - next(remaining_offsets) # type: ignore - except StopIteration: - break + remaining_offsets_list = list(remaining_offsets) + + for i in range(0, len(remaining_offsets_list), wave_size): + wave_offsets = remaining_offsets_list[i : i + wave_size] + results = await asyncio.gather(*[fetch(o) for o in wave_offsets]) + for page_tracks in results: + if not page_tracks: + continue + for j in range(0, len(page_tracks), batch_size): + yield page_tracks[j : j + batch_size] async def get_recommendations(self, *, query: str) -> List[Track]: if not self._bearer_token or time.time() >= self._expiry: diff --git a/pomice/track_utils.py b/pomice/track_utils.py new file mode 100644 index 0000000..9554c21 --- /dev/null +++ b/pomice/track_utils.py @@ -0,0 +1,405 @@ +from __future__ import annotations + +from typing import Callable +from typing import List +from typing import Optional +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .objects import Track + +__all__ = ("TrackFilter", "SearchHelper") + + +class TrackFilter: + """Advanced filtering utilities for tracks. + + Provides various filter functions to find tracks matching specific criteria. + """ + + @staticmethod + def by_duration( + tracks: List[Track], + *, + min_duration: Optional[int] = None, + max_duration: Optional[int] = None, + ) -> List[Track]: + """Filter tracks by duration range. + + Parameters + ---------- + tracks: List[Track] + List of tracks to filter + min_duration: Optional[int] + Minimum duration in milliseconds + max_duration: Optional[int] + Maximum duration in milliseconds + + Returns + ------- + List[Track] + Filtered tracks + """ + result = tracks + + if min_duration is not None: + result = [t for t in result if t.length >= min_duration] + + if max_duration is not None: + result = [t for t in result if t.length <= max_duration] + + return result + + @staticmethod + def by_author(tracks: List[Track], author: str, *, exact: bool = False) -> List[Track]: + """Filter tracks by author name. + + Parameters + ---------- + tracks: List[Track] + List of tracks to filter + author: str + Author name to search for + exact: bool + Whether to match exactly. Defaults to False (case-insensitive contains). + + Returns + ------- + List[Track] + Filtered tracks + """ + if exact: + return [t for t in tracks if t.author == author] + + author_lower = author.lower() + return [t for t in tracks if author_lower in t.author.lower()] + + @staticmethod + def by_title(tracks: List[Track], title: str, *, exact: bool = False) -> List[Track]: + """Filter tracks by title. + + Parameters + ---------- + tracks: List[Track] + List of tracks to filter + title: str + Title to search for + exact: bool + Whether to match exactly. Defaults to False (case-insensitive contains). + + Returns + ------- + List[Track] + Filtered tracks + """ + if exact: + return [t for t in tracks if t.title == title] + + title_lower = title.lower() + return [t for t in tracks if title_lower in t.title.lower()] + + @staticmethod + def by_requester(tracks: List[Track], requester_id: int) -> List[Track]: + """Filter tracks by requester. + + Parameters + ---------- + tracks: List[Track] + List of tracks to filter + requester_id: int + Discord user ID + + Returns + ------- + List[Track] + Filtered tracks + """ + return [t for t in tracks if t.requester and t.requester.id == requester_id] + + @staticmethod + def by_playlist(tracks: List[Track], playlist_name: str) -> List[Track]: + """Filter tracks by playlist name. + + Parameters + ---------- + tracks: List[Track] + List of tracks to filter + playlist_name: str + Playlist name to search for + + Returns + ------- + List[Track] + Filtered tracks + """ + playlist_lower = playlist_name.lower() + return [t for t in tracks if t.playlist and playlist_lower in t.playlist.name.lower()] + + @staticmethod + def streams_only(tracks: List[Track]) -> List[Track]: + """Filter to only include streams. + + Parameters + ---------- + tracks: List[Track] + List of tracks to filter + + Returns + ------- + List[Track] + Only stream tracks + """ + return [t for t in tracks if t.is_stream] + + @staticmethod + def non_streams_only(tracks: List[Track]) -> List[Track]: + """Filter to exclude streams. + + Parameters + ---------- + tracks: List[Track] + List of tracks to filter + + Returns + ------- + List[Track] + Only non-stream tracks + """ + return [t for t in tracks if not t.is_stream] + + @staticmethod + def custom(tracks: List[Track], predicate: Callable[[Track], bool]) -> List[Track]: + """Filter tracks using a custom predicate function. + + Parameters + ---------- + tracks: List[Track] + List of tracks to filter + predicate: Callable[[Track], bool] + Function that returns True for tracks to include + + Returns + ------- + List[Track] + Filtered tracks + """ + return [t for t in tracks if predicate(t)] + + +class SearchHelper: + """Helper utilities for searching and sorting tracks.""" + + @staticmethod + def search_tracks( + tracks: List[Track], + query: str, + *, + search_title: bool = True, + search_author: bool = True, + case_sensitive: bool = False, + ) -> List[Track]: + """Search tracks by query string. + + Parameters + ---------- + tracks: List[Track] + List of tracks to search + query: str + Search query + search_title: bool + Whether to search in titles. Defaults to True. + search_author: bool + Whether to search in authors. Defaults to True. + case_sensitive: bool + Whether search is case-sensitive. Defaults to False. + + Returns + ------- + List[Track] + Matching tracks + """ + if not case_sensitive: + query = query.lower() + + results = [] + for track in tracks: + title = track.title if case_sensitive else track.title.lower() + author = track.author if case_sensitive else track.author.lower() + + if search_title and query in title: + results.append(track) + elif search_author and query in author: + results.append(track) + + return results + + @staticmethod + def sort_by_duration( + tracks: List[Track], + *, + reverse: bool = False, + ) -> List[Track]: + """Sort tracks by duration. + + Parameters + ---------- + tracks: List[Track] + List of tracks to sort + reverse: bool + If True, sort longest to shortest. Defaults to False. + + Returns + ------- + List[Track] + Sorted tracks + """ + return sorted(tracks, key=lambda t: t.length, reverse=reverse) + + @staticmethod + def sort_by_title( + tracks: List[Track], + *, + reverse: bool = False, + ) -> List[Track]: + """Sort tracks alphabetically by title. + + Parameters + ---------- + tracks: List[Track] + List of tracks to sort + reverse: bool + If True, sort Z to A. Defaults to False. + + Returns + ------- + List[Track] + Sorted tracks + """ + return sorted(tracks, key=lambda t: t.title.lower(), reverse=reverse) + + @staticmethod + def sort_by_author( + tracks: List[Track], + *, + reverse: bool = False, + ) -> List[Track]: + """Sort tracks alphabetically by author. + + Parameters + ---------- + tracks: List[Track] + List of tracks to sort + reverse: bool + If True, sort Z to A. Defaults to False. + + Returns + ------- + List[Track] + Sorted tracks + """ + return sorted(tracks, key=lambda t: t.author.lower(), reverse=reverse) + + @staticmethod + def remove_duplicates( + tracks: List[Track], + *, + by_uri: bool = True, + by_title_author: bool = False, + ) -> List[Track]: + """Remove duplicate tracks from a list. + + Parameters + ---------- + tracks: List[Track] + List of tracks + by_uri: bool + Remove duplicates by URI. Defaults to True. + by_title_author: bool + Remove duplicates by title+author combination. Defaults to False. + + Returns + ------- + List[Track] + List with duplicates removed (keeps first occurrence) + """ + seen = set() + result = [] + + for track in tracks: + if by_uri: + key = track.uri + elif by_title_author: + key = (track.title.lower(), track.author.lower()) + else: + key = track.track_id + + if key not in seen: + seen.add(key) + result.append(track) + + return result + + @staticmethod + def group_by_author(tracks: List[Track]) -> dict[str, List[Track]]: + """Group tracks by author. + + Parameters + ---------- + tracks: List[Track] + List of tracks to group + + Returns + ------- + dict[str, List[Track]] + Dictionary mapping author names to their tracks + """ + groups = {} + for track in tracks: + author = track.author + if author not in groups: + groups[author] = [] + groups[author].append(track) + return groups + + @staticmethod + def group_by_playlist(tracks: List[Track]) -> dict[str, List[Track]]: + """Group tracks by playlist. + + Parameters + ---------- + tracks: List[Track] + List of tracks to group + + Returns + ------- + dict[str, List[Track]] + Dictionary mapping playlist names to their tracks + """ + groups = {} + for track in tracks: + if track.playlist: + playlist_name = track.playlist.name + if playlist_name not in groups: + groups[playlist_name] = [] + groups[playlist_name].append(track) + return groups + + @staticmethod + def get_random_tracks(tracks: List[Track], count: int) -> List[Track]: + """Get random tracks from a list. + + Parameters + ---------- + tracks: List[Track] + List of tracks + count: int + Number of random tracks to get + + Returns + ------- + List[Track] + Random tracks (without replacement) + """ + import random + + return random.sample(tracks, min(count, len(tracks)))