Format code using black

This commit is contained in:
jo 2021-05-27 16:23:02 +02:00
parent efe4fa027e
commit c27f020d73
85 changed files with 3238 additions and 2243 deletions

View file

@ -2,5 +2,5 @@
# -*- coding: utf-8 -*-
import runpy
# Run the liquidsoap python module
runpy.run_module('liquidsoap')
# Run the liquidsoap python module
runpy.run_module("liquidsoap")

View file

@ -3,4 +3,3 @@
import runpy
runpy.run_module("pypo", run_name="__main__")

View file

@ -27,27 +27,75 @@ import json
from configobj import ConfigObj
# custom imports
#from util import *
# from util import *
from api_clients import version1 as api_client
LOG_LEVEL = logging.INFO
LOG_PATH = '/var/log/airtime/pypo/notify.log'
LOG_PATH = "/var/log/airtime/pypo/notify.log"
# help screeen / info
usage = "%prog [options]" + " - notification gateway"
parser = OptionParser(usage=usage)
# Options
parser.add_option("-d", "--data", help="Pass JSON data from Liquidsoap into this script.", metavar="data")
parser.add_option("-m", "--media-id", help="ID of the file that is currently playing.", metavar="media_id")
parser.add_option("-e", "--error", action="store", dest="error", type="string", help="Liquidsoap error msg.", metavar="error_msg")
parser.add_option(
"-d",
"--data",
help="Pass JSON data from Liquidsoap into this script.",
metavar="data",
)
parser.add_option(
"-m",
"--media-id",
help="ID of the file that is currently playing.",
metavar="media_id",
)
parser.add_option(
"-e",
"--error",
action="store",
dest="error",
type="string",
help="Liquidsoap error msg.",
metavar="error_msg",
)
parser.add_option("-s", "--stream-id", help="ID stream", metavar="stream_id")
parser.add_option("-c", "--connect", help="Liquidsoap connected", action="store_true", metavar="connect")
parser.add_option("-t", "--time", help="Liquidsoap boot up time", action="store", dest="time", metavar="time", type="string")
parser.add_option("-x", "--source-name", help="source connection name", metavar="source_name")
parser.add_option("-y", "--source-status", help="source connection status", metavar="source_status")
parser.add_option("-w", "--webstream", help="JSON metadata associated with webstream", metavar="json_data")
parser.add_option("-n", "--liquidsoap-started", help="notify liquidsoap started", metavar="json_data", action="store_true", default=False)
parser.add_option(
"-c",
"--connect",
help="Liquidsoap connected",
action="store_true",
metavar="connect",
)
parser.add_option(
"-t",
"--time",
help="Liquidsoap boot up time",
action="store",
dest="time",
metavar="time",
type="string",
)
parser.add_option(
"-x", "--source-name", help="source connection name", metavar="source_name"
)
parser.add_option(
"-y", "--source-status", help="source connection status", metavar="source_status"
)
parser.add_option(
"-w",
"--webstream",
help="JSON metadata associated with webstream",
metavar="json_data",
)
parser.add_option(
"-n",
"--liquidsoap-started",
help="notify liquidsoap started",
metavar="json_data",
action="store_true",
default=False,
)
# parse options
@ -55,12 +103,15 @@ parser.add_option("-n", "--liquidsoap-started", help="notify liquidsoap started"
# Set up logging
logging.captureWarnings(True)
logFormatter = logging.Formatter("%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s")
logFormatter = logging.Formatter(
"%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s"
)
rootLogger = logging.getLogger()
rootLogger.setLevel(LOG_LEVEL)
fileHandler = logging.handlers.RotatingFileHandler(filename=LOG_PATH, maxBytes=1024*1024*30,
backupCount=8)
fileHandler = logging.handlers.RotatingFileHandler(
filename=LOG_PATH, maxBytes=1024 * 1024 * 30, backupCount=8
)
fileHandler.setFormatter(logFormatter)
rootLogger.addHandler(fileHandler)
@ -69,15 +120,15 @@ consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)
logger = rootLogger
#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)
# need to wait for Python 2.7 for this..
# logging.captureWarnings(True)
# loading config file
try:
config = ConfigObj('/etc/airtime/airtime.conf')
config = ConfigObj("/etc/airtime/airtime.conf")
except Exception as e:
logger.error('Error loading config file: %s', e)
logger.error("Error loading config file: %s", e)
sys.exit()
@ -90,39 +141,41 @@ class Notify:
self.api_client.notify_liquidsoap_started()
def notify_media_start_playing(self, media_id):
logger.debug('#################################################')
logger.debug('# Calling server to update about what\'s playing #')
logger.debug('#################################################')
logger.debug("#################################################")
logger.debug("# Calling server to update about what's playing #")
logger.debug("#################################################")
response = self.api_client.notify_media_item_start_playing(media_id)
logger.debug("Response: " + json.dumps(response))
# @pram time: time that LS started
def notify_liquidsoap_status(self, msg, stream_id, time):
logger.info('#################################################')
logger.info('# Calling server to update liquidsoap status #')
logger.info('#################################################')
logger.info('msg = ' + str(msg))
logger.info("#################################################")
logger.info("# Calling server to update liquidsoap status #")
logger.info("#################################################")
logger.info("msg = " + str(msg))
response = self.api_client.notify_liquidsoap_status(msg, stream_id, time)
logger.info("Response: " + json.dumps(response))
def notify_source_status(self, source_name, status):
logger.debug('#################################################')
logger.debug('# Calling server to update source status #')
logger.debug('#################################################')
logger.debug('msg = ' + str(source_name) + ' : ' + str(status))
logger.debug("#################################################")
logger.debug("# Calling server to update source status #")
logger.debug("#################################################")
logger.debug("msg = " + str(source_name) + " : " + str(status))
response = self.api_client.notify_source_status(source_name, status)
logger.debug("Response: " + json.dumps(response))
def notify_webstream_data(self, data, media_id):
logger.debug('#################################################')
logger.debug('# Calling server to update webstream data #')
logger.debug('#################################################')
logger.debug("#################################################")
logger.debug("# Calling server to update webstream data #")
logger.debug("#################################################")
response = self.api_client.notify_webstream_data(data, media_id)
logger.debug("Response: " + json.dumps(response))
def run_with_options(self, options):
if options.error and options.stream_id:
self.notify_liquidsoap_status(options.error, options.stream_id, options.time)
self.notify_liquidsoap_status(
options.error, options.stream_id, options.time
)
elif options.connect and options.stream_id:
self.notify_liquidsoap_status("OK", options.stream_id, options.time)
elif options.source_name and options.source_status:
@ -134,15 +187,17 @@ class Notify:
elif options.liquidsoap_started:
self.notify_liquidsoap_started()
else:
logger.debug("Unrecognized option in options({}). Doing nothing".format(options))
logger.debug(
"Unrecognized option in options({}). Doing nothing".format(options)
)
if __name__ == '__main__':
if __name__ == "__main__":
print()
print('#########################################')
print('# *** pypo *** #')
print('# pypo notification gateway #')
print('#########################################')
print("#########################################")
print("# *** pypo *** #")
print("# pypo notification gateway #")
print("#########################################")
# initialize
try:
@ -150,4 +205,3 @@ if __name__ == '__main__':
n.run_with_options(options)
except Exception as e:
print(traceback.format_exc())

View file

@ -7,9 +7,10 @@ import time
import traceback
from api_clients.version1 import AirtimeApiClient
def generate_liquidsoap_config(ss):
data = ss['msg']
fh = open('/etc/airtime/liquidsoap.cfg', 'w')
data = ss["msg"]
fh = open("/etc/airtime/liquidsoap.cfg", "w")
fh.write("################################################\n")
fh.write("# THIS FILE IS AUTO GENERATED. DO NOT CHANGE!! #\n")
fh.write("################################################\n")
@ -17,17 +18,17 @@ def generate_liquidsoap_config(ss):
for key, value in data.items():
try:
if not "port" in key and not "bitrate" in key: # Stupid hack
if not "port" in key and not "bitrate" in key: # Stupid hack
raise ValueError()
str_buffer = "%s = %s\n" % (key, int(value))
except ValueError:
try: # Is it a boolean?
if value=="true" or value=="false":
try: # Is it a boolean?
if value == "true" or value == "false":
str_buffer = "%s = %s\n" % (key, value.lower())
else:
raise ValueError() # Just drop into the except below
except: #Everything else is a string
str_buffer = "%s = \"%s\"\n" % (key, value)
raise ValueError() # Just drop into the except below
except: # Everything else is a string
str_buffer = '%s = "%s"\n' % (key, value)
fh.write(str_buffer)
# ignore squashes unused variable errors from Liquidsoap
@ -38,8 +39,9 @@ def generate_liquidsoap_config(ss):
fh.write('auth_path = "%s/liquidsoap_auth.py"\n' % auth_path)
fh.close()
def run():
logging.basicConfig(format='%(message)s')
logging.basicConfig(format="%(message)s")
attempts = 0
max_attempts = 10
successful = False

View file

@ -9,16 +9,16 @@ dj_type = sys.argv[1]
username = sys.argv[2]
password = sys.argv[3]
source_type = ''
if dj_type == '--master':
source_type = 'master'
elif dj_type == '--dj':
source_type = 'dj'
source_type = ""
if dj_type == "--master":
source_type = "master"
elif dj_type == "--dj":
source_type = "dj"
response = api_clients.check_live_stream_auth(username, password, source_type)
if 'msg' in response and response['msg'] == True:
print(response['msg'])
if "msg" in response and response["msg"] == True:
print(response["msg"])
sys.exit(0)
else:
print(False)

View file

@ -4,17 +4,16 @@ import telnetlib
import sys
try:
config = ConfigObj('/etc/airtime/airtime.conf')
LS_HOST = config['pypo']['ls_host']
LS_PORT = config['pypo']['ls_port']
config = ConfigObj("/etc/airtime/airtime.conf")
LS_HOST = config["pypo"]["ls_host"]
LS_PORT = config["pypo"]["ls_port"]
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
tn.write("master_harbor.stop\n")
tn.write("live_dj_harbor.stop\n")
tn.write('exit\n')
tn.write("exit\n")
tn.read_all()
except Exception as e:
print("Error loading config file: {}".format(e))
sys.exit()

View file

@ -18,6 +18,7 @@ from configobj import ConfigObj
from datetime import datetime
from optparse import OptionParser
import importlib
try:
from queue import Queue
except ImportError: # Python 2.7.5 (CentOS 7)

