cc-3447: pypo telnet class

This commit is contained in:
Martin Konecny 2012-03-15 23:14:19 -04:00
parent 275fc7f987
commit c8c9e46228
3 changed files with 132 additions and 74 deletions

View File

@ -12,6 +12,8 @@ import logging.config
import logging.handlers import logging.handlers
from Queue import Queue from Queue import Queue
from threading import Lock
from pypopush import PypoPush from pypopush import PypoPush
from pypofetch import PypoFetch from pypofetch import PypoFetch
from pypofile import PypoFile from pypofile import PypoFile
@ -125,6 +127,8 @@ if __name__ == '__main__':
recorder_q = Queue() recorder_q = Queue()
pypoPush_q = Queue() pypoPush_q = Queue()
telnet_lock = Lock()
""" """
This queue is shared between pypo-fetch and pypo-file, where pypo-file This queue is shared between pypo-fetch and pypo-file, where pypo-file
is the receiver. Pypo-fetch will send every schedule it gets to pypo-file is the receiver. Pypo-fetch will send every schedule it gets to pypo-file
@ -141,11 +145,11 @@ if __name__ == '__main__':
pfile.daemon = True pfile.daemon = True
pfile.start() pfile.start()
pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q) pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock)
pf.daemon = True pf.daemon = True
pf.start() pf.start()
pp = PypoPush(pypoPush_q) pp = PypoPush(pypoPush_q, telnet_lock)
pp.daemon = True pp.daemon = True
pp.start() pp.start()

View File

@ -12,6 +12,8 @@ import telnetlib
import math import math
import copy import copy
from threading import Thread from threading import Thread
from threading import Lock
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
@ -38,13 +40,15 @@ except Exception, e:
sys.exit() sys.exit()
class PypoFetch(Thread): class PypoFetch(Thread):
def __init__(self, pypoFetch_q, pypoPush_q, media_q): def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock):
Thread.__init__(self) Thread.__init__(self)
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
self.fetch_queue = pypoFetch_q self.fetch_queue = pypoFetch_q
self.push_queue = pypoPush_q self.push_queue = pypoPush_q
self.media_prepare_queue = media_q self.media_prepare_queue = media_q
self.telnet_lock = telnet_lock
self.logger = logging.getLogger(); self.logger = logging.getLogger();
self.cache_dir = os.path.join(config["cache_dir"], "scheduler") self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
@ -113,14 +117,16 @@ class PypoFetch(Thread):
elif(sourcename == "live_dj"): elif(sourcename == "live_dj"):
command += "live_dj_harbor.kick\n" command += "live_dj_harbor.kick\n"
self.telnet_lock.acquire()
try: try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
tn.write(command) tn.write(command)
tn.write('exit\n') tn.write('exit\n')
tn.read_all() tn.read_all()
except Exception, e: except Exception, e:
self.logger.debug(e) self.logger.error(str(e))
self.logger.debug('Could not connect to liquidsoap') finally:
self.telnet_lock.release()
def switch_source(self, sourcename, status): def switch_source(self, sourcename, status):
self.logger.debug('Switching source: %s to "%s" status', sourcename, status) self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
@ -137,14 +143,16 @@ class PypoFetch(Thread):
else: else:
command += "stop\n" command += "stop\n"
self.telnet_lock.acquire()
try: try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
tn.write(command) tn.write(command)
tn.write('exit\n') tn.write('exit\n')
tn.read_all() tn.read_all()
except Exception, e: except Exception, e:
self.logger.debug(e) self.logger.error(str(e))
self.logger.debug('Could not connect to liquidsoap') finally:
self.telnet_lock.release()
""" """
This check current switch status from Airtime and update the status This check current switch status from Airtime and update the status
@ -280,17 +288,25 @@ class PypoFetch(Thread):
updates the status of liquidsoap connection to the streaming server updates the status of liquidsoap connection to the streaming server
This fucntion updates the bootup time variable in liquidsoap script This fucntion updates the bootup time variable in liquidsoap script
""" """
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
# update the boot up time of liquidsoap. Since liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
current_time = time.time()
boot_up_time_command = "vars.bootup_time "+str(current_time)+"\n"
tn.write(boot_up_time_command)
tn.write("streams.connection_status\n")
tn.write('exit\n')
output = tn.read_all() self.telnet_lock.acquire()
try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
# update the boot up time of liquidsoap. Since liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
current_time = time.time()
boot_up_time_command = "vars.bootup_time "+str(current_time)+"\n"
tn.write(boot_up_time_command)
tn.write("streams.connection_status\n")
tn.write('exit\n')
output = tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
output_list = output.split("\r\n") output_list = output.split("\r\n")
stream_info = output_list[2] stream_info = output_list[2]
@ -313,12 +329,19 @@ class PypoFetch(Thread):
try: try:
self.logger.info(LS_HOST) self.logger.info(LS_HOST)
self.logger.info(LS_PORT) self.logger.info(LS_PORT)
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') self.telnet_lock.acquire()
self.logger.info(command) try:
tn.write(command) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
tn.write('exit\n') command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
tn.read_all() self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
except Exception, e: except Exception, e:
self.logger.error("Exception %s", e) self.logger.error("Exception %s", e)
@ -328,12 +351,19 @@ class PypoFetch(Thread):
try: try:
self.logger.info(LS_HOST) self.logger.info(LS_HOST)
self.logger.info(LS_PORT) self.logger.info(LS_PORT)
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
command = ('vars.station_name %s\n' % station_name).encode('utf-8') self.telnet_lock.acquire()
self.logger.info(command) try:
tn.write(command) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
tn.write('exit\n') command = ('vars.station_name %s\n' % station_name).encode('utf-8')
tn.read_all() self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
except Exception, e: except Exception, e:
self.logger.error("Exception %s", e) self.logger.error("Exception %s", e)

