CC-3318: When changing stream settings (Liquidsoap + Pypo restart), sometimes Airtime does not resume playback

-fixed
This commit is contained in:
Martin Konecny 2012-02-10 18:43:40 -05:00
parent 33b594c39a
commit eadf68cb61
6 changed files with 71 additions and 35 deletions

View File

@ -219,8 +219,6 @@ class PreferenceController extends Zend_Controller_Action
for($i=1;$i<=$num_of_stream;$i++){ for($i=1;$i<=$num_of_stream;$i++){
Application_Model_StreamSetting::setLiquidsoapError($i, "waiting"); Application_Model_StreamSetting::setLiquidsoapError($i, "waiting");
} }
// this goes into cc_pref table
Application_Model_Preference::SetStreamLabelFormat($values['streamFormat']);
// store stream update timestamp // store stream update timestamp
Application_Model_Preference::SetStreamUpdateTimestamp(); Application_Model_Preference::SetStreamUpdateTimestamp();
Application_Model_RabbitMq::SendMessageToPypo("update_stream_setting", $data); Application_Model_RabbitMq::SendMessageToPypo("update_stream_setting", $data);

View File

@ -102,7 +102,6 @@ class Application_Model_Preference
public static function SetHeadTitle($title, $view=null){ public static function SetHeadTitle($title, $view=null){
self::SetValue("station_name", $title); self::SetValue("station_name", $title);
Application_Model_RabbitMq::PushSchedule();
// in case this is called from airtime-saas script // in case this is called from airtime-saas script
if($view !== null){ if($view !== null){
@ -111,6 +110,11 @@ class Application_Model_Preference
$view->headTitle()->exchangeArray(array()); //clear headTitle ArrayObject $view->headTitle()->exchangeArray(array()); //clear headTitle ArrayObject
$view->headTitle(self::GetHeadTitle()); $view->headTitle(self::GetHeadTitle());
} }
$eventType = "update_station_name";
$md = array("station_name"=>$title);
Application_Model_RabbitMq::SendMessageToPypo($eventType, $md);
} }
/** /**
@ -153,7 +157,11 @@ class Application_Model_Preference
public static function SetStreamLabelFormat($type){ public static function SetStreamLabelFormat($type){
self::SetValue("stream_label_format", $type); self::SetValue("stream_label_format", $type);
Application_Model_RabbitMq::PushSchedule();
$eventType = "update_stream_format";
$md = array("stream_format"=>$type);
Application_Model_RabbitMq::SendMessageToPypo($eventType, $md);
} }
public static function GetStreamLabelFormat(){ public static function GetStreamLabelFormat(){

View File

@ -83,8 +83,12 @@ class Application_Model_StreamSetting {
$CC_DBC->query($sql); $CC_DBC->query($sql);
} else if ($key == "output_sound_device_type") { } else if ($key == "output_sound_device_type") {
$sql = "UPDATE cc_stream_setting SET value='$d' WHERE keyname='$key'"; $sql = "UPDATE cc_stream_setting SET value='$d' WHERE keyname='$key'";
$CC_DBC->query($sql); $CC_DBC->query($sql);
} else { } else if ($key == "streamFormat"){
// this goes into cc_pref table
Logging::log("Insert stream label format $d");
Application_Model_Preference::SetStreamLabelFormat($d);
} else if (is_array($d)) {
$temp = explode('_', $key); $temp = explode('_', $key);
$prefix = $temp[0]; $prefix = $temp[0];
foreach ($d as $k=>$v) { foreach ($d as $k=>$v) {
@ -96,6 +100,8 @@ class Application_Model_StreamSetting {
$sql = "UPDATE cc_stream_setting SET value='$v' WHERE keyname='$keyname'"; $sql = "UPDATE cc_stream_setting SET value='$v' WHERE keyname='$keyname'";
$CC_DBC->query($sql); $CC_DBC->query($sql);
} }
} else {
Logging::log("Warning unexpected value: ".$key);
} }
} }
} }

View File

@ -5,10 +5,9 @@ end
# A function applied to each metadata chunk # A function applied to each metadata chunk
def append_title(m) = def append_title(m) =
log("Using stream_format #{!stream_metadata_type}")
if !stream_metadata_type == 1 then if !stream_metadata_type == 1 then
[("artist","#{!show_name} - #{m['artist']}")] [("artist","#{!show_name} - #{m['artist']}")]
#####elsif !stream_metadata_type == 2 then
##### [("artist", ""), ("title", !show_name)]
elsif !stream_metadata_type == 2 then elsif !stream_metadata_type == 2 then
[("artist",!station_name), ("title", !show_name)] [("artist",!station_name), ("title", !show_name)]
else else

View File

@ -11,7 +11,7 @@ import json
import telnetlib import telnetlib
import math import math
import socket import socket
from threading import Thread from threading import Thread, Lock
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
@ -44,11 +44,12 @@ except Exception, e:
class PypoFetch(Thread): class PypoFetch(Thread):
def __init__(self, q): def __init__(self, q):
Thread.__init__(self) Thread.__init__(self)
logger = logging.getLogger('fetch') self.lock = Lock()
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
self.set_export_source('scheduler') self.set_export_source('scheduler')
self.queue = q self.queue = q
self.schedule_data = [] self.schedule_data = []
logger = logging.getLogger('fetch')
logger.info("PypoFetch: init complete") logger.info("PypoFetch: init complete")
def init_rabbit_mq(self): def init_rabbit_mq(self):
@ -74,6 +75,12 @@ class PypoFetch(Thread):
""" """
def handle_message(self, body, message): def handle_message(self, body, message):
try: try:
#Acquire Lock because multiple rabbitmq messages can be sent simultaneously
#and therefore we can have multiple threads inside this function. This causes
#multiple telnet connections to Liquidsoap which causes problems (refused connections).
self.lock.acquire()
logger = logging.getLogger('fetch') logger = logging.getLogger('fetch')
logger.info("Received event from RabbitMQ: " + message.body) logger.info("Received event from RabbitMQ: " + message.body)
@ -87,14 +94,21 @@ class PypoFetch(Thread):
elif command == 'update_stream_setting': elif command == 'update_stream_setting':
logger.info("Updating stream setting...") logger.info("Updating stream setting...")
self.regenerateLiquidsoapConf(m['setting']) self.regenerateLiquidsoapConf(m['setting'])
elif command == 'update_stream_format':
logger.info("Updating stream format...")
self.update_liquidsoap_stream_format(m['stream_format'])
elif command == 'update_station_name':
logger.info("Updating station name...")
self.update_liquidsoap_station_name(m['station_name'])
elif command == 'cancel_current_show': elif command == 'cancel_current_show':
logger.info("Cancel current show command received...") logger.info("Cancel current show command received...")
self.stop_current_show() self.stop_current_show()
except Exception, e: except Exception, e:
logger.error("Exception in handling RabbitMQ message: %s", e) logger.error("Exception in handling RabbitMQ message: %s", e)
finally: finally:
# ACK the message to take it off the queue self.lock.release()
try: try:
# ACK the message to take it off the queue
message.ack() message.ack()
except MessageStateError, m: except MessageStateError, m:
logger.error("Message ACK error: %s", m) logger.error("Message ACK error: %s", m)
@ -257,6 +271,39 @@ class PypoFetch(Thread):
self.cache_dir = config["cache_dir"] + self.export_source + '/' self.cache_dir = config["cache_dir"] + self.export_source + '/'
logger.info("Creating cache directory at %s", self.cache_dir) logger.info("Creating cache directory at %s", self.cache_dir)
def update_liquidsoap_stream_format(self, stream_format):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
logger = logging.getLogger('fetch')
logger.info(LS_HOST)
logger.info(LS_PORT)
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
logger.error("Exception %s", e)
def update_liquidsoap_station_name(self, station_name):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
logger = logging.getLogger('fetch')
logger.info(LS_HOST)
logger.info(LS_PORT)
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
command = ('vars.station_name %s\n' % station_name).encode('utf-8')
logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
logger.error("Exception %s", e)
""" """
Process the schedule Process the schedule
- Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
@ -269,22 +316,6 @@ class PypoFetch(Thread):
logger = logging.getLogger('fetch') logger = logging.getLogger('fetch')
playlists = schedule_data["playlists"] playlists = schedule_data["playlists"]
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
stream_metadata = schedule_data['stream_metadata']
try:
logger.info(LS_HOST)
logger.info(LS_PORT)
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
#encode in latin-1 due to telnet protocol not supporting utf-8
tn.write(('vars.stream_metadata_type %s\n' % stream_metadata['format']).encode('latin-1'))
tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1'))
tn.write('exit\n')
tn.read_all()
except Exception, e:
logger.error("Exception %s", e)
status = 0
# Download all the media and put playlists in liquidsoap "annotate" format # Download all the media and put playlists in liquidsoap "annotate" format
try: try:
liquidsoap_playlists = self.prepare_playlists(playlists, bootstrapping) liquidsoap_playlists = self.prepare_playlists(playlists, bootstrapping)
@ -294,7 +325,6 @@ class PypoFetch(Thread):
scheduled_data = dict() scheduled_data = dict()
scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists
scheduled_data['schedule'] = playlists scheduled_data['schedule'] = playlists
scheduled_data['stream_metadata'] = schedule_data["stream_metadata"]
self.queue.put(scheduled_data) self.queue.put(scheduled_data)
# cleanup # cleanup

View File

@ -39,7 +39,6 @@ class PypoPush(Thread):
self.schedule = dict() self.schedule = dict()
self.playlists = dict() self.playlists = dict()
self.stream_metadata = dict()
self.liquidsoap_state_play = True self.liquidsoap_state_play = True
self.push_ahead = 10 self.push_ahead = 10
@ -58,7 +57,7 @@ class PypoPush(Thread):
def push(self, export_source): def push(self, export_source):
logger = logging.getLogger('push') logger = logging.getLogger('push')
timenow = time.time()
# get a new schedule from pypo-fetch # get a new schedule from pypo-fetch
if not self.queue.empty(): if not self.queue.empty():
# make sure we get the latest schedule # make sure we get the latest schedule
@ -67,16 +66,12 @@ class PypoPush(Thread):
logger.debug("Received data from pypo-fetch") logger.debug("Received data from pypo-fetch")
self.schedule = scheduled_data['schedule'] self.schedule = scheduled_data['schedule']
self.playlists = scheduled_data['liquidsoap_playlists'] self.playlists = scheduled_data['liquidsoap_playlists']
self.stream_metadata = scheduled_data['stream_metadata']
logger.debug('schedule %s' % json.dumps(self.schedule)) logger.debug('schedule %s' % json.dumps(self.schedule))
logger.debug('playlists %s' % json.dumps(self.playlists)) logger.debug('playlists %s' % json.dumps(self.playlists))
schedule = self.schedule schedule = self.schedule
playlists = self.playlists playlists = self.playlists
timenow = time.time()
logger.debug('timenow %s' % timenow)
currently_on_air = False currently_on_air = False
if schedule: if schedule:
@ -169,7 +164,7 @@ class PypoPush(Thread):
#Sending schedule table row id string. #Sending schedule table row id string.
logger.debug("vars.pypo_data %s\n"%(liquidsoap_data["schedule_id"])) logger.debug("vars.pypo_data %s\n"%(liquidsoap_data["schedule_id"]))
tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('latin-1')) tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8'))
logger.debug('Preparing to push playlist %s' % pkey) logger.debug('Preparing to push playlist %s' % pkey)
for item in playlist: for item in playlist: