* Fixed a race condition in pypo which could kill playout. Input kick events and non-file/webstream events could result in an exception being thrown. Fixed the logic error behind this, caught the exception just in case, and added some code to prevent uncaught pypopush exceptions from terminating the thread.
* This could also prevent playout breakage in similar situations.
162 lines
4.4 KiB
Python
162 lines
4.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
from configobj import ConfigObj
|
|
|
|
import sys
|
|
import time
|
|
import logging.config
|
|
import telnetlib
|
|
import calendar
|
|
import math
|
|
import traceback
|
|
import os
|
|
|
|
from pypofetch import PypoFetch
|
|
from pypoliqqueue import PypoLiqQueue
|
|
|
|
from Queue import Empty, Queue
|
|
|
|
from threading import Thread
|
|
|
|
from api_clients import api_client
|
|
from std_err_override import LogWriter
|
|
from timeout import ls_timeout
|
|
|
|
|
|
# Logging setup: load the configuration from the "logging.cfg" file that
# lives in the same directory as this module.
logging_cfg = os.path.join(os.path.dirname(__file__), "logging.cfg")
logging.config.fileConfig(logging_cfg)

# Hand the root logger to LogWriter.override_std_err().
# NOTE(review): presumably this routes stderr output into the log -- confirm
# in std_err_override.
logger = logging.getLogger()
LogWriter.override_std_err(logger)

# logging.captureWarnings() is only available from Python 2.7 onwards, so it
# stays disabled until that version can be required.
#logging.captureWarnings(True)

# Used by PypoPush.main() to derive the heartbeat period
# (math.floor(30 / PUSH_INTERVAL) loop iterations between "heartbeat" lines).
PUSH_INTERVAL = 2
|
|
|
|
|
|
def is_stream(media_item):
    """Return True if media_item['type'] equals 'stream_output_start'."""
    item_type = media_item['type']
    return item_type == 'stream_output_start'
|
|
|
|
def is_file(media_item):
    """Return True if media_item['type'] equals 'file'."""
    item_type = media_item['type']
    return item_type == 'file'