View file

@ -10,9 +10,10 @@ import time
from api_clients import version1 as api_client
class ListenerStat(Thread):
HTTP_REQUEST_TIMEOUT = 30 # 30 second HTTP request timeout
HTTP_REQUEST_TIMEOUT = 30 # 30 second HTTP request timeout
def __init__(self, config, logger=None):
Thread.__init__(self)
@ -28,50 +29,49 @@ class ListenerStat(Thread):
for node in nodelist:
if node.nodeType == node.TEXT_NODE:
rc.append(node.data)
return ''.join(rc)
return "".join(rc)
def get_stream_parameters(self):
#[{"user":"", "password":"", "url":"", "port":""},{},{}]
# [{"user":"", "password":"", "url":"", "port":""},{},{}]
return self.api_client.get_stream_parameters()
def get_stream_server_xml(self, ip, url, is_shoutcast=False):
auth_string = "%(admin_user)s:%(admin_pass)s" % ip
encoded = base64.b64encode(auth_string.encode('utf-8'))
encoded = base64.b64encode(auth_string.encode("utf-8"))
header = {"Authorization":"Basic %s" % encoded.decode('ascii')}
header = {"Authorization": "Basic %s" % encoded.decode("ascii")}
if is_shoutcast:
#user agent is required for shoutcast auth, otherwise it returns 404.
# user agent is required for shoutcast auth, otherwise it returns 404.
user_agent = "Mozilla/5.0 (Linux; rv:22.0) Gecko/20130405 Firefox/22.0"
header["User-Agent"] = user_agent
req = urllib.request.Request(
#assuming that the icecast stats path is /admin/stats.xml
#need to fix this
# assuming that the icecast stats path is /admin/stats.xml
# need to fix this
url=url,
headers=header)
headers=header,
)
f = urllib.request.urlopen(req, timeout=ListenerStat.HTTP_REQUEST_TIMEOUT)
document = f.read()
return document
def get_icecast_stats(self, ip):
document = None
if "airtime.pro" in ip["host"].lower():
url = 'http://%(host)s:%(port)s/stats.xsl' % ip
url = "http://%(host)s:%(port)s/stats.xsl" % ip
document = self.get_stream_server_xml(ip, url)
else:
url = 'http://%(host)s:%(port)s/admin/stats.xml' % ip
url = "http://%(host)s:%(port)s/admin/stats.xml" % ip
document = self.get_stream_server_xml(ip, url)
dom = defusedxml.minidom.parseString(document)
sources = dom.getElementsByTagName("source")
mount_stats = None
for s in sources:
#drop the leading '/' character
# drop the leading '/' character
mount_name = s.getAttribute("mount")[1:]
if mount_name == ip["mount"]:
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
@ -80,14 +80,16 @@ class ListenerStat(Thread):
if len(listeners):
num_listeners = self.get_node_text(listeners[0].childNodes)
mount_stats = {"timestamp":timestamp, \
"num_listeners": num_listeners, \
"mount_name": mount_name}
mount_stats = {
"timestamp": timestamp,
"num_listeners": num_listeners,
"mount_name": mount_name,
}
return mount_stats
def get_shoutcast_stats(self, ip):
url = 'http://%(host)s:%(port)s/admin.cgi?sid=1&mode=viewxml' % ip
url = "http://%(host)s:%(port)s/admin.cgi?sid=1&mode=viewxml" % ip
document = self.get_stream_server_xml(ip, url, is_shoutcast=True)
dom = defusedxml.minidom.parseString(document)
current_listeners = dom.getElementsByTagName("CURRENTLISTENERS")
@ -97,34 +99,37 @@ class ListenerStat(Thread):
if len(current_listeners):
num_listeners = self.get_node_text(current_listeners[0].childNodes)
mount_stats = {"timestamp":timestamp, \
"num_listeners": num_listeners, \
"mount_name": "shoutcast"}
mount_stats = {
"timestamp": timestamp,
"num_listeners": num_listeners,
"mount_name": "shoutcast",
}
return mount_stats
def get_stream_stats(self, stream_parameters):
stats = []
#iterate over stream_parameters which is a list of dicts. Each dict
#represents one Airtime stream (currently this limit is 3).
#Note that there can be optimizations done, since if all three
#streams are the same server, we will still initiate 3 separate
#connections
# iterate over stream_parameters which is a list of dicts. Each dict
# represents one Airtime stream (currently this limit is 3).
# Note that there can be optimizations done, since if all three
# streams are the same server, we will still initiate 3 separate
# connections
for k, v in stream_parameters.items():
if v["enable"] == 'true':
if v["enable"] == "true":
try:
if v["output"] == "icecast":
mount_stats = self.get_icecast_stats(v)
if mount_stats: stats.append(mount_stats)
if mount_stats:
stats.append(mount_stats)
else:
stats.append(self.get_shoutcast_stats(v))
self.update_listener_stat_error(v["mount"], 'OK')
self.update_listener_stat_error(v["mount"], "OK")
except Exception as e:
try:
self.update_listener_stat_error(v["mount"], str(e))
except Exception as e:
self.logger.error('Exception: %s', e)
self.logger.error("Exception: %s", e)
return stats
@ -132,15 +137,15 @@ class ListenerStat(Thread):
self.api_client.push_stream_stats(stats)
def update_listener_stat_error(self, stream_id, error):
keyname = '%s_listener_stat_error' % stream_id
keyname = "%s_listener_stat_error" % stream_id
data = {keyname: error}
self.api_client.update_stream_setting_table(data)
def run(self):
#Wake up every 120 seconds and gather icecast statistics. Note that we
#are currently querying the server every 2 minutes for list of
#mountpoints as well. We could remove this query if we hooked into
#rabbitmq events, and listened for these changes instead.
# Wake up every 120 seconds and gather icecast statistics. Note that we
# are currently querying the server every 2 minutes for list of
# mountpoints as well. We could remove this query if we hooked into
# rabbitmq events, and listened for these changes instead.
while True:
try:
stream_parameters = self.get_stream_parameters()
@ -149,25 +154,27 @@ class ListenerStat(Thread):
if stats:
self.push_stream_stats(stats)
except Exception as e:
self.logger.error('Exception: %s', e)
self.logger.error("Exception: %s", e)
time.sleep(120)
self.logger.info('ListenerStat thread exiting')
self.logger.info("ListenerStat thread exiting")
if __name__ == "__main__":
# create logger
logger = logging.getLogger('std_out')
logger = logging.getLogger("std_out")
logger.setLevel(logging.DEBUG)
# create console handler and set level to debug
#ch = logging.StreamHandler()
#ch.setLevel(logging.DEBUG)
# ch = logging.StreamHandler()
# ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s')
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s"
)
# add formatter to ch
#ch.setFormatter(formatter)
# ch.setFormatter(formatter)
# add ch to logger
#logger.addHandler(ch)
# logger.addHandler(ch)
#ls = ListenerStat(logger=logger)
#ls.run()
# ls = ListenerStat(logger=logger)
# ls.run()

View file

@ -2,6 +2,7 @@
import re
from packaging.version import Version, parse
def version_cmp(version1, version2):
version1 = parse(version1)
version2 = parse(version2)
@ -11,12 +12,14 @@ def version_cmp(version1, version2):
return 0
return -1
def date_interval_to_seconds(interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
seconds = (
interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10 ** 6
) / float(10 ** 6)
return seconds

View file

@ -23,20 +23,24 @@ from .timeout import ls_timeout
def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger()
logger.info('\nKeyboard Interrupt\n')
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)
logging.captureWarnings(True)
POLL_INTERVAL = 400
class PypoFetch(Thread):
def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config):
class PypoFetch(Thread):
def __init__(
self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config
):
Thread.__init__(self)
#Hacky...
# Hacky...
PypoFetch.ref = self
self.v1_api_client = v1_api_client.AirtimeApiClient()
@ -76,6 +80,7 @@ class PypoFetch(Thread):
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
"""
def handle_message(self, message):
try:
self.logger.info("Received event from Pypo Message Handler: %s" % message)
@ -85,50 +90,52 @@ class PypoFetch(Thread):
except (UnicodeDecodeError, AttributeError):
pass
m = json.loads(message)
command = m['event_type']
command = m["event_type"]
self.logger.info("Handling command: " + command)
if command == 'update_schedule':
self.schedule_data = m['schedule']
if command == "update_schedule":
self.schedule_data = m["schedule"]
self.process_schedule(self.schedule_data)
elif command == 'reset_liquidsoap_bootstrap':
elif command == "reset_liquidsoap_bootstrap":
self.set_bootstrap_variables()
elif command == 'update_stream_setting':
elif command == "update_stream_setting":
self.logger.info("Updating stream setting...")
self.regenerate_liquidsoap_conf(m['setting'])
elif command == 'update_stream_format':
self.regenerate_liquidsoap_conf(m["setting"])
elif command == "update_stream_format":
self.logger.info("Updating stream format...")
self.update_liquidsoap_stream_format(m['stream_format'])
elif command == 'update_station_name':
self.update_liquidsoap_stream_format(m["stream_format"])
elif command == "update_station_name":
self.logger.info("Updating station name...")
self.update_liquidsoap_station_name(m['station_name'])
elif command == 'update_transition_fade':
self.update_liquidsoap_station_name(m["station_name"])
elif command == "update_transition_fade":
self.logger.info("Updating transition_fade...")
self.update_liquidsoap_transition_fade(m['transition_fade'])
elif command == 'switch_source':
self.update_liquidsoap_transition_fade(m["transition_fade"])
elif command == "switch_source":
self.logger.info("switch_on_source show command received...")
self.pypo_liquidsoap.\
get_telnet_dispatcher().\
switch_source(m['sourcename'], m['status'])
elif command == 'disconnect_source':
self.pypo_liquidsoap.get_telnet_dispatcher().switch_source(
m["sourcename"], m["status"]
)
elif command == "disconnect_source":
self.logger.info("disconnect_on_source show command received...")
self.pypo_liquidsoap.get_telnet_dispatcher().\
disconnect_source(m['sourcename'])
self.pypo_liquidsoap.get_telnet_dispatcher().disconnect_source(
m["sourcename"]
)
else:
self.logger.info("Unknown command: %s" % command)
# update timeout value
if command == 'update_schedule':
if command == "update_schedule":
self.listener_timeout = POLL_INTERVAL
else:
self.listener_timeout = self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL
self.listener_timeout = (
self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL
)
if self.listener_timeout < 0:
self.listener_timeout = 0
self.logger.info("New timeout: %s" % self.listener_timeout)
except Exception as e:
self.logger.exception("Exception in handling Message Handler message")
def switch_source_temp(self, sourcename, status):
self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
command = "streams."
@ -149,25 +156,28 @@ class PypoFetch(Thread):
"""
Initialize Liquidsoap environment
"""
def set_bootstrap_variables(self):
self.logger.debug('Getting information needed on bootstrap from Airtime')
self.logger.debug("Getting information needed on bootstrap from Airtime")
try:
info = self.v1_api_client.get_bootstrap_info()
except Exception as e:
self.logger.exception('Unable to get bootstrap info.. Exiting pypo...')
self.logger.exception("Unable to get bootstrap info.. Exiting pypo...")
self.logger.debug('info:%s', info)
self.logger.debug("info:%s", info)
commands = []
for k, v in info['switch_status'].items():
for k, v in info["switch_status"].items():
commands.append(self.switch_source_temp(k, v))
stream_format = info['stream_label']
station_name = info['station_name']
fade = info['transition_fade']
stream_format = info["stream_label"]
station_name = info["station_name"]
fade = info["transition_fade"]
commands.append(('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8'))
commands.append(('vars.station_name %s\n' % station_name).encode('utf-8'))
commands.append(('vars.default_dj_fade %s\n' % fade).encode('utf-8'))
commands.append(
("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8")
)
commands.append(("vars.station_name %s\n" % station_name).encode("utf-8"))
commands.append(("vars.default_dj_fade %s\n" % fade).encode("utf-8"))
self.pypo_liquidsoap.get_telnet_dispatcher().telnet_send(commands)
self.pypo_liquidsoap.clear_all_queues()
@ -182,21 +192,24 @@ class PypoFetch(Thread):
will be thrown."""
self.telnet_lock.acquire(False)
self.logger.info("Restarting Liquidsoap")
subprocess.call('kill -9 `pidof airtime-liquidsoap`', shell=True, close_fds=True)
subprocess.call(
"kill -9 `pidof airtime-liquidsoap`", shell=True, close_fds=True
)
#Wait here and poll Liquidsoap until it has started up
# Wait here and poll Liquidsoap until it has started up
self.logger.info("Waiting for Liquidsoap to start")
while True:
try:
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
tn.write('exit\n'.encode('utf-8'))
tn = telnetlib.Telnet(
self.config["ls_host"], self.config["ls_port"]
)
tn.write("exit\n".encode("utf-8"))
tn.read_all()
self.logger.info("Liquidsoap is up and running")
break
except Exception as e:
#sleep 0.5 seconds and try again
# sleep 0.5 seconds and try again
time.sleep(0.5)
except Exception as e:
@ -208,11 +221,11 @@ class PypoFetch(Thread):
"""
NOTE: This function is quite short after it was refactored.
"""
def regenerate_liquidsoap_conf(self, setting):
self.restart_liquidsoap()
self.update_liquidsoap_connection_status()
@ls_timeout
def update_liquidsoap_connection_status(self):
"""
@ -222,20 +235,22 @@ class PypoFetch(Thread):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
tn = telnetlib.Telnet(self.config["ls_host"], self.config["ls_port"])
# update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
current_time = time.time()
boot_up_time_command = ("vars.bootup_time " + str(current_time) + "\n").encode('utf-8')
boot_up_time_command = (
"vars.bootup_time " + str(current_time) + "\n"
).encode("utf-8")
self.logger.info(boot_up_time_command)
tn.write(boot_up_time_command)
connection_status = ("streams.connection_status\n").encode('utf-8')
connection_status = ("streams.connection_status\n").encode("utf-8")
self.logger.info(connection_status)
tn.write(connection_status)
tn.write('exit\n'.encode('utf-8'))
tn.write("exit\n".encode("utf-8"))
output = tn.read_all()
except Exception as e:
@ -253,12 +268,13 @@ class PypoFetch(Thread):
fake_time = current_time + 1
for s in streams:
info = s.split(':')
info = s.split(":")
stream_id = info[0]
status = info[1]
if(status == "true"):
self.v1_api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time))
if status == "true":
self.v1_api_client.notify_liquidsoap_status(
"OK", stream_id, str(fake_time)
)
@ls_timeout
def update_liquidsoap_stream_format(self, stream_format):
@ -266,11 +282,11 @@ class PypoFetch(Thread):
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
tn = telnetlib.Telnet(self.config["ls_host"], self.config["ls_port"])
command = ("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8")
self.logger.info(command)
tn.write(command)
tn.write('exit\n'.encode('utf-8'))
tn.write("exit\n".encode("utf-8"))
tn.read_all()
except Exception as e:
self.logger.exception(e)
@ -283,11 +299,11 @@ class PypoFetch(Thread):
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8')
tn = telnetlib.Telnet(self.config["ls_host"], self.config["ls_port"])
command = ("vars.default_dj_fade %s\n" % fade).encode("utf-8")
self.logger.info(command)
tn.write(command)
tn.write('exit\n'.encode('utf-8'))
tn.write("exit\n".encode("utf-8"))
tn.read_all()
except Exception as e:
self.logger.exception(e)
@ -301,11 +317,11 @@ class PypoFetch(Thread):
try:
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.station_name %s\n' % station_name).encode('utf-8')
tn = telnetlib.Telnet(self.config["ls_host"], self.config["ls_port"])
command = ("vars.station_name %s\n" % station_name).encode("utf-8")
self.logger.info(command)
tn.write(command)
tn.write('exit\n'.encode('utf-8'))
tn.write("exit\n".encode("utf-8"))
tn.read_all()
except Exception as e:
self.logger.exception(e)
@ -322,6 +338,7 @@ class PypoFetch(Thread):
to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
- runs the cleanup routine, to get rid of unused cached files
"""
def process_schedule(self, schedule_data):
self.last_update_schedule_timestamp = time.time()
self.logger.debug(schedule_data)
@ -343,20 +360,21 @@ class PypoFetch(Thread):
media_copy = {}
for key in media:
media_item = media[key]
if (media_item['type'] == 'file'):
if media_item["type"] == "file":
fileExt = self.sanity_check_media_item(media_item)
dst = os.path.join(download_dir, f'{media_item["id"]}{fileExt}')
media_item['dst'] = dst
media_item['file_ready'] = False
media_item["dst"] = dst
media_item["file_ready"] = False
media_filtered[key] = media_item
media_item['start'] = datetime.strptime(media_item['start'],
"%Y-%m-%d-%H-%M-%S")
media_item['end'] = datetime.strptime(media_item['end'],
"%Y-%m-%d-%H-%M-%S")
media_item["start"] = datetime.strptime(
media_item["start"], "%Y-%m-%d-%H-%M-%S"
)
media_item["end"] = datetime.strptime(
media_item["end"], "%Y-%m-%d-%H-%M-%S"
)
media_copy[key] = media_item
self.media_prepare_queue.put(copy.copy(media_filtered))
except Exception as e:
self.logger.exception(e)
@ -365,37 +383,36 @@ class PypoFetch(Thread):
self.logger.debug("Pushing to pypo-push")
self.push_queue.put(media_copy)
# cleanup
try:
self.cache_cleanup(media)
except Exception as e:
self.logger.exception(e)
#do basic validation of file parameters. Useful for debugging
#purposes
# do basic validation of file parameters. Useful for debugging
# purposes
def sanity_check_media_item(self, media_item):
start = datetime.strptime(media_item['start'], "%Y-%m-%d-%H-%M-%S")
end = datetime.strptime(media_item['end'], "%Y-%m-%d-%H-%M-%S")
start = datetime.strptime(media_item["start"], "%Y-%m-%d-%H-%M-%S")
end = datetime.strptime(media_item["end"], "%Y-%m-%d-%H-%M-%S")
mime = media_item['metadata']['mime']
mime = media_item["metadata"]["mime"]
mimetypes.init(["%s/mime.types" % os.path.dirname(os.path.realpath(__file__))])
mime_ext = mimetypes.guess_extension(mime, strict=False)
length1 = pure.date_interval_to_seconds(end - start)
length2 = media_item['cue_out'] - media_item['cue_in']
length2 = media_item["cue_out"] - media_item["cue_in"]
if abs(length2 - length1) > 1:
self.logger.error("end - start length: %s", length1)
self.logger.error("cue_out - cue_in length: %s", length2)
self.logger.error("Two lengths are not equal!!!")
media_item['file_ext'] = mime_ext
media_item["file_ext"] = mime_ext
return mime_ext
def is_file_opened(self, path):
#Capture stderr to avoid polluting py-interpreter.log
# Capture stderr to avoid polluting py-interpreter.log
proc = Popen(["lsof", path], stdout=PIPE, stderr=PIPE)
out = proc.communicate()[0].strip()
return bool(out)
@ -411,10 +428,14 @@ class PypoFetch(Thread):
for mkey in media:
media_item = media[mkey]
if media_item['type'] == 'file':
if media_item["type"] == "file":
if "file_ext" not in media_item.keys():
media_item["file_ext"] = mimetypes.guess_extension(media_item['metadata']['mime'], strict=False)
scheduled_file_set.add("{}{}".format(media_item["id"], media_item["file_ext"]))
media_item["file_ext"] = mimetypes.guess_extension(
media_item["metadata"]["mime"], strict=False
)
scheduled_file_set.add(
"{}{}".format(media_item["id"], media_item["file_ext"])
)
expired_files = cached_file_set - scheduled_file_set
@ -424,9 +445,9 @@ class PypoFetch(Thread):
path = os.path.join(self.cache_dir, f)
self.logger.debug("Removing %s" % path)
#check if this file is opened (sometimes Liquidsoap is still
#playing the file due to our knowledge of the track length
#being incorrect!)
# check if this file is opened (sometimes Liquidsoap is still
# playing the file due to our knowledge of the track length
# being incorrect!)
if not self.is_file_opened(path):
os.remove(path)
self.logger.info("File '%s' removed" % path)
@ -441,7 +462,7 @@ class PypoFetch(Thread):
self.process_schedule(self.schedule_data)
return True
except Exception as e:
self.logger.error('Unable to fetch schedule')
self.logger.error("Unable to fetch schedule")
self.logger.exception(e)
return False
@ -462,11 +483,11 @@ class PypoFetch(Thread):
Timer(120, self.update_metadata_on_tunein).start()
def main(self):
#Make sure all Liquidsoap queues are empty. This is important in the
#case where we've just restarted the pypo scheduler, but Liquidsoap still
#is playing tracks. In this case let's just restart everything from scratch
#so that we can repopulate our dictionary that keeps track of what
#Liquidsoap is playing much more easily.
# Make sure all Liquidsoap queues are empty. This is important in the
# case where we've just restarted the pypo scheduler, but Liquidsoap still
# is playing tracks. In this case let's just restart everything from scratch
# so that we can repopulate our dictionary that keeps track of what
# Liquidsoap is playing much more easily.
self.pypo_liquidsoap.clear_all_queues()
self.set_bootstrap_variables()
@ -500,7 +521,9 @@ class PypoFetch(Thread):
Currently we are checking every POLL_INTERVAL seconds
"""
message = self.fetch_queue.get(block=True, timeout=self.listener_timeout)
message = self.fetch_queue.get(
block=True, timeout=self.listener_timeout
)
manual_fetch_needed = False
self.handle_message(message)
except Empty as e:
@ -513,7 +536,7 @@ class PypoFetch(Thread):
if manual_fetch_needed:
self.persistent_manual_schedule_fetch(max_attempts=5)
except Exception as e:
self.logger.exception('Failed to manually fetch the schedule.')
self.logger.exception("Failed to manually fetch the schedule.")
loops += 1
@ -522,4 +545,4 @@ class PypoFetch(Thread):
Entry point of the thread
"""
self.main()
self.logger.info('PypoFetch thread exiting')
self.logger.info("PypoFetch thread exiting")

View file

@ -18,13 +18,12 @@ import hashlib
from requests.exceptions import ConnectionError, HTTPError, Timeout
from api_clients import version2 as api_client
CONFIG_PATH = '/etc/airtime/airtime.conf'
CONFIG_PATH = "/etc/airtime/airtime.conf"
logging.captureWarnings(True)
class PypoFile(Thread):
def __init__(self, schedule_queue, config):
Thread.__init__(self)
self.logger = logging.getLogger()
@ -38,10 +37,10 @@ class PypoFile(Thread):
"""
Copy media_item from local library directory to local cache directory.
"""
src = media_item['uri']
dst = media_item['dst']
src = media_item["uri"]
dst = media_item["dst"]
src_size = media_item['filesize']
src_size = media_item["filesize"]
dst_exists = True
try:
@ -59,34 +58,44 @@ class PypoFile(Thread):
# become an issue here... This needs proper cache management.
# https://github.com/LibreTime/libretime/issues/756#issuecomment-477853018
# https://github.com/LibreTime/libretime/pull/845
self.logger.debug("file %s already exists in local cache as %s, skipping copying..." % (src, dst))
self.logger.debug(
"file %s already exists in local cache as %s, skipping copying..."
% (src, dst)
)
else:
do_copy = True
media_item['file_ready'] = not do_copy
media_item["file_ready"] = not do_copy
if do_copy:
self.logger.info("copying from %s to local cache %s" % (src, dst))
try:
with open(dst, "wb") as handle:
self.logger.info(media_item)
response = self.api_client.services.file_download_url(id=media_item['id'])
response = self.api_client.services.file_download_url(
id=media_item["id"]
)
if not response.ok:
self.logger.error(response)
raise Exception("%s - Error occurred downloading file" % response.status_code)
raise Exception(
"%s - Error occurred downloading file"
% response.status_code
)
for chunk in response.iter_content(chunk_size=1024):
handle.write(chunk)
#make file world readable and owner writable
# make file world readable and owner writable
os.chmod(dst, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
if media_item['filesize'] == 0:
file_size = self.report_file_size_and_md5_to_airtime(dst, media_item["id"], host, username)
if media_item["filesize"] == 0:
file_size = self.report_file_size_and_md5_to_airtime(
dst, media_item["id"], host, username
)
media_item["filesize"] = file_size
media_item['file_ready'] = True
media_item["file_ready"] = True
except Exception as e:
self.logger.error("Could not copy from %s to %s" % (src, dst))
self.logger.error(e)
@ -95,7 +104,7 @@ class PypoFile(Thread):
try:
file_size = os.path.getsize(file_path)
with open(file_path, 'rb') as fh:
with open(file_path, "rb") as fh:
m = hashlib.md5()
while True:
data = fh.read(8192)
@ -105,15 +114,21 @@ class PypoFile(Thread):
md5_hash = m.hexdigest()
except (OSError, IOError) as e:
file_size = 0
self.logger.error("Error getting file size and md5 hash for file id %s" % file_id)
self.logger.error(
"Error getting file size and md5 hash for file id %s" % file_id
)
self.logger.error(e)
# Make PUT request to Airtime to update the file size and hash
error_msg = "Could not update media file %s with file size and md5 hash" % file_id
error_msg = (
"Could not update media file %s with file size and md5 hash" % file_id
)
try:
put_url = "%s://%s:%s/rest/media/%s" % (host[0], host[1], host[2], file_id)
payload = json.dumps({'filesize': file_size, 'md5': md5_hash})
response = requests.put(put_url, data=payload, auth=requests.auth.HTTPBasicAuth(api_key, ''))
payload = json.dumps({"filesize": file_size, "md5": md5_hash})
response = requests.put(
put_url, data=payload, auth=requests.auth.HTTPBasicAuth(api_key, "")
)
if not response.ok:
self.logger.error(error_msg)
except (ConnectionError, Timeout):
@ -160,7 +175,9 @@ class PypoFile(Thread):
try:
config.readfp(open(config_path))
except IOError as e:
logging.debug("Failed to open config file at %s: %s" % (config_path, e.strerror))
logging.debug(
"Failed to open config file at %s: %s" % (config_path, e.strerror)
)
sys.exit()
except Exception as e:
logging.debug(e.strerror)
@ -189,12 +206,12 @@ class PypoFile(Thread):
except Empty as e:
pass
media_item = self.get_highest_priority_media_item(self.media)
if media_item is not None:
self.copy_file(media_item)
except Exception as e:
import traceback
top = traceback.format_exc()
self.logger.error(str(e))
self.logger.error(top)
@ -204,9 +221,10 @@ class PypoFile(Thread):
"""
Entry point of the thread
"""
try: self.main()
try:
self.main()
except Exception as e:
top = traceback.format_exc()
self.logger.error('PypoFile Exception: %s', top)
self.logger.error("PypoFile Exception: %s", top)
time.sleep(5)
self.logger.info('PypoFile thread exiting')
self.logger.info("PypoFile thread exiting")

View file

@ -11,12 +11,17 @@ import time
from queue import Empty
import signal
def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger()
logger.info('\nKeyboard Interrupt\n')
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)
class PypoLiqQueue(Thread):
def __init__(self, q, pypo_liquidsoap, logger):
Thread.__init__(self)
@ -35,18 +40,20 @@ class PypoLiqQueue(Thread):
self.logger.info("waiting indefinitely for schedule")
media_schedule = self.queue.get(block=True)
else:
self.logger.info("waiting %ss until next scheduled item" % \
time_until_next_play)
media_schedule = self.queue.get(block=True, \
timeout=time_until_next_play)
self.logger.info(
"waiting %ss until next scheduled item" % time_until_next_play
)
media_schedule = self.queue.get(
block=True, timeout=time_until_next_play
)
except Empty as e:
#Time to push a scheduled item.
# Time to push a scheduled item.
media_item = schedule_deque.popleft()
self.pypo_liquidsoap.play(media_item)
if len(schedule_deque):
time_until_next_play = \
self.date_interval_to_seconds(
schedule_deque[0]['start'] - datetime.utcnow())
time_until_next_play = self.date_interval_to_seconds(
schedule_deque[0]["start"] - datetime.utcnow()
)
if time_until_next_play < 0:
time_until_next_play = 0
else:
@ -54,7 +61,7 @@ class PypoLiqQueue(Thread):
else:
self.logger.info("New schedule received: %s", media_schedule)
#new schedule received. Replace old one with this.
# new schedule received. Replace old one with this.
schedule_deque.clear()
keys = sorted(media_schedule.keys())
@ -63,28 +70,28 @@ class PypoLiqQueue(Thread):
if len(keys):
time_until_next_play = self.date_interval_to_seconds(
media_schedule[keys[0]]['start'] -
datetime.utcnow())
media_schedule[keys[0]]["start"] - datetime.utcnow()
)
else:
time_until_next_play = None
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
if seconds < 0: seconds = 0
seconds = (
interval.microseconds
+ (interval.seconds + interval.days * 24 * 3600) * 10 ** 6
) / float(10 ** 6)
if seconds < 0:
seconds = 0
return seconds
def run(self):
try: self.main()
try:
self.main()
except Exception as e:
self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc())
self.logger.error("PypoLiqQueue Exception: %s", traceback.format_exc())

View file

@ -8,27 +8,25 @@ from datetime import timedelta
from . import eventtypes
import time
class PypoLiquidsoap():
class PypoLiquidsoap:
def __init__(self, logger, telnet_lock, host, port):
self.logger = logger
self.liq_queue_tracker = {
"s0": None,
"s1": None,
"s2": None,
"s3": None,
"s4": None,
}
"s0": None,
"s1": None,
"s2": None,
"s3": None,
"s4": None,
}
self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \
logger,\
host,\
port,\
list(self.liq_queue_tracker.keys()))
self.telnet_liquidsoap = TelnetLiquidsoap(
telnet_lock, logger, host, port, list(self.liq_queue_tracker.keys())
)
def get_telnet_dispatcher(self):
return self.telnet_liquidsoap
def play(self, media_item):
if media_item["type"] == eventtypes.FILE:
self.handle_file_type(media_item)
@ -37,28 +35,32 @@ class PypoLiquidsoap():
elif media_item["type"] == eventtypes.STREAM_BUFFER_START:
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
elif media_item["type"] == eventtypes.STREAM_OUTPUT_START:
if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id:
#this is called if the stream wasn't scheduled sufficiently ahead of time
#so that the prebuffering stage could take effect. Let's do the prebuffering now.
if (
media_item["row_id"]
!= self.telnet_liquidsoap.current_prebuffering_stream_id
):
# this is called if the stream wasn't scheduled sufficiently ahead of time
# so that the prebuffering stage could take effect. Let's do the prebuffering now.
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
self.telnet_liquidsoap.start_web_stream(media_item)
elif media_item['type'] == eventtypes.STREAM_BUFFER_END:
elif media_item["type"] == eventtypes.STREAM_BUFFER_END:
self.telnet_liquidsoap.stop_web_stream_buffer()
elif media_item['type'] == eventtypes.STREAM_OUTPUT_END:
elif media_item["type"] == eventtypes.STREAM_OUTPUT_END:
self.telnet_liquidsoap.stop_web_stream_output()
else: raise UnknownMediaItemType(str(media_item))
else:
raise UnknownMediaItemType(str(media_item))
def handle_file_type(self, media_item):
"""
Wait 200 seconds (2000 iterations) for file to become ready,
Wait 200 seconds (2000 iterations) for file to become ready,
otherwise give up on it.
"""
iter_num = 0
while not media_item['file_ready'] and iter_num < 2000:
while not media_item["file_ready"] and iter_num < 2000:
time.sleep(0.1)
iter_num += 1
if media_item['file_ready']:
if media_item["file_ready"]:
available_queue = self.find_available_queue()
try:
@ -68,27 +70,29 @@ class PypoLiquidsoap():
self.logger.error(e)
raise
else:
self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst'])
self.logger.warn(
"File %s did not become ready in less than 5 seconds. Skipping...",
media_item["dst"],
)
def handle_event_type(self, media_item):
if media_item['event_type'] == "kick_out":
if media_item["event_type"] == "kick_out":
self.telnet_liquidsoap.disconnect_source("live_dj")
elif media_item['event_type'] == "switch_off":
elif media_item["event_type"] == "switch_off":
self.telnet_liquidsoap.switch_source("live_dj", "off")
def is_media_item_finished(self, media_item):
if media_item is None:
return True
else:
return datetime.utcnow() > media_item['end']
return datetime.utcnow() > media_item["end"]
def find_available_queue(self):
available_queue = None
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi == None or self.is_media_item_finished(mi):
#queue "i" is available. Push to this queue
# queue "i" is available. Push to this queue
available_queue = i
if available_queue == None:
@ -96,7 +100,6 @@ class PypoLiquidsoap():
return available_queue
def verify_correct_present_media(self, scheduled_now):
"""
verify whether Liquidsoap is currently playing the correct files.
@ -122,11 +125,13 @@ class PypoLiquidsoap():
"""
try:
scheduled_now_files = \
[x for x in scheduled_now if x["type"] == eventtypes.FILE]
scheduled_now_files = [
x for x in scheduled_now if x["type"] == eventtypes.FILE
]
scheduled_now_webstream = \
[x for x in scheduled_now if x["type"] == eventtypes.STREAM_OUTPUT_START]
scheduled_now_webstream = [
x for x in scheduled_now if x["type"] == eventtypes.STREAM_OUTPUT_START
]
schedule_ids = set([x["row_id"] for x in scheduled_now_files])
@ -141,19 +146,21 @@ class PypoLiquidsoap():
to_be_removed = set()
to_be_added = set()
#Iterate over the new files, and compare them to currently scheduled
#tracks. If already in liquidsoap queue still need to make sure they don't
#have different attributes
#if replay gain changes, it shouldn't change the amplification of the currently playing song
# Iterate over the new files, and compare them to currently scheduled
# tracks. If already in liquidsoap queue still need to make sure they don't
# have different attributes
# if replay gain changes, it shouldn't change the amplification of the currently playing song
for i in scheduled_now_files:
if i["row_id"] in row_id_map:
mi = row_id_map[i["row_id"]]
correct = mi['start'] == i['start'] and \
mi['end'] == i['end'] and \
mi['row_id'] == i['row_id']
correct = (
mi["start"] == i["start"]
and mi["end"] == i["end"]
and mi["row_id"] == i["row_id"]
)
if not correct:
#need to re-add
# need to re-add
self.logger.info("Track %s found to have new attr." % i)
to_be_removed.add(i["row_id"])
to_be_added.add(i["row_id"])
@ -162,37 +169,38 @@ class PypoLiquidsoap():
to_be_added.update(schedule_ids - liq_queue_ids)
if to_be_removed:
self.logger.info("Need to remove items from Liquidsoap: %s" % \
to_be_removed)
self.logger.info(
"Need to remove items from Liquidsoap: %s" % to_be_removed
)
#remove files from Liquidsoap's queue
# remove files from Liquidsoap's queue
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi is not None and mi["row_id"] in to_be_removed:
self.stop(i)
if to_be_added:
self.logger.info("Need to add items to Liquidsoap *now*: %s" % \
to_be_added)
self.logger.info(
"Need to add items to Liquidsoap *now*: %s" % to_be_added
)
for i in scheduled_now_files:
if i["row_id"] in to_be_added:
self.modify_cue_point(i)
self.play(i)
#handle webstreams
# handle webstreams
current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
if scheduled_now_webstream:
if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]):
self.play(scheduled_now_webstream[0])
elif current_stream_id != "-1":
#something is playing and it shouldn't be.
# something is playing and it shouldn't be.
self.telnet_liquidsoap.stop_web_stream_buffer()
self.telnet_liquidsoap.stop_web_stream_output()
except KeyError as e:
self.logger.error("Error: Malformed event in schedule. " + str(e))
def stop(self, queue):
self.telnet_liquidsoap.queue_remove(queue)
self.liq_queue_tracker[queue] = None
@ -209,24 +217,32 @@ class PypoLiquidsoap():
tnow = datetime.utcnow()
link_start = link['start']
link_start = link["start"]
diff_td = tnow - link_start
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec > 0:
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec
self.logger.debug(
"media item was supposed to start %s ago. Preparing to start..",
diff_sec,
)
original_cue_in_td = timedelta(seconds=float(link["cue_in"]))
link["cue_in"] = (
self.date_interval_to_seconds(original_cue_in_td) + diff_sec
)
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
if seconds < 0: seconds = 0
seconds = (
interval.microseconds
+ (interval.seconds + interval.days * 24 * 3600) * 10 ** 6
) / float(10 ** 6)
if seconds < 0:
seconds = 0
return seconds
@ -237,5 +253,6 @@ class PypoLiquidsoap():
class UnknownMediaItemType(Exception):
pass
class NoQueueAvailableException(Exception):
pass

View file

@ -6,6 +6,7 @@ import os
import sys
from threading import Thread
import time
# For RabbitMQ
from kombu.connection import Connection
from kombu.messaging import Exchange, Queue
@ -26,17 +27,18 @@ class RabbitConsumer(ConsumerMixin):
def get_consumers(self, Consumer, channel):
return [
Consumer(self.queues, callbacks=[self.on_message], accept=['text/plain']),
Consumer(self.queues, callbacks=[self.on_message], accept=["text/plain"]),
]
def on_message(self, body, message):
self.handler.handle_message(message.payload)
message.ack()
class PypoMessageHandler(Thread):
def __init__(self, pq, rq, config):
Thread.__init__(self)
self.logger = logging.getLogger('message_h')
self.logger = logging.getLogger("message_h")
self.pypo_queue = pq
self.recorder_queue = rq
self.config = config
@ -44,13 +46,17 @@ class PypoMessageHandler(Thread):
def init_rabbit_mq(self):
self.logger.info("Initializing RabbitMQ stuff")
try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_exchange = Exchange(
"airtime-pypo", "direct", durable=True, auto_delete=True
)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
with Connection(self.config["host"], \
self.config["user"], \
self.config["password"], \
self.config["vhost"], \
heartbeat = 5) as connection:
with Connection(
self.config["host"],
self.config["user"],
self.config["password"],
self.config["vhost"],
heartbeat=5,
) as connection:
rabbit = RabbitConsumer(connection, [schedule_queue], self)
rabbit.run()
except Exception as e:
@ -60,6 +66,7 @@ class PypoMessageHandler(Thread):
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
"""
def handle_message(self, message):
try:
self.logger.info("Received event from RabbitMQ: %s" % message)
@ -69,36 +76,36 @@ class PypoMessageHandler(Thread):
except (UnicodeDecodeError, AttributeError):
pass
m = json.loads(message)
command = m['event_type']
command = m["event_type"]
self.logger.info("Handling command: " + command)
if command == 'update_schedule':
if command == "update_schedule":
self.logger.info("Updating schedule...")
self.pypo_queue.put(message)
elif command == 'reset_liquidsoap_bootstrap':
elif command == "reset_liquidsoap_bootstrap":
self.logger.info("Resetting bootstrap vars...")
self.pypo_queue.put(message)
elif command == 'update_stream_setting':
elif command == "update_stream_setting":
self.logger.info("Updating stream setting...")
self.pypo_queue.put(message)
elif command == 'update_stream_format':
elif command == "update_stream_format":
self.logger.info("Updating stream format...")
self.pypo_queue.put(message)
elif command == 'update_station_name':
elif command == "update_station_name":
self.logger.info("Updating station name...")
self.pypo_queue.put(message)
elif command == 'switch_source':
elif command == "switch_source":
self.logger.info("switch_source command received...")
self.pypo_queue.put(message)
elif command == 'update_transition_fade':
elif command == "update_transition_fade":
self.logger.info("Updating trasition fade...")
self.pypo_queue.put(message)
elif command == 'disconnect_source':
elif command == "disconnect_source":
self.logger.info("disconnect_source command received...")
self.pypo_queue.put(message)
elif command == 'update_recorder_schedule':
elif command == "update_recorder_schedule":
self.recorder_queue.put(message)
elif command == 'cancel_recording':
elif command == "cancel_recording":
self.recorder_queue.put(message)
else:
self.logger.info("Unknown command: %s" % command)
@ -109,9 +116,11 @@ class PypoMessageHandler(Thread):
try:
self.init_rabbit_mq()
except Exception as e:
self.logger.error('Exception: %s', e)
self.logger.error("Exception: %s", e)
self.logger.error("traceback: %s", traceback.format_exc())
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
self.logger.error(
"Error connecting to RabbitMQ Server. Trying again in few seconds"
)
time.sleep(5)
"""
@ -119,7 +128,7 @@ class PypoMessageHandler(Thread):
Wait for schedule updates from RabbitMQ, but in case there aren't any,
poll the server to get the upcoming schedule.
"""
def run(self):
while True:
self.main()

View file

@ -29,10 +29,12 @@ PUSH_INTERVAL = 2
def is_stream(media_item):
return media_item['type'] == 'stream_output_start'
return media_item["type"] == "stream_output_start"
def is_file(media_item):
return media_item['type'] == 'file'
return media_item["type"] == "file"
class PypoPush(Thread):
def __init__(self, q, telnet_lock, pypo_liquidsoap, config):
@ -44,20 +46,19 @@ class PypoPush(Thread):
self.config = config
self.pushed_objects = {}
self.logger = logging.getLogger('push')
self.logger = logging.getLogger("push")
self.current_prebuffering_stream_id = None
self.queue_id = 0
self.future_scheduled_queue = Queue()
self.pypo_liquidsoap = pypo_liquidsoap
self.plq = PypoLiqQueue(self.future_scheduled_queue, \
self.pypo_liquidsoap, \
self.logger)
self.plq = PypoLiqQueue(
self.future_scheduled_queue, self.pypo_liquidsoap, self.logger
)
self.plq.daemon = True
self.plq.start()
def main(self):
loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
@ -72,10 +73,11 @@ class PypoPush(Thread):
raise
else:
self.logger.debug(media_schedule)
#separate media_schedule list into currently_playing and
#scheduled_for_future lists
currently_playing, scheduled_for_future = \
self.separate_present_future(media_schedule)
# separate media_schedule list into currently_playing and
# scheduled_for_future lists
currently_playing, scheduled_for_future = self.separate_present_future(
media_schedule
)
self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
self.future_scheduled_queue.put(scheduled_for_future)
@ -85,7 +87,6 @@ class PypoPush(Thread):
loops = 0
loops += 1
def separate_present_future(self, media_schedule):
tnow = datetime.utcnow()
@ -96,7 +97,7 @@ class PypoPush(Thread):
for mkey in sorted_keys:
media_item = media_schedule[mkey]
diff_td = tnow - media_item['start']
diff_td = tnow - media_item["start"]
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec >= 0:
@ -111,8 +112,10 @@ class PypoPush(Thread):
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
seconds = (
interval.microseconds
+ (interval.seconds + interval.days * 24 * 3600) * 10 ** 6
) / float(10 ** 6)
return seconds
@ -120,18 +123,18 @@ class PypoPush(Thread):
def stop_web_stream_all(self):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['LS_HOST'], self.config['LS_PORT'])
tn = telnetlib.Telnet(self.config["LS_HOST"], self.config["LS_PORT"])
#msg = 'dynamic_source.read_stop_all xxx\n'
msg = 'http.stop\n'
# msg = 'dynamic_source.read_stop_all xxx\n'
msg = "http.stop\n"
self.logger.debug(msg)
tn.write(msg)
msg = 'dynamic_source.output_stop\n'
msg = "dynamic_source.output_stop\n"
self.logger.debug(msg)
tn.write(msg)
msg = 'dynamic_source.id -1\n'
msg = "dynamic_source.id -1\n"
self.logger.debug(msg)
tn.write(msg)
@ -145,10 +148,10 @@ class PypoPush(Thread):
def run(self):
while True:
try: self.main()
try:
self.main()
except Exception as e:
top = traceback.format_exc()
self.logger.error('Pypo Push Exception: %s', top)
self.logger.error("Pypo Push Exception: %s", top)
time.sleep(5)
self.logger.info('PypoPush thread exiting')
self.logger.info("PypoPush thread exiting")

View file

@ -24,6 +24,7 @@ import mutagen
from api_clients import version1 as v1_api_client
from api_clients import version2 as api_client
def api_client(logger):
"""
api_client returns the correct instance of AirtimeApiClient. Although there is only one
@ -31,15 +32,17 @@ def api_client(logger):
"""
return v1_api_client.AirtimeApiClient(logger)
# loading config file
try:
config = ConfigObj('/etc/airtime/airtime.conf')
config = ConfigObj("/etc/airtime/airtime.conf")
except Exception as e:
print("Error loading config file: {}".format(e))
sys.exit()
# TODO : add docstrings everywhere in this module
def getDateTimeObj(time):
# TODO : clean up for this function later.
# - use tuples to parse result from split (instead of indices)
@ -49,17 +52,20 @@ def getDateTimeObj(time):
# shadowed
# - add docstring to document all behaviour of this function
timeinfo = time.split(" ")
date = [ int(x) for x in timeinfo[0].split("-") ]
my_time = [ int(x) for x in timeinfo[1].split(":") ]
return datetime.datetime(date[0], date[1], date[2], my_time[0], my_time[1], my_time[2], 0, None)
date = [int(x) for x in timeinfo[0].split("-")]
my_time = [int(x) for x in timeinfo[1].split(":")]
return datetime.datetime(
date[0], date[1], date[2], my_time[0], my_time[1], my_time[2], 0, None
)
PUSH_INTERVAL = 2
class ShowRecorder(Thread):
def __init__ (self, show_instance, show_name, filelength, start_time):
class ShowRecorder(Thread):
def __init__(self, show_instance, show_name, filelength, start_time):
Thread.__init__(self)
self.logger = logging.getLogger('recorder')
self.logger = logging.getLogger("recorder")
self.api_client = api_client(self.logger)
self.filelength = filelength
self.start_time = start_time
@ -75,35 +81,41 @@ class ShowRecorder(Thread):
if config["pypo"]["record_file_type"] in ["mp3", "ogg"]:
filetype = config["pypo"]["record_file_type"]
else:
filetype = "ogg";
filetype = "ogg"
joined_path = os.path.join(config["pypo"]["base_recorded_files"], filename)
filepath = "%s.%s" % (joined_path, filetype)
br = config["pypo"]["record_bitrate"]
sr = config["pypo"]["record_samplerate"]
c = config["pypo"]["record_channels"]
c = config["pypo"]["record_channels"]
ss = config["pypo"]["record_sample_size"]
#-f:16,2,44100
#-b:256
command = "ecasound -f:%s,%s,%s -i alsa -o %s,%s000 -t:%s" % \
(ss, c, sr, filepath, br, length)
# -f:16,2,44100
# -b:256
command = "ecasound -f:%s,%s,%s -i alsa -o %s,%s000 -t:%s" % (
ss,
c,
sr,
filepath,
br,
length,
)
args = command.split(" ")
self.logger.info("starting record")
self.logger.info("command " + command)
self.p = Popen(args,stdout=PIPE,stderr=PIPE)
self.p = Popen(args, stdout=PIPE, stderr=PIPE)
#blocks at the following line until the child process
#quits
# blocks at the following line until the child process
# quits
self.p.wait()
outmsgs = self.p.stdout.readlines()
for msg in outmsgs:
m = re.search('^ERROR',msg)
m = re.search("^ERROR", msg)
if not m == None:
self.logger.info('Recording error is found: %s', outmsgs)
self.logger.info("Recording error is found: %s", outmsgs)
self.logger.info("finishing record, return code %s", self.p.returncode)
code = self.p.returncode
@ -112,21 +124,25 @@ class ShowRecorder(Thread):
return code, filepath
def cancel_recording(self):
#send signal interrupt (2)
# send signal interrupt (2)
self.logger.info("Show manually cancelled!")
if (self.p is not None):
if self.p is not None:
self.p.send_signal(signal.SIGINT)
#if self.p is defined, then the child process ecasound is recording
# if self.p is defined, then the child process ecasound is recording
def is_recording(self):
return (self.p is not None)
return self.p is not None
def upload_file(self, filepath):
filename = os.path.split(filepath)[1]
# files is what requests actually expects
files = {'file': open(filepath, "rb"), 'name': filename, 'show_instance': self.show_instance}
files = {
"file": open(filepath, "rb"),
"name": filename,
"show_instance": self.show_instance,
}
self.api_client.upload_recorded_show(files, self.show_instance)
@ -136,27 +152,25 @@ class ShowRecorder(Thread):
self.start_time, self.show_name, self.show_instance
"""
try:
full_date, full_time = self.start_time.split(" ",1)
full_date, full_time = self.start_time.split(" ", 1)
# No idea why we translated - to : before
#full_time = full_time.replace(":","-")
# full_time = full_time.replace(":","-")
self.logger.info("time: %s" % full_time)
artist = "Airtime Show Recorder"
#set some metadata for our file daemon
recorded_file = mutagen.File(filepath, easy = True)
recorded_file['artist'] = artist
recorded_file['date'] = full_date
recorded_file['title'] = "%s-%s-%s" % (self.show_name,
full_date, full_time)
#You cannot pass ints into the metadata of a file. Even tracknumber needs to be a string
recorded_file['tracknumber'] = self.show_instance
# set some metadata for our file daemon
recorded_file = mutagen.File(filepath, easy=True)
recorded_file["artist"] = artist
recorded_file["date"] = full_date
recorded_file["title"] = "%s-%s-%s" % (self.show_name, full_date, full_time)
# You cannot pass ints into the metadata of a file. Even tracknumber needs to be a string
recorded_file["tracknumber"] = self.show_instance
recorded_file.save()
except Exception as e:
top = traceback.format_exc()
self.logger.error('Exception: %s', e)
self.logger.error("Exception: %s", e)
self.logger.error("traceback: %s", top)
def run(self):
code, filepath = self.record_show()
@ -174,14 +188,15 @@ class ShowRecorder(Thread):
self.logger.info("problem recording show")
os.remove(filepath)
class Recorder(Thread):
def __init__(self, q):
Thread.__init__(self)
self.logger = logging.getLogger('recorder')
self.logger = logging.getLogger("recorder")
self.api_client = api_client(self.logger)
self.sr = None
self.shows_to_record = {}
self.server_timezone = ''
self.server_timezone = ""
self.queue = q
self.loops = 0
self.logger.info("RecorderFetch: init complete")
@ -189,7 +204,7 @@ class Recorder(Thread):
success = False
while not success:
try:
self.api_client.register_component('show-recorder')
self.api_client.register_component("show-recorder")
success = True
except Exception as e:
self.logger.error(str(e))
@ -205,7 +220,7 @@ class Recorder(Thread):
msg = json.loads(message)
command = msg["event_type"]
self.logger.info("Received msg from Pypo Message Handler: %s", msg)
if command == 'cancel_recording':
if command == "cancel_recording":
if self.currently_recording():
self.cancel_recording()
else:
@ -218,14 +233,18 @@ class Recorder(Thread):
def process_recorder_schedule(self, m):
self.logger.info("Parsing recording show schedules...")
temp_shows_to_record = {}
shows = m['shows']
shows = m["shows"]
for show in shows:
show_starts = getDateTimeObj(show['starts'])
show_end = getDateTimeObj(show['ends'])
show_starts = getDateTimeObj(show["starts"])
show_end = getDateTimeObj(show["ends"])
time_delta = show_end - show_starts
temp_shows_to_record[show['starts']] = [time_delta,
show['instance_id'], show['name'], m['server_timezone']]
temp_shows_to_record[show["starts"]] = [
time_delta,
show["instance_id"],
show["name"],
m["server_timezone"],
]
self.shows_to_record = temp_shows_to_record
def get_time_till_next_show(self):
@ -237,7 +256,7 @@ class Recorder(Thread):
next_show = getDateTimeObj(start_time)
delta = next_show - tnow
s = '%s.%s' % (delta.seconds, delta.microseconds)
s = "%s.%s" % (delta.seconds, delta.microseconds)
out = float(s)
if out < 5:
@ -257,7 +276,8 @@ class Recorder(Thread):
return False
def start_record(self):
if len(self.shows_to_record) == 0: return None
if len(self.shows_to_record) == 0:
return None
try:
delta = self.get_time_till_next_show()
if delta < 5:
@ -273,16 +293,25 @@ class Recorder(Thread):
T = pytz.timezone(server_timezone)
start_time_on_UTC = getDateTimeObj(start_time)
start_time_on_server = start_time_on_UTC.replace(tzinfo=pytz.utc).astimezone(T)
start_time_formatted = '%(year)d-%(month)02d-%(day)02d %(hour)02d:%(min)02d:%(sec)02d' % \
{'year': start_time_on_server.year, 'month': start_time_on_server.month, 'day': start_time_on_server.day, \
'hour': start_time_on_server.hour, 'min': start_time_on_server.minute, 'sec': start_time_on_server.second}
start_time_on_server = start_time_on_UTC.replace(
tzinfo=pytz.utc
).astimezone(T)
start_time_formatted = (
"%(year)d-%(month)02d-%(day)02d %(hour)02d:%(min)02d:%(sec)02d"
% {
"year": start_time_on_server.year,
"month": start_time_on_server.month,
"day": start_time_on_server.day,
"hour": start_time_on_server.hour,
"min": start_time_on_server.minute,
"sec": start_time_on_server.second,
}
)
seconds_waiting = 0
#avoiding CC-5299
while(True):
# avoiding CC-5299
while True:
if self.currently_recording():
self.logger.info("Previous record not finished, sleeping 100ms")
seconds_waiting = seconds_waiting + 0.1
@ -290,16 +319,21 @@ class Recorder(Thread):
else:
show_length_seconds = show_length.seconds - seconds_waiting
self.sr = ShowRecorder(show_instance, show_name, show_length_seconds, start_time_formatted)
self.sr = ShowRecorder(
show_instance,
show_name,
show_length_seconds,
start_time_formatted,
)
self.sr.start()
break
#remove show from shows to record.
# remove show from shows to record.
del self.shows_to_record[start_time]
#self.time_till_next_show = self.get_time_till_next_show()
except Exception as e :
# self.time_till_next_show = self.get_time_till_next_show()
except Exception as e:
top = traceback.format_exc()
self.logger.error('Exception: %s', e)
self.logger.error("Exception: %s", e)
self.logger.error("traceback: %s", top)
def run(self):
@ -318,7 +352,7 @@ class Recorder(Thread):
self.process_recorder_schedule(temp)
self.logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception as e:
self.logger.error( traceback.format_exc() )
self.logger.error(traceback.format_exc())
self.logger.error(e)
self.logger.info("Bootstrap complete: got initial copy of the schedule")
@ -338,16 +372,16 @@ class Recorder(Thread):
self.process_recorder_schedule(temp)
self.logger.info("updated recorder schedule received: %s", temp)
except Exception as e:
self.logger.error( traceback.format_exc() )
self.logger.error(traceback.format_exc())
self.logger.error(e)
try: self.handle_message()
try:
self.handle_message()
except Exception as e:
self.logger.error( traceback.format_exc() )
self.logger.error('Pypo Recorder Exception: %s', e)
self.logger.error(traceback.format_exc())
self.logger.error("Pypo Recorder Exception: %s", e)
time.sleep(PUSH_INTERVAL)
self.loops += 1
except Exception as e :
except Exception as e:
top = traceback.format_exc()
self.logger.error('Exception: %s', e)
self.logger.error("Exception: %s", e)
self.logger.error("traceback: %s", top)

