Merge branch '2.4.x'
Conflicts: python_apps/pypo/pypopush.py python_apps/pypo/schedule/pypofetch.py python_apps/pypo/schedule/pypofile.py python_apps/pypo/schedule/pypoliqqueue.py
This commit is contained in:
commit
586bdf99e9
71 changed files with 5253 additions and 4614 deletions
|
@ -18,7 +18,6 @@ PIDFILE=/var/run/airtime-liquidsoap.pid
|
|||
EXEC='/usr/bin/airtime-liquidsoap'
|
||||
|
||||
start () {
|
||||
|
||||
mkdir -p /var/log/airtime/pypo-liquidsoap
|
||||
chown $USERID:$GROUPID /var/log/airtime/pypo-liquidsoap
|
||||
|
||||
|
@ -36,9 +35,19 @@ start () {
|
|||
}
|
||||
|
||||
stop () {
|
||||
timeout --version >/dev/null 2>&1
|
||||
RESULT="$?"
|
||||
|
||||
#send term signal after 10 seconds
|
||||
timeout -s9 10s /usr/lib/airtime/airtime_virtualenv/bin/python \
|
||||
/usr/lib/airtime/pypo/bin/liquidsoap_scripts/liquidsoap_prepare_terminate.py
|
||||
if [ "$RESULT" = "0" ]; then
|
||||
timeout -s9 10s /usr/lib/airtime/airtime_virtualenv/bin/python \
|
||||
/usr/lib/airtime/pypo/bin/liquidsoap_scripts/liquidsoap_prepare_terminate.py
|
||||
else
|
||||
#some earlier versions of Ubuntu (Lucid) had a different timeout
|
||||
#command that takes different input parameters.
|
||||
timeout 10 /usr/lib/airtime/airtime_virtualenv/bin/python \
|
||||
/usr/lib/airtime/pypo/bin/liquidsoap_scripts/liquidsoap_prepare_terminate.py
|
||||
fi
|
||||
# Send TERM after 5 seconds, wait at most 30 seconds.
|
||||
#start-stop-daemon --stop --oknodo --retry=TERM/10/KILL/5 --quiet --pidfile $PIDFILE
|
||||
start-stop-daemon --stop --oknodo --retry=TERM/10/KILL/5 --quiet --exec $EXEC
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
set httpd port 2812
|
||||
|
||||
check process airtime-liquidsoap matching "airtime-liquidsoap.*airtime.*ls_script"
|
||||
if does not exist for 3 cycles then restart
|
||||
|
||||
start program = "/etc/init.d/airtime-liquidsoap start" with timeout 30 seconds
|
||||
stop program = "/etc/init.d/airtime-liquidsoap stop"
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
set httpd port 2812
|
||||
|
||||
check process airtime-liquidsoap
|
||||
with pidfile "/var/run/airtime-liquidsoap.pid"
|
||||
check process airtime-liquidsoap with pidfile "/var/run/airtime-liquidsoap.pid"
|
||||
if does not exist for 3 cycles then restart
|
||||
start program = "/etc/init.d/airtime-liquidsoap start" with timeout 5 seconds
|
||||
stop program = "/etc/init.d/airtime-liquidsoap stop"
|
||||
|
|
196
python_apps/pypo/pypopush.py
Normal file
196
python_apps/pypo/pypopush.py
Normal file
|
@ -0,0 +1,196 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
|
||||
import sys
|
||||
import time
|
||||
import logging.config
|
||||
import telnetlib
|
||||
import calendar
|
||||
import math
|
||||
import traceback
|
||||
import os
|
||||
|
||||
from pypofetch import PypoFetch
|
||||
from pypoliqqueue import PypoLiqQueue
|
||||
|
||||
from Queue import Empty, Queue
|
||||
|
||||
from threading import Thread
|
||||
|
||||
from api_clients import api_client
|
||||
from std_err_override import LogWriter
|
||||
from configobj import ConfigObj
|
||||
|
||||
|
||||
# configure logging
|
||||
logging_cfg = os.path.join(os.path.dirname(__file__), "logging.cfg")
|
||||
logging.config.fileConfig(logging_cfg)
|
||||
logger = logging.getLogger()
|
||||
LogWriter.override_std_err(logger)
|
||||
|
||||
#need to wait for Python 2.7 for this..
|
||||
#logging.captureWarnings(True)
|
||||
|
||||
PUSH_INTERVAL = 2
|
||||
|
||||
|
||||
def is_stream(media_item):
|
||||
return media_item['type'] == 'stream_output_start'
|
||||
|
||||
def is_file(media_item):
|
||||
return media_item['type'] == 'file'
|
||||
|
||||
class PypoPush(Thread):
|
||||
def __init__(self, q, telnet_lock, pypo_liquidsoap, config):
|
||||
Thread.__init__(self)
|
||||
self.api_client = api_client.AirtimeApiClient()
|
||||
self.queue = q
|
||||
|
||||
self.telnet_lock = telnet_lock
|
||||
self.config = config
|
||||
|
||||
self.pushed_objects = {}
|
||||
self.logger = logging.getLogger('push')
|
||||
self.current_prebuffering_stream_id = None
|
||||
self.queue_id = 0
|
||||
|
||||
self.future_scheduled_queue = Queue()
|
||||
self.pypo_liquidsoap = pypo_liquidsoap
|
||||
|
||||
self.plq = PypoLiqQueue(self.future_scheduled_queue, \
|
||||
self.pypo_liquidsoap, \
|
||||
self.logger)
|
||||
self.plq.daemon = True
|
||||
self.plq.start()
|
||||
|
||||
|
||||
def main(self):
|
||||
loops = 0
|
||||
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
|
||||
|
||||
media_schedule = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
media_schedule = self.queue.get(block=True)
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
raise
|
||||
else:
|
||||
self.logger.debug(media_schedule)
|
||||
#separate media_schedule list into currently_playing and
|
||||
#scheduled_for_future lists
|
||||
currently_playing, scheduled_for_future = \
|
||||
self.separate_present_future(media_schedule)
|
||||
|
||||
self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
|
||||
self.future_scheduled_queue.put(scheduled_for_future)
|
||||
|
||||
if loops % heartbeat_period == 0:
|
||||
self.logger.info("heartbeat")
|
||||
loops = 0
|
||||
loops += 1
|
||||
|
||||
|
||||
def separate_present_future(self, media_schedule):
|
||||
tnow = datetime.utcnow()
|
||||
|
||||
present = []
|
||||
future = {}
|
||||
|
||||
sorted_keys = sorted(media_schedule.keys())
|
||||
for mkey in sorted_keys:
|
||||
media_item = media_schedule[mkey]
|
||||
|
||||
diff_td = tnow - media_item['start']
|
||||
diff_sec = self.date_interval_to_seconds(diff_td)
|
||||
|
||||
if diff_sec >= 0:
|
||||
present.append(media_item)
|
||||
else:
|
||||
future[mkey] = media_item
|
||||
|
||||
return present, future
|
||||
|
||||
def get_current_stream_id_from_liquidsoap(self):
|
||||
response = "-1"
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
|
||||
|
||||
msg = 'dynamic_source.get_id\n'
|
||||
tn.write(msg)
|
||||
response = tn.read_until("\r\n").strip(" \r\n")
|
||||
tn.write('exit\n')
|
||||
tn.read_all()
|
||||
except Exception, e:
|
||||
self.logger.error("Error connecting to Liquidsoap: %s", e)
|
||||
response = []
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
return response
|
||||
|
||||
#def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id):
|
||||
#correct = False
|
||||
#if media_item is None:
|
||||
#correct = (len(liquidsoap_queue_approx) == 0 and liquidsoap_stream_id == "-1")
|
||||
#else:
|
||||
#if is_file(media_item):
|
||||
#if len(liquidsoap_queue_approx) == 0:
|
||||
#correct = False
|
||||
#else:
|
||||
#correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \
|
||||
#liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \
|
||||
#liquidsoap_queue_approx[0]['end'] == media_item['end'] and \
|
||||
#liquidsoap_queue_approx[0]['replay_gain'] == media_item['replay_gain']
|
||||
#elif is_stream(media_item):
|
||||
#correct = liquidsoap_stream_id == str(media_item['row_id'])
|
||||
|
||||
#self.logger.debug("Is current item correct?: %s", str(correct))
|
||||
#return correct
|
||||
|
||||
def date_interval_to_seconds(self, interval):
|
||||
"""
|
||||
Convert timedelta object into int representing the number of seconds. If
|
||||
number of seconds is less than 0, then return 0.
|
||||
"""
|
||||
seconds = (interval.microseconds + \
|
||||
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
|
||||
|
||||
return seconds
|
||||
|
||||
def stop_web_stream_all(self):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(self.config['LS_HOST'], self.config['LS_PORT'])
|
||||
|
||||
#msg = 'dynamic_source.read_stop_all xxx\n'
|
||||
msg = 'http.stop\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
msg = 'dynamic_source.output_stop\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
msg = 'dynamic_source.id -1\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def run(self):
|
||||
try: self.main()
|
||||
except Exception, e:
|
||||
top = traceback.format_exc()
|
||||
self.logger.error('Pypo Push Exception: %s', top)
|
||||
|
|
@ -378,11 +378,11 @@ class PypoFetch(Thread):
|
|||
media_item['file_ready'] = False
|
||||
media_filtered[key] = media_item
|
||||
|
||||
media_item['start'] = datetime.strptime(media_item['start'],
|
||||
media_item['start'] = datetime.strptime(media_item['start'],
|
||||
"%Y-%m-%d-%H-%M-%S")
|
||||
media_item['end'] = datetime.strptime(media_item['end'],
|
||||
"%Y-%m-%d-%H-%M-%S")
|
||||
media_copy[media_item['start']] = media_item
|
||||
media_copy[key] = media_item
|
||||
|
||||
self.media_prepare_queue.put(copy.copy(media_filtered))
|
||||
except Exception, e: self.logger.error("%s", e)
|
||||
|
|
|
@ -76,14 +76,6 @@ class PypoFile(Thread):
|
|||
self.logger.debug("copying from %s to local cache %s" % (src, dst))
|
||||
try:
|
||||
|
||||
"""
|
||||
List file as "ready" before it starts copying because by the
|
||||
time Liquidsoap is ready to play this file, it should have at
|
||||
least started copying (and can continue copying while
|
||||
Liquidsoap reads from the beginning of the file)
|
||||
"""
|
||||
media_item['file_ready'] = True
|
||||
|
||||
"""
|
||||
copy will overwrite dst if it already exists
|
||||
"""
|
||||
|
@ -91,6 +83,8 @@ class PypoFile(Thread):
|
|||
|
||||
#make file world readable
|
||||
os.chmod(dst, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
|
||||
|
||||
media_item['file_ready'] = True
|
||||
except Exception, e:
|
||||
self.logger.error("Could not copy from %s to %s" % (src, dst))
|
||||
self.logger.error(e)
|
||||
|
|
|
@ -74,8 +74,9 @@ class PypoLiqQueue(Thread):
|
|||
schedule_deque.append(media_schedule[i])
|
||||
|
||||
if len(keys):
|
||||
time_until_next_play = pure.date_interval_to_seconds(\
|
||||
keys[0] - datetime.utcnow())
|
||||
time_until_next_play = self.date_interval_to_seconds(
|
||||
media_schedule[keys[0]]['start'] -
|
||||
datetime.utcnow())
|
||||
else:
|
||||
time_until_next_play = None
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue