Ensure only telnetliquidsoap has telnet responsibilities

-also added module docstrings
This commit is contained in:
Martin Konecny 2013-05-16 16:19:00 -04:00
parent 888e6db812
commit 060980d8c6
13 changed files with 303 additions and 389 deletions

View file

@ -2,10 +2,9 @@
Python part of radio playout (pypo) Python part of radio playout (pypo)
""" """
from optparse import OptionParser
from datetime import datetime from datetime import datetime
from configobj import ConfigObj
import telnetlib from Queue import Queue
import time import time
import sys import sys
@ -13,10 +12,6 @@ import signal
import logging import logging
import locale import locale
import os import os
import re
from Queue import Queue
from threading import Lock
from schedule.pypopush import PypoPush from schedule.pypopush import PypoPush
from schedule.pypofetch import PypoFetch from schedule.pypofetch import PypoFetch
@ -30,55 +25,10 @@ from schedule.pypoliquidsoap import PypoLiquidsoap
from media.update.replaygainupdater import ReplayGainUpdater from media.update.replaygainupdater import ReplayGainUpdater
from media.update.silananalyzer import SilanAnalyzer from media.update.silananalyzer import SilanAnalyzer
from configobj import ConfigObj
# custom imports # custom imports
from api_clients import api_client from api_clients import api_client
from std_err_override import LogWriter from std_err_override import LogWriter
# Set up command-line options
parser = OptionParser()
# help screen / info
usage = "%prog [options]" + " - python playout system"
parser = OptionParser(usage=usage)
# Options
parser.add_option("-v", "--compat",
help="Check compatibility with server API version",
default=False,
action="store_true",
dest="check_compat")
parser.add_option("-t", "--test",
help="Do a test to make sure everything is working properly.",
default=False,
action="store_true",
dest="test")
parser.add_option("-b",
"--cleanup",
help="Cleanup",
default=False,
action="store_true",
dest="cleanup")
parser.add_option("-c",
"--check",
help="Check the cached schedule and exit",
default=False,
action="store_true",
dest="check")
# parse options
(options, args) = parser.parse_args()
LIQUIDSOAP_MIN_VERSION = "1.1.1"
#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)
# configure logging # configure logging
try: try:
logging.config.fileConfig("configs/logging.cfg") logging.config.fileConfig("configs/logging.cfg")
@ -131,16 +81,6 @@ def configure_locale():
current_locale_encoding) current_locale_encoding)
sys.exit(1) sys.exit(1)
configure_locale()
# loading config file
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
except Exception, e:
logger.error('Error loading config file: %s', e)
sys.exit(1)
class Global: class Global:
def __init__(self, api_client): def __init__(self, api_client):
self.api_client = api_client self.api_client = api_client
@ -148,67 +88,21 @@ class Global:
def selfcheck(self): def selfcheck(self):
return self.api_client.is_server_compatible() return self.api_client.is_server_compatible()
def test_api(self):
self.api_client.test()
def keyboardInterruptHandler(signum, frame): def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger() logger = logging.getLogger()
logger.info('\nKeyboard Interrupt\n') logger.info('\nKeyboard Interrupt\n')
sys.exit(0) sys.exit(0)
def liquidsoap_get_info(telnet_lock, host, port, logger):
logger.debug("Checking to see if Liquidsoap is running")
try:
telnet_lock.acquire()
tn = telnetlib.Telnet(host, port)
msg = "version\n"
tn.write(msg)
tn.write("exit\n")
response = tn.read_all()
except Exception, e:
logger.error(str(e))
return None
finally:
telnet_lock.release()
return get_liquidsoap_version(response)
def get_liquidsoap_version(version_string):
m = re.match(r"Liquidsoap (\d+.\d+.\d+)", "Liquidsoap 1.1.1")
if m:
return m.group(1)
else:
return None
if m:
current_version = m.group(1)
return pure.version_cmp(current_version, LIQUIDSOAP_MIN_VERSION) >= 0
return False
def liquidsoap_startup_test():
liquidsoap_version_string = \
liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger)
while not liquidsoap_version_string:
logger.warning("Liquidsoap doesn't appear to be running!, " + \
"Sleeping and trying again")
time.sleep(1)
liquidsoap_version_string = \
liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger)
while pure.version_cmp(liquidsoap_version_string, LIQUIDSOAP_MIN_VERSION) < 0:
logger.warning("Liquidsoap is running but in incorrect version! " + \
"Make sure you have at least Liquidsoap %s installed" % LIQUIDSOAP_MIN_VERSION)
time.sleep(1)
liquidsoap_version_string = \
liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger)
logger.info("Liquidsoap version string found %s" % liquidsoap_version_string)
if __name__ == '__main__': if __name__ == '__main__':
configure_locale()
# loading config file
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
except Exception, e:
logger.error('Error loading config file: %s', e)
sys.exit(1)
logger.info('###########################################') logger.info('###########################################')
logger.info('# *** pypo *** #') logger.info('# *** pypo *** #')
logger.info('# Liquidsoap Scheduled Playout System #') logger.info('# Liquidsoap Scheduled Playout System #')
@ -237,17 +131,11 @@ if __name__ == '__main__':
logger.error(str(e)) logger.error(str(e))
time.sleep(10) time.sleep(10)
telnet_lock = Lock()
ls_host = config['ls_host'] ls_host = config['ls_host']
ls_port = config['ls_port'] ls_port = config['ls_port']
liquidsoap_startup_test() pypo_liquidsoap = PypoLiquidsoap(logger, ls_host, ls_port)
pypo_liquidsoap.liquidsoap_startup_test()
if options.test:
g.test_api()
sys.exit(0)
ReplayGainUpdater.start_reply_gain(api_client) ReplayGainUpdater.start_reply_gain(api_client)
SilanAnalyzer.start_silan(api_client, logger) SilanAnalyzer.start_silan(api_client, logger)
@ -256,9 +144,6 @@ if __name__ == '__main__':
recorder_q = Queue() recorder_q = Queue()
pypoPush_q = Queue() pypoPush_q = Queue()
pypo_liquidsoap = PypoLiquidsoap(logger, telnet_lock,\
ls_host, ls_port)
""" """
This queue is shared between pypo-fetch and pypo-file, where pypo-file This queue is shared between pypo-fetch and pypo-file, where pypo-file
is the consumer. Pypo-fetch will send every schedule it gets to pypo-file is the consumer. Pypo-fetch will send every schedule it gets to pypo-file
@ -275,11 +160,11 @@ if __name__ == '__main__':
pfile.daemon = True pfile.daemon = True
pfile.start() pfile.start()
pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config) pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, pypo_liquidsoap, config)
pf.daemon = True pf.daemon = True
pf.start() pf.start()
pp = PypoPush(pypoPush_q, telnet_lock, pypo_liquidsoap, config) pp = PypoPush(pypoPush_q, pypo_liquidsoap)
pp.daemon = True pp.daemon = True
pp.start() pp.start()

