refactor(playout): improve exceptions handling (#2027)

This commit is contained in:
Jonas L 2022-08-09 21:05:21 +02:00 committed by GitHub
parent 1b93b7645e
commit 9413bd5a29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 123 additions and 169 deletions

View File

@ -1,7 +1,6 @@
import os
import sys
import time
import traceback
from pathlib import Path
from typing import Optional
@ -54,10 +53,8 @@ def generate_entrypoint(log_filepath: Optional[Path]):
ss = legacy_client.get_stream_setting()
generate_liquidsoap_config(ss, log_filepath)
successful = True
except Exception as e:
print("Unable to connect to the Airtime server.")
logger.error(str(e))
logger.error("traceback: %s", traceback.format_exc())
except Exception:
logger.exception("Unable to connect to the Airtime server")
if attempts == max_attempts:
logger.error("giving up and exiting...")
sys.exit(1)

View File

@ -143,8 +143,8 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
try:
legacy_client.register_component("pypo")
success = True
except Exception as e:
logger.error(str(e))
except Exception as exception:
logger.exception(exception)
time.sleep(10)
telnet_lock = Lock()

View File

@ -1,6 +1,5 @@
import json
import time
import traceback
from threading import Thread
# For RabbitMQ
@ -49,8 +48,8 @@ class PypoMessageHandler(Thread):
) as connection:
rabbit = RabbitConsumer(connection, [schedule_queue], self)
rabbit.run()
except Exception as e:
logger.error(e)
except Exception as exception:
logger.exception(exception)
# Handle a message from RabbitMQ, put it into our yucky global var.
# Hopefully there is a better way to do this.
@ -97,15 +96,14 @@ class PypoMessageHandler(Thread):
self.recorder_queue.put(message)
else:
logger.info("Unknown command: %s" % command)
except Exception as e:
logger.error("Exception in handling RabbitMQ message: %s", e)
except Exception as exception:
logger.exception(exception)
def main(self):
try:
self.init_rabbit_mq()
except Exception as e:
logger.error("Exception: %s", e)
logger.error("traceback: %s", traceback.format_exc())
except Exception as exception:
logger.exception(exception)
logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)

View File

@ -120,8 +120,8 @@ class PypoFetch(Thread):
if self.listener_timeout < 0:
self.listener_timeout = 0
logger.info("New timeout: %s" % self.listener_timeout)
except Exception as e:
logger.exception("Exception in handling Message Handler message")
except Exception as exception:
logger.exception(exception)
def switch_source_temp(self, sourcename, status):
logger.debug('Switching source: %s to "%s" status', sourcename, status)
@ -146,8 +146,8 @@ class PypoFetch(Thread):
logger.debug("Getting information needed on bootstrap from Airtime")
try:
info = self.legacy_client.get_bootstrap_info()
except Exception as e:
logger.exception("Unable to get bootstrap info.. Exiting pypo...")
except Exception as exception:
logger.exception(f"Unable to get bootstrap info: {exception}")
logger.debug("info:%s", info)
commands = []
@ -170,11 +170,11 @@ class PypoFetch(Thread):
def restart_liquidsoap(self):
try:
"""do not block - if we receive the lock then good - no other thread
will try communicating with Liquidsoap. If we don't receive, it may
mean some thread blocked and is still holding the lock. Restarting
Liquidsoap will cause that thread to release the lock as an Exception
will be thrown."""
# do not block - if we receive the lock then good - no other thread
# will try communicating with Liquidsoap. If we don't receive, it may
# mean some thread blocked and is still holding the lock. Restarting
# Liquidsoap will cause that thread to release the lock as an Exception
# will be thrown.
self.telnet_lock.acquire(False)
logger.info("Restarting Liquidsoap")
@ -194,12 +194,12 @@ class PypoFetch(Thread):
tn.read_all()
logger.info("Liquidsoap is up and running")
break
except Exception as e:
except Exception:
# sleep 0.5 seconds and try again
time.sleep(0.5)
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
finally:
if self.telnet_lock.locked():
self.telnet_lock.release()
@ -240,8 +240,8 @@ class PypoFetch(Thread):
tn.write(b"exit\n")
output = tn.read_all()
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -254,8 +254,8 @@ class PypoFetch(Thread):
logger.info(streams)
fake_time = current_time + 1
for s in streams:
info = s.split(":")
for stream in streams:
info = stream.split(":")
stream_id = info[0]
status = info[1]
if status == "true":
@ -278,8 +278,8 @@ class PypoFetch(Thread):
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -298,8 +298,8 @@ class PypoFetch(Thread):
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -319,12 +319,12 @@ class PypoFetch(Thread):
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
# Process the schedule
# - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
@ -346,7 +346,7 @@ class PypoFetch(Thread):
download_dir = self.cache_dir
try:
os.makedirs(download_dir)
except Exception as e:
except Exception:
pass
media_copy = {}
@ -368,8 +368,8 @@ class PypoFetch(Thread):
media_copy[key] = media_item
self.media_prepare_queue.put(copy.copy(media_filtered))
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
# Send the data to pypo-push
logger.debug("Pushing to pypo-push")
@ -378,8 +378,8 @@ class PypoFetch(Thread):
# cleanup
try:
self.cache_cleanup(media)
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
# do basic validation of file parameters. Useful for debugging
# purposes
@ -445,8 +445,8 @@ class PypoFetch(Thread):
logger.info("File '%s' removed" % path)
else:
logger.info("File '%s' not removed. Still busy!" % path)
except Exception as e:
logger.exception("Problem removing file '%s'" % f)
except Exception as exception:
logger.exception(f"Problem removing file '{f}': {exception}")
def manual_schedule_fetch(self):
try:
@ -454,9 +454,8 @@ class PypoFetch(Thread):
logger.debug(f"Received event from API client: {self.schedule_data}")
self.process_schedule(self.schedule_data)
return True
except Exception as e:
logger.error("Unable to fetch schedule")
logger.exception(e)
except Exception as exception:
logger.exception(f"Unable to fetch schedule: {exception}")
return False
def persistent_manual_schedule_fetch(self, max_attempts=1):
@ -517,17 +516,17 @@ class PypoFetch(Thread):
)
manual_fetch_needed = False
self.handle_message(message)
except Empty as e:
except Empty:
logger.info("Queue timeout. Fetching schedule manually")
manual_fetch_needed = True
except Exception as e:
logger.exception(e)
except Exception as exception:
logger.exception(exception)
try:
if manual_fetch_needed:
self.persistent_manual_schedule_fetch(max_attempts=5)
except Exception as e:
logger.exception("Failed to manually fetch the schedule.")
except Exception as exception:
logger.exception(f"Failed to manually fetch the schedule: {exception}")
loops += 1

