SAAS-868 - Refactor third party + celery workflow, implement locking on TaskManager
This commit is contained in:
parent
3902c8c746
commit
8d2e476ff1
34 changed files with 4302 additions and 1143 deletions
206
airtime_mvc/application/services/CeleryService.php
Normal file
206
airtime_mvc/application/services/CeleryService.php
Normal file
|
@ -0,0 +1,206 @@
|
|||
<?php
|
||||
|
||||
require_once "CeleryServiceFactory.php";
|
||||
|
||||
class CeleryService {
|
||||
|
||||
/**
|
||||
* @var int milliseconds (for compatibility with celery) until we consider a message to have timed out
|
||||
*/
|
||||
private static $_CELERY_MESSAGE_TIMEOUT = 600000; // 10 minutes
|
||||
|
||||
/**
|
||||
* We have to use celeryresults (the default results exchange) because php-celery
|
||||
* doesn't support named results exchanges.
|
||||
*
|
||||
* @var string exchange for celery task results
|
||||
*/
|
||||
private static $_CELERY_RESULTS_EXCHANGE = 'celeryresults';
|
||||
|
||||
/**
|
||||
* Connect to the Celery daemon via amqp
|
||||
*
|
||||
* @param $config array the airtime configuration array
|
||||
* @param $exchange string the amqp exchange name
|
||||
* @param $queue string the amqp queue name
|
||||
*
|
||||
* @return Celery the Celery connection object
|
||||
*
|
||||
* @throws Exception when a connection error occurs
|
||||
*/
|
||||
private static function _setupCeleryExchange($config, $exchange, $queue) {
|
||||
return new Celery($config["rabbitmq"]["host"],
|
||||
$config["rabbitmq"]["user"],
|
||||
$config["rabbitmq"]["password"],
|
||||
$config["rabbitmq"]["vhost"],
|
||||
$exchange, // Exchange name
|
||||
$queue, // Binding/queue
|
||||
$config["rabbitmq"]["port"],
|
||||
false,
|
||||
true, // Persistent messages
|
||||
self::$_CELERY_MESSAGE_TIMEOUT); // Result expiration
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an amqp message to Celery the airtime-celery daemon to perform a task
|
||||
*
|
||||
* @param $task string the Celery task name
|
||||
* @param $exchange string the amqp exchange name
|
||||
* @param $data array an associative array containing arguments for the Celery task
|
||||
*
|
||||
* @return string the task identifier for the started Celery task so we can fetch the
|
||||
* results asynchronously later
|
||||
*
|
||||
* @throws CeleryException when no message is found
|
||||
*/
|
||||
public static function sendCeleryMessage($task, $exchange, $data) {
|
||||
$config = parse_ini_file(Application_Model_RabbitMq::getRmqConfigPath(), true);
|
||||
$queue = $routingKey = $exchange;
|
||||
$c = self::_setupCeleryExchange($config, $exchange, $queue); // Use the exchange name for the queue
|
||||
$result = $c->PostTask($task, $data, true, $routingKey); // and routing key
|
||||
return $result->getId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a task name and identifier, check the Celery results queue for any
|
||||
* corresponding messages
|
||||
*
|
||||
* @param $task CeleryTasks the Celery task object
|
||||
*
|
||||
* @return object the message object
|
||||
*
|
||||
* @throws CeleryTimeoutException when no message is found and more than
|
||||
* $_CELERY_MESSAGE_TIMEOUT milliseconds have passed
|
||||
*/
|
||||
private static function getAsyncResultMessage($task) {
|
||||
$config = parse_ini_file(Application_Model_RabbitMq::getRmqConfigPath(), true);
|
||||
$queue = self::$_CELERY_RESULTS_EXCHANGE . "." . $task;
|
||||
$c = self::_setupCeleryExchange($config, self::$_CELERY_RESULTS_EXCHANGE, $queue);
|
||||
$message = $c->getAsyncResultMessage($task->getDbName(), $task->getDbId());
|
||||
|
||||
// If the message isn't ready yet (Celery hasn't finished the task),
|
||||
// only throw an exception if the message has timed out.
|
||||
if ($message == FALSE && self::_checkMessageTimeout($task)) {
|
||||
throw new CeleryTimeoutException("Celery task " . $task->getDbName()
|
||||
. " with ID " . $task->getDbId() . " timed out");
|
||||
}
|
||||
return $message;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if there are any pending tasks for this service
|
||||
*
|
||||
* @param string $taskName the name of the task to poll for
|
||||
* @param string $serviceName the name of the service to poll for
|
||||
*
|
||||
* @return bool true if there are any pending tasks, otherwise false
|
||||
*/
|
||||
public static function isBrokerTaskQueueEmpty($taskName="", $serviceName = "") {
|
||||
$pendingTasks = self::_getPendingTasks($taskName, $serviceName);
|
||||
return empty($pendingTasks);
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll the message queue for this service to see if any tasks with the given name have completed
|
||||
*
|
||||
* If we find any completed tasks, adjust the ThirdPartyTrackReferences table accordingly
|
||||
*
|
||||
* If no task name is passed, we poll all tasks for this service
|
||||
*
|
||||
* @param string $taskName the name of the task to poll for
|
||||
* @param string $serviceName the name of the service to poll for
|
||||
*/
|
||||
public static function pollBrokerTaskQueue($taskName = "", $serviceName = "") {
|
||||
$pendingTasks = self::_getPendingTasks($taskName, $serviceName);
|
||||
foreach ($pendingTasks as $task) {
|
||||
try {
|
||||
$message = self::_getTaskMessage($task);
|
||||
self::_processTaskMessage($task, $message);
|
||||
} catch (CeleryTimeoutException $e) {
|
||||
// If the task times out, mark it as failed. We don't want to remove the
|
||||
// track reference here in case it was a deletion that failed, for example.
|
||||
// TODO: Move this somewhere more appropriate
|
||||
$task->setDbStatus(CELERY_FAILED_STATUS);
|
||||
$task->save();
|
||||
Logging::info($e->getMessage());
|
||||
} catch (Exception $e) {
|
||||
// Because $message->result can be either an object or a string, sometimes
|
||||
// we get a json_decode error and end up here
|
||||
Logging::info($e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a collection of all pending CeleryTasks for this service or task
|
||||
*
|
||||
* @param string $taskName the name of the task to find
|
||||
* @param string $serviceName the name of the service to find
|
||||
*
|
||||
* @return PropelCollection any pending CeleryTasks results for this service
|
||||
* or task if taskName is provided
|
||||
*/
|
||||
protected static function _getPendingTasks($taskName, $serviceName) {
|
||||
$query = CeleryTasksQuery::create()
|
||||
->filterByDbStatus(CELERY_PENDING_STATUS)
|
||||
->filterByDbId('', Criteria::NOT_EQUAL);
|
||||
if (!empty($taskName)) {
|
||||
$query->filterByDbName($taskName);
|
||||
}
|
||||
if (!empty($serviceName)) {
|
||||
$query->useThirdPartyTrackReferencesQuery()
|
||||
->filterByDbService($serviceName)->endUse();
|
||||
}
|
||||
return $query->joinThirdPartyTrackReferences()
|
||||
->with('ThirdPartyTrackReferences')->find();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a Celery task message from the results queue
|
||||
*
|
||||
* @param $task CeleryTasks the Celery task object
|
||||
*
|
||||
* @return object the task message object
|
||||
*
|
||||
* @throws CeleryException when the result message for this task no longer exists
|
||||
*/
|
||||
protected static function _getTaskMessage($task) {
|
||||
$message = self::getAsyncResultMessage($task);
|
||||
return json_decode($message['body']);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a message from the results queue
|
||||
*
|
||||
* @param $task CeleryTasks Celery task object
|
||||
* @param $message mixed async message object from php-celery
|
||||
*/
|
||||
protected static function _processTaskMessage($task, $message) {
|
||||
$ref = $task->getThirdPartyTrackReferences(); // ThirdPartyTrackReferences join
|
||||
$service = CeleryServiceFactory::getService($ref->getDbService());
|
||||
if ($message->status == CELERY_SUCCESS_STATUS
|
||||
&& $task->getDbName() == $service->getCeleryDeleteTaskName()) {
|
||||
$service->removeTrackReference($ref->getDbFileId());
|
||||
} else {
|
||||
$service->updateTrackReference($ref->getDbId(), json_decode($message->result), $message->status);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a task message has been unreachable for more our timeout time
|
||||
*
|
||||
* @param $task CeleryTasks the Celery task object
|
||||
*
|
||||
* @return bool true if the dispatch time is empty or it's been more than our timeout time
|
||||
* since the message was dispatched, otherwise false
|
||||
*/
|
||||
protected static function _checkMessageTimeout($task) {
|
||||
$utc = new DateTimeZone("UTC");
|
||||
$dispatchTime = new DateTime($task->getDbDispatchTime(), $utc);
|
||||
$now = new DateTime("now", $utc);
|
||||
$timeoutSeconds = self::$_CELERY_MESSAGE_TIMEOUT / 1000; // Convert from milliseconds
|
||||
$timeoutInterval = new DateInterval("PT" . $timeoutSeconds . "S");
|
||||
return (empty($dispatchTime) || $dispatchTime->add($timeoutInterval) <= $now);
|
||||
}
|
||||
|
||||
}
|
20
airtime_mvc/application/services/CeleryServiceFactory.php
Normal file
20
airtime_mvc/application/services/CeleryServiceFactory.php
Normal file
|
@ -0,0 +1,20 @@
|
|||
<?php
|
||||
|
||||
class CeleryServiceFactory {
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param $serviceName string the name of the service to create
|
||||
*
|
||||
* @return ThirdPartyCeleryService|null
|
||||
*/
|
||||
public static function getService($serviceName) {
|
||||
switch($serviceName) {
|
||||
case SOUNDCLOUD_SERVICE_NAME:
|
||||
return new SoundcloudService();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
<?php
|
||||
|
||||
require_once "ThirdPartyService.php";
|
||||
require_once "ThirdPartyCeleryService.php";
|
||||
|
||||
class SoundcloudService extends ThirdPartyService {
|
||||
class SoundcloudService extends ThirdPartyCeleryService implements OAuth2 {
|
||||
|
||||
/**
|
||||
* @var string service access token for accessing remote API
|
||||
|
@ -17,7 +17,7 @@ class SoundcloudService extends ThirdPartyService {
|
|||
/**
|
||||
* @var string service name to store in ThirdPartyTrackReferences database
|
||||
*/
|
||||
protected static $_SERVICE_NAME = 'SoundCloud';
|
||||
protected static $_SERVICE_NAME = SOUNDCLOUD_SERVICE_NAME; // SoundCloud service name constant from constants.php
|
||||
|
||||
/**
|
||||
* @var string exchange name for SoundCloud tasks
|
||||
|
@ -84,25 +84,20 @@ class SoundcloudService extends ThirdPartyService {
|
|||
/**
|
||||
* Update a ThirdPartyTrackReferences object for a completed upload
|
||||
* TODO: should we have a database layer class to handle Propel operations?
|
||||
* TODO: break this function up, it's a bit of a beast
|
||||
*
|
||||
* @param $fileId int local CcFiles identifier
|
||||
* @param $track object third-party service track object
|
||||
* @param $status string Celery task status
|
||||
* @param $trackId int ThirdPartyTrackReferences identifier
|
||||
* @param $track object third-party service track object
|
||||
* @param $status string Celery task status
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws PropelException
|
||||
*/
|
||||
protected function _addOrUpdateTrackReference($fileId, $track, $status) {
|
||||
public function updateTrackReference($trackId, $track, $status) {
|
||||
parent::updateTrackReference($trackId, $track, $status);
|
||||
$ref = ThirdPartyTrackReferencesQuery::create()
|
||||
->filterByDbService(static::$_SERVICE_NAME)
|
||||
->findOneByDbFileId($fileId);
|
||||
->findOneByDbId($trackId);
|
||||
if (is_null($ref)) {
|
||||
$ref = new ThirdPartyTrackReferences();
|
||||
} // If this was a delete task, just remove the record and return
|
||||
else if ($ref->getDbBrokerTaskName() == static::$_CELERY_DELETE_TASK_NAME) {
|
||||
$ref->delete();
|
||||
return;
|
||||
}
|
||||
$ref->setDbService(static::$_SERVICE_NAME);
|
||||
// Only set the SoundCloud fields if the task was successful
|
||||
|
@ -110,15 +105,8 @@ class SoundcloudService extends ThirdPartyService {
|
|||
// TODO: fetch any additional SoundCloud parameters we want to store
|
||||
$ref->setDbForeignId($track->id); // SoundCloud identifier
|
||||
}
|
||||
$ref->setDbFileId($fileId);
|
||||
$ref->setDbStatus($status);
|
||||
// Null the broker task fields because we no longer need them
|
||||
// We use NULL over an empty string/object here because we have
|
||||
// a unique constraint on the task ID and it's easier to filter
|
||||
// and query against NULLs
|
||||
$ref->setDbBrokerTaskId(NULL);
|
||||
$ref->setDbBrokerTaskName(NULL);
|
||||
$ref->setDbBrokerTaskDispatchTime(NULL);
|
||||
// TODO: set SoundCloud upload status?
|
||||
// $ref->setDbStatus($status);
|
||||
$ref->save();
|
||||
}
|
||||
|
||||
|
|
135
airtime_mvc/application/services/ThirdPartyCeleryService.php
Normal file
135
airtime_mvc/application/services/ThirdPartyCeleryService.php
Normal file
|
@ -0,0 +1,135 @@
|
|||
<?php
|
||||
|
||||
require_once "ThirdPartyService.php";
|
||||
|
||||
abstract class ThirdPartyCeleryService extends ThirdPartyService {
|
||||
|
||||
/**
|
||||
* @var string broker exchange name for third-party tasks
|
||||
*/
|
||||
protected static $_CELERY_EXCHANGE_NAME;
|
||||
|
||||
/**
|
||||
* @var string celery task name for third-party uploads
|
||||
*/
|
||||
protected static $_CELERY_UPLOAD_TASK_NAME;
|
||||
|
||||
/**
|
||||
* @var string celery task name for third-party deletion
|
||||
*/
|
||||
protected static $_CELERY_DELETE_TASK_NAME;
|
||||
|
||||
/**
|
||||
* Upload the file with the given identifier to a third-party service
|
||||
*
|
||||
* @param int $fileId the local CcFiles identifier
|
||||
*/
|
||||
public function upload($fileId) {
|
||||
$file = Application_Model_StoredFile::RecallById($fileId);
|
||||
$data = array(
|
||||
'data' => $this->_getUploadData($file),
|
||||
'token' => $this->_accessToken,
|
||||
'file_path' => $file->getFilePaths()[0]
|
||||
);
|
||||
try {
|
||||
$brokerTaskId = CeleryService::sendCeleryMessage(static::$_CELERY_UPLOAD_TASK_NAME,
|
||||
static::$_CELERY_EXCHANGE_NAME,
|
||||
$data);
|
||||
$this->_createTaskReference($fileId, $brokerTaskId, static::$_CELERY_UPLOAD_TASK_NAME);
|
||||
} catch (Exception $e) {
|
||||
Logging::info("Invalid request: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the file with the given identifier from a third-party service
|
||||
*
|
||||
* @param int $fileId the local CcFiles identifier
|
||||
*
|
||||
* @throws ServiceNotFoundException when a $fileId with no corresponding
|
||||
* service identifier is given
|
||||
*/
|
||||
public function delete($fileId) {
|
||||
$serviceId = $this->getServiceId($fileId);
|
||||
if ($serviceId == 0) {
|
||||
throw new ServiceNotFoundException("No service found for file with ID $fileId");
|
||||
}
|
||||
$data = array(
|
||||
'token' => $this->_accessToken,
|
||||
'track_id' => $serviceId
|
||||
);
|
||||
try {
|
||||
$brokerTaskId = CeleryService::sendCeleryMessage(static::$_CELERY_DELETE_TASK_NAME,
|
||||
static::$_CELERY_EXCHANGE_NAME,
|
||||
$data);
|
||||
$this->_createTaskReference($fileId, $brokerTaskId, static::$_CELERY_DELETE_TASK_NAME);
|
||||
} catch (Exception $e) {
|
||||
Logging::info("Invalid request: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a CeleryTasks object for a pending task
|
||||
* TODO: should we have a database layer class to handle Propel operations?
|
||||
*
|
||||
* @param $fileId int CcFiles identifier
|
||||
* @param $brokerTaskId int broker task identifier to so we can asynchronously
|
||||
* receive completed task messages
|
||||
* @param $taskName string broker task name
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws PropelException
|
||||
*/
|
||||
protected function _createTaskReference($fileId, $brokerTaskId, $taskName) {
|
||||
$trackId = $this->createTrackReference($fileId);
|
||||
// First, check if the track already has an entry in the database
|
||||
$ref = CeleryTasksQuery::create()->findOneByDbId($brokerTaskId);
|
||||
if (is_null($ref)) {
|
||||
$ref = new CeleryTasks();
|
||||
}
|
||||
$ref->setDbId($brokerTaskId);
|
||||
$ref->setDbName($taskName);
|
||||
$utc = new DateTimeZone("UTC");
|
||||
$ref->setDbDispatchTime(new DateTime("now", $utc));
|
||||
$ref->setDbStatus(CELERY_PENDING_STATUS);
|
||||
$ref->setDbTrackReference($trackId);
|
||||
$ref->save();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update a CeleryTasks object for a completed upload
|
||||
* TODO: should we have a database layer class to handle Propel operations?
|
||||
*
|
||||
* @param $trackId int ThirdPartyTrackReferences identifier
|
||||
* @param $track object third-party service track object
|
||||
* @param $status string Celery task status
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws PropelException
|
||||
*/
|
||||
public function updateTrackReference($trackId, $track, $status) {
|
||||
$ref = CeleryTasksQuery::create()
|
||||
->findOneByDbTrackReference($trackId);
|
||||
$ref->setDbStatus($status);
|
||||
$ref->save();
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a parameter array for the file being uploaded to a third party service
|
||||
*
|
||||
* @param $file Application_Model_StoredFile the file being uploaded
|
||||
*
|
||||
* @return array the track array to send to the third party service
|
||||
*/
|
||||
abstract protected function _getUploadData($file);
|
||||
|
||||
/**
|
||||
* Field accessor for $_CELERY_DELETE_TASK_NAME
|
||||
*
|
||||
* @return string the Celery task name for deleting tracks from this service
|
||||
*/
|
||||
public function getCeleryDeleteTaskName() {
|
||||
return self::$_CELERY_DELETE_TASK_NAME;
|
||||
}
|
||||
|
||||
}
|
|
@ -7,12 +7,11 @@ class ServiceNotFoundException extends Exception {}
|
|||
|
||||
/**
|
||||
* Class ThirdPartyService generic superclass for third-party services
|
||||
* TODO: decouple the media/track-specific functions into ThirdPartyMediaService class?
|
||||
*/
|
||||
abstract class ThirdPartyService {
|
||||
|
||||
/**
|
||||
* @var string service access token for accessing remote API
|
||||
* @var string service access token for accessing third-party API
|
||||
*/
|
||||
protected $_accessToken;
|
||||
|
||||
|
@ -27,86 +26,18 @@ abstract class ThirdPartyService {
|
|||
protected static $_THIRD_PARTY_TRACK_URI;
|
||||
|
||||
/**
|
||||
* @var string broker exchange name for third party tasks
|
||||
*/
|
||||
protected static $_CELERY_EXCHANGE_NAME;
|
||||
|
||||
/**
|
||||
* @var string celery task name for third party uploads
|
||||
*/
|
||||
protected static $_CELERY_UPLOAD_TASK_NAME;
|
||||
|
||||
/**
|
||||
* @var string celery task name for third party deletion
|
||||
*/
|
||||
protected static $_CELERY_DELETE_TASK_NAME;
|
||||
|
||||
/**
|
||||
* Upload the file with the given identifier to a third-party service
|
||||
*
|
||||
* @param int $fileId the local CcFiles identifier
|
||||
*/
|
||||
public function upload($fileId) {
|
||||
$file = Application_Model_StoredFile::RecallById($fileId);
|
||||
$data = array(
|
||||
'data' => $this->_getUploadData($file),
|
||||
'token' => $this->_accessToken,
|
||||
'file_path' => $file->getFilePaths()[0]
|
||||
);
|
||||
try {
|
||||
$brokerTaskId = Application_Model_RabbitMq::sendCeleryMessage(static::$_CELERY_UPLOAD_TASK_NAME,
|
||||
static::$_CELERY_EXCHANGE_NAME,
|
||||
$data);
|
||||
$this->_createTaskReference($fileId, $brokerTaskId, static::$_CELERY_UPLOAD_TASK_NAME);
|
||||
} catch (Exception $e) {
|
||||
Logging::info("Invalid request: " . $e->getMessage());
|
||||
// We should only get here if we have an access token, so attempt to refresh
|
||||
$this->accessTokenRefresh();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the file with the given identifier from a third-party service
|
||||
*
|
||||
* @param int $fileId the local CcFiles identifier
|
||||
*
|
||||
* @throws ServiceNotFoundException when a $fileId with no corresponding
|
||||
* service identifier is given
|
||||
*/
|
||||
public function delete($fileId) {
|
||||
$serviceId = $this->getServiceId($fileId);
|
||||
if ($serviceId == 0) {
|
||||
throw new ServiceNotFoundException("No service found for file with ID $fileId");
|
||||
}
|
||||
$data = array(
|
||||
'token' => $this->_accessToken,
|
||||
'track_id' => $serviceId
|
||||
);
|
||||
try {
|
||||
$brokerTaskId = Application_Model_RabbitMq::sendCeleryMessage(static::$_CELERY_DELETE_TASK_NAME,
|
||||
static::$_CELERY_EXCHANGE_NAME,
|
||||
$data);
|
||||
$this->_createTaskReference($fileId, $brokerTaskId, static::$_CELERY_DELETE_TASK_NAME);
|
||||
} catch (Exception $e) {
|
||||
Logging::info("Invalid request: " . $e->getMessage());
|
||||
// We should only get here if we have an access token, so attempt to refresh
|
||||
$this->accessTokenRefresh();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a ThirdPartyTrackReferences object for a pending task
|
||||
* Create a ThirdPartyTrackReferences object for a track that's been uploaded
|
||||
* to an external service
|
||||
* TODO: should we have a database layer class to handle Propel operations?
|
||||
*
|
||||
* @param $fileId int local CcFiles identifier
|
||||
* @param $brokerTaskId int broker task identifier to so we can asynchronously
|
||||
* receive completed task messages
|
||||
* @param $taskName string broker task name
|
||||
* @param $fileId int local CcFiles identifier
|
||||
*
|
||||
* @return string the new ThirdPartyTrackReferences identifier
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws PropelException
|
||||
*/
|
||||
protected function _createTaskReference($fileId, $brokerTaskId, $taskName) {
|
||||
public function createTrackReference($fileId) {
|
||||
// First, check if the track already has an entry in the database
|
||||
$ref = ThirdPartyTrackReferencesQuery::create()
|
||||
->filterByDbService(static::$_SERVICE_NAME)
|
||||
|
@ -115,13 +46,11 @@ abstract class ThirdPartyService {
|
|||
$ref = new ThirdPartyTrackReferences();
|
||||
}
|
||||
$ref->setDbService(static::$_SERVICE_NAME);
|
||||
$ref->setDbBrokerTaskId($brokerTaskId);
|
||||
$ref->setDbBrokerTaskName($taskName);
|
||||
$utc = new DateTimeZone("UTC");
|
||||
$ref->setDbBrokerTaskDispatchTime(new DateTime("now", $utc));
|
||||
// TODO: implement service-specific statuses?
|
||||
// $ref->setDbStatus(CELERY_PENDING_STATUS);
|
||||
$ref->setDbFileId($fileId);
|
||||
$ref->setDbStatus(CELERY_PENDING_STATUS);
|
||||
$ref->save();
|
||||
return $ref->getDbId();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -129,7 +58,7 @@ abstract class ThirdPartyService {
|
|||
* This is necessary if the track was removed from the service
|
||||
* or the foreign id in our database is incorrect
|
||||
*
|
||||
* @param $fileId int local CcFiles identifier
|
||||
* @param $fileId int cc_files identifier
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws PropelException
|
||||
|
@ -147,169 +76,55 @@ abstract class ThirdPartyService {
|
|||
*
|
||||
* @param int $fileId the local CcFiles identifier
|
||||
*
|
||||
* @return int the service foreign identifier
|
||||
* @return string the service foreign identifier
|
||||
*/
|
||||
public function getServiceId($fileId) {
|
||||
$ref = ThirdPartyTrackReferencesQuery::create()
|
||||
->filterByDbService(static::$_SERVICE_NAME)
|
||||
->findOneByDbFileId($fileId); // There shouldn't be duplicates!
|
||||
return empty($ref) ? 0 : $ref->getDbForeignId();
|
||||
return empty($ref) ? '' : $ref->getDbForeignId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a CcFiles identifier for a file that's been uploaded to a third-party service,
|
||||
* return a link to the remote file
|
||||
*
|
||||
* @param int $fileId the local CcFiles identifier
|
||||
* @param int $fileId CcFiles identifier
|
||||
*
|
||||
* @return string the link to the remote file
|
||||
*/
|
||||
public function getLinkToFile($fileId) {
|
||||
$serviceId = $this->getServiceId($fileId);
|
||||
return $serviceId > 0 ? static::$_THIRD_PARTY_TRACK_URI . $serviceId : '';
|
||||
return empty($serviceId) ? '' : static::$_THIRD_PARTY_TRACK_URI . $serviceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if there are any pending tasks for this service
|
||||
* Upload the file with the given identifier to a third-party service
|
||||
*
|
||||
* @param string $taskName
|
||||
*
|
||||
* @return bool true if there are any pending tasks, otherwise false
|
||||
* @param int $fileId CcFiles identifier
|
||||
*/
|
||||
public function isBrokerTaskQueueEmpty($taskName="") {
|
||||
$query = ThirdPartyTrackReferencesQuery::create()
|
||||
->filterByDbService(static::$_SERVICE_NAME);
|
||||
if (!empty($taskName)) {
|
||||
$query->filterByDbBrokerTaskName($taskName);
|
||||
}
|
||||
$result = $query->findOneByDbStatus(CELERY_PENDING_STATUS);
|
||||
return empty($result);
|
||||
}
|
||||
abstract function upload($fileId);
|
||||
|
||||
/**
|
||||
* Poll the message queue for this service to see if any tasks with the given name have completed
|
||||
* If we find any completed tasks, adjust the ThirdPartyTrackReferences table accordingly
|
||||
* If no task name is passed, we poll all tasks for this service
|
||||
* Delete the file with the given identifier from a third-party service
|
||||
*
|
||||
* @param string $taskName the name of the task to poll for
|
||||
* @param int $fileId the local CcFiles identifier
|
||||
*
|
||||
* @throws ServiceNotFoundException when a $fileId with no corresponding
|
||||
* service identifier is given
|
||||
*/
|
||||
public function pollBrokerTaskQueue($taskName="") {
|
||||
$pendingTasks = static::_getPendingTasks($taskName);
|
||||
foreach ($pendingTasks as $task) {
|
||||
try {
|
||||
$message = static::_getTaskMessage($task);
|
||||
static::_addOrUpdateTrackReference($task->getDbFileId(), json_decode($message->result), $message->status);
|
||||
} catch (CeleryException $e) {
|
||||
// Fail silently unless the message has timed out; often we end up here when
|
||||
// the Celery task takes a while to execute
|
||||
if (static::_checkMessageTimeout($task)) {
|
||||
Logging::info($e->getMessage());
|
||||
$task->setDbStatus(CELERY_FAILED_STATUS);
|
||||
$task->save();
|
||||
}
|
||||
} catch (Exception $e) {
|
||||
// Sometimes we might catch a json_decode error and end up here
|
||||
Logging::info($e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a collection of all pending ThirdPartyTrackReferences to tasks for this service or task
|
||||
*
|
||||
* @param string $taskName the name of the task to look for
|
||||
*
|
||||
* @return PropelCollection any pending ThirdPartyTrackReferences results for this service
|
||||
* or task if taskName is provided
|
||||
*/
|
||||
protected function _getPendingTasks($taskName) {
|
||||
$query = ThirdPartyTrackReferencesQuery::create()
|
||||
->filterByDbService(static::$_SERVICE_NAME)
|
||||
->filterByDbStatus(CELERY_PENDING_STATUS)
|
||||
->filterByDbBrokerTaskId('', Criteria::NOT_EQUAL);
|
||||
if (!empty($taskName)) {
|
||||
$query->filterByDbBrokerTaskName($taskName);
|
||||
}
|
||||
return $query->find();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a Celery task message from the results queue
|
||||
*
|
||||
* @param $task ThirdPartyTrackReferences the track reference object
|
||||
*
|
||||
* @return object the task message object
|
||||
*
|
||||
* @throws CeleryException when the result message for this task no longer exists
|
||||
*/
|
||||
protected static function _getTaskMessage($task) {
|
||||
$message = Application_Model_RabbitMq::getAsyncResultMessage($task->getDbBrokerTaskName(),
|
||||
$task->getDbBrokerTaskId());
|
||||
return json_decode($message['body']);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a task message has been unreachable for more our timeout time
|
||||
*
|
||||
* @param $task ThirdPartyTrackReferences the track reference object
|
||||
*
|
||||
* @return bool true if the dispatch time is empty or it's been more than our timeout time
|
||||
* since the message was dispatched, otherwise false
|
||||
*/
|
||||
protected static function _checkMessageTimeout($task) {
|
||||
$utc = new DateTimeZone("UTC");
|
||||
$dispatchTime = new DateTime($task->getDbBrokerTaskDispatchTime(), $utc);
|
||||
$now = new DateTime("now", $utc);
|
||||
$timeoutSeconds = Application_Model_RabbitMq::$_CELERY_MESSAGE_TIMEOUT / 1000; // Convert from milliseconds
|
||||
$timeoutInterval = new DateInterval("PT" . $timeoutSeconds . "S");
|
||||
return (empty($dispatchTime) || $dispatchTime->add($timeoutInterval) <= $now);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a parameter array for the file being uploaded to a third party service
|
||||
*
|
||||
* @param $file Application_Model_StoredFile the file being uploaded
|
||||
*
|
||||
* @return array the track array to send to the third party service
|
||||
*/
|
||||
abstract protected function _getUploadData($file);
|
||||
abstract function delete($fileId);
|
||||
|
||||
/**
|
||||
* Update a ThirdPartyTrackReferences object for a completed task
|
||||
*
|
||||
* @param $fileId int local CcFiles identifier
|
||||
* @param $track object third-party service track object
|
||||
* @param $status string Celery task status
|
||||
* @param $trackId int ThirdPartyTrackReferences identifier
|
||||
* @param $track object third-party service track object
|
||||
* @param $status string Celery task status
|
||||
*
|
||||
* @throws Exception
|
||||
* @throws PropelException
|
||||
*/
|
||||
abstract protected function _addOrUpdateTrackReference($fileId, $track, $status);
|
||||
|
||||
/**
|
||||
* Check whether an OAuth access token exists for the third-party client
|
||||
*
|
||||
* @return bool true if an access token exists, otherwise false
|
||||
*/
|
||||
abstract function hasAccessToken();
|
||||
|
||||
/**
|
||||
* Get the OAuth authorization URL
|
||||
*
|
||||
* @return string the authorization URL
|
||||
*/
|
||||
abstract function getAuthorizeUrl();
|
||||
|
||||
/**
|
||||
* Request a new OAuth access token from a third-party service and store it in CcPref
|
||||
*
|
||||
* @param $code string exchange authorization code for access token
|
||||
*/
|
||||
abstract function requestNewAccessToken($code);
|
||||
|
||||
/**
|
||||
* Regenerate the third-party client's OAuth access token
|
||||
*/
|
||||
abstract function accessTokenRefresh();
|
||||
abstract function updateTrackReference($trackId, $track, $status);
|
||||
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue