-
Notifications
You must be signed in to change notification settings - Fork 0
/
scrobbler.py
executable file
·213 lines (191 loc) · 9.18 KB
/
scrobbler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python
import os
import logging
from configparser import ConfigParser
from pprint import pformat
import time
from math import ceil
import ScriptingBridge
import Foundation
import PyObjCTools.AppHelper
import objc
import pylast
# Emit every record from DEBUG upwards, prefixed with timestamp, logger name and level.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.DEBUG,
)
log = logging.getLogger("scrobbler.py")

# Four-character player-state codes packed into big-endian integers.
# The PLAYING value is what itunes.playerState() is compared against.
ITUNES_PLAYER_STATE_STOPPED = int.from_bytes(b"kPSS", "big")
ITUNES_PLAYER_STATE_PLAYING = int.from_bytes(b"kPSP", "big")

# Minimum track length, in seconds, for a track to be scrobbled at all.
SCROBBLER_MIN_TRACK_LENGTH = 30

# A track is scrobbled once half of it has played or after this many
# seconds of playback, whichever happens first.
SCROBBLER_HALFWAY_THRESHOLD = 240

# Upper bound on last.fm submission attempts per track.
SCROBBLE_MAX_ATTEMPTS = 5
class Scrobbler(object):
    """Listen for iTunes player notifications and report plays to last.fm.

    On construction the object reads ``~/.scrobbler.ini``, registers itself
    as an observer of the ``com.apple.iTunes.playerInfo`` distributed
    notification and creates a ``pylast.LastFMNetwork`` client. Notifications
    are only delivered while a Cocoa run loop is running.
    """

    # ScriptingBridge application object for iTunes (set by setup_itunes_observer).
    itunes = None
    # pylast.LastFMNetwork client (set by setup_lastfm).
    lastfm = None
    # ConfigParser holding the contents of ~/.scrobbler.ini (set by load_config).
    config = None
    # Pending NSTimer for the track waiting to be scrobbled, or None.
    scrobble_timer = None

    def __init__(self):
        self.load_config()
        self.setup_itunes_observer()
        self.setup_lastfm()

    def load_config(self):
        """Read ``~/.scrobbler.ini`` into ``self.config``.

        Raises:
            Exception: if the config file does not exist.
        """
        inipath = os.path.expanduser("~/.scrobbler.ini")
        if not os.path.exists(inipath):
            raise Exception("Config file {} is missing.".format(inipath))
        self.config = ConfigParser()
        self.config.read(inipath)

    def setup_itunes_observer(self):
        """Attach to iTunes and subscribe to its player-info notifications."""
        self.itunes = ScriptingBridge.SBApplication.applicationWithBundleIdentifier_("com.apple.iTunes")
        log.debug("iTunes running: {}".format(self.itunes.isRunning()))
        dnc = Foundation.NSDistributedNotificationCenter.defaultCenter()
        # Signature "v@:@": returns void, takes self, _cmd and one object argument.
        selector = objc.selector(self.receivedNotification_, signature=b"v@:@")
        dnc.addObserver_selector_name_object_(self, selector, "com.apple.iTunes.playerInfo", None)
        log.debug("Added observer")

    def setup_lastfm(self):
        """Create the last.fm client from the ``[lastfm]`` config section.

        Reads the ``password``, ``api_key``, ``api_secret`` and ``username``
        keys; the password is hashed with ``pylast.md5`` before being passed on.
        """
        cfg = self.config['lastfm']
        password_hash = pylast.md5(cfg['password'])
        self.lastfm = pylast.LastFMNetwork(
            api_key=cfg['api_key'],
            api_secret=cfg['api_secret'],
            username=cfg['username'],
            password_hash=password_hash,
        )
        log.debug("Connected to last.fm")

    @staticmethod
    def _track_metadata(userinfo):
        """Build last.fm keyword arguments from a notification payload.

        "Total Time" is divided by 1000 to get whole seconds; a missing or
        sub-second value becomes ``None`` rather than 0.
        """
        return {
            'artist': userinfo.get("Artist"),
            'album_artist': userinfo.get("Album Artist"),
            'title': userinfo.get("Name"),
            'album': userinfo.get("Album"),
            'track_number': userinfo.get("Track Number"),
            'duration': userinfo.get("Total Time", 0) // 1000 or None,
        }

    def receivedNotification_(self, notification):
        """Handle a player-info notification.

        "Playing" updates last.fm's now-playing status and, if that worked,
        arms the scrobble timer. "Paused" and "Stopped" cancel any pending
        timer. Any other state is only logged.
        """
        log.debug("Got a notification: {}".format(notification.name()))
        userinfo = dict(notification.userInfo())
        state = userinfo.get("Player State")
        if state == "Playing":
            should_scrobble = self.update_now_playing(userinfo)
            if should_scrobble:
                self.prepare_to_scrobble(userinfo)
            else:
                log.debug("update_now_playing returned False, so not going to scrobble.")
        elif state in ("Paused", "Stopped"):
            self.cancel_scrobble_timer()
        else:
            log.info("Unrecognised player state: {}".format(state))

    def update_now_playing(self, userinfo):
        """Tell last.fm which track is playing.

        Retries up to SCROBBLE_MAX_ATTEMPTS times on ``pylast.WSError``,
        sleeping 5 seconds after each failure, so the call can block for a
        while.

        Returns:
            True if last.fm accepted the update; False if artist or title is
            missing or every attempt failed.
        """
        kwargs = self._track_metadata(userinfo)
        # Some things, such as streams, don't have full metadata so we must ignore them
        if not kwargs['artist'] or not kwargs['title']:
            log.debug("Artist or title are missing, so ignoring...")
            return False
        for attempt in range(SCROBBLE_MAX_ATTEMPTS):
            try:
                log.debug("Attempt {}/{} to update now playing with kwargs:\n{}".format(attempt+1, SCROBBLE_MAX_ATTEMPTS, pformat(kwargs)))
                self.lastfm.update_now_playing(**kwargs)
                log.debug("done.")
                return True
            except pylast.WSError:
                log.exception("Couldn't update now playing, waiting 5 seconds and trying again.")
                time.sleep(5)
        log.error("Couldn't update now playing after {} attempts!".format(SCROBBLE_MAX_ATTEMPTS))
        return False

    def prepare_to_scrobble(self, userinfo):
        """Arm a one-shot timer that scrobbles the track described by *userinfo*.

        Any pending timer is cancelled first. No timer is set when the track
        has no PersistentID, when no length can be determined, or when the
        track is shorter than SCROBBLER_MIN_TRACK_LENGTH seconds.
        """
        log.debug("prepare_to_scrobble")
        self.cancel_scrobble_timer()
        if userinfo.get("PersistentID") is None:
            log.warning("Track being played doesn't have a PersistentID, so can't prepare to scrobble it!")
            return
        # We need to wait a bit for a certain amount of the track to be played before scrobbling it.
        # The delay is half the track's length or SCROBBLER_HALFWAY_THRESHOLD, whichever is sooner.
        track_length = userinfo.get("Total Time", 0) / 1000  # seconds
        if track_length == 0:
            log.debug("Track has zero length, trying to get it from itunes.currentTrack after 5 seconds")
            time.sleep(5)
            track_length = self.itunes.currentTrack().duration()
            log.debug("currentTrack().duration(): {}".format(track_length))
            if not track_length:
                log.debug("Still zero-length, giving up!")
                return
        # Enforce the minimum length no matter where the length came from:
        # a short track must not slip through just because its length had to
        # be fetched from iTunes instead of the notification (or vice versa).
        if track_length < SCROBBLER_MIN_TRACK_LENGTH:
            log.debug("Track is too short ({}), so not going to scrobble it".format(track_length))
            return
        timeout = min(ceil(track_length/2), SCROBBLER_HALFWAY_THRESHOLD)
        log.debug("Setting up a timer for {} seconds".format(timeout))
        # Set up a timer that calls back after timeout seconds; the notification
        # payload rides along as the timer's userInfo.
        self.scrobble_timer = Foundation.NSTimer.scheduledTimerWithTimeInterval_target_selector_userInfo_repeats_(
            timeout,
            self,
            objc.selector(self.scrobbleTimerFired_, signature=b"v@:@"),
            userinfo,
            False
        )

    def cancel_scrobble_timer(self):
        """Invalidate and forget the pending scrobble timer, if there is one."""
        log.debug("cancel_scrobble_timer")
        if self.scrobble_timer is not None:
            log.debug("Invalidating timer...")
            self.scrobble_timer.invalidate()
            self.scrobble_timer = None
        else:
            log.debug("No timer to invalidate")

    def scrobbleTimerFired_(self, timer):
        """Scrobble the track the timer was armed for, if it is still playing.

        Nothing is submitted when the timer is invalid, iTunes is not playing,
        or the current track's persistent ID differs from the one recorded
        when the timer was set. If the current track reports no persistent ID
        the metadata from the original notification is used instead. Submission
        is retried up to SCROBBLE_MAX_ATTEMPTS times on ``pylast.WSError``.
        """
        log.debug("scrobbleTimerFired_")
        if not timer.isValid():
            log.warning("Received a fire event from an invalid timer, not scrobbling")
            return
        if self.itunes.playerState() != ITUNES_PLAYER_STATE_PLAYING:
            log.debug("iTunes isn't playing, not scrobbling")
            return
        userinfo = timer.userInfo()
        expected_persistent_id = userinfo.get("PersistentID")
        if expected_persistent_id < 0:
            # PyObjC thinks this is a signed long, but actually it's unsigned, so convert it
            expected_persistent_id += 2**64
        # Format as 16 upper-case hex digits so it can be compared with
        # currentTrack().persistentID() below.
        expected_persistent_id = "{:016X}".format(expected_persistent_id)
        log.debug("Expected persistent ID of track to be scrobbled: {}".format(expected_persistent_id))
        current_track = self.itunes.currentTrack()
        scrobble_from_current_track = True
        actual_persistent_id = current_track.persistentID()
        if actual_persistent_id is not None and actual_persistent_id != expected_persistent_id:
            log.warning("Track now playing is different to the one that prompted timer, not scrobbling: {} (expected) vs {} (actual)".format(expected_persistent_id, actual_persistent_id))
            return
        elif actual_persistent_id is None:
            log.warning("Track playing has no persistent ID, assuming it's an Apple Music stream and scrobbling based on metadata from original notification")
            scrobble_from_current_track = False
        else:
            # at this point we know the correct track is playing
            log.debug("Correct track is playing, going to scrobble it")
        if scrobble_from_current_track:
            kwargs = {
                'artist': current_track.artist(),
                'title': current_track.name(),
                'album': current_track.album(),
                'album_artist': current_track.albumArtist(),
                'track_number': current_track.trackNumber(),
                'duration': int(current_track.duration()),
            }
        else:
            kwargs = self._track_metadata(userinfo)
        # Back-date the scrobble to when playback of this track started.
        kwargs['timestamp'] = int(time.time() - self.itunes.playerPosition())
        for attempt in range(SCROBBLE_MAX_ATTEMPTS):
            try:
                log.debug("Attempt {}/{} to scrobble with kwargs:\n{}".format(attempt+1, SCROBBLE_MAX_ATTEMPTS, pformat(kwargs)))
                self.lastfm.scrobble(**kwargs)
                log.debug("done.")
                break
            except pylast.WSError:
                log.exception("Couldn't scrobble, waiting 5 seconds and trying again.")
                time.sleep(5)
        else:
            log.error("Couldn't scrobble after {} attempts!".format(SCROBBLE_MAX_ATTEMPTS))
def main():
    """Create the scrobbler, then run the console event loop until it returns."""
    Scrobbler()
    log.debug("Going into event loop...")
    run_loop = PyObjCTools.AppHelper.runConsoleEventLoop
    run_loop(installInterrupt=True)
    log.debug("exiting...")


if __name__ == "__main__":
    main()