2015-06-16 21:10:08 +02:00
|
|
|
<?php
|
|
|
|
|
2022-03-04 18:46:57 +01:00
|
|
|
use Celery\Celery;
|
|
|
|
use Celery\CeleryException;
|
|
|
|
use Celery\CeleryTimeoutException;
|
|
|
|
|
2021-10-11 16:10:47 +02:00
|
|
|
/**
 * Static helpers for dispatching Celery tasks over amqp and polling the
 * results exchange for their outcome.
 */
class CeleryManager
{
    /**
     * @var int milliseconds (for compatibility with celery) until we consider a message to have timed out
     */
    private static $_CELERY_MESSAGE_TIMEOUT = 900000; // 15 minutes

    /**
     * We have to use celeryresults (the default results exchange) because php-celery
     * doesn't support named results exchanges.
     *
     * @var string exchange for celery task results
     */
    private static $_CELERY_RESULTS_EXCHANGE = 'celeryresults';

    /**
     * @var PropelCollection cache of any pending CeleryTasks results for a service or task
     */
    private static $_pendingTasks;

    /**
     * Connect to the Celery daemon via amqp.
     *
     * @param $config   array  the airtime configuration array
     * @param $exchange string the amqp exchange name
     * @param $queue    string the amqp queue name
     *
     * @return Celery the Celery connection object
     *
     * @throws Exception when a connection error occurs
     */
    private static function _setupCeleryExchange($config, $exchange, $queue)
    {
        $rabbitmq = $config['rabbitmq'];

        // NOTE(review): the two trailing positional arguments are passed exactly as
        // before; confirm they line up with the installed php-celery constructor.
        return new Celery(
            $rabbitmq['host'],
            $rabbitmq['user'],
            $rabbitmq['password'],
            $rabbitmq['vhost'],
            $exchange,
            $queue,
            $rabbitmq['port'],
            false,
            self::$_CELERY_MESSAGE_TIMEOUT // Result expiration
        );
    }

    /**
     * Send an amqp message to the airtime-celery daemon to perform a task.
     *
     * @param $task     string the Celery task name
     * @param $exchange string the amqp exchange name
     * @param $data     array  an associative array containing arguments for the Celery task
     *
     * @return string the task identifier for the started Celery task so we can fetch the
     *                results asynchronously later
     */
    public static function sendCeleryMessage($task, $exchange, $data)
    {
        $config = Config::getConfig();

        // The exchange name doubles as both the queue name and the routing key.
        $queue = $exchange;
        $routingKey = $exchange;

        $c = self::_setupCeleryExchange($config, $exchange, $queue);
        $result = $c->PostTask($task, $data, true, $routingKey);

        return $result->getId();
    }

    /**
     * Given a task object, check the Celery results queue for any
     * corresponding messages.
     *
     * If no message is available and the task has timed out, the task is
     * saved with CELERY_FAILED_STATUS before the timeout exception is thrown.
     *
     * @param $task CeleryTasks the Celery task object
     *
     * @return array the message response array
     *
     * @throws CeleryException        when no message is found
     * @throws CeleryTimeoutException when no message is found and more than
     *                                $_CELERY_MESSAGE_TIMEOUT milliseconds have passed
     */
    private static function getAsyncResultMessage($task)
    {
        $config = Config::getConfig();

        // NOTE(review): $task is an object here, so the queue name depends on its
        // string conversion - confirm this is intended rather than the task name.
        $queue = self::$_CELERY_RESULTS_EXCHANGE . '.' . $task;
        $c = self::_setupCeleryExchange($config, self::$_CELERY_RESULTS_EXCHANGE, $queue);
        $message = $c->getAsyncResultMessage($task->getDbName(), $task->getDbTaskId());

        if ($message) {
            return $message;
        }

        // The message isn't ready yet (Celery hasn't finished the task).
        if (static::_checkMessageTimeout($task)) {
            // If the task times out, mark it as failed. We don't want to remove the
            // track reference here in case it was a deletion that failed, for example.
            $task->setDbStatus(CELERY_FAILED_STATUS)->save();

            throw new CeleryTimeoutException('Celery task ' . $task->getDbName() . ' with ID ' . $task->getDbTaskId() . ' timed out');
        }

        // The message hasn't timed out, but it's still missing, which means it
        // hasn't been sent back from Celery yet.
        throw new CeleryException('Waiting on Celery task ' . $task->getDbName() . ' with ID ' . $task->getDbTaskId());
    }

    /**
     * Check whether a task list holds no tasks.
     *
     * empty() is always false for objects, so collections have to be counted instead.
     *
     * @param mixed $tasks a collection or array of tasks, or null when nothing is cached
     *
     * @return bool true if there are no tasks in the list, otherwise false
     */
    private static function _isEmptyTaskList($tasks)
    {
        if (is_array($tasks) || $tasks instanceof Countable) {
            return count($tasks) === 0;
        }

        return empty($tasks);
    }

