From 9bea08dc03e1e97be99639041e329557b12ecb31 Mon Sep 17 00:00:00 2001 From: Lucas Bickel Date: Sun, 18 Aug 2019 17:45:48 +0200 Subject: [PATCH] :recycle: (celery) python3 compat fixes --- .../airtime-celery/airtime-celery/__init__.py | 3 +- .../airtime-celery/celeryconfig.py | 39 ++--- .../airtime-celery/airtime-celery/tasks.py | 156 ++++++++++++------ python_apps/airtime-celery/setup.py | 46 +++--- 4 files changed, 146 insertions(+), 98 deletions(-) diff --git a/python_apps/airtime-celery/airtime-celery/__init__.py b/python_apps/airtime-celery/airtime-celery/__init__.py index a65fa3c85..81a44f04f 100644 --- a/python_apps/airtime-celery/airtime-celery/__init__.py +++ b/python_apps/airtime-celery/airtime-celery/__init__.py @@ -1,3 +1,4 @@ import os + # Make the celeryconfig module visible to celery -os.environ['CELERY_CONFIG_MODULE'] = 'airtime-celery.celeryconfig' \ No newline at end of file +os.environ["CELERY_CONFIG_MODULE"] = "airtime-celery.celeryconfig" diff --git a/python_apps/airtime-celery/airtime-celery/celeryconfig.py b/python_apps/airtime-celery/airtime-celery/celeryconfig.py index e3374927d..c9e95404b 100644 --- a/python_apps/airtime-celery/airtime-celery/celeryconfig.py +++ b/python_apps/airtime-celery/airtime-celery/celeryconfig.py @@ -7,36 +7,37 @@ RMQ_CONFIG_SECTION = "rabbitmq" def get_rmq_broker(): - rmq_config = ConfigObj(os.environ['RMQ_CONFIG_FILE']) + rmq_config = ConfigObj(os.environ["RMQ_CONFIG_FILE"]) rmq_settings = parse_rmq_config(rmq_config) - return 'amqp://{username}:{password}@{host}:{port}/{vhost}'.format(**rmq_settings) + return "amqp://{username}:{password}@{host}:{port}/{vhost}".format(**rmq_settings) def parse_rmq_config(rmq_config): return { - 'host' : rmq_config[RMQ_CONFIG_SECTION]['host'], - 'port' : rmq_config[RMQ_CONFIG_SECTION]['port'], - 'username': rmq_config[RMQ_CONFIG_SECTION]['user'], - 'password': rmq_config[RMQ_CONFIG_SECTION]['password'], - 'vhost' : rmq_config[RMQ_CONFIG_SECTION]['vhost'] + "host": rmq_config[RMQ_CONFIG_SECTION]["host"], + "port": rmq_config[RMQ_CONFIG_SECTION]["port"], + "username": rmq_config[RMQ_CONFIG_SECTION]["user"], + "password": rmq_config[RMQ_CONFIG_SECTION]["password"], + "vhost": rmq_config[RMQ_CONFIG_SECTION]["vhost"], } + # Celery amqp settings BROKER_URL = get_rmq_broker() -CELERY_RESULT_BACKEND = 'amqp' # Use RabbitMQ as the celery backend -CELERY_RESULT_PERSISTENT = True # Persist through a broker restart -CELERY_TASK_RESULT_EXPIRES = 900 # Expire task results after 15 minutes -CELERY_RESULT_EXCHANGE = 'celeryresults' # Default exchange - needed due to php-celery +CELERY_RESULT_BACKEND = "amqp" # Use RabbitMQ as the celery backend +CELERY_RESULT_PERSISTENT = True # Persist through a broker restart +CELERY_TASK_RESULT_EXPIRES = 900 # Expire task results after 15 minutes +CELERY_RESULT_EXCHANGE = "celeryresults" # Default exchange - needed due to php-celery CELERY_QUEUES = ( - Queue('soundcloud', exchange=Exchange('soundcloud'), routing_key='soundcloud'), - Queue('podcast', exchange=Exchange('podcast'), routing_key='podcast'), - Queue(exchange=Exchange('celeryresults'), auto_delete=True), + Queue("soundcloud", exchange=Exchange("soundcloud"), routing_key="soundcloud"), + Queue("podcast", exchange=Exchange("podcast"), routing_key="podcast"), + Queue(exchange=Exchange("celeryresults"), auto_delete=True), ) -CELERY_EVENT_QUEUE_EXPIRES = 900 # RabbitMQ x-expire after 15 minutes +CELERY_EVENT_QUEUE_EXPIRES = 900 # RabbitMQ x-expire after 15 minutes # Celery task settings -CELERY_TASK_SERIALIZER = 'json' -CELERY_RESULT_SERIALIZER = 'json' -CELERY_ACCEPT_CONTENT = ['json'] -CELERY_TIMEZONE = 'Europe/Berlin' +CELERY_TASK_SERIALIZER = "json" +CELERY_RESULT_SERIALIZER = "json" +CELERY_ACCEPT_CONTENT = ["json"] +CELERY_TIMEZONE = "Europe/Berlin" CELERY_ENABLE_UTC = True diff --git a/python_apps/airtime-celery/airtime-celery/tasks.py b/python_apps/airtime-celery/airtime-celery/tasks.py index bedc6efa7..2ef843b89 100644 --- a/python_apps/airtime-celery/airtime-celery/tasks.py +++ b/python_apps/airtime-celery/airtime-celery/tasks.py @@ -1,25 +1,29 @@ +from future.standard_library import install_aliases + +install_aliases() + import os import json -import urllib2 import requests import soundcloud import cgi -import urlparse import posixpath import shutil import tempfile import traceback import mutagen -from StringIO import StringIO +from io import StringIO from celery import Celery from celery.utils.log import get_task_logger from contextlib import closing +from urllib.parse import urlsplit + celery = Celery() logger = get_task_logger(__name__) -@celery.task(name='soundcloud-upload', acks_late=True) +@celery.task(name="soundcloud-upload", acks_late=True) def soundcloud_upload(data, token, file_path): """ Upload a file to SoundCloud @@ -32,19 +36,23 @@ def soundcloud_upload(data, token, file_path): :rtype: string """ client = soundcloud.Client(access_token=token) - # Open the file with urllib2 if it's a cloud file - data['asset_data'] = open(file_path, 'rb') if os.path.isfile(file_path) else urllib2.urlopen(file_path) + # Open the file with requests if it's a cloud file + data["asset_data"] = ( + open(file_path, "rb") + if os.path.isfile(file_path) + else requests.get(file_path).content + ) try: - logger.info('Uploading track: {0}'.format(data)) - track = client.post('/tracks', track=data) + logger.info("Uploading track: {0}".format(data)) + track = client.post("/tracks", track=data) except Exception as e: - logger.info('Error uploading track {title}: {0}'.format(e.message, **data)) + logger.info("Error uploading track {title}: {0}".format(e.message, **data)) raise e - data['asset_data'].close() + data["asset_data"].close() return json.dumps(track.fields()) -@celery.task(name='soundcloud-download', acks_late=True) +@celery.task(name="soundcloud-download", acks_late=True) def soundcloud_download(token, callback_url, api_key, track_id): """ Download a file from SoundCloud @@ -60,31 +68,43 @@ def soundcloud_download(token, callback_url, api_key, track_id): client = soundcloud.Client(access_token=token) obj = {} try: - track = client.get('/tracks/%s' % track_id) + track = client.get("/tracks/%s" % track_id) obj.update(track.fields()) if track.downloadable: re = None - with closing(requests.get('%s?oauth_token=%s' % (track.download_url, client.access_token), verify=True, stream=True)) as r: + with closing( + requests.get( + "%s?oauth_token=%s" % (track.download_url, client.access_token), + verify=True, + stream=True, + ) + ) as r: filename = get_filename(r) - re = requests.post(callback_url, files={'file': (filename, r.content)}, auth=requests.auth.HTTPBasicAuth(api_key, '')) + re = requests.post( + callback_url, + files={"file": (filename, r.content)}, + auth=requests.auth.HTTPBasicAuth(api_key, ""), + ) re.raise_for_status() - f = json.loads(re.content) # Read the response from the media API to get the file id - obj['fileid'] = f['id'] + f = json.loads( + re.content + ) # Read the response from the media API to get the file id + obj["fileid"] = f["id"] else: # manually update the task state self.update_state( - state = states.FAILURE, - meta = 'Track %s is not flagged as downloadable!' % track.title + state=states.FAILURE, + meta="Track %s is not flagged as downloadable!" % track.title, ) # ignore the task so no other state is recorded raise Ignore() except Exception as e: - logger.info('Error during file download: {0}'.format(e.message)) + logger.info("Error during file download: {0}".format(e.message)) raise e return json.dumps(obj) -@celery.task(name='soundcloud-update', acks_late=True) +@celery.task(name="soundcloud-update", acks_late=True) def soundcloud_update(data, token, track_id): """ Update a file on SoundCloud @@ -98,15 +118,15 @@ def soundcloud_update(data, token, track_id): """ client = soundcloud.Client(access_token=token) try: - logger.info('Updating track {title}'.format(**data)) - track = client.put('/tracks/%s' % track_id, track=data) + logger.info("Updating track {title}".format(**data)) + track = client.put("/tracks/%s" % track_id, track=data) except Exception as e: - logger.info('Error updating track {title}: {0}'.format(e.message, **data)) + logger.info("Error updating track {title}: {0}".format(e.message, **data)) raise e return json.dumps(track.fields()) -@celery.task(name='soundcloud-delete', acks_late=True) +@celery.task(name="soundcloud-delete", acks_late=True) def soundcloud_delete(token, track_id): """ Delete a file from SoundCloud @@ -119,16 +139,18 @@ def soundcloud_delete(token, track_id): """ client = soundcloud.Client(access_token=token) try: - logger.info('Deleting track with ID {0}'.format(track_id)) - track = client.delete('/tracks/%s' % track_id) + logger.info("Deleting track with ID {0}".format(track_id)) + track = client.delete("/tracks/%s" % track_id) except Exception as e: - logger.info('Error deleting track!') + logger.info("Error deleting track!") raise e return json.dumps(track.fields()) -@celery.task(name='podcast-download', acks_late=True) -def podcast_download(id, url, callback_url, api_key, podcast_name, album_override, track_title): +@celery.task(name="podcast-download", acks_late=True) +def podcast_download( + id, url, callback_url, api_key, podcast_name, album_override, track_title +): """ Download a podcast episode @@ -146,12 +168,12 @@ def podcast_download(id, url, callback_url, api_key, podcast_name, album_overrid """ # Object to store file IDs, episode IDs, and download status # (important if there's an error before the file is posted) - obj = { 'episodeid': id } + obj = {"episodeid": id} try: re = None with closing(requests.get(url, stream=True)) as r: filename = get_filename(r) - with tempfile.NamedTemporaryFile(mode ='wb+', delete=False) as audiofile: + with tempfile.NamedTemporaryFile(mode="wb+", delete=False) as audiofile: r.raw.decode_content = True shutil.copyfileobj(r.raw, audiofile) # mutagen should be able to guess the write file type @@ -162,44 +184,66 @@ def podcast_download(id, url, callback_url, api_key, podcast_name, album_overrid mp3suffix = ("mp3", "MP3", "Mp3", "mP3") # so we treat it like a mp3 if it has a mp3 file extension and hope for the best if filename.endswith(mp3suffix): - metadata_audiofile = mutagen.mp3.MP3(audiofile.name, ID3=mutagen.easyid3.EasyID3) - #replace track metadata as indicated by album_override setting + metadata_audiofile = mutagen.mp3.MP3( + audiofile.name, ID3=mutagen.easyid3.EasyID3 + ) + # replace track metadata as indicated by album_override setting # replace album title as needed - metadata_audiofile = podcast_override_metadata(metadata_audiofile, podcast_name, album_override, track_title) + metadata_audiofile = podcast_override_metadata( + metadata_audiofile, podcast_name, album_override, track_title + ) metadata_audiofile.save() filetypeinfo = metadata_audiofile.pprint() - logger.info('filetypeinfo is {0}'.format(filetypeinfo.encode('ascii', 'ignore'))) - re = requests.post(callback_url, files={'file': (filename, open(audiofile.name, 'rb'))}, auth=requests.auth.HTTPBasicAuth(api_key, '')) + logger.info( + "filetypeinfo is {0}".format(filetypeinfo.encode("ascii", "ignore")) + ) + re = requests.post( + callback_url, + files={"file": (filename, open(audiofile.name, "rb"))}, + auth=requests.auth.HTTPBasicAuth(api_key, ""), + ) re.raise_for_status() - f = json.loads(re.content) # Read the response from the media API to get the file id - obj['fileid'] = f['id'] - obj['status'] = 1 + f = json.loads( + re.content + ) # Read the response from the media API to get the file id + obj["fileid"] = f["id"] + obj["status"] = 1 except Exception as e: - obj['error'] = e.message - logger.info('Error during file download: {0}'.format(e)) - logger.debug('Original Traceback: %s' % (traceback.format_exc(e))) - obj['status'] = 0 + obj["error"] = e.message + logger.info("Error during file download: {0}".format(e)) + logger.debug("Original Traceback: %s" % (traceback.format_exc(e))) + obj["status"] = 0 return json.dumps(obj) + def podcast_override_metadata(m, podcast_name, override, track_title): """ Override m['album'] if empty or forced with override arg """ # if the album override option is enabled replace the album id3 tag with the podcast name even if the album tag contains data if override is True: - logger.debug('overriding album name to {0} in podcast'.format(podcast_name.encode('ascii', 'ignore'))) - m['album'] = podcast_name - m['title'] = track_title - m['artist'] = podcast_name + logger.debug( + "overriding album name to {0} in podcast".format( + podcast_name.encode("ascii", "ignore") + ) + ) + m["album"] = podcast_name + m["title"] = track_title + m["artist"] = podcast_name else: # replace the album id3 tag with the podcast name if the album tag is empty try: - m['album'] + m["album"] except KeyError: - logger.debug('setting new album name to {0} in podcast'.format(podcast_name.encode('ascii', 'ignore'))) - m['album'] = podcast_name + logger.debug( + "setting new album name to {0} in podcast".format( + podcast_name.encode("ascii", "ignore") + ) + ) + m["album"] = podcast_name return m + def get_filename(r): """ Given a request object to a file resource, get the name of the file to be downloaded @@ -211,18 +255,20 @@ def get_filename(r): :rtype: string """ # Try to get the filename from the content disposition - d = r.headers.get('Content-Disposition') - filename = '' + d = r.headers.get("Content-Disposition") + filename = "" if d: try: _, params = cgi.parse_header(d) - filename = params['filename'] + filename = params["filename"] except Exception as e: # We end up here if we get a Content-Disposition header with no filename - logger.warn("Couldn't find file name in Content-Disposition header, using url") + logger.warn( + "Couldn't find file name in Content-Disposition header, using url" + ) if not filename: # Since we don't necessarily get the filename back in the response headers, # parse the URL and get the filename and extension - path = urlparse.urlsplit(r.url).path + path = urlsplit(r.url).path filename = posixpath.basename(path) return filename diff --git a/python_apps/airtime-celery/setup.py b/python_apps/airtime-celery/setup.py index dc2469fb4..cd873dec0 100644 --- a/python_apps/airtime-celery/setup.py +++ b/python_apps/airtime-celery/setup.py @@ -5,18 +5,20 @@ import sys # Change directory since setuptools uses relative paths script_path = os.path.dirname(os.path.realpath(__file__)) -print script_path +print(script_path) os.chdir(script_path) -install_args = ['install', 'install_data', 'develop'] +install_args = ["install", "install_data", "develop"] no_init = False run_postinst = False # XXX Definitely not the best way of doing this... if sys.argv[1] in install_args and "--no-init-script" not in sys.argv: run_postinst = True - data_files = [('/etc/default', ['install/conf/airtime-celery']), - ('/etc/init.d', ['install/initd/airtime-celery'])] + data_files = [ + ("/etc/default", ["install/conf/airtime-celery"]), + ("/etc/init.d", ["install/initd/airtime-celery"]), + ] else: if "--no-init-script" in sys.argv: no_init = True @@ -29,26 +31,24 @@ def postinst(): if not no_init: # Make /etc/init.d file executable and set proper # permissions for the defaults config file - os.chmod('/etc/init.d/airtime-celery', 0755) - os.chmod('/etc/default/airtime-celery', 0640) - print "Run \"sudo service airtime-celery restart\" now." + os.chmod("/etc/init.d/airtime-celery", 0o755) + os.chmod("/etc/default/airtime-celery", 0o640) + print('Run "sudo service airtime-celery restart" now.') -setup(name='airtime-celery', - version='0.1', - description='Airtime Celery service', - url='http://github.com/sourcefabric/Airtime', - author='Sourcefabric', - author_email='duncan.sommerville@sourcefabric.org', - license='MIT', - packages=['airtime-celery'], - install_requires=[ - 'soundcloud', - 'celery < 4', - 'kombu < 3.1', - 'configobj' - ], - zip_safe=False, - data_files=data_files) + +setup( + name="airtime-celery", + version="0.1", + description="Airtime Celery service", + url="http://github.com/sourcefabric/Airtime", + author="Sourcefabric", + author_email="duncan.sommerville@sourcefabric.org", + license="MIT", + packages=["airtime-celery"], + install_requires=["soundcloud", "celery < 4", "kombu < 3.1", "configobj"], + zip_safe=False, + data_files=data_files, +) if run_postinst: postinst()