PostTask($task, $data, true, $routingKey); // and routing key return $result->getId(); } /** * Given a task name and identifier, check the Celery results queue for any * corresponding messages. * * @param $task CeleryTasks the Celery task object * * @return array the message response array * * @throws CeleryException when no message is found * @throws CeleryTimeoutException when no message is found and more than * $_CELERY_MESSAGE_TIMEOUT milliseconds have passed */ private static function getAsyncResultMessage($task) { $config = Config::getConfig(); $queue = self::$_CELERY_RESULTS_EXCHANGE . '.' . $task; $c = self::_setupCeleryExchange($config, self::$_CELERY_RESULTS_EXCHANGE, $queue); $message = $c->getAsyncResultMessage($task->getDbName(), $task->getDbTaskId()); // If the message isn't ready yet (Celery hasn't finished the task), throw an exception. if ($message == false) { if (static::_checkMessageTimeout($task)) { // If the task times out, mark it as failed. We don't want to remove the // track reference here in case it was a deletion that failed, for example. $task->setDbStatus(CELERY_FAILED_STATUS)->save(); throw new CeleryTimeoutException('Celery task ' . $task->getDbName() . ' with ID ' . $task->getDbTaskId() . ' timed out'); } // The message hasn't timed out, but it's still false, which means it hasn't been // sent back from Celery yet. throw new CeleryException('Waiting on Celery task ' . $task->getDbName() . ' with ID ' . $task->getDbTaskId()); } return $message; } /** * Check to see if there are any pending tasks for this service. * * @param string $taskName the name of the task to poll for * @param string $serviceName the name of the service to poll for * * @return bool true if there are any pending tasks, otherwise false */ public static function isBrokerTaskQueueEmpty($taskName = '', $serviceName = '') { self::$_pendingTasks = static::_getPendingTasks($taskName, $serviceName); return empty(self::$_pendingTasks); } /** * Poll the message queue for this service to see if any tasks with the given name have completed. * * If we find any completed tasks, adjust the ThirdPartyTrackReferences table accordingly * * If no task name is passed, we poll all tasks for this service * * @param string $taskName the name of the task to poll for * @param string $serviceName the name of the service to poll for */ public static function pollBrokerTaskQueue($taskName = '', $serviceName = '') { $pendingTasks = empty(self::$_pendingTasks) ? static::_getPendingTasks($taskName, $serviceName) : self::$_pendingTasks; foreach ($pendingTasks as $task) { try { $message = static::_getTaskMessage($task); static::_processTaskMessage($task, $message); } catch (CeleryTimeoutException $e) { Logging::warn($e->getMessage()); } catch (CeleryException $e) { // Don't log these - they end up clogging up the logs } catch (Exception $e) { // Because $message->result can be either an object or a string, sometimes // we get a json_decode error and end up here Logging::info($e->getMessage()); } } } /** * Return a collection of all pending CeleryTasks for this service or task. * * @param string $taskName the name of the task to find * @param string $serviceName the name of the service to find * * @return PropelCollection any pending CeleryTasks results for this service * or task if taskName is provided */ protected static function _getPendingTasks($taskName, $serviceName) { $query = CeleryTasksQuery::create() ->filterByDbStatus(CELERY_PENDING_STATUS) ->filterByDbTaskId('', Criteria::NOT_EQUAL); if (!empty($taskName)) { $query->filterByDbName($taskName); } if (!empty($serviceName)) { $query->useThirdPartyTrackReferencesQuery() ->filterByDbService($serviceName)->endUse(); } return $query->joinThirdPartyTrackReferences() ->with('ThirdPartyTrackReferences')->find(); } /** * Get a Celery task message from the results queue. * * @param $task CeleryTasks the Celery task object * * @return object the task message object * * @throws CeleryException when the result message for this task is still pending * @throws CeleryTimeoutException when the result message for this task no longer exists */ protected static function _getTaskMessage($task) { $message = self::getAsyncResultMessage($task); return json_decode($message['body']); } /** * Process a message from the results queue. * * @param $task CeleryTasks Celery task object * @param $message mixed async message object from php-celery */ protected static function _processTaskMessage($task, $message) { $ref = $task->getThirdPartyTrackReferences(); // ThirdPartyTrackReferences join $service = CeleryServiceFactory::getService($ref->getDbService()); $service->updateTrackReference($task, $ref->getDbId(), json_decode($message->result), $message->status); } /** * Check if a task message has been unreachable for more our timeout time. * * @param $task CeleryTasks the Celery task object * * @return bool true if the dispatch time is empty or it's been more than our timeout time * since the message was dispatched, otherwise false */ protected static function _checkMessageTimeout($task) { $utc = new DateTimeZone('UTC'); $dispatchTime = new DateTime($task->getDbDispatchTime(), $utc); $now = new DateTime('now', $utc); $timeoutSeconds = self::$_CELERY_MESSAGE_TIMEOUT / 1000; // Convert from milliseconds $timeoutInterval = new DateInterval('PT' . $timeoutSeconds . 'S'); return empty($dispatchTime) || $dispatchTime->add($timeoutInterval) <= $now; } }