sintonia/legacy/application/common/TaskManager.php

373 lines
10 KiB
PHP
Raw Permalink Normal View History

<?php
/**
 * Class TaskManager.
 *
 * Background class for 'asynchronous' task management for Airtime stations.
 *
 * The set of tasks is taken from TaskFactory::getTasks(); runTasks() runs
 * every task not yet checked in this request, at most once per
 * TASK_INTERVAL_SECONDS, using a timestamp row in cc_pref as the gate.
 */
final class TaskManager
{
    /**
     * @var array tasks to be run. Maps task names to a boolean value denoting
     *            whether the task has been checked/run
     */
    private $_taskList = [];

    /**
     * @var TaskManager singleton instance object
     */
    private static $_instance;

    /**
     * @var int TASK_INTERVAL_SECONDS how often, in seconds, to run the TaskManager tasks
     */
    public const TASK_INTERVAL_SECONDS = 30;

    /**
     * @var PDO Propel connection object
     */
    private $_con;

    /**
     * Private constructor so the class can only be obtained via getInstance().
     *
     * Registers every task name reported by TaskFactory::getTasks() as
     * "not yet checked". The list stays an empty array when no tasks exist.
     */
    private function __construct()
    {
        foreach (TaskFactory::getTasks() as $task) {
            $this->_taskList[$task] = false;
        }
    }

    /**
     * Get the singleton instance of this class.
     *
     * @return TaskManager the TaskManager instance
     */
    public static function getInstance()
    {
        if (!self::$_instance) {
            self::$_instance = new self();
        }

        return self::$_instance;
    }

    /**
     * Run a single task.
     *
     * The task is only executed when TaskFactory can build it and its
     * shouldBeRun() returns a truthy value; it is marked as checked either way.
     *
     * @param string $taskName the ENUM name of the task to be run
     */
    public function runTask($taskName)
    {
        $task = TaskFactory::getTask($taskName);
        if ($task && $task->shouldBeRun()) {
            Logging::debug("running task {$taskName}");
            $task->run();
        }

        // Mark that the task has been checked/run.
        // This is important for prioritized tasks that
        // we need to run on every request (such as the
        // schema check/upgrade)
        $this->_taskList[$taskName] = true;
    }

    /**
     * Run all tasks that need to be run.
     *
     * To prevent blocking and making too many requests to the database,
     * we implement a row-level, non-blocking, read-protected lock on a
     * timestamp that we check each time the application is bootstrapped,
     * which, assuming enough time has passed, is updated before running
     * the tasks.
     */
    public function runTasks()
    {
        // If there is data in auth storage, this could be a user request
        // so we should just return to avoid blocking
        if ($this->_isUserSessionRequest()) {
            return;
        }

        $this->_con = Propel::getConnection(CcPrefPeer::DATABASE_NAME);
        $this->_con->beginTransaction();

        try {
            $lock = $this->_getLock();

            // valstr is stored as text; cast explicitly so a malformed value
            // counts as 0 (interval elapsed) instead of raising a TypeError
            // that would bypass the PDOException handler below.
            if ($lock && (microtime(true) < ((float) $lock['valstr'] + self::TASK_INTERVAL_SECONDS))) {
                // Propel caches the database connection and uses it persistently, so if we don't
                // use commit() here, we end up blocking other queries made within this request
                $this->_con->commit();

                return;
            }

            $this->_updateLock($lock);
            $this->_con->commit();
        } catch (PDOException $e) {
            // We get here if there are simultaneous requests trying to fetch the lock row
            $this->_con->rollBack();

            // Do not log 'could not obtain lock' exception
            // SQLSTATE[55P03]: Lock not available: 7 ERROR: could not obtain lock on row in relation "cc_pref"
            if ((string) $e->getCode() !== '55P03') {
                Logging::warn($e->getMessage());
            }

            return;
        }

        foreach ($this->_taskList as $task => $hasTaskRun) {
            if (!$hasTaskRun) {
                $this->runTask($task);
            }
        }
    }

    /**
     * Check if the current session is a user request.
     *
     * @return bool true if a session is started and the Zend_Auth storage
     *              holds non-empty data, otherwise false
     */
    private function _isUserSessionRequest()
    {
        if (!Zend_Session::isStarted()) {
            return false;
        }

        $auth = Zend_Auth::getInstance();
        $data = $auth->getStorage()->read();

        return !empty($data);
    }

    /**
     * Get the task_manager_lock from cc_pref with a row-level lock for atomicity.
     *
     * The lock is exclusive (prevent reads) and will only last for the duration
     * of the transaction. We add NOWAIT so reads on the row during the transaction
     * won't block
     *
     * @return array|bool an array containing the row values, or false on failure
     */
    private function _getLock()
    {
        $sql = "SELECT * FROM cc_pref WHERE keystr='task_manager_lock' LIMIT 1 FOR UPDATE NOWAIT";
        $st = $this->_con->prepare($sql);
        $st->execute();

        return $st->fetch();
    }

    /**
     * Write the current time as the new lock value, inserting the row if it
     * doesn't exist yet. The caller (runTasks) commits the transaction.
     *
     * @param $lock array|bool cc_pref lock row values, or an empty/false value
     *                         when the row was not found
     */
    private function _updateLock($lock)
    {
        $sql = empty($lock)
            ? "INSERT INTO cc_pref (keystr, valstr) VALUES ('task_manager_lock', :value)"
            : "UPDATE cc_pref SET valstr=:value WHERE keystr='task_manager_lock'";
        $st = $this->_con->prepare($sql);

        $st->execute([':value' => microtime(true)]);
    }
}
/**
 * Interface AirtimeTask.
 *
 * Interface for task operations.
 */
interface AirtimeTask
{
/**
 * Check whether the task should be run.
 *
 * @return bool true if the task needs to be run, otherwise false
 */
public function shouldBeRun();
/**
 * Run the task.
 */
public function run();
}
/**
 * Class CeleryTask.
 *
 * Checks the Celery broker task queue and runs callbacks for completed tasks
 */
class CeleryTask implements AirtimeTask
{
    /**
     * Check whether the Celery broker task queue has anything pending.
     *
     * NOTE(review): an earlier comment said this checks the
     * ThirdPartyTrackReferences table; any such lookup would live inside
     * CeleryManager::isBrokerTaskQueueEmpty() - confirm there.
     *
     * @return bool true if CeleryManager reports the broker task queue is not empty
     */
    public function shouldBeRun()
    {
        return !CeleryManager::isBrokerTaskQueueEmpty();
    }

    /**
     * Poll the task queue for any completed Celery tasks.
     */
    public function run()
    {
        CeleryManager::pollBrokerTaskQueue();
    }
}
/**
 * Class AutoPlaylistTask.
 *
 * Task wrapper around AutoPlaylistManager: looks for shows whose
 * autoplaylist still needs to be filled in.
 */
class AutoPlaylistTask implements AirtimeTask
{
    /**
     * Ask AutoPlaylistManager whether the autoplaylist polling interval has passed.
     *
     * @return bool the value reported by
     *              AutoPlaylistManager::hasAutoPlaylistPollIntervalPassed()
     */
    public function shouldBeRun()
    {
        $intervalElapsed = AutoPlaylistManager::hasAutoPlaylistPollIntervalPassed();

        return $intervalElapsed;
    }

    /**
     * Schedule the autoplaylist for the shows by delegating to
     * AutoPlaylistManager::buildAutoPlaylist().
     */
    public function run()
    {
        AutoPlaylistManager::buildAutoPlaylist();
    }
}
/**
 * Class PodcastTask.
 *
 * Checks podcasts marked for automatic ingest and downloads any new episodes
 * since the task was last run
 */
