471 lines
16 KiB
Python
471 lines
16 KiB
Python
|
#!/usr/bin/env python
|
||
|
|
||
|
from collections import namedtuple
|
||
|
from functools import reduce
|
||
|
from operator import attrgetter
|
||
|
import argparse
|
||
|
import configparser
|
||
|
import datetime
|
||
|
import hashlib
|
||
|
import itertools as it
|
||
|
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import pickle
|
||
|
import sqlite3
|
||
|
import time
|
||
|
import urllib.parse as up
|
||
|
import urllib.request as ur
|
||
|
|
||
|
CONFIG_PATH = '~/.config/cmus/cmus_status_scrobbler.ini'
|
||
|
DB_CONNECT_TIMEOUT = 300
|
||
|
DB_PATH = '~/.config/cmus/cmus_status_scrobbler.sqlite3'
|
||
|
SCROBBLE_BATCH_SIZE = 50
|
||
|
|
||
|
parser = argparse.ArgumentParser(description="Scrobbling.")
|
||
|
parser.add_argument('--auth',
|
||
|
action='store_true',
|
||
|
help="Add if you're missing session_key in .ini file.")
|
||
|
parser.add_argument('--ini',
|
||
|
type=str,
|
||
|
default=os.path.expanduser(CONFIG_PATH),
|
||
|
help='Path to .ini configuration file.')
|
||
|
parser.add_argument('--db-path',
|
||
|
type=str,
|
||
|
default=os.path.expanduser(DB_PATH),
|
||
|
help='Path to sqlite3 database')
|
||
|
parser.add_argument(
|
||
|
'--log-path',
|
||
|
type=str,
|
||
|
required=False,
|
||
|
help='If given logging will be saved to desired path (default: no logging)'
|
||
|
)
|
||
|
parser.add_argument('--log-db',
|
||
|
action='store_true',
|
||
|
default=False,
|
||
|
help='If given, SQL queries are logged')
|
||
|
|
||
|
|
||
|
class StatusDB:
|
||
|
def __init__(self, connection, table_name):
|
||
|
self.con = connection
|
||
|
self.table_name = f'status_updates_{table_name}'
|
||
|
self.create()
|
||
|
|
||
|
def create(self):
|
||
|
self.con.execute(
|
||
|
f"CREATE TABLE IF NOT EXISTS {self.table_name} (pickle BLOB)")
|
||
|
|
||
|
def get_status_updates(self):
|
||
|
cur = self.con.cursor()
|
||
|
cur.execute(f"SELECT * FROM {self.table_name}")
|
||
|
status_updates = []
|
||
|
for row in cur:
|
||
|
status_updates.append(pickle.loads(row[0]))
|
||
|
return status_updates
|
||
|
|
||
|
def clear(self):
|
||
|
self.con.execute(f"DELETE FROM {self.table_name}")
|
||
|
|
||
|
def save_status_updates(self, status_updates):
|
||
|
if not status_updates:
|
||
|
return
|
||
|
self.con.executemany(
|
||
|
f"INSERT INTO {self.table_name}(pickle) values (?)",
|
||
|
[(pickle.dumps(su), ) for su in status_updates])
|
||
|
|
||
|
|
||
|
class CmusStatus:
|
||
|
stopped = "stopped"
|
||
|
playing = "playing"
|
||
|
paused = "paused"
|
||
|
|
||
|
|
||
|
Status = namedtuple('Status', [
|
||
|
'status', 'file', 'artist', 'albumartist', 'album', 'discnumber',
|
||
|
'tracknumber', 'title', 'date', 'duration', 'musicbrainz_trackid',
|
||
|
'cur_time'
|
||
|
])
|
||
|
|
||
|
|
||
|
def get_api_sig(params, secret):
|
||
|
m = hashlib.md5()
|
||
|
for k in sorted(params):
|
||
|
m.update(k.encode('utf-8'))
|
||
|
m.update(params[k].encode('utf-8'))
|
||
|
m.update(secret.encode('utf-8'))
|
||
|
return m.hexdigest()
|
||
|
|
||
|
|
||
|
def send_req(api_url,
|
||
|
api_key,
|
||
|
ignore_request_fail=False,
|
||
|
shared_secret=None,
|
||
|
method=None,
|
||
|
xml=False,
|
||
|
timeout_secs=10.,
|
||
|
**params):
|
||
|
params = dict(**params)
|
||
|
params['api_key'] = api_key
|
||
|
params['method'] = method
|
||
|
params = {k: v for k, v in params.items() if v is not None}
|
||
|
if shared_secret:
|
||
|
params['api_sig'] = get_api_sig(params, shared_secret)
|
||
|
if not xml:
|
||
|
params['format'] = 'json'
|
||
|
logging.info(params)
|
||
|
api_req = ur.Request(api_url, headers={"User-Agent": "Mozilla/5.0"})
|
||
|
try:
|
||
|
with ur.urlopen(api_req, up.urlencode(params, encoding='utf-8').encode(), timeout=timeout_secs) as f:
|
||
|
res = f.read().decode('utf-8')
|
||
|
logging.info(res)
|
||
|
if not res:
|
||
|
return None
|
||
|
if not xml:
|
||
|
return json.loads(res)
|
||
|
return res
|
||
|
except Exception as e:
|
||
|
if not ignore_request_fail:
|
||
|
raise e
|
||
|
logging.exception('Ignoring error.')
|
||
|
return None
|
||
|
|
||
|
|
||
|
class Scrobbler:
|
||
|
def __init__(self, name, api_url, api_key, shared_secret, session_key,
|
||
|
now_playing, xml=False):
|
||
|
self.name = name
|
||
|
self.api_url = api_url
|
||
|
self.api_key = api_key
|
||
|
self.shared_secret = shared_secret
|
||
|
self.sk = session_key
|
||
|
self.now_playing = now_playing
|
||
|
self.xml = xml
|
||
|
|
||
|
@staticmethod
|
||
|
def auth(auth_url, api_url, api_key, shared_secret, xml=False):
|
||
|
# fetching token that is used to ask for access
|
||
|
token = send_req(api_url, api_key,
|
||
|
method=ScrobblerMethod.GET_TOKEN, xml=xml)
|
||
|
if xml:
|
||
|
token = token.split("<token>")[1].split("</token>")[0]
|
||
|
else:
|
||
|
token = token['token']
|
||
|
print(f'{auth_url}?' + up.urlencode(dict(token=token,api_key=api_key)))
|
||
|
input('Press <Enter> after visiting the link and allowing access...')
|
||
|
# fetching session with infinite lifetime that is used to scrobble
|
||
|
session = send_req(api_url,
|
||
|
api_key,
|
||
|
shared_secret=shared_secret,
|
||
|
method=ScrobblerMethod.GET_SESSION,
|
||
|
token=token, xml=xml)
|
||
|
if xml:
|
||
|
session = dict(
|
||
|
key=session.split("<key>")[1].split("</key>")[0],
|
||
|
name=session.split("<name>")[1].split("</name>")[0])
|
||
|
else:
|
||
|
session = session['session']
|
||
|
return dict(session_key=session['key'], username=session['name'])
|
||
|
|
||
|
@staticmethod
|
||
|
def make_scrobble(i, su):
|
||
|
return {
|
||
|
f'artist[{i}]':
|
||
|
su.artist,
|
||
|
f'track[{i}]':
|
||
|
su.title,
|
||
|
f'timestamp[{i}]':
|
||
|
str(
|
||
|
int(
|
||
|
su.cur_time.replace(
|
||
|
tzinfo=datetime.timezone.utc).timestamp())),
|
||
|
f'album[{i}]':
|
||
|
su.album,
|
||
|
f'trackNumber[{i}]':
|
||
|
su.tracknumber,
|
||
|
f'mbid[{i}]':
|
||
|
su.musicbrainz_trackid,
|
||
|
f'albumArtist[{i}]':
|
||
|
su.albumartist if su.artist != su.albumartist else None,
|
||
|
f'duration[{i}]':
|
||
|
su.duration,
|
||
|
}
|
||
|
|
||
|
def scrobble(self, status_updates):
|
||
|
if not status_updates:
|
||
|
return
|
||
|
logging.info(f'Scrobbling previous tracks for {self.name}')
|
||
|
# ignoring status updates with status other than playing
|
||
|
playing_sus = filter(lambda x: x.status == CmusStatus.playing,
|
||
|
status_updates)
|
||
|
batch_scrobble_request = reduce(lambda a, b: {
|
||
|
**a,
|
||
|
**b
|
||
|
}, [Scrobbler.make_scrobble(i, su) for (i, su) in enumerate(playing_sus)],
|
||
|
dict(sk=self.sk))
|
||
|
if not batch_scrobble_request:
|
||
|
return
|
||
|
send_req(self.api_url,
|
||
|
self.api_key,
|
||
|
shared_secret=self.shared_secret,
|
||
|
method=ScrobblerMethod.SCROBBLE,
|
||
|
xml=self.xml,
|
||
|
timeout_secs=5., # scrobbling is not critical (saved in db)
|
||
|
**batch_scrobble_request)
|
||
|
|
||
|
def send_now_playing(self, cur):
|
||
|
if not self.now_playing or cur.status != CmusStatus.playing:
|
||
|
return
|
||
|
|
||
|
logging.info(f'Sending now playing for {self.name}')
|
||
|
params = dict(artist=cur.artist,
|
||
|
track=cur.title,
|
||
|
album=cur.album,
|
||
|
trackNumber=cur.tracknumber,
|
||
|
duration=cur.duration,
|
||
|
albumArtist=cur.albumartist
|
||
|
if cur.artist != cur.albumartist else None,
|
||
|
mbid=cur.musicbrainz_trackid,
|
||
|
sk=self.sk)
|
||
|
send_req(self.api_url,
|
||
|
self.api_key,
|
||
|
ignore_request_fail=True,
|
||
|
shared_secret=self.shared_secret,
|
||
|
method=ScrobblerMethod.NOW_PLAYING,
|
||
|
xml=self.xml,
|
||
|
**params)
|
||
|
|
||
|
|
||
|
def parse_cmus_status_line(ls):
|
||
|
r = dict(
|
||
|
cur_time=datetime.datetime.utcnow(),
|
||
|
musicbrainz_trackid=None,
|
||
|
discnumber=1,
|
||
|
tracknumber=None,
|
||
|
date=None,
|
||
|
album=None,
|
||
|
albumartist=None,
|
||
|
artist=None,
|
||
|
)
|
||
|
r.update((k, v) for k, v in zip(ls[::2], ls[1::2]))
|
||
|
return Status(**r)
|
||
|
|
||
|
|
||
|
def has_played_enough(start_ts,
|
||
|
end_ts,
|
||
|
duration,
|
||
|
perc_thresh,
|
||
|
secs_thresh,
|
||
|
ptbp=0):
|
||
|
duration = int(duration)
|
||
|
total = (end_ts - start_ts).total_seconds() + ptbp
|
||
|
return total / duration >= perc_thresh or total >= secs_thresh
|
||
|
|
||
|
|
||
|
def equal_tracks(a, b):
|
||
|
return a.file == b.file
|
||
|
|
||
|
|
||
|
def get_prefix_end_exclusive_idx(status_updates):
|
||
|
r_su = list(reversed(status_updates))
|
||
|
for i, (cur, prv) in enumerate(zip(r_su, r_su[1:])):
|
||
|
if (cur.status == CmusStatus.stopped or not equal_tracks(cur, prv)
|
||
|
or cur.status == prv.status
|
||
|
or prv.status == CmusStatus.stopped):
|
||
|
return len(r_su) - i
|
||
|
return 0 # all statuses do not result in a scrobble
|
||
|
|
||
|
|
||
|
def calculate_scrobbles(status_updates, perc_thresh=0.5, secs_thresh=4 * 60):
|
||
|
scrobbles, leftovers = [], []
|
||
|
if not status_updates or len(status_updates) == 1:
|
||
|
return scrobbles, status_updates or leftovers
|
||
|
|
||
|
# if status updates array has a suffix of playing/paused updates with same
|
||
|
# track, then these tracks need to be immediatelly leftovers
|
||
|
sus = sorted(status_updates, key=attrgetter('cur_time'))
|
||
|
prefix_end = get_prefix_end_exclusive_idx(sus)
|
||
|
lsus = sus[:prefix_end]
|
||
|
# I am incapable of having simple thoughts. The pause is messing me up.
|
||
|
# I use these two variables to scrobble paused tracks.
|
||
|
ptbp = 0 # played time before pausing
|
||
|
ptbp_status = None
|
||
|
for cur, nxt, nxt2 in it.zip_longest(lsus, lsus[1:], lsus[2:]):
|
||
|
if cur.status in [CmusStatus.stopped, CmusStatus.paused]:
|
||
|
continue
|
||
|
if nxt is None:
|
||
|
leftovers.append(cur)
|
||
|
break
|
||
|
hpe = has_played_enough(
|
||
|
cur.cur_time,
|
||
|
nxt.cur_time,
|
||
|
cur.duration,
|
||
|
perc_thresh,
|
||
|
secs_thresh,
|
||
|
ptbp=ptbp if ptbp_status and equal_tracks(ptbp_status, cur) else 0)
|
||
|
|
||
|
if (not equal_tracks(cur, nxt)
|
||
|
or nxt.status in [CmusStatus.stopped, CmusStatus.playing]):
|
||
|
if hpe:
|
||
|
scrobbles.append(cur)
|
||
|
ptbp = 0
|
||
|
ptbp_status = None
|
||
|
continue
|
||
|
|
||
|
# files are equal and nxt status paused
|
||
|
if nxt2 is None:
|
||
|
leftovers.append(cur)
|
||
|
leftovers.append(nxt)
|
||
|
continue
|
||
|
|
||
|
if equal_tracks(cur, nxt2) and nxt2.status == CmusStatus.playing:
|
||
|
# playing continued, keeping already played time for next
|
||
|
ptbp += (nxt.cur_time - cur.cur_time).total_seconds()
|
||
|
ptbp_status = cur if not ptbp_status else ptbp_status
|
||
|
continue
|
||
|
# playing did not continue, nxt2 file is not None and it's either a
|
||
|
# different file or it's the same file but status is not playing
|
||
|
# in this case we just check if played enough otherwise no scrobble
|
||
|
if hpe:
|
||
|
scrobbles.append(ptbp_status or cur)
|
||
|
return scrobbles, leftovers + sus[prefix_end:]
|
||
|
|
||
|
|
||
|
class ScrobblerMethod:
|
||
|
GET_TOKEN = 'auth.gettoken'
|
||
|
GET_SESSION = 'auth.getsession'
|
||
|
NOW_PLAYING = 'track.updateNowPlaying'
|
||
|
SCROBBLE = 'track.scrobble'
|
||
|
|
||
|
|
||
|
def update_scrobble_state(db, scrobbler, new_status_update):
|
||
|
sus = db.get_status_updates()
|
||
|
sus.append(new_status_update)
|
||
|
db.save_status_updates([new_status_update])
|
||
|
scrobbles, leftovers = calculate_scrobbles(sus)
|
||
|
failed_scrobbles = []
|
||
|
for i in range(0, len(scrobbles), SCROBBLE_BATCH_SIZE):
|
||
|
try:
|
||
|
scrobbler.scrobble(scrobbles[i:i + SCROBBLE_BATCH_SIZE])
|
||
|
except Exception:
|
||
|
logging.exception('Scrobbling failed')
|
||
|
# tracks need to be scrobbled in correct order. If the first
|
||
|
# batch fails then other batches need to be left for later too.
|
||
|
failed_scrobbles.extend(scrobbles[i:])
|
||
|
break
|
||
|
db.clear()
|
||
|
db.save_status_updates(failed_scrobbles + leftovers)
|
||
|
|
||
|
|
||
|
def setup_logging(log_path):
|
||
|
logging.basicConfig(filename=log_path or '/tmp/cmus_scrobbler.log',
|
||
|
datefmt='%Y-%m-%d %H:%M:%S',
|
||
|
format='%(process)d %(asctime)s %(levelname)s %(name)s %(message)s',
|
||
|
level=logging.DEBUG)
|
||
|
|
||
|
|
||
|
def get_conf(conf_path):
|
||
|
if not os.path.exists(conf_path):
|
||
|
raise FileNotFoundError(f'{conf_path} does not exist.')
|
||
|
conf = configparser.ConfigParser()
|
||
|
with open(conf_path, 'r') as f:
|
||
|
conf.read_file(f)
|
||
|
return conf
|
||
|
|
||
|
DB_CONNECT_RETRY_ATTEMPTS = 10
|
||
|
DB_CONNECT_RETRY_SLEEP_SECS = 10
|
||
|
def db_connect(db_path, log_db=False):
|
||
|
con = sqlite3.connect(db_path, timeout=DB_CONNECT_TIMEOUT)
|
||
|
if log_db:
|
||
|
con.set_trace_callback(logging.debug)
|
||
|
# BEGIN IMMEDIATE can return SQLITE_BUSY. After it succeeds, no other query
|
||
|
# will return SQLITE_BUSY.
|
||
|
# Retrying opens the possibility of incorrect event order but it should not
|
||
|
# happen with normal human level usage.
|
||
|
# Way around this would be to serialize all events in a single continuously
|
||
|
# running process. That was not the point of this simple script.
|
||
|
# 10 retries leaves enough room for scrobble ops to finish and release the
|
||
|
# db lock.
|
||
|
for _ in range(DB_CONNECT_RETRY_ATTEMPTS):
|
||
|
try:
|
||
|
con.execute('BEGIN IMMEDIATE')
|
||
|
break
|
||
|
except sqlite3.OperationalError:
|
||
|
time.sleep(DB_CONNECT_RETRY_SLEEP_SECS)
|
||
|
else:
|
||
|
raise Exception('Could not connect to db.')
|
||
|
# when multiple status updates arrive one after another, then
|
||
|
# if there is no blocking mechanism the order of status updates
|
||
|
# will not be correct
|
||
|
# Try it out without BEGIN immediate and hold pause-play
|
||
|
return con
|
||
|
|
||
|
|
||
|
def get_scrobblers(conf):
|
||
|
api_key = conf['global'].get('api_key')
|
||
|
shared_secret = conf['global'].get('shared_secret')
|
||
|
scrs = []
|
||
|
for section in conf.sections():
|
||
|
if section == 'global':
|
||
|
continue
|
||
|
scrs.append(
|
||
|
Scrobbler(
|
||
|
section, conf[section]['api_url'],
|
||
|
conf[section].get('api_key', api_key),
|
||
|
conf[section].get('shared_secret', shared_secret),
|
||
|
conf[section].get('session_key'), conf[section].getboolean(
|
||
|
'now_playing', conf['global'].getboolean('now_playing')),
|
||
|
conf[section].getboolean('format_xml', conf['global'].getboolean('format_xml'))))
|
||
|
return scrs
|
||
|
|
||
|
|
||
|
def auth(conf):
|
||
|
api_key = conf['global'].get('api_key')
|
||
|
shared_secret = conf['global'].get('shared_secret')
|
||
|
format_xml = conf['global'].getboolean('format_xml')
|
||
|
for section in conf.sections():
|
||
|
if section == 'global':
|
||
|
continue
|
||
|
if 'session_key' in conf[section]:
|
||
|
print(f'Session key already active for {section}. Skipping...')
|
||
|
continue
|
||
|
try:
|
||
|
conf[section].update(
|
||
|
Scrobbler.auth(
|
||
|
conf[section]['auth_url'], conf[section]['api_url'],
|
||
|
conf[section].get('api_key', api_key),
|
||
|
conf[section].get('shared_secret', shared_secret),
|
||
|
conf[section].getboolean('format_xml', format_xml),
|
||
|
)
|
||
|
)
|
||
|
except Exception:
|
||
|
logging.exception('Authentication failed.')
|
||
|
return conf
|
||
|
|
||
|
|
||
|
def main():
|
||
|
args, rest = parser.parse_known_args()
|
||
|
conf_path = args.ini
|
||
|
conf = get_conf(conf_path)
|
||
|
setup_logging(args.log_path or conf['global'].get('log_path'))
|
||
|
if args.auth:
|
||
|
with open(conf_path, 'w') as f:
|
||
|
auth(conf).write(f)
|
||
|
exit()
|
||
|
status = parse_cmus_status_line(rest)
|
||
|
scrobblers = get_scrobblers(conf)
|
||
|
with db_connect(conf['global'].get('db_path', args.db_path),
|
||
|
log_db=conf['global'].get('log_db', args.log_db)) as con:
|
||
|
logging.info(repr(status))
|
||
|
for scr in scrobblers:
|
||
|
update_scrobble_state(StatusDB(con, scr.name), scr, status)
|
||
|
for scr in scrobblers:
|
||
|
scr.send_now_playing(status)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
try:
|
||
|
main()
|
||
|
except Exception:
|
||
|
logging.exception('Error happened')
|