Cleaned up code

pull/4/head
Casey Chu 2014-12-24 00:44:55 -08:00
parent 79f3f19da0
commit 1ef465449b
1 changed files with 68 additions and 35 deletions

View File

@ -1,113 +1,146 @@
import argparse import argparse
import BaseHTTPServer
import codecs
import json
import re
import sys
import time
import urllib import urllib
import urllib2 import urllib2
import json
import time
import webbrowser import webbrowser
import sys
import BaseHTTPServer
import re
import codecs
class SpotifyAPI: class SpotifyAPI:
# Requires an OAuth token.
def __init__(self, auth): def __init__(self, auth):
self._auth = auth self._auth = auth
# Gets a resource from the Spotify API and returns the object.
def get(self, url, params={}, tries=3): def get(self, url, params={}, tries=3):
# Construct the correct URL.
if not url.startswith('https://api.spotify.com/v1/'): if not url.startswith('https://api.spotify.com/v1/'):
url = 'https://api.spotify.com/v1/' + url url = 'https://api.spotify.com/v1/' + url
if params: if params:
url += ('&' if '?' in url else '?') + urllib.urlencode(params) url += ('&' if '?' in url else '?') + urllib.urlencode(params)
try: # Try the sending off the request a specified number of times before giving up.
return json.load(urllib2.urlopen(urllib2.Request(url, None, {'Authorization': 'Bearer ' + self._auth}))) for _ in xrange(tries):
except urllib2.HTTPError as err: try:
log('Couldn\'t load URL: {} ({} {})'.format(url, err.code, err.reason)) req = urllib2.Request(url)
if tries <= 0: req.add_header('Authorization', 'Bearer ' + self._auth)
sys.exit(1) return json.load(urllib2.urlopen(req))
time.sleep(2) except urllib2.HTTPError as err:
log('Trying again...') log('Couldn\'t load URL: {} ({} {})'.format(url, err.code, err.reason))
return self.get(url, tries=tries-1) time.sleep(2)
log('Trying again...')
def list(self, path, params={}): sys.exit(1)
response = self.get(path, params)
# The Spotify API breaks long lists into multiple pages. This method automatically
# fetches all pages and joins them, returning in a single list of objects.
def list(self, url, params={}):
response = self.get(url, params)
items = response['items'] items = response['items']
while response['next']: while response['next']:
response = self.get(response['next']) response = self.get(response['next'])
items += response['items'] items += response['items']
return items return items
_LISTEN_PORT_NUMBER = 43019 # Pops open a browser window for a user to log in and authorize API access.
@staticmethod @staticmethod
def authorize(client_id, scope): def authorize(client_id, scope):
webbrowser.open('https://accounts.spotify.com/authorize?' + urllib.urlencode({ webbrowser.open('https://accounts.spotify.com/authorize?' + urllib.urlencode({
'response_type': 'token', 'response_type': 'token',
'client_id': client_id, 'client_id': client_id,
'scope': scope, 'scope': scope,
'redirect_uri': 'http://127.0.0.1:{}/redirect'.format(SpotifyAPI._LISTEN_PORT_NUMBER) 'redirect_uri': 'http://127.0.0.1:{}/redirect'.format(SpotifyAPI._SERVER_PORT)
})) }))
httpd = SpotifyAPI._AuthorizationListener('', SpotifyAPI._LISTEN_PORT_NUMBER)
# Start a simple, local HTTP server to listen for the authorization token... (i.e. a hack).
server = SpotifyAPI._AuthorizationServer('127.0.0.1', SpotifyAPI._SERVER_PORT)
try: try:
while True: while True:
httpd.handle_request() server.handle_request()
except SpotifyAPI._Authorization as auth: except SpotifyAPI._Authorization as auth:
return SpotifyAPI(auth.access_token) return SpotifyAPI(auth.access_token)
class _AuthorizationListener(BaseHTTPServer.HTTPServer): # The port that the local server listens on. Don't change this,
# as Spotify only will redirect to certain predefined URLs.
_SERVER_PORT = 43019
class _AuthorizationServer(BaseHTTPServer.HTTPServer):
def __init__(self, host, port): def __init__(self, host, port):
BaseHTTPServer.HTTPServer.__init__(self, (host, port), SpotifyAPI._AuthorizationHandler) BaseHTTPServer.HTTPServer.__init__(self, (host, port), SpotifyAPI._AuthorizationHandler)
# Disable the default error handling.
def handle_error(self, request, client_address): def handle_error(self, request, client_address):
raise raise
class _AuthorizationHandler(BaseHTTPServer.BaseHTTPRequestHandler): class _AuthorizationHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(self): def do_GET(self):
# The Spotify API has redirected here, but access_token is hidden in the URL fragment.
# Read it using JavaScript and send it to /token as an actual query string...
if self.path.startswith('/redirect'): if self.path.startswith('/redirect'):
self.send_response(200) self.send_response(200)
self.send_header('Content-Type', 'text/html') self.send_header('Content-Type', 'text/html')
self.end_headers() self.end_headers()
self.wfile.write('<script>location.replace("token?" + location.hash.slice(1));</script>') self.wfile.write('<script>location.replace("token?" + location.hash.slice(1));</script>')
# Read access_token and use an exception to kill the server listening...
elif self.path.startswith('/token?'): elif self.path.startswith('/token?'):
self.send_response(200) self.send_response(200)
self.send_header('Content-Type', 'text/html') self.send_header('Content-Type', 'text/html')
self.end_headers() self.end_headers()
self.wfile.write('<script>close()</script>Thanks! You may now close this window.') self.wfile.write('<script>close()</script>Thanks! You may now close this window.')
raise SpotifyAPI._Authorization(re.search('access_token=([^&]*)', self.path).group(1)) raise SpotifyAPI._Authorization(re.search('access_token=([^&]*)', self.path).group(1))
else: else:
self.send_error(404) self.send_error(404)
# Disable the default logging.
def log_message(self, format, *args): def log_message(self, format, *args):
pass pass
class _Authorization(Exception): class _Authorization(Exception):
def __init__(self, access_token): def __init__(self, access_token):
self.access_token = access_token self.access_token = access_token
def log(str): def log(str):
print u'[{}] {}'.format(time.strftime('%I:%M:%S'), str).encode(sys.stdout.encoding, errors='replace') print u'[{}] {}'.format(time.strftime('%I:%M:%S'), str).encode(sys.stdout.encoding, errors='replace')
def main(): def main():
parser = argparse.ArgumentParser(description='Exports your Spotify playlists. By default, opens a browser window to authorize the Spotify Web API, but you can manually specify an OAuth token with the --token option.') # Parse arguments.
parser.add_argument('--token', metavar='OAUTH_TOKEN', help='use a Spotify OAuth token (requires the `playlist-read-private` permission)') parser = argparse.ArgumentParser(description='Exports your Spotify playlists. By default, opens a browser window'
+ 'to authorize the Spotify Web API, but you can also manually specify'
+ ' an OAuth token with the --token option.')
parser.add_argument('--token', metavar='OAUTH_TOKEN', help='use a Spotify OAuth token (requires the '
+ '`playlist-read-private` permission)')
parser.add_argument('--format', default='txt', choices=['json', 'txt'], help='output format (default: txt)') parser.add_argument('--format', default='txt', choices=['json', 'txt'], help='output format (default: txt)')
parser.add_argument('file', help='output filename') parser.add_argument('file', help='output filename')
args = parser.parse_args() args = parser.parse_args()
# Log into the Spotify API.
if args.token: if args.token:
spotify = SpotifyAPI(args.token) spotify = SpotifyAPI(args.token)
else: else:
spotify = SpotifyAPI.authorize(client_id='5c098bcc800e45d49e476265bc9b6934', scope='playlist-read-private') spotify = SpotifyAPI.authorize(client_id='5c098bcc800e45d49e476265bc9b6934', scope='playlist-read-private')
# Get the ID of the logged in user.
me = spotify.get('me') me = spotify.get('me')
log(u'Logged in as {display_name} ({id})'.format(**me)) log(u'Logged in as {display_name} ({id})'.format(**me))
# List all playlists and all track in each playlist.
playlists = spotify.list('users/{user_id}/playlists'.format(user_id=me['id']), {'limit': 50}) playlists = spotify.list('users/{user_id}/playlists'.format(user_id=me['id']), {'limit': 50})
for playlist in playlists: for playlist in playlists:
log(u'Loading playlist: {name} ({tracks[total]} songs)'.format(**playlist)) log(u'Loading playlist: {name} ({tracks[total]} songs)'.format(**playlist))
playlist['tracks'] = spotify.list(playlist['tracks']['href'], {'limit': 100}) playlist['tracks'] = spotify.list(playlist['tracks']['href'], {'limit': 100})
# Write the file.
with codecs.open(args.file, 'w', 'utf-8') as f: with codecs.open(args.file, 'w', 'utf-8') as f:
# JSON file.
if args.format == 'json': if args.format == 'json':
json.dump(playlists, f) json.dump(playlists, f)
# Tab-separated file.
elif args.format == 'txt': elif args.format == 'txt':
for playlist in playlists: for playlist in playlists:
f.write(playlist['name'] + '\r\n') f.write(playlist['name'] + '\r\n')