Remove duplicate code

pull/24/head
Sogolumbo 2020-10-05 06:25:45 +02:00
parent b31ed259ea
commit f52e1f75e0
1 changed files with 4 additions and 108 deletions

View File

@ -13,114 +13,10 @@ import urllib.parse
import urllib.request import urllib.request
import webbrowser import webbrowser
class SpotifyAPI: import importlib
spotifyBackup = importlib.import_module('spotify-backup')
# Requires an OAuth token. SpotifyAPI = spotifyBackup.SpotifyAPI
def __init__(self, auth): log = spotifyBackup.log
self._auth = auth
# Gets a resource from the Spotify API and returns the object.
def get(self, url, params={}, tries=3):
# Construct the correct URL.
if not url.startswith('https://api.spotify.com/v1/'):
url = 'https://api.spotify.com/v1/' + url
if params:
url += ('&' if '?' in url else '?') + urllib.parse.urlencode(params)
# Try the sending off the request a specified number of times before giving up.
for _ in range(tries):
try:
req = urllib.request.Request(url)
req.add_header('Authorization', 'Bearer ' + self._auth)
res = urllib.request.urlopen(req)
reader = codecs.getreader('utf-8')
return json.load(reader(res))
except Exception as err:
log('Couldn\'t load URL: {} ({})'.format(url, err))
time.sleep(2)
log('Trying again...')
sys.exit(1)
# The Spotify API breaks long lists into multiple pages. This method automatically
# fetches all pages and joins them, returning in a single list of objects.
def list(self, url, params={}):
response = self.get(url, params)
requests = int(response['total'] / params['limit'])
if requests > 10:
timeEstimate = int(0.27*requests); # in seconds, 0.28 was the average speed of 130 requests
log('{} items, {} items per request, {} requests, ~{} seconds'.format(response['total'], params['limit'], requests, timeEstimate))
items = response['items']
while response['next']:
response = self.get(response['next'])
items += response['items']
return items
# Pops open a browser window for a user to log in and authorize API access.
@staticmethod
def authorize(client_id, scope):
webbrowser.open('https://accounts.spotify.com/authorize?' + urllib.parse.urlencode({
'response_type': 'token',
'client_id': client_id,
'scope': scope,
'redirect_uri': 'http://127.0.0.1:{}/redirect'.format(SpotifyAPI._SERVER_PORT)
}))
# Start a simple, local HTTP server to listen for the authorization token... (i.e. a hack).
server = SpotifyAPI._AuthorizationServer('127.0.0.1', SpotifyAPI._SERVER_PORT)
try:
while True:
server.handle_request()
except SpotifyAPI._Authorization as auth:
return SpotifyAPI(auth.access_token)
# The port that the local server listens on. Don't change this,
# as Spotify only will redirect to certain predefined URLs.
_SERVER_PORT = 43019
class _AuthorizationServer(http.server.HTTPServer):
def __init__(self, host, port):
http.server.HTTPServer.__init__(self, (host, port), SpotifyAPI._AuthorizationHandler)
# Disable the default error handling.
def handle_error(self, request, client_address):
raise
class _AuthorizationHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
# The Spotify API has redirected here, but access_token is hidden in the URL fragment.
# Read it using JavaScript and send it to /token as an actual query string...
if self.path.startswith('/redirect'):
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.end_headers()
self.wfile.write(b'<script>location.replace("token?" + location.hash.slice(1));</script>')
# Read access_token and use an exception to kill the server listening...
elif self.path.startswith('/token?'):
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.end_headers()
self.wfile.write(b'<script>close()</script>Thanks! You may now close this window.')
raise SpotifyAPI._Authorization(re.search('access_token=([^&]*)', self.path).group(1))
else:
self.send_error(404)
# Disable the default logging.
def log_message(self, format, *args):
pass
class _Authorization(Exception):
def __init__(self, access_token):
self.access_token = access_token
def log(str):
fileOutput = False #replace with the opposite value if errors occur
if not fileOutput:
print('[{}] {}'.format(time.strftime('%I:%M:%S'), str).encode(sys.stdout.encoding, errors='replace'))
else:
sys.stdout.buffer.write('[{}] {}\n'.format(time.strftime('%I:%M:%S'), str).encode(sys.stdout.encoding, errors='replace'))
sys.stdout.flush()
def main(): def main():
# Parse arguments. # Parse arguments.