Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Tracks to Playlist #10

Merged
merged 11 commits into from
Jul 4, 2024
2 changes: 1 addition & 1 deletion musync/entity/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Track:
artist_ids: list[str]
name: str
duration: timedelta
date_added: Optional[dt] # relates to playlist, requires better structure
date_added: Optional[dt] = None # relates to playlist, requires better structure
origin: Origin = Origin.UNKNOWN

@classmethod
Expand Down
12 changes: 12 additions & 0 deletions musync/error.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,14 @@
class PlaylistNotFoundError(Exception):
"""Raised when a playlist is not found in the database."""


class MissingPrivilegesError(Exception):
"""Raised when a user does not have the necessary privileges."""


class IncompatibleEntityError(Exception):
"""Raised when an entity is not compatible with the operation."""


class TrackNotFoundWarning(Warning):
"""Raised when a track is not found in the database."""
5 changes: 5 additions & 0 deletions musync/session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from typing import Iterable

import spotipy
import tidalapi
Expand Down Expand Up @@ -29,3 +30,7 @@ def load_playlist_tracks(self, playlist: Playlist) -> list[Track]:
@abstractmethod
def find_track(self, track: Track) -> Track | None:
pass

@abstractmethod
def add_to_playlist(self, playlist: Playlist, tracks: Iterable[Track]) -> None:
pass
22 changes: 21 additions & 1 deletion musync/spotify/session.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
from typing import Iterable

import spotipy
from dotenv import load_dotenv

from musync.entity import Artist, Playlist, Track, User
from musync.entity import Artist, Origin, Playlist, Track, User
from musync.session import Session

load_dotenv()
Expand Down Expand Up @@ -88,3 +89,22 @@ def load_artist(self, artist_id: str) -> Artist | None:
return Artist.from_spotify(artist)

return None

def add_to_playlist(self, playlist: Playlist, tracks: Iterable[Track]) -> None:
if playlist.origin != Origin.SPOTIFY:
raise ValueError(f"Playlist is not from Spotify ({playlist=}).")

if not all(tr.origin == Origin.SPOTIFY for tr in tracks):
raise ValueError(
f"The tracks contain tracks that are not from Spotify ({tracks=})."
)

if self.user.user_id != playlist.owner_id:
raise ValueError(
f"The session user does not own the playlist ({playlist=})."
)

user_id = self.user.user_id
playlist_id = playlist.playlist_id
track_ids = [tr.track_id for tr in tracks]
self._client.user_playlist_add_tracks(user_id, playlist_id, track_ids)
85 changes: 71 additions & 14 deletions musync/sync_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import warnings

from musync.entity import Playlist, Track
from musync.error import MissingPrivilegesError, TrackNotFoundWarning
from musync.spotify import SpotifySession
from musync.tidal import TidalSession

Expand All @@ -14,7 +17,7 @@ def __init__(self, session1: SessionType, session2: SessionType) -> None:
self._session1 = session1
self._session2 = session2

def get_common_playlists(self) -> list[Playlist]:
def get_common_playlists(self) -> list[tuple[Playlist, Playlist]]:
playlists1 = self._session1.load_playlists()
playlists2 = self._session2.load_playlists()
common_playlists = []
Expand All @@ -28,23 +31,77 @@ def get_common_playlists(self) -> list[Playlist]:
def get_missing_tracks(
self, src_playlist: Playlist, dest_playlist: Playlist
) -> list[Track]:
if src_playlist.origin == self._session1.user.origin:
src_session = self._session1
elif src_playlist.origin == self._session2.user.origin:
src_session = self._session2
else:
raise MissingPrivilegesError(
f"The user does not have access to the source playlist ({src_playlist=})"
)

if (
self._session1.user.origin == dest_playlist.origin
and self._session1.user.user_id == dest_playlist.owner_id
dest_playlist.origin == self._session1.user.origin
and dest_playlist.owner_id == self._session1.user.user_id
):
src_tracks = self._session1.load_playlist_tracks(dest_playlist)
dest_tracks = self._session2.load_playlist_tracks(src_playlist)
dest_session = self._session1
elif (
self._session2.user.user_id == dest_playlist.owner_id
and self._session2.user.origin == dest_playlist.origin
dest_playlist.origin == self._session2.user.origin
and dest_playlist.owner_id == self._session2.user.user_id
):
src_tracks = self._session2.load_playlist_tracks(dest_playlist)
dest_tracks = self._session1.load_playlist_tracks(src_playlist)
dest_session = self._session2
else:
raise ValueError(
"Invalid playlist"
) # TODO: Create more expressive custom error
raise MissingPrivilegesError(
f"The user does not have access to the destination playlist ({dest_playlist=})"
)

