# -*- coding: utf-8 -*- import os import sys import time import logging.config import json import telnetlib import copy from threading import Thread import subprocess from Queue import Empty from api_clients import api_client from std_err_override import LogWriter from configobj import ConfigObj # configure logging logging.config.fileConfig("logging.cfg") logger = logging.getLogger() LogWriter.override_std_err(logger) #need to wait for Python 2.7 for this.. #logging.captureWarnings(True) # loading config file try: config = ConfigObj('/etc/airtime/pypo.cfg') LS_HOST = config['ls_host'] LS_PORT = config['ls_port'] #POLL_INTERVAL = int(config['poll_interval']) POLL_INTERVAL = 1800 except Exception, e: logger.error('Error loading config file: %s', e) sys.exit() class PypoFetch(Thread): def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock): Thread.__init__(self) self.api_client = api_client.AirtimeApiClient() self.fetch_queue = pypoFetch_q self.push_queue = pypoPush_q self.media_prepare_queue = media_q self.last_update_schedule_timestamp = time.time() self.listener_timeout = POLL_INTERVAL self.telnet_lock = telnet_lock self.logger = logging.getLogger(); self.cache_dir = os.path.join(config["cache_dir"], "scheduler") self.logger.debug("Cache dir %s", self.cache_dir) try: if not os.path.isdir(dir): """ We get here if path does not exist, or path does exist but is a file. We are not handling the second case, but don't think we actually care about handling it. """ self.logger.debug("Cache dir does not exist. Creating...") os.makedirs(dir) except Exception, e: pass self.schedule_data = [] self.logger.info("PypoFetch: init complete") """ Handle a message from RabbitMQ, put it into our yucky global var. Hopefully there is a better way to do this. """ def handle_message(self, message): try: self.logger.info("Received event from Pypo Message Handler: %s" % message) m = json.loads(message) command = m['event_type'] self.logger.info("Handling command: " + command) if command == 'update_schedule': self.schedule_data = m['schedule'] self.process_schedule(self.schedule_data) elif command == 'reset_liquidsoap_bootstrap': self.set_bootstrap_variables() elif command == 'update_stream_setting': self.logger.info("Updating stream setting...") self.regenerate_liquidsoap_conf(m['setting']) elif command == 'update_stream_format': self.logger.info("Updating stream format...") self.update_liquidsoap_stream_format(m['stream_format']) elif command == 'update_station_name': self.logger.info("Updating station name...") self.update_liquidsoap_station_name(m['station_name']) elif command == 'update_transition_fade': self.logger.info("Updating transition_fade...") self.update_liquidsoap_transition_fade(m['transition_fade']) elif command == 'switch_source': self.logger.info("switch_on_source show command received...") self.switch_source(self.logger, self.telnet_lock, m['sourcename'], m['status']) elif command == 'disconnect_source': self.logger.info("disconnect_on_source show command received...") self.disconnect_source(self.logger, self.telnet_lock, m['sourcename']) # update timeout value if command == 'update_schedule': self.listener_timeout = POLL_INTERVAL else: self.listener_timeout = self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL if self.listener_timeout < 0: self.listener_timeout = 0 self.logger.info("New timeout: %s" % self.listener_timeout) except Exception, e: import traceback top = traceback.format_exc() self.logger.error('Exception: %s', e) self.logger.error("traceback: %s", top) self.logger.error("Exception in handling Message Handler message: %s", e) @staticmethod def disconnect_source(logger, lock, sourcename): logger.debug('Disconnecting source: %s', sourcename) command = "" if(sourcename == "master_dj"): command += "master_harbor.kick\n" elif(sourcename == "live_dj"): command += "live_dj_harbor.kick\n" lock.acquire() try: tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn.write(command) tn.write('exit\n') tn.read_all() except Exception, e: logger.error(str(e)) finally: lock.release() @staticmethod def switch_source(logger, lock, sourcename, status): logger.debug('Switching source: %s to "%s" status', sourcename, status) command = "streams." if sourcename == "master_dj": command += "master_dj_" elif sourcename == "live_dj": command += "live_dj_" elif sourcename == "scheduled_play": command += "scheduled_play_" if status == "on": command += "start\n" else: command += "stop\n" lock.acquire() try: tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn.write(command) tn.write('exit\n') tn.read_all() except Exception, e: logger.error(str(e)) finally: lock.release() """ grabs some information that are needed to be set on bootstrap time and configures them """ def set_bootstrap_variables(self): self.logger.debug('Getting information needed on bootstrap from Airtime') info = self.api_client.get_bootstrap_info() if info is None: self.logger.error('Unable to get bootstrap info.. Exiting pypo...') sys.exit(1) else: self.logger.debug('info:%s', info) for k, v in info['switch_status'].iteritems(): self.switch_source(self.logger, self.telnet_lock, k, v) self.update_liquidsoap_stream_format(info['stream_label']) self.update_liquidsoap_station_name(info['station_name']) self.update_liquidsoap_transition_fade(info['transition_fade']) def write_liquidsoap_config(self, setting): fh = open('/etc/airtime/liquidsoap.cfg', 'w') self.logger.info("Rewriting liquidsoap.cfg...") fh.write("################################################\n") fh.write("# THIS FILE IS AUTO GENERATED. DO NOT CHANGE!! #\n") fh.write("################################################\n") for k, d in setting: buffer_str = d[u'keyname'] + " = " if d[u'type'] == 'string': temp = d[u'value'] buffer_str += '"%s"' % temp else: temp = d[u'value'] if temp == "": temp = "0" buffer_str += temp buffer_str += "\n" fh.write(api_client.encode_to(buffer_str)) fh.write("log_file = \"/var/log/airtime/pypo-liquidsoap/