chore(analyzer): clean old comments
parent 58230b7974
commit 0babe0f243
@@ -16,45 +16,6 @@ ROUTING_KEY = ""
 QUEUE = "airtime-uploads"
 
 
-""" A message listener class that waits for messages from Airtime through RabbitMQ
-    notifying us about new uploads.
-
-    This is probably the most important class in this application. It connects
-    to RabbitMQ (or an AMQP server) and listens for messages that notify us
-    when a user uploads a new file to Airtime, either through the web interface
-    or via FTP (on Airtime Pro). When we get a notification, we spawn a child
-    process that extracts the uploaded audio file's metadata and moves it into
-    Airtime's music library directory. Lastly, the extracted metadata is
-    reported back to the Airtime web application.
-
-    There's a couple of Very Important technical details and constraints that you
-    need to know if you're going to work on this code:
-
-    1) airtime_analyzer is designed so it doesn't have to run on the same
-       computer as the web server. It just needs access to your Airtime library
-       folder (stor).
-    2) airtime_analyzer is multi-tenant - One process can be used for many
-       Airtime instances. It's designed NOT to know about whether it's running
-       in a single tenant or multi-tenant environment. All the information it
-       needs to import a file into an Airtime instance is passed in via those
-       RabbitMQ messages.
-    3) We're using a "topic exchange" for the new upload notification RabbitMQ
-       messages. This means if we run several airtime_analyzer processes on
-       different computers, RabbitMQ will do round-robin dispatching of the
-       file notification. This is cheap, easy load balancing and
-       redundancy for us. You can even run multiple airtime_analyzer processes
-       on one machine if you want.
-    4) We run the actual work (metadata analysis and file moving) in a separate
-       child process so that if it crashes, we can stop RabbitMQ from resending
-       the file notification message to another airtime_analyzer process (NACK),
-       which would otherwise cause cascading failure. We also do this so that we
-       can report the problem file to the Airtime web interface ("import failed").
-
-    So that is a quick overview of the design constraints for this application, and
-    why airtime_analyzer is written this way.
-"""
-
-
 class MessageListener:
     def __init__(self, config: RabbitMQConfig):
         """
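Note: the module docstring deleted in this hunk was the main design documentation for the listener (topic exchange, round-robin dispatch across analyzer processes, multi-tenant messages). For orientation, a minimal sketch of the consumer wiring it describes, using pika; the broker host, and the assumption that EXCHANGE mirrors the queue name, are illustrative and not taken from this diff:

    import pika

    EXCHANGE = "airtime-uploads"  # assumed; only QUEUE and ROUTING_KEY appear above
    EXCHANGE_TYPE = "topic"  # the "topic exchange" from the removed docstring
    QUEUE = "airtime-uploads"
    ROUTING_KEY = ""

    def on_upload(channel, method_frame, header_frame, body):
        # One message per uploaded file; ack only after successful processing.
        print(body)
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    # Hypothetical host; the real values come from RabbitMQConfig.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE, durable=True)
    channel.queue_declare(queue=QUEUE, durable=True)
    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
    # Several analyzer processes may consume the same queue; RabbitMQ will
    # round-robin deliveries between them (the cheap load balancing and
    # redundancy the removed docstring mentions).
    channel.basic_consume(queue=QUEUE, on_message_callback=on_upload)
    channel.start_consuming()

Running two copies of this script against one broker demonstrates the round-robin dispatch described in point 3 of the removed docstring.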
@@ -106,7 +67,7 @@ class MessageListener:
         self._channel.exchange_declare(
             exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE, durable=True
         )
-        result = self._channel.queue_declare(queue=QUEUE, durable=True)
+        self._channel.queue_declare(queue=QUEUE, durable=True)
 
         self._channel.queue_bind(
             exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY
@@ -121,9 +82,6 @@ class MessageListener:
 
     def disconnect_from_messaging_server(self):
         """Stop consuming RabbitMQ messages and disconnect"""
-        # If you try to close a connection that's already closed, you're going to have a bad time.
-        # We're breaking EAFP because this can be called multiple times depending on exception
-        # handling flow here.
         if not self._channel.is_closed:
             self._channel.stop_consuming()
         if not self._connection.is_closed:
@@ -142,7 +100,6 @@ class MessageListener:
         """
         logger.info(f" - Received '{body}' on routing_key '{method_frame.routing_key}'")
 
-        # Declare all variables here so they exist in the exception handlers below, no matter what.
         audio_file_path = ""
         # final_file_path = ""
         import_directory = ""
@@ -151,12 +108,6 @@ class MessageListener:
         api_key = ""
         file_prefix = ""
 
-        """ Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
-            to pass objects between the processes so that if the analyzer process crashes, it does not
-            take down the rest of the daemon and we NACK that message so that it doesn't get
-            propagated to other airtime_analyzer daemons (eg. running on other servers).
-            We avoid cascading failure this way.
-        """
         try:
             try:
                 body = body.decode()
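Note: the comment removed here documents the crash-isolation pattern that point 4 of the deleted module docstring also describes. A minimal sketch of that pattern, with analyze standing in for AnalyzerPipeline.run_analysis (hypothetical names throughout):

    import multiprocessing

    def analyze(q, audio_file_path):
        # Stand-in for AnalyzerPipeline.run_analysis: put results on the queue.
        q.put({"path": audio_file_path, "ok": True})

    def run_isolated(audio_file_path):
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=analyze, args=(q, audio_file_path))
        p.start()
        p.join()
        if p.exitcode == 0:
            return q.get()  # metadata produced by the child
        # A crash is detected via the exit code; the caller NACKs the message
        # (requeue=False) so other daemons never see the poison message.
        raise RuntimeError("Analyzer process terminated unexpectedly.")

    if __name__ == "__main__":
        print(run_isolated("/tmp/example.mp3"))

This mirrors the dead code removed from spawn_analyzer_process further down in this diff.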
@@ -184,37 +135,22 @@ class MessageListener:
                 )
 
             except KeyError as e:
-                # A field in msg_dict that we needed was missing (eg. audio_file_path)
-                logger.exception(
-                    "A mandatory airtime_analyzer message field was missing from the message."
-                )
-                # See the huge comment about NACK below.
+                logger.exception("A mandatory field was missing from the message.")
                 channel.basic_nack(
-                    delivery_tag=method_frame.delivery_tag, multiple=False, requeue=False
-                )  # Important that it doesn't requeue the message
+                    delivery_tag=method_frame.delivery_tag,
+                    multiple=False,
+                    requeue=False,
+                )
 
         except Exception as e:
             logger.exception(e)
-            """ If ANY exception happens while processing a file, we're going to NACK to the
-                messaging server and tell it to remove the message from the queue.
-                (NACK is a negative acknowledgement. We could use ACK instead, but this might come
-                in handy in the future.)
-                Exceptions in this context are unexpected, unhandled errors. We try to recover
-                from as many errors as possible in AnalyzerPipeline, but we're safeguarding ourselves
-                here from any catastrophic or genuinely unexpected errors:
-            """
             channel.basic_nack(
-                delivery_tag=method_frame.delivery_tag, multiple=False, requeue=False
-            )  # Important that it doesn't requeue the message
+                delivery_tag=method_frame.delivery_tag,
+                multiple=False,
+                requeue=False,
+            )
 
-            #
-            # TODO: If the JSON was invalid or the web server is down,
-            # then don't report that failure to the REST API
-            # TODO: Catch exceptions from this HTTP request too:
-            if (
-                callback_url
-            ):  # If we got an invalid message, there might be no callback_url in the JSON
-                # Report this as a failed upload to the File Upload REST API.
+            if callback_url:
                 StatusReporter.report_failure_to_callback_url(
                     callback_url,
                     api_key,
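Note: the string comment removed in this hunk explains the NACK policy that the surviving code still implements. Reduced to its essentials, the callback shape is roughly the following sketch, where process_message is a hypothetical stand-in for the work done above:

    import json

    def process_message(body):
        # Stand-in for the real work: decode JSON, analyze, move, report back.
        json.loads(body.decode())

    def on_message(channel, method_frame, header_frame, body):
        try:
            process_message(body)
        except Exception:
            # requeue=False makes RabbitMQ drop the message rather than
            # redeliver it to the next analyzer, avoiding the cascading
            # failure the removed comment warns about.
            channel.basic_nack(
                delivery_tag=method_frame.delivery_tag,
                multiple=False,
                requeue=False,
            )
        else:
            # ACK only once processing succeeded; unacked messages are
            # redelivered by RabbitMQ later.
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)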
@@ -223,8 +159,6 @@ class MessageListener:
                 )
 
         else:
-            # ACK at the very end, after the message has been successfully processed.
-            # If we don't ack, then RabbitMQ will redeliver the message in the future.
             channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 
     @staticmethod
@@ -235,20 +169,6 @@ class MessageListener:
         storage_backend,
         file_prefix,
     ):
-        """Spawn a child process to analyze and import a new audio file."""
-        """
-        q = multiprocessing.Queue()
-        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                args=(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix))
-        p.start()
-        p.join()
-        if p.exitcode == 0:
-            results = q.get()
-            logger.info("Main process received results from child: ")
-            logger.info(results)
-        else:
-            raise Exception("Analyzer process terminated unexpectedly.")
-        """
         metadata = {}
 
         q = queue.Queue()
@@ -163,7 +163,6 @@ def is_web_server_broken(url):
     else:
         # The request worked fine, so the web server and Airtime are still up.
         return False
-    return False
 
 
 class StatusReporter:
@@ -173,12 +172,7 @@ class StatusReporter:
 
     _HTTP_REQUEST_TIMEOUT = 30
 
-    """ We use multiprocessing.Process again here because we need a thread for this stuff
-        anyways, and Python gives us process isolation for free (crash safety).
-    """
     _ipc_queue = queue.Queue()
-    # _http_thread = multiprocessing.Process(target=process_http_requests,
-    #                                        args=(_ipc_queue,))
     _http_thread = None
 
     @classmethod
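Note: the removed comment explained the old multiprocessing.Process choice ("process isolation for free"); the surviving code keeps the same shape, with a queue feeding a single worker that is shut down via the "shutdown" sentinel visible in the next hunk. A minimal sketch of that consumer pattern with a plain thread (send is a hypothetical stand-in):

    import queue
    import threading

    def send(request):
        # Stand-in for the real sender; StatusReporter keeps requests
        # picklable so a separate process could send them instead.
        print("sending", request)

    def process_http_requests(ipc_queue):
        # Drain queued requests until the "shutdown" sentinel arrives
        # (see stop_thread below).
        while True:
            request = ipc_queue.get()
            if request == "shutdown":
                return
            send(request)

    ipc_queue = queue.Queue()
    http_thread = threading.Thread(target=process_http_requests, args=(ipc_queue,))
    http_thread.start()
    ipc_queue.put({"method": "PUT", "url": "http://example.com"})
    ipc_queue.put("shutdown")
    http_thread.join()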
@@ -192,8 +186,7 @@ class StatusReporter:
     @classmethod
     def stop_thread(self):
         logger.info("Terminating status_reporter process")
-        # StatusReporter._http_thread.terminate() # Triggers SIGTERM on the child process
-        StatusReporter._ipc_queue.put("shutdown") # Special trigger
+        StatusReporter._ipc_queue.put("shutdown")
         StatusReporter._http_thread.join()
 
     @classmethod
@@ -201,50 +194,32 @@ class StatusReporter:
         StatusReporter._ipc_queue.put(request)
 
     @classmethod
-    def report_success_to_callback_url(self, callback_url, api_key, audio_metadata):
+    def report_success_to_callback_url(
+        self,
+        callback_url,
+        api_key,
+        audio_metadata,
+    ):
         """Report the extracted metadata and status of the successfully imported file
         to the callback URL (which should be the Airtime File Upload API)
         """
         put_payload = json.dumps(audio_metadata)
-        # r = requests.Request(method='PUT', url=callback_url, data=put_payload,
-        #                      auth=requests.auth.HTTPBasicAuth(api_key, ''))
-        """
-        r = requests.Request(method='PUT', url=callback_url, data=put_payload,
-                             auth=requests.auth.HTTPBasicAuth(api_key, ''))
-
-        StatusReporter._send_http_request(r)
-        """
-
         StatusReporter._send_http_request(
             PicklableHttpRequest(
-                method="PUT", url=callback_url, data=put_payload, api_key=api_key
+                method="PUT",
+                url=callback_url,
+                data=put_payload,
+                api_key=api_key,
             )
         )
 
-        """
-        try:
-            r.raise_for_status() # Raise an exception if there was an http error code returned
-        except requests.exceptions.RequestException:
-            StatusReporter._ipc_queue.put(r.prepare())
-        """
-
-        """
-        # Encode the audio metadata as json and post it back to the callback_url
-        put_payload = json.dumps(audio_metadata)
-        logger.debug("sending http put with payload: " + put_payload)
-        r = requests.put(callback_url, data=put_payload,
-                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
-                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
-        logger.debug("HTTP request returned status: " + str(r.status_code))
-        logger.debug(r.text) # log the response body
-
-        #TODO: queue up failed requests and try them again later.
-        r.raise_for_status() # Raise an exception if there was an http error code returned
-        """
-
     @classmethod
     def report_failure_to_callback_url(
-        self, callback_url, api_key, import_status, reason
+        self,
+        callback_url,
+        api_key,
+        import_status,
+        reason,
     ):
         if not isinstance(import_status, int):
             raise TypeError(
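Note: the dead code removed in this hunk still documents what a PicklableHttpRequest ultimately turns into: an HTTP PUT of the JSON metadata to the callback URL, authenticated with the API key as the basic-auth username. A sketch of that request with the requests library, using placeholder values:

    import json

    import requests

    callback_url = "https://airtime.example.com/rest/media/1"  # placeholder
    api_key = "SECRET"  # placeholder
    audio_metadata = {"import_status": 0, "track_title": "Example"}

    r = requests.put(
        callback_url,
        data=json.dumps(audio_metadata),
        auth=requests.auth.HTTPBasicAuth(api_key, ""),
        timeout=30,  # StatusReporter._HTTP_REQUEST_TIMEOUT
    )
    r.raise_for_status()  # surface HTTP error codes as exceptions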
@@ -258,20 +233,12 @@ class StatusReporter:
         audio_metadata["comment"] = reason  # hack attack
         put_payload = json.dumps(audio_metadata)
         # logger.debug("sending http put with payload: " + put_payload)
-        """
-        r = requests.put(callback_url, data=put_payload,
-                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
-                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
-        """
         StatusReporter._send_http_request(
             PicklableHttpRequest(
-                method="PUT", url=callback_url, data=put_payload, api_key=api_key
+                method="PUT",
+                url=callback_url,
+                data=put_payload,
+                api_key=api_key,
            )
        )
-        """
-        logger.debug("HTTP request returned status: " + str(r.status_code))
-        logger.debug(r.text) # log the response body
-
-        #TODO: queue up failed requests and try them again later.
-        r.raise_for_status() # raise an exception if there was an http error code returned
-        """
 