Celery backend and support for dev-env worker parallelization

This commit is contained in:
Duncan Sommerville 2015-06-12 12:31:55 -04:00
parent c1b5b53a16
commit 15c7ef5885
17 changed files with 1664 additions and 368 deletions

View file

@ -44,42 +44,53 @@ class UpgradeManager
*/
}
/**
* Upgrade the Airtime schema version to match the highest supported version
*
* @return boolean whether or not an upgrade was performed
*/
public function doUpgrade()
{
// Get all upgrades dynamically (in declaration order!) so we don't have to add them explicitly each time
// TODO: explicitly sort classnames by ascending version suffix for safety
$upgraders = getUpgrades();
return $this->runUpgrades($upgraders, (dirname(__DIR__) . "/controllers"));
$dir = (dirname(__DIR__) . "/controllers");
$upgradePerformed = false;
foreach ($upgraders as $upgrader) {
$upgradePerformed = $this->_runUpgrade(new $upgrader($dir)) ? true : $upgradePerformed;
}
return $upgradePerformed;
}
/**
* Run a given set of upgrades
*
* @param array $upgraders the upgrades to perform
* @param string $dir the directory containing the upgrade sql
* @return boolean whether or not an upgrade was performed
* Run the given upgrade
*
* @param $upgrader AirtimeUpgrader the upgrade class to be executed
*
* @return bool true if the upgrade was successful, otherwise false
*/
public function runUpgrades($upgraders, $dir) {
$upgradePerformed = false;
foreach ($upgraders as $upgrader) {
/** @var $upgrader AirtimeUpgrader */
$upgrader = new $upgrader();
if ($upgrader->checkIfUpgradeSupported())
{
// pass the given directory to the upgrades, since __DIR__ returns parent dir of file, not executor
$upgrader->upgrade($dir); //This will throw an exception if the upgrade fails.
$upgradePerformed = true;
}
}
return $upgradePerformed;
private function _runUpgrade(AirtimeUpgrader $upgrader) {
return $upgrader->checkIfUpgradeSupported() && $upgrader->upgrade();
}
}
abstract class AirtimeUpgrader
{
protected $_dir;
/**
* @param $dir string directory housing upgrade files
*/
public function __construct($dir) {
$this->_dir = $dir;
}
/** Schema versions that this upgrader class can upgrade from (an array of version strings). */
abstract protected function getSupportedSchemaVersions();
/** The schema version that this upgrader class will upgrade to. (returns a version string) */
abstract public function getNewVersion();
@ -94,10 +105,7 @@ abstract class AirtimeUpgrader
*/
public function checkIfUpgradeSupported()
{
if (!in_array(AirtimeUpgrader::getCurrentSchemaVersion(), $this->getSupportedSchemaVersions())) {
return false;
}
return true;
return in_array(AirtimeUpgrader::getCurrentSchemaVersion(), $this->getSupportedSchemaVersions());
}
protected function toggleMaintenanceScreen($toggle)
@ -122,8 +130,45 @@ abstract class AirtimeUpgrader
}
}
/** Implement this for each new version of Airtime */
abstract public function upgrade();
/**
* Implement this for each new version of Airtime
* This function abstracts out the core upgrade functionality,
* allowing child classes to overwrite _runUpgrade to reduce duplication
*/
public function upgrade() {
Cache::clear();
assert($this->checkIfUpgradeSupported());
try {
// $this->toggleMaintenanceScreen(true);
Cache::clear();
$this->_runUpgrade();
Application_Model_Preference::SetSchemaVersion($this->getNewVersion());
Cache::clear();
// $this->toggleMaintenanceScreen(false);
} catch(Exception $e) {
// $this->toggleMaintenanceScreen(false);
return false;
}
return true;
}
protected function _runUpgrade() {
$airtimeConf = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
$values = parse_ini_file($airtimeConf, true);
$username = $values['database']['dbuser'];
$password = $values['database']['dbpass'];
$host = $values['database']['host'];
$database = $values['database']['dbname'];
passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f ".$this->_dir."/upgrade_sql/airtime_"
.$this->getNewVersion()."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\"");
}
}
class AirtimeUpgrader253 extends AirtimeUpgrader
@ -137,60 +182,17 @@ class AirtimeUpgrader253 extends AirtimeUpgrader
{
return '2.5.3';
}
public function upgrade($dir = __DIR__)
{
Cache::clear();
assert($this->checkIfUpgradeSupported());
$con = Propel::getConnection();
$con->beginTransaction();
try {
$this->toggleMaintenanceScreen(true);
Cache::clear();
//Begin upgrade
//Update disk_usage value in cc_pref
$musicDir = CcMusicDirsQuery::create()
->filterByType('stor')
->filterByExists(true)
->findOne();
$storPath = $musicDir->getDirectory();
//Update disk_usage value in cc_pref
$storDir = isset($_SERVER['AIRTIME_BASE']) ? $_SERVER['AIRTIME_BASE']."srv/airtime/stor" : "/srv/airtime/stor";
$diskUsage = shell_exec("du -sb $storDir | awk '{print $1}'");
Application_Model_Preference::setDiskUsage($diskUsage);
//clear out the cache
Cache::clear();
$con->commit();
//update system_version in cc_pref and change some columns in cc_files
$airtimeConf = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
$values = parse_ini_file($airtimeConf, true);
$username = $values['database']['dbuser'];
$password = $values['database']['dbpass'];
$host = $values['database']['host'];
$database = $values['database']['dbname'];
passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f $dir/upgrade_sql/airtime_".$this->getNewVersion()."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\"");
Application_Model_Preference::SetSchemaVersion($this->getNewVersion());
//clear out the cache
Cache::clear();
$this->toggleMaintenanceScreen(false);
} catch (Exception $e) {
$con->rollback();
$this->toggleMaintenanceScreen(false);
}
protected function _runUpgrade()
{
//Update disk_usage value in cc_pref
$storDir = isset($_SERVER['AIRTIME_BASE']) ? $_SERVER['AIRTIME_BASE']."srv/airtime/stor" : "/srv/airtime/stor";
$diskUsage = shell_exec("du -sb $storDir | awk '{print $1}'");
Application_Model_Preference::setDiskUsage($diskUsage);
//update system_version in cc_pref and change some columns in cc_files
parent::_runUpgrade();
}
}
@ -205,78 +207,49 @@ class AirtimeUpgrader254 extends AirtimeUpgrader
return '2.5.4';
}
public function upgrade()
protected function _runUpgrade()
{
Cache::clear();
assert($this->checkIfUpgradeSupported());
$newVersion = $this->getNewVersion();
$con = Propel::getConnection();
//$con->beginTransaction();
try {
$this->toggleMaintenanceScreen(true);
Cache::clear();
//Begin upgrade
//First, ensure there are no superadmins already.
$numberOfSuperAdmins = CcSubjsQuery::create()
//First, ensure there are no superadmins already.
$numberOfSuperAdmins = CcSubjsQuery::create()
->filterByDbType(UTYPE_SUPERADMIN)
->filterByDbLogin("sourcefabric_admin", Criteria::NOT_EQUAL) //Ignore sourcefabric_admin users
->count();
//Only create a super admin if there isn't one already.
if ($numberOfSuperAdmins == 0)
{
//Find the "admin" user and promote them to superadmin.
$adminUser = CcSubjsQuery::create()
//Only create a super admin if there isn't one already.
if ($numberOfSuperAdmins == 0)
{
//Find the "admin" user and promote them to superadmin.
$adminUser = CcSubjsQuery::create()
->filterByDbLogin('admin')
->findOne();
if (!$adminUser)
{
//TODO: Otherwise get the user with the lowest ID that is of type administrator:
//
$adminUser = CcSubjsQuery::create()
if (!$adminUser)
{
// Otherwise get the user with the lowest ID that is of type administrator:
$adminUser = CcSubjsQuery::create()
->filterByDbType(UTYPE_ADMIN)
->orderByDbId(Criteria::ASC)
->findOne();
if (!$adminUser) {
throw new Exception("Failed to find any users of type 'admin' ('A').");
}
}
$adminUser = new Application_Model_User($adminUser->getDbId());
$adminUser->setType(UTYPE_SUPERADMIN);
$adminUser->save();
Logging::info($_SERVER['HTTP_HOST'] . ': ' . $newVersion . " Upgrade: Promoted user " . $adminUser->getLogin() . " to be a Super Admin.");
//Also try to promote the sourcefabric_admin user
$sofabAdminUser = CcSubjsQuery::create()
->filterByDbLogin('sourcefabric_admin')
->findOne();
if ($sofabAdminUser) {
$sofabAdminUser = new Application_Model_User($sofabAdminUser->getDbId());
$sofabAdminUser->setType(UTYPE_SUPERADMIN);
$sofabAdminUser->save();
Logging::info($_SERVER['HTTP_HOST'] . ': ' . $newVersion . " Upgrade: Promoted user " . $sofabAdminUser->getLogin() . " to be a Super Admin.");
if (!$adminUser) {
throw new Exception("Failed to find any users of type 'admin' ('A').");
}
}
//$con->commit();
Application_Model_Preference::SetSchemaVersion($newVersion);
Cache::clear();
$this->toggleMaintenanceScreen(false);
return true;
} catch(Exception $e) {
//$con->rollback();
$this->toggleMaintenanceScreen(false);
throw $e;
$adminUser = new Application_Model_User($adminUser->getDbId());
$adminUser->setType(UTYPE_SUPERADMIN);
$adminUser->save();
Logging::info($_SERVER['HTTP_HOST'] . ': ' . $this->getNewVersion() . " Upgrade: Promoted user " . $adminUser->getLogin() . " to be a Super Admin.");
//Also try to promote the sourcefabric_admin user
$sofabAdminUser = CcSubjsQuery::create()
->filterByDbLogin('sourcefabric_admin')
->findOne();
if ($sofabAdminUser) {
$sofabAdminUser = new Application_Model_User($sofabAdminUser->getDbId());
$sofabAdminUser->setType(UTYPE_SUPERADMIN);
$sofabAdminUser->save();
Logging::info($_SERVER['HTTP_HOST'] . ': ' . $this->getNewVersion() . " Upgrade: Promoted user " . $sofabAdminUser->getLogin() . " to be a Super Admin.");
}
}
}
}
@ -291,40 +264,6 @@ class AirtimeUpgrader255 extends AirtimeUpgrader {
public function getNewVersion() {
return '2.5.5';
}
public function upgrade($dir = __DIR__) {
Cache::clear();
assert($this->checkIfUpgradeSupported());
$newVersion = $this->getNewVersion();
try {
$this->toggleMaintenanceScreen(true);
Cache::clear();
// Begin upgrade
$airtimeConf = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
$values = parse_ini_file($airtimeConf, true);
$username = $values['database']['dbuser'];
$password = $values['database']['dbpass'];
$host = $values['database']['host'];
$database = $values['database']['dbname'];
passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f $dir/upgrade_sql/airtime_"
.$this->getNewVersion()."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\"");
Application_Model_Preference::SetSchemaVersion($newVersion);
Cache::clear();
$this->toggleMaintenanceScreen(false);
return true;
} catch(Exception $e) {
$this->toggleMaintenanceScreen(false);
throw $e;
}
}
}
class AirtimeUpgrader259 extends AirtimeUpgrader {
@ -337,38 +276,6 @@ class AirtimeUpgrader259 extends AirtimeUpgrader {
public function getNewVersion() {
return '2.5.9';
}
public function upgrade($dir = __DIR__) {
Cache::clear();
assert($this->checkIfUpgradeSupported());
$newVersion = $this->getNewVersion();
try {
$this->toggleMaintenanceScreen(true);
Cache::clear();
// Begin upgrade
$airtimeConf = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
$values = parse_ini_file($airtimeConf, true);
$username = $values['database']['dbuser'];
$password = $values['database']['dbpass'];
$host = $values['database']['host'];
$database = $values['database']['dbname'];
passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f $dir/upgrade_sql/airtime_"
.$this->getNewVersion()."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\"");
Application_Model_Preference::SetSchemaVersion($newVersion);
Cache::clear();
$this->toggleMaintenanceScreen(false);
} catch(Exception $e) {
$this->toggleMaintenanceScreen(false);
throw $e;
}
}
}
class AirtimeUpgrader2510 extends AirtimeUpgrader
@ -382,38 +289,6 @@ class AirtimeUpgrader2510 extends AirtimeUpgrader
public function getNewVersion() {
return '2.5.10';
}
public function upgrade($dir = __DIR__) {
Cache::clear();
assert($this->checkIfUpgradeSupported());
$newVersion = $this->getNewVersion();
try {
$this->toggleMaintenanceScreen(true);
Cache::clear();
// Begin upgrade
$airtimeConf = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
$values = parse_ini_file($airtimeConf, true);
$username = $values['database']['dbuser'];
$password = $values['database']['dbpass'];
$host = $values['database']['host'];
$database = $values['database']['dbname'];
passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f $dir/upgrade_sql/airtime_"
.$this->getNewVersion()."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\"");
Application_Model_Preference::SetSchemaVersion($newVersion);
Cache::clear();
$this->toggleMaintenanceScreen(false);
} catch(Exception $e) {
$this->toggleMaintenanceScreen(false);
throw $e;
}
}
}
class AirtimeUpgrader2511 extends AirtimeUpgrader
@ -428,35 +303,13 @@ class AirtimeUpgrader2511 extends AirtimeUpgrader
return '2.5.11';
}
public function upgrade($dir = __DIR__) {
Cache::clear();
assert($this->checkIfUpgradeSupported());
$newVersion = $this->getNewVersion();
try {
$this->toggleMaintenanceScreen(true);
Cache::clear();
// Begin upgrade
$queryResult = CcFilesQuery::create()
->select(array('disk_usage'))
->withColumn('SUM(CcFiles.filesize)', 'disk_usage')
->find();
$disk_usage = $queryResult[0];
Application_Model_Preference::setDiskUsage($disk_usage);
Application_Model_Preference::SetSchemaVersion($newVersion);
Cache::clear();
$this->toggleMaintenanceScreen(false);
} catch(Exception $e) {
$this->toggleMaintenanceScreen(false);
throw $e;
}
}
public function downgrade() {
protected function _runUpgrade() {
$queryResult = CcFilesQuery::create()
->select(array('disk_usage'))
->withColumn('SUM(CcFiles.filesize)', 'disk_usage')
->find();
$disk_usage = $queryResult[0];
Application_Model_Preference::setDiskUsage($disk_usage);
}
}
@ -472,41 +325,6 @@ class AirtimeUpgrader2512 extends AirtimeUpgrader
public function getNewVersion() {
return '2.5.12';
}
public function upgrade($dir = __DIR__) {
Cache::clear();
assert($this->checkIfUpgradeSupported());
$newVersion = $this->getNewVersion();
try {
$this->toggleMaintenanceScreen(true);
Cache::clear();
// Begin upgrade
$airtimeConf = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
$values = parse_ini_file($airtimeConf, true);
$username = $values['database']['dbuser'];
$password = $values['database']['dbpass'];
$host = $values['database']['host'];
$database = $values['database']['dbname'];
passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f $dir/upgrade_sql/airtime_"
.$this->getNewVersion()."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\"");
Application_Model_Preference::SetSchemaVersion($newVersion);
Cache::clear();
$this->toggleMaintenanceScreen(false);
} catch(Exception $e) {
$this->toggleMaintenanceScreen(false);
throw $e;
}
}
public function downgrade() {
}
}
class AirtimeUpgrader2513 extends AirtimeUpgrader
@ -520,38 +338,4 @@ class AirtimeUpgrader2513 extends AirtimeUpgrader
public function getNewVersion() {
return '2.5.13';
}
public function upgrade($dir = __DIR__) {
Cache::clear();
assert($this->checkIfUpgradeSupported());
$newVersion = $this->getNewVersion();
try {
$this->toggleMaintenanceScreen(true);
Cache::clear();
// Begin upgrade
$airtimeConf = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
$values = parse_ini_file($airtimeConf, true);
$username = $values['database']['dbuser'];
$password = $values['database']['dbpass'];
$host = $values['database']['host'];
$database = $values['database']['dbname'];
passthru("export PGPASSWORD=$password && psql -h $host -U $username -q -f $dir/upgrade_sql/airtime_"
.$newVersion."/upgrade.sql $database 2>&1 | grep -v -E \"will create implicit sequence|will create implicit index\"");
Application_Model_Preference::SetSchemaVersion($newVersion);
Cache::clear();
$this->toggleMaintenanceScreen(false);
} catch(Exception $e) {
$this->toggleMaintenanceScreen(false);
throw $e;
}
}
}