Add SoundCloud update and download tasks to Celery backend; requires airtime-celery reinstall

This commit is contained in:
Duncan Sommerville 2015-10-30 16:10:16 -04:00
parent bf97c42167
commit 4f281a30ed
9 changed files with 131 additions and 83 deletions

View file

@ -25,14 +25,14 @@ def parse_rmq_config(rmq_config):
BROKER_URL = get_rmq_broker()
CELERY_RESULT_BACKEND = 'amqp' # Use RabbitMQ as the celery backend
CELERY_RESULT_PERSISTENT = True # Persist through a broker restart
CELERY_TASK_RESULT_EXPIRES = 600 # Expire task results after 10 minutes
CELERY_TASK_RESULT_EXPIRES = 900 # Expire task results after 15 minutes
CELERY_RESULT_EXCHANGE = 'celeryresults' # Default exchange - needed due to php-celery
CELERY_QUEUES = (
Queue('soundcloud', exchange=Exchange('soundcloud'), routing_key='soundcloud'),
Queue('podcast', exchange=Exchange('podcast'), routing_key='podcast'),
Queue(exchange=Exchange('celeryresults'), auto_delete=True),
)
CELERY_EVENT_QUEUE_EXPIRES = 600 # RabbitMQ x-expire after 10 minutes
CELERY_EVENT_QUEUE_EXPIRES = 900 # RabbitMQ x-expire after 15 minutes
# Celery task settings
CELERY_TASK_SERIALIZER = 'json'

View file

@ -23,8 +23,8 @@ def soundcloud_upload(data, token, file_path):
:param token: OAuth2 client access token
:param file_path: path to the file being uploaded
:return: the SoundCloud response object
:rtype: dict
:return: JSON formatted string of the SoundCloud response object
:rtype: string
"""
client = soundcloud.Client(access_token=token)
# Open the file with urllib2 if it's a cloud file
@ -40,28 +40,65 @@ def soundcloud_upload(data, token, file_path):
@celery.task(name='soundcloud-download', acks_late=True)
def soundcloud_download(token, callback_url, api_key, track_id=None):
def soundcloud_download(token, callback_url, api_key, track_id):
"""
This is in stasis
Download a file from SoundCloud
:param token: OAuth2 client access token
:param callback_url: callback URL to send the downloaded file to
:param api_key: API key for callback authentication
:param track_id: SoundCloud track identifier
:rtype: None
:return: JSON formatted string of file identifiers for the downloaded tracks
:rtype: string
"""
client = soundcloud.Client(access_token=token)
obj = {}
try:
track = client.get('/tracks/%s' % track_id)
obj.update(track.fields())
if track.downloadable:
re = None
with closing(requests.get('%s?oauth_token=%s' % (track.download_url, client.access_token), verify=True, stream=True)) as r:
filename = get_filename(r)
re = requests.post(callback_url, files={'file': (filename, r.content)}, auth=requests.auth.HTTPBasicAuth(api_key, ''))
re.raise_for_status()
f = json.loads(re.content) # Read the response from the media API to get the file id
obj['fileid'] = f['id']
else:
# manually update the task state
self.update_state(
state = states.FAILURE,
meta = 'Track %s is not flagged as downloadable!' % track.title
)
# ignore the task so no other state is recorded
raise Ignore()
except Exception as e:
logger.info('Error during file download: {0}'.format(e.message))
raise e
return json.dumps(obj)
@celery.task(name='soundcloud-update', acks_late=True)
def soundcloud_update(data, token, track_id):
"""
Update a file on SoundCloud
:param data: associative array containing SoundCloud metadata
:param token: OAuth2 client access token
:param track_id: SoundCloud ID of the track to be updated
:return: JSON formatted string of the SoundCloud response object
:rtype: string
"""
client = soundcloud.Client(access_token=token)
try:
tracks = client.get('/me/tracks') if track_id is None else {client.get('/tracks/%s' % track_id)}
for track in tracks:
if track.downloadable:
track_file = client.get(track.download_url)
with track_file as f:
requests.post(callback_url, data=f, auth=requests.auth.HTTPBasicAuth(api_key, ''))
logger.info('Updating track {title}'.format(**data))
track = client.put('/tracks/%s' % track_id, track=data)
except Exception as e:
logger.info('Error during file download: {0}'.format(e.message))
logger.info(str(e))
logger.info('Error updating track {title}: {0}'.format(e.message, **data))
raise e
return json.dumps(track.fields())
@celery.task(name='soundcloud-delete', acks_late=True)
@ -72,8 +109,8 @@ def soundcloud_delete(token, track_id):
:param token: OAuth2 client access token
:param track_id: SoundCloud track identifier
:return: the SoundCloud response object
:rtype: dict
:return: JSON formatted string of the SoundCloud response object
:rtype: string
"""
client = soundcloud.Client(access_token=token)
try:
@ -94,7 +131,10 @@ def podcast_download(id, url, callback_url, api_key):
:param url: download url for the episode
:param callback_url: callback URL to send the downloaded file to
:param api_key: API key for callback authentication
:rtype: None
:return: JSON formatted string of a dictionary of download statuses
and file identifiers (for successful uploads)
:rtype: string
"""
# Object to store file IDs, episode IDs, and download status
# (important if there's an error before the file is posted)
@ -121,6 +161,8 @@ def get_filename(r):
by parsing either the content disposition or the request URL
:param r: request object
:return: the file name
:rtype: string
"""
# Try to get the filename from the content disposition