Merge branch 'devel' of dev.sourcefabric.org:airtime into devel

This commit is contained in:
James 2011-09-13 15:21:38 -04:00
commit a84a28d556
8 changed files with 119 additions and 30 deletions

View File

@ -21,6 +21,7 @@ class ApiController extends Zend_Controller_Action
->addActionContext('remove-watched-dir', 'json')
->addActionContext('set-storage-dir', 'json')
->addActionContext('get-stream-setting', 'json')
->addActionContext('status', 'json')
->initContext();
}
@ -636,5 +637,32 @@ class ApiController extends Zend_Controller_Action
$this->view->msg = Application_Model_StreamSetting::getStreamSetting();
}
public function statusAction() {
global $CC_CONFIG;
$request = $this->getRequest();
$api_key = $request->getParam('api_key');
/*
if (!in_array($api_key, $CC_CONFIG["apiKey"]))
{
header('HTTP/1.0 401 Unauthorized');
print 'You are not allowed to access this resource.';
exit;
}
*/
$status = array(
"airtime_version"=>Application_Model_Systemstatus::GetAirtimeVersion(),
"icecast"=>Application_Model_Systemstatus::GetIcecastStatus(),
"pypo"=>Application_Model_Systemstatus::GetPypoStatus(),
"liquidsoap"=>Application_Model_Systemstatus::GetLiquidsoapStatus(),
"show-recorder"=>Application_Model_Systemstatus::GetShowRecorderStatus(),
"media-monitor"=>Application_Model_Systemstatus::GetMediaMonitorStatus()
);
$this->view->status = $status;
}
}

View File

@ -7,6 +7,7 @@ class RabbitMqPlugin extends Zend_Controller_Plugin_Abstract
if (RabbitMq::$doPush) {
$md = array('schedule' => Schedule::GetScheduledPlaylists());
RabbitMq::SendMessageToPypo("update_schedule", $md);
RabbitMq::SendMessageToShowRecorder("update_schedule");
}
}
}
}

View File

@ -34,8 +34,6 @@ class RabbitMq
$channel->basic_publish($msg, $EXCHANGE);
$channel->close();
$conn->close();
self::SendMessageToShowRecorder("update_schedule");
}
public static function SendMessageToMediaMonitor($event_type, $md)

View File

