diff --git a/python_apps/pypo/pypo.py b/python_apps/pypo/pypo.py index 12bd426fb..e0c55bc15 100644 --- a/python_apps/pypo/pypo.py +++ b/python_apps/pypo/pypo.py @@ -2,10 +2,9 @@ Python part of radio playout (pypo) """ -from optparse import OptionParser from datetime import datetime - -import telnetlib +from configobj import ConfigObj +from Queue import Queue import time import sys @@ -13,10 +12,6 @@ import signal import logging import locale import os -import re - -from Queue import Queue -from threading import Lock from schedule.pypopush import PypoPush from schedule.pypofetch import PypoFetch @@ -30,55 +25,10 @@ from schedule.pypoliquidsoap import PypoLiquidsoap from media.update.replaygainupdater import ReplayGainUpdater from media.update.silananalyzer import SilanAnalyzer -from configobj import ConfigObj - # custom imports from api_clients import api_client from std_err_override import LogWriter -# Set up command-line options -parser = OptionParser() - -# help screen / info -usage = "%prog [options]" + " - python playout system" -parser = OptionParser(usage=usage) - -# Options -parser.add_option("-v", "--compat", - help="Check compatibility with server API version", - default=False, - action="store_true", - dest="check_compat") - -parser.add_option("-t", "--test", - help="Do a test to make sure everything is working properly.", - default=False, - action="store_true", - dest="test") - -parser.add_option("-b", - "--cleanup", - help="Cleanup", - default=False, - action="store_true", - dest="cleanup") - -parser.add_option("-c", - "--check", - help="Check the cached schedule and exit", - default=False, - action="store_true", - dest="check") - -# parse options -(options, args) = parser.parse_args() - -LIQUIDSOAP_MIN_VERSION = "1.1.1" - - -#need to wait for Python 2.7 for this.. -#logging.captureWarnings(True) - # configure logging try: logging.config.fileConfig("configs/logging.cfg") @@ -131,16 +81,6 @@ def configure_locale(): current_locale_encoding) sys.exit(1) - -configure_locale() - -# loading config file -try: - config = ConfigObj('/etc/airtime/pypo.cfg') -except Exception, e: - logger.error('Error loading config file: %s', e) - sys.exit(1) - class Global: def __init__(self, api_client): self.api_client = api_client @@ -148,67 +88,21 @@ class Global: def selfcheck(self): return self.api_client.is_server_compatible() - def test_api(self): - self.api_client.test() - def keyboardInterruptHandler(signum, frame): logger = logging.getLogger() logger.info('\nKeyboard Interrupt\n') sys.exit(0) -def liquidsoap_get_info(telnet_lock, host, port, logger): - logger.debug("Checking to see if Liquidsoap is running") - try: - telnet_lock.acquire() - tn = telnetlib.Telnet(host, port) - msg = "version\n" - tn.write(msg) - tn.write("exit\n") - response = tn.read_all() - except Exception, e: - logger.error(str(e)) - return None - finally: - telnet_lock.release() - - return get_liquidsoap_version(response) - -def get_liquidsoap_version(version_string): - m = re.match(r"Liquidsoap (\d+.\d+.\d+)", "Liquidsoap 1.1.1") - - if m: - return m.group(1) - else: - return None - - - if m: - current_version = m.group(1) - return pure.version_cmp(current_version, LIQUIDSOAP_MIN_VERSION) >= 0 - return False - -def liquidsoap_startup_test(): - - liquidsoap_version_string = \ - liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger) - while not liquidsoap_version_string: - logger.warning("Liquidsoap doesn't appear to be running!, " + \ - "Sleeping and trying again") - time.sleep(1) - liquidsoap_version_string = \ - liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger) - - while pure.version_cmp(liquidsoap_version_string, LIQUIDSOAP_MIN_VERSION) < 0: - logger.warning("Liquidsoap is running but in incorrect version! " + \ - "Make sure you have at least Liquidsoap %s installed" % LIQUIDSOAP_MIN_VERSION) - time.sleep(1) - liquidsoap_version_string = \ - liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger) - - logger.info("Liquidsoap version string found %s" % liquidsoap_version_string) - - if __name__ == '__main__': + configure_locale() + + # loading config file + try: + config = ConfigObj('/etc/airtime/pypo.cfg') + except Exception, e: + logger.error('Error loading config file: %s', e) + sys.exit(1) + logger.info('###########################################') logger.info('# *** pypo *** #') logger.info('# Liquidsoap Scheduled Playout System #') @@ -237,17 +131,11 @@ if __name__ == '__main__': logger.error(str(e)) time.sleep(10) - telnet_lock = Lock() - ls_host = config['ls_host'] ls_port = config['ls_port'] - liquidsoap_startup_test() - - if options.test: - g.test_api() - sys.exit(0) - + pypo_liquidsoap = PypoLiquidsoap(logger, ls_host, ls_port) + pypo_liquidsoap.liquidsoap_startup_test() ReplayGainUpdater.start_reply_gain(api_client) SilanAnalyzer.start_silan(api_client, logger) @@ -256,9 +144,6 @@ if __name__ == '__main__': recorder_q = Queue() pypoPush_q = Queue() - pypo_liquidsoap = PypoLiquidsoap(logger, telnet_lock,\ - ls_host, ls_port) - """ This queue is shared between pypo-fetch and pypo-file, where pypo-file is the consumer. Pypo-fetch will send every schedule it gets to pypo-file @@ -275,11 +160,11 @@ if __name__ == '__main__': pfile.daemon = True pfile.start() - pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config) + pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, pypo_liquidsoap, config) pf.daemon = True pf.start() - pp = PypoPush(pypoPush_q, telnet_lock, pypo_liquidsoap, config) + pp = PypoPush(pypoPush_q, pypo_liquidsoap) pp.daemon = True pp.start() diff --git a/python_apps/pypo/schedule/constants.py b/python_apps/pypo/schedule/constants.py new file mode 100644 index 000000000..9f3a3d58f --- /dev/null +++ b/python_apps/pypo/schedule/constants.py @@ -0,0 +1 @@ +LIQUIDSOAP_MIN_VERSION = "1.1.1" diff --git a/python_apps/pypo/schedule/eventtypes.py b/python_apps/pypo/schedule/eventtypes.py index 5f9c871db..27dc06e1e 100644 --- a/python_apps/pypo/schedule/eventtypes.py +++ b/python_apps/pypo/schedule/eventtypes.py @@ -1,3 +1,14 @@ +""" + schedule.eventtypes + ~~~~~~~~~ + + This module exports a set of constants indicating different types + of events the pypo scheduler may receive. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" + FILE = "file" EVENT = "event" STREAM_BUFFER_START = "stream_buffer_start" diff --git a/python_apps/pypo/schedule/listenerstat.py b/python_apps/pypo/schedule/listenerstat.py index 10d9359fd..ec4696c2b 100644 --- a/python_apps/pypo/schedule/listenerstat.py +++ b/python_apps/pypo/schedule/listenerstat.py @@ -1,3 +1,16 @@ +""" + schedule.listenerstat + ~~~~~~~~~ + + This module exports a set of functions to help communicate with both + icecast and shoutcast servers. Its function is to parse the output XML + provided and return the current number of clients listening. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" + + from threading import Thread import urllib2 import xml.dom.minidom diff --git a/python_apps/pypo/schedule/pure.py b/python_apps/pypo/schedule/pure.py index 3aeeeebcb..70cb4a0a4 100644 --- a/python_apps/pypo/schedule/pure.py +++ b/python_apps/pypo/schedule/pure.py @@ -1,5 +1,15 @@ -import re +""" + schedule.pure + ~~~~~~~~~ + This module exports a set of 'pure' common functions with no side-effects + that may be used by various parts of the pypo scheduler. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" + +import re def version_cmp(version1, version2): def normalize(v): diff --git a/python_apps/pypo/schedule/pypofetch.py b/python_apps/pypo/schedule/pypofetch.py index d0bbe1c09..60c8a246c 100644 --- a/python_apps/pypo/schedule/pypofetch.py +++ b/python_apps/pypo/schedule/pypofetch.py @@ -1,25 +1,41 @@ # -*- coding: utf-8 -*- +""" + schedule.pypofetch + ~~~~~~~~~ + + The purpose of this module is to parse schedule data received from the + server and perform various tasks based on this schedule such as pre-cache + tracks that are scheduled, do sanity checks on the schedule sent, perform + cache clean-up etc. + + The schedule for the immediate future is pushed via RabbitMQ when a user is + editing the schedule, however a pull for this schedule is also initiated at + a specified interval in order to keep current when there is no user + activity. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" import os import sys import time import logging.config import json -import telnetlib import copy import subprocess import signal -from datetime import datetime import traceback -from schedule import pure from Queue import Empty +from datetime import datetime from threading import Thread from subprocess import Popen, PIPE from api_clients import api_client from std_err_override import LogWriter +import pure # configure logging logging_cfg = os.path.join(os.path.dirname(__file__), "../configs/logging.cfg") @@ -33,17 +49,11 @@ def keyboardInterruptHandler(signum, frame): sys.exit(0) signal.signal(signal.SIGINT, keyboardInterruptHandler) -#need to wait for Python 2.7 for this.. -#logging.captureWarnings(True) - POLL_INTERVAL = 1800 -config_static = None - class PypoFetch(Thread): - def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config): + def __init__(self, pypoFetch_q, pypoPush_q, media_q, pypo_liquidsoap, config): Thread.__init__(self) - global config_static self.api_client = api_client.AirtimeApiClient() self.fetch_queue = pypoFetch_q @@ -51,13 +61,9 @@ class PypoFetch(Thread): self.media_prepare_queue = media_q self.last_update_schedule_timestamp = time.time() self.config = config - config_static = config self.listener_timeout = POLL_INTERVAL - self.telnet_lock = telnet_lock - self.logger = logging.getLogger() - self.pypo_liquidsoap = pypo_liquidsoap self.cache_dir = os.path.join(config["cache_dir"], "scheduler") @@ -100,13 +106,19 @@ class PypoFetch(Thread): self.regenerate_liquidsoap_conf(m['setting']) elif command == 'update_stream_format': self.logger.info("Updating stream format...") - self.update_liquidsoap_stream_format(m['stream_format']) + self.telnetliquidsoap.\ + get_telnet_dispatcher().\ + update_liquidsoap_stream_format(m['stream_format']) elif command == 'update_station_name': self.logger.info("Updating station name...") - self.update_liquidsoap_station_name(m['station_name']) + self.pypo_liquidsoap.\ + get_telnet_dispatcher().\ + update_liquidsoap_station_name(m['station_name']) elif command == 'update_transition_fade': self.logger.info("Updating transition_fade...") - self.update_liquidsoap_transition_fade(m['transition_fade']) + self.pypo_liquidsoap.\ + get_telnet_dispatcher().\ + update_liquidsoap_transition_fade(m['transition_fade']) elif command == 'switch_source': self.logger.info("switch_on_source show command received...") self.pypo_liquidsoap.\ @@ -179,28 +191,12 @@ class PypoFetch(Thread): self.pypo_liquidsoap.clear_queue_tracker() def restart_liquidsoap(self): - try: - self.telnet_lock.acquire() - self.logger.info("Restarting Liquidsoap") - subprocess.call('/etc/init.d/airtime-liquidsoap restart', shell=True) + self.logger.info("Restarting Liquidsoap") + subprocess.call(['/etc/init.d/airtime-liquidsoap', 'restart']) - #Wait here and poll Liquidsoap until it has started up - self.logger.info("Waiting for Liquidsoap to start") - while True: - try: - tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) - tn.write("exit\n") - tn.read_all() - self.logger.info("Liquidsoap is up and running") - break - except Exception, e: - #sleep 0.5 seconds and try again - time.sleep(0.5) - - except Exception, e: - self.logger.error(e) - finally: - self.telnet_lock.release() + #Wait here and poll Liquidsoap until it has started up + self.logger.info("Waiting for Liquidsoap to start") + self.telnetliquidsoap.liquidsoap_startup_test() try: self.set_bootstrap_variables() @@ -313,28 +309,9 @@ class PypoFetch(Thread): This function updates the bootup time variable in Liquidsoap script """ - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) - # update the boot up time of Liquidsoap. Since Liquidsoap is not restarting, - # we are manually adjusting the bootup time variable so the status msg will get - # updated. - current_time = time.time() - boot_up_time_command = "vars.bootup_time " + str(current_time) + "\n" - self.logger.info(boot_up_time_command) - tn.write(boot_up_time_command) - - connection_status = "streams.connection_status\n" - self.logger.info(connection_status) - tn.write(connection_status) - - tn.write('exit\n') - - output = tn.read_all() - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() + output = self.telnetliquidsoap.\ + get_telnet_dispatcher().\ + get_liquidsoap_connection_status(time.time()) output_list = output.split("\r\n") stream_info = output_list[2] @@ -353,56 +330,11 @@ class PypoFetch(Thread): self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time)) - def update_liquidsoap_stream_format(self, stream_format): - # Push stream metadata to liquidsoap - # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) - command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') - self.logger.info(command) - tn.write(command) - tn.write('exit\n') - tn.read_all() - except Exception, e: - self.logger.error("Exception %s", e) - finally: - self.telnet_lock.release() - def update_liquidsoap_transition_fade(self, fade): - # Push stream metadata to liquidsoap - # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) - command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8') - self.logger.info(command) - tn.write(command) - tn.write('exit\n') - tn.read_all() - except Exception, e: - self.logger.error("Exception %s", e) - finally: - self.telnet_lock.release() - def update_liquidsoap_station_name(self, station_name): - # Push stream metadata to liquidsoap - # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! - try: - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) - command = ('vars.station_name %s\n' % station_name).encode('utf-8') - self.logger.info(command) - tn.write(command) - tn.write('exit\n') - tn.read_all() - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - except Exception, e: - self.logger.error("Exception %s", e) + + + """ Process the schedule diff --git a/python_apps/pypo/schedule/pypofile.py b/python_apps/pypo/schedule/pypofile.py index 2f35489e2..22f69b809 100644 --- a/python_apps/pypo/schedule/pypofile.py +++ b/python_apps/pypo/schedule/pypofile.py @@ -1,4 +1,14 @@ # -*- coding: utf-8 -*- +""" + schedule.pypofile + ~~~~~~~~~ + + This module is directly responsible for pre-caching a list of files + scheduled in the near future. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" from threading import Thread from Queue import Empty diff --git a/python_apps/pypo/schedule/pypoliqqueue.py b/python_apps/pypo/schedule/pypoliqqueue.py index a5e50831e..a831732d3 100644 --- a/python_apps/pypo/schedule/pypoliqqueue.py +++ b/python_apps/pypo/schedule/pypoliqqueue.py @@ -1,7 +1,20 @@ +""" + schedule.pypoliqqueue + ~~~~~~~~~ + + This module takes a collection of media_items scheduled in the near future + and fires off a start event when the item's start time begins. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" + from threading import Thread from collections import deque from datetime import datetime +from schedule import pure + import traceback import sys import time @@ -44,7 +57,7 @@ class PypoLiqQueue(Thread): self.pypo_liquidsoap.play(media_item) if len(schedule_deque): time_until_next_play = \ - self.date_interval_to_seconds( + pure.date_interval_to_seconds( schedule_deque[0]['start'] - datetime.utcnow()) if time_until_next_play < 0: time_until_next_play = 0 @@ -61,23 +74,11 @@ class PypoLiqQueue(Thread): schedule_deque.append(media_schedule[i]) if len(keys): - time_until_next_play = self.date_interval_to_seconds(\ + time_until_next_play = pure.date_interval_to_seconds(\ keys[0] - datetime.utcnow()) else: time_until_next_play = None - - def date_interval_to_seconds(self, interval): - """ - Convert timedelta object into int representing the number of seconds. If - number of seconds is less than 0, then return 0. - """ - seconds = (interval.microseconds + \ - (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) - if seconds < 0: seconds = 0 - - return seconds - def run(self): try: self.main() except Exception, e: diff --git a/python_apps/pypo/schedule/pypoliquidsoap.py b/python_apps/pypo/schedule/pypoliquidsoap.py index 50a47db94..e5739c20d 100644 --- a/python_apps/pypo/schedule/pypoliquidsoap.py +++ b/python_apps/pypo/schedule/pypoliquidsoap.py @@ -1,14 +1,30 @@ +""" + schedule.pypoliquidsoap + ~~~~~~~~~ + + An attempt to abstract the various different ways we need to start/stop + files and webstreams into one unified interface with play() and stop() + methods instead. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" + from pypofetch import PypoFetch from telnetliquidsoap import TelnetLiquidsoap +from schedule import pure from datetime import datetime from datetime import timedelta import eventtypes +import constants + import time +import re class PypoLiquidsoap(): - def __init__(self, logger, telnet_lock, host, port): + def __init__(self, logger, host, port): self.logger = logger self.liq_queue_tracker = { "s0": None, @@ -17,7 +33,7 @@ class PypoLiquidsoap(): "s3": None, } - self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ + self.telnet_liquidsoap = TelnetLiquidsoap( logger,\ host,\ port,\ @@ -208,27 +224,43 @@ class PypoLiquidsoap(): link_start = link['start'] diff_td = tnow - link_start - diff_sec = self.date_interval_to_seconds(diff_td) + diff_sec = pure.date_interval_to_seconds(diff_td) if diff_sec > 0: self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec) original_cue_in_td = timedelta(seconds=float(link['cue_in'])) - link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec - - def date_interval_to_seconds(self, interval): - """ - Convert timedelta object into int representing the number of seconds. If - number of seconds is less than 0, then return 0. - """ - seconds = (interval.microseconds + \ - (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) - if seconds < 0: seconds = 0 - - return seconds + link['cue_in'] = pure.date_interval_to_seconds(original_cue_in_td) + diff_sec def clear_all_queues(self): self.telnet_liquidsoap.queue_clear_all() + def get_liquidsoap_version(self, version_string): + m = re.match(r"Liquidsoap (\d+.\d+.\d+)", "Liquidsoap 1.1.1") + if m: + return m.group(1) + else: + return None + + def liquidsoap_startup_test(self): + liquidsoap_version_string = \ + self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info()) + while not liquidsoap_version_string: + self.logger.warning("Liquidsoap doesn't appear to be running!, " + + "Sleeping and trying again") + time.sleep(1) + liquidsoap_version_string = \ + self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info()) + + while pure.version_cmp(liquidsoap_version_string, constants.LIQUIDSOAP_MIN_VERSION) < 0: + self.logger.warning("Liquidsoap is running but in incorrect version! " + + "Make sure you have at least Liquidsoap %s installed" % + constants.LIQUIDSOAP_MIN_VERSION) + time.sleep(1) + liquidsoap_version_string = \ + self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info()) + + self.logger.info("Liquidsoap version string found %s" % liquidsoap_version_string) + class UnknownMediaItemType(Exception): pass diff --git a/python_apps/pypo/schedule/pypopush.py b/python_apps/pypo/schedule/pypopush.py index f9cc1926b..a015752d7 100644 --- a/python_apps/pypo/schedule/pypopush.py +++ b/python_apps/pypo/schedule/pypopush.py @@ -1,28 +1,32 @@ # -*- coding: utf-8 -*- +""" + schedule.pypopush + ~~~~~~~~~ + + Purpose of pypopush is to separate the schedule into items currently + scheduled and scheduled in the future. Currently scheduled items are + handled immediately, and future scheduled items are handed off to + PypoLiqQueue + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" + from datetime import datetime from datetime import timedelta +from threading import Thread +from Queue import Queue -import sys -import time import logging.config -import telnetlib -import calendar import math import traceback import os -from pypofetch import PypoFetch from pypoliqqueue import PypoLiqQueue - -from Queue import Empty, Queue - -from threading import Thread - +from schedule import pure from api_clients import api_client from std_err_override import LogWriter -from configobj import ConfigObj - # configure logging logging_cfg = os.path.join(os.path.dirname(__file__), "../configs/logging.cfg") @@ -30,31 +34,13 @@ logging.config.fileConfig(logging_cfg) logger = logging.getLogger() LogWriter.override_std_err(logger) -#need to wait for Python 2.7 for this.. -#logging.captureWarnings(True) - -PUSH_INTERVAL = 2 - - -def is_stream(media_item): - return media_item['type'] == 'stream_output_start' - -def is_file(media_item): - return media_item['type'] == 'file' - class PypoPush(Thread): - def __init__(self, q, telnet_lock, pypo_liquidsoap, config): + def __init__(self, q, pypo_liquidsoap): Thread.__init__(self) self.api_client = api_client.AirtimeApiClient() self.queue = q - self.telnet_lock = telnet_lock - self.config = config - - self.pushed_objects = {} self.logger = logging.getLogger('push') - self.current_prebuffering_stream_id = None - self.queue_id = 0 self.future_scheduled_queue = Queue() self.pypo_liquidsoap = pypo_liquidsoap @@ -67,9 +53,6 @@ class PypoPush(Thread): def main(self): - loops = 0 - heartbeat_period = math.floor(30 / PUSH_INTERVAL) - media_schedule = None while True: @@ -88,12 +71,6 @@ class PypoPush(Thread): self.pypo_liquidsoap.verify_correct_present_media(currently_playing) self.future_scheduled_queue.put(scheduled_for_future) - if loops % heartbeat_period == 0: - self.logger.info("heartbeat") - loops = 0 - loops += 1 - - def separate_present_future(self, media_schedule): tnow = datetime.utcnow() @@ -105,7 +82,7 @@ class PypoPush(Thread): media_item = media_schedule[mkey] diff_td = tnow - media_item['start'] - diff_sec = self.date_interval_to_seconds(diff_td) + diff_sec = pure.date_interval_to_seconds(diff_td) if diff_sec >= 0: present.append(media_item) @@ -114,80 +91,6 @@ class PypoPush(Thread): return present, future - def get_current_stream_id_from_liquidsoap(self): - response = "-1" - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) - - msg = 'dynamic_source.get_id\n' - tn.write(msg) - response = tn.read_until("\r\n").strip(" \r\n") - tn.write('exit\n') - tn.read_all() - except Exception, e: - self.logger.error("Error connecting to Liquidsoap: %s", e) - response = [] - finally: - self.telnet_lock.release() - - return response - - #def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id): - #correct = False - #if media_item is None: - #correct = (len(liquidsoap_queue_approx) == 0 and liquidsoap_stream_id == "-1") - #else: - #if is_file(media_item): - #if len(liquidsoap_queue_approx) == 0: - #correct = False - #else: - #correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \ - #liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \ - #liquidsoap_queue_approx[0]['end'] == media_item['end'] and \ - #liquidsoap_queue_approx[0]['replay_gain'] == media_item['replay_gain'] - #elif is_stream(media_item): - #correct = liquidsoap_stream_id == str(media_item['row_id']) - - #self.logger.debug("Is current item correct?: %s", str(correct)) - #return correct - - def date_interval_to_seconds(self, interval): - """ - Convert timedelta object into int representing the number of seconds. If - number of seconds is less than 0, then return 0. - """ - seconds = (interval.microseconds + \ - (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) - - return seconds - - def stop_web_stream_all(self): - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(self.config['LS_HOST'], self.config['LS_PORT']) - - #msg = 'dynamic_source.read_stop_all xxx\n' - msg = 'http.stop\n' - self.logger.debug(msg) - tn.write(msg) - - msg = 'dynamic_source.output_stop\n' - self.logger.debug(msg) - tn.write(msg) - - msg = 'dynamic_source.id -1\n' - self.logger.debug(msg) - tn.write(msg) - - tn.write("exit\n") - self.logger.debug(tn.read_all()) - - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - def run(self): try: self.main() except Exception, e: diff --git a/python_apps/pypo/schedule/telnetliquidsoap.py b/python_apps/pypo/schedule/telnetliquidsoap.py index 0cfb8709c..8cbf971b3 100644 --- a/python_apps/pypo/schedule/telnetliquidsoap.py +++ b/python_apps/pypo/schedule/telnetliquidsoap.py @@ -1,4 +1,14 @@ +""" + schedule.telnetliquidsoap + ~~~~~~~~~ + + Module exposing API to directly communicate with Liquidsoap via telnet. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" import telnetlib +from threading import Lock def create_liquidsoap_annotation(media): # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. @@ -16,8 +26,8 @@ def create_liquidsoap_annotation(media): class TelnetLiquidsoap: - def __init__(self, telnet_lock, logger, ls_host, ls_port, queues): - self.telnet_lock = telnet_lock + def __init__(self, logger, ls_host, ls_port, queues): + self.telnet_lock = Lock() self.ls_host = ls_host self.ls_port = ls_port self.logger = logger @@ -247,6 +257,95 @@ class TelnetLiquidsoap: self.telnet_send([command]) + def liquidsoap_get_info(self): + self.logger.debug("Checking to see if Liquidsoap is running") + response = "" + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.host, self.port) + msg = "version\n" + tn.write(msg) + tn.write("exit\n") + response = tn.read_all() + except Exception, e: + self.logger.error(str(e)) + return None + finally: + self.telnet_lock.release() + + return response + + def update_liquidsoap_station_name(self, station_name): + try: + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.host, self.port) + command = ('vars.station_name %s\n' % station_name).encode('utf-8') + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + except Exception, e: + self.logger.error("Exception %s", e) + + def get_liquidsoap_connection_status(self, current_time): + output = None + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.host, self.port) + # update the boot up time of Liquidsoap. Since Liquidsoap is not restarting, + # we are manually adjusting the bootup time variable so the status msg will get + # updated. + boot_up_time_command = "vars.bootup_time %s\n" % str(current_time) + self.logger.info(boot_up_time_command) + tn.write(boot_up_time_command) + + connection_status = "streams.connection_status\n" + self.logger.info(connection_status) + tn.write(connection_status) + + tn.write('exit\n') + + output = tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + return None + + def update_liquidsoap_stream_format(self, stream_format): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.host, self.port) + command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error("Exception %s", e) + finally: + self.telnet_lock.release() + + def update_liquidsoap_transition_fade(self, fade): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.host, self.port) + command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8') + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error("Exception %s", e) + finally: + self.telnet_lock.release() + class DummyTelnetLiquidsoap: def __init__(self, telnet_lock, logger): @@ -285,5 +384,7 @@ class DummyTelnetLiquidsoap: finally: self.telnet_lock.release() + + class QueueNotEmptyException(Exception): pass diff --git a/python_apps/pypo/tests/run_tests.sh b/python_apps/pypo/tests/run_tests.sh index 830a9bb85..3915e7eba 100755 --- a/python_apps/pypo/tests/run_tests.sh +++ b/python_apps/pypo/tests/run_tests.sh @@ -1,9 +1,9 @@ #!/bin/bash which py.test -pytest_exist=$? +pytest_exist="$?" -if [ "$pytest_exist" != "0" ]; then +if [ $pytest_exist != "0" ]; then echo "Need to have py.test installed. Exiting..." exit 1 fi diff --git a/python_apps/std_err_override/LogWriter.py b/python_apps/std_err_override/LogWriter.py index 65da21ff8..529f263ad 100644 --- a/python_apps/std_err_override/LogWriter.py +++ b/python_apps/std_err_override/LogWriter.py @@ -1,6 +1,17 @@ +""" + std_err_override.LogWriter + ~~~~~~~~~ + + This module presents a simple function to reroute output intended + for stderr to the output log file. + + :author: (c) 2012 by Martin Konecny. + :license: GPLv3, see LICENSE for more details. +""" + import sys -class LogWriter(): +class _LogWriter(): def __init__(self, logger): self.logger = logger @@ -8,4 +19,8 @@ class LogWriter(): self.logger.error(txt) def override_std_err(logger): - sys.stderr = LogWriter(logger) + """ + Create wrapper to intercept any messages that would have been printed out + to stderr and write them to our logger instead. + """ + sys.stderr = _LogWriter(logger)