class PodcastTask implements AirtimeTask
{
    /**
     * Check whether or not the podcast polling interval has passed.
     *
     * The task is skipped while the station is over its disk quota.
     *
     * @return bool true if the disk is not over quota and the podcast
     *              polling interval has passed
     */
    public function shouldBeRun()
    {
        $overQuota = Application_Model_Systemstatus::isDiskOverQuota();

        return !$overQuota && PodcastManager::hasPodcastPollIntervalPassed();
    }

    /**
     * Download the latest episode for all podcasts flagged for automatic ingest.
     */
    public function run()
    {
        PodcastManager::downloadNewestEpisodes();
    }
}
/**
 * Class ImportCleanupTask.
 *
 * Marks imports that are stuck in Pending status as Failed.
 */
class ImportCleanupTask implements AirtimeTask
{
    /**
     * Check if there are any files that have been stuck
     * in Pending status for over an hour.
     *
     * @return bool true if there are any files stuck pending,
     *              otherwise false
     */
    public function shouldBeRun()
    {
        return Application_Service_MediaService::areFilesStuckInPending();
    }

    /**
     * Clean up stuck imports by changing their import status to Failed.
     */
    public function run()
    {
        Application_Service_MediaService::clearStuckPendingImports();
    }
}
/**
 * Class StationPodcastTask.
 *
 * Checks the Station podcast rollover timer and resets allotted
 * downloads if enough time has passed (default: 1 month)
 */
class StationPodcastTask implements AirtimeTask
{
    public const STATION_PODCAST_RESET_TIMER_SECONDS = 2.628e+6; // 1 month

    /**
     * Check whether or not the download counter for the station podcast should be reset.
     *
     * @return bool true if no reset time is stored yet, or if more than
     *              STATION_PODCAST_RESET_TIMER_SECONDS have passed since it
     */
    public function shouldBeRun()
    {
        $lastReset = Application_Model_Preference::getStationPodcastDownloadResetTimer();

        return empty($lastReset) || (microtime(true) > ($lastReset + self::STATION_PODCAST_RESET_TIMER_SECONDS));
    }

    /**
     * Reset the station podcast download counter and record the current
     * time as the new reset timestamp.
     */
    public function run()
    {
        Application_Model_Preference::resetStationPodcastDownloadCounter();
        Application_Model_Preference::setStationPodcastDownloadResetTimer(microtime(true));
    }
}
/**
 * Class TaskFactory Factory class to abstract task instantiation.
 */
class TaskFactory
{
    /**
     * Check if the class with the given name implements AirtimeTask.
     *
     * @param $c string class name
     *
     * @return bool true if the class $c implements AirtimeTask
     */
    private static function _isTask($c)
    {
        // class_implements() returns false when the class cannot be resolved;
        // guard so array_key_exists() never receives a non-array.
        $interfaces = class_implements($c);

        return is_array($interfaces) && array_key_exists('AirtimeTask', $interfaces);
    }

    /**
     * Filter all declared classes to get all classes implementing the AirtimeTask interface.
     *
     * Keys are those of get_declared_classes() (array_filter preserves keys).
     *
     * @return array all classes implementing the AirtimeTask interface
     */
    public static function getTasks()
    {
        return array_filter(get_declared_classes(), [__CLASS__, '_isTask']);
    }

    /**
     * Get an AirtimeTask based on class name.
     *
     * If $task is not an existing class name, it is converted to one by
     * capitalising each word, removing spaces and appending "Task"
     * (e.g. "station podcast" becomes "StationPodcastTask").
     *
     * @param $task string name of the class implementing AirtimeTask to construct
     *
     * @return null|AirtimeTask return a task of the given type or null if no corresponding task exists
     */
    public static function getTask($task)
    {
        // Try to get a valid class name from the given string
        if (!class_exists($task)) {
            $task = str_replace(' ', '', ucwords($task)) . 'Task';
        }

        // Only build classes that really are tasks; an arbitrary existing
        // class (e.g. stdClass) must yield null, as the contract promises.
        if (!class_exists($task) || !self::_isTask($task)) {
            return null;
        }

        return new $task();
    }
}