sintonia/airtime_mvc/application/services/CeleryService.php

211 lines
8.9 KiB
PHP
Raw Normal View History

<?php
require_once "CeleryServiceFactory.php";
class CeleryService {
/**
* @var int milliseconds (for compatibility with celery) until we consider a message to have timed out
*/
private static $_CELERY_MESSAGE_TIMEOUT = 600000; // 10 minutes
/**
* We have to use celeryresults (the default results exchange) because php-celery
* doesn't support named results exchanges.
*
* @var string exchange for celery task results
*/
private static $_CELERY_RESULTS_EXCHANGE = 'celeryresults';
/**
* Connect to the Celery daemon via amqp
*
* @param $config array the airtime configuration array
* @param $exchange string the amqp exchange name
* @param $queue string the amqp queue name
*
* @return Celery the Celery connection object
*
* @throws Exception when a connection error occurs
*/
private static function _setupCeleryExchange($config, $exchange, $queue) {
return new Celery($config["rabbitmq"]["host"],
$config["rabbitmq"]["user"],
$config["rabbitmq"]["password"],
$config["rabbitmq"]["vhost"],
$exchange, // Exchange name
$queue, // Binding/queue
$config["rabbitmq"]["port"],
false,
true, // Persistent messages
self::$_CELERY_MESSAGE_TIMEOUT); // Result expiration
}
/**
* Send an amqp message to Celery the airtime-celery daemon to perform a task
*
* @param $task string the Celery task name
* @param $exchange string the amqp exchange name
* @param $data array an associative array containing arguments for the Celery task
*
* @return string the task identifier for the started Celery task so we can fetch the
* results asynchronously later
*/
public static function sendCeleryMessage($task, $exchange, $data) {
$config = parse_ini_file(Application_Model_RabbitMq::getRmqConfigPath(), true);
$queue = $routingKey = $exchange;
$c = self::_setupCeleryExchange($config, $exchange, $queue); // Use the exchange name for the queue
$result = $c->PostTask($task, $data, true, $routingKey); // and routing key
return $result->getId();
}
/**
* Given a task name and identifier, check the Celery results queue for any
* corresponding messages
*
* @param $task CeleryTasks the Celery task object
*
2015-06-23 21:10:02 +02:00
* @return array the message response array
*
2015-06-23 21:10:02 +02:00
* @throws CeleryException when no message is found
* @throws CeleryTimeoutException when no message is found and more than
* $_CELERY_MESSAGE_TIMEOUT milliseconds have passed
*/
private static function getAsyncResultMessage($task) {
$config = parse_ini_file(Application_Model_RabbitMq::getRmqConfigPath(), true);
$queue = self::$_CELERY_RESULTS_EXCHANGE . "." . $task;
$c = self::_setupCeleryExchange($config, self::$_CELERY_RESULTS_EXCHANGE, $queue);
$message = $c->getAsyncResultMessage($task->getDbName(), $task->getDbId());
// If the message isn't ready yet (Celery hasn't finished the task),
// only throw an exception if the message has timed out.
2015-06-23 21:10:02 +02:00
if ($message == FALSE) {
if (self::_checkMessageTimeout($task)) {
// If the task times out, mark it as failed. We don't want to remove the
// track reference here in case it was a deletion that failed, for example.
$task->setDbStatus(CELERY_FAILED_STATUS)->save();
throw new CeleryTimeoutException("Celery task " . $task->getDbName()
. " with ID " . $task->getDbId() . " timed out");
} else {
// The message hasn't timed out, but it's still false, which means it hasn't been
// sent back from Celery yet.
throw new CeleryException("Waiting on Celery task " . $task->getDbName()
. " with ID " . $task->getDbId());
}
}
return $message;
}
/**
* Check to see if there are any pending tasks for this service
*
* @param string $taskName the name of the task to poll for
* @param string $serviceName the name of the service to poll for
*
* @return bool true if there are any pending tasks, otherwise false
*/
public static function isBrokerTaskQueueEmpty($taskName="", $serviceName = "") {
$pendingTasks = self::_getPendingTasks($taskName, $serviceName);
return empty($pendingTasks);
}
/**
* Poll the message queue for this service to see if any tasks with the given name have completed
*
* If we find any completed tasks, adjust the ThirdPartyTrackReferences table accordingly
*
* If no task name is passed, we poll all tasks for this service
*
* @param string $taskName the name of the task to poll for
* @param string $serviceName the name of the service to poll for
*/
public static function pollBrokerTaskQueue($taskName = "", $serviceName = "") {
$pendingTasks = self::_getPendingTasks($taskName, $serviceName);
foreach ($pendingTasks as $task) {
try {
$message = self::_getTaskMessage($task);
self::_processTaskMessage($task, $message);
} catch (CeleryTimeoutException $e) {
2015-06-23 21:10:02 +02:00
Logging::warn($e->getMessage());
} catch (Exception $e) {
// Because $message->result can be either an object or a string, sometimes
// we get a json_decode error and end up here
Logging::info($e->getMessage());
}
}
}
/**
* Return a collection of all pending CeleryTasks for this service or task
*
* @param string $taskName the name of the task to find
* @param string $serviceName the name of the service to find
*
* @return PropelCollection any pending CeleryTasks results for this service
* or task if taskName is provided
*/
protected static function _getPendingTasks($taskName, $serviceName) {
$query = CeleryTasksQuery::create()
->filterByDbStatus(CELERY_PENDING_STATUS)
->filterByDbId('', Criteria::NOT_EQUAL);
if (!empty($taskName)) {
$query->filterByDbName($taskName);
}
if (!empty($serviceName)) {
$query->useThirdPartyTrackReferencesQuery()
->filterByDbService($serviceName)->endUse();
}
return $query->joinThirdPartyTrackReferences()
->with('ThirdPartyTrackReferences')->find();
}
/**
* Get a Celery task message from the results queue
*
* @param $task CeleryTasks the Celery task object
*
* @return object the task message object
*
2015-06-23 21:10:02 +02:00
* @throws CeleryException when the result message for this task is still pending
* @throws CeleryTimeoutException when the result message for this task no longer exists
*/
protected static function _getTaskMessage($task) {
2015-06-23 21:10:02 +02:00
$message = self::getAsyncResultMessage($task);
return json_decode($message['body']);
}
/**
* Process a message from the results queue
*
* @param $task CeleryTasks Celery task object
* @param $message mixed async message object from php-celery
*/
protected static function _processTaskMessage($task, $message) {
$ref = $task->getThirdPartyTrackReferences(); // ThirdPartyTrackReferences join
$service = CeleryServiceFactory::getService($ref->getDbService());
if ($message->status == CELERY_SUCCESS_STATUS
&& $task->getDbName() == $service->getCeleryDeleteTaskName()) {
$service->removeTrackReference($ref->getDbFileId());
} else {
$service->updateTrackReference($ref->getDbId(), json_decode($message->result), $message->status);
}
}
/**
* Check if a task message has been unreachable for more our timeout time
*
* @param $task CeleryTasks the Celery task object
*
* @return bool true if the dispatch time is empty or it's been more than our timeout time
* since the message was dispatched, otherwise false
*/
protected static function _checkMessageTimeout($task) {
$utc = new DateTimeZone("UTC");
$dispatchTime = new DateTime($task->getDbDispatchTime(), $utc);
$now = new DateTime("now", $utc);
$timeoutSeconds = self::$_CELERY_MESSAGE_TIMEOUT / 1000; // Convert from milliseconds
$timeoutInterval = new DateInterval("PT" . $timeoutSeconds . "S");
return (empty($dispatchTime) || $dispatchTime->add($timeoutInterval) <= $now);
}
}