CC-3336: Refactor schedule API used by pypo

-done
This commit is contained in:
Martin Konecny 2012-02-29 21:27:42 -05:00
parent 08048cd403
commit 92f19139b9
5 changed files with 46 additions and 65 deletions

View File

@ -27,6 +27,7 @@ class ApiController extends Zend_Controller_Action
->addActionContext('live-chat', 'json') ->addActionContext('live-chat', 'json')
->addActionContext('update-file-system-mount', 'json') ->addActionContext('update-file-system-mount', 'json')
->addActionContext('handle-watched-dir-missing', 'json') ->addActionContext('handle-watched-dir-missing', 'json')
->addActionContext('rabbitmq-do-push', 'json')
->initContext(); ->initContext();
} }
@ -318,6 +319,7 @@ class ApiController extends Zend_Controller_Action
} }
} }
/*
public function notifyScheduleGroupPlayAction() public function notifyScheduleGroupPlayAction()
{ {
global $CC_CONFIG; global $CC_CONFIG;
@ -357,6 +359,7 @@ class ApiController extends Zend_Controller_Action
exit; exit;
} }
} }
*/
public function recordedShowsAction() public function recordedShowsAction()
{ {
@ -903,5 +906,26 @@ class ApiController extends Zend_Controller_Action
$dir = base64_decode($request->getParam('dir')); $dir = base64_decode($request->getParam('dir'));
Application_Model_MusicDir::removeWatchedDir($dir, false); Application_Model_MusicDir::removeWatchedDir($dir, false);
} }
/* This action is for use by our dev scripts, that make
* a change to the database and we want rabbitmq to send
* out a message to pypo that a potential change has been made. */
public function rabbitmqDoPushAction(){
global $CC_CONFIG;
$request = $this->getRequest();
$api_key = $request->getParam('api_key');
if (!in_array($api_key, $CC_CONFIG["apiKey"]))
{
header('HTTP/1.0 401 Unauthorized');
print 'You are not allowed to access this resource.';
exit;
}
Logging::log("Notifying RabbitMQ to send message to pypo");
Application_Model_RabbitMq::PushSchedule();
}
} }

View File

@ -81,14 +81,6 @@ class ApiClientInterface:
def get_media(self, src, dst): def get_media(self, src, dst):
pass pass
# Implementation: optional
#
# Called from: push loop
#
# Tell server that the scheduled *playlist* has started.
def notify_scheduled_item_start_playing(self, pkey, schedule):
pass
# Implementation: optional # Implementation: optional
# You dont actually have to implement this function for the liquidsoap playout to work. # You dont actually have to implement this function for the liquidsoap playout to work.
# #
@ -285,32 +277,6 @@ class AirTimeApiClient(ApiClientInterface):
except Exception, e: except Exception, e:
logger.error("%s", e) logger.error("%s", e)
"""
Tell server that the scheduled *playlist* has started.
"""
def notify_scheduled_item_start_playing(self, pkey, schedule):
logger = self.logger
playlist = schedule[pkey]
schedule_id = playlist["schedule_id"]
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["update_item_url"])
url = url.replace("%%schedule_id%%", str(schedule_id))
logger.debug(url)
url = url.replace("%%api_key%%", self.config["api_key"])
try:
response = urllib.urlopen(url)
response = json.loads(response.read())
logger.info("API-Status %s", response['status'])
logger.info("API-Message %s", response['message'])
except Exception, e:
logger.error("Unable to connect - %s", e)
return response
""" """
This is a callback from liquidsoap, we use this to notify about the This is a callback from liquidsoap, we use this to notify about the
currently playing *song*. We get passed a JSON string which we handed to currently playing *song*. We get passed a JSON string which we handed to

View File

@ -34,7 +34,7 @@ import json
from configobj import ConfigObj from configobj import ConfigObj
# custom imports # custom imports
from util import * #from util import *
from api_clients import * from api_clients import *
# Set up command-line options # Set up command-line options

View File

@ -336,7 +336,7 @@ class PypoFetch(Thread):
if self.handle_media_file(media_item, dst): if self.handle_media_file(media_item, dst):
entry = self.create_liquidsoap_annotation(media_item, dst) entry = self.create_liquidsoap_annotation(media_item, dst)
entry['show_name'] = "TODO" media_item['show_name'] = "TODO"
media_item["annotation"] = entry media_item["annotation"] = entry
except Exception, e: except Exception, e:
@ -346,7 +346,7 @@ class PypoFetch(Thread):
def create_liquidsoap_annotation(self, media, dst): def create_liquidsoap_annotation(self, media, dst):
pl_entry = \ entry = \
'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \ 'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
% (media['id'], 0, \ % (media['id'], 0, \
float(media['fade_in']) / 1000, \ float(media['fade_in']) / 1000, \
@ -355,15 +355,6 @@ class PypoFetch(Thread):
float(media['cue_out']), \ float(media['cue_out']), \
media['row_id'], dst) media['row_id'], dst)
"""
Tracks are only added to the playlist if they are accessible
on the file system and larger than 0 bytes.
So this can lead to playlists shorter than expectet.
(there is a hardware silence detector for this cases...)
"""
entry = dict()
entry['type'] = 'file'
entry['annotate'] = pl_entry
return entry return entry
def check_for_previous_crash(self, media_item): def check_for_previous_crash(self, media_item):