    /**
     * Check to see if there are any pending tasks for this service.
     *
     * The pending tasks that were found are cached for a following call to
     * pollBrokerTaskQueue().
     *
     * @param string $taskName    the name of the task to poll for
     * @param string $serviceName the name of the service to poll for
     *
     * @return bool true if there are no pending tasks, otherwise false
     */
    public static function isBrokerTaskQueueEmpty($taskName = '', $serviceName = '')
    {
        self::$_pendingTasks = static::_getPendingTasks($taskName, $serviceName);

        return self::_isEmptyTaskList(self::$_pendingTasks);
    }

    /**
     * Poll the message queue for this service to see if any tasks with the given name have completed.
     *
     * If we find any completed tasks, adjust the ThirdPartyTrackReferences table accordingly
     *
     * If no task name is passed, we poll all tasks for this service
     *
     * @param string $taskName    the name of the task to poll for
     * @param string $serviceName the name of the service to poll for
     */
    public static function pollBrokerTaskQueue($taskName = '', $serviceName = '')
    {
        // Reuse the tasks cached by isBrokerTaskQueueEmpty() when there are any,
        // otherwise look them up now.
        $pendingTasks = self::_isEmptyTaskList(self::$_pendingTasks)
            ? static::_getPendingTasks($taskName, $serviceName)
            : self::$_pendingTasks;

        foreach ($pendingTasks as $task) {
            try {
                $message = static::_getTaskMessage($task);
                static::_processTaskMessage($task, $message);
            } catch (CeleryTimeoutException $e) {
                Logging::warn($e->getMessage());
            } catch (CeleryException $e) {
                // Don't log these - they end up clogging up the logs
            } catch (Exception $e) {
                // Anything else that goes wrong while handling a single task is
                // logged so that the remaining tasks are still processed.
                Logging::info($e->getMessage());
            }
        }
    }

    /**
     * Return a collection of all pending CeleryTasks for this service or task.
     *
     * Only tasks with CELERY_PENDING_STATUS and a non-empty task ID are returned.
     *
     * @param string $taskName    the name of the task to find
     * @param string $serviceName the name of the service to find
     *
     * @return PropelCollection any pending CeleryTasks results for this service
     *                          or task if taskName is provided
     */
    protected static function _getPendingTasks($taskName, $serviceName)
    {
        $query = CeleryTasksQuery::create()
            ->filterByDbStatus(CELERY_PENDING_STATUS)
            ->filterByDbTaskId('', Criteria::NOT_EQUAL);

        if (!empty($taskName)) {
            $query->filterByDbName($taskName);
        }

        if (!empty($serviceName)) {
            $query->useThirdPartyTrackReferencesQuery()
                ->filterByDbService($serviceName)
                ->endUse();
        }

        return $query
            ->joinThirdPartyTrackReferences()
            ->with('ThirdPartyTrackReferences')
            ->find();
    }

    /**
     * Get a Celery task message from the results queue.
     *
     * @param $task CeleryTasks the Celery task object
     *
     * @return object the task message object
     *
     * @throws CeleryException        when the result message for this task is still pending
     * @throws CeleryTimeoutException when the result message for this task no longer exists
     */
    protected static function _getTaskMessage($task)
    {
        $message = self::getAsyncResultMessage($task);

        return json_decode($message['body']);
    }

    /**
     * Process a message from the results queue.
     *
     * @param $task    CeleryTasks Celery task object
     * @param $message mixed       async message object from php-celery
     */
    protected static function _processTaskMessage($task, $message)
    {
        $ref = $task->getThirdPartyTrackReferences(); // ThirdPartyTrackReferences join
        $service = CeleryServiceFactory::getService($ref->getDbService());

        // $message->result can be either an object or a string, and json_decode()
        // only accepts strings, so anything else is passed through unchanged.
        $result = is_string($message->result)
            ? json_decode($message->result)
            : $message->result;

        $service->updateTrackReference($task, $ref->getDbId(), $result, $message->status);
    }

    /**
     * Check if a task message has been unreachable for more than our timeout time.
     *
     * @param $task CeleryTasks the Celery task object
     *
     * @return bool true if the dispatch time is empty or it's been more than our timeout time
     *              since the message was dispatched, otherwise false
     */
    protected static function _checkMessageTimeout($task)
    {
        $dispatched = $task->getDbDispatchTime();

        // An empty value would be parsed as the current time by DateTime and so
        // never time out; check it before building the DateTime.
        if (empty($dispatched)) {
            return true;
        }

        $utc = new DateTimeZone('UTC');
        $deadline = new DateTime($dispatched, $utc);

        $timeoutSeconds = (int) (self::$_CELERY_MESSAGE_TIMEOUT / 1000); // Convert from milliseconds
        $deadline->add(new DateInterval('PT' . $timeoutSeconds . 'S'));

        return $deadline <= new DateTime('now', $utc);
    }
}
|