CC-5709: Airtime Analyzer

* Notify airtime_analyzer of new uploads with RabbitMQ
* Use a durable exchange for airtime-uploads
This commit is contained in:
Albert Santoni 2014-03-13 13:35:48 -04:00
parent f4ea417b83
commit 451b19150b
4 changed files with 31 additions and 14 deletions

View File

@ -13,7 +13,7 @@ class Application_Model_RabbitMq
self::$doPush = true;
}
private static function sendMessage($exchange, $data)
private static function sendMessage($exchange, $exchangeType, $autoDeleteExchange, $data, $queue="")
{
$CC_CONFIG = Config::getConfig();
@ -31,7 +31,9 @@ class Application_Model_RabbitMq
$channel->access_request($CC_CONFIG["rabbitmq"]["vhost"], false, false,
true, true);
$channel->exchange_declare($exchange, 'direct', false, true);
//I'm pretty sure we DON'T want to autodelete ANY exchanges but I'm keeping the code
//the way it is just so I don't accidentally break anything when I add the Analyzer code in. -- Albert, March 13, 2014
$channel->exchange_declare($exchange, $exchangeType, false, true, $autoDeleteExchange);
$msg = new AMQPMessage($data, array('content_type' => 'text/plain'));
@ -46,7 +48,7 @@ class Application_Model_RabbitMq
$exchange = 'airtime-pypo';
$data = json_encode($md, JSON_FORCE_OBJECT);
self::sendMessage($exchange, $data);
self::sendMessage($exchange, 'direct', true, $data);
}
public static function SendMessageToMediaMonitor($event_type, $md)
@ -55,7 +57,7 @@ class Application_Model_RabbitMq
$exchange = 'airtime-media-monitor';
$data = json_encode($md);
self::sendMessage($exchange, $data);
self::sendMessage($exchange, 'direct', true, $data);
}
public static function SendMessageToShowRecorder($event_type)
@ -74,14 +76,18 @@ class Application_Model_RabbitMq
}
$data = json_encode($temp);
self::sendMessage($exchange, $data);
self::sendMessage($exchange, 'direct', true, $data);
}
public static function SendMessageToAnalyzer()
public static function SendMessageToAnalyzer($tmpFilePath, $finalDirectory, $callbackUrl, $apiKey)
{
$exchange = 'airtime-uploads';
//$data = json_encode($md);
//TODO: Finish me
//self::sendMessage($exchange, $data);
$data['tmp_file_path'] = $tmpFilePath;
$data['final_directory'] = $finalDirectory;
$data['callback_url'] = $callbackUrl;
$data['api_key'] = $apiKey;
$jsonData = json_encode($data);
self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads');
}
}

View File

@ -66,7 +66,7 @@ class Rest_MediaController extends Zend_Rest_Controller
return;
}
$this->processUploadedFile();
$this->processUploadedFile($this->getRequest()->getRequestUri());
//TODO: Strip or sanitize the JSON output
$file = new CcFiles();
@ -179,13 +179,24 @@ class Rest_MediaController extends Zend_Rest_Controller
$resp->appendBody("ERROR: Media not found.");
}
private function processUploadedFile()
private function processUploadedFile($callbackUrl)
{
$CC_CONFIG = Config::getConfig();
$apiKey = $CC_CONFIG["apiKey"][0];
$upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload";
$tempFilePath = Application_Model_StoredFile::uploadFile($upload_dir);
$tempFileName = basename($tempFilePath);
//TODO: Dispatch a message to airtime_analyzer through RabbitMQ!
//TODO: Remove copyFileToStor from StoredFile...
$storDir = Application_Model_MusicDir::getStorDir();
$finalDestinationDir = $storDir->getDirectory() . "/organize";
//Dispatch a message to airtime_analyzer through RabbitMQ,
//notifying it that there's a new upload to process!
Application_Model_RabbitMq::SendMessageToAnalyzer($tempFilePath,
$finalDestinationDir, $callbackUrl, $apiKey);
}
}

View File

@ -54,7 +54,7 @@ class MessageListener:
port=self._port, virtual_host=self._vhost,
credentials=pika.credentials.PlainCredentials(self._username, self._password)))
self._channel = self._connection.channel()
self._channel.exchange_declare(exchange=EXCHANGE, type=EXCHANGE_TYPE)
self._channel.exchange_declare(exchange=EXCHANGE, type=EXCHANGE_TYPE, durable=True)
result = self._channel.queue_declare(queue=QUEUE, durable=True)
self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)

View File

@ -36,7 +36,7 @@ $channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
// declare/create the exchange as a topic exchange.
$channel->exchange_declare($exchange, $exchangeType, false, false, false);
$channel->exchange_declare($exchange, $exchangeType, false, true, false);
$msg = new AMQPMessage($message, array("content_type" => "text/plain"));