Initial work on automatic ingest for imported podcasts

This commit is contained in:
Duncan Sommerville 2015-10-15 14:44:17 -04:00
parent a2d725f2b9
commit bddc121c2d
7 changed files with 157 additions and 20 deletions

View file

@ -17,6 +17,11 @@ class CeleryManager {
*/
private static $_CELERY_RESULTS_EXCHANGE = 'celeryresults';
/**
* @var PropelCollection cache of any pending CeleryTasks results for a service or task
*/
private static $_pendingTasks;
/**
* Connect to the Celery daemon via amqp
*
@ -79,7 +84,7 @@ class CeleryManager {
// If the message isn't ready yet (Celery hasn't finished the task), throw an exception.
if ($message == FALSE) {
if (self::_checkMessageTimeout($task)) {
if (static::_checkMessageTimeout($task)) {
// If the task times out, mark it as failed. We don't want to remove the
// track reference here in case it was a deletion that failed, for example.
$task->setDbStatus(CELERY_FAILED_STATUS)->save();
@ -103,9 +108,9 @@ class CeleryManager {
*
* @return bool true if there are any pending tasks, otherwise false
*/
public static function isBrokerTaskQueueEmpty($taskName="", $serviceName = "") {
$pendingTasks = self::_getPendingTasks($taskName, $serviceName);
return empty($pendingTasks);
public static function isBrokerTaskQueueEmpty($taskName = "", $serviceName = "") {
self::$_pendingTasks = static::_getPendingTasks($taskName, $serviceName);
return empty(self::$_pendingTasks);
}
/**
@ -119,11 +124,12 @@ class CeleryManager {
* @param string $serviceName the name of the service to poll for
*/
public static function pollBrokerTaskQueue($taskName = "", $serviceName = "") {
$pendingTasks = self::_getPendingTasks($taskName, $serviceName);
$pendingTasks = empty(self::$_pendingTasks) ? static::_getPendingTasks($taskName, $serviceName)
: self::$_pendingTasks;
foreach ($pendingTasks as $task) {
try {
$message = self::_getTaskMessage($task);
self::_processTaskMessage($task, $message);
$message = static::_getTaskMessage($task);
static::_processTaskMessage($task, $message);
} catch (CeleryTimeoutException $e) {
Logging::warn($e->getMessage());
} catch (Exception $e) {

View file

@ -0,0 +1,74 @@
<?php
/**
 * Class PodcastManager
 *
 * Static helpers for the periodic automatic ingest of imported podcasts:
 * deciding when a poll is due and queueing the newest episode of every
 * podcast flagged for automatic ingest.
 */
class PodcastManager {

    /**
     * @var int how often, in seconds, to check for and ingest new podcast episodes
     */
    private static $_PODCAST_POLL_INTERVAL_SECONDS = 3600;  // 1 hour

    /**
     * Check whether $_PODCAST_POLL_INTERVAL_SECONDS have passed since the last call to
     * downloadNewestEpisodes
     *
     * The timestamp compared against is the value written by downloadNewestEpisodes
     * via Application_Model_Preference::setPodcastPollLock. An empty stored value
     * is treated as "never polled", so the interval counts as passed.
     *
     * @return bool true if $_PODCAST_POLL_INTERVAL_SECONDS has passed since the last check
     */
    public static function hasPodcastPollIntervalPassed() {
        $lastPolled = Application_Model_Preference::getPodcastPollLock();
        return empty($lastPolled) || (microtime(true) > $lastPolled + self::$_PODCAST_POLL_INTERVAL_SECONDS);
    }

    /**
     * Find all podcasts flagged for automatic ingest whose most recent episode has
     * yet to be downloaded and download it with Celery
     *
     * Podcasts whose feed currently contains no episodes are skipped. The poll
     * timestamp is only updated once the download request has been issued.
     *
     * @throws InvalidPodcastException
     * @throws PodcastNotFoundException
     */
    public static function downloadNewestEpisodes() {
        $autoIngestPodcasts = static::_getAutoIngestPodcasts();
        $service = new Application_Service_PodcastEpisodeService();
        $episodes = array();
        foreach ($autoIngestPodcasts as $podcast) {
            /** @var ImportedPodcast $podcast */
            $podcastArray = Application_Service_PodcastService::getPodcastById($podcast->getDbId());
            $feedEpisodes = isset($podcastArray["episodes"]) ? $podcastArray["episodes"] : array();
            $episodeData = static::_getNewestEpisode($feedEpisodes);
            // Nothing to ingest for an empty feed; bail out before touching the
            // database so we never look up (or index into) a missing episode
            if (empty($episodeData)) {
                continue;
            }
            $episode = PodcastEpisodesQuery::create()->findOneByDbEpisodeGuid($episodeData["guid"]);
            // Make sure there's no existing episode placeholder or import
            if (empty($episode)) {
                $placeholder = $service->addPodcastEpisodePlaceholder($podcast->getDbId(), $episodeData);
                array_push($episodes, $placeholder);
            }
        }

        $service->downloadEpisodes($episodes);
        Application_Model_Preference::setPodcastPollLock(microtime(true));
    }

    /**
     * Find all podcasts flagged for automatic ingest
     *
     * @return PropelObjectCollection collection of ImportedPodcast objects
     *                                flagged for automatic ingest
     */
    protected static function _getAutoIngestPodcasts() {
        return ImportedPodcastQuery::create()
            ->filterByDbAutoIngest(true)
            ->find();
    }

    /**
     * Pick the most recently published episode out of a list of episode arrays
     *
     * A bit hacky... sorts a copy of the episodes by publication date (descending)
     * and takes the first one. The caller's array is left untouched.
     *
     * @param array $episodes list of episode arrays, each carrying a "pub_date" key
     *
     * @return array|null the episode with the greatest "pub_date", or null when
     *                    $episodes is empty or not an array
     */
    protected static function _getNewestEpisode($episodes) {
        if (empty($episodes) || !is_array($episodes)) {
            return null;
        }
        usort($episodes, array(static::class, "_sortByEpisodePubDate"));
        return $episodes[0];
    }

    /**
     * Custom sort function for podcast episodes
     *
     * Orders episodes by "pub_date" in descending order (newest first).
     * NOTE(review): relies on PHP's loose comparison of the "pub_date" values
     * matching chronological order - confirm the format the feed parser produces.
     *
     * @param array $a first episode array to compare
     * @param array $b second episode array to compare
     *
     * @return int 0 if both dates are equal, 1 if $a is older than $b, otherwise -1
     */
    protected static function _sortByEpisodePubDate($a, $b) {
        if ($a["pub_date"] == $b["pub_date"]) return 0;
        return ($a["pub_date"] < $b["pub_date"]) ? 1 : -1;  // Descending order
    }

}

View file

@ -149,7 +149,7 @@ final class TaskManager {
}
/**
* Interface AirtimeTask Interface for task operations - also acts as task type ENUM
* Interface AirtimeTask Interface for task operations
*/
interface AirtimeTask {
@ -215,6 +215,29 @@ class CeleryTask implements AirtimeTask {
}
/**
 * Class PodcastTask
 *
 * AirtimeTask implementation that hands automatic podcast ingest off to
 * PodcastManager
 */
class PodcastTask implements AirtimeTask {

    /**
     * Ask PodcastManager whether the podcast polling interval has elapsed
     *
     * @return bool true if the podcast polling interval has passed
     */
    public function shouldBeRun() {
        $intervalHasPassed = PodcastManager::hasPodcastPollIntervalPassed();
        return $intervalHasPassed;
    }

    /**
     * Have PodcastManager download the newest episode of every podcast
     * flagged for automatic ingest
     */
    public function run() {
        PodcastManager::downloadNewestEpisodes();
    }

}
/**
* Class TaskFactory Factory class to abstract task instantiation
*/
@ -225,8 +248,9 @@ class TaskFactory {
* Task types - values don't really matter as long as they're unique
*/
const UPGRADE = "upgrade";
const CELERY = "celery";
const UPGRADE = "upgrade";
const CELERY = "celery";
const PODCAST = "podcast";
/**
* @var array map of arbitrary identifiers to class names to be instantiated reflectively
@ -234,6 +258,7 @@ class TaskFactory {
public static $tasks = array(
"upgrade" => "UpgradeTask",
"celery" => "CeleryTask",
"podcast" => "PodcastTask",
);
/**