@ -3,6 +3,50 @@
class Application_Model_Systemstatus
{
public static function GetPypoStatus(){
RabbitMq::SendMessageToPypo("get_status", array());
return array(
"process_id"=>500,
"uptime_seconds"=>3600
);
}
public static function GetLiquidsoapStatus(){
return array(
"process_id"=>500,
"uptime_seconds"=>3600
);
}
public static function GetShowRecorderStatus(){
return array(
"process_id"=>500,
"uptime_seconds"=>3600
);
}
public static function GetMediaMonitorStatus(){
return array(
"process_id"=>500,
"uptime_seconds"=>3600
);
}
public static function GetIcecastStatus(){
return array(
"process_id"=>500,
"uptime_seconds"=>3600
);
}
public static function GetAirtimeVersion(){
return AIRTIME_VERSION;
}
private function getCheckSystemResults(){
//exec("airtime-check-system", $output);

View File

@ -0,0 +1,3 @@
<?php
echo $status;

View File

@ -14,7 +14,7 @@ $PORT = 5672;
$USER = 'guest';
$PASS = 'guest';
$VHOST = '/';
$EXCHANGE = 'router';
$EXCHANGE = 'airtime-schedule';
$QUEUE = 'msgs';
$CONSUMER_TAG = 'consumer';
@ -23,7 +23,7 @@ $ch = $conn->channel();
$ch->access_request($VHOST, false, false, true, true);
$ch->queue_declare($QUEUE);
$ch->exchange_declare($EXCHANGE, 'direct', false, false, false);
$ch->exchange_declare($EXCHANGE, 'direct', false, true);
$ch->queue_bind($QUEUE, $EXCHANGE);
function process_message($msg) {

View File

@ -13,15 +13,17 @@ $PORT = 5672;
$USER = 'guest';
$PASS = 'guest';
$VHOST = '/';
$EXCHANGE = 'router';
$EXCHANGE = 'airtime-schedule';
$QUEUE = 'msgs';
$conn = new AMQPConnection($HOST, $PORT, $USER, $PASS);
$ch = $conn->channel();
$ch->access_request($VHOST, false, false, true, true);
$ch->exchange_declare($EXCHANGE, 'direct', false, false, false);
$ch->exchange_declare($EXCHANGE, 'direct', false, true);
$msg_body = implode(' ', array_slice($argv, 1));
$msg_body = json_encode(array("event_type"=>"get_status", "id"=>time()));
//$msg_body = '{"schedule":{"status":{"range":{"start":"2011-09-12 20:45:22","end":"2011-09-13 20:45:22"},"version":"1.1"},"playlists":[],"check":1,"stream_metadata":{"format":"","station_name":""}},"event_type":"update_schedule"}';
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain'));
$ch->basic_publish($msg, $EXCHANGE);
@ -29,4 +31,5 @@ $ch->basic_publish($msg, $EXCHANGE);
echo "Sent message '".$msg_body."'\n";
$ch->close();
$conn->close();
?>

View File

@ -20,6 +20,7 @@ import filecmp
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
from kombu.exceptions import MessageStateError
from api_clients import api_client
@ -72,24 +73,37 @@ class PypoFetch(Thread):
Hopefully there is a better way to do this.
"""
def handle_message(self, body, message):
logger = logging.getLogger('fetch')
logger.info("Received event from RabbitMQ: " + message.body)
try:
logger = logging.getLogger('fetch')
logger.info("Received event from RabbitMQ: " + message.body)
m = json.loads(message.body)
command = m['event_type']
logger.info("Handling command: " + command)
m = json.loads(message.body)
command = m['event_type']
logger.info("Handling command: " + command)
if(command == 'update_schedule'):
self.schedule_data = m['schedule']
self.process_schedule(self.schedule_data, "scheduler", False)
elif (command == 'update_stream_setting'):
logger.info("Updating stream setting...")
self.regenerateLiquidsoapConf(m['setting'])
elif (command == 'cancel_current_show'):
logger.info("Cancel current show command received...")
self.stop_current_show()
# ACK the message to take it off the queue
message.ack()
if command == 'update_schedule':
self.schedule_data = m['schedule']
self.process_schedule(self.schedule_data, "scheduler", False)
elif command == 'update_stream_setting':
logger.info("Updating stream setting...")
self.regenerateLiquidsoapConf(m['setting'])
elif command == 'cancel_current_show':
logger.info("Cancel current show command received...")
self.stop_current_show()
elif command == 'get_status':
self.get_status()
except Exception, e:
logger.error("Exception in handling RabbitMQ message: %s", e)
finally:
# ACK the message to take it off the queue
try:
message.ack()
except MessageStateError, m:
logger.error("Message ACK error: %s", m);
def get_status(self):
logger = logging.getLogger('fetch')
logger.debug("get_status")
def stop_current_show(self):
logger = logging.getLogger('fetch')
@ -451,26 +465,24 @@ class PypoFetch(Thread):
# Wait for messages from RabbitMQ. Timeout if we
# dont get any after POLL_INTERVAL.
self.connection.drain_events(timeout=POLL_INTERVAL)
# Hooray for globals!
schedule_data = SCHEDULE_PUSH_MSG
status = 1
except socket.timeout, se:
# We didnt get a message for a while, so poll the server
# to get an updated schedule.
status, schedule_data = self.api_client.get_schedule()
status, self.schedule_data = self.api_client.get_schedule()
except Exception, e:
"""
This Generic exception is thrown whenever the RabbitMQ
Service is stopped. In this case let's check every few
seconds to see if it has come back up
"""
logger.info("Unknown exception")
logger.info("Exception, %s", e)
return
#return based on the exception
if status == 1:
self.process_schedule(schedule_data, "scheduler", False)
self.process_schedule(self.schedule_data, "scheduler", False)
loops += 1
"""