From a8b296fc1e20265ffafa509d6d11b0bc234d02db Mon Sep 17 00:00:00 2001
From: Ethan Wick <125320183+ejwick@users.noreply.github.com>
Date: Thu, 16 Feb 2023 15:16:24 +0000
Subject: [PATCH] Code improvements, PEP 8, refactoring, etc.
---
spotifybackup/__init__.py | 197 +++++++++++++++++++--------------
spotifybackup/api.py | 132 +++++++++++-----------
spotifybackup/authorization.py | 72 ++++++------
3 files changed, 218 insertions(+), 183 deletions(-)
diff --git a/spotifybackup/__init__.py b/spotifybackup/__init__.py
index 41e9f5e..028a86e 100644
--- a/spotifybackup/__init__.py
+++ b/spotifybackup/__init__.py
@@ -1,104 +1,129 @@
import argparse
import json
import logging
+import pathlib
from . import api
-logging.basicConfig(level=20, datefmt='%I:%M:%S', format='[%(asctime)s] %(message)s')
+logging.basicConfig(
+ format="[%(asctime)s] %(message)s",
+ datefmt="%I:%M:%S",
+ level=logging.DEBUG
+)
-def main():
- # Parse arguments.
- parser = argparse.ArgumentParser(description='Exports your Spotify playlists. By default, opens a browser window '
- + 'to authorize the Spotify Web API, but you can also manually specify'
- + ' an OAuth token with the --token option.')
- parser.add_argument('--token', metavar='OAUTH_TOKEN', help='use a Spotify OAuth token (requires the '
- + '`playlist-read-private` permission)')
- parser.add_argument('--dump', default='playlists', choices=['liked,playlists', 'playlists,liked', 'playlists', 'liked'],
- help='dump playlists or liked songs, or both (default: playlists)')
- parser.add_argument('--format', default='txt', choices=['json', 'txt'], help='output format (default: txt)')
- parser.add_argument('file', help='output filename', nargs='?')
- args = parser.parse_args()
+SCOPES = {
+ "playlists": ["playlist-read-private", "playlist-read-collaborative"],
+ "liked": ["user-library-read"]
+}
- # If they didn't give a filename, then just prompt them. (They probably just double-clicked.)
- while not args.file:
- args.file = input('Enter a file name (e.g. playlists.txt): ')
- args.format = args.file.split('.')[-1]
- # Log into the Spotify API.
- if args.token:
- spotify = api.SpotifyAPI(args.token)
+def _artists(artists) -> str:
+ return ", ".join(artist["name"] for artist in artists)
+
+
+def backup(
+ dump: str,
+ filepath: pathlib.Path,
+ format_: str,
+ token: str = None
+) -> None:
+ if token:
+ spotify = api.SpotifyAPI(token)
else:
- spotify = api.SpotifyAPI.authorize(client_id='5c098bcc800e45d49e476265bc9b6934',
- scope='playlist-read-private playlist-read-collaborative user-library-read')
-
- # Get the ID of the logged in user.
- logging.info('Loading user info...')
- me = spotify.get('me')
- logging.info('Logged in as {display_name} ({id})'.format(**me))
-
+ scopes = set(
+ scope for k, v in SCOPES.items() if k in dump for scope in v
+ )
+ spotify = api.SpotifyAPI.authorize(scopes)
playlists = []
+ liked_songs = []
liked_albums = []
-
- # List liked albums and songs
- if 'liked' in args.dump:
- logging.info('Loading liked albums and songs...')
- liked_tracks = spotify.list('users/{user_id}/tracks'.format(user_id=me['id']), {'limit': 50})
- liked_albums = spotify.list('me/albums', {'limit': 50})
- playlists += [{'name': 'Liked Songs', 'tracks': liked_tracks}]
-
- # List all playlists and the tracks in each playlist
- if 'playlists' in args.dump:
- logging.info('Loading playlists...')
- playlist_data = spotify.list('users/{user_id}/playlists'.format(user_id=me['id']), {'limit': 50})
- logging.info(f'Found {len(playlist_data)} playlists')
-
- # List all tracks in each playlist
- for playlist in playlist_data:
- logging.info('Loading playlist: {name} ({tracks[total]} songs)'.format(**playlist))
- playlist['tracks'] = spotify.list(playlist['tracks']['href'], {'limit': 100})
- playlists += playlist_data
-
- # Write the file.
- logging.info('Writing files...')
- with open(args.file, 'w', encoding='utf-8') as f:
- # JSON file.
- if args.format == 'json':
- json.dump({
- 'playlists': playlists,
- 'albums': liked_albums
- }, f)
-
- # Tab-separated file.
- else:
- f.write('Playlists: \r\n\r\n')
+ if "liked" in dump:
+ logging.info("Getting liked songs and albums...")
+ liked_songs.extend(spotify.songs())
+ liked_albums.extend(spotify.albums())
+ if "playlists" in dump:
+ logging.info("Getting playlists...")
+ for playlist in spotify.playlists():
+ logging.info(
+ "Getting playlist: {0[name]} ({0[tracks][total]} "
+ "songs)".format(playlist)
+ )
+ playlist["tracks"] = list(spotify.playlist_tracks(playlist))
+ playlists.append(playlist)
+ logging.info(f"Got {len(playlists)} playlists")
+ playlists.insert(0, {"name": "Liked Songs", "tracks": liked_songs})
+ logging.info('Writing file...')
+ with open(filepath, "w", encoding="utf-8") as file:
+ if format_ == "json":
+ json.dump(
+ {"playlists": playlists, "albums": liked_albums},
+ file,
+ indent=4
+ )
+ elif format_ == "txt":
+ file.write("Playlists:\n\n")
for playlist in playlists:
- f.write(playlist['name'] + '\r\n')
- for track in playlist['tracks']:
- if track['track'] is None:
+ file.write("{[name]}\n".format(playlist))
+ for item in playlist["tracks"]:
+ track = item["track"]
+ if track is None:
continue
- f.write('{name}\t{artists}\t{album}\t{uri}\t{release_date}\r\n'.format(
- uri=track['track']['uri'],
- name=track['track']['name'],
- artists=', '.join([artist['name'] for artist in track['track']['artists']]),
- album=track['track']['album']['name'],
- release_date=track['track']['album']['release_date']
- ))
- f.write('\r\n')
- if len(liked_albums) > 0:
- f.write('Liked Albums: \r\n\r\n')
- for album in liked_albums:
- uri = album['album']['uri']
- name = album['album']['name']
- artists = ', '.join([artist['name'] for artist in album['album']['artists']])
- release_date = album['album']['release_date']
- album = f'{artists} - {name}'
-
- f.write(f'{name}\t{artists}\t-\t{uri}\t{release_date}\r\n')
-
- logging.info('Wrote file: ' + args.file)
+ file.write(
+ "{0[name]}\t{artists}\t{0[album][name]}\t{0[uri]}\t"
+ "{0[album][release_date]}\n"
+ .format(track, artists=_artists(track["artists"]))
+ )
+ file.write("\n")
+ if liked_albums:
+ file.write("Liked Albums:\n\n")
+ for item in liked_albums:
+ album = item["album"]
+ file.write(
+ "{0[name]}\t{artists}\t-\t{0[uri]}\t{0[release_date]}"
+ "\n"
+ .format(album, artists=_artists(album["artists"]))
+ )
+ logging.info(f"Wrote file: {filepath}")
-if __name__ == '__main__':
- main()
+def main() -> int:
+ parser = argparse.ArgumentParser(
+ description=
+ "Exports your Spotify playlists. By default, opens a browser "
+ "window to authorize the Spotify Web API, but you can also "
+ "manually specify an OAuth token with the --token option."
+ )
+ parser.add_argument(
+ "--token",
+ help=
+ "use a Spotify OAuth token (requires the 'playlist-read-private' "
+ "permission)"
+ )
+ parser.add_argument(
+ "--dump",
+ default="playlists",
+ choices=["liked,playlists", "playlists,liked", "playlists", "liked"],
+ help=
+ "dump playlists ('playlists') or liked songs and albums "
+ "('liked'), or both (comma-separated) (default: playlists)",
+ metavar="DUMP"
+ )
+ parser.add_argument(
+ "--format",
+ default="txt",
+ choices=["json", "txt"],
+ help="output format (default: txt)"
+ )
+ parser.add_argument(
+ "file",
+ nargs="?",
+ help="output filename"
+ )
+ args = parser.parse_args()
+ while not args.file:
+ args.file = input("Enter a file name (e.g. playlists.txt): ")
+ args.format = args.file.split(".")[-1]
+ backup(args.dump, args.file, args.format, args.token)
+ return 0
diff --git a/spotifybackup/api.py b/spotifybackup/api.py
index d81101d..83011dd 100644
--- a/spotifybackup/api.py
+++ b/spotifybackup/api.py
@@ -1,79 +1,83 @@
-import codecs
+import collections.abc
+import functools
+import itertools
import json
import logging
-import sys
-import time
+import typing
import urllib.parse
import urllib.request
+import urllib.response
import webbrowser
from . import authorization
+def chain(f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ return itertools.chain.from_iterable(f(*args, **kwargs))
+ return wrapper
+
+
class SpotifyAPI:
+ _CLIENT_ID = "5c098bcc800e45d49e476265bc9b6934"
+ _BASE_URL = "https://api.spotify.com/v1/"
- # Requires an OAuth token.
- def __init__(self, auth):
- self._auth = auth
+ def __init__(self, token) -> None:
+ self.token = token
- # Gets a resource from the Spotify API and returns the object.
- def get(self, url, params={}, tries=3):
- # Construct the correct URL.
- if not url.startswith('https://api.spotify.com/v1/'):
- url = 'https://api.spotify.com/v1/' + url
- if params:
- url += ('&' if '?' in url else '?') + urllib.parse.urlencode(params)
-
- # Try the sending off the request a specified number of times before giving up.
- for _ in range(tries):
- try:
- req = urllib.request.Request(url)
- req.add_header('Authorization', 'Bearer ' + self._auth)
- res = urllib.request.urlopen(req)
- reader = codecs.getreader('utf-8')
- return json.load(reader(res))
- except Exception as err:
- logging.info('Couldn\'t load URL: {} ({})'.format(url, err))
- time.sleep(2)
- logging.info('Trying again...')
- sys.exit(1)
-
- # The Spotify API breaks long lists into multiple pages. This method automatically
- # fetches all pages and joins them, returning in a single list of objects.
- def list(self, url, params={}):
- last_log_time = time.time()
- response = self.get(url, params)
- items = response['items']
-
- while response['next']:
- if time.time() > last_log_time + 15:
- last_log_time = time.time()
- logging.info(f"Loaded {len(items)}/{response['total']} items")
-
- response = self.get(response['next'])
- items += response['items']
- return items
-
- # Pops open a browser window for a user to log in and authorize API access.
- @staticmethod
- def authorize(client_id, scope):
- url = 'https://accounts.spotify.com/authorize?' + urllib.parse.urlencode({
- 'response_type': 'token',
- 'client_id': client_id,
- 'scope': scope,
- 'redirect_uri': 'http://127.0.0.1:{}/redirect'.format(SpotifyAPI._SERVER_PORT)
- })
- logging.info(f'Logging in (click if it doesn\'t open automatically): {url}')
+ @classmethod
+ def authorize(cls, scopes: collections.abc.Iterable[str]) -> typing.Self:
+ logging.info("Authorizing...")
+ server = authorization.Server()
+ query = urllib.parse.urlencode(
+ {
+ "client_id": cls._CLIENT_ID,
+ "response_type": "token",
+ "redirect_uri": server.redirect_uri(),
+ "state": server.state,
+ "scope": " ".join(scopes)
+ }
+ )
+ url = f"https://accounts.spotify.com/authorize?{query}"
webbrowser.open(url)
-
- # Start a simple, local HTTP server to listen for the authorization token... (i.e. a hack).
- server = authorization.Server('127.0.0.1', SpotifyAPI._SERVER_PORT)
- try:
- while True:
+ with server:
+ while server.token is None:
server.handle_request()
- except authorization.Authorization as auth:
- return SpotifyAPI(auth.access_token)
+ return cls(server.token)
- # The port that the local server listens on. Don't change this,
- # as Spotify only will redirect to certain predefined URLs.
- _SERVER_PORT = 43019
+ def get(self, url, params=None) -> typing.Any:
+ query = f"?{urllib.parse.urlencode(params)}" if params else ""
+ request = urllib.request.Request(
+ urllib.parse.urljoin(self._BASE_URL, url) + query,
+ headers={"Authorization": f"Bearer {self.token}"}
+ )
+ print(request.full_url)
+ with urllib.request.urlopen(request) as response:
+ return json.load(response)
+
+ def getter(
+ self, url
+ ) -> collections.abc.Generator[typing.Any, str, None]:
+ while url:
+ yield self.get(url, {"limit": 50})
+ url = yield
+
+ @chain
+ def items(self, url) -> collections.abc.Iterator[typing.Any]:
+ getter = self.getter(url)
+ for body in getter:
+ yield body["items"]
+ getter.send(body["next"])
+
+ def playlists(self):
+ return self.items("me/playlists")
+
+ def playlist_tracks(self, playlist):
+ return self.items(playlist["tracks"]["href"])
+
+ def songs(self):
+ return self.items("me/tracks")
+
+ def albums(self):
+ return self.items("me/albums")
diff --git a/spotifybackup/authorization.py b/spotifybackup/authorization.py
index bb4461c..427c89f 100644
--- a/spotifybackup/authorization.py
+++ b/spotifybackup/authorization.py
@@ -1,46 +1,52 @@
import http.server
-import logging
-import re
-
-
-class Server(http.server.HTTPServer):
- def __init__(self, host, port):
- http.server.HTTPServer.__init__(self, (host, port), _RequestHandler)
-
- # Disable the default error handling.
- def handle_error(self, request, client_address):
- raise
+import secrets
+import urllib.parse
class _RequestHandler(http.server.BaseHTTPRequestHandler):
+ server: "Server"
+
def do_GET(self):
- # The Spotify API has redirected here, but access_token is hidden in the URL fragment.
- # Read it using JavaScript and send it to /token as an actual query string...
- if self.path.startswith('/redirect'):
+ url = urllib.parse.urlsplit(self.path)
+ if url.path == self.server.REDIRECT_PATH:
self.send_response(200)
- self.send_header('Content-Type', 'text/html')
+ self.send_header("Content-Type", "text/html")
self.end_headers()
- self.wfile.write(b'')
-
- # Read access_token and use an exception to kill the server listening...
- elif self.path.startswith('/token?'):
+ self.wfile.write(
+ f""
+ .encode()
+ )
+ elif url.path == self.server.TOKEN_PATH:
+ query = urllib.parse.parse_qs(url.query)
+ if query["state"][0] != self.server.state:
+ self.send_error(401)
+ self.server.token = query["access_token"][0]
self.send_response(200)
- self.send_header('Content-Type', 'text/html')
+ self.send_header("Content-Type", "text/html")
self.end_headers()
- self.wfile.write(b'Thanks! You may now close this window.')
+ self.wfile.write(
+ b"Thanks! You may now close this "
+ b"window."
+ )
- access_token = re.search('access_token=([^&]*)', self.path).group(1)
- logging.info(f'Received access token from Spotify: {access_token}')
- raise Authorization(access_token)
-
- else:
- self.send_error(404)
-
- # Disable the default logging.
- def log_message(self, format, *args):
+ def log_message(self, format, *args) -> None:
pass
-class Authorization(Exception):
- def __init__(self, access_token):
- self.access_token = access_token
+class Server(http.server.HTTPServer):
+ _HOST = "127.0.0.1"
+ _PORT = 43019
+ REDIRECT_PATH = "/redirect"
+ TOKEN_PATH = "/token"
+
+ def __init__(self) -> None:
+ self.state = "".join(
+ chr(secrets.choice(range(0x20, 0x7E + 1))) for _ in range(32)
+ )
+ self.token: str | None = None
+ super().__init__((self._HOST, self._PORT), _RequestHandler)
+
+ @classmethod
+ def redirect_uri(cls) -> str:
+ return f"http://{cls._HOST}:{cls._PORT}{cls.REDIRECT_PATH}"