refactor(analyzer): fix linting

Authored by jo on 2022-07-25 01:35:14 +02:00, committed by Kyle Robbertze
parent 0babe0f243
commit 7bd369f20c
3 changed files with 25 additions and 34 deletions

@@ -41,6 +41,6 @@ def cli(
     # Start listening for RabbitMQ messages telling us about newly
     # uploaded files. This blocks until we receive a shutdown signal.
-    _msg_listener = MessageListener(config.rabbitmq)
+    MessageListener(config.rabbitmq)
     StatusReporter.stop_thread()

@@ -1,7 +1,7 @@
 import json
-import queue
 import signal
 import time
+from queue import Queue
 
 import pika
 from libretime_shared.config import RabbitMQConfig
@@ -39,11 +39,11 @@ class MessageListener:
                 break  # Break out of the while loop and exit the application
             except OSError:
                 pass
-            except pika.exceptions.AMQPError as e:
+            except pika.exceptions.AMQPError as exception:
                 if self._shutdown:
                     break
                 logger.error("Connection to message queue failed. ")
-                logger.error(e)
+                logger.error(exception)
                 logger.info("Retrying in 5 seconds...")
                 time.sleep(5)
@@ -134,7 +134,7 @@ class MessageListener:
                 callback_url, api_key, audio_metadata
             )
-        except KeyError as e:
+        except KeyError:
             logger.exception("A mandatory field was missing from the message.")
             channel.basic_nack(
                 delivery_tag=method_frame.delivery_tag,
@@ -142,8 +142,8 @@ class MessageListener:
                 requeue=False,
             )
-        except Exception as e:
-            logger.exception(e)
+        except Exception as exception:
+            logger.exception(exception)
             channel.basic_nack(
                 delivery_tag=method_frame.delivery_tag,
                 multiple=False,
@@ -171,23 +171,23 @@ class MessageListener:
     ):
         metadata = {}
-        q = queue.Queue()
+        queue = Queue()
         try:
             Pipeline.run_analysis(
-                q,
+                queue,
                 audio_file_path,
                 import_directory,
                 original_filename,
                 storage_backend,
                 file_prefix,
             )
-            metadata = q.get()
-        except Exception as e:
-            logger.error("Analyzer pipeline exception: %s" % str(e))
+            metadata = queue.get()
+        except Exception as exception:
+            logger.error("Analyzer pipeline exception: %s" % str(exception))
             metadata["import_status"] = PipelineStatus.failed
-        # Ensure our queue doesn't fill up and block due to unexpected behaviour. Defensive code.
-        while not q.empty():
-            q.get()
+        # Ensure our queue doesn't fill up and block due to unexpected behavior. Defensive code.
+        while not queue.empty():
+            queue.get()
         return metadata
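
For context, here is a minimal, self-contained sketch of the Queue hand-off that spawn_analyzer_process performs after this change, including the defensive drain mentioned in the comment above. fake_run_analysis stands in for Pipeline.run_analysis, and every name in it is illustrative rather than LibreTime API:

from queue import Queue


def fake_run_analysis(queue: Queue) -> None:
    # Stand-in for Pipeline.run_analysis: put one result dict on the queue.
    queue.put({"import_status": "succeed", "track_title": "example"})


def collect_metadata() -> dict:
    metadata = {}
    queue = Queue()
    try:
        fake_run_analysis(queue)
        metadata = queue.get()
    except Exception as exception:
        print("Analyzer pipeline exception: %s" % str(exception))
        metadata["import_status"] = "failed"
    # Defensive drain, mirroring the hunk above: discard anything left on the queue.
    while not queue.empty():
        queue.get()
    return metadata


if __name__ == "__main__":
    print(collect_metadata())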

@@ -1,5 +1,3 @@
-""" Analyzes and imports an audio file into the Airtime library.
-"""
 from enum import Enum
 from queue import Queue
 from typing import Any, Dict
@@ -31,9 +29,7 @@ class Pipeline:
     This currently performs metadata extraction (eg. gets the ID3 tags from an MP3),
     then moves the file to the Airtime music library (stor/imported), and returns
-    the results back to the parent process. This class is used in an isolated process
-    so that if it crashes, it does not kill the entire airtime_analyzer daemon and
-    the failure to import can be reported back to the web application.
+    the results back to the parent process.
     """
     @staticmethod
@@ -90,7 +86,7 @@ class Pipeline:
             # Analyze the audio file we were told to analyze:
             # First, we extract the ID3 tags and other metadata:
-            metadata = dict()
+            metadata = {}
             metadata["file_prefix"] = file_prefix
             metadata = analyze_metadata(audio_file_path, metadata)
@@ -102,20 +98,15 @@ class Pipeline:
                 audio_file_path, import_directory, original_filename, metadata
             )
-            metadata["import_status"] = 0  # Successfully imported
-            # Note that the queue we're putting the results into is our interprocess communication
-            # back to the main process.
-            # Pass all the file metadata back to the main analyzer process, which then passes
-            # it back to the Airtime web application.
+            metadata["import_status"] = PipelineStatus.succeed
+            # Pass all the file metadata back to the main analyzer process
             queue.put(metadata)
-        except UnplayableFileError as e:
-            logger.exception(e)
+        except UnplayableFileError as exception:
+            logger.exception(exception)
             metadata["import_status"] = PipelineStatus.failed
             metadata["reason"] = "The file could not be played."
-            raise e
-        except Exception as e:
-            # Ensures the traceback for this child process gets written to our log files:
-            logger.exception(e)
-            raise e
+            raise exception
+        except Exception as exception:
+            logger.exception(exception)
+            raise exception
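
The last hunk replaces the magic status 0 with PipelineStatus.succeed, but the enum itself sits outside this diff. A hedged sketch of what such an enum could look like, assuming the member names visible here (succeed, failed) and keeping 0 as the success value the old code used; the real definition in pipeline.py may differ:

from enum import Enum


class PipelineStatus(Enum):
    # Assumed members and values: only succeed and failed appear in the diff,
    # and 0 meant "successfully imported" before this change.
    succeed = 0
    failed = 1


metadata = {"import_status": PipelineStatus.succeed}
if metadata["import_status"] is PipelineStatus.succeed:
    print("import finished:", metadata["import_status"].name)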