-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplaylist.py
145 lines (129 loc) · 4.55 KB
/
playlist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import glob
import os
import pathlib
import re
import sys
import unicodedata
from flask import current_app as app
from werkzeug.utils import safe_join
extinf_regex = re.compile(r'^#EXTINF:([0-9]+)( [^,]+)?,[\s]*(.*)')
highint32 = 1<<31
class PlaylistProvider:
def __init__(self, dir):
self.dir = dir
self._playlists = {}
def _refresh(self):
self._playlists = {p.id: p for p in self._load_playlists()}
app.logger.debug(f"Loaded {len(self._playlists)} playlists")
def _load_playlists(self):
paths = glob.glob(os.path.join(self.dir, "**.m3u8"))
paths += glob.glob(os.path.join(self.dir, "**.m3u"))
paths.sort()
for path in paths:
try:
yield self._playlist(path)
except Exception as e:
app.logger.error(f"Failed to load playlist {filepath}: {e}")
def playlists(self):
self._refresh()
playlists = self._playlists
ids = [k for k, v in playlists.items() if v]
ids.sort()
return [playlists[id] for id in ids]
def playlist(self, id):
filepath = safe_join(self.dir, id)
playlist = self._playlist(filepath)
if playlist.id not in self._playlists: # add to cache
playlists = self._playlists.copy()
playlists[playlist.id] = playlist
self._playlists = playlists
return playlist
def _playlist(self, filepath):
id = self._path2id(filepath)
name = pathlib.Path(os.path.basename(filepath)).stem
playlist = self._playlists.get(id)
mtime = pathlib.Path(filepath).stat().st_mtime
if playlist and playlist.modified == mtime:
return playlist # cached metadata
app.logger.debug(f"Loading playlist {filepath}")
return Playlist(id, name, mtime, filepath)
def _path2id(self, filepath):
return os.path.relpath(filepath, self.dir)
class Playlist:
def __init__(self, id, name, modified, path):
self.id = id
self.name = name
self.modified = modified
self.path = path
self.count = 0
self.duration = 0
artists = {}
max_artists = 10
for item in self.items():
self.count += 1
self.duration += item.duration
artist = Artist(item.title.split(' - ')[0])
found = artists.get(artist.key)
if found:
found.count += 1
else:
if len(artists) > max_artists:
l = _sortedartists(artists)[:max_artists]
artists = {a.key: a for a in l}
artists[artist.key] = artist
self.artists = ', '.join([a.name for a in _sortedartists(artists)])
def items(self):
return parse_m3u_playlist(self.path)
def _sortedartists(artists):
l = [a for _,a in artists.items()]
l.sort(key=lambda a: (highint32-a.count, a.name))
return l
class Artist:
def __init__(self, name):
self.key = _normalize(name.lower())
self.name = name
self.count = 1
def parse_m3u_playlist(filepath):
'''
Parses an M3U playlist and yields its items, one at a time.
CAUTION: Attribute values that contain ',' or ' ' are not supported!
'''
with open(filepath, 'r', encoding='UTF-8') as file:
linenum = 0
item = PlaylistItem()
while line := file.readline():
line = line.rstrip()
linenum += 1
if linenum == 1:
assert line == '#EXTM3U', f"File {filepath} is not an EXTM3U playlist!"
continue
if len(line.strip()) == 0:
continue
m = extinf_regex.match(line)
if m:
item = PlaylistItem()
duration = m.group(1)
item.duration = int(duration)
attrs = m.group(2)
if attrs:
item.attrs = {k: v.strip('"') for k,v in [kv.split('=') for kv in attrs.strip().split(' ')]}
else:
item.attrs = {}
item.title = m.group(3)
continue
if line.startswith('#'):
continue
item.uri = line
yield item
item = PlaylistItem()
class PlaylistItem():
def __init__(self):
self.title = None
self.duration = None
self.uri = None
self.attrs = None
def _normalize(s):
s = _strip_accents(s)
return s
def _strip_accents(s):
return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')