sintonia/python_apps/pypo/pypopush.py

277 lines
8.8 KiB
Python

# -*- coding: utf-8 -*-
from datetime import datetime
from datetime import timedelta
import sys
import time
import logging.config
import telnetlib
import calendar
import math
import traceback
import os
from pypofetch import PypoFetch
from telnetliquidsoap import TelnetLiquidsoap
from pypoliqqueue import PypoLiqQueue
from Queue import Empty, Queue
from threading import Thread
from api_clients import api_client
from std_err_override import LogWriter
from configobj import ConfigObj
# configure logging
logging_cfg = os.path.join(os.path.dirname(__file__), "logging.cfg")
logging.config.fileConfig(logging_cfg)
logger = logging.getLogger()
LogWriter.override_std_err(logger)
#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)
# loading config file
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
LS_HOST = config['ls_host']
LS_PORT = config['ls_port']
PUSH_INTERVAL = 2
except Exception, e:
logger.error('Error loading config file %s', e)
sys.exit()
def is_stream(media_item):
return media_item['type'] == 'stream_output_start'
def is_file(media_item):
return media_item['type'] == 'file'
class PypoPush(Thread):
def __init__(self, q, telnet_lock):
Thread.__init__(self)
self.api_client = api_client.AirtimeApiClient()
self.queue = q
self.telnet_lock = telnet_lock
self.pushed_objects = {}
self.logger = logging.getLogger('push')
self.current_prebuffering_stream_id = None
self.queue_id = 0
self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \
self.logger,\
LS_HOST,\
LS_PORT\
)
self.liq_queue_tracker = {
"s0": None,
"s1": None,
"s2": None,
"s3": None,
}
self.future_scheduled_queue = Queue()
self.plq = PypoLiqQueue(self.future_scheduled_queue, \
telnet_lock, \
self.logger, \
self.liq_queue_tracker, \
self.telnet_liquidsoap)
self.plq.daemon = True
self.plq.start()
def main(self):
loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
media_schedule = None
while True:
try:
media_schedule = self.queue.get(block=True)
except Exception, e:
self.logger.error(str(e))
raise
else:
#separate media_schedule list into currently_playing and
#scheduled_for_future lists
currently_playing, scheduled_for_future = \
self.separate_present_future(media_schedule)
self.verify_correct_present_media(currently_playing)
self.future_scheduled_queue.put(scheduled_for_future)
if loops % heartbeat_period == 0:
self.logger.info("heartbeat")
loops = 0
loops += 1
def separate_present_future(self, media_schedule):
tnow = datetime.utcnow()
present = []
future = {}
sorted_keys = sorted(media_schedule.keys())
for mkey in sorted_keys:
media_item = media_schedule[mkey]
diff_td = tnow - media_item['start']
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec >= 0:
present.append(media_item)
else:
future[media_item['start']] = media_item
return present, future
def verify_correct_present_media(self, scheduled_now):
#verify whether Liquidsoap is currently playing the correct files.
#if we find an item that Liquidsoap is not playing, then push it
#into one of Liquidsoap's queues. If Liquidsoap is already playing
#it do nothing. If Liquidsoap is playing a track that isn't in
#currently_playing then stop it.
#Check for Liquidsoap media we should source.skip
#get liquidsoap items for each queue. Since each queue can only have one
#item, we should have a max of 8 items.
#TODO: Verify start, end, replay_gain is the same
#TODO: Verify this is a file or webstream and also handle webstreams
schedule_ids = set()
for i in scheduled_now:
schedule_ids.add(i["row_id"])
liq_queue_ids = set()
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if not self.plq.is_media_item_finished(mi):
liq_queue_ids.add(mi["row_id"])
to_be_removed = liq_queue_ids - schedule_ids
to_be_added = schedule_ids - liq_queue_ids
if len(to_be_removed):
self.logger.info("Need to remove items from Liquidsoap: %s" % \
to_be_removed)
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi is not None and mi["row_id"] in to_be_removed:
self.telnet_liquidsoap.queue_remove(i)
self.liq_queue_tracker[i] = None
if len(to_be_added):
self.logger.info("Need to add items to Liquidsoap *now*: %s" % \
to_be_added)
for i in scheduled_now:
if i["row_id"] in to_be_added:
self.modify_cue_point(i)
queue_id = self.plq.find_available_queue()
self.telnet_liquidsoap.queue_push(queue_id, i)
self.liq_queue_tracker[queue_id] = i
def get_current_stream_id_from_liquidsoap(self):
response = "-1"
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
msg = 'dynamic_source.get_id\n'
tn.write(msg)
response = tn.read_until("\r\n").strip(" \r\n")
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error("Error connecting to Liquidsoap: %s", e)
response = []
finally:
self.telnet_lock.release()
return response
#def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id):
#correct = False
#if media_item is None:
#correct = (len(liquidsoap_queue_approx) == 0 and liquidsoap_stream_id == "-1")
#else:
#if is_file(media_item):
#if len(liquidsoap_queue_approx) == 0:
#correct = False
#else:
#correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \
#liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \
#liquidsoap_queue_approx[0]['end'] == media_item['end'] and \
#liquidsoap_queue_approx[0]['replay_gain'] == media_item['replay_gain']
#elif is_stream(media_item):
#correct = liquidsoap_stream_id == str(media_item['row_id'])
#self.logger.debug("Is current item correct?: %s", str(correct))
#return correct
def modify_cue_point(self, link):
tnow = datetime.utcnow()
link_start = link['start']
diff_td = tnow - link_start
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec > 0:
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
return seconds
def stop_web_stream_all(self):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
#msg = 'dynamic_source.read_stop_all xxx\n'
msg = 'http.stop\n'
self.logger.debug(msg)
tn.write(msg)
msg = 'dynamic_source.output_stop\n'
self.logger.debug(msg)
tn.write(msg)
msg = 'dynamic_source.id -1\n'
self.logger.debug(msg)
tn.write(msg)
tn.write("exit\n")
self.logger.debug(tn.read_all())
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
def run(self):
try: self.main()
except Exception, e:
top = traceback.format_exc()
self.logger.error('Pypo Push Exception: %s', top)