From 0babe0f2431c38d9e89cbf99e47e2349c5b95ad8 Mon Sep 17 00:00:00 2001
From: jo
Date: Mon, 25 Jul 2022 01:34:21 +0200
Subject: [PATCH] chore(analyzer): clean old comments

---
 .../libretime_analyzer/message_listener.py | 102 ++----------------
 .../libretime_analyzer/status_reporter.py  |  75 ++++---------
 2 files changed, 32 insertions(+), 145 deletions(-)

diff --git a/analyzer/libretime_analyzer/message_listener.py b/analyzer/libretime_analyzer/message_listener.py
index cbe05b83e..cef9fed2e 100644
--- a/analyzer/libretime_analyzer/message_listener.py
+++ b/analyzer/libretime_analyzer/message_listener.py
@@ -16,45 +16,6 @@
 ROUTING_KEY = ""
 QUEUE = "airtime-uploads"
 
-""" A message listener class that waits for messages from Airtime through RabbitMQ
-    notifying us about new uploads.
-
-    This is probably the most important class in this application. It connects
-    to RabbitMQ (or an AMQP server) and listens for messages that notify us
-    when a user uploads a new file to Airtime, either through the web interface
-    or via FTP (on Airtime Pro). When we get a notification, we spawn a child
-    process that extracts the uploaded audio file's metadata and moves it into
-    Airtime's music library directory. Lastly, the extracted metadata is
-    reported back to the Airtime web application.
-
-    There's a couple of Very Important technical details and constraints that you
-    need to know if you're going to work on this code:
-
-    1) airtime_analyzer is designed so it doesn't have to run on the same
-       computer as the web server. It just needs access to your Airtime library
-       folder (stor).
-    2) airtime_analyzer is multi-tenant - One process can be used for many
-       Airtime instances. It's designed NOT to know about whether it's running
-       in a single tenant or multi-tenant environment. All the information it
-       needs to import a file into an Airtime instance is passed in via those
-       RabbitMQ messages.
-    3) We're using a "topic exchange" for the new upload notification RabbitMQ
-       messages. This means if we run several airtime_analyzer processes on
-       different computers, RabbitMQ will do round-robin dispatching of the
-       file notification. This is cheap, easy load balancing and
-       redundancy for us. You can even run multiple airtime_analyzer processes
-       on one machine if you want.
-    4) We run the actual work (metadata analysis and file moving) in a separate
-       child process so that if it crashes, we can stop RabbitMQ from resending
-       the file notification message to another airtime_analyzer process (NACK),
-       which would otherwise cause cascading failure. We also do this so that we
-       can report the problem file to the Airtime web interface ("import failed").
-
-    So that is a quick overview of the design constraints for this application, and
-    why airtime_analyzer is written this way.
-"""
-
-
 class MessageListener:
     def __init__(self, config: RabbitMQConfig):
         """
@@ -106,7 +67,7 @@ class MessageListener:
         self._channel.exchange_declare(
             exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE, durable=True
         )
-        result = self._channel.queue_declare(queue=QUEUE, durable=True)
+        self._channel.queue_declare(queue=QUEUE, durable=True)
 
         self._channel.queue_bind(
             exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY
@@ -121,9 +82,6 @@ class MessageListener:
 
     def disconnect_from_messaging_server(self):
         """Stop consuming RabbitMQ messages and disconnect"""
-        # If you try to close a connection that's already closed, you're going to have a bad time.
-        # We're breaking EAFP because this can be called multiple times depending on exception
-        # handling flow here.
         if not self._channel.is_closed:
             self._channel.stop_consuming()
         if not self._connection.is_closed:
@@ -142,7 +100,6 @@ class MessageListener:
         """
         logger.info(f" - Received '{body}' on routing_key '{method_frame.routing_key}'")
 
-        # Declare all variables here so they exist in the exception handlers below, no matter what.
         audio_file_path = ""
         # final_file_path = ""
         import_directory = ""
@@ -149,14 +106,8 @@ class MessageListener:
         original_filename = ""
         callback_url = ""
         api_key = ""
         file_prefix = ""
 
-        """ Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
-            to pass objects between the processes so that if the analyzer process crashes, it does not
-            take down the rest of the daemon and we NACK that message so that it doesn't get
-            propagated to other airtime_analyzer daemons (eg. running on other servers).
-            We avoid cascading failure this way.
-        """
         try:
             try:
                 body = body.decode()
@@ -184,37 +135,22 @@ class MessageListener:
             )
 
         except KeyError as e:
-            # A field in msg_dict that we needed was missing (eg. audio_file_path)
-            logger.exception(
-                "A mandatory airtime_analyzer message field was missing from the message."
-            )
-            # See the huge comment about NACK below.
+            logger.exception("A mandatory field was missing from the message.")
             channel.basic_nack(
-                delivery_tag=method_frame.delivery_tag, multiple=False, requeue=False
-            ) # Important that it doesn't requeue the message
+                delivery_tag=method_frame.delivery_tag,
+                multiple=False,
+                requeue=False,
+            )
 
         except Exception as e:
             logger.exception(e)
-            """ If ANY exception happens while processing a file, we're going to NACK to the
-                messaging server and tell it to remove the message from the queue.
-                (NACK is a negative acknowledgement. We could use ACK instead, but this might come
-                in handy in the future.)
-                Exceptions in this context are unexpected, unhandled errors. We try to recover
-                from as many errors as possible in AnalyzerPipeline, but we're safeguarding ourselves
-                here from any catastrophic or genuinely unexpected errors:
-            """
             channel.basic_nack(
-                delivery_tag=method_frame.delivery_tag, multiple=False, requeue=False
-            ) # Important that it doesn't requeue the message
+                delivery_tag=method_frame.delivery_tag,
+                multiple=False,
+                requeue=False,
+            )
 
-            #
-            # TODO: If the JSON was invalid or the web server is down,
-            #       then don't report that failure to the REST API
-            # TODO: Catch exceptions from this HTTP request too:
-            if (
-                callback_url
-            ):  # If we got an invalid message, there might be no callback_url in the JSON
-                # Report this as a failed upload to the File Upload REST API.
+            if callback_url:
                 StatusReporter.report_failure_to_callback_url(
                     callback_url,
                     api_key,
@@ -221,32 +157,16 @@ class MessageListener:
                 )
 
         else:
-            # ACK at the very end, after the message has been successfully processed.
-            # If we don't ack, then RabbitMQ will redeliver the message in the future.
             channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 
     @staticmethod
     def spawn_analyzer_process(
         audio_file_path,
         import_directory,
         original_filename,
         storage_backend,
         file_prefix,
     ):
-        """Spawn a child process to analyze and import a new audio file."""
-        """
-        q = multiprocessing.Queue()
-        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-            args=(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix))
-        p.start()
-        p.join()
-        if p.exitcode == 0:
-            results = q.get()
-            logger.info("Main process received results from child: ")
-            logger.info(results)
-        else:
-            raise Exception("Analyzer process terminated unexpectedly.")
-        """
         metadata = {}
 
         q = queue.Queue()
diff --git a/analyzer/libretime_analyzer/status_reporter.py b/analyzer/libretime_analyzer/status_reporter.py
index 034fb2351..e1fc281e3 100644
--- a/analyzer/libretime_analyzer/status_reporter.py
+++ b/analyzer/libretime_analyzer/status_reporter.py
@@ -163,7 +163,6 @@ def is_web_server_broken(url):
     else:
         # The request worked fine, so the web server and Airtime are still up.
         return False
-    return False
 
 
 class StatusReporter:
@@ -173,12 +172,7 @@
 
     _HTTP_REQUEST_TIMEOUT = 30
 
-    """ We use multiprocessing.Process again here because we need a thread for this stuff
-        anyways, and Python gives us process isolation for free (crash safety).
-    """
     _ipc_queue = queue.Queue()
-    # _http_thread = multiprocessing.Process(target=process_http_requests,
-    #                                        args=(_ipc_queue,))
     _http_thread = None
 
     @classmethod
@@ -192,8 +186,7 @@
     @classmethod
     def stop_thread(self):
         logger.info("Terminating status_reporter process")
-        # StatusReporter._http_thread.terminate() # Triggers SIGTERM on the child process
-        StatusReporter._ipc_queue.put("shutdown") # Special trigger
+        StatusReporter._ipc_queue.put("shutdown")
         StatusReporter._http_thread.join()
 
     @classmethod
@@ -201,50 +194,32 @@
         StatusReporter._ipc_queue.put(request)
 
     @classmethod
-    def report_success_to_callback_url(self, callback_url, api_key, audio_metadata):
+    def report_success_to_callback_url(
+        self,
+        callback_url,
+        api_key,
+        audio_metadata,
+    ):
         """Report the extracted metadata and status of the successfully imported file
         to the callback URL (which should be the Airtime File Upload API)
         """
         put_payload = json.dumps(audio_metadata)
-        # r = requests.Request(method='PUT', url=callback_url, data=put_payload,
-        #                      auth=requests.auth.HTTPBasicAuth(api_key, ''))
-        """
-        r = requests.Request(method='PUT', url=callback_url, data=put_payload,
-                             auth=requests.auth.HTTPBasicAuth(api_key, ''))
-
-        StatusReporter._send_http_request(r)
-        """
-
         StatusReporter._send_http_request(
             PicklableHttpRequest(
-                method="PUT", url=callback_url, data=put_payload, api_key=api_key
+                method="PUT",
+                url=callback_url,
+                data=put_payload,
+                api_key=api_key,
             )
         )
-
-        """
-        try:
-            r.raise_for_status() # Raise an exception if there was an http error code returned
-        except requests.exceptions.RequestException:
-            StatusReporter._ipc_queue.put(r.prepare())
-        """
-
-        """
-        # Encode the audio metadata as json and post it back to the callback_url
-        put_payload = json.dumps(audio_metadata)
-        logger.debug("sending http put with payload: " + put_payload)
-        r = requests.put(callback_url, data=put_payload,
-                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
-                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
-        logger.debug("HTTP request returned status: " + str(r.status_code))
-        logger.debug(r.text) # log the response body
-
-        #TODO: queue up failed requests and try them again later.
-        r.raise_for_status() # Raise an exception if there was an http error code returned
-        """
 
     @classmethod
     def report_failure_to_callback_url(
-        self, callback_url, api_key, import_status, reason
+        self,
+        callback_url,
+        api_key,
+        import_status,
+        reason,
     ):
         if not isinstance(import_status, int):
             raise TypeError(
@@ -258,20 +233,12 @@
         audio_metadata["comment"] = reason # hack attack
         put_payload = json.dumps(audio_metadata)
         # logger.debug("sending http put with payload: " + put_payload)
-        """
-        r = requests.put(callback_url, data=put_payload,
-                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
-                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
-        """
+
         StatusReporter._send_http_request(
            PicklableHttpRequest(
-                method="PUT", url=callback_url, data=put_payload, api_key=api_key
+                method="PUT",
+                url=callback_url,
+                data=put_payload,
+                api_key=api_key,
            )
         )
-        """
-        logger.debug("HTTP request returned status: " + str(r.status_code))
-        logger.debug(r.text) # log the response body
-
-        #TODO: queue up failed requests and try them again later.
-        r.raise_for_status() # raise an exception if there was an http error code returned
-        """
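
P.S. Two of the comment blocks deleted above describe behaviour that the surviving code still depends on, so here are short, self-contained sketches of those patterns for reviewers. Both are illustrative only and are not part of the patch. First, the ack/nack contract in msg_received_callback: a message is acked only after it has been processed successfully, and any failure is nacked with requeue=False so a poison message is dropped instead of being redelivered to another analyzer and cascading the failure. A minimal pika consumer showing that contract (the connection parameters and the process() helper are placeholders, not code from this repository):

    import pika

    QUEUE = "airtime-uploads"

    def process(body: bytes) -> None:
        # Placeholder for the real work (AnalyzerPipeline.run_analysis).
        if not body:
            raise ValueError("empty message")

    def on_message(channel, method, properties, body):
        try:
            process(body)
        except Exception:
            # Negative-acknowledge WITHOUT requeueing: a message that keeps
            # crashing the pipeline must not bounce to another analyzer
            # process, which would cascade the failure.
            channel.basic_nack(
                delivery_tag=method.delivery_tag, multiple=False, requeue=False
            )
        else:
            # Ack only after processing succeeded; an unacked message would
            # be redelivered by RabbitMQ.
            channel.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE, durable=True)
    channel.basic_consume(queue=QUEUE, on_message_callback=on_message)
    channel.start_consuming()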
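Second, the shutdown handshake in StatusReporter: stop_thread() pushes the "shutdown" sentinel onto _ipc_queue and then joins the worker, which leaves its loop once it consumes the sentinel. Sketched below with the stdlib queue and threading modules; process_http_requests is reduced to its control flow here, and send() is a hypothetical stand-in for the real HTTP delivery:

    import queue
    import threading

    def send(request) -> None:
        # Hypothetical stand-in for the real HTTP delivery.
        print("would send:", request)

    def process_http_requests(ipc_queue: queue.Queue) -> None:
        # Drain queued requests until the "shutdown" sentinel arrives.
        while True:
            request = ipc_queue.get()
            if request == "shutdown":
                break
            send(request)

    ipc_queue = queue.Queue()
    http_thread = threading.Thread(target=process_http_requests, args=(ipc_queue,))
    http_thread.start()

    ipc_queue.put({"method": "PUT", "url": "http://example.com/callback"})
    ipc_queue.put("shutdown")  # the special trigger used by stop_thread()
    http_thread.join()  # returns once the worker has consumed the sentinel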