2011-03-03 06:22:28 +01:00
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
import time
|
|
|
|
import logging
|
|
|
|
import logging.config
|
2011-05-31 22:05:48 +02:00
|
|
|
import logging.handlers
|
2011-03-03 06:22:28 +01:00
|
|
|
import pickle
|
|
|
|
import telnetlib
|
|
|
|
import calendar
|
|
|
|
import json
|
2011-03-21 00:34:43 +01:00
|
|
|
import math
|
2011-03-03 06:22:28 +01:00
|
|
|
|
2012-03-06 02:47:14 +01:00
|
|
|
"""
|
|
|
|
It is possible to use a list as a queue, where the first element added is the first element
|
|
|
|
retrieved ("first-in, first-out"); however, lists are not efficient for this purpose. Let's use
|
|
|
|
"deque"
|
|
|
|
"""
|
|
|
|
from collections import deque
|
2011-03-03 06:22:28 +01:00
|
|
|
|
2012-03-06 02:47:14 +01:00
|
|
|
from threading import Thread
|
|
|
|
from api_clients import api_client
|
2011-03-03 06:22:28 +01:00
|
|
|
from configobj import ConfigObj
|
|
|
|
|
2011-05-31 22:05:48 +02:00
|
|
|
|
2011-03-21 00:34:43 +01:00
|
|
|
# Configure logging from the "logging.cfg" file (path is relative to the
# current working directory).
logging.config.fileConfig("logging.cfg")

# Load the pypo configuration. Any failure here is fatal: the error is logged
# on the root logger and the process exits with a non-zero status so that a
# supervising process can tell the start-up failed.
try:
    config = ConfigObj('/etc/airtime/pypo.cfg')
    # Host and port of the Liquidsoap telnet interface.
    LS_HOST = config['ls_host']
    LS_PORT = config['ls_port']
    # Seconds to sleep between two iterations of the push loop.
    PUSH_INTERVAL = 2
    # The push loop only pushes new items while Liquidsoap's queue holds
    # fewer than this many items.
    MAX_LIQUIDSOAP_QUEUE_LENGTH = 2
except Exception as e:
    logger = logging.getLogger()
    logger.error('Error loading config file %s', e)
    sys.exit(1)
