Merge pull request #854 from radiorabe/chore/py3-cleanup-for-celery
Python3 cleanup in airtime-celery package
This commit is contained in:
commit
07a9ef4ba3
|
@ -1,3 +1,4 @@
|
|||
import os
|
||||
|
||||
# Make the celeryconfig module visible to celery
|
||||
os.environ['CELERY_CONFIG_MODULE'] = 'airtime-celery.celeryconfig'
|
||||
os.environ["CELERY_CONFIG_MODULE"] = "airtime-celery.celeryconfig"
|
||||
|
|
|
@ -7,36 +7,37 @@ RMQ_CONFIG_SECTION = "rabbitmq"
|
|||
|
||||
|
||||
def get_rmq_broker():
|
||||
rmq_config = ConfigObj(os.environ['RMQ_CONFIG_FILE'])
|
||||
rmq_config = ConfigObj(os.environ["RMQ_CONFIG_FILE"])
|
||||
rmq_settings = parse_rmq_config(rmq_config)
|
||||
return 'amqp://{username}:{password}@{host}:{port}/{vhost}'.format(**rmq_settings)
|
||||
return "amqp://{username}:{password}@{host}:{port}/{vhost}".format(**rmq_settings)
|
||||
|
||||
|
||||
def parse_rmq_config(rmq_config):
|
||||
return {
|
||||
'host' : rmq_config[RMQ_CONFIG_SECTION]['host'],
|
||||
'port' : rmq_config[RMQ_CONFIG_SECTION]['port'],
|
||||
'username': rmq_config[RMQ_CONFIG_SECTION]['user'],
|
||||
'password': rmq_config[RMQ_CONFIG_SECTION]['password'],
|
||||
'vhost' : rmq_config[RMQ_CONFIG_SECTION]['vhost']
|
||||
"host": rmq_config[RMQ_CONFIG_SECTION]["host"],
|
||||
"port": rmq_config[RMQ_CONFIG_SECTION]["port"],
|
||||
"username": rmq_config[RMQ_CONFIG_SECTION]["user"],
|
||||
"password": rmq_config[RMQ_CONFIG_SECTION]["password"],
|
||||
"vhost": rmq_config[RMQ_CONFIG_SECTION]["vhost"],
|
||||
}
|
||||
|
||||
|
||||
# Celery amqp settings
|
||||
BROKER_URL = get_rmq_broker()
|
||||
CELERY_RESULT_BACKEND = 'amqp' # Use RabbitMQ as the celery backend
|
||||
CELERY_RESULT_PERSISTENT = True # Persist through a broker restart
|
||||
CELERY_TASK_RESULT_EXPIRES = 900 # Expire task results after 15 minutes
|
||||
CELERY_RESULT_EXCHANGE = 'celeryresults' # Default exchange - needed due to php-celery
|
||||
CELERY_RESULT_BACKEND = "amqp" # Use RabbitMQ as the celery backend
|
||||
CELERY_RESULT_PERSISTENT = True # Persist through a broker restart
|
||||
CELERY_TASK_RESULT_EXPIRES = 900 # Expire task results after 15 minutes
|
||||
CELERY_RESULT_EXCHANGE = "celeryresults" # Default exchange - needed due to php-celery
|
||||
CELERY_QUEUES = (
|
||||
Queue('soundcloud', exchange=Exchange('soundcloud'), routing_key='soundcloud'),
|
||||
Queue('podcast', exchange=Exchange('podcast'), routing_key='podcast'),
|
||||
Queue(exchange=Exchange('celeryresults'), auto_delete=True),
|
||||
Queue("soundcloud", exchange=Exchange("soundcloud"), routing_key="soundcloud"),
|
||||
Queue("podcast", exchange=Exchange("podcast"), routing_key="podcast"),
|
||||
Queue(exchange=Exchange("celeryresults"), auto_delete=True),
|
||||
)
|
||||
CELERY_EVENT_QUEUE_EXPIRES = 900 # RabbitMQ x-expire after 15 minutes
|
||||
CELERY_EVENT_QUEUE_EXPIRES = 900 # RabbitMQ x-expire after 15 minutes
|
||||
|
||||
# Celery task settings
|
||||
CELERY_TASK_SERIALIZER = 'json'
|
||||
CELERY_RESULT_SERIALIZER = 'json'
|
||||
CELERY_ACCEPT_CONTENT = ['json']
|
||||
CELERY_TIMEZONE = 'Europe/Berlin'
|
||||
CELERY_TASK_SERIALIZER = "json"
|
||||
CELERY_RESULT_SERIALIZER = "json"
|
||||
CELERY_ACCEPT_CONTENT = ["json"]
|
||||
CELERY_TIMEZONE = "Europe/Berlin"
|
||||
CELERY_ENABLE_UTC = True
|
||||
|
|
|
@ -1,25 +1,29 @@
|
|||
from future.standard_library import install_aliases
|
||||
|
||||
install_aliases()
|
||||
|
||||
import os
|
||||
import json
|
||||
import urllib2
|
||||
import requests
|
||||
import soundcloud
|
||||
import cgi
|
||||
import urlparse
|
||||
import posixpath
|
||||
import shutil
|
||||
import tempfile
|
||||
import traceback
|
||||
import mutagen
|
||||
from StringIO import StringIO
|
||||
from io import StringIO
|
||||
from celery import Celery
|
||||
from celery.utils.log import get_task_logger
|
||||
from contextlib import closing
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
|
||||
celery = Celery()
|
||||
logger = get_task_logger(__name__)
|
||||
|
||||
|
||||
@celery.task(name='soundcloud-upload', acks_late=True)
|
||||
@celery.task(name="soundcloud-upload", acks_late=True)
|
||||
def soundcloud_upload(data, token, file_path):
|
||||
"""
|
||||
Upload a file to SoundCloud
|
||||
|
@ -32,19 +36,23 @@ def soundcloud_upload(data, token, file_path):
|
|||
:rtype: string
|
||||
"""
|
||||
client = soundcloud.Client(access_token=token)
|
||||
# Open the file with urllib2 if it's a cloud file
|
||||
data['asset_data'] = open(file_path, 'rb') if os.path.isfile(file_path) else urllib2.urlopen(file_path)
|
||||
# Open the file with requests if it's a cloud file
|
||||
data["asset_data"] = (
|
||||
open(file_path, "rb")
|
||||
if os.path.isfile(file_path)
|
||||
else requests.get(file_path).content
|
||||
)
|
||||
try:
|
||||
logger.info('Uploading track: {0}'.format(data))
|
||||
track = client.post('/tracks', track=data)
|
||||
logger.info("Uploading track: {0}".format(data))
|
||||
track = client.post("/tracks", track=data)
|
||||
except Exception as e:
|
||||
logger.info('Error uploading track {title}: {0}'.format(e.message, **data))
|
||||
logger.info("Error uploading track {title}: {0}".format(e.message, **data))
|
||||
raise e
|
||||
data['asset_data'].close()
|
||||
data["asset_data"].close()
|
||||
return json.dumps(track.fields())
|
||||
|
||||
|
||||
@celery.task(name='soundcloud-download', acks_late=True)
|
||||
@celery.task(name="soundcloud-download", acks_late=True)
|
||||
def soundcloud_download(token, callback_url, api_key, track_id):
|
||||
"""
|
||||
Download a file from SoundCloud
|
||||
|
@ -60,31 +68,43 @@ def soundcloud_download(token, callback_url, api_key, track_id):
|
|||
client = soundcloud.Client(access_token=token)
|
||||
obj = {}
|
||||
try:
|
||||
track = client.get('/tracks/%s' % track_id)
|
||||
track = client.get("/tracks/%s" % track_id)
|
||||
obj.update(track.fields())
|
||||
if track.downloadable:
|
||||
re = None
|
||||
with closing(requests.get('%s?oauth_token=%s' % (track.download_url, client.access_token), verify=True, stream=True)) as r:
|
||||
with closing(
|
||||
requests.get(
|
||||
"%s?oauth_token=%s" % (track.download_url, client.access_token),
|
||||
verify=True,
|
||||
stream=True,
|
||||
)
|
||||
) as r:
|
||||
filename = get_filename(r)
|
||||
re = requests.post(callback_url, files={'file': (filename, r.content)}, auth=requests.auth.HTTPBasicAuth(api_key, ''))
|
||||
re = requests.post(
|
||||
callback_url,
|
||||
files={"file": (filename, r.content)},
|
||||
auth=requests.auth.HTTPBasicAuth(api_key, ""),
|
||||
)
|
||||
re.raise_for_status()
|
||||
f = json.loads(re.content) # Read the response from the media API to get the file id
|
||||
obj['fileid'] = f['id']
|
||||
f = json.loads(
|
||||
re.content
|
||||
) # Read the response from the media API to get the file id
|
||||
obj["fileid"] = f["id"]
|
||||
else:
|
||||
# manually update the task state
|
||||
self.update_state(
|
||||
state = states.FAILURE,
|
||||
meta = 'Track %s is not flagged as downloadable!' % track.title
|
||||
state=states.FAILURE,
|
||||
meta="Track %s is not flagged as downloadable!" % track.title,
|
||||
)
|
||||
# ignore the task so no other state is recorded
|
||||
raise Ignore()
|
||||
except Exception as e:
|
||||
logger.info('Error during file download: {0}'.format(e.message))
|
||||
logger.info("Error during file download: {0}".format(e.message))
|
||||
raise e
|
||||
return json.dumps(obj)
|
||||
|
||||
|
||||
@celery.task(name='soundcloud-update', acks_late=True)
|
||||
@celery.task(name="soundcloud-update", acks_late=True)
|
||||
def soundcloud_update(data, token, track_id):
|
||||
"""
|
||||
Update a file on SoundCloud
|
||||
|
@ -98,15 +118,15 @@ def soundcloud_update(data, token, track_id):
|
|||
"""
|
||||
client = soundcloud.Client(access_token=token)
|
||||
try:
|
||||
logger.info('Updating track {title}'.format(**data))
|
||||
track = client.put('/tracks/%s' % track_id, track=data)
|
||||
logger.info("Updating track {title}".format(**data))
|
||||
track = client.put("/tracks/%s" % track_id, track=data)
|
||||
except Exception as e:
|
||||
logger.info('Error updating track {title}: {0}'.format(e.message, **data))
|
||||
logger.info("Error updating track {title}: {0}".format(e.message, **data))
|
||||
raise e
|
||||
return json.dumps(track.fields())
|
||||
|
||||
|
||||
@celery.task(name='soundcloud-delete', acks_late=True)
|
||||
@celery.task(name="soundcloud-delete", acks_late=True)
|
||||
def soundcloud_delete(token, track_id):
|
||||
"""
|
||||
Delete a file from SoundCloud
|
||||
|
@ -119,16 +139,18 @@ def soundcloud_delete(token, track_id):
|
|||
"""
|
||||
client = soundcloud.Client(access_token=token)
|
||||
try:
|
||||
logger.info('Deleting track with ID {0}'.format(track_id))
|
||||
track = client.delete('/tracks/%s' % track_id)
|
||||
logger.info("Deleting track with ID {0}".format(track_id))
|
||||
track = client.delete("/tracks/%s" % track_id)
|
||||
except Exception as e:
|
||||
logger.info('Error deleting track!')
|
||||
logger.info("Error deleting track!")
|
||||
raise e
|
||||
return json.dumps(track.fields())
|
||||
|
||||
|
||||
@celery.task(name='podcast-download', acks_late=True)
|
||||
def podcast_download(id, url, callback_url, api_key, podcast_name, album_override, track_title):
|
||||
@celery.task(name="podcast-download", acks_late=True)
|
||||
def podcast_download(
|
||||
id, url, callback_url, api_key, podcast_name, album_override, track_title
|
||||
):
|
||||
"""
|
||||
Download a podcast episode
|
||||
|
||||
|
@ -146,12 +168,12 @@ def podcast_download(id, url, callback_url, api_key, podcast_name, album_overrid
|
|||
"""
|
||||
# Object to store file IDs, episode IDs, and download status
|
||||
# (important if there's an error before the file is posted)
|
||||
obj = { 'episodeid': id }
|
||||
obj = {"episodeid": id}
|
||||
try:
|
||||
re = None
|
||||
with closing(requests.get(url, stream=True)) as r:
|
||||
filename = get_filename(r)
|
||||
with tempfile.NamedTemporaryFile(mode ='wb+', delete=False) as audiofile:
|
||||
with tempfile.NamedTemporaryFile(mode="wb+", delete=False) as audiofile:
|
||||
r.raw.decode_content = True
|
||||
shutil.copyfileobj(r.raw, audiofile)
|
||||
# mutagen should be able to guess the write file type
|
||||
|
@ -162,44 +184,66 @@ def podcast_download(id, url, callback_url, api_key, podcast_name, album_overrid
|
|||
mp3suffix = ("mp3", "MP3", "Mp3", "mP3")
|
||||
# so we treat it like a mp3 if it has a mp3 file extension and hope for the best
|
||||
if filename.endswith(mp3suffix):
|
||||
metadata_audiofile = mutagen.mp3.MP3(audiofile.name, ID3=mutagen.easyid3.EasyID3)
|
||||
#replace track metadata as indicated by album_override setting
|
||||
metadata_audiofile = mutagen.mp3.MP3(
|
||||
audiofile.name, ID3=mutagen.easyid3.EasyID3
|
||||
)
|
||||
# replace track metadata as indicated by album_override setting
|
||||
# replace album title as needed
|
||||
metadata_audiofile = podcast_override_metadata(metadata_audiofile, podcast_name, album_override, track_title)
|
||||
metadata_audiofile = podcast_override_metadata(
|
||||
metadata_audiofile, podcast_name, album_override, track_title
|
||||
)
|
||||
metadata_audiofile.save()
|
||||
filetypeinfo = metadata_audiofile.pprint()
|
||||
logger.info('filetypeinfo is {0}'.format(filetypeinfo.encode('ascii', 'ignore')))
|
||||
re = requests.post(callback_url, files={'file': (filename, open(audiofile.name, 'rb'))}, auth=requests.auth.HTTPBasicAuth(api_key, ''))
|
||||
logger.info(
|
||||
"filetypeinfo is {0}".format(filetypeinfo.encode("ascii", "ignore"))
|
||||
)
|
||||
re = requests.post(
|
||||
callback_url,
|
||||
files={"file": (filename, open(audiofile.name, "rb"))},
|
||||
auth=requests.auth.HTTPBasicAuth(api_key, ""),
|
||||
)
|
||||
re.raise_for_status()
|
||||
f = json.loads(re.content) # Read the response from the media API to get the file id
|
||||
obj['fileid'] = f['id']
|
||||
obj['status'] = 1
|
||||
f = json.loads(
|
||||
re.content
|
||||
) # Read the response from the media API to get the file id
|
||||
obj["fileid"] = f["id"]
|
||||
obj["status"] = 1
|
||||
except Exception as e:
|
||||
obj['error'] = e.message
|
||||
logger.info('Error during file download: {0}'.format(e))
|
||||
logger.debug('Original Traceback: %s' % (traceback.format_exc(e)))
|
||||
obj['status'] = 0
|
||||
obj["error"] = e.message
|
||||
logger.info("Error during file download: {0}".format(e))
|
||||
logger.debug("Original Traceback: %s" % (traceback.format_exc(e)))
|
||||
obj["status"] = 0
|
||||
return json.dumps(obj)
|
||||
|
||||
|
||||
def podcast_override_metadata(m, podcast_name, override, track_title):
|
||||
"""
|
||||
Override m['album'] if empty or forced with override arg
|
||||
"""
|
||||
# if the album override option is enabled replace the album id3 tag with the podcast name even if the album tag contains data
|
||||
if override is True:
|
||||
logger.debug('overriding album name to {0} in podcast'.format(podcast_name.encode('ascii', 'ignore')))
|
||||
m['album'] = podcast_name
|
||||
m['title'] = track_title
|
||||
m['artist'] = podcast_name
|
||||
logger.debug(
|
||||
"overriding album name to {0} in podcast".format(
|
||||
podcast_name.encode("ascii", "ignore")
|
||||
)
|
||||
)
|
||||
m["album"] = podcast_name
|
||||
m["title"] = track_title
|
||||
m["artist"] = podcast_name
|
||||
else:
|
||||
# replace the album id3 tag with the podcast name if the album tag is empty
|
||||
try:
|
||||
m['album']
|
||||
m["album"]
|
||||
except KeyError:
|
||||
logger.debug('setting new album name to {0} in podcast'.format(podcast_name.encode('ascii', 'ignore')))
|
||||
m['album'] = podcast_name
|
||||
logger.debug(
|
||||
"setting new album name to {0} in podcast".format(
|
||||
podcast_name.encode("ascii", "ignore")
|
||||
)
|
||||
)
|
||||
m["album"] = podcast_name
|
||||
return m
|
||||
|
||||
|
||||
def get_filename(r):
|
||||
"""
|
||||
Given a request object to a file resource, get the name of the file to be downloaded
|
||||
|
@ -211,18 +255,20 @@ def get_filename(r):
|
|||
:rtype: string
|
||||
"""
|
||||
# Try to get the filename from the content disposition
|
||||
d = r.headers.get('Content-Disposition')
|
||||
filename = ''
|
||||
d = r.headers.get("Content-Disposition")
|
||||
filename = ""
|
||||
if d:
|
||||
try:
|
||||
_, params = cgi.parse_header(d)
|
||||
filename = params['filename']
|
||||
filename = params["filename"]
|
||||
except Exception as e:
|
||||
# We end up here if we get a Content-Disposition header with no filename
|
||||
logger.warn("Couldn't find file name in Content-Disposition header, using url")
|
||||
logger.warn(
|
||||
"Couldn't find file name in Content-Disposition header, using url"
|
||||
)
|
||||
if not filename:
|
||||
# Since we don't necessarily get the filename back in the response headers,
|
||||
# parse the URL and get the filename and extension
|
||||
path = urlparse.urlsplit(r.url).path
|
||||
path = urlsplit(r.url).path
|
||||
filename = posixpath.basename(path)
|
||||
return filename
|
||||
|
|
|
@ -5,18 +5,20 @@ import sys
|
|||
|
||||
# Change directory since setuptools uses relative paths
|
||||
script_path = os.path.dirname(os.path.realpath(__file__))
|
||||
print script_path
|
||||
print(script_path)
|
||||
os.chdir(script_path)
|
||||
|
||||
install_args = ['install', 'install_data', 'develop']
|
||||
install_args = ["install", "install_data", "develop"]
|
||||
no_init = False
|
||||
run_postinst = False
|
||||
|
||||
# XXX Definitely not the best way of doing this...
|
||||
if sys.argv[1] in install_args and "--no-init-script" not in sys.argv:
|
||||
run_postinst = True
|
||||
data_files = [('/etc/default', ['install/conf/airtime-celery']),
|
||||
('/etc/init.d', ['install/initd/airtime-celery'])]
|
||||
data_files = [
|
||||
("/etc/default", ["install/conf/airtime-celery"]),
|
||||
("/etc/init.d", ["install/initd/airtime-celery"]),
|
||||
]
|
||||
else:
|
||||
if "--no-init-script" in sys.argv:
|
||||
no_init = True
|
||||
|
@ -29,26 +31,24 @@ def postinst():
|
|||
if not no_init:
|
||||
# Make /etc/init.d file executable and set proper
|
||||
# permissions for the defaults config file
|
||||
os.chmod('/etc/init.d/airtime-celery', 0755)
|
||||
os.chmod('/etc/default/airtime-celery', 0640)
|
||||
print "Run \"sudo service airtime-celery restart\" now."
|
||||
os.chmod("/etc/init.d/airtime-celery", 0o755)
|
||||
os.chmod("/etc/default/airtime-celery", 0o640)
|
||||
print('Run "sudo service airtime-celery restart" now.')
|
||||
|
||||
setup(name='airtime-celery',
|
||||
version='0.1',
|
||||
description='Airtime Celery service',
|
||||
url='http://github.com/sourcefabric/Airtime',
|
||||
author='Sourcefabric',
|
||||
author_email='duncan.sommerville@sourcefabric.org',
|
||||
license='MIT',
|
||||
packages=['airtime-celery'],
|
||||
install_requires=[
|
||||
'soundcloud',
|
||||
'celery < 4',
|
||||
'kombu < 3.1',
|
||||
'configobj'
|
||||
],
|
||||
zip_safe=False,
|
||||
data_files=data_files)
|
||||
|
||||
setup(
|
||||
name="airtime-celery",
|
||||
version="0.1",
|
||||
description="Airtime Celery service",
|
||||
url="http://github.com/sourcefabric/Airtime",
|
||||
author="Sourcefabric",
|
||||
author_email="duncan.sommerville@sourcefabric.org",
|
||||
license="MIT",
|
||||
packages=["airtime-celery"],
|
||||
install_requires=["soundcloud", "celery < 4", "kombu < 3.1", "configobj"],
|
||||
zip_safe=False,
|
||||
data_files=data_files,
|
||||
)
|
||||
|
||||
if run_postinst:
|
||||
postinst()
|
||||
|
|
Loading…
Reference in New Issue