View file

@ -4,32 +4,36 @@ import telnetlib
from .timeout import ls_timeout
import traceback
def create_liquidsoap_annotation(media):
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.
filename = media['dst']
annotation = ('annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",' + \
'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",' + \
'schedule_table_id="%s",replay_gain="%s dB"') % \
(media['id'],
float(media['fade_in']) / 1000,
float(media['fade_out']) / 1000,
float(media['cue_in']),
float(media['cue_out']),
media['row_id'],
media['replay_gain'])
filename = media["dst"]
annotation = (
'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",'
+ 'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",'
+ 'schedule_table_id="%s",replay_gain="%s dB"'
) % (
media["id"],
float(media["fade_in"]) / 1000,
float(media["fade_out"]) / 1000,
float(media["cue_in"]),
float(media["cue_out"]),
media["row_id"],
media["replay_gain"],
)
# Override the the artist/title that Liquidsoap extracts from a file's metadata
# with the metadata we get from Airtime. (You can modify metadata in Airtime's library,
# which doesn't get saved back to the file.)
if 'metadata' in media:
if "metadata" in media:
if 'artist_name' in media['metadata']:
artist_name = media['metadata']['artist_name']
if "artist_name" in media["metadata"]:
artist_name = media["metadata"]["artist_name"]
if isinstance(artist_name, str):
annotation += ',artist="%s"' % (artist_name.replace('"', '\\"'))
if 'track_title' in media['metadata']:
track_title = media['metadata']['track_title']
if "track_title" in media["metadata"]:
track_title = media["metadata"]["track_title"]
if isinstance(track_title, str):
annotation += ',title="%s"' % (track_title.replace('"', '\\"'))
@ -37,8 +41,8 @@ def create_liquidsoap_annotation(media):
return annotation
class TelnetLiquidsoap:
class TelnetLiquidsoap:
def __init__(self, telnet_lock, logger, ls_host, ls_port, queues):
self.telnet_lock = telnet_lock
self.ls_host = ls_host
@ -53,9 +57,9 @@ class TelnetLiquidsoap:
def __is_empty(self, queue_id):
return True
tn = self.__connect()
msg = '%s.queue\nexit\n' % queue_id
tn.write(msg.encode('utf-8'))
output = tn.read_all().decode('utf-8').splitlines()
msg = "%s.queue\nexit\n" % queue_id
tn.write(msg.encode("utf-8"))
output = tn.read_all().decode("utf-8").splitlines()
if len(output) == 3:
return len(output[0]) == 0
else:
@ -68,12 +72,12 @@ class TelnetLiquidsoap:
tn = self.__connect()
for i in self.queues:
msg = 'queues.%s_skip\n' % i
msg = "queues.%s_skip\n" % i
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode('utf-8'))
self.logger.debug(tn.read_all().decode('utf-8'))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
except Exception:
raise
finally:
@ -85,18 +89,17 @@ class TelnetLiquidsoap:
self.telnet_lock.acquire()
tn = self.__connect()
msg = 'queues.%s_skip\n' % queue_id
msg = "queues.%s_skip\n" % queue_id
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode('utf-8'))
self.logger.debug(tn.read_all().decode('utf-8'))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
except Exception:
raise
finally:
self.telnet_lock.release()
@ls_timeout
def queue_push(self, queue_id, media_item):
try:
@ -107,40 +110,39 @@ class TelnetLiquidsoap:
tn = self.__connect()
annotation = create_liquidsoap_annotation(media_item)
msg = '%s.push %s\n' % (queue_id, annotation)
msg = "%s.push %s\n" % (queue_id, annotation)
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
show_name = media_item['show_name']
msg = 'vars.show_name %s\n' % show_name
tn.write(msg.encode('utf-8'))
show_name = media_item["show_name"]
msg = "vars.show_name %s\n" % show_name
tn.write(msg.encode("utf-8"))
self.logger.debug(msg)
tn.write("exit\n".encode('utf-8'))
self.logger.debug(tn.read_all().decode('utf-8'))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
except Exception:
raise
finally:
self.telnet_lock.release()
@ls_timeout
def stop_web_stream_buffer(self):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
# dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = 'http.stop\n'
msg = "http.stop\n"
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
msg = 'dynamic_source.id -1\n'
msg = "dynamic_source.id -1\n"
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode('utf-8'))
self.logger.debug(tn.read_all().decode('utf-8'))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
except Exception as e:
self.logger.error(str(e))
@ -153,14 +155,14 @@ class TelnetLiquidsoap:
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
# dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = 'dynamic_source.output_stop\n'
msg = "dynamic_source.output_stop\n"
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode('utf-8'))
self.logger.debug(tn.read_all().decode('utf-8'))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
except Exception as e:
self.logger.error(str(e))
@ -174,16 +176,16 @@ class TelnetLiquidsoap:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
#TODO: DO we need this?
msg = 'streams.scheduled_play_start\n'
tn.write(msg.encode('utf-8'))
# TODO: DO we need this?
msg = "streams.scheduled_play_start\n"
tn.write(msg.encode("utf-8"))
msg = 'dynamic_source.output_start\n'
msg = "dynamic_source.output_start\n"
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode('utf-8'))
self.logger.debug(tn.read_all().decode('utf-8'))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
self.current_prebuffering_stream_id = None
except Exception as e:
@ -198,18 +200,18 @@ class TelnetLiquidsoap:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = 'dynamic_source.id %s\n' % media_item['row_id']
msg = "dynamic_source.id %s\n" % media_item["row_id"]
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
msg = 'http.restart %s\n' % media_item['uri']
msg = "http.restart %s\n" % media_item["uri"]
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode('utf-8'))
self.logger.debug(tn.read_all().decode('utf-8'))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
self.current_prebuffering_stream_id = media_item['row_id']
self.current_prebuffering_stream_id = media_item["row_id"]
except Exception as e:
self.logger.error(str(e))
self.logger.error(traceback.format_exc())
@ -222,12 +224,12 @@ class TelnetLiquidsoap:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = 'dynamic_source.get_id\n'
msg = "dynamic_source.get_id\n"
self.logger.debug(msg)
tn.write(msg.encode('utf-8'))
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode('utf-8'))
stream_id = tn.read_all().decode('utf-8').splitlines()[0]
tn.write("exit\n".encode("utf-8"))
stream_id = tn.read_all().decode("utf-8").splitlines()[0]
self.logger.debug("stream_id: %s" % stream_id)
return stream_id
@ -239,20 +241,20 @@ class TelnetLiquidsoap:
@ls_timeout
def disconnect_source(self, sourcename):
self.logger.debug('Disconnecting source: %s', sourcename)
self.logger.debug("Disconnecting source: %s", sourcename)
command = ""
if(sourcename == "master_dj"):
if sourcename == "master_dj":
command += "master_harbor.stop\n"
elif(sourcename == "live_dj"):
elif sourcename == "live_dj":
command += "live_dj_harbor.stop\n"
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
self.logger.info(command)
tn.write(command.encode('utf-8'))
tn.write('exit\n'.encode('utf-8'))
tn.read_all().decode('utf-8')
tn.write(command.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
tn.read_all().decode("utf-8")
except Exception as e:
self.logger.error(traceback.format_exc())
finally:
@ -267,18 +269,17 @@ class TelnetLiquidsoap:
for i in commands:
self.logger.info(i)
if type(i) is str:
i = i.encode('utf-8')
i = i.encode("utf-8")
tn.write(i)
tn.write('exit\n'.encode('utf-8'))
tn.read_all().decode('utf-8')
tn.write("exit\n".encode("utf-8"))
tn.read_all().decode("utf-8")
except Exception as e:
self.logger.error(str(e))
self.logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
def switch_source(self, sourcename, status):
self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
command = "streams."
@ -296,15 +297,15 @@ class TelnetLiquidsoap:
self.telnet_send([command])
class DummyTelnetLiquidsoap:
class DummyTelnetLiquidsoap:
def __init__(self, telnet_lock, logger):
self.telnet_lock = telnet_lock
self.liquidsoap_mock_queues = {}
self.logger = logger
for i in range(4):
self.liquidsoap_mock_queues["s"+str(i)] = []
self.liquidsoap_mock_queues["s" + str(i)] = []
@ls_timeout
def queue_push(self, queue_id, media_item):
@ -313,6 +314,7 @@ class DummyTelnetLiquidsoap:
self.logger.info("Pushing %s to queue %s" % (media_item, queue_id))
from datetime import datetime
print("Time now: {:s}".format(datetime.utcnow()))
annotation = create_liquidsoap_annotation(media_item)
@ -329,6 +331,7 @@ class DummyTelnetLiquidsoap:
self.logger.info("Purging queue %s" % queue_id)
from datetime import datetime
print("Time now: {:s}".format(datetime.utcnow()))
except Exception:
@ -336,5 +339,6 @@ class DummyTelnetLiquidsoap:
finally:
self.telnet_lock.release()
class QueueNotEmptyException(Exception):
pass

View file

@ -13,14 +13,17 @@ import logging
from datetime import datetime
from datetime import timedelta
def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger()
logger.info('\nKeyboard Interrupt\n')
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)
# configure logging
format = '%(levelname)s - %(pathname)s - %(lineno)s - %(asctime)s - %(message)s'
format = "%(levelname)s - %(pathname)s - %(lineno)s - %(asctime)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=format)
logging.captureWarnings(True)
@ -30,19 +33,18 @@ pypoPush_q = Queue()
pypoLiq_q = Queue()
liq_queue_tracker = {
"s0": None,
"s1": None,
"s2": None,
"s3": None,
}
"s0": None,
"s1": None,
"s2": None,
"s3": None,
}
#dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock, logging)
dummy_telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, logging, \
"localhost", \
1234)
# dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock, logging)
dummy_telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, logging, "localhost", 1234)
plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logging, liq_queue_tracker, \
dummy_telnet_liquidsoap)
plq = PypoLiqQueue(
pypoLiq_q, telnet_lock, logging, liq_queue_tracker, dummy_telnet_liquidsoap
)
plq.daemon = True
plq.start()
@ -54,47 +56,43 @@ media_schedule = {}
start_dt = datetime.utcnow() + timedelta(seconds=1)
end_dt = datetime.utcnow() + timedelta(seconds=6)
media_schedule[start_dt] = {"id": 5, \
"type":"file", \
"row_id":9, \
"uri":"", \
"dst":"/home/martin/Music/ipod/Hot Chocolate - You Sexy Thing.mp3", \
"fade_in":0, \
"fade_out":0, \
"cue_in":0, \
"cue_out":300, \
"start": start_dt, \
"end": end_dt, \
"show_name":"Untitled", \
"replay_gain": 0, \
"independent_event": True \
}
media_schedule[start_dt] = {
"id": 5,
"type": "file",
"row_id": 9,
"uri": "",
"dst": "/home/martin/Music/ipod/Hot Chocolate - You Sexy Thing.mp3",
"fade_in": 0,
"fade_out": 0,
"cue_in": 0,
"cue_out": 300,
"start": start_dt,
"end": end_dt,
"show_name": "Untitled",
"replay_gain": 0,
"independent_event": True,
}
start_dt = datetime.utcnow() + timedelta(seconds=2)
end_dt = datetime.utcnow() + timedelta(seconds=6)
media_schedule[start_dt] = {"id": 5, \
"type":"file", \
"row_id":9, \
"uri":"", \
"dst":"/home/martin/Music/ipod/Good Charlotte - bloody valentine.mp3", \
"fade_in":0, \
"fade_out":0, \
"cue_in":0, \
"cue_out":300, \
"start": start_dt, \
"end": end_dt, \
"show_name":"Untitled", \
"replay_gain": 0, \
"independent_event": True \
}
media_schedule[start_dt] = {
"id": 5,
"type": "file",
"row_id": 9,
"uri": "",
"dst": "/home/martin/Music/ipod/Good Charlotte - bloody valentine.mp3",
"fade_in": 0,
"fade_out": 0,
"cue_in": 0,
"cue_out": 300,
"start": start_dt,
"end": end_dt,
"show_name": "Untitled",
"replay_gain": 0,
"independent_event": True,
}
pypoLiq_q.put(media_schedule)
plq.join()