|
|
|
|
|
2011-03-21 00:34:43 +01:00
|
|
|
class PypoPush(Thread):
    """Thread that pushes scheduled media items to Liquidsoap over telnet.

    Schedules arrive from pypo-fetch through the queue handed to the
    constructor. Every PUSH_INTERVAL seconds the thread reads Liquidsoap's
    queue, reconciles it with the latest schedule and pushes the items that
    are about to start.
    """

    def __init__(self, q):
        """Set up the thread.

        q -- queue object (must provide empty() and get()) on which new
             schedules are received.
        """
        Thread.__init__(self)
        self.api_client = api_client.api_client_factory(config)
        self.queue = q

        # Latest schedule received; indexed by keys that handle_new_media()
        # compares against the 'start' value of queued items.
        self.media = dict()

        self.liquidsoap_state_play = True
        # How many seconds ahead of its start time an item may be pushed.
        self.push_ahead = 10
        # 'end' value of the most recently pushed item (0 until a push).
        self.last_end_time = 0

        # Every item we pushed, keyed by the id Liquidsoap answered with.
        # NOTE(review): entries are never removed, so this grows for the
        # lifetime of the process -- consider pruning.
        self.pushed_objects = {}

        self.logger = logging.getLogger('push')

    def push(self):
        """
        The Push Loop - the push loop periodically checks if there is a playlist
        that should be scheduled at the current time.
        If yes, the corresponding media items are pushed to the liquidsoap
        queue (via telnet).
        """
        liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()

        timenow = time.time()

        # get a new schedule from pypo-fetch
        if not self.queue.empty():
            # make sure we get the latest schedule
            while not self.queue.empty():
                self.media = self.queue.get()

            self.logger.debug("Received data from pypo-fetch")
            self.logger.debug('media %s' % json.dumps(self.media))
            self.handle_new_media(self.media, liquidsoap_queue_approx)

        media = self.media

        # Nothing to do when Liquidsoap's queue is full or there is no schedule.
        if len(liquidsoap_queue_approx) >= MAX_LIQUIDSOAP_QUEUE_LENGTH:
            return
        if not media:
            return

        time_format = "%Y-%m-%d-%H-%M-%S"
        str_tnow_s = time.strftime(time_format, time.gmtime(timenow))
        str_tcoming_s = time.strftime(
            time_format, time.gmtime(timenow + self.push_ahead))

        # Iterate over a sorted copy of the keys: items are deleted from
        # 'media' inside the loop, and sorting pushes items that fall in the
        # same window in chronological order.
        for key in sorted(media.keys()):
            media_item = media[key]
            item_start = media_item['start'][0:19]

            # Only items starting within the next 'push_ahead' seconds.
            if not (str_tnow_s <= item_start < str_tcoming_s):
                continue

            self.logger.debug('Preparing to push media item scheduled at: %s', key)

            if self.push_to_liquidsoap(media_item):
                self.logger.debug("Pushed to liquidsoap, updating 'played' status.")

                # Temporary solution to make sure we don't push the same
                # track multiple times.
                del media[key]

                self.liquidsoap_state_play = True

    def push_to_liquidsoap(self, media_item):
        """
        This function looks at the media item, and either pushes it to the
        Liquidsoap queue immediately (when it starts exactly where the last
        pushed item ended), or waits until the start time of the media item
        before pushing it.

        Returns True when the item was pushed, False when any exception
        occurred (the exception is logged, not raised).
        """
        try:
            if media_item["start"] == self.last_end_time:
                # This media item is attached to the end of the last track,
                # so let's push it now so that Liquidsoap can start playing
                # it immediately after (and prepare crossfades if need be).
                self.logger.debug("Push track immediately.")
            else:
                # This media item does not start right after a current
                # playing track. We need to sleep, and then wake up when
                # this track starts.
                self.logger.debug("sleep until track start.")
                self.sleep_until_start(media_item)

            self.telnet_to_liquidsoap(media_item)
            self.last_end_time = media_item["end"]
        except Exception as e:
            self.logger.error('Pypo Push Exception: %s', e)
            return False

        return True

    def get_queue_items_from_liquidsoap(self):
        """
        This function connects to Liquidsoap to find what media items are in
        its queue.

        Returns the list of previously pushed media items (looked up in
        self.pushed_objects) whose ids Liquidsoap reported, in the order
        reported. Ids we never pushed are logged as errors and skipped.
        """
        tn = telnetlib.Telnet(LS_HOST, LS_PORT)
        try:
            tn.write('queue.queue\n')
            response = tn.read_until("\r\n").strip(" \r\n")
            tn.write('exit\n')
            tn.read_all()
        finally:
            # Always release the socket, even if the exchange failed.
            tn.close()

        liquidsoap_queue_approx = []

        if response:
            items_in_queue = response.split(" ")

            self.logger.debug("items_in_queue: %s", items_in_queue)

            for item in items_in_queue:
                if item in self.pushed_objects:
                    liquidsoap_queue_approx.append(self.pushed_objects[item])
                else:
                    self.logger.error("ID exists in liquidsoap queue that does not exist in our pushed_objects queue: %s", item)

        return liquidsoap_queue_approx

    def handle_new_media(self, media, liquidsoap_queue_approx):
        """
        This function's purpose is to gracefully handle situations where
        Liquidsoap already has a track in its queue, but the schedule
        has changed. If the schedule has changed, this function's job is to
        call other functions that will connect to Liquidsoap and alter its
        queue.

        media -- the new schedule (dict of media items).
        liquidsoap_queue_approx -- media items currently in Liquidsoap's
            queue, as returned by get_queue_items_from_liquidsoap().

        A queued item is removed when the new schedule has no entry under the
        item's 'start' value, or when that entry has a different 'id'. Items
        are checked from the tail of the queue towards the head.
        """
        for queue_item in reversed(liquidsoap_queue_approx):
            item_start = queue_item['start']
            scheduled = media.get(item_start)

            if scheduled is None:
                self.logger.debug("%s from ls does not exist in queue schedule: %s Removing", item_start, media)
            elif scheduled['id'] != queue_item['id']:
                # liquidsoap's queue does not match the schedule we just
                # received from the Airtime server.
                self.logger.debug("%s from ls does not exist in queue new schedule. Removing", queue_item['id'])
            else:
                # Still scheduled as queued; leave it alone.
                continue

            self.remove_from_liquidsoap_queue(queue_item)

    def remove_from_liquidsoap_queue(self, media_item, do_only_source_skip=False):
        """
        Remove a previously pushed media item from Liquidsoap's queue.

        media_item -- item to remove; must carry the 'queue_id' stored by
            telnet_to_liquidsoap(), otherwise an error is logged and nothing
            is sent.
        do_only_source_skip -- accepted for interface compatibility; it is
            not used.
        """
        if 'queue_id' not in media_item:
            self.logger.error("'queue_id' key doesn't exist in media_item dict()")
            return

        queue_id = media_item['queue_id']

        tn = telnetlib.Telnet(LS_HOST, LS_PORT)
        try:
            tn.write("queue.remove %s\n" % queue_id)
            response = tn.read_until("\r\n").strip("\r\n")

            if "No such request in my queue" in response:
                # Cannot remove because Liquidsoap started playing the item.
                # Need to use source.skip instead. The trailing newline is
                # required, otherwise the command runs into the "exit" below.
                tn.write("source.skip\n")

            tn.write("exit\n")
            tn.read_all()
        finally:
            tn.close()

    def sleep_until_start(self, media_item):
        """
        The purpose of this function is to look at the difference between
        "now" and when the media_item starts, and sleep for that period of time.
        After waking from sleep, this function returns. If the start time is
        already in the past it returns without waiting.
        """
        mi_start = media_item['start'][0:19]

        # strptime() returns a struct_time without timezone information;
        # calendar.timegm() interprets it as UTC.
        epoch_start = calendar.timegm(time.strptime(mi_start, '%Y-%m-%d-%H-%M-%S'))

        # time.time() is the number of seconds since the epoch (float).
        epoch_now = time.time()

        self.logger.debug("Epoch start: %s" % epoch_start)
        self.logger.debug("Epoch now: %s" % epoch_now)

        # Never pass a negative duration to sleep().
        sleep_time = max(0, epoch_start - epoch_now)

        self.logger.debug('sleeping for %s s' % (sleep_time))
        time.sleep(sleep_time)

    def telnet_to_liquidsoap(self, media_item):
        """
        telnets to liquidsoap and pushes the media_item to its queue. Push the
        show name of every media_item as well, just to keep Liquidsoap up-to-date
        about which show is playing.

        Side effects: stores Liquidsoap's answer to the push in
        media_item['queue_id'] and records the item in self.pushed_objects
        under that id.
        """
        tn = telnetlib.Telnet(LS_HOST, LS_PORT)
        try:
            annotation = media_item['annotation']
            msg = 'queue.push %s\n' % annotation.encode('utf-8')
            self.logger.debug(msg)
            tn.write(msg)
            queue_id = tn.read_until("\r\n").strip("\r\n")

            # remember the media_item's queue id which we may use
            # later if we need to remove it from the queue.
            media_item['queue_id'] = queue_id

            # keep track of the item so it can be found again by queue id
            self.pushed_objects[queue_id] = media_item

            show_name = media_item['show_name']
            msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
            tn.write(msg)
            self.logger.debug(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())
        finally:
            tn.close()

    def run(self):
        """Thread entry point: call push() every PUSH_INTERVAL seconds, forever.

        Exceptions raised by push() are logged and do not stop the loop. A
        "heartbeat" line is logged roughly every 30 seconds.
        """
        loops = 0
        # Number of iterations between heartbeats; at least 1 so the modulo
        # below can never divide by zero.
        heartbeat_period = max(1, 30 // PUSH_INTERVAL)

        while True:
            if loops % heartbeat_period == 0:
                self.logger.info("heartbeat")
                loops = 0
            try:
                self.push()
            except Exception as e:
                self.logger.error('Pypo Push Exception: %s', e)
            time.sleep(PUSH_INTERVAL)
            loops += 1