♻️ (celery) python3 compat fixes

This commit is contained in:
Lucas Bickel 2019-08-18 17:45:48 +02:00
parent e232469551
commit 9bea08dc03
4 changed files with 146 additions and 98 deletions

View File

@ -1,3 +1,4 @@
import os
# Make the celeryconfig module visible to celery
os.environ['CELERY_CONFIG_MODULE'] = 'airtime-celery.celeryconfig'
os.environ["CELERY_CONFIG_MODULE"] = "airtime-celery.celeryconfig"

View File

@ -7,36 +7,37 @@ RMQ_CONFIG_SECTION = "rabbitmq"
def get_rmq_broker():
rmq_config = ConfigObj(os.environ['RMQ_CONFIG_FILE'])
rmq_config = ConfigObj(os.environ["RMQ_CONFIG_FILE"])
rmq_settings = parse_rmq_config(rmq_config)
return 'amqp://{username}:{password}@{host}:{port}/{vhost}'.format(**rmq_settings)
return "amqp://{username}:{password}@{host}:{port}/{vhost}".format(**rmq_settings)
def parse_rmq_config(rmq_config):
return {
'host' : rmq_config[RMQ_CONFIG_SECTION]['host'],
'port' : rmq_config[RMQ_CONFIG_SECTION]['port'],
'username': rmq_config[RMQ_CONFIG_SECTION]['user'],
'password': rmq_config[RMQ_CONFIG_SECTION]['password'],
'vhost' : rmq_config[RMQ_CONFIG_SECTION]['vhost']
"host": rmq_config[RMQ_CONFIG_SECTION]["host"],
"port": rmq_config[RMQ_CONFIG_SECTION]["port"],
"username": rmq_config[RMQ_CONFIG_SECTION]["user"],
"password": rmq_config[RMQ_CONFIG_SECTION]["password"],
"vhost": rmq_config[RMQ_CONFIG_SECTION]["vhost"],
}
# Celery amqp settings
BROKER_URL = get_rmq_broker()
CELERY_RESULT_BACKEND = 'amqp' # Use RabbitMQ as the celery backend
CELERY_RESULT_PERSISTENT = True # Persist through a broker restart
CELERY_TASK_RESULT_EXPIRES = 900 # Expire task results after 15 minutes
CELERY_RESULT_EXCHANGE = 'celeryresults' # Default exchange - needed due to php-celery
CELERY_RESULT_BACKEND = "amqp" # Use RabbitMQ as the celery backend
CELERY_RESULT_PERSISTENT = True # Persist through a broker restart
CELERY_TASK_RESULT_EXPIRES = 900 # Expire task results after 15 minutes
CELERY_RESULT_EXCHANGE = "celeryresults" # Default exchange - needed due to php-celery
CELERY_QUEUES = (
Queue('soundcloud', exchange=Exchange('soundcloud'), routing_key='soundcloud'),
Queue('podcast', exchange=Exchange('podcast'), routing_key='podcast'),
Queue(exchange=Exchange('celeryresults'), auto_delete=True),
Queue("soundcloud", exchange=Exchange("soundcloud"), routing_key="soundcloud"),
Queue("podcast", exchange=Exchange("podcast"), routing_key="podcast"),
Queue(exchange=Exchange("celeryresults"), auto_delete=True),
)
CELERY_EVENT_QUEUE_EXPIRES = 900 # RabbitMQ x-expire after 15 minutes
CELERY_EVENT_QUEUE_EXPIRES = 900 # RabbitMQ x-expire after 15 minutes
# Celery task settings
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TIMEZONE = "Europe/Berlin"
CELERY_ENABLE_UTC = True

View File

@ -1,25 +1,29 @@
from future.standard_library import install_aliases
install_aliases()
import os
import json
import urllib2
import requests
import soundcloud
import cgi
import urlparse
import posixpath
import shutil
import tempfile
import traceback
import mutagen
from StringIO import StringIO
from io import StringIO
from celery import Celery
from celery.utils.log import get_task_logger
from contextlib import closing
from urllib.parse import urlsplit
celery = Celery()
logger = get_task_logger(__name__)
@celery.task(name='soundcloud-upload', acks_late=True)
@celery.task(name="soundcloud-upload", acks_late=True)
def soundcloud_upload(data, token, file_path):
"""
Upload a file to SoundCloud
@ -32,19 +36,23 @@ def soundcloud_upload(data, token, file_path):
:rtype: string
"""
client = soundcloud.Client(access_token=token)
# Open the file with urllib2 if it's a cloud file
data['asset_data'] = open(file_path, 'rb') if os.path.isfile(file_path) else urllib2.urlopen(file_path)
# Open the file with requests if it's a cloud file
data["asset_data"] = (
open(file_path, "rb")
if os.path.isfile(file_path)
else requests.get(file_path).content
)
try:
logger.info('Uploading track: {0}'.format(data))
track = client.post('/tracks', track=data)
logger.info("Uploading track: {0}".format(data))
track = client.post("/tracks", track=data)
except Exception as e:
logger.info('Error uploading track {title}: {0}'.format(e.message, **data))
logger.info("Error uploading track {title}: {0}".format(e.message, **data))
raise e
data['asset_data'].close()
data["asset_data"].close()
return json.dumps(track.fields())
@celery.task(name='soundcloud-download', acks_late=True)
@celery.task(name="soundcloud-download", acks_late=True)
def soundcloud_download(token, callback_url, api_key, track_id):
"""
Download a file from SoundCloud
@ -60,31 +68,43 @@ def soundcloud_download(token, callback_url, api_key, track_id):
client = soundcloud.Client(access_token=token)
obj = {}
try:
track = client.get('/tracks/%s' % track_id)
track = client.get("/tracks/%s" % track_id)
obj.update(track.fields())
if track.downloadable:
re = None
with closing(requests.get('%s?oauth_token=%s' % (track.download_url, client.access_token), verify=True, stream=True)) as r:
with closing(
requests.get(
"%s?oauth_token=%s" % (track.download_url, client.access_token),
verify=True,
stream=True,
)
) as r:
filename = get_filename(r)
re = requests.post(callback_url, files={'file': (filename, r.content)}, auth=requests.auth.HTTPBasicAuth(api_key, ''))
re = requests.post(
callback_url,
files={"file": (filename, r.content)},
auth=requests.auth.HTTPBasicAuth(api_key, ""),
)
re.raise_for_status()
f = json.loads(re.content) # Read the response from the media API to get the file id
obj['fileid'] = f['id']
f = json.loads(
re.content
) # Read the response from the media API to get the file id
obj["fileid"] = f["id"]
else:
# manually update the task state
self.update_state(
state = states.FAILURE,
meta = 'Track %s is not flagged as downloadable!' % track.title
state=states.FAILURE,
meta="Track %s is not flagged as downloadable!" % track.title,
)
# ignore the task so no other state is recorded
raise Ignore()
except Exception as e:
logger.info('Error during file download: {0}'.format(e.message))
logger.info("Error during file download: {0}".format(e.message))
raise e
return json.dumps(obj)
@celery.task(name='soundcloud-update', acks_late=True)
@celery.task(name="soundcloud-update", acks_late=True)
def soundcloud_update(data, token, track_id):
"""
Update a file on SoundCloud
@ -98,15 +118,15 @@ def soundcloud_update(data, token, track_id):
"""
client = soundcloud.Client(access_token=token)
try:
logger.info('Updating track {title}'.format(**data))
track = client.put('/tracks/%s' % track_id, track=data)
logger.info("Updating track {title}".format(**data))
track = client.put("/tracks/%s" % track_id, track=data)
except Exception as e:
logger.info('Error updating track {title}: {0}'.format(e.message, **data))
logger.info("Error updating track {title}: {0}".format(e.message, **data))
raise e
return json.dumps(track.fields())
@celery.task(name='soundcloud-delete', acks_late=True)
@celery.task(name="soundcloud-delete", acks_late=True)
def soundcloud_delete(token, track_id):
"""
Delete a file from SoundCloud
@ -119,16 +139,18 @@ def soundcloud_delete(token, track_id):
"""
client = soundcloud.Client(access_token=token)
try:
logger.info('Deleting track with ID {0}'.format(track_id))
track = client.delete('/tracks/%s' % track_id)
logger.info("Deleting track with ID {0}".format(track_id))
track = client.delete("/tracks/%s" % track_id)
except Exception as e:
logger.info('Error deleting track!')
logger.info("Error deleting track!")
raise e
return json.dumps(track.fields())
@celery.task(name='podcast-download', acks_late=True)
def podcast_download(id, url, callback_url, api_key, podcast_name, album_override, track_title):
@celery.task(name="podcast-download", acks_late=True)
def podcast_download(
id, url, callback_url, api_key, podcast_name, album_override, track_title
):
"""
Download a podcast episode
@ -146,12 +168,12 @@ def podcast_download(id, url, callback_url, api_key, podcast_name, album_overrid
"""
# Object to store file IDs, episode IDs, and download status
# (important if there's an error before the file is posted)
obj = { 'episodeid': id }
obj = {"episodeid": id}
try:
re = None
with closing(requests.get(url, stream=True)) as r:
filename = get_filename(r)
with tempfile.NamedTemporaryFile(mode ='wb+', delete=False) as audiofile:
with tempfile.NamedTemporaryFile(mode="wb+", delete=False) as audiofile:
r.raw.decode_content = True
shutil.copyfileobj(r.raw, audiofile)
# mutagen should be able to guess the write file type
@ -162,44 +184,66 @@ def podcast_download(id, url, callback_url, api_key, podcast_name, album_overrid
mp3suffix = ("mp3", "MP3", "Mp3", "mP3")
# so we treat it like a mp3 if it has a mp3 file extension and hope for the best
if filename.endswith(mp3suffix):
metadata_audiofile = mutagen.mp3.MP3(audiofile.name, ID3=mutagen.easyid3.EasyID3)
#replace track metadata as indicated by album_override setting
metadata_audiofile = mutagen.mp3.MP3(
audiofile.name, ID3=mutagen.easyid3.EasyID3
)
# replace track metadata as indicated by album_override setting
# replace album title as needed
metadata_audiofile = podcast_override_metadata(metadata_audiofile, podcast_name, album_override, track_title)
metadata_audiofile = podcast_override_metadata(
metadata_audiofile, podcast_name, album_override, track_title
)
metadata_audiofile.save()
filetypeinfo = metadata_audiofile.pprint()
logger.info('filetypeinfo is {0}'.format(filetypeinfo.encode('ascii', 'ignore')))
re = requests.post(callback_url, files={'file': (filename, open(audiofile.name, 'rb'))}, auth=requests.auth.HTTPBasicAuth(api_key, ''))
logger.info(
"filetypeinfo is {0}".format(filetypeinfo.encode("ascii", "ignore"))
)
re = requests.post(
callback_url,
files={"file": (filename, open(audiofile.name, "rb"))},
auth=requests.auth.HTTPBasicAuth(api_key, ""),
)
re.raise_for_status()
f = json.loads(re.content) # Read the response from the media API to get the file id
obj['fileid'] = f['id']
obj['status'] = 1
f = json.loads(
re.content
) # Read the response from the media API to get the file id
obj["fileid"] = f["id"]
obj["status"] = 1
except Exception as e:
obj['error'] = e.message
logger.info('Error during file download: {0}'.format(e))
logger.debug('Original Traceback: %s' % (traceback.format_exc(e)))
obj['status'] = 0
obj["error"] = e.message
logger.info("Error during file download: {0}".format(e))
logger.debug("Original Traceback: %s" % (traceback.format_exc(e)))
obj["status"] = 0
return json.dumps(obj)
def podcast_override_metadata(m, podcast_name, override, track_title):
"""
Override m['album'] if empty or forced with override arg
"""
# if the album override option is enabled replace the album id3 tag with the podcast name even if the album tag contains data
if override is True:
logger.debug('overriding album name to {0} in podcast'.format(podcast_name.encode('ascii', 'ignore')))
m['album'] = podcast_name
m['title'] = track_title
m['artist'] = podcast_name
logger.debug(
"overriding album name to {0} in podcast".format(
podcast_name.encode("ascii", "ignore")
)
)
m["album"] = podcast_name
m["title"] = track_title
m["artist"] = podcast_name
else:
# replace the album id3 tag with the podcast name if the album tag is empty
try:
m['album']
m["album"]
except KeyError:
logger.debug('setting new album name to {0} in podcast'.format(podcast_name.encode('ascii', 'ignore')))
m['album'] = podcast_name
logger.debug(
"setting new album name to {0} in podcast".format(
podcast_name.encode("ascii", "ignore")
)
)
m["album"] = podcast_name
return m
def get_filename(r):
"""
Given a request object to a file resource, get the name of the file to be downloaded
@ -211,18 +255,20 @@ def get_filename(r):
:rtype: string
"""
# Try to get the filename from the content disposition
d = r.headers.get('Content-Disposition')
filename = ''
d = r.headers.get("Content-Disposition")
filename = ""
if d:
try:
_, params = cgi.parse_header(d)
filename = params['filename']
filename = params["filename"]
except Exception as e:
# We end up here if we get a Content-Disposition header with no filename
logger.warn("Couldn't find file name in Content-Disposition header, using url")
logger.warn(
"Couldn't find file name in Content-Disposition header, using url"
)
if not filename:
# Since we don't necessarily get the filename back in the response headers,
# parse the URL and get the filename and extension
path = urlparse.urlsplit(r.url).path
path = urlsplit(r.url).path
filename = posixpath.basename(path)
return filename

View File

@ -5,18 +5,20 @@ import sys
# Change directory since setuptools uses relative paths
script_path = os.path.dirname(os.path.realpath(__file__))
print script_path
print(script_path)
os.chdir(script_path)
install_args = ['install', 'install_data', 'develop']
install_args = ["install", "install_data", "develop"]
no_init = False
run_postinst = False
# XXX Definitely not the best way of doing this...
if sys.argv[1] in install_args and "--no-init-script" not in sys.argv:
run_postinst = True
data_files = [('/etc/default', ['install/conf/airtime-celery']),
('/etc/init.d', ['install/initd/airtime-celery'])]
data_files = [
("/etc/default", ["install/conf/airtime-celery"]),
("/etc/init.d", ["install/initd/airtime-celery"]),
]
else:
if "--no-init-script" in sys.argv:
no_init = True
@ -29,26 +31,24 @@ def postinst():
if not no_init:
# Make /etc/init.d file executable and set proper
# permissions for the defaults config file
os.chmod('/etc/init.d/airtime-celery', 0755)
os.chmod('/etc/default/airtime-celery', 0640)
print "Run \"sudo service airtime-celery restart\" now."
os.chmod("/etc/init.d/airtime-celery", 0o755)
os.chmod("/etc/default/airtime-celery", 0o640)
print('Run "sudo service airtime-celery restart" now.')
setup(name='airtime-celery',
version='0.1',
description='Airtime Celery service',
url='http://github.com/sourcefabric/Airtime',
author='Sourcefabric',
author_email='duncan.sommerville@sourcefabric.org',
license='MIT',
packages=['airtime-celery'],
install_requires=[
'soundcloud',
'celery < 4',
'kombu < 3.1',
'configobj'
],
zip_safe=False,
data_files=data_files)
setup(
name="airtime-celery",
version="0.1",
description="Airtime Celery service",
url="http://github.com/sourcefabric/Airtime",
author="Sourcefabric",
author_email="duncan.sommerville@sourcefabric.org",
license="MIT",
packages=["airtime-celery"],
install_requires=["soundcloud", "celery < 4", "kombu < 3.1", "configobj"],
zip_safe=False,
data_files=data_files,
)
if run_postinst:
postinst()