Fix merge errors; use Celery status messages + handle fail case better

This commit is contained in:
Duncan Sommerville 2015-06-10 17:11:42 -04:00
parent 8163608666
commit c1b5b53a16
3 changed files with 33 additions and 20 deletions

View file

@ -83,15 +83,16 @@ class SoundcloudService extends ThirdPartyService {
/** /**
* Update a ThirdPartyTrackReferences object for a completed upload * Update a ThirdPartyTrackReferences object for a completed upload
* TODO: should we have a database layer class to handle Propel operations?
* *
* @param $fileId int local CcFiles identifier * @param $fileId int local CcFiles identifier
* @param $track object third-party service track object * @param $track object third-party service track object
* @param $status string Celery task status
* *
* @throws Exception * @throws Exception
* @throws PropelException * @throws PropelException
*/ */
protected function _addOrUpdateTrackReference($fileId, $track) { protected function _addOrUpdateTrackReference($fileId, $track, $status) {
// First, check if the track already has an entry in the database
$ref = ThirdPartyTrackReferencesQuery::create() $ref = ThirdPartyTrackReferencesQuery::create()
->filterByDbService($this->_SERVICE_NAME) ->filterByDbService($this->_SERVICE_NAME)
->findOneByDbFileId($fileId); ->findOneByDbFileId($fileId);
@ -99,10 +100,17 @@ class SoundcloudService extends ThirdPartyService {
$ref = new ThirdPartyTrackReferences(); $ref = new ThirdPartyTrackReferences();
} }
$ref->setDbService($this->_SERVICE_NAME); $ref->setDbService($this->_SERVICE_NAME);
$ref->setDbForeignId($track->id); // Only set the SoundCloud fields if the task was successful
if ($status == $this->_SUCCESS_STATUS) {
// TODO: fetch any additional SoundCloud parameters we want to store
$ref->setDbForeignId($track->id); // SoundCloud identifier
}
$ref->setDbFileId($fileId); $ref->setDbFileId($fileId);
$ref->setDbStatus($track->state); $ref->setDbStatus($status);
// Null the broker task fields because we no longer need them // Null the broker task fields because we no longer need them
// We use NULL over an empty string/object here because we have
// a unique constraint on the task ID and it's easier to filter
// and query against NULLs
$ref->setDbBrokerTaskId(NULL); $ref->setDbBrokerTaskId(NULL);
$ref->setDbBrokerTaskName(NULL); $ref->setDbBrokerTaskName(NULL);
$ref->setDbBrokerTaskDispatchTime(NULL); $ref->setDbBrokerTaskDispatchTime(NULL);

View file

@ -35,6 +35,11 @@ abstract class ThirdPartyService {
*/ */
protected $_PENDING_STATUS = 'PENDING'; protected $_PENDING_STATUS = 'PENDING';
/**
* @var string status string for successful tasks
*/
protected $_SUCCESS_STATUS = 'SUCCESS';
/** /**
* @var string status string for failed tasks * @var string status string for failed tasks
*/ */
@ -68,6 +73,7 @@ abstract class ThirdPartyService {
/** /**
* Create a ThirdPartyTrackReferences object for a pending task * Create a ThirdPartyTrackReferences object for a pending task
* TODO: should we have a database layer class to handle Propel operations?
* *
* @param $fileId int local CcFiles identifier * @param $fileId int local CcFiles identifier
* @param $brokerTaskId int broker task identifier to so we can asynchronously * @param $brokerTaskId int broker task identifier to so we can asynchronously
@ -151,9 +157,8 @@ abstract class ThirdPartyService {
$pendingTasks = $this->_getPendingTasks($taskName); $pendingTasks = $this->_getPendingTasks($taskName);
foreach ($pendingTasks as $task) { foreach ($pendingTasks as $task) {
try { try {
$result = $this->_getTaskResult($task); $message = $this->_getTaskMessage($task);
Logging::info(json_decode($result)); $this->_addOrUpdateTrackReference($task->getDbFileId(), json_decode($message->result), $message->status);
$this->_addOrUpdateTrackReference($task->getDbFileId(), json_decode($result));
} catch(CeleryException $e) { } catch(CeleryException $e) {
Logging::info("Couldn't retrieve task message for task " . $task->getDbBrokerTaskName() Logging::info("Couldn't retrieve task message for task " . $task->getDbBrokerTaskName()
. " with ID " . $task->getDbBrokerTaskId() . ": " . $e->getMessage()); . " with ID " . $task->getDbBrokerTaskId() . ": " . $e->getMessage());
@ -185,19 +190,18 @@ abstract class ThirdPartyService {
} }
/** /**
* Get the result from a celery task message in the results queue * Get a Celery task message from the results queue
* If the task message no longer exists, remove it from the track references table
* *
* @param $task ThirdPartyTrackReferences the track reference object * @param $task ThirdPartyTrackReferences the track reference object
* *
* @return array the results from the task message * @return object the task message object
* *
* @throws CeleryException when the result message for this task no longer exists * @throws CeleryException when the result message for this task no longer exists
*/ */
protected function _getTaskResult($task) { protected function _getTaskMessage($task) {
$message = Application_Model_RabbitMq::getAsyncResultMessage($task->getDbBrokerTaskName(), $message = Application_Model_RabbitMq::getAsyncResultMessage($task->getDbBrokerTaskName(),
$task->getDbBrokerTaskId()); $task->getDbBrokerTaskId());
return json_decode($message['body'])->result; // The actual result message from the service return json_decode($message['body']);
} }
/** /**
@ -231,11 +235,12 @@ abstract class ThirdPartyService {
* *
* @param $fileId int local CcFiles identifier * @param $fileId int local CcFiles identifier
* @param $track object third-party service track object * @param $track object third-party service track object
* @param $status string Celery task status
* *
* @throws Exception * @throws Exception
* @throws PropelException * @throws PropelException
*/ */
abstract protected function _addOrUpdateTrackReference($fileId, $track); abstract protected function _addOrUpdateTrackReference($fileId, $track, $status);
/** /**
* Check whether an OAuth access token exists for the third-party client * Check whether an OAuth access token exists for the third-party client

View file

@ -35,7 +35,7 @@ class UpgradeManager
{ {
$schemaVersion = Application_Model_Preference::GetSchemaVersion(); $schemaVersion = Application_Model_Preference::GetSchemaVersion();
$supportedSchemaVersions = self::getSupportedSchemaVersions(); $supportedSchemaVersions = self::getSupportedSchemaVersions();
$upgradeNeeded = !in_array($schemaVersion, $supportedSchemaVersions); return !in_array($schemaVersion, $supportedSchemaVersions);
// We shouldn't run the upgrade as a side-effect of this function! // We shouldn't run the upgrade as a side-effect of this function!
/* /*
if ($upgradeNeeded) { if ($upgradeNeeded) {
@ -46,7 +46,6 @@ class UpgradeManager
public function doUpgrade() public function doUpgrade()
{ {
$didWePerformAnUpgrade = false;
// Get all upgrades dynamically (in declaration order!) so we don't have to add them explicitly each time // Get all upgrades dynamically (in declaration order!) so we don't have to add them explicitly each time
// TODO: explicitly sort classnames by ascending version suffix for safety // TODO: explicitly sort classnames by ascending version suffix for safety
$upgraders = getUpgrades(); $upgraders = getUpgrades();
@ -69,7 +68,7 @@ class UpgradeManager
{ {
// pass the given directory to the upgrades, since __DIR__ returns parent dir of file, not executor // pass the given directory to the upgrades, since __DIR__ returns parent dir of file, not executor
$upgrader->upgrade($dir); //This will throw an exception if the upgrade fails. $upgrader->upgrade($dir); //This will throw an exception if the upgrade fails.
$didWePerformAnUpgrade = true; $upgradePerformed = true;
} }
} }
return $upgradePerformed; return $upgradePerformed;
@ -512,7 +511,7 @@ class AirtimeUpgrader2512 extends AirtimeUpgrader
class AirtimeUpgrader2513 extends AirtimeUpgrader class AirtimeUpgrader2513 extends AirtimeUpgrader
{ {
protected function getSupportedVersions() { protected function getSupportedSchemaVersions() {
return array ( return array (
'2.5.12' '2.5.12'
); );
@ -544,7 +543,7 @@ class AirtimeUpgrader2513 extends AirtimeUpgrader
passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f $dir/upgrade_sql/airtime_" passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f $dir/upgrade_sql/airtime_"
.$newVersion."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\""); .$newVersion."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\"");
Application_Model_Preference::SetAirtimeVersion($newVersion); Application_Model_Preference::SetSchemaVersion($newVersion);
Cache::clear(); Cache::clear();
$this->toggleMaintenanceScreen(false); $this->toggleMaintenanceScreen(false);
@ -553,5 +552,6 @@ class AirtimeUpgrader2513 extends AirtimeUpgrader
throw $e; throw $e;
} }
} }
} }