View file

@ -0,0 +1 @@
LIQUIDSOAP_MIN_VERSION = "1.1.1"

View file

@ -1,3 +1,14 @@
"""
schedule.eventtypes
~~~~~~~~~
This module exports a set of constants indicating different types
of events the pypo scheduler may receive.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
FILE = "file" FILE = "file"
EVENT = "event" EVENT = "event"
STREAM_BUFFER_START = "stream_buffer_start" STREAM_BUFFER_START = "stream_buffer_start"

View file

@ -1,3 +1,16 @@
"""
schedule.listenerstat
~~~~~~~~~
This module exports a set of functions to help communicate with both
icecast and shoutcast servers. Its function is to parse the output XML
provided and return the current number of clients listening.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
from threading import Thread from threading import Thread
import urllib2 import urllib2
import xml.dom.minidom import xml.dom.minidom

View file

@ -1,5 +1,15 @@
import re """
schedule.pure
~~~~~~~~~
This module exports a set of 'pure' common functions with no side-effects
that may be used by various parts of the pypo scheduler.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
import re
def version_cmp(version1, version2): def version_cmp(version1, version2):
def normalize(v): def normalize(v):

View file

@ -1,25 +1,41 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
schedule.pypofetch
~~~~~~~~~
The purpose of this module is to parse schedule data received from the
server and perform various tasks based on this schedule such as pre-cache
tracks that are scheduled, do sanity checks on the schedule sent, perform
cache clean-up etc.
The schedule for the immediate future is pushed via RabbitMQ when a user is
editing the schedule, however a pull for this schedule is also initiated at
a specified interval in order to keep current when there is no user
activity.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
import os import os
import sys import sys
import time import time
import logging.config import logging.config
import json import json
import telnetlib
import copy import copy
import subprocess import subprocess
import signal import signal
from datetime import datetime
import traceback import traceback
from schedule import pure
from Queue import Empty from Queue import Empty
from datetime import datetime
from threading import Thread from threading import Thread
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from api_clients import api_client from api_clients import api_client
from std_err_override import LogWriter from std_err_override import LogWriter
import pure
# configure logging # configure logging
logging_cfg = os.path.join(os.path.dirname(__file__), "../configs/logging.cfg") logging_cfg = os.path.join(os.path.dirname(__file__), "../configs/logging.cfg")
@ -33,17 +49,11 @@ def keyboardInterruptHandler(signum, frame):
sys.exit(0) sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler) signal.signal(signal.SIGINT, keyboardInterruptHandler)
#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)
POLL_INTERVAL = 1800 POLL_INTERVAL = 1800
config_static = None
class PypoFetch(Thread): class PypoFetch(Thread):
def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config): def __init__(self, pypoFetch_q, pypoPush_q, media_q, pypo_liquidsoap, config):
Thread.__init__(self) Thread.__init__(self)
global config_static
self.api_client = api_client.AirtimeApiClient() self.api_client = api_client.AirtimeApiClient()
self.fetch_queue = pypoFetch_q self.fetch_queue = pypoFetch_q
@ -51,13 +61,9 @@ class PypoFetch(Thread):
self.media_prepare_queue = media_q self.media_prepare_queue = media_q
self.last_update_schedule_timestamp = time.time() self.last_update_schedule_timestamp = time.time()
self.config = config self.config = config
config_static = config
self.listener_timeout = POLL_INTERVAL self.listener_timeout = POLL_INTERVAL
self.telnet_lock = telnet_lock
self.logger = logging.getLogger() self.logger = logging.getLogger()
self.pypo_liquidsoap = pypo_liquidsoap self.pypo_liquidsoap = pypo_liquidsoap
self.cache_dir = os.path.join(config["cache_dir"], "scheduler") self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
@ -100,13 +106,19 @@ class PypoFetch(Thread):
self.regenerate_liquidsoap_conf(m['setting']) self.regenerate_liquidsoap_conf(m['setting'])
elif command == 'update_stream_format': elif command == 'update_stream_format':
self.logger.info("Updating stream format...") self.logger.info("Updating stream format...")
self.update_liquidsoap_stream_format(m['stream_format']) self.telnetliquidsoap.\
get_telnet_dispatcher().\
update_liquidsoap_stream_format(m['stream_format'])
elif command == 'update_station_name': elif command == 'update_station_name':
self.logger.info("Updating station name...") self.logger.info("Updating station name...")
self.update_liquidsoap_station_name(m['station_name']) self.pypo_liquidsoap.\
get_telnet_dispatcher().\
update_liquidsoap_station_name(m['station_name'])
elif command == 'update_transition_fade': elif command == 'update_transition_fade':
self.logger.info("Updating transition_fade...") self.logger.info("Updating transition_fade...")
self.update_liquidsoap_transition_fade(m['transition_fade']) self.pypo_liquidsoap.\
get_telnet_dispatcher().\
update_liquidsoap_transition_fade(m['transition_fade'])
elif command == 'switch_source': elif command == 'switch_source':
self.logger.info("switch_on_source show command received...") self.logger.info("switch_on_source show command received...")
self.pypo_liquidsoap.\ self.pypo_liquidsoap.\
@ -179,28 +191,12 @@ class PypoFetch(Thread):
self.pypo_liquidsoap.clear_queue_tracker() self.pypo_liquidsoap.clear_queue_tracker()
def restart_liquidsoap(self): def restart_liquidsoap(self):
try: self.logger.info("Restarting Liquidsoap")
self.telnet_lock.acquire() subprocess.call(['/etc/init.d/airtime-liquidsoap', 'restart'])
self.logger.info("Restarting Liquidsoap")
subprocess.call('/etc/init.d/airtime-liquidsoap restart', shell=True)
#Wait here and poll Liquidsoap until it has started up #Wait here and poll Liquidsoap until it has started up
self.logger.info("Waiting for Liquidsoap to start") self.logger.info("Waiting for Liquidsoap to start")
while True: self.telnetliquidsoap.liquidsoap_startup_test()
try:
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
tn.write("exit\n")
tn.read_all()
self.logger.info("Liquidsoap is up and running")
break
except Exception, e:
#sleep 0.5 seconds and try again
time.sleep(0.5)
except Exception, e:
self.logger.error(e)
finally:
self.telnet_lock.release()
try: try:
self.set_bootstrap_variables() self.set_bootstrap_variables()
@ -313,28 +309,9 @@ class PypoFetch(Thread):
This function updates the bootup time variable in Liquidsoap script This function updates the bootup time variable in Liquidsoap script
""" """
try: output = self.telnetliquidsoap.\
self.telnet_lock.acquire() get_telnet_dispatcher().\
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) get_liquidsoap_connection_status(time.time())
# update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
current_time = time.time()
boot_up_time_command = "vars.bootup_time " + str(current_time) + "\n"
self.logger.info(boot_up_time_command)
tn.write(boot_up_time_command)
connection_status = "streams.connection_status\n"
self.logger.info(connection_status)
tn.write(connection_status)
tn.write('exit\n')
output = tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
output_list = output.split("\r\n") output_list = output.split("\r\n")
stream_info = output_list[2] stream_info = output_list[2]
@ -353,56 +330,11 @@ class PypoFetch(Thread):
self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time)) self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time))
def update_liquidsoap_stream_format(self, stream_format):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error("Exception %s", e)
finally:
self.telnet_lock.release()
def update_liquidsoap_transition_fade(self, fade):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8')
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error("Exception %s", e)
finally:
self.telnet_lock.release()
def update_liquidsoap_station_name(self, station_name):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.station_name %s\n' % station_name).encode('utf-8')
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
except Exception, e:
self.logger.error("Exception %s", e)
""" """
Process the schedule Process the schedule

View file

@ -1,4 +1,14 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
schedule.pypofile
~~~~~~~~~
This module is directly responsible for pre-caching a list of files
scheduled in the near future.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
from threading import Thread from threading import Thread
from Queue import Empty from Queue import Empty

View file

@ -1,7 +1,20 @@
"""
schedule.pypoliqqueue
~~~~~~~~~
This module takes a collection of media_items scheduled in the near future
and fires off a start event when the item's start time begins.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
from threading import Thread from threading import Thread
from collections import deque from collections import deque
from datetime import datetime from datetime import datetime
from schedule import pure
import traceback import traceback
import sys import sys
import time import time
@ -44,7 +57,7 @@ class PypoLiqQueue(Thread):
self.pypo_liquidsoap.play(media_item) self.pypo_liquidsoap.play(media_item)
if len(schedule_deque): if len(schedule_deque):
time_until_next_play = \ time_until_next_play = \
self.date_interval_to_seconds( pure.date_interval_to_seconds(
schedule_deque[0]['start'] - datetime.utcnow()) schedule_deque[0]['start'] - datetime.utcnow())
if time_until_next_play < 0: if time_until_next_play < 0:
time_until_next_play = 0 time_until_next_play = 0
@ -61,23 +74,11 @@ class PypoLiqQueue(Thread):
schedule_deque.append(media_schedule[i]) schedule_deque.append(media_schedule[i])
if len(keys): if len(keys):
time_until_next_play = self.date_interval_to_seconds(\ time_until_next_play = pure.date_interval_to_seconds(\
keys[0] - datetime.utcnow()) keys[0] - datetime.utcnow())
else: else:
time_until_next_play = None time_until_next_play = None
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
if seconds < 0: seconds = 0
return seconds
def run(self): def run(self):
try: self.main() try: self.main()
except Exception, e: except Exception, e:

