Format code using php-cs-fixer
This commit is contained in:
parent
43d7dc92cd
commit
d52c6184b9
352 changed files with 17473 additions and 17041 deletions
|
@ -1,52 +1,54 @@
|
|||
<?php
|
||||
|
||||
/**
|
||||
* Class TaskManager
|
||||
* Class TaskManager.
|
||||
*
|
||||
* Background class for 'asynchronous' task management for Airtime stations
|
||||
*/
|
||||
final class TaskManager {
|
||||
|
||||
final class TaskManager
|
||||
{
|
||||
/**
|
||||
* @var array tasks to be run. Maps task names to a boolean value denoting
|
||||
* whether the task has been checked/run
|
||||
*/
|
||||
protected $_taskList;
|
||||
private $_taskList;
|
||||
|
||||
/**
|
||||
* @var TaskManager singleton instance object
|
||||
*/
|
||||
protected static $_instance;
|
||||
private static $_instance;
|
||||
|
||||
/**
|
||||
* @var int TASK_INTERVAL_SECONDS how often, in seconds, to run the TaskManager tasks
|
||||
*/
|
||||
const TASK_INTERVAL_SECONDS = 30;
|
||||
public const TASK_INTERVAL_SECONDS = 30;
|
||||
|
||||
/**
|
||||
*
|
||||
* @var $con PDO Propel connection object
|
||||
* @var PDO Propel connection object
|
||||
*/
|
||||
private $_con;
|
||||
|
||||
/**
|
||||
* Private constructor so class is uninstantiable
|
||||
* Private constructor so class is uninstantiable.
|
||||
*/
|
||||
private function __construct() {
|
||||
private function __construct()
|
||||
{
|
||||
foreach (TaskFactory::getTasks() as $k => $task) {
|
||||
$this->_taskList[$task] = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the singleton instance of this class
|
||||
* Get the singleton instance of this class.
|
||||
*
|
||||
* @return TaskManager the TaskManager instance
|
||||
*/
|
||||
public static function getInstance() {
|
||||
public static function getInstance()
|
||||
{
|
||||
if (!self::$_instance) {
|
||||
self::$_instance = new TaskManager();
|
||||
}
|
||||
|
||||
return self::$_instance;
|
||||
}
|
||||
|
||||
|
@ -55,7 +57,8 @@ final class TaskManager {
|
|||
*
|
||||
* @param string $taskName the ENUM name of the task to be run
|
||||
*/
|
||||
public function runTask($taskName) {
|
||||
public function runTask($taskName)
|
||||
{
|
||||
$task = TaskFactory::getTask($taskName);
|
||||
if ($task && $task->shouldBeRun()) {
|
||||
$task->run();
|
||||
|
@ -75,7 +78,8 @@ final class TaskManager {
|
|||
* which, assuming enough time has passed, is updated before running
|
||||
* the tasks.
|
||||
*/
|
||||
public function runTasks() {
|
||||
public function runTasks()
|
||||
{
|
||||
// If there is data in auth storage, this could be a user request
|
||||
// so we should just return to avoid blocking
|
||||
if ($this->_isUserSessionRequest()) {
|
||||
|
@ -83,12 +87,14 @@ final class TaskManager {
|
|||
}
|
||||
$this->_con = Propel::getConnection(CcPrefPeer::DATABASE_NAME);
|
||||
$this->_con->beginTransaction();
|
||||
|
||||
try {
|
||||
$lock = $this->_getLock();
|
||||
if ($lock && (microtime(true) < ($lock['valstr'] + self::TASK_INTERVAL_SECONDS))) {
|
||||
// Propel caches the database connection and uses it persistently, so if we don't
|
||||
// use commit() here, we end up blocking other queries made within this request
|
||||
$this->_con->commit();
|
||||
|
||||
return;
|
||||
}
|
||||
$this->_updateLock($lock);
|
||||
|
@ -97,6 +103,7 @@ final class TaskManager {
|
|||
// We get here if there are simultaneous requests trying to fetch the lock row
|
||||
$this->_con->rollBack();
|
||||
Logging::warn($e->getMessage());
|
||||
|
||||
return;
|
||||
}
|
||||
foreach ($this->_taskList as $task => $hasTaskRun) {
|
||||
|
@ -107,22 +114,24 @@ final class TaskManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check if the current session is a user request
|
||||
* Check if the current session is a user request.
|
||||
*
|
||||
* @return bool true if there is a Zend_Auth object in the current session,
|
||||
* otherwise false
|
||||
*/
|
||||
private function _isUserSessionRequest() {
|
||||
private function _isUserSessionRequest()
|
||||
{
|
||||
if (!Zend_Session::isStarted()) {
|
||||
return false;
|
||||
}
|
||||
$auth = Zend_Auth::getInstance();
|
||||
$data = $auth->getStorage()->read();
|
||||
|
||||
return !empty($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the task_manager_lock from cc_pref with a row-level lock for atomicity
|
||||
* Get the task_manager_lock from cc_pref with a row-level lock for atomicity.
|
||||
*
|
||||
* The lock is exclusive (prevent reads) and will only last for the duration
|
||||
* of the transaction. We add NOWAIT so reads on the row during the transaction
|
||||
|
@ -130,108 +139,108 @@ final class TaskManager {
|
|||
*
|
||||
* @return array|bool an array containing the row values, or false on failure
|
||||
*/
|
||||
private function _getLock() {
|
||||
private function _getLock()
|
||||
{
|
||||
$sql = "SELECT * FROM cc_pref WHERE keystr='task_manager_lock' LIMIT 1 FOR UPDATE NOWAIT";
|
||||
$st = $this->_con->prepare($sql);
|
||||
$st->execute();
|
||||
|
||||
return $st->fetch();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update and commit the new lock value, or insert it if it doesn't exist
|
||||
* Update and commit the new lock value, or insert it if it doesn't exist.
|
||||
*
|
||||
* @param $lock array cc_pref lock row values
|
||||
*/
|
||||
private function _updateLock($lock) {
|
||||
private function _updateLock($lock)
|
||||
{
|
||||
$sql = empty($lock) ? "INSERT INTO cc_pref (keystr, valstr) VALUES ('task_manager_lock', :value)"
|
||||
: "UPDATE cc_pref SET valstr=:value WHERE keystr='task_manager_lock'";
|
||||
$st = $this->_con->prepare($sql);
|
||||
$st->execute(array(":value" => microtime(true)));
|
||||
$st->execute([':value' => microtime(true)]);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Interface AirtimeTask Interface for task operations
|
||||
* Interface AirtimeTask Interface for task operations.
|
||||
*/
|
||||
interface AirtimeTask {
|
||||
|
||||
interface AirtimeTask
|
||||
{
|
||||
/**
|
||||
* Check whether the task should be run
|
||||
* Check whether the task should be run.
|
||||
*
|
||||
* @return bool true if the task needs to be run, otherwise false
|
||||
*/
|
||||
public function shouldBeRun();
|
||||
|
||||
/**
|
||||
* Run the task
|
||||
*
|
||||
* @return void
|
||||
* Run the task.
|
||||
*/
|
||||
public function run();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Class UpgradeTask
|
||||
* Class UpgradeTask.
|
||||
*
|
||||
* Checks the current Airtime version and runs any outstanding upgrades
|
||||
*/
|
||||
class UpgradeTask implements AirtimeTask {
|
||||
|
||||
class UpgradeTask implements AirtimeTask
|
||||
{
|
||||
/**
|
||||
* Check the current Airtime schema version to see if an upgrade should be run
|
||||
* Check the current Airtime schema version to see if an upgrade should be run.
|
||||
*
|
||||
* @return bool true if an upgrade is needed
|
||||
*/
|
||||
public function shouldBeRun() {
|
||||
public function shouldBeRun()
|
||||
{
|
||||
return UpgradeManager::checkIfUpgradeIsNeeded();
|
||||
}
|
||||
|
||||
/**
|
||||
* Run all upgrades above the current schema version
|
||||
* Run all upgrades above the current schema version.
|
||||
*/
|
||||
public function run() {
|
||||
public function run()
|
||||
{
|
||||
UpgradeManager::doUpgrade();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Class CeleryTask
|
||||
* Class CeleryTask.
|
||||
*
|
||||
* Checks the Celery broker task queue and runs callbacks for completed tasks
|
||||
*/
|
||||
class CeleryTask implements AirtimeTask {
|
||||
|
||||
class CeleryTask implements AirtimeTask
|
||||
{
|
||||
/**
|
||||
* Check the ThirdPartyTrackReferences table to see if there are any pending tasks
|
||||
* Check the ThirdPartyTrackReferences table to see if there are any pending tasks.
|
||||
*
|
||||
* @return bool true if there are pending tasks in ThirdPartyTrackReferences
|
||||
*/
|
||||
public function shouldBeRun() {
|
||||
public function shouldBeRun()
|
||||
{
|
||||
return !CeleryManager::isBrokerTaskQueueEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll the task queue for any completed Celery tasks
|
||||
* Poll the task queue for any completed Celery tasks.
|
||||
*/
|
||||
public function run() {
|
||||
public function run()
|
||||
{
|
||||
CeleryManager::pollBrokerTaskQueue();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Class AutoPlaylistTask
|
||||
* Class AutoPlaylistTask.
|
||||
*
|
||||
* Checks for shows with an autoplaylist that needs to be filled in
|
||||
*
|
||||
*/
|
||||
class AutoPlaylistTask implements AirtimeTask
|
||||
{
|
||||
/**
|
||||
* Checks whether or not the autoplaylist polling interval has passed
|
||||
* Checks whether or not the autoplaylist polling interval has passed.
|
||||
*
|
||||
* @return bool true if the autoplaylist polling interval has passed
|
||||
*/
|
||||
|
@ -241,7 +250,7 @@ class AutoPlaylistTask implements AirtimeTask
|
|||
}
|
||||
|
||||
/**
|
||||
* Schedule the autoplaylist for the shows
|
||||
* Schedule the autoplaylist for the shows.
|
||||
*/
|
||||
public function run()
|
||||
{
|
||||
|
@ -250,123 +259,133 @@ class AutoPlaylistTask implements AirtimeTask
|
|||
}
|
||||
|
||||
/**
|
||||
* Class PodcastTask
|
||||
* Class PodcastTask.
|
||||
*
|
||||
* Checks podcasts marked for automatic ingest and downloads any new episodes
|
||||
* since the task was last run
|
||||
*/
|
||||
class PodcastTask implements AirtimeTask {
|
||||
|
||||
class PodcastTask implements AirtimeTask
|
||||
{
|
||||
/**
|
||||
* Check whether or not the podcast polling interval has passed
|
||||
* Check whether or not the podcast polling interval has passed.
|
||||
*
|
||||
* @return bool true if the podcast polling interval has passed
|
||||
*/
|
||||
public function shouldBeRun() {
|
||||
public function shouldBeRun()
|
||||
{
|
||||
$overQuota = Application_Model_Systemstatus::isDiskOverQuota();
|
||||
|
||||
return !$overQuota && PodcastManager::hasPodcastPollIntervalPassed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Download the latest episode for all podcasts flagged for automatic ingest
|
||||
* Download the latest episode for all podcasts flagged for automatic ingest.
|
||||
*/
|
||||
public function run() {
|
||||
public function run()
|
||||
{
|
||||
PodcastManager::downloadNewestEpisodes();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Class ImportTask
|
||||
* Class ImportTask.
|
||||
*/
|
||||
class ImportCleanupTask implements AirtimeTask {
|
||||
|
||||
class ImportCleanupTask implements AirtimeTask
|
||||
{
|
||||
/**
|
||||
* Check if there are any files that have been stuck
|
||||
* in Pending status for over an hour
|
||||
* in Pending status for over an hour.
|
||||
*
|
||||
* @return bool true if there are any files stuck pending,
|
||||
* otherwise false
|
||||
*/
|
||||
public function shouldBeRun() {
|
||||
public function shouldBeRun()
|
||||
{
|
||||
return Application_Service_MediaService::areFilesStuckInPending();
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up stuck imports by changing their import status to Failed
|
||||
* Clean up stuck imports by changing their import status to Failed.
|
||||
*/
|
||||
public function run() {
|
||||
public function run()
|
||||
{
|
||||
Application_Service_MediaService::clearStuckPendingImports();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Class StationPodcastTask
|
||||
* Class StationPodcastTask.
|
||||
*
|
||||
* Checks the Station podcast rollover timer and resets allotted
|
||||
* downloads if enough time has passed (default: 1 month)
|
||||
*/
|
||||
class StationPodcastTask implements AirtimeTask {
|
||||
|
||||
const STATION_PODCAST_RESET_TIMER_SECONDS = 2.628e+6; // 1 month
|
||||
class StationPodcastTask implements AirtimeTask
|
||||
{
|
||||
public const STATION_PODCAST_RESET_TIMER_SECONDS = 2.628e+6; // 1 month
|
||||
|
||||
/**
|
||||
* Check whether or not the download counter for the station podcast should be reset
|
||||
* Check whether or not the download counter for the station podcast should be reset.
|
||||
*
|
||||
* @return bool true if enough time has passed
|
||||
*/
|
||||
public function shouldBeRun() {
|
||||
public function shouldBeRun()
|
||||
{
|
||||
$lastReset = Application_Model_Preference::getStationPodcastDownloadResetTimer();
|
||||
|
||||
return empty($lastReset) || (microtime(true) > ($lastReset + self::STATION_PODCAST_RESET_TIMER_SECONDS));
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the station podcast download counter
|
||||
* Reset the station podcast download counter.
|
||||
*/
|
||||
public function run() {
|
||||
public function run()
|
||||
{
|
||||
Application_Model_Preference::resetStationPodcastDownloadCounter();
|
||||
Application_Model_Preference::setStationPodcastDownloadResetTimer(microtime(true));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Class TaskFactory Factory class to abstract task instantiation
|
||||
* Class TaskFactory Factory class to abstract task instantiation.
|
||||
*/
|
||||
class TaskFactory {
|
||||
|
||||
class TaskFactory
|
||||
{
|
||||
/**
|
||||
* Check if the class with the given name implements AirtimeTask
|
||||
* Check if the class with the given name implements AirtimeTask.
|
||||
*
|
||||
* @param $c string class name
|
||||
*
|
||||
* @return bool true if the class $c implements AirtimeTask
|
||||
*/
|
||||
private static function _isTask($c) {
|
||||
private static function _isTask($c)
|
||||
{
|
||||
return array_key_exists('AirtimeTask', class_implements($c));
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter all declared classes to get all classes implementing the AirtimeTask interface
|
||||
* Filter all declared classes to get all classes implementing the AirtimeTask interface.
|
||||
*
|
||||
* @return array all classes implementing the AirtimeTask interface
|
||||
*/
|
||||
public static function getTasks() {
|
||||
return array_filter(get_declared_classes(), array(__CLASS__, "_isTask"));
|
||||
public static function getTasks()
|
||||
{
|
||||
return array_filter(get_declared_classes(), [__CLASS__, '_isTask']);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an AirtimeTask based on class name
|
||||
* Get an AirtimeTask based on class name.
|
||||
*
|
||||
* @param $task string name of the class implementing AirtimeTask to construct
|
||||
*
|
||||
* @return AirtimeTask|null return a task of the given type or null if no corresponding task exists
|
||||
* @return null|AirtimeTask return a task of the given type or null if no corresponding task exists
|
||||
*/
|
||||
public static function getTask($task) {
|
||||
public static function getTask($task)
|
||||
{
|
||||
// Try to get a valid class name from the given string
|
||||
if (!class_exists($task)) $task = str_replace(' ', '', ucwords($task)) . "Task";
|
||||
if (!class_exists($task)) {
|
||||
$task = str_replace(' ', '', ucwords($task)) . 'Task';
|
||||
}
|
||||
|
||||
return class_exists($task) ? new $task() : null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue