From f49121116709c95e188f3d3559e911293b56d71d Mon Sep 17 00:00:00 2001 From: Albert Santoni Date: Wed, 16 Apr 2014 11:10:06 -0400 Subject: [PATCH] CC-5709: Airtime Analyzer * Retry HTTP requests (kinda crappy, should find a better design pattern to solve this) - CC-5775 * Persist failed HTTP requests at shutdown --- python_apps/airtime_analyzer/README.rst | 13 +- .../airtime_analyzer/airtime_analyzer.py | 12 +- .../airtime_analyzer/message_listener.py | 2 +- .../airtime_analyzer/status_reporter.py | 149 +++++++++++++++++- .../airtime_analyzer/bin/airtime_analyzer | 19 ++- 5 files changed, 181 insertions(+), 14 deletions(-) diff --git a/python_apps/airtime_analyzer/README.rst b/python_apps/airtime_analyzer/README.rst index 41881992e..8ae0bdcac 100644 --- a/python_apps/airtime_analyzer/README.rst +++ b/python_apps/airtime_analyzer/README.rst @@ -38,6 +38,9 @@ This application can be run as a daemon by running: $ airtime_analyzer -d +Other runtime flags can be listed by running: + + $ airtime_analyzer --help Developers @@ -81,8 +84,12 @@ To run the unit tests and generate a code coverage report, run: $ nosetests --with-coverage --cover-package=airtime_analyzer + +Running in a Multi-Tenant Environment +=========== + - History and Design Motivation - =========== +History and Design Motivation +=========== + - \ No newline at end of file diff --git a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py index 8fe25d0c5..39f3039f4 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py @@ -7,6 +7,7 @@ import sys from functools import partial from metadata_analyzer import MetadataAnalyzer from replaygain_analyzer import ReplayGainAnalyzer +from status_reporter import StatusReporter from message_listener import MessageListener @@ -20,17 +21,22 @@ class AirtimeAnalyzerServer: # Variables _log_level = logging.INFO - def __init__(self, config_path, debug=False): + def __init__(self, rmq_config_path, http_retry_queue_path, debug=False): # Configure logging self.setup_logging(debug) # Read our config file - rabbitmq_config = self.read_config_file(config_path) - + rabbitmq_config = self.read_config_file(rmq_config_path) + + # Start up the StatusReporter process + StatusReporter.start_child_process(http_retry_queue_path) + # Start listening for RabbitMQ messages telling us about newly # uploaded files. self._msg_listener = MessageListener(rabbitmq_config) + + StatusReporter.stop_child_process() def setup_logging(self, debug): diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index b2a008517..b39a7f805 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -183,12 +183,12 @@ class MessageListener: channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False, requeue=False) #Important that it doesn't requeue the message - # TODO: Report this as a failed upload to the File Upload REST API. # # TODO: If the JSON was invalid or the web server is down, # then don't report that failure to the REST API #TODO: Catch exceptions from this HTTP request too: if callback_url: # If we got an invalid message, there might be no callback_url in the JSON + # Report this as a failed upload to the File Upload REST API. StatusReporter.report_failure_to_callback_url(callback_url, api_key, import_status=2, reason=u'An error occurred while importing this file') diff --git a/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py b/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py index 4e1dccf2c..ee5062943 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py @@ -1,19 +1,157 @@ import requests import json import logging +import collections +import Queue +import signal +import multiprocessing +import pickle +import threading + +class PicklableHttpRequest: + def __init__(self, method, url, data, api_key): + self.method = method + self.url = url + self.data = data + self.api_key = api_key + + def create_request(self): + return requests.Request(method=self.method, url=self.url, data=self.data, + auth=requests.auth.HTTPBasicAuth(self.api_key, '')) + +def process_http_requests(ipc_queue, http_retry_queue_path): + ''' Runs in a separate process and performs all the HTTP requests where we're + reporting extracted audio file metadata or errors back to the Airtime web application. + + This process also checks every 5 seconds if there's failed HTTP requests that we + need to retry. We retry failed HTTP requests so that we don't lose uploads if the + web server is temporarily down. + + ''' + + # Store any failed requests (eg. due to web server errors or downtime) to be + # retried later: + retry_queue = collections.deque() + shutdown = False + + # Unpickle retry_queue from disk so that we won't have lost any uploads + # if airtime_analyzer is shut down while the web server is down or unreachable, + # and there were failed HTTP requests pending, waiting to be retried. + try: + with open(http_retry_queue_path, 'rb') as pickle_file: + retry_queue = pickle.load(pickle_file) + except IOError as e: + if e.errno == 2: + pass + else: + raise e + + while not shutdown: + try: + request = ipc_queue.get(block=True, timeout=5) + if isinstance(request, str) and request == "shutdown": # Bit of a cheat + shutdown = True + break + if not isinstance(request, PicklableHttpRequest): + raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__) + except Queue.Empty: + request = None + + # If there's no new HTTP request we need to execute, let's check our "retry + # queue" and see if there's any failed HTTP requests we can retry: + if request: + send_http_request(request, retry_queue) + else: + # Using a for loop instead of while so we only iterate over all the requests once! + for i in range(len(retry_queue)): + request = retry_queue.popleft() + send_http_request(request, retry_queue) + + logging.info("Shutting down status_reporter") + # Pickle retry_queue to disk so that we don't lose uploads if we're shut down while + # while the web server is down or unreachable. + with open(http_retry_queue_path, 'wb') as pickle_file: + pickle.dump(retry_queue, pickle_file) + +def send_http_request(picklable_request, retry_queue): + if not isinstance(picklable_request, PicklableHttpRequest): + raise TypeError("picklable_request must be a PicklableHttpRequest. Was of type " + type(picklable_request).__name__) + try: + prepared_request = picklable_request.create_request() + prepared_request = prepared_request.prepare() + s = requests.Session() + r = s.send(prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT) + r.raise_for_status() # Raise an exception if there was an http error code returned + logging.info("HTTP request sent successfully.") + except requests.exceptions.RequestException as e: + # If the web server is having problems, retry the request later: + logging.error("HTTP request failed. Retrying later! Exception was: %s" % str(e)) + retry_queue.append(picklable_request) + except Exception as e: + logging.error("HTTP request failed with unhandled exception. %s" % str(e)) + # Don't put the request into the retry queue, just give up on this one. + # I'm doing this to protect against us getting some pathological request + # that breaks our code. I don't want us having + + class StatusReporter(): ''' Reports the extracted audio file metadata and job status back to the Airtime web application. ''' _HTTP_REQUEST_TIMEOUT = 30 + + ''' We use multiprocessing.Process again here because we need a thread for this stuff + anyways, and Python gives us process isolation for free (crash safety). + ''' + _ipc_queue = multiprocessing.Queue() + #_request_process = multiprocessing.Process(target=process_http_requests, + # args=(_ipc_queue,)) + _request_process = None + + @classmethod + def start_child_process(self, http_retry_queue_path): + StatusReporter._request_process = threading.Thread(target=process_http_requests, + args=(StatusReporter._ipc_queue,http_retry_queue_path)) + StatusReporter._request_process.start() + + @classmethod + def stop_child_process(self): + logging.info("Terminating status_reporter process") + #StatusReporter._request_process.terminate() # Triggers SIGTERM on the child process + StatusReporter._ipc_queue.put("shutdown") # Special trigger + StatusReporter._request_process.join() + + @classmethod + def _send_http_request(self, request): + StatusReporter._ipc_queue.put(request) @classmethod def report_success_to_callback_url(self, callback_url, api_key, audio_metadata): ''' Report the extracted metadata and status of the successfully imported file to the callback URL (which should be the Airtime File Upload API) ''' - + put_payload = json.dumps(audio_metadata) + #r = requests.Request(method='PUT', url=callback_url, data=put_payload, + # auth=requests.auth.HTTPBasicAuth(api_key, '')) + ''' + r = requests.Request(method='PUT', url=callback_url, data=put_payload, + auth=requests.auth.HTTPBasicAuth(api_key, '')) + + StatusReporter._send_http_request(r) + ''' + + StatusReporter._send_http_request(PicklableHttpRequest(method='PUT', url=callback_url, + data=put_payload, api_key=api_key)) + + ''' + try: + r.raise_for_status() # Raise an exception if there was an http error code returned + except requests.exceptions.RequestException: + StatusReporter._ipc_queue.put(r.prepare()) + ''' + + ''' # Encode the audio metadata as json and post it back to the callback_url put_payload = json.dumps(audio_metadata) logging.debug("sending http put with payload: " + put_payload) @@ -25,6 +163,7 @@ class StatusReporter(): #TODO: queue up failed requests and try them again later. r.raise_for_status() # Raise an exception if there was an http error code returned + ''' @classmethod def report_failure_to_callback_url(self, callback_url, api_key, import_status, reason): @@ -36,13 +175,19 @@ class StatusReporter(): audio_metadata["import_status"] = import_status audio_metadata["comment"] = reason # hack attack put_payload = json.dumps(audio_metadata) - logging.debug("sending http put with payload: " + put_payload) + #logging.debug("sending http put with payload: " + put_payload) + ''' r = requests.put(callback_url, data=put_payload, auth=requests.auth.HTTPBasicAuth(api_key, ''), timeout=StatusReporter._HTTP_REQUEST_TIMEOUT) + ''' + StatusReporter._send_http_request(PicklableHttpRequest(method='PUT', url=callback_url, + data=put_payload, api_key=api_key)) + ''' logging.debug("HTTP request returned status: " + str(r.status_code)) logging.debug(r.text) # log the response body #TODO: queue up failed requests and try them again later. r.raise_for_status() # raise an exception if there was an http error code returned + ''' diff --git a/python_apps/airtime_analyzer/bin/airtime_analyzer b/python_apps/airtime_analyzer/bin/airtime_analyzer index c74ad044c..6e8eab367 100755 --- a/python_apps/airtime_analyzer/bin/airtime_analyzer +++ b/python_apps/airtime_analyzer/bin/airtime_analyzer @@ -9,6 +9,7 @@ import airtime_analyzer.airtime_analyzer as aa VERSION = "1.0" DEFAULT_CONFIG_PATH = '/etc/airtime/airtime.conf' +DEFAULT_HTTP_RETRY_PATH = '/tmp/airtime_analyzer_http_retries' def run(): '''Entry-point for this application''' @@ -16,22 +17,30 @@ def run(): parser = argparse.ArgumentParser() parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true") parser.add_argument("--debug", help="log full debugging output", action="store_true") - parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is /etc/airtime/airtime.conf)") + parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH) + parser.add_argument("--http-retry-queue-file", help="specify where incompleted HTTP requests will be serialized (default is %s)" % DEFAULT_HTTP_RETRY_PATH) args = parser.parse_args() check_if_media_monitor_is_running() #Default config file path - config_path = DEFAULT_CONFIG_PATH + rmq_config_path = DEFAULT_CONFIG_PATH + http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH if args.rmq_config_file: - config_path = args.rmq_config_file + rmq_config_path = args.rmq_config_file + if args.http_retry_queue_file: + http_retry_queue_path = args.http_retry_queue_file if args.daemon: with daemon.DaemonContext(): - aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug) + aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path, + http_retry_queue_path=http_retry_queue_path, + debug=args.debug) else: # Run without daemonizing - aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug) + aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path, + http_retry_queue_path=http_retry_queue_path, + debug=args.debug) def check_if_media_monitor_is_running():