2015-06-15 21:12:37 +02:00
|
|
|
<?php
|
|
|
|
|
|
|
|
/**
 * Class TaskManager
 *
 * Singleton that checks and runs the tasks registered in TaskFactory::$tasks.
 * Each task is looked up through TaskFactory::getTask(), asked whether it
 * should run, and run if so.
 *
 * When adding a new task, the new AirtimeTask class will also need to be added
 * as a class constant and to the array in TaskFactory
 */
final class TaskManager {

    /**
     * @var array tasks to be run. Maps task names to a boolean value denoting
     *            whether the task has been checked/run. Initialised to an empty
     *            array so that iterating it is safe even when no tasks are
     *            registered in TaskFactory::$tasks
     */
    protected $_taskList = array();

    /**
     * @var TaskManager singleton instance object
     */
    protected static $_instance;

    /**
     * @var int TASK_INTERVAL_SECONDS how often, in seconds, to run the TaskManager tasks
     */
    const TASK_INTERVAL_SECONDS = 30;

    /**
     * @var PDO Propel connection object; assigned in runTasks() and used by the
     *          lock helpers
     */
    private $_con;

    /**
     * Private constructor so class is uninstantiable
     *
     * Registers every task name known to TaskFactory as "not yet checked/run".
     */
    private function __construct() {
        foreach (array_keys(TaskFactory::$tasks) as $v) {
            $this->_taskList[$v] = false;
        }
    }

    /**
     * Get the singleton instance of this class
     *
     * @return TaskManager the TaskManager instance
     */
    public static function getInstance() {
        if (!self::$_instance) {
            self::$_instance = new TaskManager();
        }
        return self::$_instance;
    }

    /**
     * Run a single task.
     *
     * The task is only executed when TaskFactory returns a task object and that
     * task reports that it should be run. In either case the task name is
     * afterwards flagged as checked, so runTasks() will skip it.
     *
     * @param string $taskName the ENUM name of the task to be run
     */
    public function runTask($taskName) {
        $task = TaskFactory::getTask($taskName);
        if ($task && $task->shouldBeRun()) {
            $task->run();
        }
        // Mark that the task has been checked/run. This is important for
        // prioritized tasks that we need to run on every request (such as the
        // schema check/upgrade)
        $this->_taskList[$taskName] = true;
    }

    /**
     * Run all tasks that need to be run.
     *
     * To prevent blocking and making too many requests to the database,
     * we implement a row-level, non-blocking, read-protected lock on a
     * timestamp that we check each time the application is bootstrapped,
     * which, assuming enough time has passed, is updated before running
     * the tasks.
     *
     * Returns early, without running anything, when the request belongs to a
     * user session, when less than TASK_INTERVAL_SECONDS have passed since the
     * stored timestamp, or when reading/updating the lock row throws.
     */
    public function runTasks() {
        // If there is data in auth storage, this could be a user request
        // so we should just return to avoid blocking
        if ($this->_isUserSessionRequest()) {
            return;
        }

        $this->_con = Propel::getConnection(CcPrefPeer::DATABASE_NAME);
        $this->_con->beginTransaction();
        try {
            $lock = $this->_getLock();
            // Cast the stored timestamp explicitly: valstr comes back from the
            // database as a string, and arithmetic on a non-numeric string is
            // an Error (not an Exception) on newer PHP versions, which the
            // catch below would not handle.
            if ($lock && (microtime(true) < ((float) $lock['valstr'] + self::TASK_INTERVAL_SECONDS))) {
                // Propel caches the database connection and uses it persistently, so if we don't
                // use commit() here, we end up blocking other queries made within this request
                $this->_con->commit();
                return;
            }
            $this->_updateLock($lock);
            $this->_con->commit();
        } catch (Exception $e) {
            // We get here if there are simultaneous requests trying to fetch the lock row
            $this->_con->rollBack();
            Logging::warn($e->getMessage());
            return;
        }

        // Tasks already handled earlier in this request via runTask() are skipped
        foreach ($this->_taskList as $task => $hasTaskRun) {
            if (!$hasTaskRun) {
                $this->runTask($task);
            }
        }
    }

