From c8c9e462283e6405738be3bbb4542a1cd12013e7 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Thu, 15 Mar 2012 23:14:19 -0400 Subject: [PATCH] cc-3447: pypo telnet class --- python_apps/pypo/pypocli.py | 8 ++- python_apps/pypo/pypofetch.py | 84 +++++++++++++++++-------- python_apps/pypo/pypopush.py | 114 ++++++++++++++++++++-------------- 3 files changed, 132 insertions(+), 74 deletions(-) diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 9eeb15a6f..db6dd7e1a 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -12,6 +12,8 @@ import logging.config import logging.handlers from Queue import Queue +from threading import Lock + from pypopush import PypoPush from pypofetch import PypoFetch from pypofile import PypoFile @@ -125,6 +127,8 @@ if __name__ == '__main__': recorder_q = Queue() pypoPush_q = Queue() + telnet_lock = Lock() + """ This queue is shared between pypo-fetch and pypo-file, where pypo-file is the receiver. Pypo-fetch will send every schedule it gets to pypo-file @@ -141,11 +145,11 @@ if __name__ == '__main__': pfile.daemon = True pfile.start() - pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q) + pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock) pf.daemon = True pf.start() - pp = PypoPush(pypoPush_q) + pp = PypoPush(pypoPush_q, telnet_lock) pp.daemon = True pp.start() diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 72c896339..fa162b0fc 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -12,6 +12,8 @@ import telnetlib import math import copy from threading import Thread +from threading import Lock + from subprocess import Popen, PIPE from datetime import datetime from datetime import timedelta @@ -38,13 +40,15 @@ except Exception, e: sys.exit() class PypoFetch(Thread): - def __init__(self, pypoFetch_q, pypoPush_q, media_q): + def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock): Thread.__init__(self) self.api_client = api_client.api_client_factory(config) self.fetch_queue = pypoFetch_q self.push_queue = pypoPush_q self.media_prepare_queue = media_q + self.telnet_lock = telnet_lock + self.logger = logging.getLogger(); self.cache_dir = os.path.join(config["cache_dir"], "scheduler") @@ -113,14 +117,16 @@ class PypoFetch(Thread): elif(sourcename == "live_dj"): command += "live_dj_harbor.kick\n" + self.telnet_lock.acquire() try: tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn.write(command) tn.write('exit\n') tn.read_all() except Exception, e: - self.logger.debug(e) - self.logger.debug('Could not connect to liquidsoap') + self.logger.error(str(e)) + finally: + self.telnet_lock.release() def switch_source(self, sourcename, status): self.logger.debug('Switching source: %s to "%s" status', sourcename, status) @@ -137,14 +143,16 @@ class PypoFetch(Thread): else: command += "stop\n" + self.telnet_lock.acquire() try: tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn.write(command) tn.write('exit\n') tn.read_all() except Exception, e: - self.logger.debug(e) - self.logger.debug('Could not connect to liquidsoap') + self.logger.error(str(e)) + finally: + self.telnet_lock.release() """ This check current switch status from Airtime and update the status @@ -280,17 +288,25 @@ class PypoFetch(Thread): updates the status of liquidsoap connection to the streaming server This fucntion updates the bootup time variable in liquidsoap script """ - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - # update the boot up time of liquidsoap. Since liquidsoap is not restarting, - # we are manually adjusting the bootup time variable so the status msg will get - # updated. - current_time = time.time() - boot_up_time_command = "vars.bootup_time "+str(current_time)+"\n" - tn.write(boot_up_time_command) - tn.write("streams.connection_status\n") - tn.write('exit\n') - output = tn.read_all() + self.telnet_lock.acquire() + try: + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + # update the boot up time of liquidsoap. Since liquidsoap is not restarting, + # we are manually adjusting the bootup time variable so the status msg will get + # updated. + current_time = time.time() + boot_up_time_command = "vars.bootup_time "+str(current_time)+"\n" + tn.write(boot_up_time_command) + tn.write("streams.connection_status\n") + tn.write('exit\n') + + output = tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + output_list = output.split("\r\n") stream_info = output_list[2] @@ -313,12 +329,19 @@ class PypoFetch(Thread): try: self.logger.info(LS_HOST) self.logger.info(LS_PORT) - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') - self.logger.info(command) - tn.write(command) - tn.write('exit\n') - tn.read_all() + + self.telnet_lock.acquire() + try: + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() except Exception, e: self.logger.error("Exception %s", e) @@ -328,12 +351,19 @@ class PypoFetch(Thread): try: self.logger.info(LS_HOST) self.logger.info(LS_PORT) - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - command = ('vars.station_name %s\n' % station_name).encode('utf-8') - self.logger.info(command) - tn.write(command) - tn.write('exit\n') - tn.read_all() + + self.telnet_lock.acquire() + try: + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + command = ('vars.station_name %s\n' % station_name).encode('utf-8') + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() except Exception, e: self.logger.error("Exception %s", e) diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 45f28b82b..b09fb705b 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -18,6 +18,8 @@ retrieved ("first-in, first-out"); however, lists are not efficient for this pur from collections import deque from threading import Thread +from threading import Lock + from api_clients import api_client from configobj import ConfigObj @@ -38,12 +40,14 @@ except Exception, e: sys.exit() class PypoPush(Thread): - def __init__(self, q): + def __init__(self, q, telnet_lock): Thread.__init__(self) self.api_client = api_client.api_client_factory(config) self.queue = q self.media = dict() + + self.telnet_lock = telnet_lock self.liquidsoap_state_play = True self.push_ahead = 10 @@ -161,13 +165,20 @@ class PypoPush(Thread): This function connects to Liquidsoap to find what media items are in its queue. """ - tn = telnetlib.Telnet(LS_HOST, LS_PORT) + self.telnet_lock.acquire() - msg = 'queue.queue\n' - tn.write(msg) - response = tn.read_until("\r\n").strip(" \r\n") - tn.write('exit\n') - tn.read_all() + try: + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + + msg = 'queue.queue\n' + tn.write(msg) + response = tn.read_until("\r\n").strip(" \r\n") + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() liquidsoap_queue_approx = [] @@ -243,21 +254,28 @@ class PypoPush(Thread): if 'queue_id' in media_item: queue_id = media_item['queue_id'] - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - msg = "queue.remove %s\n" % queue_id - tn.write(msg) - response = tn.read_until("\r\n").strip("\r\n") - - if "No such request in my queue" in response: - """ - Cannot remove because Liquidsoap started playing the item. Need - to use source.skip instead - """ - msg = "source.skip" - tn.write("source.skip") + self.telnet_lock.acquire() + try: + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + msg = "queue.remove %s\n" % queue_id + tn.write(msg) + response = tn.read_until("\r\n").strip("\r\n") + + if "No such request in my queue" in response: + """ + Cannot remove because Liquidsoap started playing the item. Need + to use source.skip instead + """ + msg = "source.skip" + tn.write("source.skip") + + tn.write("exit\n") + tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() - tn.write("exit\n") - tn.read_all() else: self.logger.error("'queue_id' key doesn't exist in media_item dict()") @@ -294,30 +312,36 @@ class PypoPush(Thread): about which show is playing. """ - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - - #tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8')) - - annotation = media_item['annotation'] - msg = 'queue.push %s\n' % annotation.encode('utf-8') - self.logger.debug(msg) - tn.write(msg) - queue_id = tn.read_until("\r\n").strip("\r\n") - - #remember the media_item's queue id which we may use - #later if we need to remove it from the queue. - media_item['queue_id'] = queue_id - - #add media_item to the end of our queue - self.pushed_objects[queue_id] = media_item - - show_name = media_item['show_name'] - msg = 'vars.show_name %s\n' % show_name.encode('utf-8') - tn.write(msg) - self.logger.debug(msg) - - tn.write("exit\n") - self.logger.debug(tn.read_all()) + self.telnet_lock.acquire() + try: + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + + #tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8')) + + annotation = media_item['annotation'] + msg = 'queue.push %s\n' % annotation.encode('utf-8') + self.logger.debug(msg) + tn.write(msg) + queue_id = tn.read_until("\r\n").strip("\r\n") + + #remember the media_item's queue id which we may use + #later if we need to remove it from the queue. + media_item['queue_id'] = queue_id + + #add media_item to the end of our queue + self.pushed_objects[queue_id] = media_item + + show_name = media_item['show_name'] + msg = 'vars.show_name %s\n' % show_name.encode('utf-8') + tn.write(msg) + self.logger.debug(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() def run(self): loops = 0