Merge branch 'devel' of dev.sourcefabric.org:airtime into devel

Naomi Aro 2012-03-19 10:18:52 +01:00
commit b5f92dfa93
23 changed files with 527 additions and 315 deletions


@@ -62,6 +62,8 @@ class Config {
        $CC_CONFIG['baseUrl'] = $values['general']['base_url'];
        $CC_CONFIG['basePort'] = $values['general']['base_port'];
+       $CC_CONFIG['cache_ahead_hours'] = $values['general']['cache_ahead_hours'];

        // Database config
        $CC_CONFIG['dsn']['username'] = $values['database']['dbuser'];
        $CC_CONFIG['dsn']['password'] = $values['database']['dbpass'];


@@ -150,8 +150,11 @@ class ApiController extends Zend_Controller_Action
                //user clicks play button for track and downloads it.
                header('Content-Disposition: inline; filename="'.$file_base_name.'"');
            }
-           $this->smartReadFile($filepath, 'audio/'.$ext);
+           if ($ext === 'mp3'){
+               $this->smartReadFile($filepath, 'audio/mpeg');
+           } else {
+               $this->smartReadFile($filepath, 'audio/'.$ext);
+           }
            exit;
        }else{
            header ("HTTP/1.1 404 Not Found");


@@ -232,6 +232,9 @@ class PreferenceController extends Zend_Controller_Action
                    $this->view->statusMsg = "<div class='success'>Stream Setting Updated.</div>";
                }
            }
+           $this->view->confirm_pypo_restart_text = "Updating settings will temporarily interrupt any currently playing shows. Click \'OK\' to continue.";
            $this->view->num_stream = $num_of_stream;
            $this->view->enable_stream_conf = Application_Model_Preference::GetEnableStreamConf();
            $this->view->form = $form;


@@ -778,15 +778,13 @@ class ScheduleController extends Zend_Controller_Action
            try {
                $showInstance = new Application_Model_ShowInstance($showInstanceId);
-           }
-           catch(Exception $e){
+           } catch(Exception $e) {
                $this->view->show_error = true;
                return false;
            }
            $show = new Application_Model_Show($showInstance->getShowId());
            $show->cancelShow($showInstance->getShowInstanceStart());
            $this->view->show_id = $showInstance->getShowId();
        }
    }
@@ -806,8 +804,8 @@ class ScheduleController extends Zend_Controller_Action
            }
            $showInstance->clearShow();
            $showInstance->delete();
-           // send 'cancel-current-show' command to pypo
-           Application_Model_RabbitMq::SendMessageToPypo("cancel_current_show", array());
+           Application_Model_RabbitMq::PushSchedule();
        }
    }


@@ -64,14 +64,16 @@ class Application_Model_Schedule {
        $utcTimeNow = $date->getUtcTimestamp();

        $shows = Application_Model_Show::getPrevCurrentNext($utcTimeNow);
+       $previousShowID = count($shows['previousShow'])>0?$shows['previousShow'][0]['id']:null;
        $currentShowID = count($shows['currentShow'])>0?$shows['currentShow'][0]['id']:null;
-       $results = Application_Model_Schedule::GetPrevCurrentNext($currentShowID, $utcTimeNow);
+       $nextShowID = count($shows['nextShow'])>0?$shows['nextShow'][0]['id']:null;
+       $results = Application_Model_Schedule::GetPrevCurrentNext($previousShowID, $currentShowID, $nextShowID, $utcTimeNow);

        $range = array("env"=>APPLICATION_ENV,
            "schedulerTime"=>$timeNow,
-           "previous"=>isset($results['previous'])?$results['previous']:(count($shows['previousShow'])>0?$shows['previousShow'][0]:null),
-           "current"=>isset($results['current'])?$results['current']:null,
-           "next"=> isset($results['next'])?$results['next']:(count($shows['nextShow'])>0?$shows['nextShow'][0]:null),
+           "previous"=>$results['previous'] !=null?$results['previous']:(count($shows['previousShow'])>0?$shows['previousShow'][0]:null),
+           "current"=>$results['current'] !=null?$results['current']:null,
+           "next"=> $results['next'] !=null?$results['next']:(count($shows['nextShow'])>0?$shows['nextShow'][0]:null),
            "currentShow"=>$shows['currentShow'],
            "nextShow"=>$shows['nextShow'],
            "timezone"=> date("T"),
@@ -88,19 +90,39 @@ class Application_Model_Schedule {
     * show types are not found through this mechanism a call is made to the old way of querying
     * the database to find the track info.
    **/
-   public static function GetPrevCurrentNext($p_currentShowID, $p_timeNow)
+   public static function GetPrevCurrentNext($p_previousShowID, $p_currentShowID, $p_nextShowID, $p_timeNow)
    {
+       if ($p_previousShowID == null && $p_currentShowID == null && $p_nextShowID == null)
+           return;
+
        global $CC_CONFIG, $CC_DBC;

-       if (!isset($p_currentShowID)) {
-           return array();
-       }
-       $sql = "Select ft.artist_name, ft.track_title, st.starts as starts, st.ends as ends, st.media_item_played as media_item_played
+       $sql = 'Select ft.artist_name, ft.track_title, st.starts as starts, st.ends as ends, st.media_item_played as media_item_played
            FROM cc_schedule st LEFT JOIN cc_files ft ON st.file_id = ft.id
-           WHERE st.instance_id = '$p_currentShowID' AND st.playout_status > 0
-           ORDER BY st.starts";
+           WHERE ';
+
+       if (isset($p_previousShowID)){
+           if (isset($p_nextShowID) || isset($p_currentShowID))
+               $sql .= '(';
+           $sql .= 'st.instance_id = '.$p_previousShowID;
+       }
+       if ($p_currentShowID != null){
+           if ($p_previousShowID != null)
+               $sql .= ' OR ';
+           else if($p_nextShowID != null)
+               $sql .= '(';
+           $sql .= 'st.instance_id = '.$p_currentShowID;
+       }
+       if ($p_nextShowID != null) {
+           if ($p_previousShowID != null || $p_currentShowID != null)
+               $sql .= ' OR ';
+           $sql .= 'st.instance_id = '.$p_nextShowID;
+           if($p_previousShowID != null || $p_currentShowID != null)
+               $sql .= ')';
+       } else if($p_previousShowID != null && $p_currentShowID != null)
+           $sql .= ')';
+
+       $sql .= ' AND st.playout_status > 0 ORDER BY st.starts';
+       //Logging::log($sql);

        $rows = $CC_DBC->GetAll($sql);
        $numberOfRows = count($rows);
@@ -587,7 +609,17 @@ class Application_Model_Schedule {
        }
        if (is_null($p_fromDateTime)) {
            $t2 = new DateTime("@".time());
-           $t2->add(new DateInterval("PT30M"));
+
+           $cache_ahead_hours = $CC_CONFIG["cache_ahead_hours"];
+           if (is_numeric($cache_ahead_hours)){
+               //make sure we are not dealing with a float
+               $cache_ahead_hours = intval($cache_ahead_hours);
+           } else {
+               $cache_ahead_hours = 1;
+           }
+
+           $t2->add(new DateInterval("PT".$cache_ahead_hours."H"));
            $range_end = $t2->format("Y-m-d H:i:s");
        } else {
            $range_end = Application_Model_Schedule::PypoTimeToAirtimeTime($p_toDateTime);

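The hunk above builds the WHERE clause for GetPrevCurrentNext by concatenating up to three st.instance_id comparisons, opening and closing parentheses only when more than one show ID is present. A minimal Python sketch of the same idea, for illustration only (the helper name is hypothetical and this is not the project's PHP): filter out the missing IDs first, then join the remaining comparisons, which removes the need for the nested branching.

    def build_instance_filter(previous_id, current_id, next_id):
        # Keep only the show-instance IDs that are actually set (None means "not scheduled").
        ids = [i for i in (previous_id, current_id, next_id) if i is not None]
        if not ids:
            return None  # nothing scheduled, nothing to query
        clause = " OR ".join("st.instance_id = %d" % i for i in ids)
        return "(" + clause + ") AND st.playout_status > 0 ORDER BY st.starts"

    # build_instance_filter(None, 12, 13) returns:
    # "(st.instance_id = 12 OR st.instance_id = 13) AND st.playout_status > 0 ORDER BY st.starts"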

@@ -199,9 +199,6 @@ class Application_Model_Show {
            ->filterByDbShowId($this->_showId)
            ->update(array('DbLastShow' => $timeinfo[0]));

-       //$sql = "DELETE FROM cc_show_instances
-       //          WHERE starts >= '{$day_timestamp}' AND show_id = {$this->_showId}";
        $sql = "UPDATE cc_show_instances
                SET modified_instance = TRUE
                WHERE starts >= '{$day_timestamp}' AND show_id = {$this->_showId}";
@@ -1762,7 +1759,9 @@ class Application_Model_Show {
            //Find the show that is within the current time.
            if ((strtotime($rows[$i]['starts']) <= $timeNowAsMillis) && (strtotime($rows[$i]['ends']) >= $timeNowAsMillis)){
                if ( $i - 1 >= 0){
-                   $results['previousShow'][0] = array("name"=>$rows[$i-1]['name'],
+                   $results['previousShow'][0] = array(
+                       "id"=>$rows[$i-1]['id'],
+                       "name"=>$rows[$i-1]['name'],
                        "start_timestamp"=>$rows[$i-1]['start_timestamp'],
                        "end_timestamp"=>$rows[$i-1]['end_timestamp'],
                        "starts"=>$rows[$i-1]['starts'],
@@ -1772,7 +1771,9 @@ class Application_Model_Show {
                $results['currentShow'][0] = $rows[$i];

                if ( isset($rows[$i+1])){
-                   $results['nextShow'][0] = array("name"=>$rows[$i+1]['name'],
+                   $results['nextShow'][0] = array(
+                       "id"=>$rows[$i+1]['id'],
+                       "name"=>$rows[$i+1]['name'],
                        "start_timestamp"=>$rows[$i+1]['start_timestamp'],
                        "end_timestamp"=>$rows[$i+1]['end_timestamp'],
                        "starts"=>$rows[$i+1]['starts'],
@@ -1787,7 +1788,9 @@ class Application_Model_Show {
            }
            //if we hit this we know we've gone to far and can stop looping.
            if (strtotime($rows[$i]['starts']) > $timeNowAsMillis) {
-               $results['nextShow'][0] = array("name"=>$rows[$i]['name'],
+               $results['nextShow'][0] = array(
+                   "id"=>$rows[$i]['id'],
+                   "name"=>$rows[$i]['name'],
                    "start_timestamp"=>$rows[$i]['start_timestamp'],
                    "end_timestamp"=>$rows[$i]['end_timestamp'],
                    "starts"=>$rows[$i]['starts'],
@@ -1798,7 +1801,9 @@ class Application_Model_Show {
        //If we didn't find a a current show because the time didn't fit we may still have
        //found a previous show so use it.
        if (count($results['previousShow']) == 0 && isset($previousShowIndex)) {
-           $results['previousShow'][0] = array("name"=>$rows[$previousShowIndex]['name'],
+           $results['previousShow'][0] = array(
+               "id"=>$rows[$previousShowIndex]['id'],
+               "name"=>$rows[$previousShowIndex]['name'],
                "start_timestamp"=>$rows[$previousShowIndex]['start_timestamp'],
                "end_timestamp"=>$rows[$previousShowIndex]['end_timestamp'],
                "starts"=>$rows[$previousShowIndex]['starts'],


@@ -1,7 +1,7 @@
<div class="ui-widget ui-widget-content block-shadow simple-formblock clearfix padded-strong stream-config">
    <h2 <?php if($this->enable_stream_conf == "true"){?>style="float:left"<?php }?>>Stream Settings</h2>
    <?php if($this->enable_stream_conf == "true"){?>
-   <form method="post" action="/Preference/stream-setting" enctype="application/x-www-form-urlencoded">
+   <form method="post" action="/Preference/stream-setting" enctype="application/x-www-form-urlencoded" onsubmit="return confirm('<?php echo $this->confirm_pypo_restart_text ?>');">
    <div class="button-bar bottom" id="submit-element" style="float:right">
        <input type="submit" class="ui-button ui-state-default right-floated" value="Save" id="Save" name="Save" />
    </div>


@@ -18,6 +18,10 @@ airtime_dir = x
base_url = localhost
base_port = 80

+#How many hours ahead of time should Airtime playout engine (PYPO)
+#cache scheduled media files.
+cache_ahead_hours = 1
+
[soundcloud]
connection_retries = 3
time_between_retries = 60


@@ -8,16 +8,22 @@ var _idToPostionLookUp;
 */
$(document).ready(function(){

+   if (useFlash())
+       mySupplied = "oga, mp3, m4v";
+   else
+       mySupplied = "oga, mp3";
+
    _playlist_jplayer = new jPlayerPlaylist({
        jPlayer: "#jquery_jplayer_1",
        cssSelectorAncestor: "#jp_container_1"
    },[], //array of songs will be filled with below's json call
    {
        swfPath: "/js/jplayer",
-       //supplied: "mp3,oga",
+       supplied:mySupplied,
        wmode: "window"
    });

    $.jPlayer.timeFormat.showHour = true;

    var audioFileID = $('.audioFileID').text();
@@ -35,6 +41,10 @@ $(document).ready(function(){
    }
});

+function useFlash() {
+   console.log(navigator.userAgent);
+   return navigator.userAgent.toLowerCase().match('firefox');
+}
+
/**
 * Sets up the jPlayerPlaylist to play.
 * - Get the playlist info based on the playlistID give.
@@ -122,7 +132,7 @@ function play(p_playlistIndex){
function playOne(p_audioFileID) {
    var playlist = new Array();
    var fileExtensioin = p_audioFileID.split('.').pop();
-   console.log(p_audioFileID);

    if (fileExtensioin === 'mp3') {
        media = {title: $('.audioFileTitle').text() !== 'null' ?$('.audioFileTitle').text():"",
            artist: $('.audioFileArtist').text() !== 'null' ?$('.audioFileArtist').text():"",
@@ -135,7 +145,7 @@ function playOne(p_audioFileID) {
        };
    }
    _playlist_jplayer.option("autoPlay", true);
-   console.log(media);
    playlist[0] = media;
    //_playlist_jplayer.setPlaylist(playlist); --if I use this the player will call _init on the setPlaylist and on the ready
    _playlist_jplayer._initPlaylist(playlist);


@@ -0,0 +1,131 @@
import os
import time
import shutil
import sys

import logging
from configobj import ConfigObj
from subprocess import Popen, PIPE
from api_clients import api_client as apc

"""
The purpose of this script is that you can run it, and it will compare what the database has to what your filesystem
has. It will then report if there are any differences. It will *NOT* make any changes, unlike media-monitor which uses
similar code when it starts up (but then makes changes if something is different)
"""

class AirtimeMediaMonitorBootstrap():

    """AirtimeMediaMonitorBootstrap constructor

    Keyword Arguments:
    logger      -- reference to the media-monitor logging facility
    pe          -- reference to an instance of ProcessEvent
    api_clients -- reference of api_clients to communicate with airtime-server
    """
    def __init__(self):
        config = ConfigObj('/etc/airtime/media-monitor.cfg')
        self.api_client = apc.api_client_factory(config)
        """
        try:
            logging.config.fileConfig("logging.cfg")
        except Exception, e:
            print 'Error configuring logging: ', e
            sys.exit(1)
        """
        self.logger = logging.getLogger()
        self.logger.info("Adding %s on watch list...", "xxx")

        self.scan()

    """On bootup we want to scan all directories and look for files that
    weren't there or files that changed before media-monitor process
    went offline.
    """
    def scan(self):
        directories = self.get_list_of_watched_dirs();
        self.logger.info("watched directories found: %s", directories)

        for id, dir in directories.iteritems():
            self.logger.debug("%s, %s", id, dir)
            #CHANGED!!!
            #self.sync_database_to_filesystem(id, api_client.encode_to(dir, "utf-8"))
            self.sync_database_to_filesystem(id, dir)

    """Gets a list of files that the Airtime database knows for a specific directory.
    You need to provide the directory's row ID, which is obtained when calling
    get_list_of_watched_dirs function.
    dir_id -- row id of the directory in the cc_watched_dirs database table
    """
    def list_db_files(self, dir_id):
        return self.api_client.list_all_db_files(dir_id)

    """
    returns the path and the database row id for this path for all watched directories. Also
    returns the Stor directory, which can be identified by its row id (always has value of "1")
    """
    def get_list_of_watched_dirs(self):
        json = self.api_client.list_all_watched_dirs()
        return json["dirs"]

    def scan_dir_for_existing_files(self, dir):
        command = 'find "%s" -type f -iname "*.ogg" -o -iname "*.mp3" -readable' % dir.replace('"', '\\"')
        self.logger.debug(command)
        #CHANGED!!
        stdout = self.exec_command(command).decode("UTF-8")

        return stdout.splitlines()

    def exec_command(self, command):
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE)
        stdout, stderr = p.communicate()
        if p.returncode != 0:
            self.logger.warn("command \n%s\n return with a non-zero return value", command)
            self.logger.error(stderr)
        return stdout

    """
    This function takes in a path name provided by the database (and its corresponding row id)
    and reads the list of files in the local file system. Its purpose is to discover which files
    exist on the file system but not in the database and vice versa, as well as which files have
    been modified since the database was last updated. In each case, this method will call an
    appropiate method to ensure that the database actually represents the filesystem.
    dir_id -- row id of the directory in the cc_watched_dirs database table
    dir -- pathname of the directory
    """
    def sync_database_to_filesystem(self, dir_id, dir):
        """
        set to hold new and/or modified files. We use a set to make it ok if files are added
        twice. This is because some of the tests for new files return result sets that are not
        mutually exclusive from each other.
        """
        db_known_files_set = set()
        files = self.list_db_files(dir_id)
        for file in files['files']:
            db_known_files_set.add(file)

        existing_files = self.scan_dir_for_existing_files(dir)

        existing_files_set = set()
        for file_path in existing_files:
            if len(file_path.strip(" \n")) > 0:
                existing_files_set.add(file_path[len(dir):])

        deleted_files_set = db_known_files_set - existing_files_set
        new_files_set = existing_files_set - db_known_files_set

        print ("DB Known files: \n%s\n\n"%len(db_known_files_set))
        print ("FS Known files: \n%s\n\n"%len(existing_files_set))
        print ("Deleted files: \n%s\n\n"%deleted_files_set)
        print ("New files: \n%s\n\n"%new_files_set)

if __name__ == "__main__":
    AirtimeMediaMonitorBootstrap()


@@ -0,0 +1,2 @@
[main]
liquidsoap_tar_url = http://dl.dropbox.com/u/256410/airtime/liquidsoap.tar.gz


@@ -10,6 +10,8 @@ import sys
from fabric.api import *
from fabric.contrib.files import comment, sed, append

+from ConfigParser import ConfigParser
+
from xml.dom.minidom import parse
from xml.dom.minidom import Node
from xml.dom.minidom import Element
@@ -158,6 +160,12 @@ def debian_squeeze_64(fresh_os=True):

def compile_liquidsoap(filename="liquidsoap"):

+   config = ConfigParser()
+   config.readfp(open('fab_liquidsoap_compile.cfg'))
+   url = config.get('main', 'liquidsoap_tar_url')
+   print "Will get liquidsoap from " + url
+
    do_sudo('apt-get update')
    do_sudo('apt-get upgrade -y --force-yes')
    do_sudo('apt-get install -y --force-yes ocaml-findlib libao-ocaml-dev libportaudio-ocaml-dev ' + \
@@ -171,14 +179,15 @@ def compile_liquidsoap(filename="liquidsoap"):
    do_run('mkdir -p %s' % root)

    tmpPath = do_local("mktemp", capture=True)
-   do_run('wget %s -O %s' % ('https://downloads.sourceforge.net/project/savonet/liquidsoap/1.0.0/liquidsoap-1.0.0-full.tar.bz2', tmpPath))
-   do_run('mv %s %s/liquidsoap-1.0.0-full.tar.bz2' % (tmpPath, root))
-   do_run('cd %s && bunzip2 liquidsoap-1.0.0-full.tar.bz2 && tar xf liquidsoap-1.0.0-full.tar' % root)
-   do_run('cd %s/liquidsoap-1.0.0-full && cp PACKAGES.minimal PACKAGES' % root)
-   sed('%s/liquidsoap-1.0.0-full/PACKAGES' % root, '#ocaml-portaudio', 'ocaml-portaudio')
-   sed('%s/liquidsoap-1.0.0-full/PACKAGES' % root, '#ocaml-alsa', 'ocaml-alsa')
-   sed('%s/liquidsoap-1.0.0-full/PACKAGES' % root, '#ocaml-pulseaudio', 'ocaml-pulseaudio')
-   do_run('cd %s/liquidsoap-1.0.0-full && ./configure' % root)
-   do_run('cd %s/liquidsoap-1.0.0-full && make' % root)
-   get('%s/liquidsoap-1.0.0-full/liquidsoap-1.0.0/src/liquidsoap' % root, filename)
+   do_run('wget %s -O %s' % (url, tmpPath))
+   do_run('mv %s %s/liquidsoap.tar.gz' % (tmpPath, root))
+   do_run('cd %s && tar xzf liquidsoap.tar.gz' % root)
+   do_run('cd %s/savonet && cp PACKAGES.minimal PACKAGES' % root)
+   sed('%s/savonet/PACKAGES' % root, '#ocaml-portaudio', 'ocaml-portaudio')
+   sed('%s/savonet/PACKAGES' % root, '#ocaml-alsa', 'ocaml-alsa')
+   sed('%s/savonet/PACKAGES' % root, '#ocaml-pulseaudio', 'ocaml-pulseaudio')
+   do_run('cd %s/savonet && ./bootstrap' % root)
+   do_run('cd %s/savonet && ./configure' % root)
+   do_run('cd %s/savonet && make' % root)
+   get('%s/savonet/liquidsoap/src/liquidsoap' % root, filename)


@@ -42,7 +42,7 @@ class AirtimeMediaMonitorBootstrap():
        for id, dir in directories.iteritems():
            self.logger.debug("%s, %s", id, dir)
-           self.sync_database_to_filesystem(id, api_client.encode_to(dir, "utf-8"))
+           self.sync_database_to_filesystem(id, dir)

    """Gets a list of files that the Airtime database knows for a specific directory.
    You need to provide the directory's row ID, which is obtained when calling
@@ -82,7 +82,7 @@ class AirtimeMediaMonitorBootstrap():
        db_known_files_set = set()
        files = self.list_db_files(dir_id)
        for file in files['files']:
-           db_known_files_set.add(api_client.encode_to(file, 'utf-8'))
+           db_known_files_set.add(file)

        new_files = self.mmc.scan_dir_for_new_files(dir)
@@ -123,9 +123,6 @@ class AirtimeMediaMonitorBootstrap():
        new_files_set = all_files_set - db_known_files_set
        modified_files_set = new_and_modified_files - new_files_set

-       #NAOMI: Please comment out the "Known files" line, if you find the bug.
-       #it is for debugging purposes only (Too much data will be written to log). -mk
-       #self.logger.info("Known files: \n%s\n\n"%db_known_files_set)
        self.logger.info("Deleted files: \n%s\n\n"%deleted_files_set)
        self.logger.info("New files: \n%s\n\n"%new_files_set)
        self.logger.info("Modified files: \n%s\n\n"%modified_files_set)


@@ -251,12 +251,29 @@ class MediaMonitorCommon:
        if p.returncode != 0:
            self.logger.warn("command \n%s\n return with a non-zero return value", command)
            self.logger.error(stderr)

+       try:
+           """
+           File name charset encoding is UTF-8.
+           """
+           stdout = stdout.decode("UTF-8")
+       except Exception, e:
+           self.logger.error("Could not decode %s using UTF-8" % stdout)
+
        return stdout

    def scan_dir_for_new_files(self, dir):
        command = 'find "%s" -type f -iname "*.ogg" -o -iname "*.mp3" -readable' % dir.replace('"', '\\"')
        self.logger.debug(command)
-       stdout = self.exec_command(command)
+       stdout = self.exec_command(command).decode("UTF-8")
+
+       try:
+           """
+           File name charset encoding is UTF-8.
+           """
+           stdout = stdout.decode("UTF-8")
+       except Exception, e:
+           self.logger.error("Could not decode %s using UTF-8" % stdout)

        return stdout.splitlines()


@@ -95,7 +95,7 @@ class Notify:
        logger.debug("Response: "+json.dumps(response))

    def notify_source_status(self, source_name, status):
-       logger = logging.getLogger()
+       logger = logging.getLogger("notify")
        logger.debug('#################################################')
        logger.debug('# Calling server to update source status        #')


@@ -5,13 +5,14 @@ Python part of radio playout (pypo)
import time
from optparse import *
import sys
+import os
import signal
import logging
import logging.config
import logging.handlers
from Queue import Queue
+from threading import Lock

from pypopush import PypoPush
from pypofetch import PypoFetch
from pypofile import PypoFile
@@ -125,6 +126,8 @@ if __name__ == '__main__':
    recorder_q = Queue()
    pypoPush_q = Queue()

+   telnet_lock = Lock()
+
    """
    This queue is shared between pypo-fetch and pypo-file, where pypo-file
    is the receiver. Pypo-fetch will send every schedule it gets to pypo-file
@@ -141,11 +144,11 @@ if __name__ == '__main__':
    pfile.daemon = True
    pfile.start()

-   pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q)
+   pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock)
    pf.daemon = True
    pf.start()

-   pp = PypoPush(pypoPush_q)
+   pp = PypoPush(pypoPush_q, telnet_lock)
    pp.daemon = True
    pp.start()


@@ -1,22 +1,13 @@
import os
import sys
import time
-import calendar
import logging
import logging.config
import shutil
-import random
-import string
import json
import telnetlib
-import math
import copy

from threading import Thread
-from subprocess import Popen, PIPE
-from datetime import datetime
-from datetime import timedelta
-from Queue import Empty
-import filecmp

from api_clients import api_client
@@ -38,13 +29,15 @@ except Exception, e:
    sys.exit()

class PypoFetch(Thread):
-   def __init__(self, pypoFetch_q, pypoPush_q, media_q):
+   def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock):
        Thread.__init__(self)
        self.api_client = api_client.api_client_factory(config)
        self.fetch_queue = pypoFetch_q
        self.push_queue = pypoPush_q
        self.media_prepare_queue = media_q
+       self.telnet_lock = telnet_lock

        self.logger = logging.getLogger();
        self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
@@ -79,7 +72,7 @@ class PypoFetch(Thread):
            if command == 'update_schedule':
                self.schedule_data = m['schedule']
-               self.process_schedule(self.schedule_data, False)
+               self.process_schedule(self.schedule_data)
            elif command == 'update_stream_setting':
                self.logger.info("Updating stream setting...")
                self.regenerateLiquidsoapConf(m['setting'])
@@ -89,9 +82,6 @@ class PypoFetch(Thread):
            elif command == 'update_station_name':
                self.logger.info("Updating station name...")
                self.update_liquidsoap_station_name(m['station_name'])
-           elif command == 'cancel_current_show':
-               self.logger.info("Cancel current show command received...")
-               self.stop_current_show()
            elif command == 'switch_source':
                self.logger.info("switch_on_source show command received...")
                self.switch_source(m['sourcename'], m['status'])
@@ -113,14 +103,16 @@ class PypoFetch(Thread):
        elif(sourcename == "live_dj"):
            command += "live_dj_harbor.kick\n"

+       self.telnet_lock.acquire()
        try:
            tn = telnetlib.Telnet(LS_HOST, LS_PORT)
            tn.write(command)
            tn.write('exit\n')
            tn.read_all()
        except Exception, e:
-           self.logger.debug(e)
-           self.logger.debug('Could not connect to liquidsoap')
+           self.logger.error(str(e))
+       finally:
+           self.telnet_lock.release()

    def switch_source(self, sourcename, status):
        self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
@@ -137,14 +129,16 @@ class PypoFetch(Thread):
        else:
            command += "stop\n"

+       self.telnet_lock.acquire()
        try:
            tn = telnetlib.Telnet(LS_HOST, LS_PORT)
            tn.write(command)
            tn.write('exit\n')
            tn.read_all()
        except Exception, e:
-           self.logger.debug(e)
-           self.logger.debug('Could not connect to liquidsoap')
+           self.logger.error(str(e))
+       finally:
+           self.telnet_lock.release()

    """
    This check current switch status from Airtime and update the status
@@ -156,17 +150,6 @@ class PypoFetch(Thread):
        for k, v in switch_status['status'].iteritems():
            self.switch_source(k, v)

-   def stop_current_show(self):
-       self.logger.debug('Notifying Liquidsoap to stop playback.')
-       try:
-           tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-           tn.write('source.skip\n')
-           tn.write('exit\n')
-           tn.read_all()
-       except Exception, e:
-           self.logger.debug(e)
-           self.logger.debug('Could not connect to liquidsoap')

    def regenerateLiquidsoapConf(self, setting_p):
        existing = {}
        # create a temp file
@@ -252,19 +235,19 @@ class PypoFetch(Thread):
        fh.write("# THIS FILE IS AUTO GENERATED. DO NOT CHANGE!! #\n")
        fh.write("################################################\n")
        for k, d in setting:
-           buffer = d[u'keyname'] + " = "
+           buffer_str = d[u'keyname'] + " = "
            if(d[u'type'] == 'string'):
                temp = d[u'value']
                if(temp == ""):
                    temp = ""
-               buffer += "\"" + temp + "\""
+               buffer_str += "\"" + temp + "\""
            else:
                temp = d[u'value']
                if(temp == ""):
                    temp = "0"
-               buffer += temp
-           buffer += "\n"
-           fh.write(api_client.encode_to(buffer))
+               buffer_str += temp
+           buffer_str += "\n"
+           fh.write(api_client.encode_to(buffer_str))
        fh.write("log_file = \"/var/log/airtime/pypo-liquidsoap/<script>.log\"\n");
        fh.close()
        # restarting pypo.
@@ -280,17 +263,25 @@ class PypoFetch(Thread):
        updates the status of liquidsoap connection to the streaming server
        This fucntion updates the bootup time variable in liquidsoap script
        """
-       tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-       # update the boot up time of liquidsoap. Since liquidsoap is not restarting,
-       # we are manually adjusting the bootup time variable so the status msg will get
-       # updated.
-       current_time = time.time()
-       boot_up_time_command = "vars.bootup_time "+str(current_time)+"\n"
-       tn.write(boot_up_time_command)
-       tn.write("streams.connection_status\n")
-       tn.write('exit\n')
-       output = tn.read_all()
+       self.telnet_lock.acquire()
+       try:
+           tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+           # update the boot up time of liquidsoap. Since liquidsoap is not restarting,
+           # we are manually adjusting the bootup time variable so the status msg will get
+           # updated.
+           current_time = time.time()
+           boot_up_time_command = "vars.bootup_time "+str(current_time)+"\n"
+           tn.write(boot_up_time_command)
+           tn.write("streams.connection_status\n")
+           tn.write('exit\n')
+           output = tn.read_all()
+       except Exception, e:
+           self.logger.error(str(e))
+       finally:
+           self.telnet_lock.release()

        output_list = output.split("\r\n")
        stream_info = output_list[2]
@@ -311,8 +302,7 @@ class PypoFetch(Thread):
        # Push stream metadata to liquidsoap
        # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
        try:
-           self.logger.info(LS_HOST)
-           self.logger.info(LS_PORT)
+           self.telnet_lock.acquire()
            tn = telnetlib.Telnet(LS_HOST, LS_PORT)
            command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
            self.logger.info(command)
@@ -321,6 +311,8 @@ class PypoFetch(Thread):
            tn.read_all()
        except Exception, e:
            self.logger.error("Exception %s", e)
+       finally:
+           self.telnet_lock.release()

    def update_liquidsoap_station_name(self, station_name):
        # Push stream metadata to liquidsoap
@@ -328,12 +320,19 @@ class PypoFetch(Thread):
        try:
            self.logger.info(LS_HOST)
            self.logger.info(LS_PORT)
-           tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-           command = ('vars.station_name %s\n' % station_name).encode('utf-8')
-           self.logger.info(command)
-           tn.write(command)
-           tn.write('exit\n')
-           tn.read_all()
+           self.telnet_lock.acquire()
+           try:
+               tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+               command = ('vars.station_name %s\n' % station_name).encode('utf-8')
+               self.logger.info(command)
+               tn.write(command)
+               tn.write('exit\n')
+               tn.read_all()
+           except Exception, e:
+               self.logger.error(str(e))
+           finally:
+               self.telnet_lock.release()
        except Exception, e:
            self.logger.error("Exception %s", e)
@@ -345,7 +344,7 @@ class PypoFetch(Thread):
      to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
    - runs the cleanup routine, to get rid of unused cached files
    """
-   def process_schedule(self, schedule_data, bootstrapping):
+   def process_schedule(self, schedule_data):
        self.logger.debug(schedule_data)
        media = schedule_data["media"]
@@ -369,7 +368,7 @@ class PypoFetch(Thread):
                    media_item['dst'] = dst

                self.media_prepare_queue.put(copy.copy(media))
-               self.prepare_media(media, bootstrapping)
+               self.prepare_media(media)
            except Exception, e: self.logger.error("%s", e)

        # Send the data to pypo-push
@@ -378,85 +377,27 @@ class PypoFetch(Thread):
        # cleanup
-       try: self.cleanup(media)
+       try: self.cache_cleanup(media)
        except Exception, e: self.logger.error("%s", e)

-   def prepare_media(self, media, bootstrapping):
+   def prepare_media(self, media):
        """
-       Iterate through the list of media items in "media" and
-       download them.
+       Iterate through the list of media items in "media" append some
+       attributes such as show_name
        """
        try:
            mediaKeys = sorted(media.iterkeys())
            for mkey in mediaKeys:
-               self.logger.debug("Media item starting at %s", mkey)
                media_item = media[mkey]

-               if bootstrapping:
-                   self.check_for_previous_crash(media_item)
-
-               entry = self.create_liquidsoap_annotation(media_item)
                media_item['show_name'] = "TODO"
-               media_item["annotation"] = entry
        except Exception, e:
            self.logger.error("%s", e)

        return media

-   def create_liquidsoap_annotation(self, media):
-       """entry = \
-       'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
-           % (media['id'], 0, \
-               float(media['fade_in']) / 1000, \
-               float(media['fade_out']) / 1000, \
-               float(media['cue_in']), \
-               float(media['cue_out']), \
-               media['row_id'], media['dst'])"""
-
-       entry = \
-       'annotate:media_id="%s",liq_start_next="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
-           % (media['id'], 0, \
-               float(media['cue_in']), \
-               float(media['cue_out']), \
-               media['row_id'], media['dst'])
-
-       return entry
-
-   def check_for_previous_crash(self, media_item):
-       start = media_item['start']
-       end = media_item['end']
-
-       dtnow = datetime.utcnow()
-       str_tnow_s = dtnow.strftime('%Y-%m-%d-%H-%M-%S')
-
-       if start <= str_tnow_s and str_tnow_s < end:
-           #song is currently playing and we just started pypo. Maybe there
-           #was a power outage? Let's restart playback of this song.
-           start_split = map(int, start.split('-'))
-           media_start = datetime(start_split[0], start_split[1], start_split[2], start_split[3], start_split[4], start_split[5], 0, None)
-           self.logger.debug("Found media item that started at %s.", media_start)
-
-           delta = dtnow - media_start #we get a TimeDelta object from this operation
-           self.logger.info("Starting media item at %d second point", delta.seconds)
-
-           """
-           Set the cue_in. This is used by Liquidsoap to determine at what point in the media
-           item it should start playing. If the cue_in happens to be > cue_out, then make cue_in = cue_out
-           """
-           media_item['cue_in'] = delta.seconds + 10 if delta.seconds + 10 < media_item['cue_out'] else media_item['cue_out']
-
-           """
-           Set the start time, which is used by pypo-push to determine when a media item is scheduled.
-           Pushing the start time into the future will ensure pypo-push will push this to Liquidsoap.
-           """
-           td = timedelta(seconds=10)
-           media_item['start'] = (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S')
-           self.logger.info("Crash detected, setting playlist to restart at %s", (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S'))

    def handle_media_file(self, media_item, dst):
        """
        Download and cache the media item.
@@ -489,31 +430,58 @@ class PypoFetch(Thread):
    def copy_file(self, media_item, dst):
        """
-       Copy the file from local library directory.
+       Copy the file from local library directory. Note that we are not using os.path.exists
+       since this can lie to us! It seems the best way to get whether a file exists is to actually
+       do an operation on the file (such as query its size). Getting the file size of a non-existent
+       file will throw an exception, so we can look for this exception instead of os.path.exists.
        """
-       if not os.path.isfile(dst):
-           self.logger.debug("copying from %s to local cache %s" % (media_item['uri'], dst))
-           try:
-               shutil.copy(media_item['uri'], dst)
-           except:
-               self.logger.error("Could not copy from %s to %s" % (media_item['uri'], dst))
+       src = media_item['uri']
+
+       try:
+           src_size = os.path.getsize(src)
+       except Exception, e:
+           self.logger.error("Could not get size of source file: %s", src)
+           return
+
+       dst_exists = True
+       try:
+           dst_size = os.path.getsize(dst)
+       except Exception, e:
+           dst_exists = False
+
+       do_copy = False
+       if dst_exists:
+           if src_size != dst_size:
+               do_copy = True
        else:
-           #file already exists
-           pass
+           do_copy = True
+
+       if do_copy:
+           self.logger.debug("copying from %s to local cache %s" % (src, dst))
+           try:
+               """
+               copy will overwrite dst if it already exists
+               """
+               shutil.copy(src, dst)
+           except:
+               self.logger.error("Could not copy from %s to %s" % (src, dst))

+   """
    def download_file(self, media_item, dst):
-       """
-       Download a file from a remote server and store it in the cache.
-       """
+       #Download a file from a remote server and store it in the cache.
        if os.path.isfile(dst):
            pass
            #self.logger.debug("file already in cache: %s", dst)
        else:
            self.logger.debug("try to download %s", media_item['uri'])
            self.api_client.get_media(media_item['uri'], dst)
+   """

-   def cleanup(self, media):
+   def cache_cleanup(self, media):
        """
        Get list of all files in the cache dir and remove them if they aren't being used anymore.
        Input dict() media, lists all files that are scheduled or currently playing. Not being in this
@@ -540,7 +508,7 @@ class PypoFetch(Thread):
        success, self.schedule_data = self.api_client.get_schedule()
        if success:
            self.logger.info("Bootstrap schedule received: %s", self.schedule_data)
-           self.process_schedule(self.schedule_data, True)
+           self.process_schedule(self.schedule_data)
            self.check_switch_status()

        loops = 1
@@ -567,7 +535,7 @@ class PypoFetch(Thread):
            success, self.schedule_data = self.api_client.get_schedule()
            if success:
-               self.process_schedule(self.schedule_data, False)
+               self.process_schedule(self.schedule_data)
            loops += 1


@@ -6,7 +6,7 @@ import logging
import logging.config
import shutil
import os
-import time
+import sys

# configure logging
logging.config.fileConfig("logging.cfg")
@@ -21,7 +21,7 @@ try:
except Exception, e:
    logger = logging.getLogger()
    logger.error('Error loading config file: %s', e)
-   sys.exit()
+   sys.exit(1)

class PypoFile(Thread):


@@ -6,8 +6,7 @@ from threading import Thread
import time

# For RabbitMQ
from kombu.connection import BrokerConnection
-from kombu.messaging import Exchange, Queue, Consumer, Producer
-from kombu.exceptions import MessageStateError
+from kombu.messaging import Exchange, Queue
from kombu.simple import SimpleQueue

import json
@@ -71,9 +70,6 @@ class PypoMessageHandler(Thread):
                elif command == 'update_station_name':
                    self.logger.info("Updating station name...")
                    self.pypo_queue.put(message)
-               elif command == 'cancel_current_show':
-                   self.logger.info("Cancel current show command received...")
-                   self.pypo_queue.put(message)
                elif command == 'switch_source':
                    self.logger.info("switch_source command received...")
                    self.pypo_queue.put(message)


@@ -1,23 +1,19 @@
-import os
+from datetime import datetime
+from datetime import timedelta
import sys
import time
import logging
import logging.config
-import logging.handlers
-import pickle
import telnetlib
import calendar
import json
import math

-"""
-It is possible to use a list as a queue, where the first element added is the first element
-retrieved ("first-in, first-out"); however, lists are not efficient for this purpose. Let's use
-"deque"
-"""
-from collections import deque
+from Queue import Empty
from threading import Thread

from api_clients import api_client
from configobj import ConfigObj
@@ -38,15 +34,16 @@ except Exception, e:
    sys.exit()

class PypoPush(Thread):
-   def __init__(self, q):
+   def __init__(self, q, telnet_lock):
        Thread.__init__(self)
        self.api_client = api_client.api_client_factory(config)
        self.queue = q

        self.media = dict()

-       self.liquidsoap_state_play = True
-       self.push_ahead = 10
+       self.telnet_lock = telnet_lock
+       self.push_ahead = 5

        self.last_end_time = 0
        self.pushed_objects = {}
@@ -62,57 +59,75 @@ class PypoPush(Thread):
        """
        liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
+       self.logger.debug('liquidsoap_queue_approx %s', liquidsoap_queue_approx)

-       timenow = time.time()
-
-       # get a new schedule from pypo-fetch
-       if not self.queue.empty():
-           # make sure we get the latest schedule
-           while not self.queue.empty():
-               self.media = self.queue.get()
+       try:
+           self.media = self.queue.get(block=True, timeout=PUSH_INTERVAL)
+           if not self.queue.empty():
+               while not self.queue.empty():
+                   self.media = self.queue.get()
            self.logger.debug("Received data from pypo-fetch")
            self.logger.debug('media %s' % json.dumps(self.media))
            self.handle_new_media(self.media, liquidsoap_queue_approx)
+       except Empty, e:
+           pass

        media = self.media

        if len(liquidsoap_queue_approx) < MAX_LIQUIDSOAP_QUEUE_LENGTH:
-           currently_on_air = False
            if media:
-               tnow = time.gmtime(timenow)
-               tcoming = time.gmtime(timenow + self.push_ahead)
-               str_tnow_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tnow[0], tnow[1], tnow[2], tnow[3], tnow[4], tnow[5])
-               str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5])
+               tnow = datetime.utcnow()
+               tcoming = tnow + timedelta(seconds=self.push_ahead)

                for key in media.keys():
                    media_item = media[key]

-                   item_start = media_item['start'][0:19]
-                   if str_tnow_s <= item_start and item_start < str_tcoming_s:
+                   item_start = datetime.strptime(media_item['start'][0:19], "%Y-%m-%d-%H-%M-%S")
+                   item_end = datetime.strptime(media_item['end'][0:19], "%Y-%m-%d-%H-%M-%S")
+
+                   if len(liquidsoap_queue_approx) == 0 and item_start <= tnow and tnow < item_end:
+                       """
+                       Something is scheduled now, but Liquidsoap is not playing anything! Let's play the current media_item
+                       """
+                       self.logger.debug("Found media_item that should be playing! Starting...")
+
+                       adjusted_cue_in = tnow - item_start
+                       adjusted_cue_in_seconds = self.date_interval_to_seconds(adjusted_cue_in)
+
+                       self.logger.debug("Found media_item that should be playing! Adjust cue point by %ss" % adjusted_cue_in_seconds)
+                       self.push_to_liquidsoap(media_item, adjusted_cue_in_seconds)
+                   elif tnow <= item_start and item_start < tcoming:
                        """
-                       If the media item starts in the next 30 seconds, push it to the queue.
+                       If the media item starts in the next 10 seconds, push it to the queue.
                        """
                        self.logger.debug('Preparing to push media item scheduled at: %s', key)
-                       if self.push_to_liquidsoap(media_item):
+                       if self.push_to_liquidsoap(media_item, None):
                            self.logger.debug("Pushed to liquidsoap, updating 'played' status.")
                            """
-                           Temporary solution to make sure we don't push the same track multiple times.
+                           Temporary solution to make sure we don't push the same track multiple times. Not a full solution because if we
+                           get a new schedule, the key becomes available again.
                            """
+                           #TODO
                            del media[key]
-                           currently_on_air = True
-                           self.liquidsoap_state_play = True

-   def push_to_liquidsoap(self, media_item):
+   def date_interval_to_seconds(self, interval):
+       return (interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10**6) / 10**6
+
+   def push_to_liquidsoap(self, media_item, adjusted_cue_in=None):
        """
        This function looks at the media item, and either pushes it to the Liquidsoap
        queue immediately, or if the queue is empty - waits until the start time of the
        media item before pushing it.
        """
+       if adjusted_cue_in is not None:
+           media_item["cue_in"] = adjusted_cue_in + float(media_item["cue_in"])
+
        try:
            if media_item["start"] == self.last_end_time:
                """
@@ -139,36 +154,26 @@ class PypoPush(Thread):
            return True

-   """
-   def update_liquidsoap_queue(self):
-       # the queue variable liquidsoap_queue is our attempt to mirror
-       # what liquidsoap actually has in its own queue. Liquidsoap automatically
-       # updates its own queue when an item finishes playing, we have to do this
-       # manually.
-       #
-       # This function will iterate through the liquidsoap_queue and remove items
-       # whose end time are in the past.
-
-       tnow = time.gmtime(timenow)
-       str_tnow_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tnow[0], tnow[1], tnow[2], tnow[3], tnow[4], tnow[5])
-
-       while len(self.liquidsoap_queue) > 0:
-           if self.liquidsoap_queue[0]["end"] < str_tnow_s:
-               self.liquidsoap_queue.popleft()
-   """
-
    def get_queue_items_from_liquidsoap(self):
        """
        This function connects to Liquidsoap to find what media items are in its queue.
        """
-       tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-       msg = 'queue.queue\n'
-       tn.write(msg)
-       response = tn.read_until("\r\n").strip(" \r\n")
-       tn.write('exit\n')
-       tn.read_all()
+       try:
+           self.telnet_lock.acquire()
+           tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+
+           msg = 'queue.queue\n'
+           tn.write(msg)
+           response = tn.read_until("\r\n").strip(" \r\n")
+
+           tn.write('exit\n')
+           tn.read_all()
+       except Exception, e:
+           self.logger.error(str(e))
+       finally:
+           self.telnet_lock.release()

        liquidsoap_queue_approx = []
@@ -181,7 +186,14 @@ class PypoPush(Thread):
            if item in self.pushed_objects:
                liquidsoap_queue_approx.append(self.pushed_objects[item])
            else:
+               """
+               We should only reach here if Pypo crashed and restarted (because self.pushed_objects was reset). In this case
+               let's clear the entire Liquidsoap queue.
+               """
                self.logger.error("ID exists in liquidsoap queue that does not exist in our pushed_objects queue: " + item)
+               self.clear_liquidsoap_queue()
+               liquidsoap_queue_approx = []
+               break

        return liquidsoap_queue_approx
@@ -195,9 +207,6 @@ class PypoPush(Thread):
        queue.
        """

-       #TODO: Keys should already be sorted. Verify this.
-       sorted_keys = sorted(media.keys())
-
        if len(liquidsoap_queue_approx) == 0:
            """
            liquidsoap doesn't have anything in its queue, so we have nothing
@@ -240,25 +249,49 @@ class PypoPush(Thread):
        else:
            self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0])

+   def clear_liquidsoap_queue(self):
+       self.logger.debug("Clearing Liquidsoap queue")
+       try:
+           self.telnet_lock.acquire()
+           tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+           msg = "source.skip\n"
+           tn.write(msg)
+           tn.write("exit\n")
+           tn.read_all()
+       except Exception, e:
+           self.logger.error(str(e))
+       finally:
+           self.telnet_lock.release()
+
    def remove_from_liquidsoap_queue(self, media_item, do_only_source_skip=False):
        if 'queue_id' in media_item:
            queue_id = media_item['queue_id']

-           tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-           msg = "queue.remove %s\n" % queue_id
-           tn.write(msg)
-           response = tn.read_until("\r\n").strip("\r\n")
-
-           if "No such request in my queue" in response:
-               """
-               Cannot remove because Liquidsoap started playing the item. Need
-               to use source.skip instead
-               """
-               msg = "source.skip"
-               tn.write("source.skip")
-
-           tn.write("exit\n")
-           tn.read_all()
+           try:
+               self.telnet_lock.acquire()
+               tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+               msg = "queue.remove %s\n" % queue_id
+               self.logger.debug(msg)
+               tn.write(msg)
+               response = tn.read_until("\r\n").strip("\r\n")
+               if "No such request in my queue" in response:
+                   """
+                   Cannot remove because Liquidsoap started playing the item. Need
+                   to use source.skip instead
+                   """
+                   msg = "source.skip\n"
+                   self.logger.debug(msg)
+                   tn.write(msg)
+
+               tn.write("exit\n")
+               tn.read_all()
+           except Exception, e:
+               self.logger.error(str(e))
+           finally:
+               self.telnet_lock.release()
        else:
            self.logger.error("'queue_id' key doesn't exist in media_item dict()")
@@ -294,31 +327,40 @@ class PypoPush(Thread):
        show name of every media_item as well, just to keep Liquidsoap up-to-date
        about which show is playing.
        """
-       tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-       #tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8'))
-
-       annotation = media_item['annotation']
-       msg = 'queue.push %s\n' % annotation.encode('utf-8')
-       self.logger.debug(msg)
-       tn.write(msg)
-       queue_id = tn.read_until("\r\n").strip("\r\n")
-
-       #remember the media_item's queue id which we may use
-       #later if we need to remove it from the queue.
-       media_item['queue_id'] = queue_id
-
-       #add media_item to the end of our queue
-       self.pushed_objects[queue_id] = media_item
-
-       show_name = media_item['show_name']
-       msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
-       tn.write(msg)
-       self.logger.debug(msg)
-
-       tn.write("exit\n")
-       self.logger.debug(tn.read_all())
+       try:
+           self.telnet_lock.acquire()
+           tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+
+           #tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8'))
+           annotation = self.create_liquidsoap_annotation(media_item)
+           msg = 'queue.push %s\n' % annotation.encode('utf-8')
+           self.logger.debug(msg)
+           tn.write(msg)
+           queue_id = tn.read_until("\r\n").strip("\r\n")
+
+           #remember the media_item's queue id which we may use
+           #later if we need to remove it from the queue.
+           media_item['queue_id'] = queue_id
+
+           #add media_item to the end of our queue
+           self.pushed_objects[queue_id] = media_item
+
+           show_name = media_item['show_name']
+           msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
+           tn.write(msg)
+           self.logger.debug(msg)
+
+           tn.write("exit\n")
+           self.logger.debug(tn.read_all())
+       except Exception, e:
+           self.logger.error(str(e))
+       finally:
+           self.telnet_lock.release()
+
+   def create_liquidsoap_annotation(self, media):
+       return 'annotate:media_id="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
+           % (media['id'], float(media['cue_in']), float(media['cue_out']), media['row_id'], media['dst'])

    def run(self):
        loops = 0
@@ -331,5 +373,4 @@ class PypoPush(Thread):
            try: self.push()
            except Exception, e:
                self.logger.error('Pypo Push Exception: %s', e)
-           time.sleep(PUSH_INTERVAL)
            loops += 1

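Most of the pypo changes in this commit follow the same pattern: every Liquidsoap telnet session in PypoFetch and PypoPush is now bracketed by telnet_lock.acquire() and a finally: telnet_lock.release(), so the two threads can no longer interleave commands on the Liquidsoap telnet interface. A minimal standalone sketch of that pattern, with a hypothetical host, port and command (the real code lives in the methods shown above and uses self.logger instead of print):

    import telnetlib
    from threading import Lock

    telnet_lock = Lock()  # shared by every thread that talks to Liquidsoap

    def send_command(host, port, command):
        # Serialize access so concurrent threads cannot interleave their telnet I/O.
        telnet_lock.acquire()
        try:
            tn = telnetlib.Telnet(host, port)
            tn.write(command)
            tn.write('exit\n')
            return tn.read_all()
        except Exception as e:
            print("Could not talk to Liquidsoap: %s" % e)
        finally:
            telnet_lock.release()

    # e.g. send_command('localhost', 1234, 'streams.connection_status\n')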

@@ -1,4 +1,3 @@
-import urllib
import logging
import logging.config
import json
@@ -6,8 +5,6 @@ import time
import datetime
import os
import sys
-import shutil
-import socket
import pytz
import signal
import math
@@ -16,7 +13,6 @@ from configobj import ConfigObj
from poster.encode import multipart_encode
from poster.streaminghttp import register_openers

-import urllib2
from subprocess import Popen
from threading import Thread
@@ -25,15 +21,11 @@ import mutagen
from api_clients import api_client

-# For RabbitMQ
-from kombu.connection import BrokerConnection
-from kombu.messaging import Exchange, Queue, Consumer, Producer
-
# loading config file
try:
    config = ConfigObj('/etc/airtime/pypo.cfg')
except Exception, e:
-   self.logger.error('Error loading config file: %s', e)
+   print ('Error loading config file: %s', e)
    sys.exit()

def getDateTimeObj(time):
@@ -89,7 +81,7 @@ class ShowRecorder(Thread):
        #blocks at the following line until the child process
        #quits
-       code = self.p.wait()
+       self.p.wait()
        self.logger.info("finishing record, return code %s", self.p.returncode)
        code = self.p.returncode
@@ -275,8 +267,6 @@ class Recorder(Thread):
        self.logger.info("Bootstrap complete: got initial copy of the schedule")

-       recording = False
-
        self.loops = 0
        heartbeat_period = math.floor(30/PUSH_INTERVAL)


@@ -4,12 +4,13 @@
#done in our rabbitmq init.d script, but placing it here so that monit recognizes
# it faster (in time for the upcoming airtime-check-system)
codename=`lsb_release -cs`
-if [ "$codename" == "oneiric" ];
+if [ "$codename" = "lucid" -o "$codename" = "maverick" -o "$codename" = "natty" -o "$codename" = "squeeze" ]
then
+   rabbitmqpid=`sed "s/.*,\(.*\)\}.*/\1/" /var/lib/rabbitmq/pids`
+else
+   #RabbitMQ in Ubuntu Oneiric and newer have a different way of storing the PID.
    rabbitmqstatus=`/etc/init.d/rabbitmq-server status | grep "\[{pid"`
    rabbitmqpid=`echo $rabbitmqstatus | sed "s/.*,\(.*\)\}.*/\1/"`
-else
-   rabbitmqpid=`sed "s/.*,\(.*\)\}.*/\1/" /var/lib/rabbitmq/pids`
fi
echo "RabbitMQ PID: $rabbitmqpid"
echo "$rabbitmqpid" > /var/run/rabbitmq.pid