CC-3366: Pypo: Use local files instead of downloading them

-done
Martin Konecny 2012-03-01 17:58:44 -05:00
parent 508c623850
commit d03d01bce9
6 changed files with 228 additions and 57 deletions

View File

@@ -37,7 +37,7 @@ Zend_Validate::setDefaultNamespaces("Zend");
$front = Zend_Controller_Front::getInstance();
$front->registerPlugin(new RabbitMqPlugin());
Logging::debug($_SERVER['REQUEST_URI']);
//Logging::debug($_SERVER['REQUEST_URI']);
/* The bootstrap class should only be used to initialize actions that return a view.
Actions that return JSON will not use the bootstrap class! */

View File

@@ -421,30 +421,52 @@ class Application_Model_Schedule {
*/
public static function GetItems($p_currentDateTime, $p_toDateTime) {
global $CC_CONFIG, $CC_DBC;
$rows = array();
$baseQuery = "SELECT st.file_id AS file_id,"
." st.id as id,"
." st.starts AS start,"
." st.ends AS end,"
." st.cue_in AS cue_in,"
." st.cue_out AS cue_out,"
." st.fade_in AS fade_in,"
." st.fade_out AS fade_out,"
." si.starts as show_start,"
." si.ends as show_end"
." FROM $CC_CONFIG[scheduleTable] as st"
." LEFT JOIN $CC_CONFIG[showInstances] as si"
." ON st.instance_id = si.id";
$sql = "SELECT st.file_id AS file_id,"
." st.id as id,"
." st.starts AS start,"
." st.ends AS end,"
." st.cue_in AS cue_in,"
." st.cue_out AS cue_out,"
." st.fade_in AS fade_in,"
." st.fade_out AS fade_out,"
." si.starts as show_start,"
." si.ends as show_end"
." FROM $CC_CONFIG[scheduleTable] as st"
." LEFT JOIN $CC_CONFIG[showInstances] as si"
." ON st.instance_id = si.id"
." ORDER BY start";
Logging::log($sql);
$predicates = " WHERE st.ends > '$p_currentDateTime'"
." AND st.starts < '$p_toDateTime'"
." ORDER BY st.starts";
$sql = $baseQuery.$predicates;
$rows = $CC_DBC->GetAll($sql);
if (PEAR::isError($rows)) {
return null;
}
if (count($rows) < 3){
Logging::debug("Get Schedule: Less than 3 results returned. Doing another query since we need a minimum of 3 results.");
$dt = new DateTime("@".time());
$dt->add(new DateInterval("PT30M"));
$range_end = $dt->format("Y-m-d H:i:s");
$predicates = " WHERE st.ends > '$p_currentDateTime'"
." AND st.starts < '$range_end'"
." ORDER BY st.starts"
." LIMIT 3";
$sql = $baseQuery.$predicates;
$rows = $CC_DBC->GetAll($sql);
if (PEAR::isError($rows)) {
return null;
}
}
return $rows;
}
@@ -462,7 +484,7 @@ class Application_Model_Schedule {
}
if (is_null($p_fromDateTime)) {
$t2 = new DateTime("@".time());
$t2->add(new DateInterval("PT24H"));
$t2->add(new DateInterval("PT30M"));
$range_end = $t2->format("Y-m-d H:i:s");
} else {
$range_end = Application_Model_Schedule::PypoTimeToAirtimeTime($p_toDateTime);
@@ -480,8 +502,8 @@ class Application_Model_Schedule {
foreach ($items as $item){
$storedFile = Application_Model_StoredFile::Recall($item["file_id"]);
$uri = $storedFile->getFileUrlUsingConfigAddress();
$uri = $storedFile->getFilePath();
$showEndDateTime = new DateTime($item["show_end"], $utcTimeZone);
$trackEndDateTime = new DateTime($item["end"], $utcTimeZone);

View File

@@ -8,7 +8,7 @@ pypo_user="pypo"
# Location of pypo_cli.py Python script
pypo_path="/usr/lib/airtime/pypo/bin/"
api_client_path="/usr/lib/airtime/"
pypo_script="pypo-cli.py"
pypo_script="pypocli.py"
cd ${pypo_path}
exec 2>&1

View File

@@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
"""
Python part of radio playout (pypo)
"""
import time
from optparse import *
import sys
@@ -15,6 +14,7 @@ from Queue import Queue
from pypopush import PypoPush
from pypofetch import PypoFetch
from pypofile import PypoFile
from recorder import Recorder
from pypomessagehandler import PypoMessageHandler
@@ -125,11 +125,23 @@ if __name__ == '__main__':
recorder_q = Queue()
pypoPush_q = Queue()
"""
This queue is shared between pypo-fetch and pypo-file, where pypo-file
is the receiver. Pypo-fetch sends every schedule it gets to pypo-file,
and pypo-file parses the schedule to determine which file has the
highest priority and retrieves it.
"""
media_q = Queue()
pmh = PypoMessageHandler(pypoFetch_q, recorder_q)
pmh.daemon = True
pmh.start()
pf = PypoFetch(pypoFetch_q, pypoPush_q)
pfile = PypoFile(media_q)
pfile.daemon = True
pfile.start()
pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q)
pf.daemon = True
pf.start()
@@ -142,8 +154,9 @@ if __name__ == '__main__':
recorder.start()
pmh.join()
pp.join()
pfile.join()
pf.join()
pp.join()
recorder.join()
logger.info("pypo fetch exit")

View File

@@ -10,6 +10,7 @@ import string
import json
import telnetlib
import math
import copy
from threading import Thread
from subprocess import Popen, PIPE
from datetime import datetime
@@ -36,11 +37,12 @@ except Exception, e:
sys.exit()
class PypoFetch(Thread):
def __init__(self, pypoFetch_q, pypoPush_q):
def __init__(self, pypoFetch_q, pypoPush_q, media_q):
Thread.__init__(self)
self.api_client = api_client.api_client_factory(config)
self.fetch_queue = pypoFetch_q
self.push_queue = pypoPush_q
self.media_prepare_queue = media_q
self.logger = logging.getLogger();
@@ -288,11 +290,29 @@ class PypoFetch(Thread):
# Download all the media and put playlists in liquidsoap "annotate" format
try:
media = self.prepare_media(media, bootstrapping)
"""
Make sure cache_dir exists
"""
download_dir = self.cache_dir
try:
os.makedirs(download_dir)
except Exception, e:
pass
for key in media:
media_item = media[key]
fileExt = os.path.splitext(media_item['uri'])[1]
dst = os.path.join(download_dir, media_item['id']+fileExt)
media_item['dst'] = dst
self.media_prepare_queue.put(copy.copy(media))
self.prepare_media(media, bootstrapping)
except Exception, e: self.logger.error("%s", e)
# Send the data to pypo-push
self.logger.debug("Pushing to pypo-push: "+ str(media))
self.logger.debug("Pushing to pypo-push")
self.push_queue.put(media)
"""
@@ -317,27 +337,10 @@ class PypoFetch(Thread):
if bootstrapping:
self.check_for_previous_crash(media_item)
# create playlist directory
try:
"""
Extract year, month, date from mkey
"""
y_m_d = mkey[0:10]
download_dir = os.path.join(self.cache_dir, y_m_d)
try:
os.makedirs(os.path.join(self.cache_dir, y_m_d))
except Exception, e:
pass
fileExt = os.path.splitext(media_item['uri'])[1]
dst = os.path.join(download_dir, media_item['id']+fileExt)
except Exception, e:
self.logger.warning(e)
if self.handle_media_file(media_item, dst):
entry = self.create_liquidsoap_annotation(media_item, dst)
media_item['show_name'] = "TODO"
media_item["annotation"] = entry
entry = self.create_liquidsoap_annotation(media_item)
media_item['show_name'] = "TODO"
media_item["annotation"] = entry
except Exception, e:
self.logger.error("%s", e)
@@ -345,7 +348,7 @@ class PypoFetch(Thread):
return media
def create_liquidsoap_annotation(self, media, dst):
def create_liquidsoap_annotation(self, media):
entry = \
'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
% (media['id'], 0, \
@@ -353,7 +356,7 @@ class PypoFetch(Thread):
float(media['fade_out']) / 1000, \
float(media['cue_in']), \
float(media['cue_out']), \
media['row_id'], dst)
media['row_id'], media['dst'])
return entry
@@ -397,7 +400,8 @@ class PypoFetch(Thread):
try:
#blocking function to download the media item
self.download_file(media_item, dst)
#self.download_file(media_item, dst)
self.copy_file(media_item, dst)
if os.access(dst, os.R_OK):
# check filesize (avoid zero-byte files)
@@ -417,10 +421,25 @@ class PypoFetch(Thread):
return False
"""
Download a file from a remote server and store it in the cache.
"""
def copy_file(self, media_item, dst):
"""
Copy the file from the local library directory.
"""
if not os.path.isfile(dst):
self.logger.debug("copying from %s to local cache %s" % (media_item['uri'], dst))
try:
shutil.copy(media_item['uri'], dst)
except:
self.logger.error("Could not copy from %s to %s" % (media_item['uri'], dst))
else:
#file already exists
pass
def download_file(self, media_item, dst):
"""
Download a file from a remote server and store it in the cache.
"""
if os.path.isfile(dst):
pass
#self.logger.debug("file already in cache: %s", dst)

View File

@@ -0,0 +1,117 @@
from threading import Thread
from Queue import Empty
from configobj import ConfigObj

import logging
import logging.config
import shutil
import os
import sys
import time

# configure logging
logging.config.fileConfig("logging.cfg")

# loading config file
try:
    config = ConfigObj('/etc/airtime/pypo.cfg')
    LS_HOST = config['ls_host']
    LS_PORT = config['ls_port']
    POLL_INTERVAL = int(config['poll_interval'])
except Exception, e:
    logger = logging.getLogger()
    logger.error('Error loading config file: %s', e)
    sys.exit()

class PypoFile(Thread):

    def __init__(self, schedule_queue):
        Thread.__init__(self)
        self.logger = logging.getLogger()
        self.media_queue = schedule_queue
        self.media = None
        self.cache_dir = os.path.join(config["cache_dir"], "scheduler")

    def copy_file(self, media_item):
        """
        Copy media_item from the local library directory to the local cache directory.
        """
        if media_item is None:
            return

        dst = media_item['dst']
        if not os.path.isfile(dst):
            self.logger.debug("copying from %s to local cache %s" % (media_item['uri'], dst))
            try:
                shutil.copy(media_item['uri'], dst)
            except:
                self.logger.error("Could not copy from %s to %s" % (media_item['uri'], dst))
        else:
            self.logger.debug("Destination %s already exists. Not copying", dst)

    def get_highest_priority_media_item(self, schedule):
        """
        Get highest priority media_item in the queue. Currently the highest
        priority is decided by how close the start time is to "now".
        """
        if schedule is None:
            return None

        sorted_keys = sorted(schedule.keys())
        if len(sorted_keys) == 0:
            return None

        highest_priority = sorted_keys[0]
        media_item = schedule[highest_priority]

        self.logger.debug("Highest priority item: %s" % highest_priority)

        """
        Remove this media_item from the dictionary. On the next iteration
        (from the main function) we won't consider it for prioritization
        anymore. If on the next iteration we have received a new schedule,
        it is very possible we will have to deal with the same media_items
        again. In this situation, the worst possible case is that we try to
        copy the file again and realize we already have it (thus aborting the copy).
        """
        del schedule[highest_priority]

        return media_item

    def main(self):
        while True:
            try:
                if self.media is None or len(self.media) == 0:
                    """
                    We have no schedule, so there is nothing else to do. Do a
                    blocking wait on the queue.
                    """
                    self.media = self.media_queue.get(block=True)
                else:
                    """
                    We have a schedule to process, but we also want to check
                    whether a newer schedule is available. Do a non-blocking
                    queue.get and, whether or not a new schedule arrives, get
                    back to preparing the files.
                    """
                    try:
                        self.media = self.media_queue.get_nowait()
                    except Empty, e:
                        pass

                media_item = self.get_highest_priority_media_item(self.media)
                self.copy_file(media_item)
            except Exception, e:
                self.logger.error(str(e))
                raise

    def run(self):
        """
        Entry point of the thread
        """
        self.main()
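The prioritization above relies on the schedule's keys sorting chronologically, so sorted(schedule.keys())[0] is the item that starts soonest. A toy example with hypothetical keys and paths (not taken from this commit):

schedule = {
    "2012-03-01-18-30-00": {"id": 5, "uri": "/srv/library/b.mp3", "dst": "/tmp/pypo-cache/scheduler/5.mp3"},
    "2012-03-01-18-00-00": {"id": 4, "uri": "/srv/library/a.mp3", "dst": "/tmp/pypo-cache/scheduler/4.mp3"},
}

# Lexicographic order of the zero-padded timestamp keys matches chronological
# order, so item 4 is copied before item 5.
print sorted(schedule.keys())[0]   # -> "2012-03-01-18-00-00"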