From d03d01bce98dc511e27469e60d2cf22bcff332b8 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Thu, 1 Mar 2012 17:58:44 -0500 Subject: [PATCH] CC-3366: Pypo: Use local files instead of downloading them -done --- airtime_mvc/application/Bootstrap.php | 2 +- airtime_mvc/application/models/Schedule.php | 64 ++++++---- python_apps/pypo/airtime-playout | 2 +- python_apps/pypo/{pypo-cli.py => pypocli.py} | 21 +++- python_apps/pypo/pypofetch.py | 79 ++++++++----- python_apps/pypo/pypofile.py | 117 +++++++++++++++++++ 6 files changed, 228 insertions(+), 57 deletions(-) rename python_apps/pypo/{pypo-cli.py => pypocli.py} (89%) create mode 100644 python_apps/pypo/pypofile.py diff --git a/airtime_mvc/application/Bootstrap.php b/airtime_mvc/application/Bootstrap.php index d4ca032f5..1e3037fa6 100644 --- a/airtime_mvc/application/Bootstrap.php +++ b/airtime_mvc/application/Bootstrap.php @@ -37,7 +37,7 @@ Zend_Validate::setDefaultNamespaces("Zend"); $front = Zend_Controller_Front::getInstance(); $front->registerPlugin(new RabbitMqPlugin()); -Logging::debug($_SERVER['REQUEST_URI']); +//Logging::debug($_SERVER['REQUEST_URI']); /* The bootstrap class should only be used to initialize actions that return a view. Actions that return JSON will not use the bootstrap class! */ diff --git a/airtime_mvc/application/models/Schedule.php b/airtime_mvc/application/models/Schedule.php index 21306d43d..46ad10d25 100644 --- a/airtime_mvc/application/models/Schedule.php +++ b/airtime_mvc/application/models/Schedule.php @@ -421,30 +421,52 @@ class Application_Model_Schedule { */ public static function GetItems($p_currentDateTime, $p_toDateTime) { global $CC_CONFIG, $CC_DBC; - $rows = array(); + + $baseQuery = "SELECT st.file_id AS file_id," + ." st.id as id," + ." st.starts AS start," + ." st.ends AS end," + ." st.cue_in AS cue_in," + ." st.cue_out AS cue_out," + ." st.fade_in AS fade_in," + ." st.fade_out AS fade_out," + ." si.starts as show_start," + ." si.ends as show_end" + ." FROM $CC_CONFIG[scheduleTable] as st" + ." LEFT JOIN $CC_CONFIG[showInstances] as si" + ." ON st.instance_id = si.id"; + - $sql = "SELECT st.file_id AS file_id," - ." st.id as id," - ." st.starts AS start," - ." st.ends AS end," - ." st.cue_in AS cue_in," - ." st.cue_out AS cue_out," - ." st.fade_in AS fade_in," - ." st.fade_out AS fade_out," - ." si.starts as show_start," - ." si.ends as show_end" - ." FROM $CC_CONFIG[scheduleTable] as st" - ." LEFT JOIN $CC_CONFIG[showInstances] as si" - ." ON st.instance_id = si.id" - ." ORDER BY start"; - - Logging::log($sql); + $predicates = " WHERE st.ends > '$p_currentDateTime'" + ." AND st.starts < '$p_toDateTime'" + ." ORDER BY st.starts"; + + $sql = $baseQuery.$predicates; $rows = $CC_DBC->GetAll($sql); if (PEAR::isError($rows)) { return null; } - + + if (count($rows) < 3){ + Logging::debug("Get Schedule: Less than 3 results returned. Doing another query since we need a minimum of 3 results."); + + $dt = new DateTime("@".time()); + $dt->add(new DateInterval("PT30M")); + $range_end = $dt->format("Y-m-d H:i:s"); + + $predicates = " WHERE st.ends > '$p_currentDateTime'" + ." AND st.starts < '$range_end'" + ." ORDER BY st.starts" + ." LIMIT 3"; + + $sql = $baseQuery.$predicates; + $rows = $CC_DBC->GetAll($sql); + if (PEAR::isError($rows)) { + return null; + } + } + return $rows; } @@ -462,7 +484,7 @@ class Application_Model_Schedule { } if (is_null($p_fromDateTime)) { $t2 = new DateTime("@".time()); - $t2->add(new DateInterval("PT24H")); + $t2->add(new DateInterval("PT30M")); $range_end = $t2->format("Y-m-d H:i:s"); } else { $range_end = Application_Model_Schedule::PypoTimeToAirtimeTime($p_toDateTime); @@ -480,8 +502,8 @@ class Application_Model_Schedule { foreach ($items as $item){ $storedFile = Application_Model_StoredFile::Recall($item["file_id"]); - $uri = $storedFile->getFileUrlUsingConfigAddress(); - + $uri = $storedFile->getFilePath(); + $showEndDateTime = new DateTime($item["show_end"], $utcTimeZone); $trackEndDateTime = new DateTime($item["end"], $utcTimeZone); diff --git a/python_apps/pypo/airtime-playout b/python_apps/pypo/airtime-playout index 3ef01ad2b..05fbd20c4 100755 --- a/python_apps/pypo/airtime-playout +++ b/python_apps/pypo/airtime-playout @@ -8,7 +8,7 @@ pypo_user="pypo" # Location of pypo_cli.py Python script pypo_path="/usr/lib/airtime/pypo/bin/" api_client_path="/usr/lib/airtime/" -pypo_script="pypo-cli.py" +pypo_script="pypocli.py" cd ${pypo_path} exec 2>&1 diff --git a/python_apps/pypo/pypo-cli.py b/python_apps/pypo/pypocli.py similarity index 89% rename from python_apps/pypo/pypo-cli.py rename to python_apps/pypo/pypocli.py index 300f57f9c..265d141d9 100644 --- a/python_apps/pypo/pypo-cli.py +++ b/python_apps/pypo/pypocli.py @@ -1,8 +1,7 @@ -# -*- coding: utf-8 -*- - """ Python part of radio playout (pypo) """ + import time from optparse import * import sys @@ -15,6 +14,7 @@ from Queue import Queue from pypopush import PypoPush from pypofetch import PypoFetch +from pypofile import PypoFile from recorder import Recorder from pypomessagehandler import PypoMessageHandler @@ -125,11 +125,23 @@ if __name__ == '__main__': recorder_q = Queue() pypoPush_q = Queue() + """ + This queue is shared between pypo-fetch and pypo-file, where pypo-file + is the receiver. Pypo-fetch will send every schedule it gets to pypo-file + and pypo will parse this schedule to determine which file has the highest + priority, and will retrieve it. + """ + media_q = Queue() + pmh = PypoMessageHandler(pypoFetch_q, recorder_q) pmh.daemon = True pmh.start() - pf = PypoFetch(pypoFetch_q, pypoPush_q) + pfile = PypoFile(media_q) + pfile.daemon = True + pfile.start() + + pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q) pf.daemon = True pf.start() @@ -142,8 +154,9 @@ if __name__ == '__main__': recorder.start() pmh.join() - pp.join() + pfile.join() pf.join() + pp.join() recorder.join() logger.info("pypo fetch exit") diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index fbdf1ea2a..11810f00a 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -10,6 +10,7 @@ import string import json import telnetlib import math +import copy from threading import Thread from subprocess import Popen, PIPE from datetime import datetime @@ -36,11 +37,12 @@ except Exception, e: sys.exit() class PypoFetch(Thread): - def __init__(self, pypoFetch_q, pypoPush_q): + def __init__(self, pypoFetch_q, pypoPush_q, media_q): Thread.__init__(self) self.api_client = api_client.api_client_factory(config) self.fetch_queue = pypoFetch_q self.push_queue = pypoPush_q + self.media_prepare_queue = media_q self.logger = logging.getLogger(); @@ -288,11 +290,29 @@ class PypoFetch(Thread): # Download all the media and put playlists in liquidsoap "annotate" format try: - media = self.prepare_media(media, bootstrapping) + + """ + Make sure cache_dir exists + """ + download_dir = self.cache_dir + try: + os.makedirs(download_dir) + except Exception, e: + pass + + for key in media: + media_item = media[key] + + fileExt = os.path.splitext(media_item['uri'])[1] + dst = os.path.join(download_dir, media_item['id']+fileExt) + media_item['dst'] = dst + + self.media_prepare_queue.put(copy.copy(media)) + self.prepare_media(media, bootstrapping) except Exception, e: self.logger.error("%s", e) # Send the data to pypo-push - self.logger.debug("Pushing to pypo-push: "+ str(media)) + self.logger.debug("Pushing to pypo-push") self.push_queue.put(media) """ @@ -317,27 +337,10 @@ class PypoFetch(Thread): if bootstrapping: self.check_for_previous_crash(media_item) - - # create playlist directory - try: - """ - Extract year, month, date from mkey - """ - y_m_d = mkey[0:10] - download_dir = os.path.join(self.cache_dir, y_m_d) - try: - os.makedirs(os.path.join(self.cache_dir, y_m_d)) - except Exception, e: - pass - fileExt = os.path.splitext(media_item['uri'])[1] - dst = os.path.join(download_dir, media_item['id']+fileExt) - except Exception, e: - self.logger.warning(e) - - if self.handle_media_file(media_item, dst): - entry = self.create_liquidsoap_annotation(media_item, dst) - media_item['show_name'] = "TODO" - media_item["annotation"] = entry + + entry = self.create_liquidsoap_annotation(media_item) + media_item['show_name'] = "TODO" + media_item["annotation"] = entry except Exception, e: self.logger.error("%s", e) @@ -345,7 +348,7 @@ class PypoFetch(Thread): return media - def create_liquidsoap_annotation(self, media, dst): + def create_liquidsoap_annotation(self, media): entry = \ 'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \ % (media['id'], 0, \ @@ -353,7 +356,7 @@ class PypoFetch(Thread): float(media['fade_out']) / 1000, \ float(media['cue_in']), \ float(media['cue_out']), \ - media['row_id'], dst) + media['row_id'], media['dst']) return entry @@ -397,7 +400,8 @@ class PypoFetch(Thread): try: #blocking function to download the media item - self.download_file(media_item, dst) + #self.download_file(media_item, dst) + self.copy_file(media_item, dst) if os.access(dst, os.R_OK): # check filesize (avoid zero-byte files) @@ -417,10 +421,25 @@ class PypoFetch(Thread): return False - """ - Download a file from a remote server and store it in the cache. - """ + def copy_file(self, media_item, dst): + """ + Copy the file from local library directory. + """ + if not os.path.isfile(dst): + self.logger.debug("copying from %s to local cache %s" % (media_item['uri'], dst)) + try: + shutil.copy(media_item['uri'], dst) + except: + self.logger.error("Could not copy from %s to %s" % (media_item['uri'], dst)) + else: + #file already exists + pass + + def download_file(self, media_item, dst): + """ + Download a file from a remote server and store it in the cache. + """ if os.path.isfile(dst): pass #self.logger.debug("file already in cache: %s", dst) diff --git a/python_apps/pypo/pypofile.py b/python_apps/pypo/pypofile.py new file mode 100644 index 000000000..c5e40b598 --- /dev/null +++ b/python_apps/pypo/pypofile.py @@ -0,0 +1,117 @@ +from threading import Thread +from Queue import Empty +from configobj import ConfigObj + +import logging +import logging.config +import shutil +import os +import time + +# configure logging +logging.config.fileConfig("logging.cfg") + +# loading config file +try: + config = ConfigObj('/etc/airtime/pypo.cfg') + LS_HOST = config['ls_host'] + LS_PORT = config['ls_port'] + POLL_INTERVAL = int(config['poll_interval']) + +except Exception, e: + logger = logging.getLogger() + logger.error('Error loading config file: %s', e) + sys.exit() + + +class PypoFile(Thread): + + def __init__(self, schedule_queue): + Thread.__init__(self) + self.logger = logging.getLogger() + self.media_queue = schedule_queue + self.media = None + self.cache_dir = os.path.join(config["cache_dir"], "scheduler") + + def copy_file(self, media_item): + """ + Copy media_item from local library directory to local cache directory. + """ + + if media_item is None: + return + + dst = media_item['dst'] + + if not os.path.isfile(dst): + self.logger.debug("copying from %s to local cache %s" % (media_item['uri'], dst)) + try: + shutil.copy(media_item['uri'], dst) + except: + self.logger.error("Could not copy from %s to %s" % (media_item['uri'], dst)) + else: + self.logger.debug("Destination %s already exists. Not copying", dst) + + def get_highest_priority_media_item(self, schedule): + """ + Get highest priority media_item in the queue. Currently the highest + priority is decided by how close the start time is to "now". + """ + if schedule is None: + return None + + sorted_keys = sorted(schedule.keys()) + + if len(sorted_keys) == 0: + return None + + highest_priority = sorted_keys[0] + media_item = schedule[highest_priority] + + self.logger.debug("Highest priority item: %s" % highest_priority) + + """ + Remove this media_item from the dictionary. On the next iteration + (from the main function) we won't consider it for prioritization + anymore. If on the next iteration we have received a new schedule, + it is very possible we will have to deal with the same media_items + again. In this situation, the worst possible case is that we try to + copy the file again and realize we already have it (thus aborting the copy). + """ + del schedule[highest_priority] + + return media_item + + + def main(self): + while True: + try: + if self.media is None or len(self.media) == 0: + """ + We have no schedule, so we have nothing else to do. Let's + do a blocked wait on the queue + """ + self.media = self.media_queue.get(block=True) + else: + """ + We have a schedule we need to process, but we also want + to check if a newer schedule is available. In this case + do a non-blocking queue.get and in either case (we get something + or we don't), get back to work on preparing getting files. + """ + try: + self.media = self.media_queue.get_nowait() + except Empty, e: + pass + + media_item = self.get_highest_priority_media_item(self.media) + self.copy_file(media_item) + except Exception, e: + self.logger.error(str(e)) + raise + + def run(self): + """ + Entry point of the thread + """ + self.main()