    /**
     * Check if the current session is a user request
     *
     * @return bool true if there is a Zend_Auth object in the current session,
     *              otherwise false (including when no session has been started)
     */
    private function _isUserSessionRequest() {
        if (!Zend_Session::isStarted()) {
            return false;
        }
        $auth = Zend_Auth::getInstance();
        $data = $auth->getStorage()->read();
        return !empty($data);
    }

    /**
     * Get the task_manager_lock from cc_pref with a row-level lock for atomicity
     *
     * The lock is exclusive (prevent reads) and will only last for the duration
     * of the transaction. We add NOWAIT so reads on the row during the transaction
     * won't block
     *
     * @return array|bool an array containing the row values, or false on failure
     *                    (including when the row does not exist yet)
     */
    private function _getLock() {
        $sql = "SELECT * FROM cc_pref WHERE keystr='task_manager_lock' LIMIT 1 FOR UPDATE NOWAIT";
        $st = $this->_con->prepare($sql);
        $st->execute();
        return $st->fetch();
    }

    /**
     * Write the current time as the new lock value, or insert the lock row if
     * it doesn't exist. The surrounding transaction is committed by the caller.
     *
     * @param $lock array|bool cc_pref lock row values as returned by _getLock();
     *                         an empty/false value selects the INSERT statement
     */
    private function _updateLock($lock) {
        $sql = empty($lock) ? "INSERT INTO cc_pref (keystr, valstr) VALUES ('task_manager_lock', :value)"
                            : "UPDATE cc_pref SET valstr=:value WHERE keystr='task_manager_lock'";
        $st = $this->_con->prepare($sql);
        $st->execute(array(":value" => microtime(true)));
    }

}
|
|
|
|
|
|
|
|
/**
 * Interface AirtimeTask
 *
 * Contract for task operations: a check that reports whether the task is due,
 * and the operation that carries the task out.
 */
interface AirtimeTask {

    /**
     * Report whether the task currently needs to run
     *
     * @return bool true when the task should be run, false when it can be skipped
     */
    public function shouldBeRun();

    /**
     * Carry out the task
     *
     * @return void
     */
    public function run();

}
|
|
|
|
|
|
|
|
/**
 * Class UpgradeTask
 *
 * Task wrapper around UpgradeManager: checks the current Airtime version and
 * runs any outstanding upgrades
 */
class UpgradeTask implements AirtimeTask {

    /**
     * Ask UpgradeManager whether an upgrade should be run
     *
     * @return bool the result of UpgradeManager::checkIfUpgradeIsNeeded();
     *              true if an upgrade is needed
     */
    public function shouldBeRun() {
        $upgradeNeeded = UpgradeManager::checkIfUpgradeIsNeeded();

        return $upgradeNeeded;
    }

    /**
     * Delegate to UpgradeManager::doUpgrade() to run the outstanding upgrades
     *
     * @return void
     */
    public function run() {
        UpgradeManager::doUpgrade();
    }

}
|
|
|
|
|
|
|
|
/**
 * Class CeleryTask
 *
 * Task wrapper around CeleryManager: checks the Celery broker task queue and
 * runs callbacks for completed tasks
 */
class CeleryTask implements AirtimeTask {

    /**
     * Check whether the broker task queue has anything in it
     *
     * @return bool true if CeleryManager reports that the broker task queue
     *              is not empty, otherwise false
     */
    public function shouldBeRun() {
        $queueIsEmpty = CeleryManager::isBrokerTaskQueueEmpty();

        return !$queueIsEmpty;
    }

    /**
     * Poll the task queue for any completed Celery tasks
     *
     * @return void
     */
    public function run() {
        CeleryManager::pollBrokerTaskQueue();
    }

}
|
|
|
|
|
2015-10-15 20:44:17 +02:00
|
|
|
/**
 * Class PodcastTask
 *
 * Checks podcasts marked for automatic ingest and downloads any new episodes
 * since the task was last run
 */
class PodcastTask implements AirtimeTask {

    /**
     * Check whether podcasts should be polled now
     *
     * The poll interval is only consulted when the disk is not over quota.
     *
     * @return bool true if the disk is not over quota and the podcast polling
     *              interval has passed, otherwise false
     */
    public function shouldBeRun() {
        // Never ingest new episodes while the station is over its disk quota
        if (Application_Model_Systemstatus::isDiskOverQuota()) {
            return false;
        }

        return (bool) PodcastManager::hasPodcastPollIntervalPassed();
    }