View file

@ -1,14 +1,30 @@
"""
schedule.pypoliquidsoap
~~~~~~~~~
An attempt to abstract the various different ways we need to start/stop
files and webstreams into one unified interface with play() and stop()
methods instead.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
from pypofetch import PypoFetch from pypofetch import PypoFetch
from telnetliquidsoap import TelnetLiquidsoap from telnetliquidsoap import TelnetLiquidsoap
from schedule import pure
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
import eventtypes import eventtypes
import constants
import time import time
import re
class PypoLiquidsoap(): class PypoLiquidsoap():
def __init__(self, logger, telnet_lock, host, port): def __init__(self, logger, host, port):
self.logger = logger self.logger = logger
self.liq_queue_tracker = { self.liq_queue_tracker = {
"s0": None, "s0": None,
@ -17,7 +33,7 @@ class PypoLiquidsoap():
"s3": None, "s3": None,
} }
self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ self.telnet_liquidsoap = TelnetLiquidsoap(
logger,\ logger,\
host,\ host,\
port,\ port,\
@ -208,27 +224,43 @@ class PypoLiquidsoap():
link_start = link['start'] link_start = link['start']
diff_td = tnow - link_start diff_td = tnow - link_start
diff_sec = self.date_interval_to_seconds(diff_td) diff_sec = pure.date_interval_to_seconds(diff_td)
if diff_sec > 0: if diff_sec > 0:
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec) self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
original_cue_in_td = timedelta(seconds=float(link['cue_in'])) original_cue_in_td = timedelta(seconds=float(link['cue_in']))
link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec link['cue_in'] = pure.date_interval_to_seconds(original_cue_in_td) + diff_sec
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
if seconds < 0: seconds = 0
return seconds
def clear_all_queues(self): def clear_all_queues(self):
self.telnet_liquidsoap.queue_clear_all() self.telnet_liquidsoap.queue_clear_all()
def get_liquidsoap_version(self, version_string):
m = re.match(r"Liquidsoap (\d+.\d+.\d+)", "Liquidsoap 1.1.1")
if m:
return m.group(1)
else:
return None
def liquidsoap_startup_test(self):
liquidsoap_version_string = \
self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info())
while not liquidsoap_version_string:
self.logger.warning("Liquidsoap doesn't appear to be running!, " +
"Sleeping and trying again")
time.sleep(1)
liquidsoap_version_string = \
self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info())
while pure.version_cmp(liquidsoap_version_string, constants.LIQUIDSOAP_MIN_VERSION) < 0:
self.logger.warning("Liquidsoap is running but in incorrect version! " +
"Make sure you have at least Liquidsoap %s installed" %
constants.LIQUIDSOAP_MIN_VERSION)
time.sleep(1)
liquidsoap_version_string = \
self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info())
self.logger.info("Liquidsoap version string found %s" % liquidsoap_version_string)
class UnknownMediaItemType(Exception): class UnknownMediaItemType(Exception):
pass pass

View file

@ -1,28 +1,32 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
schedule.pypopush
~~~~~~~~~
Purpose of pypopush is to separate the schedule into items currently
scheduled and scheduled in the future. Currently scheduled items are
handled immediately, and future scheduled items are handed off to
PypoLiqQueue
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
from threading import Thread
from Queue import Queue
import sys
import time
import logging.config import logging.config
import telnetlib
import calendar
import math import math
import traceback import traceback
import os import os
from pypofetch import PypoFetch
from pypoliqqueue import PypoLiqQueue from pypoliqqueue import PypoLiqQueue
from schedule import pure
from Queue import Empty, Queue
from threading import Thread
from api_clients import api_client from api_clients import api_client
from std_err_override import LogWriter from std_err_override import LogWriter
from configobj import ConfigObj
# configure logging # configure logging
logging_cfg = os.path.join(os.path.dirname(__file__), "../configs/logging.cfg") logging_cfg = os.path.join(os.path.dirname(__file__), "../configs/logging.cfg")
@ -30,31 +34,13 @@ logging.config.fileConfig(logging_cfg)
logger = logging.getLogger() logger = logging.getLogger()
LogWriter.override_std_err(logger) LogWriter.override_std_err(logger)
#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)
PUSH_INTERVAL = 2
def is_stream(media_item):
return media_item['type'] == 'stream_output_start'
def is_file(media_item):
return media_item['type'] == 'file'
class PypoPush(Thread): class PypoPush(Thread):
def __init__(self, q, telnet_lock, pypo_liquidsoap, config): def __init__(self, q, pypo_liquidsoap):
Thread.__init__(self) Thread.__init__(self)
self.api_client = api_client.AirtimeApiClient() self.api_client = api_client.AirtimeApiClient()
self.queue = q self.queue = q
self.telnet_lock = telnet_lock
self.config = config
self.pushed_objects = {}
self.logger = logging.getLogger('push') self.logger = logging.getLogger('push')
self.current_prebuffering_stream_id = None
self.queue_id = 0
self.future_scheduled_queue = Queue() self.future_scheduled_queue = Queue()
self.pypo_liquidsoap = pypo_liquidsoap self.pypo_liquidsoap = pypo_liquidsoap
@ -67,9 +53,6 @@ class PypoPush(Thread):
def main(self): def main(self):
loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
media_schedule = None media_schedule = None
while True: while True:
@ -88,12 +71,6 @@ class PypoPush(Thread):
self.pypo_liquidsoap.verify_correct_present_media(currently_playing) self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
self.future_scheduled_queue.put(scheduled_for_future) self.future_scheduled_queue.put(scheduled_for_future)
if loops % heartbeat_period == 0:
self.logger.info("heartbeat")
loops = 0
loops += 1
def separate_present_future(self, media_schedule): def separate_present_future(self, media_schedule):
tnow = datetime.utcnow() tnow = datetime.utcnow()
@ -105,7 +82,7 @@ class PypoPush(Thread):
media_item = media_schedule[mkey] media_item = media_schedule[mkey]
diff_td = tnow - media_item['start'] diff_td = tnow - media_item['start']
diff_sec = self.date_interval_to_seconds(diff_td) diff_sec = pure.date_interval_to_seconds(diff_td)
if diff_sec >= 0: if diff_sec >= 0:
present.append(media_item) present.append(media_item)
@ -114,80 +91,6 @@ class PypoPush(Thread):
return present, future return present, future
def get_current_stream_id_from_liquidsoap(self):
response = "-1"
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
msg = 'dynamic_source.get_id\n'
tn.write(msg)
response = tn.read_until("\r\n").strip(" \r\n")
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error("Error connecting to Liquidsoap: %s", e)
response = []
finally:
self.telnet_lock.release()
return response
#def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id):
#correct = False
#if media_item is None:
#correct = (len(liquidsoap_queue_approx) == 0 and liquidsoap_stream_id == "-1")
#else:
#if is_file(media_item):
#if len(liquidsoap_queue_approx) == 0:
#correct = False
#else:
#correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \
#liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \
#liquidsoap_queue_approx[0]['end'] == media_item['end'] and \
#liquidsoap_queue_approx[0]['replay_gain'] == media_item['replay_gain']
#elif is_stream(media_item):
#correct = liquidsoap_stream_id == str(media_item['row_id'])
#self.logger.debug("Is current item correct?: %s", str(correct))
#return correct
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
return seconds
def stop_web_stream_all(self):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['LS_HOST'], self.config['LS_PORT'])
#msg = 'dynamic_source.read_stop_all xxx\n'
msg = 'http.stop\n'
self.logger.debug(msg)
tn.write(msg)
msg = 'dynamic_source.output_stop\n'
self.logger.debug(msg)
tn.write(msg)
msg = 'dynamic_source.id -1\n'
self.logger.debug(msg)
tn.write(msg)
tn.write("exit\n")
self.logger.debug(tn.read_all())
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
def run(self): def run(self):
try: self.main() try: self.main()
except Exception, e: except Exception, e:

View file

@ -1,4 +1,14 @@
"""
schedule.telnetliquidsoap
~~~~~~~~~
Module exposing API to directly communicate with Liquidsoap via telnet.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
import telnetlib import telnetlib
from threading import Lock
def create_liquidsoap_annotation(media): def create_liquidsoap_annotation(media):
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.
@ -16,8 +26,8 @@ def create_liquidsoap_annotation(media):
class TelnetLiquidsoap: class TelnetLiquidsoap:
def __init__(self, telnet_lock, logger, ls_host, ls_port, queues): def __init__(self, logger, ls_host, ls_port, queues):
self.telnet_lock = telnet_lock self.telnet_lock = Lock()
self.ls_host = ls_host self.ls_host = ls_host
self.ls_port = ls_port self.ls_port = ls_port
self.logger = logger self.logger = logger
@ -247,6 +257,95 @@ class TelnetLiquidsoap:
self.telnet_send([command]) self.telnet_send([command])
def liquidsoap_get_info(self):
self.logger.debug("Checking to see if Liquidsoap is running")
response = ""
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.host, self.port)
msg = "version\n"
tn.write(msg)
tn.write("exit\n")
response = tn.read_all()
except Exception, e:
self.logger.error(str(e))
return None
finally:
self.telnet_lock.release()
return response
def update_liquidsoap_station_name(self, station_name):
try:
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.host, self.port)
command = ('vars.station_name %s\n' % station_name).encode('utf-8')
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
except Exception, e:
self.logger.error("Exception %s", e)
def get_liquidsoap_connection_status(self, current_time):
output = None
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.host, self.port)
# update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
boot_up_time_command = "vars.bootup_time %s\n" % str(current_time)
self.logger.info(boot_up_time_command)
tn.write(boot_up_time_command)
connection_status = "streams.connection_status\n"
self.logger.info(connection_status)
tn.write(connection_status)
tn.write('exit\n')
output = tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
return None
def update_liquidsoap_stream_format(self, stream_format):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.host, self.port)
command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error("Exception %s", e)
finally:
self.telnet_lock.release()
def update_liquidsoap_transition_fade(self, fade):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.host, self.port)
command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8')
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error("Exception %s", e)
finally:
self.telnet_lock.release()
class DummyTelnetLiquidsoap: class DummyTelnetLiquidsoap:
def __init__(self, telnet_lock, logger): def __init__(self, telnet_lock, logger):
@ -285,5 +384,7 @@ class DummyTelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
class QueueNotEmptyException(Exception): class QueueNotEmptyException(Exception):
pass pass

View file

@ -1,9 +1,9 @@
#!/bin/bash #!/bin/bash
which py.test which py.test
pytest_exist=$? pytest_exist="$?"
if [ "$pytest_exist" != "0" ]; then if [ $pytest_exist != "0" ]; then
echo "Need to have py.test installed. Exiting..." echo "Need to have py.test installed. Exiting..."
exit 1 exit 1
fi fi

View file

@ -1,6 +1,17 @@
"""
std_err_override.LogWriter
~~~~~~~~~
This module presents a simple function to reroute output intended
for stderr to the output log file.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
import sys import sys
class LogWriter(): class _LogWriter():
def __init__(self, logger): def __init__(self, logger):
self.logger = logger self.logger = logger
@ -8,4 +19,8 @@ class LogWriter():
self.logger.error(txt) self.logger.error(txt)
def override_std_err(logger): def override_std_err(logger):
sys.stderr = LogWriter(logger) """
Create wrapper to intercept any messages that would have been printed out
to stderr and write them to our logger instead.
"""
sys.stderr = _LogWriter(logger)