feat(playout): enhance playout logging (#1495)

Some initial work on modernizing the playout app. This replace any custom logger or
logging based logger with the logging tools from libretime_shared.logging and loguru.

Removed all the thread/function assigned logger (self.logger = ...), as this makes it
part of the logic (passing logger though function args) as it should not.

Of a dedicated logger is required for a specific task, it should use
the create_task_logger function.

- refactor: remove dead code
- refactor: remove py2 specific fix
- feat: remove unused test command
- feat: setup shared cli and logging tools
- feat: replace logging with loguru
- feat: setup shared cli and logging tools for notify
- fix: warn method deos not exist
- feat: make cli setup the entrypoint
- fix: install shared modules globally in production
  use extra_requires to load local packages in dev environement
- feat: configure log path in systemd service
- feat: default behavior is to log to console only
- feat: create log dir during install
- chore: add comment
- fix: don't create useless dir in install
- fix: move notify logs to /var/log/libretime dir
- fix: update setup_logger attrs
- style: linting
- fix: replace verbosity flag with log-level flag
- feat: use shared logging tool in liquidsoap
- fix: pass logger down to api client
- feat: allow custom log_filepath in liquidsoap config
- chore: add pylintrc to playout
- refactor: fix pylint errors
- feat: set liquidsoap log filepath in systemd service
- fix: missing setup entrypoint update

BREAKING CHANGE: for playout and liquidsoap the default log file path changed to None
and will only log to the console when developing / testing. Unless you are running the
app as a systemd service (production) the default logs filepaths changed:
from "/var/log/airtime/pypo/pypo.log" to "/var/log/libretime/playout.log" and
from "/var/log/airtime/pypo-liquidsoap/ls_script.log" to "/var/log/libretime/liquidsoap.log"

BREAKING CHANGE: for playout-notify the default log file path changed
from "/var/log/airtime/pypo/notify.log" to "/var/log/libretime/playout-notify.log"
This commit is contained in:
Jonas L 2022-01-13 16:11:37 +01:00 committed by GitHub
parent 56a3875e2d
commit 5c72f714a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 323 additions and 452 deletions

11
install
View File

@ -93,6 +93,12 @@ skip_postgres=0
skip_rabbitmq=0
default_value="Y"
# <user:group> <path>
mkdir_and_chown() {
mkdir -p "$2"
chown -R "$1" "$2"
}
function verbose() {
if [[ ${_v} -eq 1 ]]; then
echo -e "$@"
@ -1027,12 +1033,17 @@ if [ ! -d /var/log/airtime ]; then
verbose "\n * Creating /var/log/airtime"
loudCmd "mkdir -p /var/log/airtime"
mkdir_and_chown "$web_user:$web_user" "/var/log/libretime"
verbose "\n * Copying logrotate files..."
loudCmd "cp ${AIRTIMEROOT}/legacy/build/airtime-php.logrotate /etc/logrotate.d/airtime-php"
loudCmd "cp ${AIRTIMEROOT}/playout/install/logrotate/libretime-liquidsoap.conf /etc/logrotate.d/libretime-liquidsoap"
fi
verbose "\n * Installing Shared..."
loudCmd "$pip_cmd install ${AIRTIMEROOT}/shared"
verbose "...Done"
verbose "\n * Installing API client..."
loudCmd "$pip_cmd install ${AIRTIMEROOT}/api_client"
verbose "...Done"

4
playout/.pylintrc Normal file
View File

@ -0,0 +1,4 @@
[MESSAGES CONTROL]
disable=missing-module-docstring,
missing-class-docstring,
missing-function-docstring,

View File

@ -2,7 +2,7 @@ all: lint
include ../tools/python.mk
PIP_INSTALL := --editable .
PIP_INSTALL := --editable .[dev]
PYLINT_ARG := libretime_liquidsoap libretime_playout
MYPY_ARG := libretime_liquidsoap libretime_playout

View File

@ -1,4 +1,4 @@
/var/log/airtime/pypo-liquidsoap/ls_script.log {
/var/log/libretime/liquidsoap.log {
compress
rotate 10
size 1000k

View File

@ -2,6 +2,8 @@
Description=Libretime Liquidsoap Service
[Service]
Environment=LIBRETIME_LOG_FILEPATH=/var/log/libretime/liquidsoap.log
ExecStart=/usr/local/bin/libretime-liquidsoap
User=libretime-playout
Group=libretime-playout

View File

@ -3,6 +3,8 @@ Description=Libretime Playout Service
After=network-online.target
[Service]
Environment=LIBRETIME_LOG_FILEPATH=/var/log/libretime/playout.log
ExecStart=/usr/local/bin/libretime-playout
User=libretime-pypo
Group=libretime-pypo

View File

@ -1,13 +1,15 @@
import logging
import os
import sys
import time
import traceback
from pathlib import Path
from typing import Optional
from libretime_api_client.version1 import AirtimeApiClient
from loguru import logger
def generate_liquidsoap_config(ss):
def generate_liquidsoap_config(ss, log_filepath: Optional[Path]):
data = ss["msg"]
fh = open("/etc/airtime/liquidsoap.cfg", "w")
fh.write("################################################\n")
@ -34,31 +36,31 @@ def generate_liquidsoap_config(ss):
fh.write("ignore(%s)\n" % key)
auth_path = os.path.dirname(os.path.realpath(__file__))
fh.write('log_file = "/var/log/airtime/pypo-liquidsoap/<script>.log"\n')
if log_filepath is not None:
fh.write(f'log_file = "{log_filepath.resolve()}"\n')
fh.write('auth_path = "%s/liquidsoap_auth.py"\n' % auth_path)
fh.close()
def run():
logging.basicConfig(format="%(message)s")
def run(log_filepath: Optional[Path]):
attempts = 0
max_attempts = 10
successful = False
while not successful:
try:
ac = AirtimeApiClient(logging.getLogger())
ac = AirtimeApiClient(logger)
ss = ac.get_stream_setting()
generate_liquidsoap_config(ss)
generate_liquidsoap_config(ss, log_filepath)
successful = True
except Exception as e:
print("Unable to connect to the Airtime server.")
logging.error(str(e))
logging.error("traceback: %s", traceback.format_exc())
logger.error(str(e))
logger.error("traceback: %s", traceback.format_exc())
if attempts == max_attempts:
logging.error("giving up and exiting...")
logger.error("giving up and exiting...")
sys.exit(1)
else:
logging.info("Retrying in 3 seconds...")
logger.info("Retrying in 3 seconds...")
time.sleep(3)
attempts += 1

View File

@ -1,31 +1,33 @@
""" Runs Airtime liquidsoap
"""
import argparse
import logging
import os
import subprocess
from pathlib import Path
from typing import Optional
from libretime_playout import pure
import click
from libretime_shared.cli import cli_logging_options
from libretime_shared.logging import level_from_name, setup_logger
from loguru import logger
from . import generate_liquidsoap_cfg
PYPO_HOME = "/var/tmp/airtime/pypo/"
def run():
"""Entry-point for this application"""
print("Airtime Liquidsoap")
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--debug", help="run in debug mode", action="store_true")
args = parser.parse_args()
@click.command()
@cli_logging_options
def cli(log_level: int, log_filepath: Optional[Path]):
"""
Run liquidsoap.
"""
log_level = level_from_name(log_level)
setup_logger(log_level, log_filepath)
os.environ["HOME"] = PYPO_HOME
if args.debug:
logging.basicConfig(level=getattr(logging, "DEBUG", None))
generate_liquidsoap_cfg.run()
""" check liquidsoap version so we can run a scripts matching the liquidsoap minor version """
generate_liquidsoap_cfg.run(log_filepath)
# check liquidsoap version so we can run a scripts matching the liquidsoap minor version
liquidsoap_version = subprocess.check_output(
"liquidsoap 'print(liquidsoap.version) shutdown()'",
shell=True,
@ -40,7 +42,8 @@ def run():
"--verbose",
script_path,
]
if args.debug:
print(f"Liquidsoap {liquidsoap_version} using script: {script_path}")
if log_level.is_debug():
exec_args.append("--debug")
logger.debug(f"Liquidsoap {liquidsoap_version} using script: {script_path}")
os.execl(*exec_args)

View File

@ -1,5 +1,4 @@
import base64
import logging
import time
import traceback
import urllib.error
@ -10,20 +9,17 @@ from threading import Thread
import defusedxml.minidom
from libretime_api_client import version1 as api_client
from loguru import logger
class ListenerStat(Thread):
HTTP_REQUEST_TIMEOUT = 30 # 30 second HTTP request timeout
def __init__(self, config, logger=None):
def __init__(self, config):
Thread.__init__(self)
self.config = config
self.api_client = api_client.AirtimeApiClient()
if logger is None:
self.logger = logging.getLogger()
else:
self.logger = logger
def get_node_text(self, nodelist):
rc = []
@ -130,7 +126,7 @@ class ListenerStat(Thread):
try:
self.update_listener_stat_error(v["mount"], str(e))
except Exception as e:
self.logger.error("Exception: %s", e)
logger.error("Exception: %s", e)
return stats
@ -155,27 +151,6 @@ class ListenerStat(Thread):
if stats:
self.push_stream_stats(stats)
except Exception as e:
self.logger.error("Exception: %s", e)
logger.error("Exception: %s", e)
time.sleep(120)
self.logger.info("ListenerStat thread exiting")
if __name__ == "__main__":
# create logger
logger = logging.getLogger("std_out")
logger.setLevel(logging.DEBUG)
# create console handler and set level to debug
# ch = logging.StreamHandler()
# ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s"
)
# add formatter to ch
# ch.setFormatter(formatter)
# add ch to logger
# logger.addHandler(ch)
# ls = ListenerStat(logger=logger)
# ls.run()

View File

@ -3,9 +3,6 @@ Python part of radio playout (pypo)
"""
import importlib
import locale
import logging
import os
import re
import signal
@ -13,17 +10,18 @@ import sys
import telnetlib
import time
from datetime import datetime
from optparse import OptionParser
from pathlib import Path
from queue import Queue
from threading import Lock
from typing import Optional
import click
from configobj import ConfigObj
from libretime_api_client.version1 import AirtimeApiClient as ApiClient
try:
from queue import Queue
except ImportError: # Python 2.7.5 (CentOS 7)
from queue import Queue
from threading import Lock
from libretime_shared.cli import cli_logging_options
from libretime_shared.config import DEFAULT_ENV_PREFIX
from libretime_shared.logging import level_from_name, setup_logger
from loguru import logger
from . import pure
from .listenerstat import ListenerStat
@ -35,57 +33,6 @@ from .pypopush import PypoPush
from .recorder import Recorder
from .timeout import ls_timeout
LOG_PATH = "/var/log/airtime/pypo/pypo.log"
LOG_LEVEL = logging.INFO
logging.captureWarnings(True)
# Set up command-line options
parser = OptionParser()
# help screen / info
usage = "%prog [options]" + " - python playout system"
parser = OptionParser(usage=usage)
# Options
parser.add_option(
"-v",
"--compat",
help="Check compatibility with server API version",
default=False,
action="store_true",
dest="check_compat",
)
parser.add_option(
"-t",
"--test",
help="Do a test to make sure everything is working properly.",
default=False,
action="store_true",
dest="test",
)
parser.add_option(
"-b",
"--cleanup",
help="Cleanup",
default=False,
action="store_true",
dest="cleanup",
)
parser.add_option(
"-c",
"--check",
help="Check the cached schedule and exit",
default=False,
action="store_true",
dest="check",
)
# parse options
(options, args) = parser.parse_args()
LIQUIDSOAP_MIN_VERSION = "1.1.1"
PYPO_HOME = "/var/tmp/airtime/pypo/"
@ -96,42 +43,6 @@ def configure_environment():
os.environ["TERM"] = "xterm"
configure_environment()
# need to wait for Python 2.7 for this..
logging.captureWarnings(True)
# configure logging
try:
# Set up logging
logFormatter = logging.Formatter(
"%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s"
)
rootLogger = logging.getLogger()
rootLogger.setLevel(LOG_LEVEL)
logger = rootLogger
fileHandler = logging.handlers.RotatingFileHandler(
filename=LOG_PATH, maxBytes=1024 * 1024 * 30, backupCount=8
)
fileHandler.setFormatter(logFormatter)
rootLogger.addHandler(fileHandler)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)
except Exception as e:
print("Couldn't configure logging: {}".format(e))
sys.exit(1)
# loading config file
try:
config = ConfigObj("/etc/airtime/airtime.conf")
except Exception as e:
logger.error("Error loading config file: %s", e)
sys.exit(1)
class Global:
def __init__(self, api_client):
self.api_client = api_client
@ -144,13 +55,12 @@ class Global:
def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger()
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
@ls_timeout
def liquidsoap_get_info(telnet_lock, host, port, logger):
def liquidsoap_get_info(telnet_lock, host, port):
logger.debug("Checking to see if Liquidsoap is running")
try:
telnet_lock.acquire()
@ -176,25 +86,16 @@ def get_liquidsoap_version(version_string):
else:
return None
if m:
current_version = m.group(1)
return pure.version_cmp(current_version, LIQUIDSOAP_MIN_VERSION) >= 0
return False
def liquidsoap_startup_test(telnet_lock, ls_host, ls_port):
liquidsoap_version_string = liquidsoap_get_info(
telnet_lock, ls_host, ls_port, logger
)
liquidsoap_version_string = liquidsoap_get_info(telnet_lock, ls_host, ls_port)
while not liquidsoap_version_string:
logger.warning(
"Liquidsoap doesn't appear to be running!, " + "Sleeping and trying again"
)
time.sleep(1)
liquidsoap_version_string = liquidsoap_get_info(
telnet_lock, ls_host, ls_port, logger
)
liquidsoap_version_string = liquidsoap_get_info(telnet_lock, ls_host, ls_port)
while pure.version_cmp(liquidsoap_version_string, LIQUIDSOAP_MIN_VERSION) < 0:
logger.warning(
@ -203,14 +104,28 @@ def liquidsoap_startup_test(telnet_lock, ls_host, ls_port):
% LIQUIDSOAP_MIN_VERSION
)
time.sleep(1)
liquidsoap_version_string = liquidsoap_get_info(
telnet_lock, ls_host, ls_port, logger
)
liquidsoap_version_string = liquidsoap_get_info(telnet_lock, ls_host, ls_port)
logger.info("Liquidsoap version string found %s" % liquidsoap_version_string)
def run():
@click.command()
@cli_logging_options
def cli(log_level: str, log_filepath: Optional[Path]):
"""
Run playout.
"""
setup_logger(level_from_name(log_level), log_filepath)
configure_environment()
# loading config file
try:
config = ConfigObj("/etc/airtime/airtime.conf")
except Exception as e:
logger.error("Error loading config file: %s", e)
sys.exit(1)
logger.info("###########################################")
logger.info("# *** pypo *** #")
logger.info("# Liquidsoap Scheduled Playout System #")
@ -246,15 +161,11 @@ def run():
liquidsoap_startup_test(telnet_lock, ls_host, ls_port)
if options.test:
g.test_api()
sys.exit(0)
pypoFetch_q = Queue()
recorder_q = Queue()
pypoPush_q = Queue()
pypo_liquidsoap = PypoLiquidsoap(logger, telnet_lock, ls_host, ls_port)
pypo_liquidsoap = PypoLiquidsoap(telnet_lock, ls_host, ls_port)
"""
This queue is shared between pypo-fetch and pypo-file, where pypo-file
@ -295,5 +206,3 @@ def run():
# This allows CTRL-C to work!
while True:
time.sleep(1)
logger.info("System exit")

View File

@ -1,5 +1,3 @@
import traceback
"""
Python part of radio playout (pypo)
@ -16,9 +14,10 @@ Main case:
"""
import json
import logging.config
import sys
import traceback
from optparse import OptionParser
from pathlib import Path
# additional modules (should be checked)
from configobj import ConfigObj
@ -26,9 +25,14 @@ from configobj import ConfigObj
# custom imports
# from util import *
from libretime_api_client import version1 as api_client
from libretime_shared.logging import INFO, setup_logger
from loguru import logger
LOG_LEVEL = logging.INFO
LOG_PATH = "/var/log/airtime/pypo/notify.log"
# TODO: Get log settings from cli/env variables
DEFAULT_LOG_LEVEL = INFO
DEFAULT_LOG_FILEPATH = Path("/var/log/libretime/playout-notify.log")
setup_logger(DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILEPATH)
# help screeen / info
usage = "%prog [options]" + " - notification gateway"
@ -98,25 +102,6 @@ parser.add_option(
# parse options
(options, args) = parser.parse_args()
# Set up logging
logging.captureWarnings(True)
logFormatter = logging.Formatter(
"%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s"
)
rootLogger = logging.getLogger()
rootLogger.setLevel(LOG_LEVEL)
fileHandler = logging.handlers.RotatingFileHandler(
filename=LOG_PATH, maxBytes=1024 * 1024 * 30, backupCount=8
)
fileHandler.setFormatter(logFormatter)
rootLogger.addHandler(fileHandler)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)
logger = rootLogger
# need to wait for Python 2.7 for this..
# logging.captureWarnings(True)
@ -150,8 +135,7 @@ class Notify:
logger.info("# Calling server to update liquidsoap status #")
logger.info("#################################################")
logger.info("msg = " + str(msg))
response = self.api_client.notify_liquidsoap_status(msg, stream_id, time)
logger.info("Response: " + json.dumps(response))
self.api_client.notify_liquidsoap_status(msg, stream_id, time)
def notify_source_status(self, source_name, status):
logger.debug("#################################################")
@ -165,8 +149,7 @@ class Notify:
logger.debug("#################################################")
logger.debug("# Calling server to update webstream data #")
logger.debug("#################################################")
response = self.api_client.notify_webstream_data(data, media_id)
logger.debug("Response: " + json.dumps(response))
self.api_client.notify_webstream_data(data, media_id)
def run_with_options(self, options):
if options.error and options.stream_id:

View File

@ -1,6 +1,5 @@
import copy
import json
import logging.config
import mimetypes
import os
import signal
@ -15,21 +14,19 @@ from threading import Thread, Timer
from libretime_api_client import version1 as v1_api_client
from libretime_api_client import version2 as api_client
from loguru import logger
from . import pure
from .timeout import ls_timeout
def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger()
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)
logging.captureWarnings(True)
POLL_INTERVAL = 400
@ -53,12 +50,10 @@ class PypoFetch(Thread):
self.telnet_lock = telnet_lock
self.logger = logging.getLogger()
self.pypo_liquidsoap = pypo_liquidsoap
self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
self.logger.debug("Cache dir %s", self.cache_dir)
logger.debug("Cache dir %s", self.cache_dir)
try:
if not os.path.isdir(dir):
@ -67,13 +62,13 @@ class PypoFetch(Thread):
is a file. We are not handling the second case, but don't
think we actually care about handling it.
"""
self.logger.debug("Cache dir does not exist. Creating...")
logger.debug("Cache dir does not exist. Creating...")
os.makedirs(dir)
except Exception as e:
pass
self.schedule_data = []
self.logger.info("PypoFetch: init complete")
logger.info("PypoFetch: init complete")
"""
Handle a message from RabbitMQ, put it into our yucky global var.
@ -82,7 +77,7 @@ class PypoFetch(Thread):
def handle_message(self, message):
try:
self.logger.info("Received event from Pypo Message Handler: %s" % message)
logger.info("Received event from Pypo Message Handler: %s" % message)
try:
message = message.decode()
@ -90,7 +85,7 @@ class PypoFetch(Thread):
pass
m = json.loads(message)
command = m["event_type"]
self.logger.info("Handling command: " + command)
logger.info("Handling command: " + command)
if command == "update_schedule":
self.schedule_data = m["schedule"]
@ -98,29 +93,29 @@ class PypoFetch(Thread):
elif command == "reset_liquidsoap_bootstrap":
self.set_bootstrap_variables()
elif command == "update_stream_setting":
self.logger.info("Updating stream setting...")
logger.info("Updating stream setting...")
self.regenerate_liquidsoap_conf(m["setting"])
elif command == "update_stream_format":
self.logger.info("Updating stream format...")
logger.info("Updating stream format...")
self.update_liquidsoap_stream_format(m["stream_format"])
elif command == "update_station_name":
self.logger.info("Updating station name...")
logger.info("Updating station name...")
self.update_liquidsoap_station_name(m["station_name"])
elif command == "update_transition_fade":
self.logger.info("Updating transition_fade...")
logger.info("Updating transition_fade...")
self.update_liquidsoap_transition_fade(m["transition_fade"])
elif command == "switch_source":
self.logger.info("switch_on_source show command received...")
logger.info("switch_on_source show command received...")
self.pypo_liquidsoap.get_telnet_dispatcher().switch_source(
m["sourcename"], m["status"]
)
elif command == "disconnect_source":
self.logger.info("disconnect_on_source show command received...")
logger.info("disconnect_on_source show command received...")
self.pypo_liquidsoap.get_telnet_dispatcher().disconnect_source(
m["sourcename"]
)
else:
self.logger.info("Unknown command: %s" % command)
logger.info("Unknown command: %s" % command)
# update timeout value
if command == "update_schedule":
@ -131,12 +126,12 @@ class PypoFetch(Thread):
)
if self.listener_timeout < 0:
self.listener_timeout = 0
self.logger.info("New timeout: %s" % self.listener_timeout)
logger.info("New timeout: %s" % self.listener_timeout)
except Exception as e:
self.logger.exception("Exception in handling Message Handler message")
logger.exception("Exception in handling Message Handler message")
def switch_source_temp(self, sourcename, status):
self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
logger.debug('Switching source: %s to "%s" status', sourcename, status)
command = "streams."
if sourcename == "master_dj":
command += "master_dj_"
@ -157,13 +152,13 @@ class PypoFetch(Thread):
"""
def set_bootstrap_variables(self):
self.logger.debug("Getting information needed on bootstrap from Airtime")
logger.debug("Getting information needed on bootstrap from Airtime")
try:
info = self.v1_api_client.get_bootstrap_info()
except Exception as e:
self.logger.exception("Unable to get bootstrap info.. Exiting pypo...")
logger.exception("Unable to get bootstrap info.. Exiting pypo...")
self.logger.debug("info:%s", info)
logger.debug("info:%s", info)
commands = []
for k, v in info["switch_status"].items():
commands.append(self.switch_source_temp(k, v))
@ -191,13 +186,13 @@ class PypoFetch(Thread):
will be thrown."""
self.telnet_lock.acquire(False)
self.logger.info("Restarting Liquidsoap")
logger.info("Restarting Liquidsoap")
subprocess.call(
"kill -9 `pidof libretime-liquidsoap`", shell=True, close_fds=True
)
# Wait here and poll Liquidsoap until it has started up
self.logger.info("Waiting for Liquidsoap to start")
logger.info("Waiting for Liquidsoap to start")
while True:
try:
tn = telnetlib.Telnet(
@ -205,14 +200,14 @@ class PypoFetch(Thread):
)
tn.write("exit\n".encode("utf-8"))
tn.read_all()
self.logger.info("Liquidsoap is up and running")
logger.info("Liquidsoap is up and running")
break
except Exception as e:
# sleep 0.5 seconds and try again
time.sleep(0.5)
except Exception as e:
self.logger.exception(e)
logger.exception(e)
finally:
if self.telnet_lock.locked():
self.telnet_lock.release()
@ -242,18 +237,18 @@ class PypoFetch(Thread):
boot_up_time_command = (
"vars.bootup_time " + str(current_time) + "\n"
).encode("utf-8")
self.logger.info(boot_up_time_command)
logger.info(boot_up_time_command)
tn.write(boot_up_time_command)
connection_status = ("streams.connection_status\n").encode("utf-8")
self.logger.info(connection_status)
logger.info(connection_status)
tn.write(connection_status)
tn.write("exit\n".encode("utf-8"))
output = tn.read_all()
except Exception as e:
self.logger.exception(e)
logger.exception(e)
finally:
self.telnet_lock.release()
@ -263,7 +258,7 @@ class PypoFetch(Thread):
# streamin info is in the form of:
# eg. s1:true,2:true,3:false
streams = stream_info.split(",")
self.logger.info(streams)
logger.info(streams)
fake_time = current_time + 1
for s in streams:
@ -283,12 +278,12 @@ class PypoFetch(Thread):
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config["ls_host"], self.config["ls_port"])
command = ("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8")
self.logger.info(command)
logger.info(command)
tn.write(command)
tn.write("exit\n".encode("utf-8"))
tn.read_all()
except Exception as e:
self.logger.exception(e)
logger.exception(e)
finally:
self.telnet_lock.release()
@ -300,12 +295,12 @@ class PypoFetch(Thread):
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config["ls_host"], self.config["ls_port"])
command = ("vars.default_dj_fade %s\n" % fade).encode("utf-8")
self.logger.info(command)
logger.info(command)
tn.write(command)
tn.write("exit\n".encode("utf-8"))
tn.read_all()
except Exception as e:
self.logger.exception(e)
logger.exception(e)
finally:
self.telnet_lock.release()
@ -318,16 +313,16 @@ class PypoFetch(Thread):
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config["ls_host"], self.config["ls_port"])
command = ("vars.station_name %s\n" % station_name).encode("utf-8")
self.logger.info(command)
logger.info(command)
tn.write(command)
tn.write("exit\n".encode("utf-8"))
tn.read_all()
except Exception as e:
self.logger.exception(e)
logger.exception(e)
finally:
self.telnet_lock.release()
except Exception as e:
self.logger.exception(e)
logger.exception(e)
"""
Process the schedule
@ -340,7 +335,7 @@ class PypoFetch(Thread):
def process_schedule(self, schedule_data):
self.last_update_schedule_timestamp = time.time()
self.logger.debug(schedule_data)
logger.debug(schedule_data)
media = schedule_data["media"]
media_filtered = {}
@ -376,17 +371,17 @@ class PypoFetch(Thread):
self.media_prepare_queue.put(copy.copy(media_filtered))
except Exception as e:
self.logger.exception(e)
logger.exception(e)
# Send the data to pypo-push
self.logger.debug("Pushing to pypo-push")
logger.debug("Pushing to pypo-push")
self.push_queue.put(media_copy)
# cleanup
try:
self.cache_cleanup(media)
except Exception as e:
self.logger.exception(e)
logger.exception(e)
# do basic validation of file parameters. Useful for debugging
# purposes
@ -402,9 +397,9 @@ class PypoFetch(Thread):
length2 = media_item["cue_out"] - media_item["cue_in"]
if abs(length2 - length1) > 1:
self.logger.error("end - start length: %s", length1)
self.logger.error("cue_out - cue_in length: %s", length2)
self.logger.error("Two lengths are not equal!!!")
logger.error("end - start length: %s", length1)
logger.error("cue_out - cue_in length: %s", length2)
logger.error("Two lengths are not equal!!!")
media_item["file_ext"] = mime_ext
@ -438,22 +433,22 @@ class PypoFetch(Thread):
expired_files = cached_file_set - scheduled_file_set
self.logger.debug("Files to remove " + str(expired_files))
logger.debug("Files to remove " + str(expired_files))
for f in expired_files:
try:
path = os.path.join(self.cache_dir, f)
self.logger.debug("Removing %s" % path)
logger.debug("Removing %s" % path)
# check if this file is opened (sometimes Liquidsoap is still
# playing the file due to our knowledge of the track length
# being incorrect!)
if not self.is_file_opened(path):
os.remove(path)
self.logger.info("File '%s' removed" % path)
logger.info("File '%s' removed" % path)
else:
self.logger.info("File '%s' not removed. Still busy!" % path)
logger.info("File '%s' not removed. Still busy!" % path)
except Exception as e:
self.logger.exception("Problem removing file '%s'" % f)
logger.exception("Problem removing file '%s'" % f)
def manual_schedule_fetch(self):
try:
@ -461,8 +456,8 @@ class PypoFetch(Thread):
self.process_schedule(self.schedule_data)
return True
except Exception as e:
self.logger.error("Unable to fetch schedule")
self.logger.exception(e)
logger.error("Unable to fetch schedule")
logger.exception(e)
return False
def persistent_manual_schedule_fetch(self, max_attempts=1):
@ -499,11 +494,11 @@ class PypoFetch(Thread):
success = self.persistent_manual_schedule_fetch(max_attempts=5)
if success:
self.logger.info("Bootstrap schedule received: %s", self.schedule_data)
logger.info("Bootstrap schedule received: %s", self.schedule_data)
loops = 1
while True:
self.logger.info("Loop #%s", loops)
logger.info("Loop #%s", loops)
manual_fetch_needed = False
try:
"""
@ -526,16 +521,16 @@ class PypoFetch(Thread):
manual_fetch_needed = False
self.handle_message(message)
except Empty as e:
self.logger.info("Queue timeout. Fetching schedule manually")
logger.info("Queue timeout. Fetching schedule manually")
manual_fetch_needed = True
except Exception as e:
self.logger.exception(e)
logger.exception(e)
try:
if manual_fetch_needed:
self.persistent_manual_schedule_fetch(max_attempts=5)
except Exception as e:
self.logger.exception("Failed to manually fetch the schedule.")
logger.exception("Failed to manually fetch the schedule.")
loops += 1
@ -544,4 +539,3 @@ class PypoFetch(Thread):
Entry point of the thread
"""
self.main()
self.logger.info("PypoFetch thread exiting")

View File

@ -1,7 +1,6 @@
import configparser
import hashlib
import json
import logging
import os
import shutil
import stat
@ -14,17 +13,15 @@ from threading import Thread
import requests
from libretime_api_client import version2 as api_client
from loguru import logger
from requests.exceptions import ConnectionError, HTTPError, Timeout
CONFIG_PATH = "/etc/airtime/airtime.conf"
logging.captureWarnings(True)
class PypoFile(Thread):
def __init__(self, schedule_queue, config):
Thread.__init__(self)
self.logger = logging.getLogger()
self.media_queue = schedule_queue
self.media = None
self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
@ -56,7 +53,7 @@ class PypoFile(Thread):
# become an issue here... This needs proper cache management.
# https://github.com/LibreTime/libretime/issues/756#issuecomment-477853018
# https://github.com/LibreTime/libretime/pull/845
self.logger.debug(
logger.debug(
"file %s already exists in local cache as %s, skipping copying..."
% (src, dst)
)
@ -66,16 +63,16 @@ class PypoFile(Thread):
media_item["file_ready"] = not do_copy
if do_copy:
self.logger.info("copying from %s to local cache %s" % (src, dst))
logger.info("copying from %s to local cache %s" % (src, dst))
try:
with open(dst, "wb") as handle:
self.logger.info(media_item)
logger.info(media_item)
response = self.api_client.services.file_download_url(
id=media_item["id"]
)
if not response.ok:
self.logger.error(response)
logger.error(response)
raise Exception(
"%s - Error occurred downloading file"
% response.status_code
@ -95,8 +92,8 @@ class PypoFile(Thread):
media_item["file_ready"] = True
except Exception as e:
self.logger.error("Could not copy from %s to %s" % (src, dst))
self.logger.error(e)
logger.error("Could not copy from %s to %s" % (src, dst))
logger.error(e)
def report_file_size_and_md5_to_api(self, file_path, file_id):
try:
@ -112,10 +109,10 @@ class PypoFile(Thread):
md5_hash = m.hexdigest()
except (OSError, IOError) as e:
file_size = 0
self.logger.error(
logger.error(
"Error getting file size and md5 hash for file id %s" % file_id
)
self.logger.error(e)
logger.error(e)
# Make PUT request to LibreTime to update the file size and hash
error_msg = (
@ -125,10 +122,10 @@ class PypoFile(Thread):
payload = {"filesize": file_size, "md5": md5_hash}
response = self.api_client.update_file(file_id, payload)
except (ConnectionError, Timeout):
self.logger.error(error_msg)
logger.error(error_msg)
except Exception as e:
self.logger.error(error_msg)
self.logger.error(e)
logger.error(error_msg)
logger.error(e)
return file_size
@ -148,7 +145,7 @@ class PypoFile(Thread):
highest_priority = sorted_keys[0]
media_item = schedule[highest_priority]
self.logger.debug("Highest priority item: %s" % highest_priority)
logger.debug("Highest priority item: %s" % highest_priority)
"""
Remove this media_item from the dictionary. On the next iteration
@ -168,13 +165,10 @@ class PypoFile(Thread):
try:
config.readfp(open(config_path))
except IOError as e:
logging.debug(
logger.debug(
"Failed to open config file at %s: %s" % (config_path, e.strerror)
)
sys.exit()
except Exception as e:
logging.debug(e.strerror)
sys.exit()
return config
@ -206,8 +200,8 @@ class PypoFile(Thread):
import traceback
top = traceback.format_exc()
self.logger.error(str(e))
self.logger.error(top)
logger.error(str(e))
logger.error(top)
raise
def run(self):
@ -218,6 +212,6 @@ class PypoFile(Thread):
self.main()
except Exception as e:
top = traceback.format_exc()
self.logger.error("PypoFile Exception: %s", top)
logger.error("PypoFile Exception: %s", top)
time.sleep(5)
self.logger.info("PypoFile thread exiting")
logger.info("PypoFile thread exiting")

View File

@ -7,9 +7,10 @@ from datetime import datetime
from queue import Empty
from threading import Thread
from loguru import logger
def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger()
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
@ -18,10 +19,9 @@ signal.signal(signal.SIGINT, keyboardInterruptHandler)
class PypoLiqQueue(Thread):
def __init__(self, q, pypo_liquidsoap, logger):
def __init__(self, q, pypo_liquidsoap):
Thread.__init__(self)
self.queue = q
self.logger = logger
self.pypo_liquidsoap = pypo_liquidsoap
def main(self):
@ -32,10 +32,10 @@ class PypoLiqQueue(Thread):
while True:
try:
if time_until_next_play is None:
self.logger.info("waiting indefinitely for schedule")
logger.info("waiting indefinitely for schedule")
media_schedule = self.queue.get(block=True)
else:
self.logger.info(
logger.info(
"waiting %ss until next scheduled item" % time_until_next_play
)
media_schedule = self.queue.get(
@ -54,7 +54,7 @@ class PypoLiqQueue(Thread):
else:
time_until_next_play = None
else:
self.logger.info("New schedule received: %s", media_schedule)
logger.info("New schedule received: %s", media_schedule)
# new schedule received. Replace old one with this.
schedule_deque.clear()
@ -89,4 +89,4 @@ class PypoLiqQueue(Thread):
try:
self.main()
except Exception as e:
self.logger.error("PypoLiqQueue Exception: %s", traceback.format_exc())
logger.error("PypoLiqQueue Exception: %s", traceback.format_exc())

View File

@ -1,14 +1,15 @@
import time
from datetime import datetime, timedelta
from loguru import logger
from . import eventtypes
from .pypofetch import PypoFetch
from .telnetliquidsoap import TelnetLiquidsoap
class PypoLiquidsoap:
def __init__(self, logger, telnet_lock, host, port):
self.logger = logger
def __init__(self, telnet_lock, host, port):
self.liq_queue_tracker = {
"s0": None,
"s1": None,
@ -18,7 +19,7 @@ class PypoLiquidsoap:
}
self.telnet_liquidsoap = TelnetLiquidsoap(
telnet_lock, logger, host, port, list(self.liq_queue_tracker.keys())
telnet_lock, host, port, list(self.liq_queue_tracker.keys())
)
def get_telnet_dispatcher(self):
@ -64,10 +65,10 @@ class PypoLiquidsoap:
self.telnet_liquidsoap.queue_push(available_queue, media_item)
self.liq_queue_tracker[available_queue] = media_item
except Exception as e:
self.logger.error(e)
logger.error(e)
raise
else:
self.logger.warn(
logger.warning(
"File %s did not become ready in less than 5 seconds. Skipping...",
media_item["dst"],
)
@ -158,7 +159,7 @@ class PypoLiquidsoap:
if not correct:
# need to re-add
self.logger.info("Track %s found to have new attr." % i)
logger.info("Track %s found to have new attr." % i)
to_be_removed.add(i["row_id"])
to_be_added.add(i["row_id"])
@ -166,9 +167,7 @@ class PypoLiquidsoap:
to_be_added.update(schedule_ids - liq_queue_ids)
if to_be_removed:
self.logger.info(
"Need to remove items from Liquidsoap: %s" % to_be_removed
)
logger.info("Need to remove items from Liquidsoap: %s" % to_be_removed)
# remove files from Liquidsoap's queue
for i in self.liq_queue_tracker:
@ -177,9 +176,7 @@ class PypoLiquidsoap:
self.stop(i)
if to_be_added:
self.logger.info(
"Need to add items to Liquidsoap *now*: %s" % to_be_added
)
logger.info("Need to add items to Liquidsoap *now*: %s" % to_be_added)
for i in scheduled_now_files:
if i["row_id"] in to_be_added:
@ -196,7 +193,7 @@ class PypoLiquidsoap:
self.telnet_liquidsoap.stop_web_stream_buffer()
self.telnet_liquidsoap.stop_web_stream_output()
except KeyError as e:
self.logger.error("Error: Malformed event in schedule. " + str(e))
logger.error("Error: Malformed event in schedule. " + str(e))
def stop(self, queue):
self.telnet_liquidsoap.queue_remove(queue)
@ -220,7 +217,7 @@ class PypoLiquidsoap:
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec > 0:
self.logger.debug(
logger.debug(
"media item was supposed to start %s ago. Preparing to start..",
diff_sec,
)

View File

@ -1,5 +1,4 @@
import json
import logging
import os
import sys
import time
@ -13,8 +12,7 @@ from kombu.connection import Connection
from kombu.messaging import Exchange, Queue
from kombu.mixins import ConsumerMixin
from kombu.simple import SimpleQueue
logging.captureWarnings(True)
from loguru import logger
class RabbitConsumer(ConsumerMixin):
@ -36,13 +34,12 @@ class RabbitConsumer(ConsumerMixin):
class PypoMessageHandler(Thread):
def __init__(self, pq, rq, config):
Thread.__init__(self)
self.logger = logging.getLogger("message_h")
self.pypo_queue = pq
self.recorder_queue = rq
self.config = config
def init_rabbit_mq(self):
self.logger.info("Initializing RabbitMQ stuff")
logger.info("Initializing RabbitMQ stuff")
try:
schedule_exchange = Exchange(
"airtime-pypo", "direct", durable=True, auto_delete=True
@ -58,7 +55,7 @@ class PypoMessageHandler(Thread):
rabbit = RabbitConsumer(connection, [schedule_queue], self)
rabbit.run()
except Exception as e:
self.logger.error(e)
logger.error(e)
"""
Handle a message from RabbitMQ, put it into our yucky global var.
@ -67,7 +64,7 @@ class PypoMessageHandler(Thread):
def handle_message(self, message):
try:
self.logger.info("Received event from RabbitMQ: %s" % message)
logger.info("Received event from RabbitMQ: %s" % message)
try:
message = message.decode()
@ -75,50 +72,48 @@ class PypoMessageHandler(Thread):
pass
m = json.loads(message)
command = m["event_type"]
self.logger.info("Handling command: " + command)
logger.info("Handling command: " + command)
if command == "update_schedule":
self.logger.info("Updating schedule...")
logger.info("Updating schedule...")
self.pypo_queue.put(message)
elif command == "reset_liquidsoap_bootstrap":
self.logger.info("Resetting bootstrap vars...")
logger.info("Resetting bootstrap vars...")
self.pypo_queue.put(message)
elif command == "update_stream_setting":
self.logger.info("Updating stream setting...")
logger.info("Updating stream setting...")
self.pypo_queue.put(message)
elif command == "update_stream_format":
self.logger.info("Updating stream format...")
logger.info("Updating stream format...")
self.pypo_queue.put(message)
elif command == "update_station_name":
self.logger.info("Updating station name...")
logger.info("Updating station name...")
self.pypo_queue.put(message)
elif command == "switch_source":
self.logger.info("switch_source command received...")
logger.info("switch_source command received...")
self.pypo_queue.put(message)
elif command == "update_transition_fade":
self.logger.info("Updating trasition fade...")
logger.info("Updating trasition fade...")
self.pypo_queue.put(message)
elif command == "disconnect_source":
self.logger.info("disconnect_source command received...")
logger.info("disconnect_source command received...")
self.pypo_queue.put(message)
elif command == "update_recorder_schedule":
self.recorder_queue.put(message)
elif command == "cancel_recording":
self.recorder_queue.put(message)
else:
self.logger.info("Unknown command: %s" % command)
logger.info("Unknown command: %s" % command)
except Exception as e:
self.logger.error("Exception in handling RabbitMQ message: %s", e)
logger.error("Exception in handling RabbitMQ message: %s", e)
def main(self):
try:
self.init_rabbit_mq()
except Exception as e:
self.logger.error("Exception: %s", e)
self.logger.error("traceback: %s", traceback.format_exc())
self.logger.error(
"Error connecting to RabbitMQ Server. Trying again in few seconds"
)
logger.error("Exception: %s", e)
logger.error("traceback: %s", traceback.format_exc())
logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
"""

View File

@ -1,5 +1,4 @@
import calendar
import logging.config
import math
import os
import sys
@ -12,13 +11,12 @@ from threading import Thread
from configobj import ConfigObj
from libretime_api_client import version1 as api_client
from loguru import logger
from .pypofetch import PypoFetch
from .pypoliqqueue import PypoLiqQueue
from .timeout import ls_timeout
logging.captureWarnings(True)
PUSH_INTERVAL = 2
@ -40,16 +38,13 @@ class PypoPush(Thread):
self.config = config
self.pushed_objects = {}
self.logger = logging.getLogger("push")
self.current_prebuffering_stream_id = None
self.queue_id = 0
self.future_scheduled_queue = Queue()
self.pypo_liquidsoap = pypo_liquidsoap
self.plq = PypoLiqQueue(
self.future_scheduled_queue, self.pypo_liquidsoap, self.logger
)
self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap)
self.plq.daemon = True
self.plq.start()
@ -63,10 +58,10 @@ class PypoPush(Thread):
try:
media_schedule = self.queue.get(block=True)
except Exception as e:
self.logger.error(str(e))
logger.error(str(e))
raise
else:
self.logger.debug(media_schedule)
logger.debug(media_schedule)
# separate media_schedule list into currently_playing and
# scheduled_for_future lists
currently_playing, scheduled_for_future = self.separate_present_future(
@ -77,7 +72,7 @@ class PypoPush(Thread):
self.future_scheduled_queue.put(scheduled_for_future)
if loops % heartbeat_period == 0:
self.logger.info("heartbeat")
logger.info("heartbeat")
loops = 0
loops += 1
@ -93,17 +88,17 @@ class PypoPush(Thread):
# Ignore track that already ended
if media_item["end"] < tnow:
self.logger.debug(f"ignoring ended media_item: {media_item}")
logger.debug(f"ignoring ended media_item: {media_item}")
continue
diff_td = tnow - media_item["start"]
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec >= 0:
self.logger.debug(f"adding media_item to present: {media_item}")
logger.debug(f"adding media_item to present: {media_item}")
present.append(media_item)
else:
self.logger.debug(f"adding media_item to future: {media_item}")
logger.debug(f"adding media_item to future: {media_item}")
future[mkey] = media_item
return present, future
@ -128,22 +123,22 @@ class PypoPush(Thread):
# msg = 'dynamic_source.read_stop_all xxx\n'
msg = "http.stop\n"
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg)
msg = "dynamic_source.output_stop\n"
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg)
msg = "dynamic_source.id -1\n"
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg)
tn.write("exit\n")
self.logger.debug(tn.read_all())
logger.debug(tn.read_all())
except Exception as e:
self.logger.error(str(e))
logger.error(str(e))
finally:
self.telnet_lock.release()
@ -153,6 +148,6 @@ class PypoPush(Thread):
self.main()
except Exception as e:
top = traceback.format_exc()
self.logger.error("Pypo Push Exception: %s", top)
logger.error("Pypo Push Exception: %s", top)
time.sleep(5)
self.logger.info("PypoPush thread exiting")
logger.info("PypoPush thread exiting")

View File

@ -1,6 +1,5 @@
import datetime
import json
import logging
import math
import os
import re
@ -14,16 +13,16 @@ from threading import Thread
import mutagen
import pytz
from configobj import ConfigObj
from libretime_api_client import version1 as v1_api_client
from libretime_api_client import version2 as api_client
from libretime_api_client.version1 import AirtimeApiClient as AirtimeApiClientV1
from loguru import logger
def api_client(logger):
def api_client():
"""
api_client returns the correct instance of AirtimeApiClient. Although there is only one
instance to choose from at the moment.
"""
return v1_api_client.AirtimeApiClient(logger)
return AirtimeApiClientV1()
# loading config file
@ -58,8 +57,7 @@ PUSH_INTERVAL = 2
class ShowRecorder(Thread):
def __init__(self, show_instance, show_name, filelength, start_time):
Thread.__init__(self)
self.logger = logging.getLogger("recorder")
self.api_client = api_client(self.logger)
self.api_client = api_client()
self.filelength = filelength
self.start_time = start_time
self.show_instance = show_instance
@ -96,8 +94,8 @@ class ShowRecorder(Thread):
)
args = command.split(" ")
self.logger.info("starting record")
self.logger.info("command " + command)
logger.info("starting record")
logger.info("command " + command)
self.p = Popen(args, stdout=PIPE, stderr=PIPE)
@ -108,8 +106,8 @@ class ShowRecorder(Thread):
for msg in outmsgs:
m = re.search("^ERROR", msg)
if not m == None:
self.logger.info("Recording error is found: %s", outmsgs)
self.logger.info("finishing record, return code %s", self.p.returncode)
logger.info("Recording error is found: %s", outmsgs)
logger.info("finishing record, return code %s", self.p.returncode)
code = self.p.returncode
self.p = None
@ -118,7 +116,7 @@ class ShowRecorder(Thread):
def cancel_recording(self):
# send signal interrupt (2)
self.logger.info("Show manually cancelled!")
logger.info("Show manually cancelled!")
if self.p is not None:
self.p.send_signal(signal.SIGINT)
@ -148,7 +146,7 @@ class ShowRecorder(Thread):
full_date, full_time = self.start_time.split(" ", 1)
# No idea why we translated - to : before
# full_time = full_time.replace(":","-")
self.logger.info("time: %s" % full_time)
logger.info("time: %s" % full_time)
artist = "Airtime Show Recorder"
# set some metadata for our file daemon
recorded_file = mutagen.File(filepath, easy=True)
@ -161,38 +159,37 @@ class ShowRecorder(Thread):
except Exception as e:
top = traceback.format_exc()
self.logger.error("Exception: %s", e)
self.logger.error("traceback: %s", top)
logger.error("Exception: %s", e)
logger.error("traceback: %s", top)
def run(self):
code, filepath = self.record_show()
if code == 0:
try:
self.logger.info("Preparing to upload %s" % filepath)
logger.info("Preparing to upload %s" % filepath)
self.set_metadata_and_save(filepath)
self.upload_file(filepath)
os.remove(filepath)
except Exception as e:
self.logger.error(e)
logger.error(e)
else:
self.logger.info("problem recording show")
logger.info("problem recording show")
os.remove(filepath)
class Recorder(Thread):
def __init__(self, q):
Thread.__init__(self)
self.logger = logging.getLogger("recorder")
self.api_client = api_client(self.logger)
self.api_client = api_client()
self.sr = None
self.shows_to_record = {}
self.server_timezone = ""
self.queue = q
self.loops = 0
self.logger.info("RecorderFetch: init complete")
logger.info("RecorderFetch: init complete")
success = False
while not success:
@ -200,7 +197,7 @@ class Recorder(Thread):
self.api_client.register_component("show-recorder")
success = True
except Exception as e:
self.logger.error(str(e))
logger.error(str(e))
time.sleep(10)
def handle_message(self):
@ -212,7 +209,7 @@ class Recorder(Thread):
pass
msg = json.loads(message)
command = msg["event_type"]
self.logger.info("Received msg from Pypo Message Handler: %s", msg)
logger.info("Received msg from Pypo Message Handler: %s", msg)
if command == "cancel_recording":
if self.currently_recording():
self.cancel_recording()
@ -224,7 +221,7 @@ class Recorder(Thread):
self.start_record()
def process_recorder_schedule(self, m):
self.logger.info("Parsing recording show schedules...")
logger.info("Parsing recording show schedules...")
temp_shows_to_record = {}
shows = m["shows"]
for show in shows:
@ -253,9 +250,9 @@ class Recorder(Thread):
out = float(s)
if out < 5:
self.logger.debug("Shows %s", self.shows_to_record)
self.logger.debug("Next show %s", next_show)
self.logger.debug("Now %s", tnow)
logger.debug("Shows %s", self.shows_to_record)
logger.debug("Next show %s", next_show)
logger.debug("Now %s", tnow)
return out
def cancel_recording(self):
@ -274,7 +271,7 @@ class Recorder(Thread):
try:
delta = self.get_time_till_next_show()
if delta < 5:
self.logger.debug("sleeping %s seconds until show", delta)
logger.debug("sleeping %s seconds until show", delta)
time.sleep(delta)
sorted_show_keys = sorted(self.shows_to_record.keys())
@ -306,7 +303,7 @@ class Recorder(Thread):
# avoiding CC-5299
while True:
if self.currently_recording():
self.logger.info("Previous record not finished, sleeping 100ms")
logger.info("Previous record not finished, sleeping 100ms")
seconds_waiting = seconds_waiting + 0.1
time.sleep(0.1)
else:
@ -326,8 +323,8 @@ class Recorder(Thread):
# self.time_till_next_show = self.get_time_till_next_show()
except Exception as e:
top = traceback.format_exc()
self.logger.error("Exception: %s", e)
self.logger.error("traceback: %s", top)
logger.error("Exception: %s", e)
logger.error("traceback: %s", top)
def run(self):
"""
@ -336,19 +333,19 @@ class Recorder(Thread):
poll the server to get the upcoming schedule.
"""
try:
self.logger.info("Started...")
logger.info("Started...")
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
self.logger.info("Bootstrap recorder schedule received: %s", temp)
logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception as e:
self.logger.error(traceback.format_exc())
self.logger.error(e)
logger.error(traceback.format_exc())
logger.error(e)
self.logger.info("Bootstrap complete: got initial copy of the schedule")
logger.info("Bootstrap complete: got initial copy of the schedule")
self.loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
@ -363,18 +360,18 @@ class Recorder(Thread):
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
self.logger.info("updated recorder schedule received: %s", temp)
logger.info("updated recorder schedule received: %s", temp)
except Exception as e:
self.logger.error(traceback.format_exc())
self.logger.error(e)
logger.error(traceback.format_exc())
logger.error(e)
try:
self.handle_message()
except Exception as e:
self.logger.error(traceback.format_exc())
self.logger.error("Pypo Recorder Exception: %s", e)
logger.error(traceback.format_exc())
logger.error("Pypo Recorder Exception: %s", e)
time.sleep(PUSH_INTERVAL)
self.loops += 1
except Exception as e:
top = traceback.format_exc()
self.logger.error("Exception: %s", e)
self.logger.error("traceback: %s", top)
logger.error("Exception: %s", e)
logger.error("traceback: %s", top)

View File

@ -1,6 +1,8 @@
import telnetlib
import traceback
from loguru import logger
from .timeout import ls_timeout
@ -42,11 +44,10 @@ def create_liquidsoap_annotation(media):
class TelnetLiquidsoap:
def __init__(self, telnet_lock, logger, ls_host, ls_port, queues):
def __init__(self, telnet_lock, ls_host, ls_port, queues):
self.telnet_lock = telnet_lock
self.ls_host = ls_host
self.ls_port = ls_port
self.logger = logger
self.queues = queues
self.current_prebuffering_stream_id = None
@ -72,11 +73,11 @@ class TelnetLiquidsoap:
for i in self.queues:
msg = "queues.%s_skip\n" % i
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
logger.debug(tn.read_all().decode("utf-8"))
except Exception:
raise
finally:
@ -89,11 +90,11 @@ class TelnetLiquidsoap:
tn = self.__connect()
msg = "queues.%s_skip\n" % queue_id
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
logger.debug(tn.read_all().decode("utf-8"))
except Exception:
raise
finally:
@ -110,16 +111,16 @@ class TelnetLiquidsoap:
tn = self.__connect()
annotation = create_liquidsoap_annotation(media_item)
msg = "%s.push %s\n" % (queue_id, annotation)
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
show_name = media_item["show_name"]
msg = "vars.show_name %s\n" % show_name
tn.write(msg.encode("utf-8"))
self.logger.debug(msg)
logger.debug(msg)
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
logger.debug(tn.read_all().decode("utf-8"))
except Exception:
raise
finally:
@ -133,19 +134,19 @@ class TelnetLiquidsoap:
# dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = "http.stop\n"
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
msg = "dynamic_source.id -1\n"
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
logger.debug(tn.read_all().decode("utf-8"))
except Exception as e:
self.logger.error(str(e))
self.logger.error(traceback.format_exc())
logger.error(str(e))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ -157,15 +158,15 @@ class TelnetLiquidsoap:
# dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = "dynamic_source.output_stop\n"
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
logger.debug(tn.read_all().decode("utf-8"))
except Exception as e:
self.logger.error(str(e))
self.logger.error(traceback.format_exc())
logger.error(str(e))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ -180,16 +181,16 @@ class TelnetLiquidsoap:
tn.write(msg.encode("utf-8"))
msg = "dynamic_source.output_start\n"
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
logger.debug(tn.read_all().decode("utf-8"))
self.current_prebuffering_stream_id = None
except Exception as e:
self.logger.error(str(e))
self.logger.error(traceback.format_exc())
logger.error(str(e))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ -200,20 +201,20 @@ class TelnetLiquidsoap:
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = "dynamic_source.id %s\n" % media_item["row_id"]
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
msg = "http.restart %s\n" % media_item["uri"]
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
self.logger.debug(tn.read_all().decode("utf-8"))
logger.debug(tn.read_all().decode("utf-8"))
self.current_prebuffering_stream_id = media_item["row_id"]
except Exception as e:
self.logger.error(str(e))
self.logger.error(traceback.format_exc())
logger.error(str(e))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ -224,23 +225,23 @@ class TelnetLiquidsoap:
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = "dynamic_source.get_id\n"
self.logger.debug(msg)
logger.debug(msg)
tn.write(msg.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
stream_id = tn.read_all().decode("utf-8").splitlines()[0]
self.logger.debug("stream_id: %s" % stream_id)
logger.debug("stream_id: %s" % stream_id)
return stream_id
except Exception as e:
self.logger.error(str(e))
self.logger.error(traceback.format_exc())
logger.error(str(e))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ls_timeout
def disconnect_source(self, sourcename):
self.logger.debug("Disconnecting source: %s", sourcename)
logger.debug("Disconnecting source: %s", sourcename)
command = ""
if sourcename == "master_dj":
command += "master_harbor.stop\n"
@ -250,12 +251,12 @@ class TelnetLiquidsoap:
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
self.logger.info(command)
logger.info(command)
tn.write(command.encode("utf-8"))
tn.write("exit\n".encode("utf-8"))
tn.read_all().decode("utf-8")
except Exception as e:
self.logger.error(traceback.format_exc())
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ -266,7 +267,7 @@ class TelnetLiquidsoap:
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
for i in commands:
self.logger.info(i)
logger.info(i)
if type(i) is str:
i = i.encode("utf-8")
tn.write(i)
@ -274,13 +275,13 @@ class TelnetLiquidsoap:
tn.write("exit\n".encode("utf-8"))
tn.read_all().decode("utf-8")
except Exception as e:
self.logger.error(str(e))
self.logger.error(traceback.format_exc())
logger.error(str(e))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
def switch_source(self, sourcename, status):
self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
logger.debug('Switching source: %s to "%s" status', sourcename, status)
command = "streams."
if sourcename == "master_dj":
command += "master_dj_"
@ -298,10 +299,9 @@ class TelnetLiquidsoap:
class DummyTelnetLiquidsoap:
def __init__(self, telnet_lock, logger):
def __init__(self, telnet_lock):
self.telnet_lock = telnet_lock
self.liquidsoap_mock_queues = {}
self.logger = logger
for i in range(4):
self.liquidsoap_mock_queues["s" + str(i)] = []
@ -311,7 +311,7 @@ class DummyTelnetLiquidsoap:
try:
self.telnet_lock.acquire()
self.logger.info("Pushing %s to queue %s" % (media_item, queue_id))
logger.info("Pushing %s to queue %s" % (media_item, queue_id))
from datetime import datetime
print("Time now: {:s}".format(datetime.utcnow()))
@ -328,7 +328,7 @@ class DummyTelnetLiquidsoap:
try:
self.telnet_lock.acquire()
self.logger.info("Purging queue %s" % queue_id)
logger.info("Purging queue %s" % queue_id)
from datetime import datetime
print("Time now: {:s}".format(datetime.utcnow()))

View File

@ -1,16 +1,17 @@
import logging
import signal
import sys
from datetime import datetime, timedelta
from queue import Queue
from threading import Lock
from libretime_shared.logging import TRACE, setup_logger
from loguru import logger
from .pypoliqqueue import PypoLiqQueue
from .telnetliquidsoap import DummyTelnetLiquidsoap, TelnetLiquidsoap
def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger()
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
@ -18,9 +19,7 @@ def keyboardInterruptHandler(signum, frame):
signal.signal(signal.SIGINT, keyboardInterruptHandler)
# configure logging
format = "%(levelname)s - %(pathname)s - %(lineno)s - %(asctime)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=format)
logging.captureWarnings(True)
setup_logger(TRACE)
telnet_lock = Lock()
pypoPush_q = Queue()
@ -34,12 +33,15 @@ liq_queue_tracker = {
"s3": None,
}
# dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock, logging)
dummy_telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, logging, "localhost", 1234)
plq = PypoLiqQueue(
pypoLiq_q, telnet_lock, logging, liq_queue_tracker, dummy_telnet_liquidsoap
# dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock)
dummy_telnet_liquidsoap = TelnetLiquidsoap(
telnet_lock,
"localhost",
1234,
liq_queue_tracker,
)
plq = PypoLiqQueue(pypoLiq_q, dummy_telnet_liquidsoap)
plq.daemon = True
plq.start()

View File

@ -27,8 +27,8 @@ setup(
package_data={"": ["**/*.liq", "*.cfg", "*.types"]},
entry_points={
"console_scripts": [
"libretime-playout=libretime_playout.main:run",
"libretime-liquidsoap=libretime_liquidsoap.main:run",
"libretime-playout=libretime_playout.main:cli",
"libretime-liquidsoap=libretime_liquidsoap.main:cli",
"libretime-playout-notify=libretime_playout.notify.main:run",
]
},
@ -43,5 +43,11 @@ setup(
"pytz",
"requests",
],
extras_require={
"dev": [
f"libretime-shared @ file://localhost/{here.parent / 'shared'}#egg=libretime_shared",
f"libretime-api-client @ file://localhost/{here.parent / 'api_client'}#egg=libretime_api_client",
],
},
zip_safe=False,
)