src_tracks = src_session.load_playlist_tracks(src_playlist)
dest_tracks = dest_session.load_playlist_tracks(dest_playlist)

return [st for st in src_tracks if all(not st.equals(dt) for dt in dest_tracks)]

def sync_common_playlists(self) -> None:
common_playlists = self.get_common_playlists()

for src_playlist, dest_playlist in common_playlists:
if dest_playlist.owner_id == self._session2.user.user_id:
self.sync_playlists(src_playlist, dest_playlist)

for dest_playlist, src_playlist in common_playlists:
if dest_playlist.owner_id == self._session1.user.user_id:
self.sync_playlists(src_playlist, dest_playlist)

def sync_playlists(self, src_playlist, dest_playlist) -> None:
if (
dest_playlist.origin == self._session1.user.origin
and dest_playlist.owner_id == self._session1.user.user_id
):
dest_session = self._session1
elif (
dest_playlist.origin == self._session2.user.origin
and dest_playlist.owner_id == self._session2.user.user_id
):
dest_session = self._session2
else:
raise MissingPrivilegesError(
f"The user does not have access to the destination playlist ({dest_playlist=})"
)

missing_tracks_src = self.get_missing_tracks(src_playlist, dest_playlist)
missing_tracks_dest = []
for src_track in missing_tracks_src:
if src_track.origin == dest_session.user.origin:
dest_track = src_track
else:
dest_track = dest_session.find_track(src_track)

dest_track_names = [t.name for t in dest_tracks]
if dest_track is None:
warnings.warn(
f"Track {src_track} not found in {dest_session}",
TrackNotFoundWarning,
)
else:
missing_tracks_dest.append(dest_track)

return [t for t in src_tracks if t.name not in dest_track_names]
if len(missing_tracks_dest) > 0:
dest_session.add_to_playlist(dest_playlist, missing_tracks_dest)
24 changes: 22 additions & 2 deletions musync/tidal/session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from pathlib import Path
from typing import Iterable

import tidalapi

from musync.entity import Artist, Playlist, Track, User
from musync.entity import Artist, Origin, Playlist, Track, User
from musync.error import IncompatibleEntityError, MissingPrivilegesError
from musync.session import Session

TIDAL_DIR = Path(__file__).parent.parent.parent.resolve()
Expand Down Expand Up @@ -44,8 +46,26 @@ def find_track(self, track: Track) -> Track | None:
return None

def load_artist(self, artist_id: str) -> Artist | None:
artist = self._client.artist(int(artist_id))
artist = self._client.artist(artist_id)
if not isinstance(artist, tidalapi.Artist):
return None

return Artist.from_tidal(artist)

def add_to_playlist(self, playlist: Playlist, tracks: Iterable[Track]) -> None:
if playlist.origin != Origin.TIDAL:
raise IncompatibleEntityError(f"Playlist is not from Tidal ({playlist=}).")

if not all(tr.origin == Origin.TIDAL for tr in tracks):
raise IncompatibleEntityError(
f"The tracks contain tracks that are not from Tidal ({tracks=})."
)

if self.user.user_id != playlist.owner_id:
raise MissingPrivilegesError(
f"The session user does not own the playlist ({playlist=})."
)

track_ids = [tr.track_id for tr in tracks]
tidal_playlist = self._client.playlist(playlist.playlist_id)
tidal_playlist.add(track_ids)
66 changes: 65 additions & 1 deletion tests/integrationtests/spotify/test_session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import datetime
import pickle

import pytest

from musync.entity import Playlist, Track, User
from musync.entity import Origin, Playlist, Track, User
from musync.spotify import SpotifySession
from tests.unittests.entity import DATA_DIR

Expand All @@ -18,6 +19,57 @@ def tidal_track():
return Track.from_tidal(pickle.load(f))