View file

@ -2,12 +2,13 @@
import threading
from . import pypofetch
def __timeout(func, timeout_duration, default, args, kwargs):
def __timeout(func, timeout_duration, default, args, kwargs):
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = func(*args, **kwargs)
@ -21,10 +22,10 @@ def __timeout(func, timeout_duration, default, args, kwargs):
it.join(timeout_duration)
if it.isAlive():
"""Restart Liquidsoap and try the command one more time. If it
"""Restart Liquidsoap and try the command one more time. If it
fails again then there is something critically wrong..."""
if first_attempt:
#restart liquidsoap
# restart liquidsoap
pypofetch.PypoFetch.ref.restart_liquidsoap()
else:
raise Exception("Thread did not terminate")
@ -33,7 +34,9 @@ def __timeout(func, timeout_duration, default, args, kwargs):
first_attempt = False
def ls_timeout(f, timeout=15, default=None):
def new_f(*args, **kwargs):
return __timeout(f, timeout, default, args, kwargs)
return new_f

View file

@ -10,64 +10,63 @@ print(script_path)
os.chdir(script_path)
# Allows us to avoid installing the upstart init script when deploying on Airtime Pro:
if '--no-init-script' in sys.argv:
if "--no-init-script" in sys.argv:
data_files = []
sys.argv.remove('--no-init-script') # super hax
sys.argv.remove("--no-init-script") # super hax
else:
pypo_files = []
for root, dirnames, filenames in os.walk('pypo'):
for root, dirnames, filenames in os.walk("pypo"):
for filename in filenames:
pypo_files.append(os.path.join(root, filename))
data_files = [
('/etc/init', ['install/upstart/airtime-playout.conf.template']),
('/etc/init', ['install/upstart/airtime-liquidsoap.conf.template']),
('/etc/init.d', ['install/sysvinit/airtime-playout']),
('/etc/init.d', ['install/sysvinit/airtime-liquidsoap']),
('/var/log/airtime/pypo', []),
('/var/log/airtime/pypo-liquidsoap', []),
('/var/tmp/airtime/pypo', []),
('/var/tmp/airtime/pypo/cache', []),
('/var/tmp/airtime/pypo/files', []),
('/var/tmp/airtime/pypo/tmp', []),
]
("/etc/init", ["install/upstart/airtime-playout.conf.template"]),
("/etc/init", ["install/upstart/airtime-liquidsoap.conf.template"]),
("/etc/init.d", ["install/sysvinit/airtime-playout"]),
("/etc/init.d", ["install/sysvinit/airtime-liquidsoap"]),
("/var/log/airtime/pypo", []),
("/var/log/airtime/pypo-liquidsoap", []),
("/var/tmp/airtime/pypo", []),
("/var/tmp/airtime/pypo/cache", []),
("/var/tmp/airtime/pypo/files", []),
("/var/tmp/airtime/pypo/tmp", []),
]
print(data_files)
setup(name='airtime-playout',
version='1.0',
description='Airtime Playout Engine',
url='http://github.com/sourcefabric/Airtime',
author='sourcefabric',
license='AGPLv3',
packages=['pypo', 'pypo.media', 'pypo.media.update',
'liquidsoap'],
package_data={'': ['**/*.liq', '*.cfg', '*.types']},
scripts=[
'bin/airtime-playout',
'bin/airtime-liquidsoap',
'bin/pyponotify'
],
install_requires=[
'amqplib',
'anyjson',
'argparse',
'configobj',
'docopt',
'future',
'kombu',
'mutagen',
'PyDispatcher',
'pyinotify',
'pytz',
'requests',
'defusedxml',
'packaging',
],
zip_safe=False,
data_files=data_files)
setup(
name="airtime-playout",
version="1.0",
description="Airtime Playout Engine",
url="http://github.com/sourcefabric/Airtime",
author="sourcefabric",
license="AGPLv3",
packages=["pypo", "pypo.media", "pypo.media.update", "liquidsoap"],
package_data={"": ["**/*.liq", "*.cfg", "*.types"]},
scripts=["bin/airtime-playout", "bin/airtime-liquidsoap", "bin/pyponotify"],
install_requires=[
"amqplib",
"anyjson",
"argparse",
"configobj",
"docopt",
"future",
"kombu",
"mutagen",
"PyDispatcher",
"pyinotify",
"pytz",
"requests",
"defusedxml",
"packaging",
],
zip_safe=False,
data_files=data_files,
)
# Reload the initctl config so that playout services works
if data_files:
print("Reloading initctl configuration")
#call(['initctl', 'reload-configuration'])
print("Run \"sudo service airtime-playout start\" and \"sudo service airtime-liquidsoap start\"")
# call(['initctl', 'reload-configuration'])
print(
'Run "sudo service airtime-playout start" and "sudo service airtime-liquidsoap start"'
)