|
|
|
|
class PypoPush(Thread):
    """
    Thread that consumes media schedules from a queue and dispatches them.

    Every schedule taken from the queue is split into items whose start time
    has already passed and items that start in the future.  The former are
    handed to ``pypo_liquidsoap.verify_correct_present_media()``; the latter
    are put on ``future_scheduled_queue``, which is passed to the
    PypoLiqQueue worker thread started in the constructor.
    """

    def __init__(self, q, telnet_lock, pypo_liquidsoap, config):
        """
        Store the collaborators and start the PypoLiqQueue worker thread.

        q               -- queue from which main() reads media schedules
        telnet_lock     -- lock held for the duration of the telnet session
                           in stop_web_stream_all()
        pypo_liquidsoap -- object providing verify_correct_present_media()
        config          -- mapping providing the 'LS_HOST' and 'LS_PORT'
                           entries used to open the telnet connection
        """
        Thread.__init__(self)
        self.api_client = api_client.AirtimeApiClient()
        self.queue = q

        self.telnet_lock = telnet_lock
        self.config = config

        self.pushed_objects = {}
        self.logger = logging.getLogger('push')
        self.current_prebuffering_stream_id = None
        self.queue_id = 0

        # Future items are not handled here: they are forwarded through this
        # queue to the PypoLiqQueue worker created below.
        self.future_scheduled_queue = Queue()
        self.pypo_liquidsoap = pypo_liquidsoap

        self.plq = PypoLiqQueue(self.future_scheduled_queue,
                                self.pypo_liquidsoap,
                                self.logger)
        self.plq.daemon = True
        self.plq.start()

    def main(self):
        """
        Process media schedules from self.queue forever.

        Blocks until a schedule is available, splits it with
        separate_present_future(), passes the present items to
        pypo_liquidsoap.verify_correct_present_media() and puts the future
        items on future_scheduled_queue.

        This method never returns normally.  An exception raised while
        reading the queue is logged and re-raised; exceptions from the
        processing steps propagate unchanged.  run() catches both.
        """
        loops = 0
        heartbeat_period = math.floor(30 / PUSH_INTERVAL)

        while True:
            try:
                media_schedule = self.queue.get(block=True)
            except Exception as e:
                self.logger.error(str(e))
                raise

            self.logger.debug(media_schedule)

            # Separate the schedule into what should be playing right now
            # and what is scheduled for the future.
            currently_playing, scheduled_for_future = \
                self.separate_present_future(media_schedule)

            self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
            self.future_scheduled_queue.put(scheduled_for_future)

            # Logged on the first iteration and then once every
            # heartbeat_period processed schedules.
            if loops % heartbeat_period == 0:
                self.logger.info("heartbeat")
                loops = 0
            loops += 1

    def separate_present_future(self, media_schedule):
        """
        Split a schedule into already-started and not-yet-started items.

        media_schedule -- dict whose values are media items; each item must
                          have a 'start' entry that can be subtracted from
                          datetime.utcnow() to give a timedelta

        Returns a tuple (present, future):
        present -- list of items whose 'start' is at or before the current
                   UTC time, ordered by their sorted schedule key
        future  -- dict (same keys as media_schedule) of the remaining items
        """
        tnow = datetime.utcnow()

        present = []
        future = {}

        for mkey in sorted(media_schedule):
            media_item = media_schedule[mkey]

            # Positive (or zero) elapsed time means the item has started.
            elapsed = self.date_interval_to_seconds(tnow - media_item['start'])

            if elapsed >= 0:
                present.append(media_item)
            else:
                future[mkey] = media_item

        return present, future

    def date_interval_to_seconds(self, interval):
        """
        Convert a timedelta into a float number of seconds.

        The result is signed: a negative timedelta yields a negative number
        of seconds.  separate_present_future() relies on that sign to tell
        future items from present ones, so the value is NOT clamped at 0.
        """
        # Computed by hand rather than with timedelta.total_seconds(), which
        # only exists from Python 2.7 onwards.
        total_microseconds = (interval.microseconds +
                              (interval.seconds + interval.days * 24 * 3600) * 10 ** 6)
        return total_microseconds / float(10 ** 6)

    @ls_timeout
    def stop_web_stream_all(self):
        """
        Send the stop commands for the web stream over telnet.

        Opens a telnet connection to config['LS_HOST']:config['LS_PORT']
        while holding telnet_lock, writes 'http.stop',
        'dynamic_source.output_stop' and 'dynamic_source.id -1', then 'exit',
        and logs whatever the peer sends back.  Errors are logged and
        swallowed (best effort); the connection is closed and the lock is
        released in every case.
        """
        commands = (
            'http.stop\n',
            'dynamic_source.output_stop\n',
            'dynamic_source.id -1\n',
        )

        tn = None
        # Acquire outside the try block: if acquisition itself fails we must
        # not reach the release() below and free a lock we do not hold.
        self.telnet_lock.acquire()
        try:
            tn = telnetlib.Telnet(self.config['LS_HOST'], self.config['LS_PORT'])

            for msg in commands:
                self.logger.debug(msg)
                tn.write(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())
        except Exception as e:
            self.logger.error(str(e))
        finally:
            try:
                # Close explicitly so the socket is not leaked, including
                # when one of the writes above failed.
                if tn is not None:
                    tn.close()
            finally:
                self.telnet_lock.release()

    def run(self):
        """
        Thread entry point: keep main() running.

        Any exception escaping main() is logged with its traceback and main()
        is restarted after a 5 second pause, so an uncaught error does not
        terminate the thread.
        """
        while True:
            try:
                self.main()
            except Exception:
                top = traceback.format_exc()
                self.logger.error('Pypo Push Exception: %s', top)
                time.sleep(5)
|
|
|