    /**
     * Download the newest episodes via PodcastManager::downloadNewestEpisodes()
     *
     * @return void
     */
    public function run() {
        PodcastManager::downloadNewestEpisodes();
    }

}
|
|
|
|
|
2015-10-29 16:03:28 +01:00
|
|
|
/**
 * Class ImportCleanupTask
 *
 * Task wrapper around Application_Service_MediaService that cleans up file
 * imports which are stuck in Pending status
 */
class ImportCleanupTask implements AirtimeTask {

    /**
     * Check if there are any files that have been stuck in Pending status
     * (for over an hour, according to the original author's note; the actual
     * threshold is decided by the media service)
     *
     * @return bool true if there are any files stuck pending,
     *              otherwise false
     */
    public function shouldBeRun() {
        $filesAreStuck = Application_Service_MediaService::areFilesStuckInPending();

        return $filesAreStuck;
    }

    /**
     * Clean up stuck imports by delegating to
     * Application_Service_MediaService::clearStuckPendingImports()
     *
     * @return void
     */
    public function run() {
        Application_Service_MediaService::clearStuckPendingImports();
    }

}
|
|
|
|
|
2015-10-21 18:54:50 +02:00
|
|
|
/**
 * Class StationPodcastTask
 *
 * Checks the Station podcast rollover timer and resets monthly allotted
 * downloads if enough time has passed (default: 1 month)
 */
class StationPodcastTask implements AirtimeTask {

    // 1 month XXX: should we use datetime roll for this instead?
    const STATION_PODCAST_RESET_TIMER_SECONDS = 2.628e+6;

    /**
     * Check whether or not the download counter for the station podcast should be reset
     *
     * @return bool true if no reset time has been stored yet, or if more than
     *              STATION_PODCAST_RESET_TIMER_SECONDS have passed since it
     */
    public function shouldBeRun() {
        $previousReset = Application_Model_Preference::getStationPodcastDownloadResetTimer();

        // No stored timer yet: the counter has never been reset, so do it now
        if (empty($previousReset)) {
            return true;
        }

        $nextResetDue = $previousReset + self::STATION_PODCAST_RESET_TIMER_SECONDS;

        return microtime(true) > $nextResetDue;
    }

    /**
     * Reset the station podcast download counter, then store the current time
     * as the new reset timer
     *
     * @return void
     */
    public function run() {
        Application_Model_Preference::resetStationPodcastDownloadCounter();

        $resetTime = microtime(true);
        Application_Model_Preference::setStationPodcastDownloadResetTimer($resetTime);
    }

}
|
|
|
|
|
2015-09-23 02:21:19 +02:00
|
|
|
/**
 * Class TaskFactory Factory class to abstract task instantiation
 */
class TaskFactory {

    /**
     * PHP doesn't have ENUMs so declare them as constants
     * Task types - values don't really matter as long as they're unique
     */
    const UPGRADE         = "upgrade";
    const CELERY          = "celery";
    const PODCAST         = "podcast";
    const IMPORT          = "import";
    const STATION_PODCAST = "station-podcast";

    /**
     * @var array map of task type identifiers to class names to be instantiated
     *            reflectively. Keyed by the class constants above so the two
     *            cannot drift apart
     */
    public static $tasks = array(
        self::UPGRADE         => "UpgradeTask",
        self::CELERY          => "CeleryTask",
        self::PODCAST         => "PodcastTask",
        self::IMPORT          => "ImportCleanupTask",
        self::STATION_PODCAST => "StationPodcastTask",
    );

    /**
     * Get an AirtimeTask based on a task type
     *
     * @param $task string the task type; uses the TaskFactory constants as an ENUM
     *
     * @return AirtimeTask|null return a task of the given type or null if no corresponding
     *                          task exists or is implemented
     */
    public static function getTask($task) {
        // Unknown (or non-string) task types have no mapping: honour the
        // documented contract and return null instead of failing on an
        // undefined index
        if (!is_string($task) || !isset(self::$tasks[$task])) {
            return null;
        }

        $className = self::$tasks[$task];
        // A mapping whose class cannot be loaded counts as "not implemented"
        if (!class_exists($className)) {
            return null;
        }

        return new $className();
    }

}
|