Code improvements, PEP 8, refactoring, etc.

pull/49/head
Ethan Wick 2023-02-16 15:16:24 +00:00
parent 6e5e8b0a62
commit a8b296fc1e
3 changed files with 218 additions and 183 deletions

View File

@ -1,104 +1,129 @@
import argparse import argparse
import json import json
import logging import logging
import pathlib
from . import api from . import api
logging.basicConfig(level=20, datefmt='%I:%M:%S', format='[%(asctime)s] %(message)s') logging.basicConfig(
format="[%(asctime)s] %(message)s",
datefmt="%I:%M:%S",
level=logging.DEBUG
)
def main(): SCOPES = {
# Parse arguments. "playlists": ["playlist-read-private", "playlist-read-collaborative"],
parser = argparse.ArgumentParser(description='Exports your Spotify playlists. By default, opens a browser window ' "liked": ["user-library-read"]
+ 'to authorize the Spotify Web API, but you can also manually specify' }
+ ' an OAuth token with the --token option.')
parser.add_argument('--token', metavar='OAUTH_TOKEN', help='use a Spotify OAuth token (requires the '
+ '`playlist-read-private` permission)')
parser.add_argument('--dump', default='playlists', choices=['liked,playlists', 'playlists,liked', 'playlists', 'liked'],
help='dump playlists or liked songs, or both (default: playlists)')
parser.add_argument('--format', default='txt', choices=['json', 'txt'], help='output format (default: txt)')
parser.add_argument('file', help='output filename', nargs='?')
args = parser.parse_args()
# If they didn't give a filename, then just prompt them. (They probably just double-clicked.)
while not args.file:
args.file = input('Enter a file name (e.g. playlists.txt): ')
args.format = args.file.split('.')[-1]
# Log into the Spotify API. def _artists(artists) -> str:
if args.token: return ", ".join(artist["name"] for artist in artists)
spotify = api.SpotifyAPI(args.token)
def backup(
dump: str,
filepath: pathlib.Path,
format_: str,
token: str = None
) -> None:
if token:
spotify = api.SpotifyAPI(token)
else: else:
spotify = api.SpotifyAPI.authorize(client_id='5c098bcc800e45d49e476265bc9b6934', scopes = set(
scope='playlist-read-private playlist-read-collaborative user-library-read') scope for k, v in SCOPES.items() if k in dump for scope in v
)
# Get the ID of the logged in user. spotify = api.SpotifyAPI.authorize(scopes)
logging.info('Loading user info...')
me = spotify.get('me')
logging.info('Logged in as {display_name} ({id})'.format(**me))
playlists = [] playlists = []
liked_songs = []
liked_albums = [] liked_albums = []
if "liked" in dump:
# List liked albums and songs logging.info("Getting liked songs and albums...")
if 'liked' in args.dump: liked_songs.extend(spotify.songs())
logging.info('Loading liked albums and songs...') liked_albums.extend(spotify.albums())
liked_tracks = spotify.list('users/{user_id}/tracks'.format(user_id=me['id']), {'limit': 50}) if "playlists" in dump:
liked_albums = spotify.list('me/albums', {'limit': 50}) logging.info("Getting playlists...")
playlists += [{'name': 'Liked Songs', 'tracks': liked_tracks}] for playlist in spotify.playlists():
logging.info(
# List all playlists and the tracks in each playlist "Getting playlist: {0[name]} ({0[tracks][total]} "
if 'playlists' in args.dump: "songs)".format(playlist)
logging.info('Loading playlists...') )
playlist_data = spotify.list('users/{user_id}/playlists'.format(user_id=me['id']), {'limit': 50}) playlist["tracks"] = list(spotify.playlist_tracks(playlist))
logging.info(f'Found {len(playlist_data)} playlists') playlists.append(playlist)
logging.info(f"Got {len(playlists)} playlists")
# List all tracks in each playlist playlists.insert(0, {"name": "Liked Songs", "tracks": liked_songs})
for playlist in playlist_data: logging.info('Writing file...')
logging.info('Loading playlist: {name} ({tracks[total]} songs)'.format(**playlist)) with open(filepath, "w", encoding="utf-8") as file:
playlist['tracks'] = spotify.list(playlist['tracks']['href'], {'limit': 100}) if format_ == "json":
playlists += playlist_data json.dump(
{"playlists": playlists, "albums": liked_albums},
# Write the file. file,
logging.info('Writing files...') indent=4
with open(args.file, 'w', encoding='utf-8') as f: )
# JSON file. elif format_ == "txt":
if args.format == 'json': file.write("Playlists:\n\n")
json.dump({
'playlists': playlists,
'albums': liked_albums
}, f)
# Tab-separated file.
else:
f.write('Playlists: \r\n\r\n')
for playlist in playlists: for playlist in playlists:
f.write(playlist['name'] + '\r\n') file.write("{[name]}\n".format(playlist))
for track in playlist['tracks']: for item in playlist["tracks"]:
if track['track'] is None: track = item["track"]
if track is None:
continue continue
f.write('{name}\t{artists}\t{album}\t{uri}\t{release_date}\r\n'.format( file.write(
uri=track['track']['uri'], "{0[name]}\t{artists}\t{0[album][name]}\t{0[uri]}\t"
name=track['track']['name'], "{0[album][release_date]}\n"
artists=', '.join([artist['name'] for artist in track['track']['artists']]), .format(track, artists=_artists(track["artists"]))
album=track['track']['album']['name'], )
release_date=track['track']['album']['release_date'] file.write("\n")
)) if liked_albums:
f.write('\r\n') file.write("Liked Albums:\n\n")
if len(liked_albums) > 0: for item in liked_albums:
f.write('Liked Albums: \r\n\r\n') album = item["album"]
for album in liked_albums: file.write(
uri = album['album']['uri'] "{0[name]}\t{artists}\t-\t{0[uri]}\t{0[release_date]}"
name = album['album']['name'] "\n"
artists = ', '.join([artist['name'] for artist in album['album']['artists']]) .format(album, artists=_artists(album["artists"]))
release_date = album['album']['release_date'] )
album = f'{artists} - {name}' logging.info(f"Wrote file: {filepath}")
f.write(f'{name}\t{artists}\t-\t{uri}\t{release_date}\r\n')
logging.info('Wrote file: ' + args.file)
if __name__ == '__main__': def main() -> int:
main() parser = argparse.ArgumentParser(
description=
"Exports your Spotify playlists. By default, opens a browser "
"window to authorize the Spotify Web API, but you can also "
"manually specify an OAuth token with the --token option."
)
parser.add_argument(
"--token",
help=
"use a Spotify OAuth token (requires the 'playlist-read-private' "
"permission)"
)
parser.add_argument(
"--dump",
default="playlists",
choices=["liked,playlists", "playlists,liked", "playlists", "liked"],
help=
"dump playlists ('playlists') or liked songs and albums "
"('liked'), or both (comma-separated) (default: playlists)",
metavar="DUMP"
)
parser.add_argument(
"--format",
default="txt",
choices=["json", "txt"],
help="output format (default: txt)"
)
parser.add_argument(
"file",
nargs="?",
help="output filename"
)
args = parser.parse_args()
while not args.file:
args.file = input("Enter a file name (e.g. playlists.txt): ")
args.format = args.file.split(".")[-1]
backup(args.dump, args.file, args.format, args.token)
return 0

