diff --git a/musync/entity/track.py b/musync/entity/track.py index 833cff9..7b7ea49 100644 --- a/musync/entity/track.py +++ b/musync/entity/track.py @@ -18,7 +18,7 @@ class Track: artist_ids: list[str] name: str duration: timedelta - date_added: Optional[dt] # relates to playlist, requires better structure + date_added: Optional[dt] = None # relates to playlist, requires better structure origin: Origin = Origin.UNKNOWN @classmethod diff --git a/musync/error.py b/musync/error.py index 5848105..29aa5fd 100644 --- a/musync/error.py +++ b/musync/error.py @@ -1,2 +1,14 @@ class PlaylistNotFoundError(Exception): """Raised when a playlist is not found in the database.""" + + +class MissingPrivilegesError(Exception): + """Raised when a user does not have the necessary privileges.""" + + +class IncompatibleEntityError(Exception): + """Raised when an entity is not compatible with the operation.""" + + +class TrackNotFoundWarning(Warning): + """Raised when a track is not found in the database.""" diff --git a/musync/session.py b/musync/session.py index 55af16d..1205eff 100644 --- a/musync/session.py +++ b/musync/session.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from typing import Iterable import spotipy import tidalapi @@ -29,3 +30,7 @@ def load_playlist_tracks(self, playlist: Playlist) -> list[Track]: @abstractmethod def find_track(self, track: Track) -> Track | None: pass + + @abstractmethod + def add_to_playlist(self, playlist: Playlist, tracks: Iterable[Track]) -> None: + pass diff --git a/musync/spotify/session.py b/musync/spotify/session.py index a78316b..c31f81f 100644 --- a/musync/spotify/session.py +++ b/musync/spotify/session.py @@ -1,9 +1,10 @@ import os +from typing import Iterable import spotipy from dotenv import load_dotenv -from musync.entity import Artist, Playlist, Track, User +from musync.entity import Artist, Origin, Playlist, Track, User from musync.session import Session load_dotenv() @@ -88,3 +89,22 @@ def load_artist(self, artist_id: str) -> Artist | None: return Artist.from_spotify(artist) return None + + def add_to_playlist(self, playlist: Playlist, tracks: Iterable[Track]) -> None: + if playlist.origin != Origin.SPOTIFY: + raise ValueError(f"Playlist is not from Spotify ({playlist=}).") + + if not all(tr.origin == Origin.SPOTIFY for tr in tracks): + raise ValueError( + f"The tracks contain tracks that are not from Spotify ({tracks=})." + ) + + if self.user.user_id != playlist.owner_id: + raise ValueError( + f"The session user does not own the playlist ({playlist=})." + ) + + user_id = self.user.user_id + playlist_id = playlist.playlist_id + track_ids = [tr.track_id for tr in tracks] + self._client.user_playlist_add_tracks(user_id, playlist_id, track_ids) diff --git a/musync/sync_manager.py b/musync/sync_manager.py index 883563c..3e7837d 100644 --- a/musync/sync_manager.py +++ b/musync/sync_manager.py @@ -1,4 +1,7 @@ +import warnings + from musync.entity import Playlist, Track +from musync.error import MissingPrivilegesError, TrackNotFoundWarning from musync.spotify import SpotifySession from musync.tidal import TidalSession @@ -14,7 +17,7 @@ def __init__(self, session1: SessionType, session2: SessionType) -> None: self._session1 = session1 self._session2 = session2 - def get_common_playlists(self) -> list[Playlist]: + def get_common_playlists(self) -> list[tuple[Playlist, Playlist]]: playlists1 = self._session1.load_playlists() playlists2 = self._session2.load_playlists() common_playlists = [] @@ -28,23 +31,77 @@ def get_common_playlists(self) -> list[Playlist]: def get_missing_tracks( self, src_playlist: Playlist, dest_playlist: Playlist ) -> list[Track]: + if src_playlist.origin == self._session1.user.origin: + src_session = self._session1 + elif src_playlist.origin == self._session2.user.origin: + src_session = self._session2 + else: + raise MissingPrivilegesError( + f"The user does not have access to the source playlist ({src_playlist=})" + ) + if ( - self._session1.user.origin == dest_playlist.origin - and self._session1.user.user_id == dest_playlist.owner_id + dest_playlist.origin == self._session1.user.origin + and dest_playlist.owner_id == self._session1.user.user_id ): - src_tracks = self._session1.load_playlist_tracks(dest_playlist) - dest_tracks = self._session2.load_playlist_tracks(src_playlist) + dest_session = self._session1 elif ( - self._session2.user.user_id == dest_playlist.owner_id - and self._session2.user.origin == dest_playlist.origin + dest_playlist.origin == self._session2.user.origin + and dest_playlist.owner_id == self._session2.user.user_id ): - src_tracks = self._session2.load_playlist_tracks(dest_playlist) - dest_tracks = self._session1.load_playlist_tracks(src_playlist) + dest_session = self._session2 else: - raise ValueError( - "Invalid playlist" - ) # TODO: Create more expressive custom error + raise MissingPrivilegesError( + f"The user does not have access to the destination playlist ({dest_playlist=})" + ) + + src_tracks = src_session.load_playlist_tracks(src_playlist) + dest_tracks = dest_session.load_playlist_tracks(dest_playlist) + + return [st for st in src_tracks if all(not st.equals(dt) for dt in dest_tracks)] + + def sync_common_playlists(self) -> None: + common_playlists = self.get_common_playlists() + + for src_playlist, dest_playlist in common_playlists: + if dest_playlist.owner_id == self._session2.user.user_id: + self.sync_playlists(src_playlist, dest_playlist) + + for dest_playlist, src_playlist in common_playlists: + if dest_playlist.owner_id == self._session1.user.user_id: + self.sync_playlists(src_playlist, dest_playlist) + + def sync_playlists(self, src_playlist, dest_playlist) -> None: + if ( + dest_playlist.origin == self._session1.user.origin + and dest_playlist.owner_id == self._session1.user.user_id + ): + dest_session = self._session1 + elif ( + dest_playlist.origin == self._session2.user.origin + and dest_playlist.owner_id == self._session2.user.user_id + ): + dest_session = self._session2 + else: + raise MissingPrivilegesError( + f"The user does not have access to the destination playlist ({dest_playlist=})" + ) + + missing_tracks_src = self.get_missing_tracks(src_playlist, dest_playlist) + missing_tracks_dest = [] + for src_track in missing_tracks_src: + if src_track.origin == dest_session.user.origin: + dest_track = src_track + else: + dest_track = dest_session.find_track(src_track) - dest_track_names = [t.name for t in dest_tracks] + if dest_track is None: + warnings.warn( + f"Track {src_track} not found in {dest_session}", + TrackNotFoundWarning, + ) + else: + missing_tracks_dest.append(dest_track) - return [t for t in src_tracks if t.name not in dest_track_names] + if len(missing_tracks_dest) > 0: + dest_session.add_to_playlist(dest_playlist, missing_tracks_dest) diff --git a/musync/tidal/session.py b/musync/tidal/session.py index 8079199..1a54779 100644 --- a/musync/tidal/session.py +++ b/musync/tidal/session.py @@ -1,8 +1,10 @@ from pathlib import Path +from typing import Iterable import tidalapi -from musync.entity import Artist, Playlist, Track, User +from musync.entity import Artist, Origin, Playlist, Track, User +from musync.error import IncompatibleEntityError, MissingPrivilegesError from musync.session import Session TIDAL_DIR = Path(__file__).parent.parent.parent.resolve() @@ -44,8 +46,26 @@ def find_track(self, track: Track) -> Track | None: return None def load_artist(self, artist_id: str) -> Artist | None: - artist = self._client.artist(int(artist_id)) + artist = self._client.artist(artist_id) if not isinstance(artist, tidalapi.Artist): return None return Artist.from_tidal(artist) + + def add_to_playlist(self, playlist: Playlist, tracks: Iterable[Track]) -> None: + if playlist.origin != Origin.TIDAL: + raise IncompatibleEntityError(f"Playlist is not from Tidal ({playlist=}).") + + if not all(tr.origin == Origin.TIDAL for tr in tracks): + raise IncompatibleEntityError( + f"The tracks contain tracks that are not from Tidal ({tracks=})." + ) + + if self.user.user_id != playlist.owner_id: + raise MissingPrivilegesError( + f"The session user does not own the playlist ({playlist=})." + ) + + track_ids = [tr.track_id for tr in tracks] + tidal_playlist = self._client.playlist(playlist.playlist_id) + tidal_playlist.add(track_ids) diff --git a/tests/integrationtests/spotify/test_session.py b/tests/integrationtests/spotify/test_session.py index 3b7b256..e4878bc 100644 --- a/tests/integrationtests/spotify/test_session.py +++ b/tests/integrationtests/spotify/test_session.py @@ -1,8 +1,9 @@ +import datetime import pickle import pytest -from musync.entity import Playlist, Track, User +from musync.entity import Origin, Playlist, Track, User from musync.spotify import SpotifySession from tests.unittests.entity import DATA_DIR @@ -18,6 +19,57 @@ def tidal_track(): return Track.from_tidal(pickle.load(f)) +@pytest.fixture +def spotify_track_list(): + tracks = [ + Track( + track_id="2Wqw8VqMlpNYZTfIsRqYSA", + artist_ids=["15kBh7iMF03XANu9pcSAdN", "5MUtPjZ8UJxONYzEGZeArf"], + name="Hymnesia", + duration=datetime.timedelta(seconds=374, microseconds=345000), + origin=Origin.SPOTIFY, + ), + Track( + track_id="1uRFqDldISYmGJAESdoi75", + artist_ids=["11OUxHFoGgo2NDSdT6YiEC"], + name="Closer", + duration=datetime.timedelta(seconds=245, microseconds=277000), + origin=Origin.SPOTIFY, + ), + Track( + track_id="3cefg1pboKTNqgi3k2t62h", + artist_ids=["1khyIydqanugacJyKdmceT"], + name="Omnia", + duration=datetime.timedelta(seconds=393, microseconds=24000), + origin=Origin.SPOTIFY, + ), + ] + + return tracks + + +@pytest.fixture +def create_playlist(spotify_session): + title = "Test Playlist" + description = ( + "This playlist was created by the https://github.com/klepp0/musync project." + ) + user_id = spotify_session.user.user_id + spotify_response = spotify_session._client.user_playlist_create( + user_id, + title, + public=False, + collaborative=False, + description=description, + ) + new_playlist = Playlist.from_spotify(spotify_response) + + yield new_playlist + + new_playlist_id = new_playlist.playlist_id + spotify_session._client.current_user_unfollow_playlist(new_playlist_id) + + def test_session_is_logged_in(spotify_session): return spotify_session.check_login() @@ -33,3 +85,15 @@ def test_session_playlists(spotify_session): def test_find_track(spotify_session, tidal_track): spotify_track = spotify_session.find_track(tidal_track) assert tidal_track.equals(spotify_track) + + +def test_add_to_playlist(spotify_session, create_playlist, spotify_track_list): + playlist = create_playlist + n_tracks_before = playlist.n_tracks + spotify_session.add_to_playlist(playlist, spotify_track_list) + + spotify_response = spotify_session._client.playlist(playlist.playlist_id) + updated_playlist = Playlist.from_spotify(spotify_response) + n_tracks_after = updated_playlist.n_tracks + + assert n_tracks_before < n_tracks_after diff --git a/tests/integrationtests/tidal/test_session.py b/tests/integrationtests/tidal/test_session.py index 1a60d14..aac807b 100644 --- a/tests/integrationtests/tidal/test_session.py +++ b/tests/integrationtests/tidal/test_session.py @@ -1,8 +1,9 @@ +import datetime import pickle import pytest -from musync.entity import Playlist, Track, User +from musync.entity import Origin, Playlist, Track, User from musync.tidal import TidalSession from tests.unittests.entity import DATA_DIR @@ -18,6 +19,54 @@ def spotify_track(): return Track.from_spotify(pickle.load(f)) +@pytest.fixture +def tidal_track(): + with open(DATA_DIR / "test_track_tidal.pkl", "rb") as f: + return Track.from_tidal(pickle.load(f)) + + +@pytest.fixture +def tidal_track_list(): + tracks = [ + Track( + track_id="338383176", + artist_ids=["7343085"], + name="Cabalero", + duration=datetime.timedelta(seconds=345), + origin=Origin.TIDAL, + ), + Track( + track_id="304574491", + artist_ids=["10644748", "6576000"], + name="Tension", + duration=datetime.timedelta(seconds=263), + origin=Origin.TIDAL, + ), + Track( + track_id="121277192", + artist_ids=["15910495"], + name="Overdub (Original Mix)", + duration=datetime.timedelta(seconds=348), + origin=Origin.TIDAL, + ), + ] + return tracks + + +@pytest.fixture +def create_playlist(tidal_session): + title = "Test Playlist" + description = ( + "This playlist was created by the https://github.com/klepp0/musync project." + ) + tidal_response = tidal_session._client.user.create_playlist(title, description) + new_playlist = Playlist.from_tidal(tidal_response) + + yield new_playlist + + tidal_session._client.playlist(new_playlist.playlist_id).delete() + + def test_session_is_logged_in(tidal_session): assert tidal_session.check_login() @@ -33,3 +82,15 @@ def test_session_load_playlists(tidal_session): def test_search_track(tidal_session, spotify_track): tidal_track = tidal_session.find_track(spotify_track) assert spotify_track.equals(tidal_track) + + +def test_add_to_playlist(tidal_session, create_playlist, tidal_track_list): + playlist = create_playlist + n_tracks_before = playlist.n_tracks + tidal_session.add_to_playlist(playlist, tidal_track_list) + + tidal_response = tidal_session._client.playlist(playlist.playlist_id) + updated_playlist = Playlist.from_tidal(tidal_response) + n_tracks_after = updated_playlist.n_tracks + + assert n_tracks_before < n_tracks_after