Fixes to airtime-celery setup

This commit is contained in:
Duncan Sommerville 2015-06-23 15:10:02 -04:00
parent 76a7aa9a24
commit 70f6cbbc71
5 changed files with 57 additions and 32 deletions

View file

@ -50,8 +50,6 @@ class CeleryService {
*
* @return string the task identifier for the started Celery task so we can fetch the
* results asynchronously later
*
* @throws CeleryException when no message is found
*/
public static function sendCeleryMessage($task, $exchange, $data) {
$config = parse_ini_file(Application_Model_RabbitMq::getRmqConfigPath(), true);
@ -67,8 +65,9 @@ class CeleryService {
*
* @param $task CeleryTasks the Celery task object
*
* @return object the message object
* @return array the message response array
*
* @throws CeleryException when no message is found
* @throws CeleryTimeoutException when no message is found and more than
* $_CELERY_MESSAGE_TIMEOUT milliseconds have passed
*/
@ -80,13 +79,19 @@ class CeleryService {
// If the message isn't ready yet (Celery hasn't finished the task),
// only throw an exception if the message has timed out.
if ($message == FALSE && self::_checkMessageTimeout($task)) {
// If the task times out, mark it as failed. We don't want to remove the
// track reference here in case it was a deletion that failed, for example.
$task->setDbStatus(CELERY_FAILED_STATUS);
$task->save();
throw new CeleryTimeoutException("Celery task " . $task->getDbName()
. " with ID " . $task->getDbId() . " timed out");
if ($message == FALSE) {
if (self::_checkMessageTimeout($task)) {
// If the task times out, mark it as failed. We don't want to remove the
// track reference here in case it was a deletion that failed, for example.
$task->setDbStatus(CELERY_FAILED_STATUS)->save();
throw new CeleryTimeoutException("Celery task " . $task->getDbName()
. " with ID " . $task->getDbId() . " timed out");
} else {
// The message hasn't timed out, but it's still false, which means it hasn't been
// sent back from Celery yet.
throw new CeleryException("Waiting on Celery task " . $task->getDbName()
. " with ID " . $task->getDbId());
}
}
return $message;
}
@ -121,7 +126,7 @@ class CeleryService {
$message = self::_getTaskMessage($task);
self::_processTaskMessage($task, $message);
} catch (CeleryTimeoutException $e) {
Logging::info($e->getMessage());
Logging::warn($e->getMessage());
} catch (Exception $e) {
// Because $message->result can be either an object or a string, sometimes
// we get a json_decode error and end up here
@ -161,10 +166,11 @@ class CeleryService {
*
* @return object the task message object
*
* @throws CeleryException when the result message for this task no longer exists
* @throws CeleryException when the result message for this task is still pending
* @throws CeleryTimeoutException when the result message for this task no longer exists
*/
protected static function _getTaskMessage($task) {
$message = self::getAsyncResultMessage($task);
$message = self::getAsyncResultMessage($task);
return json_decode($message['body']);
}