View File

@ -39,9 +39,11 @@ class PypoPush(Thread):
self.media = dict() self.media = dict()
self.liquidsoap_state_play = True self.liquidsoap_state_play = True
self.push_ahead = 30 self.push_ahead = 10
self.last_end_time = 0 self.last_end_time = 0
self.logger = logging.getLogger('push')
def push(self): def push(self):
""" """
The Push Loop - the push loop periodically checks if there is a playlist The Push Loop - the push loop periodically checks if there is a playlist
@ -49,7 +51,6 @@ class PypoPush(Thread):
If yes, the current liquidsoap playlist gets replaced with the corresponding one, If yes, the current liquidsoap playlist gets replaced with the corresponding one,
then liquidsoap is asked (via telnet) to reload and immediately play it. then liquidsoap is asked (via telnet) to reload and immediately play it.
""" """
logger = logging.getLogger('push')
timenow = time.time() timenow = time.time()
# get a new schedule from pypo-fetch # get a new schedule from pypo-fetch
@ -57,8 +58,8 @@ class PypoPush(Thread):
# make sure we get the latest schedule # make sure we get the latest schedule
while not self.queue.empty(): while not self.queue.empty():
self.media = self.queue.get() self.media = self.queue.get()
logger.debug("Received data from pypo-fetch") self.logger.debug("Received data from pypo-fetch")
logger.debug('media %s' % json.dumps(self.media)) self.logger.debug('media %s' % json.dumps(self.media))
media = self.media media = self.media
@ -77,17 +78,13 @@ class PypoPush(Thread):
""" """
If the media item starts in the next 30 seconds, push it to the queue. If the media item starts in the next 30 seconds, push it to the queue.
""" """
logger.debug('Preparing to push media item scheduled at: %s', key) self.logger.debug('Preparing to push media item scheduled at: %s', key)
if self.push_to_liquidsoap(media_item): if self.push_to_liquidsoap(media_item):
logger.debug("Pushed to liquidsoap, updating 'played' status.") self.logger.debug("Pushed to liquidsoap, updating 'played' status.")
currently_on_air = True currently_on_air = True
self.liquidsoap_state_play = True self.liquidsoap_state_play = True
# Call API to update schedule states
logger.debug("Doing callback to server to update 'played' status.")
self.api_client.notify_scheduled_item_start_playing(key, schedule)
def push_to_liquidsoap(self, media_item): def push_to_liquidsoap(self, media_item):
""" """
@ -133,15 +130,15 @@ class PypoPush(Thread):
#Return the time as a floating point number expressed in seconds since the epoch, in UTC. #Return the time as a floating point number expressed in seconds since the epoch, in UTC.
epoch_now = time.time() epoch_now = time.time()
logger.debug("Epoch start: %s" % epoch_start) self.logger.debug("Epoch start: %s" % epoch_start)
logger.debug("Epoch now: %s" % epoch_now) self.logger.debug("Epoch now: %s" % epoch_now)
sleep_time = epoch_start - epoch_now sleep_time = epoch_start - epoch_now
if sleep_time < 0: if sleep_time < 0:
sleep_time = 0 sleep_time = 0
logger.debug('sleeping for %s s' % (sleep_time)) self.logger.debug('sleeping for %s s' % (sleep_time))
time.sleep(sleep_time) time.sleep(sleep_time)
def telnet_to_liquidsoap(self, media_item): def telnet_to_liquidsoap(self, media_item):
@ -156,25 +153,28 @@ class PypoPush(Thread):
#tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8')) #tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8'))
annotation = media_item['annotation'] annotation = media_item['annotation']
tn.write('queue.push %s\n' % annotation.encode('utf-8')) msg = 'queue.push %s\n' % annotation.encode('utf-8')
tn.write(msg)
self.logger.debug(msg)
show_name = media_item['show_name'] show_name = media_item['show_name']
tn.write('vars.show_name %s\n' % show_name.encode('utf-8')) msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
tn.write(msg)
self.logger.debug(msg)
tn.write("exit\n") tn.write("exit\n")
logger.debug(tn.read_all()) self.logger.debug(tn.read_all())
def run(self): def run(self):
loops = 0 loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL) heartbeat_period = math.floor(30/PUSH_INTERVAL)
logger = logging.getLogger('push')
while True: while True:
if loops % heartbeat_period == 0: if loops % heartbeat_period == 0:
logger.info("heartbeat") self.logger.info("heartbeat")
loops = 0 loops = 0
try: self.push() try: self.push()
except Exception, e: except Exception, e:
logger.error('Pypo Push Exception: %s', e) self.logger.error('Pypo Push Exception: %s', e)
time.sleep(PUSH_INTERVAL) time.sleep(PUSH_INTERVAL)
loops += 1 loops += 1