From 626489bb3bd43ade681ea08e6e84889152a51d2a Mon Sep 17 00:00:00 2001 From: Duncan Sommerville Date: Wed, 10 Jun 2015 15:04:49 -0400 Subject: [PATCH] SAAS-853 - Celery backend for SoundCloud uploads --- airtime_mvc/application/Bootstrap.php | 2 +- airtime_mvc/application/configs/conf.php | 1 + .../controllers/LibraryController.php | 1 - .../controllers/SoundcloudController.php | 50 +--- .../controllers/ThirdPartyController.php | 56 ++++- .../controllers/UpgradeController.php | 4 +- .../upgrade_sql/airtime_2.5.13/upgrade.sql | 11 +- airtime_mvc/application/models/RabbitMq.php | 95 ++++++-- .../map/ThirdPartyTrackReferencesTableMap.php | 7 +- .../om/BaseThirdPartyTrackReferences.php | 224 ++++++++++++++++-- .../om/BaseThirdPartyTrackReferencesPeer.php | 43 ++-- .../om/BaseThirdPartyTrackReferencesQuery.php | 130 ++++++++-- ...CloudService.php => SoundcloudService.php} | 81 ++++--- .../services/ThirdPartyService.php | 164 ++++++++++++- airtime_mvc/build/schema.xml | 14 +- airtime_mvc/build/sql/schema.sql | 11 +- .../public/js/airtime/common/common.js | 6 + .../airtime-celery/airtime-celery/__init__.py | 3 + .../airtime-celery}/celeryconfig.py | 17 +- .../airtime-celery/airtime-celery/uploader.py | 24 ++ .../install/conf/airtime-celery} | 4 +- .../install/upstart/airtime-celery} | 0 python_apps/airtime-celery/setup.py | 45 ++++ .../bin/soundcloud_uploader | 8 - python_apps/soundcloud_uploader/setup.py | 35 --- .../soundcloud_uploader/__init__.py | 3 - .../soundcloud_uploader/uploader.py | 24 -- 27 files changed, 813 insertions(+), 250 deletions(-) rename airtime_mvc/application/services/{SoundCloudService.php => SoundcloudService.php} (71%) create mode 100644 python_apps/airtime-celery/airtime-celery/__init__.py rename python_apps/{soundcloud_uploader/soundcloud_uploader => airtime-celery/airtime-celery}/celeryconfig.py (69%) create mode 100644 python_apps/airtime-celery/airtime-celery/uploader.py rename python_apps/{soundcloud_uploader/install/conf/soundcloud_uploader => airtime-celery/install/conf/airtime-celery} (89%) rename python_apps/{soundcloud_uploader/install/upstart/soundcloud_uploader => airtime-celery/install/upstart/airtime-celery} (100%) create mode 100644 python_apps/airtime-celery/setup.py delete mode 100644 python_apps/soundcloud_uploader/bin/soundcloud_uploader delete mode 100644 python_apps/soundcloud_uploader/setup.py delete mode 100644 python_apps/soundcloud_uploader/soundcloud_uploader/__init__.py delete mode 100644 python_apps/soundcloud_uploader/soundcloud_uploader/uploader.py diff --git a/airtime_mvc/application/Bootstrap.php b/airtime_mvc/application/Bootstrap.php index 7baa3f5ac..7580eef69 100644 --- a/airtime_mvc/application/Bootstrap.php +++ b/airtime_mvc/application/Bootstrap.php @@ -27,7 +27,7 @@ require_once "ProvisioningHelper.php"; require_once "GoogleAnalytics.php"; require_once "Timezone.php"; require_once "Auth.php"; -require_once __DIR__.'/services/SoundCloudService.php'; +require_once __DIR__.'/services/SoundcloudService.php'; require_once __DIR__.'/forms/helpers/ValidationTypes.php'; require_once __DIR__.'/forms/helpers/CustomDecorators.php'; require_once __DIR__.'/controllers/plugins/RabbitMqPlugin.php'; diff --git a/airtime_mvc/application/configs/conf.php b/airtime_mvc/application/configs/conf.php index afc1d5f07..80199b38c 100644 --- a/airtime_mvc/application/configs/conf.php +++ b/airtime_mvc/application/configs/conf.php @@ -37,6 +37,7 @@ class Config { $CC_CONFIG['baseDir'] = $values['general']['base_dir']; $CC_CONFIG['baseUrl'] = $values['general']['base_url']; $CC_CONFIG['basePort'] = $values['general']['base_port']; + $CC_CONFIG['stationId'] = $values['general']['station_id']; $CC_CONFIG['phpDir'] = $values['general']['airtime_dir']; if (isset($values['general']['dev_env'])) { $CC_CONFIG['dev_env'] = $values['general']['dev_env']; diff --git a/airtime_mvc/application/controllers/LibraryController.php b/airtime_mvc/application/controllers/LibraryController.php index 07bbc109b..91816901f 100644 --- a/airtime_mvc/application/controllers/LibraryController.php +++ b/airtime_mvc/application/controllers/LibraryController.php @@ -283,7 +283,6 @@ class LibraryController extends Zend_Controller_Action $text = _("Upload to SoundCloud"); } - // TODO: reimplement how this works $menu["soundcloud"]["items"]["upload"] = array("name" => $text, "icon" => "soundcloud", "url" => $baseUrl."soundcloud/upload/id/{$id}"); } diff --git a/airtime_mvc/application/controllers/SoundcloudController.php b/airtime_mvc/application/controllers/SoundcloudController.php index b265f5740..c45597b87 100644 --- a/airtime_mvc/application/controllers/SoundcloudController.php +++ b/airtime_mvc/application/controllers/SoundcloudController.php @@ -8,32 +8,19 @@ class SoundcloudController extends ThirdPartyController { /** * @var SoundcloudService */ - private $_soundcloudService; + protected $_service; + + /** + * @var string Application_Model_Preference service request token accessor function name + */ + protected $_SERVICE_TOKEN_ACCESSOR = 'setSoundCloudRequestToken'; /** * Set up SoundCloud access variables. */ public function init() { parent::init(); - $this->_soundcloudService = new SoundcloudService(); - } - - /** - * Send user to SoundCloud to authorize before being redirected - */ - public function authorizeAction() { - $auth_url = $this->_soundcloudService->getAuthorizeUrl(); - header('Location: ' . $auth_url); - } - - /** - * Called when user successfully completes SoundCloud authorization. - * Store the returned request token for future requests. - */ - public function redirectAction() { - $code = $_GET['code']; - $this->_soundcloudService->requestNewAccessToken($code); - header('Location: ' . $this->_baseUrl . 'Preference'); // Redirect back to the Preference page + $this->_service = new SoundcloudService(); } /** @@ -43,36 +30,17 @@ class SoundcloudController extends ThirdPartyController { $request = $this->getRequest(); $id = $request->getParam('id'); try { - $soundcloudLink = $this->_soundcloudService->getLinkToFile($id); + $soundcloudLink = $this->_service->getLinkToFile($id); header('Location: ' . $soundcloudLink); } catch (Soundcloud\Exception\InvalidHttpResponseCodeException $e) { // If we end up here it means the track was removed from SoundCloud // or the foreign id in our database is incorrect, so we should just // get rid of the database record Logging::warn("Error retrieving track data from SoundCloud: " . $e->getMessage()); - $this->_soundcloudService->removeTrackReference($id); + $this->_service->removeTrackReference($id); // Redirect to a 404 so the user knows something went wrong header('Location: ' . $this->_baseUrl . 'error/error-404'); // Redirect back to the Preference page } } - /** - * Upload the file with the given id to SoundCloud. - * - * @throws Zend_Controller_Response_Exception thrown if upload fails for any reason - */ - public function uploadAction() { - $request = $this->getRequest(); - $id = $request->getParam('id'); - $this->_soundcloudService->upload($id); - } - - /** - * Clear the previously saved request token from the preferences. - */ - public function deauthorizeAction() { - Application_Model_Preference::setSoundCloudRequestToken(""); - header('Location: ' . $this->_baseUrl . 'Preference'); // Redirect back to the Preference page - } - } diff --git a/airtime_mvc/application/controllers/ThirdPartyController.php b/airtime_mvc/application/controllers/ThirdPartyController.php index 2fec86161..139d0c014 100644 --- a/airtime_mvc/application/controllers/ThirdPartyController.php +++ b/airtime_mvc/application/controllers/ThirdPartyController.php @@ -10,6 +10,16 @@ abstract class ThirdPartyController extends Zend_Controller_Action { */ protected $_baseUrl; + /** + * @var ThirdPartyService third party service object + */ + protected $_service; + + /** + * @var string Application_Model_Preference service request token accessor function name + */ + protected $_SERVICE_TOKEN_ACCESSOR; + /** * Disable controller rendering and initialize */ @@ -17,8 +27,8 @@ abstract class ThirdPartyController extends Zend_Controller_Action { $CC_CONFIG = Config::getConfig(); $this->_baseUrl = 'http://' . $CC_CONFIG['baseUrl'] . ":" . $CC_CONFIG['basePort'] . "/"; - $this->view->layout()->disableLayout(); // Don't inject the standard Now Playing header. - $this->_helper->viewRenderer->setNoRender(true); // Don't use (phtml) templates + $this->view->layout()->disableLayout(); // Don't inject the standard Now Playing header. + $this->_helper->viewRenderer->setNoRender(true); // Don't use (phtml) templates } /** @@ -26,30 +36,56 @@ abstract class ThirdPartyController extends Zend_Controller_Action { * * @return void */ - abstract function authorizeAction(); + public function authorizeAction() { + $auth_url = $this->_service->getAuthorizeUrl(); + header('Location: ' . $auth_url); + } /** - * Called when user successfully completes third-party authorization. - * Store the returned request token for future requests. + * Called when user successfully completes third-party authorization + * Store the returned request token for future requests * * @return void */ - abstract function redirectAction(); + public function redirectAction() { + $code = $_GET['code']; + $this->_service->requestNewAccessToken($code); + header('Location: ' . $this->_baseUrl . 'Preference'); // Redirect back to the Preference page + } /** - * Upload the file with the given id to a third-party service. + * Upload the file with the given id to a third-party service * * @return void * * @throws Zend_Controller_Response_Exception thrown if upload fails for any reason */ - abstract function uploadAction(); + public function uploadAction() { + $request = $this->getRequest(); + $id = $request->getParam('id'); + $this->_service->upload($id); + } /** - * Clear the previously saved request token from the preferences. + * Clear the previously saved request token from the preferences * * @return void */ - abstract function deauthorizeAction(); + public function deauthorizeAction() { + Application_Model_Preference::$this->_SERVICE_TOKEN_ACCESSOR(""); + header('Location: ' . $this->_baseUrl . 'Preference'); // Redirect back to the Preference page + } + + /** + * Poll the task queue for completed tasks associated with this service + * Optionally accepts a specific task name as a parameter + * + * @return void + */ + public function pollBrokerTaskQueueAction() { + $request = $this->getRequest(); + $taskName = $request->getParam('task'); + $this->_service->pollBrokerTaskQueue($taskName); + } } \ No newline at end of file diff --git a/airtime_mvc/application/controllers/UpgradeController.php b/airtime_mvc/application/controllers/UpgradeController.php index d3cee649b..4c04da699 100644 --- a/airtime_mvc/application/controllers/UpgradeController.php +++ b/airtime_mvc/application/controllers/UpgradeController.php @@ -13,9 +13,9 @@ class UpgradeController extends Zend_Controller_Action return; } - // Get all upgrades dynamically so we don't have to add them explicitly each time + // Get all upgrades dynamically (in declaration order!) so we don't have to add them explicitly each time + // TODO: explicitly sort classnames by ascending version suffix for safety $upgraders = getUpgrades(); - Logging::info($upgraders); $didWePerformAnUpgrade = false; try diff --git a/airtime_mvc/application/controllers/upgrade_sql/airtime_2.5.13/upgrade.sql b/airtime_mvc/application/controllers/upgrade_sql/airtime_2.5.13/upgrade.sql index e2d051bff..15a6432d7 100644 --- a/airtime_mvc/application/controllers/upgrade_sql/airtime_2.5.13/upgrade.sql +++ b/airtime_mvc/application/controllers/upgrade_sql/airtime_2.5.13/upgrade.sql @@ -1,11 +1,16 @@ CREATE TABLE IF NOT EXISTS "third_party_track_references" ( "id" serial NOT NULL, - "service" VARCHAR(512) NOT NULL, - "foreign_id" INTEGER NOT NULL, + "service" VARCHAR(256) NOT NULL, + "foreign_id" VARCHAR(256), + "broker_task_id" VARCHAR(256), + "broker_task_name" VARCHAR(256), + "broker_task_dispatch_time" TIMESTAMP, "file_id" INTEGER NOT NULL, "status" VARCHAR(256) NOT NULL, - PRIMARY KEY ("id") + PRIMARY KEY ("id"), + CONSTRAINT "broker_task_id_unique" UNIQUE ("broker_task_id"), + CONSTRAINT "foreign_id_unique" UNIQUE ("foreign_id") ); ALTER TABLE "third_party_track_references" ADD CONSTRAINT "track_reference_fkey" diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php index c9492436b..da949cd88 100644 --- a/airtime_mvc/application/models/RabbitMq.php +++ b/airtime_mvc/application/models/RabbitMq.php @@ -6,7 +6,15 @@ class Application_Model_RabbitMq { public static $doPush = false; - const CELERY_TIMEOUT = 10; + /** + * @var int milliseconds (for compatibility with celery) until we consider a message to have timed out + */ + public static $_CELERY_MESSAGE_TIMEOUT = 300000; // 5 minutes + + /** + * @var string exchange for celery task results + */ + public static $_CELERY_RESULTS_EXCHANGE = 'airtime-results'; /** * Sets a flag to push the schedule at the end of the request. @@ -45,30 +53,72 @@ class Application_Model_RabbitMq $conn->close(); } + /** + * Connect to the Celery daemon via amqp + * + * @param $config array the airtime configuration array + * @param $exchange string the amqp exchange name + * @param $queue string the amqp queue name + * + * @return Celery the Celery connection object + * + * @throws Exception when a connection error occurs + */ + private static function _setupCeleryExchange($config, $exchange, $queue) { + return new Celery($config["rabbitmq"]["host"], + $config["rabbitmq"]["user"], + $config["rabbitmq"]["password"], + $config["rabbitmq"]["vhost"], + $exchange, // Exchange name + $queue, // Binding/queue + $config["rabbitmq"]["port"], + false, // Connector + true, // Persistent messages + self::$_CELERY_MESSAGE_TIMEOUT, // Result expiration + array()); // SSL opts + } + + /** + * Send an amqp message to Celery the airtime-celery daemon to perform a task + * + * @param $task string the Celery task name + * @param $exchange string the amqp exchange name + * @param $data array an associative array containing arguments for the Celery task + * + * @return string the task identifier for the started Celery task so we can fetch the + * results asynchronously later + * + * @throws CeleryException when no message is found + */ public static function sendCeleryMessage($task, $exchange, $data) { - $CC_CONFIG = Config::getConfig(); + $config = Config::getConfig(); + $queue = $routingKey = $exchange; + $c = self::_setupCeleryExchange($config, $exchange, $queue); // Use the exchange name for the queue + $result = $c->PostTask($task, $data, true, $routingKey); // and routing key + return $result->getId(); + } - $c = new Celery($CC_CONFIG["rabbitmq"]["host"], - $CC_CONFIG["rabbitmq"]["user"], - $CC_CONFIG["rabbitmq"]["password"], - $CC_CONFIG["rabbitmq"]["vhost"], - $exchange=$exchange); - $result = $c->PostTask($task, $data); + /** + * Given a task name and identifier, check the Celery results queue for any + * corresponding messages + * + * @param $task string the Celery task name + * @param $id string the Celery task identifier + * + * @return object the message object + * + * @throws CeleryException when no message is found + */ + public static function getAsyncResultMessage($task, $id) { + $config = Config::getConfig(); + $queue = self::$_CELERY_RESULTS_EXCHANGE . "." . $config["stationId"]; + $c = self::_setupCeleryExchange($config, self::$_CELERY_RESULTS_EXCHANGE, $queue); + $message = $c->getAsyncResultMessage($task, $id); - $timeout = 0; - while(!$result->isReady()) { - sleep(1); - if($timeout++ >= self::CELERY_TIMEOUT) { - break; - } - } - - if($result->isSuccess()) { - Logging::info($result); - return $result->getResult(); - } else { - throw new CeleryTimeoutException("Celery task $task timed out!"); + if ($message == FALSE) { + throw new CeleryException("Failed to get message for task $task with ID $id"); } + return $message; } public static function SendMessageToPypo($event_type, $md) @@ -177,7 +227,4 @@ class Application_Model_RabbitMq //XXX: This function has been deprecated and is no longer needed } - public static function uploadToSoundCloud($data) { - return self::sendCeleryMessage("upload", "soundcloud-uploads", $data); - } } diff --git a/airtime_mvc/application/models/airtime/map/ThirdPartyTrackReferencesTableMap.php b/airtime_mvc/application/models/airtime/map/ThirdPartyTrackReferencesTableMap.php index bf80e6cd1..07f49e86a 100644 --- a/airtime_mvc/application/models/airtime/map/ThirdPartyTrackReferencesTableMap.php +++ b/airtime_mvc/application/models/airtime/map/ThirdPartyTrackReferencesTableMap.php @@ -40,8 +40,11 @@ class ThirdPartyTrackReferencesTableMap extends TableMap $this->setPrimaryKeyMethodInfo('third_party_track_references_id_seq'); // columns $this->addPrimaryKey('id', 'DbId', 'INTEGER', true, null, null); - $this->addColumn('service', 'DbService', 'VARCHAR', true, 512, null); - $this->addColumn('foreign_id', 'DbForeignId', 'INTEGER', true, null, null); + $this->addColumn('service', 'DbService', 'VARCHAR', true, 256, null); + $this->addColumn('foreign_id', 'DbForeignId', 'VARCHAR', false, 256, null); + $this->addColumn('broker_task_id', 'DbBrokerTaskId', 'VARCHAR', false, 256, null); + $this->addColumn('broker_task_name', 'DbBrokerTaskName', 'VARCHAR', false, 256, null); + $this->addColumn('broker_task_dispatch_time', 'DbBrokerTaskDispatchTime', 'TIMESTAMP', false, null, null); $this->addForeignKey('file_id', 'DbFileId', 'INTEGER', 'cc_playout_history_template', 'id', true, null, null); $this->addColumn('status', 'DbStatus', 'VARCHAR', true, 256, null); // validators diff --git a/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferences.php b/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferences.php index b94085171..b880cd0d4 100644 --- a/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferences.php +++ b/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferences.php @@ -43,10 +43,28 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi /** * The value for the foreign_id field. - * @var int + * @var string */ protected $foreign_id; + /** + * The value for the broker_task_id field. + * @var string + */ + protected $broker_task_id; + + /** + * The value for the broker_task_name field. + * @var string + */ + protected $broker_task_name; + + /** + * The value for the broker_task_dispatch_time field. + * @var string + */ + protected $broker_task_dispatch_time; + /** * The value for the file_id field. * @var int @@ -109,7 +127,7 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi /** * Get the [foreign_id] column value. * - * @return int + * @return string */ public function getDbForeignId() { @@ -117,6 +135,63 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi return $this->foreign_id; } + /** + * Get the [broker_task_id] column value. + * + * @return string + */ + public function getDbBrokerTaskId() + { + + return $this->broker_task_id; + } + + /** + * Get the [broker_task_name] column value. + * + * @return string + */ + public function getDbBrokerTaskName() + { + + return $this->broker_task_name; + } + + /** + * Get the [optionally formatted] temporal [broker_task_dispatch_time] column value. + * + * + * @param string $format The date/time format string (either date()-style or strftime()-style). + * If format is null, then the raw DateTime object will be returned. + * @return mixed Formatted date/time value as string or DateTime object (if format is null), null if column is null + * @throws PropelException - if unable to parse/validate the date/time value. + */ + public function getDbBrokerTaskDispatchTime($format = 'Y-m-d H:i:s') + { + if ($this->broker_task_dispatch_time === null) { + return null; + } + + + try { + $dt = new DateTime($this->broker_task_dispatch_time); + } catch (Exception $x) { + throw new PropelException("Internally stored date/time/timestamp value could not be converted to DateTime: " . var_export($this->broker_task_dispatch_time, true), $x); + } + + if ($format === null) { + // Because propel.useDateTimeClass is true, we return a DateTime object. + return $dt; + } + + if (strpos($format, '%') !== false) { + return strftime($format, $dt->format('U')); + } + + return $dt->format($format); + + } + /** * Get the [file_id] column value. * @@ -184,13 +259,13 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi /** * Set the value of [foreign_id] column. * - * @param int $v new value + * @param string $v new value * @return ThirdPartyTrackReferences The current object (for fluent API support) */ public function setDbForeignId($v) { if ($v !== null && is_numeric($v)) { - $v = (int) $v; + $v = (string) $v; } if ($this->foreign_id !== $v) { @@ -202,6 +277,71 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi return $this; } // setDbForeignId() + /** + * Set the value of [broker_task_id] column. + * + * @param string $v new value + * @return ThirdPartyTrackReferences The current object (for fluent API support) + */ + public function setDbBrokerTaskId($v) + { + if ($v !== null && is_numeric($v)) { + $v = (string) $v; + } + + if ($this->broker_task_id !== $v) { + $this->broker_task_id = $v; + $this->modifiedColumns[] = ThirdPartyTrackReferencesPeer::BROKER_TASK_ID; + } + + + return $this; + } // setDbBrokerTaskId() + + /** + * Set the value of [broker_task_name] column. + * + * @param string $v new value + * @return ThirdPartyTrackReferences The current object (for fluent API support) + */ + public function setDbBrokerTaskName($v) + { + if ($v !== null && is_numeric($v)) { + $v = (string) $v; + } + + if ($this->broker_task_name !== $v) { + $this->broker_task_name = $v; + $this->modifiedColumns[] = ThirdPartyTrackReferencesPeer::BROKER_TASK_NAME; + } + + + return $this; + } // setDbBrokerTaskName() + + /** + * Sets the value of [broker_task_dispatch_time] column to a normalized version of the date/time value specified. + * + * @param mixed $v string, integer (timestamp), or DateTime value. + * Empty strings are treated as null. + * @return ThirdPartyTrackReferences The current object (for fluent API support) + */ + public function setDbBrokerTaskDispatchTime($v) + { + $dt = PropelDateTime::newInstance($v, null, 'DateTime'); + if ($this->broker_task_dispatch_time !== null || $dt !== null) { + $currentDateAsString = ($this->broker_task_dispatch_time !== null && $tmpDt = new DateTime($this->broker_task_dispatch_time)) ? $tmpDt->format('Y-m-d H:i:s') : null; + $newDateAsString = $dt ? $dt->format('Y-m-d H:i:s') : null; + if ($currentDateAsString !== $newDateAsString) { + $this->broker_task_dispatch_time = $newDateAsString; + $this->modifiedColumns[] = ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME; + } + } // if either are not null + + + return $this; + } // setDbBrokerTaskDispatchTime() + /** * Set the value of [file_id] column. * @@ -282,9 +422,12 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi $this->id = ($row[$startcol + 0] !== null) ? (int) $row[$startcol + 0] : null; $this->service = ($row[$startcol + 1] !== null) ? (string) $row[$startcol + 1] : null; - $this->foreign_id = ($row[$startcol + 2] !== null) ? (int) $row[$startcol + 2] : null; - $this->file_id = ($row[$startcol + 3] !== null) ? (int) $row[$startcol + 3] : null; - $this->status = ($row[$startcol + 4] !== null) ? (string) $row[$startcol + 4] : null; + $this->foreign_id = ($row[$startcol + 2] !== null) ? (string) $row[$startcol + 2] : null; + $this->broker_task_id = ($row[$startcol + 3] !== null) ? (string) $row[$startcol + 3] : null; + $this->broker_task_name = ($row[$startcol + 4] !== null) ? (string) $row[$startcol + 4] : null; + $this->broker_task_dispatch_time = ($row[$startcol + 5] !== null) ? (string) $row[$startcol + 5] : null; + $this->file_id = ($row[$startcol + 6] !== null) ? (int) $row[$startcol + 6] : null; + $this->status = ($row[$startcol + 7] !== null) ? (string) $row[$startcol + 7] : null; $this->resetModified(); $this->setNew(false); @@ -294,7 +437,7 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi } $this->postHydrate($row, $startcol, $rehydrate); - return $startcol + 5; // 5 = ThirdPartyTrackReferencesPeer::NUM_HYDRATE_COLUMNS. + return $startcol + 8; // 8 = ThirdPartyTrackReferencesPeer::NUM_HYDRATE_COLUMNS. } catch (Exception $e) { throw new PropelException("Error populating ThirdPartyTrackReferences object", $e); @@ -541,6 +684,15 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::FOREIGN_ID)) { $modifiedColumns[':p' . $index++] = '"foreign_id"'; } + if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::BROKER_TASK_ID)) { + $modifiedColumns[':p' . $index++] = '"broker_task_id"'; + } + if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::BROKER_TASK_NAME)) { + $modifiedColumns[':p' . $index++] = '"broker_task_name"'; + } + if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME)) { + $modifiedColumns[':p' . $index++] = '"broker_task_dispatch_time"'; + } if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::FILE_ID)) { $modifiedColumns[':p' . $index++] = '"file_id"'; } @@ -565,7 +717,16 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi $stmt->bindValue($identifier, $this->service, PDO::PARAM_STR); break; case '"foreign_id"': - $stmt->bindValue($identifier, $this->foreign_id, PDO::PARAM_INT); + $stmt->bindValue($identifier, $this->foreign_id, PDO::PARAM_STR); + break; + case '"broker_task_id"': + $stmt->bindValue($identifier, $this->broker_task_id, PDO::PARAM_STR); + break; + case '"broker_task_name"': + $stmt->bindValue($identifier, $this->broker_task_name, PDO::PARAM_STR); + break; + case '"broker_task_dispatch_time"': + $stmt->bindValue($identifier, $this->broker_task_dispatch_time, PDO::PARAM_STR); break; case '"file_id"': $stmt->bindValue($identifier, $this->file_id, PDO::PARAM_INT); @@ -722,9 +883,18 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi return $this->getDbForeignId(); break; case 3: - return $this->getDbFileId(); + return $this->getDbBrokerTaskId(); break; case 4: + return $this->getDbBrokerTaskName(); + break; + case 5: + return $this->getDbBrokerTaskDispatchTime(); + break; + case 6: + return $this->getDbFileId(); + break; + case 7: return $this->getDbStatus(); break; default: @@ -759,8 +929,11 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi $keys[0] => $this->getDbId(), $keys[1] => $this->getDbService(), $keys[2] => $this->getDbForeignId(), - $keys[3] => $this->getDbFileId(), - $keys[4] => $this->getDbStatus(), + $keys[3] => $this->getDbBrokerTaskId(), + $keys[4] => $this->getDbBrokerTaskName(), + $keys[5] => $this->getDbBrokerTaskDispatchTime(), + $keys[6] => $this->getDbFileId(), + $keys[7] => $this->getDbStatus(), ); $virtualColumns = $this->virtualColumns; foreach ($virtualColumns as $key => $virtualColumn) { @@ -815,9 +988,18 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi $this->setDbForeignId($value); break; case 3: - $this->setDbFileId($value); + $this->setDbBrokerTaskId($value); break; case 4: + $this->setDbBrokerTaskName($value); + break; + case 5: + $this->setDbBrokerTaskDispatchTime($value); + break; + case 6: + $this->setDbFileId($value); + break; + case 7: $this->setDbStatus($value); break; } // switch() @@ -847,8 +1029,11 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi if (array_key_exists($keys[0], $arr)) $this->setDbId($arr[$keys[0]]); if (array_key_exists($keys[1], $arr)) $this->setDbService($arr[$keys[1]]); if (array_key_exists($keys[2], $arr)) $this->setDbForeignId($arr[$keys[2]]); - if (array_key_exists($keys[3], $arr)) $this->setDbFileId($arr[$keys[3]]); - if (array_key_exists($keys[4], $arr)) $this->setDbStatus($arr[$keys[4]]); + if (array_key_exists($keys[3], $arr)) $this->setDbBrokerTaskId($arr[$keys[3]]); + if (array_key_exists($keys[4], $arr)) $this->setDbBrokerTaskName($arr[$keys[4]]); + if (array_key_exists($keys[5], $arr)) $this->setDbBrokerTaskDispatchTime($arr[$keys[5]]); + if (array_key_exists($keys[6], $arr)) $this->setDbFileId($arr[$keys[6]]); + if (array_key_exists($keys[7], $arr)) $this->setDbStatus($arr[$keys[7]]); } /** @@ -863,6 +1048,9 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::ID)) $criteria->add(ThirdPartyTrackReferencesPeer::ID, $this->id); if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::SERVICE)) $criteria->add(ThirdPartyTrackReferencesPeer::SERVICE, $this->service); if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::FOREIGN_ID)) $criteria->add(ThirdPartyTrackReferencesPeer::FOREIGN_ID, $this->foreign_id); + if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::BROKER_TASK_ID)) $criteria->add(ThirdPartyTrackReferencesPeer::BROKER_TASK_ID, $this->broker_task_id); + if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::BROKER_TASK_NAME)) $criteria->add(ThirdPartyTrackReferencesPeer::BROKER_TASK_NAME, $this->broker_task_name); + if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME)) $criteria->add(ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME, $this->broker_task_dispatch_time); if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::FILE_ID)) $criteria->add(ThirdPartyTrackReferencesPeer::FILE_ID, $this->file_id); if ($this->isColumnModified(ThirdPartyTrackReferencesPeer::STATUS)) $criteria->add(ThirdPartyTrackReferencesPeer::STATUS, $this->status); @@ -930,6 +1118,9 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi { $copyObj->setDbService($this->getDbService()); $copyObj->setDbForeignId($this->getDbForeignId()); + $copyObj->setDbBrokerTaskId($this->getDbBrokerTaskId()); + $copyObj->setDbBrokerTaskName($this->getDbBrokerTaskName()); + $copyObj->setDbBrokerTaskDispatchTime($this->getDbBrokerTaskDispatchTime()); $copyObj->setDbFileId($this->getDbFileId()); $copyObj->setDbStatus($this->getDbStatus()); @@ -1050,6 +1241,9 @@ abstract class BaseThirdPartyTrackReferences extends BaseObject implements Persi $this->id = null; $this->service = null; $this->foreign_id = null; + $this->broker_task_id = null; + $this->broker_task_name = null; + $this->broker_task_dispatch_time = null; $this->file_id = null; $this->status = null; $this->alreadyInSave = false; diff --git a/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferencesPeer.php b/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferencesPeer.php index 20e769677..079dd9808 100644 --- a/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferencesPeer.php +++ b/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferencesPeer.php @@ -24,13 +24,13 @@ abstract class BaseThirdPartyTrackReferencesPeer const TM_CLASS = 'ThirdPartyTrackReferencesTableMap'; /** The total number of columns. */ - const NUM_COLUMNS = 5; + const NUM_COLUMNS = 8; /** The number of lazy-loaded columns. */ const NUM_LAZY_LOAD_COLUMNS = 0; /** The number of columns to hydrate (NUM_COLUMNS - NUM_LAZY_LOAD_COLUMNS) */ - const NUM_HYDRATE_COLUMNS = 5; + const NUM_HYDRATE_COLUMNS = 8; /** the column name for the id field */ const ID = 'third_party_track_references.id'; @@ -41,6 +41,15 @@ abstract class BaseThirdPartyTrackReferencesPeer /** the column name for the foreign_id field */ const FOREIGN_ID = 'third_party_track_references.foreign_id'; + /** the column name for the broker_task_id field */ + const BROKER_TASK_ID = 'third_party_track_references.broker_task_id'; + + /** the column name for the broker_task_name field */ + const BROKER_TASK_NAME = 'third_party_track_references.broker_task_name'; + + /** the column name for the broker_task_dispatch_time field */ + const BROKER_TASK_DISPATCH_TIME = 'third_party_track_references.broker_task_dispatch_time'; + /** the column name for the file_id field */ const FILE_ID = 'third_party_track_references.file_id'; @@ -66,12 +75,12 @@ abstract class BaseThirdPartyTrackReferencesPeer * e.g. ThirdPartyTrackReferencesPeer::$fieldNames[ThirdPartyTrackReferencesPeer::TYPE_PHPNAME][0] = 'Id' */ protected static $fieldNames = array ( - BasePeer::TYPE_PHPNAME => array ('DbId', 'DbService', 'DbForeignId', 'DbFileId', 'DbStatus', ), - BasePeer::TYPE_STUDLYPHPNAME => array ('dbId', 'dbService', 'dbForeignId', 'dbFileId', 'dbStatus', ), - BasePeer::TYPE_COLNAME => array (ThirdPartyTrackReferencesPeer::ID, ThirdPartyTrackReferencesPeer::SERVICE, ThirdPartyTrackReferencesPeer::FOREIGN_ID, ThirdPartyTrackReferencesPeer::FILE_ID, ThirdPartyTrackReferencesPeer::STATUS, ), - BasePeer::TYPE_RAW_COLNAME => array ('ID', 'SERVICE', 'FOREIGN_ID', 'FILE_ID', 'STATUS', ), - BasePeer::TYPE_FIELDNAME => array ('id', 'service', 'foreign_id', 'file_id', 'status', ), - BasePeer::TYPE_NUM => array (0, 1, 2, 3, 4, ) + BasePeer::TYPE_PHPNAME => array ('DbId', 'DbService', 'DbForeignId', 'DbBrokerTaskId', 'DbBrokerTaskName', 'DbBrokerTaskDispatchTime', 'DbFileId', 'DbStatus', ), + BasePeer::TYPE_STUDLYPHPNAME => array ('dbId', 'dbService', 'dbForeignId', 'dbBrokerTaskId', 'dbBrokerTaskName', 'dbBrokerTaskDispatchTime', 'dbFileId', 'dbStatus', ), + BasePeer::TYPE_COLNAME => array (ThirdPartyTrackReferencesPeer::ID, ThirdPartyTrackReferencesPeer::SERVICE, ThirdPartyTrackReferencesPeer::FOREIGN_ID, ThirdPartyTrackReferencesPeer::BROKER_TASK_ID, ThirdPartyTrackReferencesPeer::BROKER_TASK_NAME, ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME, ThirdPartyTrackReferencesPeer::FILE_ID, ThirdPartyTrackReferencesPeer::STATUS, ), + BasePeer::TYPE_RAW_COLNAME => array ('ID', 'SERVICE', 'FOREIGN_ID', 'BROKER_TASK_ID', 'BROKER_TASK_NAME', 'BROKER_TASK_DISPATCH_TIME', 'FILE_ID', 'STATUS', ), + BasePeer::TYPE_FIELDNAME => array ('id', 'service', 'foreign_id', 'broker_task_id', 'broker_task_name', 'broker_task_dispatch_time', 'file_id', 'status', ), + BasePeer::TYPE_NUM => array (0, 1, 2, 3, 4, 5, 6, 7, ) ); /** @@ -81,12 +90,12 @@ abstract class BaseThirdPartyTrackReferencesPeer * e.g. ThirdPartyTrackReferencesPeer::$fieldNames[BasePeer::TYPE_PHPNAME]['Id'] = 0 */ protected static $fieldKeys = array ( - BasePeer::TYPE_PHPNAME => array ('DbId' => 0, 'DbService' => 1, 'DbForeignId' => 2, 'DbFileId' => 3, 'DbStatus' => 4, ), - BasePeer::TYPE_STUDLYPHPNAME => array ('dbId' => 0, 'dbService' => 1, 'dbForeignId' => 2, 'dbFileId' => 3, 'dbStatus' => 4, ), - BasePeer::TYPE_COLNAME => array (ThirdPartyTrackReferencesPeer::ID => 0, ThirdPartyTrackReferencesPeer::SERVICE => 1, ThirdPartyTrackReferencesPeer::FOREIGN_ID => 2, ThirdPartyTrackReferencesPeer::FILE_ID => 3, ThirdPartyTrackReferencesPeer::STATUS => 4, ), - BasePeer::TYPE_RAW_COLNAME => array ('ID' => 0, 'SERVICE' => 1, 'FOREIGN_ID' => 2, 'FILE_ID' => 3, 'STATUS' => 4, ), - BasePeer::TYPE_FIELDNAME => array ('id' => 0, 'service' => 1, 'foreign_id' => 2, 'file_id' => 3, 'status' => 4, ), - BasePeer::TYPE_NUM => array (0, 1, 2, 3, 4, ) + BasePeer::TYPE_PHPNAME => array ('DbId' => 0, 'DbService' => 1, 'DbForeignId' => 2, 'DbBrokerTaskId' => 3, 'DbBrokerTaskName' => 4, 'DbBrokerTaskDispatchTime' => 5, 'DbFileId' => 6, 'DbStatus' => 7, ), + BasePeer::TYPE_STUDLYPHPNAME => array ('dbId' => 0, 'dbService' => 1, 'dbForeignId' => 2, 'dbBrokerTaskId' => 3, 'dbBrokerTaskName' => 4, 'dbBrokerTaskDispatchTime' => 5, 'dbFileId' => 6, 'dbStatus' => 7, ), + BasePeer::TYPE_COLNAME => array (ThirdPartyTrackReferencesPeer::ID => 0, ThirdPartyTrackReferencesPeer::SERVICE => 1, ThirdPartyTrackReferencesPeer::FOREIGN_ID => 2, ThirdPartyTrackReferencesPeer::BROKER_TASK_ID => 3, ThirdPartyTrackReferencesPeer::BROKER_TASK_NAME => 4, ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME => 5, ThirdPartyTrackReferencesPeer::FILE_ID => 6, ThirdPartyTrackReferencesPeer::STATUS => 7, ), + BasePeer::TYPE_RAW_COLNAME => array ('ID' => 0, 'SERVICE' => 1, 'FOREIGN_ID' => 2, 'BROKER_TASK_ID' => 3, 'BROKER_TASK_NAME' => 4, 'BROKER_TASK_DISPATCH_TIME' => 5, 'FILE_ID' => 6, 'STATUS' => 7, ), + BasePeer::TYPE_FIELDNAME => array ('id' => 0, 'service' => 1, 'foreign_id' => 2, 'broker_task_id' => 3, 'broker_task_name' => 4, 'broker_task_dispatch_time' => 5, 'file_id' => 6, 'status' => 7, ), + BasePeer::TYPE_NUM => array (0, 1, 2, 3, 4, 5, 6, 7, ) ); /** @@ -163,12 +172,18 @@ abstract class BaseThirdPartyTrackReferencesPeer $criteria->addSelectColumn(ThirdPartyTrackReferencesPeer::ID); $criteria->addSelectColumn(ThirdPartyTrackReferencesPeer::SERVICE); $criteria->addSelectColumn(ThirdPartyTrackReferencesPeer::FOREIGN_ID); + $criteria->addSelectColumn(ThirdPartyTrackReferencesPeer::BROKER_TASK_ID); + $criteria->addSelectColumn(ThirdPartyTrackReferencesPeer::BROKER_TASK_NAME); + $criteria->addSelectColumn(ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME); $criteria->addSelectColumn(ThirdPartyTrackReferencesPeer::FILE_ID); $criteria->addSelectColumn(ThirdPartyTrackReferencesPeer::STATUS); } else { $criteria->addSelectColumn($alias . '.id'); $criteria->addSelectColumn($alias . '.service'); $criteria->addSelectColumn($alias . '.foreign_id'); + $criteria->addSelectColumn($alias . '.broker_task_id'); + $criteria->addSelectColumn($alias . '.broker_task_name'); + $criteria->addSelectColumn($alias . '.broker_task_dispatch_time'); $criteria->addSelectColumn($alias . '.file_id'); $criteria->addSelectColumn($alias . '.status'); } diff --git a/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferencesQuery.php b/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferencesQuery.php index 29ac981eb..8602b947b 100644 --- a/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferencesQuery.php +++ b/airtime_mvc/application/models/airtime/om/BaseThirdPartyTrackReferencesQuery.php @@ -9,12 +9,18 @@ * @method ThirdPartyTrackReferencesQuery orderByDbId($order = Criteria::ASC) Order by the id column * @method ThirdPartyTrackReferencesQuery orderByDbService($order = Criteria::ASC) Order by the service column * @method ThirdPartyTrackReferencesQuery orderByDbForeignId($order = Criteria::ASC) Order by the foreign_id column + * @method ThirdPartyTrackReferencesQuery orderByDbBrokerTaskId($order = Criteria::ASC) Order by the broker_task_id column + * @method ThirdPartyTrackReferencesQuery orderByDbBrokerTaskName($order = Criteria::ASC) Order by the broker_task_name column + * @method ThirdPartyTrackReferencesQuery orderByDbBrokerTaskDispatchTime($order = Criteria::ASC) Order by the broker_task_dispatch_time column * @method ThirdPartyTrackReferencesQuery orderByDbFileId($order = Criteria::ASC) Order by the file_id column * @method ThirdPartyTrackReferencesQuery orderByDbStatus($order = Criteria::ASC) Order by the status column * * @method ThirdPartyTrackReferencesQuery groupByDbId() Group by the id column * @method ThirdPartyTrackReferencesQuery groupByDbService() Group by the service column * @method ThirdPartyTrackReferencesQuery groupByDbForeignId() Group by the foreign_id column + * @method ThirdPartyTrackReferencesQuery groupByDbBrokerTaskId() Group by the broker_task_id column + * @method ThirdPartyTrackReferencesQuery groupByDbBrokerTaskName() Group by the broker_task_name column + * @method ThirdPartyTrackReferencesQuery groupByDbBrokerTaskDispatchTime() Group by the broker_task_dispatch_time column * @method ThirdPartyTrackReferencesQuery groupByDbFileId() Group by the file_id column * @method ThirdPartyTrackReferencesQuery groupByDbStatus() Group by the status column * @@ -30,13 +36,19 @@ * @method ThirdPartyTrackReferences findOneOrCreate(PropelPDO $con = null) Return the first ThirdPartyTrackReferences matching the query, or a new ThirdPartyTrackReferences object populated from the query conditions when no match is found * * @method ThirdPartyTrackReferences findOneByDbService(string $service) Return the first ThirdPartyTrackReferences filtered by the service column - * @method ThirdPartyTrackReferences findOneByDbForeignId(int $foreign_id) Return the first ThirdPartyTrackReferences filtered by the foreign_id column + * @method ThirdPartyTrackReferences findOneByDbForeignId(string $foreign_id) Return the first ThirdPartyTrackReferences filtered by the foreign_id column + * @method ThirdPartyTrackReferences findOneByDbBrokerTaskId(string $broker_task_id) Return the first ThirdPartyTrackReferences filtered by the broker_task_id column + * @method ThirdPartyTrackReferences findOneByDbBrokerTaskName(string $broker_task_name) Return the first ThirdPartyTrackReferences filtered by the broker_task_name column + * @method ThirdPartyTrackReferences findOneByDbBrokerTaskDispatchTime(string $broker_task_dispatch_time) Return the first ThirdPartyTrackReferences filtered by the broker_task_dispatch_time column * @method ThirdPartyTrackReferences findOneByDbFileId(int $file_id) Return the first ThirdPartyTrackReferences filtered by the file_id column * @method ThirdPartyTrackReferences findOneByDbStatus(string $status) Return the first ThirdPartyTrackReferences filtered by the status column * * @method array findByDbId(int $id) Return ThirdPartyTrackReferences objects filtered by the id column * @method array findByDbService(string $service) Return ThirdPartyTrackReferences objects filtered by the service column - * @method array findByDbForeignId(int $foreign_id) Return ThirdPartyTrackReferences objects filtered by the foreign_id column + * @method array findByDbForeignId(string $foreign_id) Return ThirdPartyTrackReferences objects filtered by the foreign_id column + * @method array findByDbBrokerTaskId(string $broker_task_id) Return ThirdPartyTrackReferences objects filtered by the broker_task_id column + * @method array findByDbBrokerTaskName(string $broker_task_name) Return ThirdPartyTrackReferences objects filtered by the broker_task_name column + * @method array findByDbBrokerTaskDispatchTime(string $broker_task_dispatch_time) Return ThirdPartyTrackReferences objects filtered by the broker_task_dispatch_time column * @method array findByDbFileId(int $file_id) Return ThirdPartyTrackReferences objects filtered by the file_id column * @method array findByDbStatus(string $status) Return ThirdPartyTrackReferences objects filtered by the status column * @@ -146,7 +158,7 @@ abstract class BaseThirdPartyTrackReferencesQuery extends ModelCriteria */ protected function findPkSimple($key, $con) { - $sql = 'SELECT "id", "service", "foreign_id", "file_id", "status" FROM "third_party_track_references" WHERE "id" = :p0'; + $sql = 'SELECT "id", "service", "foreign_id", "broker_task_id", "broker_task_name", "broker_task_dispatch_time", "file_id", "status" FROM "third_party_track_references" WHERE "id" = :p0'; try { $stmt = $con->prepare($sql); $stmt->bindValue(':p0', $key, PDO::PARAM_INT); @@ -311,13 +323,101 @@ abstract class BaseThirdPartyTrackReferencesQuery extends ModelCriteria * * Example usage: * - * $query->filterByDbForeignId(1234); // WHERE foreign_id = 1234 - * $query->filterByDbForeignId(array(12, 34)); // WHERE foreign_id IN (12, 34) - * $query->filterByDbForeignId(array('min' => 12)); // WHERE foreign_id >= 12 - * $query->filterByDbForeignId(array('max' => 12)); // WHERE foreign_id <= 12 + * $query->filterByDbForeignId('fooValue'); // WHERE foreign_id = 'fooValue' + * $query->filterByDbForeignId('%fooValue%'); // WHERE foreign_id LIKE '%fooValue%' * * - * @param mixed $dbForeignId The value to use as filter. + * @param string $dbForeignId The value to use as filter. + * Accepts wildcards (* and % trigger a LIKE) + * @param string $comparison Operator to use for the column comparison, defaults to Criteria::EQUAL + * + * @return ThirdPartyTrackReferencesQuery The current query, for fluid interface + */ + public function filterByDbForeignId($dbForeignId = null, $comparison = null) + { + if (null === $comparison) { + if (is_array($dbForeignId)) { + $comparison = Criteria::IN; + } elseif (preg_match('/[\%\*]/', $dbForeignId)) { + $dbForeignId = str_replace('*', '%', $dbForeignId); + $comparison = Criteria::LIKE; + } + } + + return $this->addUsingAlias(ThirdPartyTrackReferencesPeer::FOREIGN_ID, $dbForeignId, $comparison); + } + + /** + * Filter the query on the broker_task_id column + * + * Example usage: + * + * $query->filterByDbBrokerTaskId('fooValue'); // WHERE broker_task_id = 'fooValue' + * $query->filterByDbBrokerTaskId('%fooValue%'); // WHERE broker_task_id LIKE '%fooValue%' + * + * + * @param string $dbBrokerTaskId The value to use as filter. + * Accepts wildcards (* and % trigger a LIKE) + * @param string $comparison Operator to use for the column comparison, defaults to Criteria::EQUAL + * + * @return ThirdPartyTrackReferencesQuery The current query, for fluid interface + */ + public function filterByDbBrokerTaskId($dbBrokerTaskId = null, $comparison = null) + { + if (null === $comparison) { + if (is_array($dbBrokerTaskId)) { + $comparison = Criteria::IN; + } elseif (preg_match('/[\%\*]/', $dbBrokerTaskId)) { + $dbBrokerTaskId = str_replace('*', '%', $dbBrokerTaskId); + $comparison = Criteria::LIKE; + } + } + + return $this->addUsingAlias(ThirdPartyTrackReferencesPeer::BROKER_TASK_ID, $dbBrokerTaskId, $comparison); + } + + /** + * Filter the query on the broker_task_name column + * + * Example usage: + * + * $query->filterByDbBrokerTaskName('fooValue'); // WHERE broker_task_name = 'fooValue' + * $query->filterByDbBrokerTaskName('%fooValue%'); // WHERE broker_task_name LIKE '%fooValue%' + * + * + * @param string $dbBrokerTaskName The value to use as filter. + * Accepts wildcards (* and % trigger a LIKE) + * @param string $comparison Operator to use for the column comparison, defaults to Criteria::EQUAL + * + * @return ThirdPartyTrackReferencesQuery The current query, for fluid interface + */ + public function filterByDbBrokerTaskName($dbBrokerTaskName = null, $comparison = null) + { + if (null === $comparison) { + if (is_array($dbBrokerTaskName)) { + $comparison = Criteria::IN; + } elseif (preg_match('/[\%\*]/', $dbBrokerTaskName)) { + $dbBrokerTaskName = str_replace('*', '%', $dbBrokerTaskName); + $comparison = Criteria::LIKE; + } + } + + return $this->addUsingAlias(ThirdPartyTrackReferencesPeer::BROKER_TASK_NAME, $dbBrokerTaskName, $comparison); + } + + /** + * Filter the query on the broker_task_dispatch_time column + * + * Example usage: + * + * $query->filterByDbBrokerTaskDispatchTime('2011-03-14'); // WHERE broker_task_dispatch_time = '2011-03-14' + * $query->filterByDbBrokerTaskDispatchTime('now'); // WHERE broker_task_dispatch_time = '2011-03-14' + * $query->filterByDbBrokerTaskDispatchTime(array('max' => 'yesterday')); // WHERE broker_task_dispatch_time < '2011-03-13' + * + * + * @param mixed $dbBrokerTaskDispatchTime The value to use as filter. + * Values can be integers (unix timestamps), DateTime objects, or strings. + * Empty strings are treated as NULL. * Use scalar values for equality. * Use array values for in_array() equivalent. * Use associative array('min' => $minValue, 'max' => $maxValue) for intervals. @@ -325,16 +425,16 @@ abstract class BaseThirdPartyTrackReferencesQuery extends ModelCriteria * * @return ThirdPartyTrackReferencesQuery The current query, for fluid interface */ - public function filterByDbForeignId($dbForeignId = null, $comparison = null) + public function filterByDbBrokerTaskDispatchTime($dbBrokerTaskDispatchTime = null, $comparison = null) { - if (is_array($dbForeignId)) { + if (is_array($dbBrokerTaskDispatchTime)) { $useMinMax = false; - if (isset($dbForeignId['min'])) { - $this->addUsingAlias(ThirdPartyTrackReferencesPeer::FOREIGN_ID, $dbForeignId['min'], Criteria::GREATER_EQUAL); + if (isset($dbBrokerTaskDispatchTime['min'])) { + $this->addUsingAlias(ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME, $dbBrokerTaskDispatchTime['min'], Criteria::GREATER_EQUAL); $useMinMax = true; } - if (isset($dbForeignId['max'])) { - $this->addUsingAlias(ThirdPartyTrackReferencesPeer::FOREIGN_ID, $dbForeignId['max'], Criteria::LESS_EQUAL); + if (isset($dbBrokerTaskDispatchTime['max'])) { + $this->addUsingAlias(ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME, $dbBrokerTaskDispatchTime['max'], Criteria::LESS_EQUAL); $useMinMax = true; } if ($useMinMax) { @@ -345,7 +445,7 @@ abstract class BaseThirdPartyTrackReferencesQuery extends ModelCriteria } } - return $this->addUsingAlias(ThirdPartyTrackReferencesPeer::FOREIGN_ID, $dbForeignId, $comparison); + return $this->addUsingAlias(ThirdPartyTrackReferencesPeer::BROKER_TASK_DISPATCH_TIME, $dbBrokerTaskDispatchTime, $comparison); } /** diff --git a/airtime_mvc/application/services/SoundCloudService.php b/airtime_mvc/application/services/SoundcloudService.php similarity index 71% rename from airtime_mvc/application/services/SoundCloudService.php rename to airtime_mvc/application/services/SoundcloudService.php index dbfe0c83c..d2f0f0dc8 100644 --- a/airtime_mvc/application/services/SoundCloudService.php +++ b/airtime_mvc/application/services/SoundcloudService.php @@ -4,6 +4,11 @@ require_once "ThirdPartyService.php"; class SoundcloudService extends ThirdPartyService { + /** + * @var string service access token for accessing remote API + */ + protected $_accessToken; + /** * @var Soundcloud\Service SoundCloud API wrapper object */ @@ -19,6 +24,16 @@ class SoundcloudService extends ThirdPartyService { */ protected $_THIRD_PARTY_TRACK_URI = 'http://api.soundcloud.com/tracks/'; + /** + * @var string exchange name for SoundCloud tasks + */ + protected $_CELERY_EXCHANGE_NAME = 'soundcloud-uploads'; + + /** + * @var string celery task name for third party uploads + */ + protected $_CELERY_UPLOAD_TASK_NAME = 'upload-to-soundcloud'; + /** * @var array Application_Model_Preference functions for SoundCloud and their * associated API parameter keys so that we can call them dynamically @@ -40,35 +55,11 @@ class SoundcloudService extends ThirdPartyService { $this->_client = new Soundcloud\Service($clientId, $clientSecret, $redirectUri); $accessToken = Application_Model_Preference::getSoundCloudRequestToken(); if (!empty($accessToken)) { + $this->_accessToken = $accessToken; $this->_client->setAccessToken($accessToken); } } - /** - * Upload the file with the given identifier to SoundCloud - * - * @param int $fileId the local CcFiles identifier - * - * @throws Soundcloud\Exception\InvalidHttpResponseCodeException - * thrown when the upload fails for any reason - */ - public function upload($fileId) { - $file = Application_Model_StoredFile::RecallById($fileId); - $data = array( - 'track_data' => $this->_buildTrackArray($file), - 'token' => $this->_client->getAccessToken(), - 'file_path' => $file->getFilePaths()[0] - ); - try { - $track = json_decode(Application_Model_RabbitMq::uploadToSoundCloud($data)); - parent::_createTrackReference($fileId, $track); - } catch(Soundcloud\Exception\InvalidHttpResponseCodeException $e) { - Logging::info("Invalid request: " . $e->getMessage()); - // We should only get here if we have an access token, so attempt to refresh - $this->accessTokenRefresh(); - } - } - /** * Build a parameter array for the track being uploaded to SoundCloud * @@ -76,11 +67,11 @@ class SoundcloudService extends ThirdPartyService { * * @return array the track array to send to SoundCloud */ - private function _buildTrackArray($file) { + protected function _getUploadData($file) { $trackArray = array( 'title' => $file->getName(), ); - foreach($this->_SOUNDCLOUD_PREF_FUNCTIONS as $func => $param) { + foreach ($this->_SOUNDCLOUD_PREF_FUNCTIONS as $func => $param) { $val = Application_Model_Preference::$func(); if (!empty($val)) { $trackArray[$param] = $val; @@ -90,6 +81,34 @@ class SoundcloudService extends ThirdPartyService { return $trackArray; } + /** + * Update a ThirdPartyTrackReferences object for a completed upload + * + * @param $fileId int local CcFiles identifier + * @param $track object third-party service track object + * + * @throws Exception + * @throws PropelException + */ + protected function _addOrUpdateTrackReference($fileId, $track) { + // First, check if the track already has an entry in the database + $ref = ThirdPartyTrackReferencesQuery::create() + ->filterByDbService($this->_SERVICE_NAME) + ->findOneByDbFileId($fileId); + if (is_null($ref)) { + $ref = new ThirdPartyTrackReferences(); + } + $ref->setDbService($this->_SERVICE_NAME); + $ref->setDbForeignId($track->id); + $ref->setDbFileId($fileId); + $ref->setDbStatus($track->state); + // Null the broker task fields because we no longer need them + $ref->setDbBrokerTaskId(NULL); + $ref->setDbBrokerTaskName(NULL); + $ref->setDbBrokerTaskDispatchTime(NULL); + $ref->save(); + } + /** * Given a CcFiles identifier for a file that's been uploaded to SoundCloud, * return a link to the remote file @@ -102,7 +121,7 @@ class SoundcloudService extends ThirdPartyService { $serviceId = $this->getServiceId($fileId); // If we don't find a record for the file we'll get 0 back for the id if ($serviceId == 0) { return ''; } - $track = json_decode($this->_client->get('tracks/'. $serviceId)); + $track = json_decode($this->_client->get('tracks/' . $serviceId)); return $track->permalink_url; } @@ -112,8 +131,7 @@ class SoundcloudService extends ThirdPartyService { * @return bool true if an access token exists, otherwise false */ public function hasAccessToken() { - $accessToken = $this->_client->getAccessToken(); - return !empty($accessToken); + return !empty($this->_accessToken); } /** @@ -139,6 +157,7 @@ class SoundcloudService extends ThirdPartyService { $response = $this->_client->accessToken($code, $postData = array('scope' => 'non-expiring')); $accessToken = $response['access_token']; Application_Model_Preference::setSoundCloudRequestToken($accessToken); + $this->_accessToken = $accessToken; } /** @@ -150,7 +169,7 @@ class SoundcloudService extends ThirdPartyService { public function accessTokenRefresh() { assert($this->hasAccessToken()); try { - $accessToken = $this->_client->getAccessToken(); + $accessToken = $this->_accessToken; $this->_client->accessTokenRefresh($accessToken); } catch(Soundcloud\Exception\InvalidHttpResponseCodeException $e) { // If we get here, then that means our token is stale, so remove it diff --git a/airtime_mvc/application/services/ThirdPartyService.php b/airtime_mvc/application/services/ThirdPartyService.php index 48e882a44..dc7497116 100644 --- a/airtime_mvc/application/services/ThirdPartyService.php +++ b/airtime_mvc/application/services/ThirdPartyService.php @@ -5,33 +5,79 @@ */ abstract class ThirdPartyService { + /** + * @var string service access token for accessing remote API + */ + protected $_accessToken; + /** * @var string service name to store in ThirdPartyTrackReferences database */ - protected $_SERVICE_NAME = ''; + protected $_SERVICE_NAME; /** * @var string base URI for third-party tracks */ - protected $_THIRD_PARTY_TRACK_URI = ''; + protected $_THIRD_PARTY_TRACK_URI; + + /** + * @var string broker exchange name for third party tasks + */ + protected $_CELERY_EXCHANGE_NAME = 'default'; + + /** + * @var string celery task name for third party uploads + */ + protected $_CELERY_UPLOAD_TASK_NAME = 'upload'; + + /** + * @var string status string for pending tasks + */ + protected $_PENDING_STATUS = 'PENDING'; + + /** + * @var string status string for failed tasks + */ + protected $_FAILED_STATUS = 'FAILED'; /** * Upload the file with the given identifier to a third-party service * * @param int $fileId the local CcFiles identifier + * + * @throws Exception thrown when the upload fails for any reason */ - abstract function upload($fileId); + public function upload($fileId) { + $file = Application_Model_StoredFile::RecallById($fileId); + $data = array( + 'data' => $this->_getUploadData($file), + 'token' => $this->_accessToken, + 'file_path' => $file->getFilePaths()[0] + ); + try { + $brokerTaskId = Application_Model_RabbitMq::sendCeleryMessage($this->_CELERY_UPLOAD_TASK_NAME, + $this->_CELERY_EXCHANGE_NAME, + $data); + $this->_createTaskReference($fileId, $brokerTaskId, $this->_CELERY_UPLOAD_TASK_NAME); + } catch(Exception $e) { + Logging::info("Invalid request: " . $e->getMessage()); + // We should only get here if we have an access token, so attempt to refresh + $this->accessTokenRefresh(); + } + } /** - * Create a ThirdPartyTrackReferences and save it to the database + * Create a ThirdPartyTrackReferences object for a pending task * - * @param $fileId int local CcFiles identifier - * @param $track object third-party service track object + * @param $fileId int local CcFiles identifier + * @param $brokerTaskId int broker task identifier to so we can asynchronously + * receive completed task messages + * @param $taskName string broker task name * * @throws Exception * @throws PropelException */ - protected function _createTrackReference($fileId, $track) { + protected function _createTaskReference($fileId, $brokerTaskId, $taskName) { // First, check if the track already has an entry in the database $ref = ThirdPartyTrackReferencesQuery::create() ->filterByDbService($this->_SERVICE_NAME) @@ -40,9 +86,12 @@ abstract class ThirdPartyService { $ref = new ThirdPartyTrackReferences(); } $ref->setDbService($this->_SERVICE_NAME); - $ref->setDbForeignId($track->id); + $ref->setDbBrokerTaskId($brokerTaskId); + $ref->setDbBrokerTaskName($taskName); + $utc = new DateTimeZone("UTC"); + $ref->setDbBrokerTaskDispatchTime(new DateTime("now", $utc)); $ref->setDbFileId($fileId); - $ref->setDbStatus($track->state); + $ref->setDbStatus($this->_PENDING_STATUS); $ref->save(); } @@ -91,6 +140,103 @@ abstract class ThirdPartyService { return $serviceId > 0 ? $this->_THIRD_PARTY_TRACK_URI . $serviceId : ''; } + /** + * Poll the message queue for this service to see if any tasks with the given name have completed + * If we find any completed tasks, adjust the ThirdPartyTrackReferences table accordingly + * If no task name is passed, we poll all tasks for this service + * + * @param string $taskName the name of the task to poll for + */ + public function pollBrokerTaskQueue($taskName="") { + $pendingTasks = $this->_getPendingTasks($taskName); + foreach ($pendingTasks as $task) { + try { + $result = $this->_getTaskResult($task); + Logging::info(json_decode($result)); + $this->_addOrUpdateTrackReference($task->getDbFileId(), json_decode($result)); + } catch(CeleryException $e) { + Logging::info("Couldn't retrieve task message for task " . $task->getDbBrokerTaskName() + . " with ID " . $task->getDbBrokerTaskId() . ": " . $e->getMessage()); + if ($this->_checkMessageTimeout($task)) { + $task->setDbStatus($this->_FAILED_STATUS); + $task->save(); + } + } + } + } + + /** + * Return a collection of all pending ThirdPartyTrackReferences to tasks for this service or task + * + * @param string $taskName the name of the task to look for + * + * @return PropelCollection any pending ThirdPartyTrackReferences results for this service + * or task if taskName is provided + */ + protected function _getPendingTasks($taskName) { + $query = ThirdPartyTrackReferencesQuery::create() + ->filterByDbService($this->_SERVICE_NAME) + ->filterByDbStatus($this->_PENDING_STATUS) + ->filterByDbBrokerTaskId('', Criteria::NOT_EQUAL); + if (!empty($taskName)) { + $query->filterByDbBrokerTaskName($taskName); + } + return $query->find(); + } + + /** + * Get the result from a celery task message in the results queue + * If the task message no longer exists, remove it from the track references table + * + * @param $task ThirdPartyTrackReferences the track reference object + * + * @return array the results from the task message + * + * @throws CeleryException when the result message for this task no longer exists + */ + protected function _getTaskResult($task) { + $message = Application_Model_RabbitMq::getAsyncResultMessage($task->getDbBrokerTaskName(), + $task->getDbBrokerTaskId()); + return json_decode($message['body'])->result; // The actual result message from the service + } + + /** + * Check if a task message has been unreachable for more our timeout time + * + * @param $task ThirdPartyTrackReferences the track reference object + * + * @return bool true if the dispatch time is empty or it's been more than our timeout time + * since the message was dispatched, otherwise false + */ + protected function _checkMessageTimeout($task) { + $utc = new DateTimeZone("UTC"); + $dispatchTime = new DateTime($task->getDbBrokerTaskDispatchTime(), $utc); + $now = new DateTime("now", $utc); + $timeoutSeconds = Application_Model_RabbitMq::$_CELERY_MESSAGE_TIMEOUT / 1000; // Convert from milliseconds + $timeoutInterval = new DateInterval("PT" . $timeoutSeconds . "S"); + return (empty($dispatchTime) || $dispatchTime->add($timeoutInterval) <= $now); + } + + /** + * Build a parameter array for the file being uploaded to a third party service + * + * @param $file Application_Model_StoredFile the file being uploaded + * + * @return array the track array to send to the third party service + */ + abstract protected function _getUploadData($file); + + /** + * Update a ThirdPartyTrackReferences object for a completed task + * + * @param $fileId int local CcFiles identifier + * @param $track object third-party service track object + * + * @throws Exception + * @throws PropelException + */ + abstract protected function _addOrUpdateTrackReference($fileId, $track); + /** * Check whether an OAuth access token exists for the third-party client * diff --git a/airtime_mvc/build/schema.xml b/airtime_mvc/build/schema.xml index 03bad21cb..53a479dc9 100644 --- a/airtime_mvc/build/schema.xml +++ b/airtime_mvc/build/schema.xml @@ -533,10 +533,20 @@ - - + + + + + + + + + + + + diff --git a/airtime_mvc/build/sql/schema.sql b/airtime_mvc/build/sql/schema.sql index 3d7aaf39a..74a39e597 100644 --- a/airtime_mvc/build/sql/schema.sql +++ b/airtime_mvc/build/sql/schema.sql @@ -679,11 +679,16 @@ DROP TABLE IF EXISTS "third_party_track_references" CASCADE; CREATE TABLE "third_party_track_references" ( "id" serial NOT NULL, - "service" VARCHAR(512) NOT NULL, - "foreign_id" INTEGER NOT NULL, + "service" VARCHAR(256) NOT NULL, + "foreign_id" VARCHAR(256), + "broker_task_id" VARCHAR(256), + "broker_task_name" VARCHAR(256), + "broker_task_dispatch_time" TIMESTAMP, "file_id" INTEGER NOT NULL, "status" VARCHAR(256) NOT NULL, - PRIMARY KEY ("id") + PRIMARY KEY ("id"), + CONSTRAINT "broker_task_id_unique" UNIQUE ("broker_task_id"), + CONSTRAINT "foreign_id_unique" UNIQUE ("foreign_id") ); ALTER TABLE "cc_files" ADD CONSTRAINT "cc_files_owner_fkey" diff --git a/airtime_mvc/public/js/airtime/common/common.js b/airtime_mvc/public/js/airtime/common/common.js index 31f4668bf..2cb0501fe 100644 --- a/airtime_mvc/public/js/airtime/common/common.js +++ b/airtime_mvc/public/js/airtime/common/common.js @@ -9,6 +9,7 @@ $(document).ready(function() { //this statement tells the browser to fade out any success message after 5 seconds setTimeout(function(){$(".success").fadeOut("slow", function(){$(this).empty()});}, 5000); + pollTaskQueues(); }); /* @@ -156,3 +157,8 @@ function removeSuccessMsg() { $status.fadeOut("slow", function(){$status.empty()}); } + +function pollTaskQueues() { + console.log("Polling broker queues..."); + $.get(baseUrl + 'soundcloud/poll-broker-task-queue'); +} \ No newline at end of file diff --git a/python_apps/airtime-celery/airtime-celery/__init__.py b/python_apps/airtime-celery/airtime-celery/__init__.py new file mode 100644 index 000000000..a65fa3c85 --- /dev/null +++ b/python_apps/airtime-celery/airtime-celery/__init__.py @@ -0,0 +1,3 @@ +import os +# Make the celeryconfig module visible to celery +os.environ['CELERY_CONFIG_MODULE'] = 'airtime-celery.celeryconfig' \ No newline at end of file diff --git a/python_apps/soundcloud_uploader/soundcloud_uploader/celeryconfig.py b/python_apps/airtime-celery/airtime-celery/celeryconfig.py similarity index 69% rename from python_apps/soundcloud_uploader/soundcloud_uploader/celeryconfig.py rename to python_apps/airtime-celery/airtime-celery/celeryconfig.py index 08e67821f..fcdd83a70 100644 --- a/python_apps/soundcloud_uploader/soundcloud_uploader/celeryconfig.py +++ b/python_apps/airtime-celery/airtime-celery/celeryconfig.py @@ -24,15 +24,22 @@ def get_rmq_broker(): # Celery amqp settings BROKER_URL = get_rmq_broker() CELERY_RESULT_BACKEND = 'amqp' # Use RabbitMQ as the celery backend -# CELERY_RESULT_EXCHANGE = 'upload-results' CELERY_RESULT_PERSISTENT = True # Persist through a broker restart -CELERY_TASK_RESULT_EXPIRES = None # Don't expire tasks +CELERY_TASK_RESULT_EXPIRES = 300 # Expire task results after 5 minutes CELERY_TRACK_STARTED = False +CELERY_RESULT_EXCHANGE = 'airtime-results' CELERY_QUEUES = ( - Queue('soundcloud-uploads', Exchange('soundcloud-uploads'), routing_key='celery'), + Queue('soundcloud-uploads', exchange=Exchange('soundcloud-uploads'), routing_key='soundcloud-uploads'), + Queue('airtime-results.soundcloud-uploads', exchange=Exchange('airtime-results')), +) +CELERY_ROUTES = ( + { + 'soundcloud_uploads.uploader.upload_to_soundcloud': { + 'exchange': 'airtime-results', + 'queue': 'airtime-results.soundcloud-uploads', + } + }, ) -CELERY_DEFAULT_QUEUE = 'soundcloud-uploads' -CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' # Celery task settings CELERY_TASK_SERIALIZER = 'json' diff --git a/python_apps/airtime-celery/airtime-celery/uploader.py b/python_apps/airtime-celery/airtime-celery/uploader.py new file mode 100644 index 000000000..addbc964d --- /dev/null +++ b/python_apps/airtime-celery/airtime-celery/uploader.py @@ -0,0 +1,24 @@ +import os +import json +import urllib2 +import soundcloud +from celery import Celery +from celery.utils.log import get_task_logger + +celery = Celery() +logger = get_task_logger(__name__) + + +@celery.task(name='upload-to-soundcloud') +def upload_to_soundcloud(data, token, file_path): + client = soundcloud.Client(access_token=token) + # Open the file with urllib2 if it's a cloud file + data['asset_data'] = open(file_path, 'rb') if os.path.isfile(file_path) else urllib2.urlopen(file_path) + try: + logger.info('Uploading track: {0}'.format(data)) + track = client.post('/tracks', track=data) + except Exception as e: + logger.info('Error uploading track {title}: {0}'.format(e.message, **data)) + raise e + data['asset_data'].close() + return json.dumps(track.fields()) diff --git a/python_apps/soundcloud_uploader/install/conf/soundcloud_uploader b/python_apps/airtime-celery/install/conf/airtime-celery similarity index 89% rename from python_apps/soundcloud_uploader/install/conf/soundcloud_uploader rename to python_apps/airtime-celery/install/conf/airtime-celery index 4276e402f..b026a8069 100644 --- a/python_apps/soundcloud_uploader/install/conf/soundcloud_uploader +++ b/python_apps/airtime-celery/install/conf/airtime-celery @@ -1,11 +1,11 @@ # Names of nodes to start -CELERYD_NODES="soundcloud_uploader" +CELERYD_NODES="airtime-celery" # Absolute or relative path to the 'celery' command: CELERY_BIN="/usr/local/bin/celery" # App instance to use -CELERY_APP="soundcloud_uploader.uploader:celery" +CELERY_APP="airtime-celery.uploader:celery" # Extra command-line arguments to the worker CELERYD_OPTS="--time-limit=300 --concurrency=8 --config=celeryconfig" diff --git a/python_apps/soundcloud_uploader/install/upstart/soundcloud_uploader b/python_apps/airtime-celery/install/upstart/airtime-celery similarity index 100% rename from python_apps/soundcloud_uploader/install/upstart/soundcloud_uploader rename to python_apps/airtime-celery/install/upstart/airtime-celery diff --git a/python_apps/airtime-celery/setup.py b/python_apps/airtime-celery/setup.py new file mode 100644 index 000000000..6f40351b7 --- /dev/null +++ b/python_apps/airtime-celery/setup.py @@ -0,0 +1,45 @@ +from setuptools import setup +from subprocess import call +import os +import sys + +install_args = ['install', 'install_data', 'develop'] + +# Definitely not the best way of doing this... +if sys.argv[1] in install_args: + data_files = [('/etc/default', ['install/conf/airtime-celery']), + ('/etc/init.d', ['install/upstart/airtime-celery'])] +else: + data_files = [] + + +def postinst(): + print "Reloading initctl configuration" + call(['initctl', 'reload-configuration']) + # Make /etc/init.d file executable and set proper + # permissions for the defaults config file + os.chmod('/etc/init.d/airtime-celery', 0755) + os.chmod('/etc/default/airtime-celery', 0640) + print "Setting uploader to start on boot" + call(['update-rc.d', 'airtime-celery', 'defaults']) + print "Run \"sudo service airtime-celery restart\" now." + +setup(name='airtime-celery', + version='0.1', + description='Airtime Celery service', + url='http://github.com/sourcefabric/Airtime', + author='Sourcefabric', + author_email='duncan.sommerville@sourcefabric.org', + license='MIT', + packages=['airtime-celery'], + install_requires=[ + 'soundcloud', + 'celery', + 'kombu' + ], + zip_safe=False, + data_files=data_files) + +if data_files: + postinst() + diff --git a/python_apps/soundcloud_uploader/bin/soundcloud_uploader b/python_apps/soundcloud_uploader/bin/soundcloud_uploader deleted file mode 100644 index 7ecdd2b88..000000000 --- a/python_apps/soundcloud_uploader/bin/soundcloud_uploader +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env sh - -if [ "$(id -u)" != "0" ]; then - echo "Please run as root user." - exit 1 -fi - -service soundcloud_uploader restart \ No newline at end of file diff --git a/python_apps/soundcloud_uploader/setup.py b/python_apps/soundcloud_uploader/setup.py deleted file mode 100644 index 5625f7c89..000000000 --- a/python_apps/soundcloud_uploader/setup.py +++ /dev/null @@ -1,35 +0,0 @@ -from setuptools import setup -from subprocess import call -import os - -data_files = [('/etc/default', ['install/conf/soundcloud_uploader']), - ('/etc/init.d', ['install/upstart/soundcloud_uploader'])] -print data_files - -setup(name='soundcloud_uploader', - version='0.1', - description='Celery SoundCloud upload worker', - url='http://github.com/sourcefabric/Airtime', - author='Sourcefabric', - author_email='duncan.sommerville@sourcefabric.org', - license='MIT', - packages=['soundcloud_uploader'], - scripts=['bin/soundcloud_uploader'], - install_requires=[ - 'soundcloud', - 'celery', - 'kombu' - ], - zip_safe=False, - data_files=data_files) - -if data_files: - print "Reloading initctl configuration" - call(['initctl', 'reload-configuration']) - # Make /etc/init.d file executable and set proper - # permissions for the defaults config file - os.chmod('/etc/init.d/soundcloud_uploader', 0755) - os.chmod('/etc/default/soundcloud_uploader', 0640) - print "Setting uploader to start on boot" - call(['update-rc.d', 'soundcloud_uploader', 'defaults']) - print "Run \"sudo service soundcloud_uploader restart\" now." \ No newline at end of file diff --git a/python_apps/soundcloud_uploader/soundcloud_uploader/__init__.py b/python_apps/soundcloud_uploader/soundcloud_uploader/__init__.py deleted file mode 100644 index d7484a82e..000000000 --- a/python_apps/soundcloud_uploader/soundcloud_uploader/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -import os -# Make the celeryconfig module visible to celery -os.environ['CELERY_CONFIG_MODULE'] = 'soundcloud_uploader.celeryconfig' \ No newline at end of file diff --git a/python_apps/soundcloud_uploader/soundcloud_uploader/uploader.py b/python_apps/soundcloud_uploader/soundcloud_uploader/uploader.py deleted file mode 100644 index 4fc24c21b..000000000 --- a/python_apps/soundcloud_uploader/soundcloud_uploader/uploader.py +++ /dev/null @@ -1,24 +0,0 @@ -import os -import json -import urllib2 -import soundcloud -from celery import Celery -from celery.utils.log import get_task_logger - -celery = Celery('uploader') -logger = get_task_logger(__name__) - - -@celery.task(queue='soundcloud-uploads', name='upload') -def upload(track_data, token, file_path): - client = soundcloud.Client(access_token=token) - # Open the file with urllib2 if it's a cloud file - track_data['asset_data'] = open(file_path, 'rb') if os.path.isfile(file_path) else urllib2.urlopen(file_path) - try: - logger.info('Uploading track: {0}'.format(track_data)) - track = client.post('/tracks', track=track_data) - except Exception as e: - logger.info('Error uploading track {name}: {0}'.format(e.message, **track_data)) - raise e - track_data['asset_data'].close() - return json.dumps(track.fields())