View File

@ -2,7 +2,6 @@ import hashlib
import os
import stat
import time
import traceback
from queue import Empty
from threading import Thread
@ -30,7 +29,7 @@ class PypoFile(Thread):
dst_size = os.path.getsize(dst)
if dst_size == 0:
dst_exists = False
except Exception as e:
except Exception:
dst_exists = False
do_copy = False
@ -72,9 +71,8 @@ class PypoFile(Thread):
media_item["filesize"] = file_size
media_item["file_ready"] = True
except Exception as e:
logger.error(f"could not copy file {file_id} to {dst}")
logger.error(e)
except Exception as exception:
logger.exception(f"could not copy file {file_id} to {dst}: {exception}")
def report_file_size_and_md5_to_api(self, file_path, file_id):
try:
@ -88,27 +86,23 @@ class PypoFile(Thread):
break
m.update(data)
md5_hash = m.hexdigest()
except OSError as e:
except OSError as exception:
file_size = 0
logger.error(
"Error getting file size and md5 hash for file id %s" % file_id
logger.exception(
f"Error getting file size and md5 hash for file id {file_id}: {exception}"
)
logger.error(e)
# Make PUT request to LibreTime to update the file size and hash
error_msg = (
"Could not update media file %s with file size and md5 hash:" % file_id
)
error_msg = f"Could not update media file {file_id} with file size and md5 hash"
try:
self.api_client.update_file(
file_id,
json={"filesize": file_size, "md5": md5_hash},
)
except (ConnectionError, Timeout):
logger.error(error_msg)
except Exception as e:
logger.error(error_msg)
logger.error(e)
logger.exception(error_msg)
except Exception as exception:
logger.exception(f"{error_msg}: {exception}")
return file_size
@ -154,19 +148,15 @@ class PypoFile(Thread):
# or we don't), get back to work on preparing getting files.
try:
self.media = self.media_queue.get_nowait()
except Empty as e:
except Empty:
pass
media_item = self.get_highest_priority_media_item(self.media)
if media_item is not None:
self.copy_file(media_item)
except Exception as e:
import traceback
top = traceback.format_exc()
logger.error(str(e))
logger.error(top)
raise
except Exception as exception:
logger.exception(exception)
raise exception
def run(self):
"""
@ -174,8 +164,8 @@ class PypoFile(Thread):
"""
try:
self.main()
except Exception as e:
top = traceback.format_exc()
logger.error("PypoFile Exception: %s", top)
except Exception as exception:
logger.exception(exception)
time.sleep(5)
logger.info("PypoFile thread exiting")

View File

@ -64,9 +64,9 @@ class PypoLiquidsoap:
try:
self.telnet_liquidsoap.queue_push(available_queue, media_item)
self.liq_queue_tracker[available_queue] = media_item
except Exception as e:
logger.error(e)
raise
except Exception as exception:
logger.exception(exception)
raise exception
else:
logger.warning(
"File %s did not become ready in less than 5 seconds. Skipping...",
@ -87,13 +87,12 @@ class PypoLiquidsoap:
def find_available_queue(self):
available_queue = None
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi == None or self.is_media_item_finished(mi):
for queue_id, item in self.liq_queue_tracker.items():
if item is None or self.is_media_item_finished(item):
# queue "i" is available. Push to this queue
available_queue = i
available_queue = queue_id
if available_queue == None:
if available_queue is None:
raise NoQueueAvailableException()
return available_queue
@ -192,8 +191,8 @@ class PypoLiquidsoap:
# something is playing and it shouldn't be.
self.telnet_liquidsoap.stop_web_stream_buffer()
self.telnet_liquidsoap.stop_web_stream_output()
except KeyError as e:
logger.error("Error: Malformed event in schedule. " + str(e))
except KeyError as exception:
logger.exception(f"Malformed event in schedule: {exception}")
def stop(self, queue):
self.telnet_liquidsoap.queue_remove(queue)

View File

@ -1,5 +1,4 @@
import telnetlib
import traceback
from loguru import logger
@ -78,8 +77,6 @@ class TelnetLiquidsoap:
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception:
raise
finally:
self.telnet_lock.release()
@ -95,8 +92,6 @@ class TelnetLiquidsoap:
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception:
raise
finally:
self.telnet_lock.release()
@ -121,8 +116,6 @@ class TelnetLiquidsoap:
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception:
raise
finally:
self.telnet_lock.release()
@ -145,8 +138,7 @@ class TelnetLiquidsoap:
logger.debug(connection.read_all().decode("utf-8"))
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -165,8 +157,7 @@ class TelnetLiquidsoap:
logger.debug(connection.read_all().decode("utf-8"))
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -189,8 +180,7 @@ class TelnetLiquidsoap:
self.current_prebuffering_stream_id = None
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -213,8 +203,7 @@ class TelnetLiquidsoap:
self.current_prebuffering_stream_id = media_item["row_id"]
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -234,8 +223,7 @@ class TelnetLiquidsoap:
return stream_id
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -255,8 +243,8 @@ class TelnetLiquidsoap:
connection.write(command.encode("utf-8"))
connection.write(b"exit\n")
connection.read_all().decode("utf-8")
except Exception:
logger.error(traceback.format_exc())
except Exception as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -275,8 +263,7 @@ class TelnetLiquidsoap:
connection.write(b"exit\n")
connection.read_all().decode("utf-8")
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -318,8 +305,6 @@ class DummyTelnetLiquidsoap:
annotation = create_liquidsoap_annotation(media_item)
self.liquidsoap_mock_queues[queue_id].append(annotation)
except Exception:
raise
finally:
self.telnet_lock.release()
@ -332,9 +317,6 @@ class DummyTelnetLiquidsoap:
from datetime import datetime
print(f"Time now: {datetime.utcnow():s}")
except Exception:
raise
finally:
self.telnet_lock.release()

View File

@ -1,7 +1,6 @@
import math
import telnetlib
import time
import traceback
from datetime import datetime
from queue import Queue
from threading import Thread
@ -49,9 +48,9 @@ class PypoPush(Thread):
while True:
try:
media_schedule = self.queue.get(block=True)
except Exception as e:
logger.error(str(e))
raise
except Exception as exception:
logger.exception(exception)
raise exception
else:
logger.debug(media_schedule)
# separate media_schedule list into currently_playing and
@ -119,8 +118,8 @@ class PypoPush(Thread):
tn.write("exit\n")
logger.debug(tn.read_all())
except Exception as e:
logger.error(str(e))
except Exception as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ -128,8 +127,6 @@ class PypoPush(Thread):
while True:
try:
self.main()
except Exception as e:
top = traceback.format_exc()
logger.error("Pypo Push Exception: %s", top)
except Exception as exception:
logger.exception(exception)
time.sleep(5)
logger.info("PypoPush thread exiting")

View File

@ -1,6 +1,5 @@
import signal
import sys
import traceback
from collections import deque
from datetime import datetime
from queue import Empty
@ -42,7 +41,7 @@ class PypoLiqQueue(Thread):
media_schedule = self.queue.get(
block=True, timeout=time_until_next_play
)
except Empty as e:
except Empty:
# Time to push a scheduled item.
media_item = schedule_deque.popleft()
self.pypo_liquidsoap.play(media_item)
@ -75,5 +74,5 @@ class PypoLiqQueue(Thread):
def run(self):
try:
self.main()
except Exception as e:
logger.error("PypoLiqQueue Exception: %s", traceback.format_exc())
except Exception as exception:
logger.exception(exception)

View File

@ -5,7 +5,6 @@ import os
import re
import signal
import time
import traceback
from datetime import timezone
from subprocess import PIPE, Popen
from threading import Thread
@ -145,10 +144,8 @@ class ShowRecorder(Thread):
recorded_file["tracknumber"] = self.show_instance
recorded_file.save()
except Exception as e:
top = traceback.format_exc()
logger.error("Exception: %s", e)
logger.error("traceback: %s", top)
except Exception as exception:
logger.exception(exception)
def run(self):
code, filepath = self.record_show()
@ -161,8 +158,8 @@ class ShowRecorder(Thread):
self.upload_file(filepath)
os.remove(filepath)
except Exception as e:
logger.error(e)
except Exception as exception:
logger.exception(exception)
else:
logger.info("problem recording show")
os.remove(filepath)
@ -185,8 +182,8 @@ class Recorder(Thread):
try:
self.legacy_client.register_component("show-recorder")
success = True
except Exception as e:
logger.error(str(e))
except Exception as exception:
logger.exception(exception)
time.sleep(10)
def handle_message(self):
@ -313,10 +310,8 @@ class Recorder(Thread):
# remove show from shows to record.
del self.shows_to_record[start_time]
# self.time_till_next_show = self.get_time_till_next_show()
except Exception as e:
top = traceback.format_exc()
logger.error("Exception: %s", e)
logger.error("traceback: %s", top)
except Exception as exception:
logger.exception(exception)
def run(self):
"""
@ -333,9 +328,8 @@ class Recorder(Thread):
if temp is not None:
self.process_recorder_schedule(temp)
logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception as e:
logger.error(traceback.format_exc())
logger.error(e)
except Exception as exception:
logger.exception(exception)
logger.info("Bootstrap complete: got initial copy of the schedule")
@ -351,17 +345,16 @@ class Recorder(Thread):
if temp is not None:
self.process_recorder_schedule(temp)
logger.info("updated recorder schedule received: %s", temp)
except Exception as e:
logger.error(traceback.format_exc())
logger.error(e)
except Exception as exception:
logger.exception(exception)
try:
self.handle_message()
except Exception as e:
logger.error(traceback.format_exc())
logger.error("Pypo Recorder Exception: %s", e)
except Exception as exception:
logger.exception(exception)
time.sleep(PUSH_INTERVAL)
self.loops += 1
except Exception as e:
top = traceback.format_exc()
logger.error("Exception: %s", e)
logger.error("traceback: %s", top)
except Exception as exception:
logger.exception(exception)

View File

@ -51,8 +51,8 @@ class ListenerStat(Thread):
headers=header,
)
f = urllib.request.urlopen(req, timeout=ListenerStat.HTTP_REQUEST_TIMEOUT)
document = f.read()
resp = urllib.request.urlopen(req, timeout=ListenerStat.HTTP_REQUEST_TIMEOUT)
document = resp.read()
return document
@ -68,12 +68,12 @@ class ListenerStat(Thread):
sources = dom.getElementsByTagName("source")
mount_stats = None
for s in sources:
for source in sources:
# drop the leading '/' character
mount_name = s.getAttribute("mount")[1:]
mount_name = source.getAttribute("mount")[1:]
if mount_name == ip["mount"]:
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
listeners = s.getElementsByTagName("listeners")
listeners = source.getElementsByTagName("listeners")
num_listeners = 0
if len(listeners):
num_listeners = self.get_node_text(listeners[0].childNodes)
@ -94,7 +94,7 @@ class ListenerStat(Thread):
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
num_listeners = 0
if len(current_listeners):
if current_listeners:
num_listeners = self.get_node_text(current_listeners[0].childNodes)
mount_stats = {
@ -123,11 +123,11 @@ class ListenerStat(Thread):
else:
stats.append(self.get_shoutcast_stats(v))
self.update_listener_stat_error(k, "OK")
except Exception as e:
except Exception as exception:
try:
self.update_listener_stat_error(k, str(e))
except Exception as e:
logger.error("Exception: %s", e)
self.update_listener_stat_error(k, str(exception))
except Exception as exception2:
logger.exception(exception2)
return stats
@ -150,7 +150,7 @@ class ListenerStat(Thread):
if stats:
self.push_stream_stats(stats)
except Exception as e:
logger.error("Exception: %s", e)
except Exception as exception:
logger.exception(exception)
time.sleep(120)