View File

@ -1,79 +1,83 @@
import codecs import collections.abc
import functools
import itertools
import json import json
import logging import logging
import sys import typing
import time
import urllib.parse import urllib.parse
import urllib.request import urllib.request
import urllib.response
import webbrowser import webbrowser
from . import authorization from . import authorization
def chain(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
return itertools.chain.from_iterable(f(*args, **kwargs))
return wrapper
class SpotifyAPI: class SpotifyAPI:
_CLIENT_ID = "5c098bcc800e45d49e476265bc9b6934"
_BASE_URL = "https://api.spotify.com/v1/"
# Requires an OAuth token. def __init__(self, token) -> None:
def __init__(self, auth): self.token = token
self._auth = auth
# Gets a resource from the Spotify API and returns the object. @classmethod
def get(self, url, params={}, tries=3): def authorize(cls, scopes: collections.abc.Iterable[str]) -> typing.Self:
# Construct the correct URL. logging.info("Authorizing...")
if not url.startswith('https://api.spotify.com/v1/'): server = authorization.Server()
url = 'https://api.spotify.com/v1/' + url query = urllib.parse.urlencode(
if params: {
url += ('&' if '?' in url else '?') + urllib.parse.urlencode(params) "client_id": cls._CLIENT_ID,
"response_type": "token",
# Try the sending off the request a specified number of times before giving up. "redirect_uri": server.redirect_uri(),
for _ in range(tries): "state": server.state,
try: "scope": " ".join(scopes)
req = urllib.request.Request(url) }
req.add_header('Authorization', 'Bearer ' + self._auth) )
res = urllib.request.urlopen(req) url = f"https://accounts.spotify.com/authorize?{query}"
reader = codecs.getreader('utf-8')
return json.load(reader(res))
except Exception as err:
logging.info('Couldn\'t load URL: {} ({})'.format(url, err))
time.sleep(2)
logging.info('Trying again...')
sys.exit(1)
# The Spotify API breaks long lists into multiple pages. This method automatically
# fetches all pages and joins them, returning in a single list of objects.
def list(self, url, params={}):
last_log_time = time.time()
response = self.get(url, params)
items = response['items']
while response['next']:
if time.time() > last_log_time + 15:
last_log_time = time.time()
logging.info(f"Loaded {len(items)}/{response['total']} items")
response = self.get(response['next'])
items += response['items']
return items
# Pops open a browser window for a user to log in and authorize API access.
@staticmethod
def authorize(client_id, scope):
url = 'https://accounts.spotify.com/authorize?' + urllib.parse.urlencode({
'response_type': 'token',
'client_id': client_id,
'scope': scope,
'redirect_uri': 'http://127.0.0.1:{}/redirect'.format(SpotifyAPI._SERVER_PORT)
})
logging.info(f'Logging in (click if it doesn\'t open automatically): {url}')
webbrowser.open(url) webbrowser.open(url)
with server:
# Start a simple, local HTTP server to listen for the authorization token... (i.e. a hack). while server.token is None:
server = authorization.Server('127.0.0.1', SpotifyAPI._SERVER_PORT)
try:
while True:
server.handle_request() server.handle_request()
except authorization.Authorization as auth: return cls(server.token)
return SpotifyAPI(auth.access_token)
# The port that the local server listens on. Don't change this, def get(self, url, params=None) -> typing.Any:
# as Spotify only will redirect to certain predefined URLs. query = f"?{urllib.parse.urlencode(params)}" if params else ""
_SERVER_PORT = 43019 request = urllib.request.Request(
urllib.parse.urljoin(self._BASE_URL, url) + query,
headers={"Authorization": f"Bearer {self.token}"}
)
print(request.full_url)
with urllib.request.urlopen(request) as response:
return json.load(response)
def getter(
self, url
) -> collections.abc.Generator[typing.Any, str, None]:
while url:
yield self.get(url, {"limit": 50})
url = yield
@chain
def items(self, url) -> collections.abc.Iterator[typing.Any]:
getter = self.getter(url)
for body in getter:
yield body["items"]
getter.send(body["next"])
def playlists(self):
return self.items("me/playlists")
def playlist_tracks(self, playlist):
return self.items(playlist["tracks"]["href"])
def songs(self):
return self.items("me/tracks")
def albums(self):
return self.items("me/albums")

View File

@ -1,46 +1,52 @@
import http.server import http.server
import logging import secrets
import re import urllib.parse
class Server(http.server.HTTPServer):
def __init__(self, host, port):
http.server.HTTPServer.__init__(self, (host, port), _RequestHandler)
# Disable the default error handling.
def handle_error(self, request, client_address):
raise
class _RequestHandler(http.server.BaseHTTPRequestHandler): class _RequestHandler(http.server.BaseHTTPRequestHandler):
server: "Server"
def do_GET(self): def do_GET(self):
# The Spotify API has redirected here, but access_token is hidden in the URL fragment. url = urllib.parse.urlsplit(self.path)
# Read it using JavaScript and send it to /token as an actual query string... if url.path == self.server.REDIRECT_PATH:
if self.path.startswith('/redirect'):
self.send_response(200) self.send_response(200)
self.send_header('Content-Type', 'text/html') self.send_header("Content-Type", "text/html")
self.end_headers() self.end_headers()
self.wfile.write(b'<script>location.replace("token?" + location.hash.slice(1));</script>') self.wfile.write(
f"<script>location.replace(\"{self.server.TOKEN_PATH}?\" + "
# Read access_token and use an exception to kill the server listening... "location.hash.slice(1));</script>"
elif self.path.startswith('/token?'): .encode()
)
elif url.path == self.server.TOKEN_PATH:
query = urllib.parse.parse_qs(url.query)
if query["state"][0] != self.server.state:
self.send_error(401)
self.server.token = query["access_token"][0]
self.send_response(200) self.send_response(200)
self.send_header('Content-Type', 'text/html') self.send_header("Content-Type", "text/html")
self.end_headers() self.end_headers()
self.wfile.write(b'<script>close()</script>Thanks! You may now close this window.') self.wfile.write(
b"<script>close()</script>Thanks! You may now close this "
b"window."
)
access_token = re.search('access_token=([^&]*)', self.path).group(1) def log_message(self, format, *args) -> None:
logging.info(f'Received access token from Spotify: {access_token}')
raise Authorization(access_token)
else:
self.send_error(404)
# Disable the default logging.
def log_message(self, format, *args):
pass pass
class Authorization(Exception): class Server(http.server.HTTPServer):
def __init__(self, access_token): _HOST = "127.0.0.1"
self.access_token = access_token _PORT = 43019
REDIRECT_PATH = "/redirect"
TOKEN_PATH = "/token"
def __init__(self) -> None:
self.state = "".join(
chr(secrets.choice(range(0x20, 0x7E + 1))) for _ in range(32)
)
self.token: str | None = None
super().__init__((self._HOST, self._PORT), _RequestHandler)
@classmethod
def redirect_uri(cls) -> str:
return f"http://{cls._HOST}:{cls._PORT}{cls.REDIRECT_PATH}"