Rename CeleryService to avoid confusion; fix wording and reduce redundancy in task functions
This commit is contained in:
parent
a444751397
commit
7b76c8d6d3
|
@ -237,14 +237,14 @@ class CeleryTask implements AirtimeTask {
|
||||||
* @return bool true if there are pending tasks in ThirdPartyTrackReferences
|
* @return bool true if there are pending tasks in ThirdPartyTrackReferences
|
||||||
*/
|
*/
|
||||||
public function shouldBeRun() {
|
public function shouldBeRun() {
|
||||||
return !CeleryService::isBrokerTaskQueueEmpty();
|
return !CeleryManager::isBrokerTaskQueueEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Poll the task queue for any completed Celery tasks
|
* Poll the task queue for any completed Celery tasks
|
||||||
*/
|
*/
|
||||||
public function run() {
|
public function run() {
|
||||||
CeleryService::pollBrokerTaskQueue();
|
CeleryManager::pollBrokerTaskQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
require_once "CeleryServiceFactory.php";
|
require_once "CeleryServiceFactory.php";
|
||||||
|
|
||||||
class CeleryService {
|
class CeleryManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @var int milliseconds (for compatibility with celery) until we consider a message to have timed out
|
* @var int milliseconds (for compatibility with celery) until we consider a message to have timed out
|
|
@ -24,6 +24,30 @@ abstract class ThirdPartyCeleryService extends ThirdPartyService {
|
||||||
*/
|
*/
|
||||||
protected static $_CELERY_DELETE_TASK_NAME;
|
protected static $_CELERY_DELETE_TASK_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a Celery task with the given name and data parameters
|
||||||
|
*
|
||||||
|
* FIXME: Currently, downloads will not create task reference rows because they
|
||||||
|
* don't have a valid file identifier - this means that we will never know if there
|
||||||
|
* is an issue with the download before the callback to /rest/media is called!
|
||||||
|
*
|
||||||
|
* @param string $taskName the name of the celery task to execute
|
||||||
|
* @param array $data the data array to send as task parameters
|
||||||
|
* @param int $fileId the unique identifier for the file involved in the task
|
||||||
|
*/
|
||||||
|
protected function _executeTask($taskName, $data, $fileId) {
|
||||||
|
try {
|
||||||
|
$brokerTaskId = CeleryManager::sendCeleryMessage($taskName,
|
||||||
|
static::$_CELERY_EXCHANGE_NAME,
|
||||||
|
$data);
|
||||||
|
if (!empty($fileId)) {
|
||||||
|
$this->_createTaskReference($fileId, $brokerTaskId, $taskName);
|
||||||
|
}
|
||||||
|
} catch (Exception $e) {
|
||||||
|
Logging::info("Invalid request: " . $e->getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upload the file with the given identifier to a third-party service
|
* Upload the file with the given identifier to a third-party service
|
||||||
*
|
*
|
||||||
|
@ -36,20 +60,13 @@ abstract class ThirdPartyCeleryService extends ThirdPartyService {
|
||||||
'token' => $this->_accessToken,
|
'token' => $this->_accessToken,
|
||||||
'file_path' => $file->getFilePaths()[0]
|
'file_path' => $file->getFilePaths()[0]
|
||||||
);
|
);
|
||||||
try {
|
$this->_executeTask(static::$_CELERY_UPLOAD_TASK_NAME, $data, $fileId);
|
||||||
$brokerTaskId = CeleryService::sendCeleryMessage(static::$_CELERY_UPLOAD_TASK_NAME,
|
|
||||||
static::$_CELERY_EXCHANGE_NAME,
|
|
||||||
$data);
|
|
||||||
$this->_createTaskReference($fileId, $brokerTaskId, static::$_CELERY_UPLOAD_TASK_NAME);
|
|
||||||
} catch (Exception $e) {
|
|
||||||
Logging::info("Invalid request: " . $e->getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a track identifier, download a track from a third-party service.
|
* Given a track identifier, download a track from a third-party service.
|
||||||
*
|
*
|
||||||
* @param int|null $trackId a track identifier
|
* @param int $trackId a track identifier
|
||||||
*/
|
*/
|
||||||
public function download($trackId) {
|
public function download($trackId) {
|
||||||
$namespace = new Zend_Session_Namespace('csrf_namespace');
|
$namespace = new Zend_Session_Namespace('csrf_namespace');
|
||||||
|
@ -59,13 +76,9 @@ abstract class ThirdPartyCeleryService extends ThirdPartyService {
|
||||||
'token' => $this->_accessToken,
|
'token' => $this->_accessToken,
|
||||||
'track_id' => $trackId
|
'track_id' => $trackId
|
||||||
);
|
);
|
||||||
try {
|
// FIXME
|
||||||
CeleryService::sendCeleryMessage(static::$_CELERY_DOWNLOAD_TASK_NAME,
|
Logging::warn("FIXME: we can't create a task reference without a valid file ID");
|
||||||
static::$_CELERY_EXCHANGE_NAME,
|
$this->_executeTask(static::$_CELERY_DOWNLOAD_TASK_NAME, $data, null);
|
||||||
$data);
|
|
||||||
} catch (Exception $e) {
|
|
||||||
Logging::info("Invalid request: " . $e->getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -85,14 +98,7 @@ abstract class ThirdPartyCeleryService extends ThirdPartyService {
|
||||||
'token' => $this->_accessToken,
|
'token' => $this->_accessToken,
|
||||||
'track_id' => $serviceId
|
'track_id' => $serviceId
|
||||||
);
|
);
|
||||||
try {
|
$this->_executeTask(static::$_CELERY_DELETE_TASK_NAME, $data, $fileId);
|
||||||
$brokerTaskId = CeleryService::sendCeleryMessage(static::$_CELERY_DELETE_TASK_NAME,
|
|
||||||
static::$_CELERY_EXCHANGE_NAME,
|
|
||||||
$data);
|
|
||||||
$this->_createTaskReference($fileId, $brokerTaskId, static::$_CELERY_DELETE_TASK_NAME);
|
|
||||||
} catch (Exception $e) {
|
|
||||||
Logging::info("Invalid request: " . $e->getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -108,19 +114,19 @@ abstract class ThirdPartyCeleryService extends ThirdPartyService {
|
||||||
* @throws PropelException
|
* @throws PropelException
|
||||||
*/
|
*/
|
||||||
protected function _createTaskReference($fileId, $brokerTaskId, $taskName) {
|
protected function _createTaskReference($fileId, $brokerTaskId, $taskName) {
|
||||||
$trackId = $this->createTrackReference($fileId);
|
$trackReferenceId = $this->createTrackReference($fileId);
|
||||||
$task = new CeleryTasks();
|
$task = new CeleryTasks();
|
||||||
$task->setDbTaskId($brokerTaskId);
|
$task->setDbTaskId($brokerTaskId);
|
||||||
$task->setDbName($taskName);
|
$task->setDbName($taskName);
|
||||||
$utc = new DateTimeZone("UTC");
|
$utc = new DateTimeZone("UTC");
|
||||||
$task->setDbDispatchTime(new DateTime("now", $utc));
|
$task->setDbDispatchTime(new DateTime("now", $utc));
|
||||||
$task->setDbStatus(CELERY_PENDING_STATUS);
|
$task->setDbStatus(CELERY_PENDING_STATUS);
|
||||||
$task->setDbTrackReference($trackId);
|
$task->setDbTrackReference($trackReferenceId);
|
||||||
$task->save();
|
$task->save();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update a CeleryTasks object for a completed upload
|
* Update a CeleryTasks object for a completed task
|
||||||
* TODO: should we have a database layer class to handle Propel operations?
|
* TODO: should we have a database layer class to handle Propel operations?
|
||||||
*
|
*
|
||||||
* @param $trackId int ThirdPartyTrackReferences identifier
|
* @param $trackId int ThirdPartyTrackReferences identifier
|
||||||
|
|
|
@ -54,7 +54,7 @@ abstract class ThirdPartyService {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a ThirdPartyTrackReferences from the database.
|
* Remove a ThirdPartyTrackReferences row from the database.
|
||||||
* This is necessary if the track was removed from the service
|
* This is necessary if the track was removed from the service
|
||||||
* or the foreign id in our database is incorrect
|
* or the foreign id in our database is incorrect
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue