From bddc121c2dc06755de9dd2566a8e69f6ac020191 Mon Sep 17 00:00:00 2001 From: Duncan Sommerville Date: Thu, 15 Oct 2015 14:44:17 -0400 Subject: [PATCH] Initial work on automatic ingest for imported podcasts --- airtime_mvc/application/Bootstrap.php | 1 + .../application/common/CeleryManager.php | 20 +++-- .../application/common/PodcastManager.php | 74 +++++++++++++++++++ .../application/common/TaskManager.php | 31 +++++++- airtime_mvc/application/models/Preference.php | 8 ++ .../services/PodcastEpisodeService.php | 37 ++++++++-- .../application/services/PodcastService.php | 6 +- 7 files changed, 157 insertions(+), 20 deletions(-) create mode 100644 airtime_mvc/application/common/PodcastManager.php diff --git a/airtime_mvc/application/Bootstrap.php b/airtime_mvc/application/Bootstrap.php index 763a4cb64..2fa903fcd 100644 --- a/airtime_mvc/application/Bootstrap.php +++ b/airtime_mvc/application/Bootstrap.php @@ -28,6 +28,7 @@ require_once "GoogleAnalytics.php"; require_once "Timezone.php"; require_once "CeleryManager.php"; require_once "TaskManager.php"; +require_once "PodcastManager.php"; require_once "UsabilityHints.php"; require_once __DIR__.'/models/formatters/LengthFormatter.php'; require_once __DIR__.'/common/widgets/Table.php'; diff --git a/airtime_mvc/application/common/CeleryManager.php b/airtime_mvc/application/common/CeleryManager.php index d2c41d79d..6f8f361d5 100644 --- a/airtime_mvc/application/common/CeleryManager.php +++ b/airtime_mvc/application/common/CeleryManager.php @@ -17,6 +17,11 @@ class CeleryManager { */ private static $_CELERY_RESULTS_EXCHANGE = 'celeryresults'; + /** + * @var PropelCollection cache of any pending CeleryTasks results for a service or task + */ + private static $_pendingTasks; + /** * Connect to the Celery daemon via amqp * @@ -79,7 +84,7 @@ class CeleryManager { // If the message isn't ready yet (Celery hasn't finished the task), throw an exception. 
if ($message == FALSE) { - if (self::_checkMessageTimeout($task)) { + if (static::_checkMessageTimeout($task)) { // If the task times out, mark it as failed. We don't want to remove the // track reference here in case it was a deletion that failed, for example. $task->setDbStatus(CELERY_FAILED_STATUS)->save(); @@ -103,9 +108,9 @@ class CeleryManager { * * @return bool true if there are any pending tasks, otherwise false */ - public static function isBrokerTaskQueueEmpty($taskName="", $serviceName = "") { - $pendingTasks = self::_getPendingTasks($taskName, $serviceName); - return empty($pendingTasks); + public static function isBrokerTaskQueueEmpty($taskName = "", $serviceName = "") { + self::$_pendingTasks = static::_getPendingTasks($taskName, $serviceName); + return empty(self::$_pendingTasks); } /** @@ -119,11 +124,12 @@ class CeleryManager { * @param string $serviceName the name of the service to poll for */ public static function pollBrokerTaskQueue($taskName = "", $serviceName = "") { - $pendingTasks = self::_getPendingTasks($taskName, $serviceName); + $pendingTasks = empty(self::$_pendingTasks) ? 
static::_getPendingTasks($taskName, $serviceName) + : self::$_pendingTasks; foreach ($pendingTasks as $task) { try { - $message = self::_getTaskMessage($task); - self::_processTaskMessage($task, $message); + $message = static::_getTaskMessage($task); + static::_processTaskMessage($task, $message); } catch (CeleryTimeoutException $e) { Logging::warn($e->getMessage()); } catch (Exception $e) { diff --git a/airtime_mvc/application/common/PodcastManager.php b/airtime_mvc/application/common/PodcastManager.php new file mode 100644 index 000000000..f7be607c8 --- /dev/null +++ b/airtime_mvc/application/common/PodcastManager.php @@ -0,0 +1,74 @@ + $lastPolled + self::$_PODCAST_POLL_INTERVAL_SECONDS); + } + + /** + * Find all podcasts flagged for automatic ingest whose most recent episode has + * yet to be downloaded and download it with Celery + * + * @throws InvalidPodcastException + * @throws PodcastNotFoundException + */ + public static function downloadNewestEpisodes() { + $autoIngestPodcasts = static::_getAutoIngestPodcasts(); + $service = new Application_Service_PodcastEpisodeService(); + $episodes = array(); + foreach ($autoIngestPodcasts as $podcast) { + /** @var ImportedPodcast $podcast */ + $podcastArray = Application_Service_PodcastService::getPodcastById($podcast->getDbId()); + // A bit hacky... 
sort the episodes by publication date to get the most recent + usort($podcastArray["episodes"], array(static::class, "_sortByEpisodePubDate")); + $episodeData = $podcastArray["episodes"][0]; + $episode = PodcastEpisodesQuery::create()->findOneByDbEpisodeGuid($episodeData["guid"]); + // Make sure there's no existing episode placeholder or import, and that the data is non-empty + if (empty($episode) && !empty($episodeData)) { + $placeholder = $service->addPodcastEpisodePlaceholder($podcast->getDbId(), $episodeData); + array_push($episodes, $placeholder); + } + } + + $service->downloadEpisodes($episodes); + Application_Model_Preference::setPodcastPollLock(microtime(true)); + } + + /** + * Find all podcasts flagged for automatic ingest + * + * @return PropelObjectCollection collection of ImportedPodcast objects + * flagged for automatic ingest + */ + protected static function _getAutoIngestPodcasts() { + return ImportedPodcastQuery::create() + ->filterByDbAutoIngest(true) + ->find(); + } + + /** + * Custom sort function for podcast episodes + * + * @param array $a first episode array to compare + * @param array $b second episode array to compare + * @return int 0 if the pub_date values are equal, 1 if $a's is less than $b's, otherwise -1 + */ + protected static function _sortByEpisodePubDate($a, $b) { + if ($a["pub_date"] == $b["pub_date"]) return 0; + return ($a["pub_date"] < $b["pub_date"]) ? 
1 : -1; // Descending order + } + +} \ No newline at end of file diff --git a/airtime_mvc/application/common/TaskManager.php b/airtime_mvc/application/common/TaskManager.php index 2792116d4..e410cf139 100644 --- a/airtime_mvc/application/common/TaskManager.php +++ b/airtime_mvc/application/common/TaskManager.php @@ -149,7 +149,7 @@ final class TaskManager { } /** - * Interface AirtimeTask Interface for task operations - also acts as task type ENUM + * Interface AirtimeTask Interface for task operations */ interface AirtimeTask { @@ -215,6 +215,29 @@ class CeleryTask implements AirtimeTask { } +/** + * Class PodcastTask + */ +class PodcastTask implements AirtimeTask { + + /** + * Check whether or not the podcast polling interval has passed + * + * @return bool true if the podcast polling interval has passed + */ + public function shouldBeRun() { + return PodcastManager::hasPodcastPollIntervalPassed(); + } + + /** + * Download the latest episode for all podcasts flagged for automatic ingest + */ + public function run() { + PodcastManager::downloadNewestEpisodes(); + } + +} + /** * Class TaskFactory Factory class to abstract task instantiation */ @@ -225,8 +248,9 @@ class TaskFactory { * Task types - values don't really matter as long as they're unique */ - const UPGRADE = "upgrade"; - const CELERY = "celery"; + const UPGRADE = "upgrade"; + const CELERY = "celery"; + const PODCAST = "podcast"; /** * @var array map of arbitrary identifiers to class names to be instantiated reflectively @@ -234,6 +258,7 @@ class TaskFactory { public static $tasks = array( "upgrade" => "UpgradeTask", "celery" => "CeleryTask", + "podcast" => "PodcastTask", ); /** diff --git a/airtime_mvc/application/models/Preference.php b/airtime_mvc/application/models/Preference.php index e8bf5adf5..d49260157 100644 --- a/airtime_mvc/application/models/Preference.php +++ b/airtime_mvc/application/models/Preference.php @@ -1502,4 +1502,12 @@ class Application_Model_Preference { 
self::setValue("whats_new_dialog_viewed", $value, true); } + + public static function getPodcastPollLock() { + return self::getValue("podcast_poll_lock"); + } + + public static function setPodcastPollLock($value) { + self::setValue("podcast_poll_lock", $value); + } } diff --git a/airtime_mvc/application/services/PodcastEpisodeService.php b/airtime_mvc/application/services/PodcastEpisodeService.php index 3a76c1c11..bbe23aa90 100644 --- a/airtime_mvc/application/services/PodcastEpisodeService.php +++ b/airtime_mvc/application/services/PodcastEpisodeService.php @@ -123,7 +123,8 @@ class Application_Service_PodcastEpisodeService extends Application_Service_Thir } /** - * Given an array of episodes, extract the download URLs and send them to Celery + * Given an array of episodes, store them in the database as placeholder objects until + * they can be processed by Celery * * @param int $podcastId Podcast object identifier * @param array $episodes array of podcast episodes @@ -133,17 +134,38 @@ class Application_Service_PodcastEpisodeService extends Application_Service_Thir public function addPodcastEpisodePlaceholders($podcastId, $episodes) { $storedEpisodes = array(); foreach ($episodes as $episode) { - $e = new PodcastEpisodes(); - $e->setDbPodcastId($podcastId); - $e->setDbDownloadUrl($episode["enclosure"]["link"]); - $e->setDbEpisodeGuid($episode["guid"]); - $e->setDbPublicationDate($episode["pub_date"]); - $e->save(); + $e = $this->addPodcastEpisodePlaceholder($podcastId, $episode); array_push($storedEpisodes, $e); } return $storedEpisodes; } + /** + * Given an episode, store it in the database as a placeholder object until + * it can be processed by Celery + * + * @param int $podcastId Podcast object identifier + * @param array $episode array of podcast episode data + * + * @return PodcastEpisodes the stored PodcastEpisodes object + */ + public function addPodcastEpisodePlaceholder($podcastId, $episode) { + // We need to check whether the array is parsed directly 
from the SimplePie + // feed object, or whether it's passed in as json + if ($episode["enclosure"] instanceof SimplePie_Enclosure) { + $url = $episode["enclosure"]->get_link(); + } else { + $url = $episode["enclosure"]["link"]; + } + $e = new PodcastEpisodes(); + $e->setDbPodcastId($podcastId); + $e->setDbDownloadUrl($url); + $e->setDbEpisodeGuid($episode["guid"]); + $e->setDbPublicationDate($episode["pub_date"]); + $e->save(); + return $e; + } + /** * Given an array of episodes, extract the IDs and download URLs and send them to Celery * @@ -156,6 +178,7 @@ class Application_Service_PodcastEpisodeService extends Application_Service_Thir array_push($episodeUrls, array("id" => $episode->getDbId(), "url" => $episode->getDbDownloadUrl())); } + if (empty($episodeUrls)) return; $this->_download($episodeUrls); } diff --git a/airtime_mvc/application/services/PodcastService.php b/airtime_mvc/application/services/PodcastService.php index 5c1322768..2dce275f6 100644 --- a/airtime_mvc/application/services/PodcastService.php +++ b/airtime_mvc/application/services/PodcastService.php @@ -37,7 +37,7 @@ class Application_Service_PodcastService /** * Returns parsed rss feed, or false if the given URL cannot be downloaded * - * @param $podcastUrl String containing the podcast feed URL + * @param string $feedUrl String containing the podcast feed URL * * @return mixed */ @@ -57,9 +57,9 @@ class Application_Service_PodcastService /** Creates a Podcast object from the given podcast URL. * This is used by our Podcast REST API * - * @param $feedUrl Podcast RSS Feed Url + * @param string $feedUrl Podcast RSS Feed Url * - * @return array - Podcast Array with a full list of episodes + * @return array Podcast Array with a full list of episodes * @throws Exception * @throws InvalidPodcastException * @throws PodcastLimitReachedException