SAAS-1071 - more work on celery backend for podcasts; add upgrade to make file_id field in third_party_track_references nullable

This commit is contained in:
Duncan Sommerville 2015-09-24 15:57:38 -04:00
parent a24565669b
commit 43e9fb59ce
11 changed files with 127 additions and 58 deletions

View File

@ -0,0 +1 @@
ALTER TABLE third_party_track_references ALTER COLUMN file_id SET NOT NULL;

View File

@ -0,0 +1 @@
ALTER TABLE third_party_track_references ALTER COLUMN file_id DROP NOT NULL;

View File

@ -143,7 +143,9 @@ class Podcast extends BasePodcast
$podcastArray["episodes"] = array(); $podcastArray["episodes"] = array();
foreach ($rss->get_items() as $item) { foreach ($rss->get_items() as $item) {
/** @var SimplePie_Item $item */
array_push($podcastArray["episodes"], array( array_push($podcastArray["episodes"], array(
"guid" => $item->get_id(),
"title" => $item->get_title(), "title" => $item->get_title(),
"author" => $item->get_author()->get_name(), "author" => $item->get_author()->get_name(),
"description" => $item->get_description(), "description" => $item->get_description(),

View File

@ -125,8 +125,11 @@ class Rest_PodcastController extends Zend_Rest_Controller
try { try {
$requestData = json_decode($this->getRequest()->getRawBody(), true); $requestData = json_decode($this->getRequest()->getRawBody(), true);
// Create placeholders in PodcastEpisodes so we know these episodes are being downloaded
$this->_service->downloadEpisodes($requestData["podcast"]["episodes"]); // to prevent the user from trying to download them again while Celery is running
$episodes = $this->_service->addPodcastEpisodePlaceholders($requestData["podcast"]["id"],
$requestData["podcast"]["episodes"]);
$this->_service->downloadEpisodes($episodes);
$podcast = Podcast::updateFromArray($id, $requestData); $podcast = Podcast::updateFromArray($id, $requestData);
$this->getResponse() $this->getResponse()

View File

@ -65,15 +65,40 @@ class Application_Service_PodcastService extends Application_Service_ThirdPartyC
{ {
} }
/** /**
* Given an array of episodes, extract the download URLs and send them to Celery * Given an array of episodes, extract the download URLs and send them to Celery
* *
* @param int $podcastId Podcast object identifier
* @param array $episodes array of podcast episodes
*
* @return array the stored PodcastEpisodes objects
*/
public function addPodcastEpisodePlaceholders($podcastId, $episodes) {
    $placeholders = array();
    foreach ($episodes as $episodeData) {
        // Build one PodcastEpisodes row per incoming episode and save it
        // straight away, so the object handed back has already been persisted
        $placeholder = new PodcastEpisodes();
        $placeholder->setDbPodcastId($podcastId);
        $placeholder->setDbDownloadUrl($episodeData["enclosure"]["link"]);
        $placeholder->setDbEpisodeGuid($episodeData["guid"]);
        $placeholder->setDbPublicationDate($episodeData["pub_date"]);
        $placeholder->save();
        $placeholders[] = $placeholder;
    }
    // Same order as the input array: one stored object per episode
    return $placeholders;
}
/**
* Given an array of episodes, extract the IDs and download URLs and send them to Celery
*
* @param array $episodes array of podcast episodes * @param array $episodes array of podcast episodes
*/ */
public function downloadEpisodes($episodes) { public function downloadEpisodes($episodes) {
$episodeUrls = array(); $episodeUrls = array();
/** @var PodcastEpisodes $episode */
foreach($episodes as $episode) { foreach($episodes as $episode) {
array_push($episodeUrls, $episode["enclosure"]["link"]); array_push($episodeUrls, array("id" => $episode->getDbId(),
"url" => $episode->getDbDownloadUrl()));
} }
$this->_download($episodeUrls); $this->_download($episodeUrls);
} }
@ -81,45 +106,47 @@ class Application_Service_PodcastService extends Application_Service_ThirdPartyC
/** /**
* Given an array of download URLs, download RSS feed tracks * Given an array of download URLs, download RSS feed tracks
* *
* @param array $downloadUrls array of download URLs to send to Celery * @param array $episodes array of episodes containing download URLs and IDs to send to Celery
* TODO: do we need other parameters here...?
*/ */
private function _download($downloadUrls) { private function _download($episodes) {
$CC_CONFIG = Config::getConfig(); $CC_CONFIG = Config::getConfig();
$data = array( $data = array(
'download_urls' => $downloadUrls, 'episodes' => $episodes,
'callback_url' => Application_Common_HTTPHelper::getStationUrl() . '/rest/media', 'callback_url' => Application_Common_HTTPHelper::getStationUrl() . '/rest/media',
'api_key' => $apiKey = $CC_CONFIG["apiKey"][0], 'api_key' => $apiKey = $CC_CONFIG["apiKey"][0],
); );
// FIXME $this->_executeTask(static::$_CELERY_TASKS[self::DOWNLOAD], $data);
Logging::warn("FIXME: we can't create a task reference without a valid file ID");
$this->_executeTask(static::$_CELERY_TASKS[self::DOWNLOAD], $data, null);
} }
/** /**
* Update a ThirdPartyTrackReferences object for a completed upload * Update a ThirdPartyTrackReferences object for a completed upload
* *
* @param $task CeleryTasks the completed CeleryTasks object * @param $task CeleryTasks the completed CeleryTasks object
* @param $episodeId int PodcastEpisodes identifier * @param $episodeId int PodcastEpisodes identifier
* @param $episode object object containing Podcast episode information * @param $episodes array array containing Podcast episode information
* @param $status string Celery task status * @param $status string Celery task status
* *
* @return ThirdPartyTrackReferences the updated ThirdPartyTrackReferences object * @return ThirdPartyTrackReferences the updated ThirdPartyTrackReferences object
* *
* @throws Exception * @throws Exception
* @throws PropelException * @throws PropelException
*/ */
public function updateTrackReference($task, $episodeId, $episode, $status) { public function updateTrackReference($task, $episodeId, $episodes, $status) {
$ref = parent::updateTrackReference($task, $episodeId, $episode, $status); $ref = parent::updateTrackReference($task, $episodeId, $episodes, $status);
if ($status == CELERY_SUCCESS_STATUS) { if ($status == CELERY_SUCCESS_STATUS) {
// TODO: handle successful download foreach($episodes as $episode) {
// $ref->setDbForeignId(); // Since we process episode downloads as a batch, individual downloads can fail
// FIXME: we need the file ID here, but 'track' is too arbitrary... // even if the task itself succeeds
$ref->setDbFileId($episode->fileId); if ($episode->status) {
$dbEpisode = PodcastEpisodesQuery::create()
->findOneByDbId($episode->episodeid);
$dbEpisode->setDbFileId($episode->fileid)
->save();
}
}
} }
$ref->save();
return $ref; return $ref;
} }
} }

View File

@ -150,9 +150,7 @@ class Application_Service_SoundcloudService extends Application_Service_ThirdPar
'token' => $this->_accessToken, 'token' => $this->_accessToken,
'track_id' => $trackId 'track_id' => $trackId
); );
// FIXME $this->_executeTask(static::$_CELERY_TASKS[self::DOWNLOAD], $data);
Logging::warn("FIXME: we can't create a task reference without a valid file ID");
$this->_executeTask(static::$_CELERY_TASKS[self::DOWNLOAD], $data, null);
} }
/** /**

View File

@ -17,22 +17,16 @@ abstract class Application_Service_ThirdPartyCeleryService extends Application_S
/** /**
* Execute a Celery task with the given name and data parameters * Execute a Celery task with the given name and data parameters
* *
* FIXME: Currently, downloads will not create task reference rows because they
* don't have a valid file identifier - this means that we will never know if there
* is an issue with the download before the callback to /rest/media is called!
*
* @param string $taskName the name of the celery task to execute * @param string $taskName the name of the celery task to execute
* @param array $data the data array to send as task parameters * @param array $data the data array to send as task parameters
* @param int $fileId the unique identifier for the file involved in the task * @param int $fileId the unique identifier for the file involved in the task
*/ */
protected function _executeTask($taskName, $data, $fileId) { protected function _executeTask($taskName, $data, $fileId = null) {
try { try {
$brokerTaskId = CeleryManager::sendCeleryMessage($taskName, $brokerTaskId = CeleryManager::sendCeleryMessage($taskName,
static::$_CELERY_EXCHANGE_NAME, static::$_CELERY_EXCHANGE_NAME,
$data); $data);
if (!empty($fileId)) { $this->_createTaskReference($fileId, $brokerTaskId, $taskName);
$this->_createTaskReference($fileId, $brokerTaskId, $taskName);
}
} catch (Exception $e) { } catch (Exception $e) {
Logging::info("Invalid request: " . $e->getMessage()); Logging::info("Invalid request: " . $e->getMessage());
} }
@ -84,7 +78,7 @@ abstract class Application_Service_ThirdPartyCeleryService extends Application_S
* *
* @param $task CeleryTasks the completed CeleryTasks object * @param $task CeleryTasks the completed CeleryTasks object
* @param $trackId int ThirdPartyTrackReferences identifier * @param $trackId int ThirdPartyTrackReferences identifier
* @param $track object third-party service track object * @param $result mixed Celery task result message
* @param $status string Celery task status * @param $status string Celery task status
* *
* @return ThirdPartyTrackReferences the updated ThirdPartyTrackReferences object * @return ThirdPartyTrackReferences the updated ThirdPartyTrackReferences object
@ -92,7 +86,7 @@ abstract class Application_Service_ThirdPartyCeleryService extends Application_S
* @throws Exception * @throws Exception
* @throws PropelException * @throws PropelException
*/ */
public function updateTrackReference($task, $trackId, $track, $status) { public function updateTrackReference($task, $trackId, $result, $status) {
static::updateTask($task, $status); static::updateTask($task, $status);
$ref = ThirdPartyTrackReferencesQuery::create() $ref = ThirdPartyTrackReferencesQuery::create()
->findOneByDbId($trackId); ->findOneByDbId($trackId);

View File

@ -29,15 +29,14 @@ abstract class Application_Service_ThirdPartyService {
*/ */
public function createTrackReference($fileId) { public function createTrackReference($fileId) {
// First, check if the track already has an entry in the database // First, check if the track already has an entry in the database
$ref = ThirdPartyTrackReferencesQuery::create() // If the file ID given is null, create a new reference
$ref = is_null($fileId) ? null : ThirdPartyTrackReferencesQuery::create()
->filterByDbService(static::$_SERVICE_NAME) ->filterByDbService(static::$_SERVICE_NAME)
->findOneByDbFileId($fileId); ->findOneByDbFileId($fileId);
if (is_null($ref)) { if (is_null($ref)) {
$ref = new ThirdPartyTrackReferences(); $ref = new ThirdPartyTrackReferences();
} }
$ref->setDbService(static::$_SERVICE_NAME); $ref->setDbService(static::$_SERVICE_NAME);
// TODO: implement service-specific statuses?
// $ref->setDbStatus(CELERY_PENDING_STATUS);
$ref->setDbFileId($fileId); $ref->setDbFileId($fileId);
$ref->save(); $ref->save();
return $ref->getDbId(); return $ref->getDbId();

View File

@ -473,3 +473,23 @@ class AirtimeUpgrader2514 extends AirtimeUpgrader
return '2.5.14'; return '2.5.14';
} }
} }
/**
 * Class AirtimeUpgrader2515
 *
 * Upgrader from schema version 2.5.14 to 2.5.15.
 *
 * SAAS-1071 - Drops the NOT NULL constraint on the file_id foreign key in
 * third_party_track_references, so that track references can be created for
 * downloads. A download has no file ID until its task has run and the file
 * has been POSTed back to Airtime.
 */
class AirtimeUpgrader2515 extends AirtimeUpgrader
{
    /**
     * @return array the schema version strings this upgrader declares as supported
     */
    protected function getSupportedSchemaVersions()
    {
        return array('2.5.14');
    }

    /**
     * @return string the version string this upgrader declares as its new version
     */
    public function getNewVersion()
    {
        return '2.5.15';
    }
}

View File

@ -102,6 +102,7 @@ var AIRTIME = (function (AIRTIME) {
mod.initPodcastEpisodeDatatable = function(episodes) { mod.initPodcastEpisodeDatatable = function(episodes) {
var aoColumns = [ var aoColumns = [
/* GUID */ { "sTitle" : "" , "mDataProp" : "guid" , "sClass" : "podcast_episodes_guid" , "bVisible" : false },
/* Title */ { "sTitle" : $.i18n._("Title") , "mDataProp" : "title" , "sClass" : "podcast_episodes_title" , "sWidth" : "170px" }, /* Title */ { "sTitle" : $.i18n._("Title") , "mDataProp" : "title" , "sClass" : "podcast_episodes_title" , "sWidth" : "170px" },
/* Author */ { "sTitle" : $.i18n._("Author") , "mDataProp" : "author" , "sClass" : "podcast_episodes_author" , "sWidth" : "170px" }, /* Author */ { "sTitle" : $.i18n._("Author") , "mDataProp" : "author" , "sClass" : "podcast_episodes_author" , "sWidth" : "170px" },
/* Description */ { "sTitle" : $.i18n._("Description") , "mDataProp" : "description" , "sClass" : "podcast_episodes_description" , "sWidth" : "300px" }, /* Description */ { "sTitle" : $.i18n._("Description") , "mDataProp" : "description" , "sClass" : "podcast_episodes_description" , "sWidth" : "300px" },

View File

@ -86,30 +86,53 @@ def soundcloud_delete(token, track_id):
@celery.task(name='podcast-download', acks_late=True) @celery.task(name='podcast-download', acks_late=True)
def podcast_download(download_urls, callback_url, api_key): def podcast_download(episodes, callback_url, api_key):
""" """
Download a given podcast episode Download a batch of podcast episodes
:param download_urls: array of download URLs for episodes to download :param episodes: array of episodes containing download URLs and IDs
:param callback_url: callback URL to send the downloaded file to :param callback_url: callback URL to send the downloaded file to
:param api_key: API key for callback authentication :param api_key: API key for callback authentication
:rtype: None :rtype: None
""" """
try: response = []
for url in download_urls: for episode in episodes:
with closing(requests.get(url, stream=True)) as r: logger.info(episode)
# Try to get the filename from the content disposition # Object to store file IDs, episode IDs, and download status
d = r.headers.get('Content-Disposition') # (important if there's an error before the file is posted)
if d: obj = { 'episodeid': episode['id'] }
_, params = cgi.parse_header(d) try:
filename = params['filename'] re = None
else: with closing(requests.get(episode['url'], stream=True)) as r:
# Since we don't necessarily get the filename back in the response headers, filename = get_filename(r)
# parse the URL and get the filename and extension re = requests.post(callback_url, files={'file': (filename, r.content)}, auth=requests.auth.HTTPBasicAuth(api_key, ''))
path = urlparse.urlsplit(r.url).path re.raise_for_status()
filename = posixpath.basename(path) f = json.loads(re.content) # Read the response from the media API to get the file id
requests.post(callback_url, files={'file': (filename, r.content)}, auth=requests.auth.HTTPBasicAuth(api_key, '')) obj['fileid'] = f['id']
except Exception as e: obj['status'] = 1
logger.info('Error during file download: {0}'.format(e.message)) response.append(obj)
logger.info(str(e)) except Exception as e:
raise e logger.info('Error during file download: {0}'.format(e.message))
obj['status'] = 0
return json.dumps(response)
def get_filename(r):
    """
    Given a response object for a file resource, get the name of the file
    being downloaded.

    The filename parameter of the Content-Disposition header is preferred.
    When the header is absent, or is present but carries no usable filename
    (for example a bare "attachment" or "inline"), the last path segment of
    the response URL is used instead.

    :param r: response object exposing ``headers`` and ``url``
    :rtype: string
    """
    # Try to get the filename from the content disposition
    d = r.headers.get('Content-Disposition')
    if d:
        _, params = cgi.parse_header(d)
        # The filename parameter is optional in this header, so look it up
        # without raising and fall through to the URL when it is missing
        filename = params.get('filename')
        if filename:
            return filename
    # Since we don't necessarily get the filename back in the response headers,
    # parse the URL and get the filename and extension
    path = urlparse.urlsplit(r.url).path
    return posixpath.basename(path)