-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
200 lines (163 loc) · 6.49 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import uuid
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
import requests
from services.songs_service import retrieve_songs, save_songs
from services.spotify_service import get_auth_url, get_access_token, get_valid_token
from services.users_service import retrieve_users, save_users, get_user_by_id, create_user, update_user, delete_user
app = FastAPI()
USER_PROFILE_ENDPOINT = "https://api.spotify.com/v1/me"
SEARCH_ENDPOINT = "https://api.spotify.com/v1/search"
TOP_TRACKS_ENDPOINT = f"{USER_PROFILE_ENDPOINT}/top/tracks"
@app.get("/login")
def login():
return RedirectResponse(url=get_auth_url())
@app.get("/callback")
async def api_callback(request: Request):
code = request.query_params.get("code")
error = request.query_params.get("error")
if error:
raise HTTPException(status_code=400, detail=f"Spotify ha retornado el siguiente error: {error}")
if not code:
raise HTTPException(status_code=400, detail="No se ha retornado código de autorización")
get_access_token(code)
return HTMLResponse(content=f'''
<html>
<body>
<h1>El token ha sido almacenado correctamente.</h1>
<p>Puedes ir a la documentación para probar los endpoints <a href="/docs">aquí</a>.</p>
</body>
</html>
''')
@app.get("/search")
def search(search_term: str, type: str = "track", limit: int = 5):
if not search_term.strip():
raise HTTPException(status_code=400, detail="Has de agregar al menos un término de búsqueda.")
access_token = get_valid_token()
headers = {"Authorization": f"Bearer {access_token}"}
request_params = {
"q": search_term,
"type": type,
"limit": limit
}
response = requests.get(SEARCH_ENDPOINT, headers=headers, params=request_params)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
@app.get("/top-tracks")
def get_top_tracks(time_range: str = "medium_term", limit: int = 10):
access_token = get_valid_token()
headers = {"Authorization": f"Bearer {access_token}"}
request_params = {
"time_range": time_range,
"limit": limit
}
response = requests.get(TOP_TRACKS_ENDPOINT, headers=headers, params=request_params)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
@app.post("/songs/")
def create_song(title: str, artist: str, album: str):
songs = retrieve_songs()
new_song = {
"id": str(uuid.uuid4()),
"title": title,
"artist": artist,
"album": album
}
songs.append(new_song)
save_songs(songs)
return {
"message": f'La nueva canción "{new_song["title"]}" ha sido creada con éxito.',
"song": new_song
}
@app.get("/songs/")
def read_songs_data():
return retrieve_songs()
@app.get("/songs/{song_id}")
def read_song(song_id: str):
songs = retrieve_songs()
for song in songs:
if song["id"] == song_id:
return {
"message": f'La canción "{song["title"]}" ha sido encontrada.',
"song": song
}
raise HTTPException(status_code=404, detail="Canción no encontrada")
@app.put("/songs/{song_id}")
def update_song(song_id: str, title: str, artist: str, album: str):
songs = retrieve_songs()
for song in songs:
if song["id"] == song_id:
song["title"] = title
song["artist"] = artist
song["album"] = album
save_songs(songs)
return song
raise HTTPException(status_code=404, detail="Canción no encontrada")
@app.delete("/songs/{song_id}")
def delete_song(song_id: str):
songs = retrieve_songs()
for song in songs:
if song["id"] == song_id:
songs.remove(song)
save_songs(songs)
return {"message": f'La canción "{song["title"]}" ha sido eliminada.'}
raise HTTPException(status_code=404, detail="Canción no encontrada")
@app.delete("/songs/")
def delete_all_songs():
save_songs([])
return {"message": "Todas las canciones han sido eliminadas exitosamente."}
@app.post("/save-top-tracks")
def save_top_tracks(time_range: str = "medium_term", limit: int = 10):
top_tracks = get_top_tracks(time_range, limit)
if not top_tracks.get("items"):
raise HTTPException(status_code=404, detail="Error. No se han recibido canciones desde la API de Spotify.")
for track in top_tracks["items"]:
create_song(
title=track["name"],
artist=", ".join([artist["name"] for artist in track["artists"]]),
album=track["album"]["name"]
)
return {"message": f"Se han guardado tus {len(top_tracks['items'])} canciones favoritas."}
@app.get("/users/")
def get_all_users():
return retrieve_users()
@app.get("/users/{user_id}")
def get_user(user_id: str):
user = get_user_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="Usuario no encontrado.")
return user
@app.post("/users/")
def create_new_user(name: str, email: str):
if not name.strip() or not email.strip():
raise HTTPException(status_code=400, detail="Nombre y correo electrónico son obligatorios.")
new_user = create_user(name, email)
return {
"message": f"El usuario '{new_user['name']}' ha sido creado.",
"user": new_user
}
@app.put("/users/{user_id}")
def update_existing_user(user_id: str, name: str = None, email: str = None):
if not name and not email:
raise HTTPException(status_code=400, detail="Por favor proporciona al menos un campo para actualizar (nombre o email).")
try:
updated_user = update_user(user_id, name=name, email=email)
return {
"message": f"El usuario '{updated_user['name']}' ha sido actualizado correctamente.",
"user": updated_user
}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.delete("/users/{user_id}")
def delete_existing_user(user_id: str):
try:
delete_user(user_id)
return {"message": f"El usuario con ID '{user_id}' ha sido eliminado con éxito."}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.delete("/users/")
def delete_all_users():
save_users([])
return {"message": "Todos los usuarios han sido eliminados"}