@pytest.fixture
def spotify_track_list():
tracks = [
Track(
track_id="2Wqw8VqMlpNYZTfIsRqYSA",
artist_ids=["15kBh7iMF03XANu9pcSAdN", "5MUtPjZ8UJxONYzEGZeArf"],
name="Hymnesia",
duration=datetime.timedelta(seconds=374, microseconds=345000),
origin=Origin.SPOTIFY,
),
Track(
track_id="1uRFqDldISYmGJAESdoi75",
artist_ids=["11OUxHFoGgo2NDSdT6YiEC"],
name="Closer",
duration=datetime.timedelta(seconds=245, microseconds=277000),
origin=Origin.SPOTIFY,
),
Track(
track_id="3cefg1pboKTNqgi3k2t62h",
artist_ids=["1khyIydqanugacJyKdmceT"],
name="Omnia",
duration=datetime.timedelta(seconds=393, microseconds=24000),
origin=Origin.SPOTIFY,
),
]

return tracks


@pytest.fixture
def create_playlist(spotify_session):
title = "Test Playlist"
description = (
"This playlist was created by the https://github.com/klepp0/musync project."
)
user_id = spotify_session.user.user_id
spotify_response = spotify_session._client.user_playlist_create(
user_id,
title,
public=False,
collaborative=False,
description=description,
)
new_playlist = Playlist.from_spotify(spotify_response)

yield new_playlist

new_playlist_id = new_playlist.playlist_id
spotify_session._client.current_user_unfollow_playlist(new_playlist_id)


def test_session_is_logged_in(spotify_session):
return spotify_session.check_login()

Expand All @@ -33,3 +85,15 @@ def test_session_playlists(spotify_session):
def test_find_track(spotify_session, tidal_track):
spotify_track = spotify_session.find_track(tidal_track)
assert tidal_track.equals(spotify_track)


def test_add_to_playlist(spotify_session, create_playlist, spotify_track_list):
playlist = create_playlist
n_tracks_before = playlist.n_tracks
spotify_session.add_to_playlist(playlist, spotify_track_list)

spotify_response = spotify_session._client.playlist(playlist.playlist_id)
updated_playlist = Playlist.from_spotify(spotify_response)
n_tracks_after = updated_playlist.n_tracks

assert n_tracks_before < n_tracks_after
63 changes: 62 additions & 1 deletion tests/integrationtests/tidal/test_session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import datetime
import pickle

import pytest

from musync.entity import Playlist, Track, User
from musync.entity import Origin, Playlist, Track, User
from musync.tidal import TidalSession
from tests.unittests.entity import DATA_DIR

Expand All @@ -18,6 +19,54 @@ def spotify_track():
return Track.from_spotify(pickle.load(f))


@pytest.fixture
def tidal_track():
with open(DATA_DIR / "test_track_tidal.pkl", "rb") as f:
return Track.from_tidal(pickle.load(f))


@pytest.fixture
def tidal_track_list():
tracks = [
Track(
track_id="338383176",
artist_ids=["7343085"],
name="Cabalero",
duration=datetime.timedelta(seconds=345),
origin=Origin.TIDAL,
),
Track(
track_id="304574491",
artist_ids=["10644748", "6576000"],
name="Tension",
duration=datetime.timedelta(seconds=263),
origin=Origin.TIDAL,
),
Track(
track_id="121277192",
artist_ids=["15910495"],
name="Overdub (Original Mix)",
duration=datetime.timedelta(seconds=348),
origin=Origin.TIDAL,
),
]
return tracks


@pytest.fixture
def create_playlist(tidal_session):
title = "Test Playlist"
description = (
"This playlist was created by the https://github.com/klepp0/musync project."
)
tidal_response = tidal_session._client.user.create_playlist(title, description)
new_playlist = Playlist.from_tidal(tidal_response)

yield new_playlist

tidal_session._client.playlist(new_playlist.playlist_id).delete()


def test_session_is_logged_in(tidal_session):
assert tidal_session.check_login()

Expand All @@ -33,3 +82,15 @@ def test_session_load_playlists(tidal_session):
def test_search_track(tidal_session, spotify_track):
tidal_track = tidal_session.find_track(spotify_track)
assert spotify_track.equals(tidal_track)


def test_add_to_playlist(tidal_session, create_playlist, tidal_track_list):
playlist = create_playlist
n_tracks_before = playlist.n_tracks
tidal_session.add_to_playlist(playlist, tidal_track_list)

tidal_response = tidal_session._client.playlist(playlist.playlist_id)
updated_playlist = Playlist.from_tidal(tidal_response)
n_tracks_after = updated_playlist.n_tracks

assert n_tracks_before < n_tracks_after
Loading