View File

@ -18,6 +18,8 @@ retrieved ("first-in, first-out"); however, lists are not efficient for this pur
from collections import deque from collections import deque
from threading import Thread from threading import Thread
from threading import Lock
from api_clients import api_client from api_clients import api_client
from configobj import ConfigObj from configobj import ConfigObj
@ -38,12 +40,14 @@ except Exception, e:
sys.exit() sys.exit()
class PypoPush(Thread): class PypoPush(Thread):
def __init__(self, q): def __init__(self, q, telnet_lock):
Thread.__init__(self) Thread.__init__(self)
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
self.queue = q self.queue = q
self.media = dict() self.media = dict()
self.telnet_lock = telnet_lock
self.liquidsoap_state_play = True self.liquidsoap_state_play = True
self.push_ahead = 10 self.push_ahead = 10
@ -161,13 +165,20 @@ class PypoPush(Thread):
This function connects to Liquidsoap to find what media items are in its queue. This function connects to Liquidsoap to find what media items are in its queue.
""" """
tn = telnetlib.Telnet(LS_HOST, LS_PORT) self.telnet_lock.acquire()
msg = 'queue.queue\n' try:
tn.write(msg) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
response = tn.read_until("\r\n").strip(" \r\n")
tn.write('exit\n') msg = 'queue.queue\n'
tn.read_all() tn.write(msg)
response = tn.read_until("\r\n").strip(" \r\n")
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
liquidsoap_queue_approx = [] liquidsoap_queue_approx = []
@ -243,21 +254,28 @@ class PypoPush(Thread):
if 'queue_id' in media_item: if 'queue_id' in media_item:
queue_id = media_item['queue_id'] queue_id = media_item['queue_id']
tn = telnetlib.Telnet(LS_HOST, LS_PORT) self.telnet_lock.acquire()
msg = "queue.remove %s\n" % queue_id try:
tn.write(msg) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
response = tn.read_until("\r\n").strip("\r\n") msg = "queue.remove %s\n" % queue_id
tn.write(msg)
if "No such request in my queue" in response: response = tn.read_until("\r\n").strip("\r\n")
"""
Cannot remove because Liquidsoap started playing the item. Need if "No such request in my queue" in response:
to use source.skip instead """
""" Cannot remove because Liquidsoap started playing the item. Need
msg = "source.skip" to use source.skip instead
tn.write("source.skip") """
msg = "source.skip"
tn.write("source.skip")
tn.write("exit\n")
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
tn.write("exit\n")
tn.read_all()
else: else:
self.logger.error("'queue_id' key doesn't exist in media_item dict()") self.logger.error("'queue_id' key doesn't exist in media_item dict()")
@ -294,30 +312,36 @@ class PypoPush(Thread):
about which show is playing. about which show is playing.
""" """
tn = telnetlib.Telnet(LS_HOST, LS_PORT) self.telnet_lock.acquire()
try:
#tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8')) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
annotation = media_item['annotation'] #tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8'))
msg = 'queue.push %s\n' % annotation.encode('utf-8')
self.logger.debug(msg) annotation = media_item['annotation']
tn.write(msg) msg = 'queue.push %s\n' % annotation.encode('utf-8')
queue_id = tn.read_until("\r\n").strip("\r\n") self.logger.debug(msg)
tn.write(msg)
#remember the media_item's queue id which we may use queue_id = tn.read_until("\r\n").strip("\r\n")
#later if we need to remove it from the queue.
media_item['queue_id'] = queue_id #remember the media_item's queue id which we may use
#later if we need to remove it from the queue.
#add media_item to the end of our queue media_item['queue_id'] = queue_id
self.pushed_objects[queue_id] = media_item
#add media_item to the end of our queue
show_name = media_item['show_name'] self.pushed_objects[queue_id] = media_item
msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
tn.write(msg) show_name = media_item['show_name']
self.logger.debug(msg) msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
tn.write(msg)
tn.write("exit\n") self.logger.debug(msg)
self.logger.debug(tn.read_all())
tn.write("exit\n")
self.logger.debug(tn.read_all())
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
def run(self): def run(self):
loops = 0 loops = 0