Merge branch 'cc-5709-airtime-analyzer' into cc-5709-airtime-analyzer-saas

Albert Santoni 2014-04-16 11:11:47 -04:00
commit 0431044ee2
5 changed files with 181 additions and 14 deletions

README

@@ -38,6 +38,9 @@ This application can be run as a daemon by running:

     $ airtime_analyzer -d

+Other runtime flags can be listed by running:
+
+    $ airtime_analyzer --help

 Developers
@@ -81,8 +84,12 @@ To run the unit tests and generate a code coverage report, run:

     $ nosetests --with-coverage --cover-package=airtime_analyzer

+Running in a Multi-Tenant Environment
+===========
+
 History and Design Motivation
 ===========

airtime_analyzer.py

@@ -7,6 +7,7 @@ import sys
 from functools import partial
 from metadata_analyzer import MetadataAnalyzer
 from replaygain_analyzer import ReplayGainAnalyzer
+from status_reporter import StatusReporter
 from message_listener import MessageListener
@@ -20,17 +21,22 @@ class AirtimeAnalyzerServer:
     # Variables
     _log_level = logging.INFO

-    def __init__(self, config_path, debug=False):
+    def __init__(self, rmq_config_path, http_retry_queue_path, debug=False):

         # Configure logging
         self.setup_logging(debug)

         # Read our config file
-        rabbitmq_config = self.read_config_file(config_path)
+        rabbitmq_config = self.read_config_file(rmq_config_path)
+
+        # Start up the StatusReporter process
+        StatusReporter.start_child_process(http_retry_queue_path)

         # Start listening for RabbitMQ messages telling us about newly
         # uploaded files.
         self._msg_listener = MessageListener(rabbitmq_config)

+        StatusReporter.stop_child_process()
+
     def setup_logging(self, debug):
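
The ordering this change establishes: the reporter must be running before the message loop starts producing callbacks, and shutting it down is what persists the retry queue. A minimal sketch, not from the commit, with a stub standing in for the blocking MessageListener; the retry-queue path is just the default defined in the entry-point script below:

    from status_reporter import StatusReporter

    def fake_message_loop():
        pass  # stands in for MessageListener(rabbitmq_config), which blocks until exit

    StatusReporter.start_child_process('/tmp/airtime_analyzer_http_retries')
    try:
        fake_message_loop()
    finally:
        # The commit calls stop_child_process() unconditionally after the listener
        # returns; a try/finally as sketched here would also persist the retry
        # queue if the listener raised.
        StatusReporter.stop_child_process()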

message_listener.py

@@ -183,12 +183,12 @@ class MessageListener:
             channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False,
                                requeue=False) #Important that it doesn't requeue the message

-            # TODO: Report this as a failed upload to the File Upload REST API.
             #
             # TODO: If the JSON was invalid or the web server is down,
             #       then don't report that failure to the REST API
             #TODO: Catch exceptions from this HTTP request too:
             if callback_url: # If we got an invalid message, there might be no callback_url in the JSON
+                # Report this as a failed upload to the File Upload REST API.
                 StatusReporter.report_failure_to_callback_url(callback_url, api_key, import_status=2,
                                                               reason=u'An error occurred while importing this file')

status_reporter.py

@@ -1,19 +1,157 @@
 import requests
 import json
 import logging
+import collections
+import Queue
+import signal
+import multiprocessing
+import pickle
+import threading
+
+class PicklableHttpRequest:
+    def __init__(self, method, url, data, api_key):
+        self.method = method
+        self.url = url
+        self.data = data
+        self.api_key = api_key
+
+    def create_request(self):
+        return requests.Request(method=self.method, url=self.url, data=self.data,
+                                auth=requests.auth.HTTPBasicAuth(self.api_key, ''))
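
The wrapper exists so that request descriptions can cross a pickling boundary (multiprocessing.Queue pickles everything it carries); the real requests.Request is only built on the consuming side. A minimal sketch of that round trip, not from the commit; the URL, key, and payload are made up:

    import pickle

    original = PicklableHttpRequest(method='PUT',
                                    url='http://airtime.example/rest/media/1',  # made-up URL
                                    data='{"import_status": 0}',
                                    api_key='some-api-key')                     # made-up key

    wire_bytes = pickle.dumps(original)            # what multiprocessing.Queue does internally
    rebuilt = pickle.loads(wire_bytes)
    prepared = rebuilt.create_request().prepare()  # a requests.PreparedRequest, ready for Session.send()
    assert prepared.method == 'PUT'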
+def process_http_requests(ipc_queue, http_retry_queue_path):
+    ''' Runs in a separate process and performs all the HTTP requests where we're
+        reporting extracted audio file metadata or errors back to the Airtime web application.
+
+        This process also checks every 5 seconds if there are failed HTTP requests that we
+        need to retry. We retry failed HTTP requests so that we don't lose uploads if the
+        web server is temporarily down.
+    '''
+    # Store any failed requests (e.g. due to web server errors or downtime) to be
+    # retried later:
+    retry_queue = collections.deque()
+    shutdown = False
+
+    # Unpickle retry_queue from disk so that we won't have lost any uploads
+    # if airtime_analyzer was shut down while the web server was down or unreachable
+    # and there were failed HTTP requests pending, waiting to be retried.
+    try:
+        with open(http_retry_queue_path, 'rb') as pickle_file:
+            retry_queue = pickle.load(pickle_file)
+    except IOError as e:
+        if e.errno == 2: # errno 2 is ENOENT: no retry queue file exists yet, which is fine
+            pass
+        else:
+            raise e
+    while not shutdown:
+        try:
+            request = ipc_queue.get(block=True, timeout=5)
+            if isinstance(request, str) and request == "shutdown": # Bit of a cheat
+                shutdown = True
+                break
+            if not isinstance(request, PicklableHttpRequest):
+                raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__)
+        except Queue.Empty:
+            request = None
+
+        # If there's a new HTTP request to execute, send it. Otherwise, check our
+        # "retry queue" and see if there are any failed HTTP requests we can retry:
+        if request:
+            send_http_request(request, retry_queue)
+        else:
+            # Using a for loop instead of while so we only iterate over all the requests once!
+            for i in range(len(retry_queue)):
+                request = retry_queue.popleft()
+                send_http_request(request, retry_queue)
+
+    logging.info("Shutting down status_reporter")
+    # Pickle retry_queue to disk so that we don't lose uploads if we're shut down
+    # while the web server is down or unreachable.
+    with open(http_retry_queue_path, 'wb') as pickle_file:
+        pickle.dump(retry_queue, pickle_file)
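
A sketch of how this loop is driven from outside. This standalone usage is illustrative; in the commit, StatusReporter.start_child_process() and stop_child_process() below do the equivalent:

    import multiprocessing
    import threading

    ipc_queue = multiprocessing.Queue()
    worker = threading.Thread(target=process_http_requests,
                              args=(ipc_queue, '/tmp/example_retry_queue'))  # made-up path
    worker.start()

    # Producers enqueue PicklableHttpRequest objects...
    ipc_queue.put(PicklableHttpRequest(method='PUT', url='http://airtime.example/rest/media/1',
                                       data='{}', api_key='some-api-key'))   # made-up values

    # ...and the "shutdown" sentinel string is the only clean way to stop the
    # loop, which then pickles any still-failing requests to disk.
    ipc_queue.put("shutdown")
    worker.join()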
+def send_http_request(picklable_request, retry_queue):
+    if not isinstance(picklable_request, PicklableHttpRequest):
+        raise TypeError("picklable_request must be a PicklableHttpRequest. Was of type " + type(picklable_request).__name__)
+    try:
+        prepared_request = picklable_request.create_request()
+        prepared_request = prepared_request.prepare()
+        s = requests.Session()
+        r = s.send(prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
+        r.raise_for_status() # Raise an exception if there was an http error code returned
+        logging.info("HTTP request sent successfully.")
+    except requests.exceptions.RequestException as e:
+        # If the web server is having problems, retry the request later:
+        logging.error("HTTP request failed. Retrying later! Exception was: %s" % str(e))
+        retry_queue.append(picklable_request)
+    except Exception as e:
+        logging.error("HTTP request failed with unhandled exception. %s" % str(e))
+        # Don't put the request into the retry queue, just give up on this one.
+        # I'm doing this to protect against us getting some pathological request
+        # that breaks our code. I don't want us having one bad request jam the retry queue forever.

 class StatusReporter():
     ''' Reports the extracted audio file metadata and job status back to the
         Airtime web application.
     '''
     _HTTP_REQUEST_TIMEOUT = 30

+    ''' This was designed to run in a separate multiprocessing.Process, since we
+        need a separate thread of control for this stuff anyways and Python would
+        give us process isolation (crash safety) for free. Note that
+        start_child_process() currently runs it on a threading.Thread instead;
+        the multiprocessing.Queue used for IPC works either way.
+    '''
+    _ipc_queue = multiprocessing.Queue()
+    #_request_process = multiprocessing.Process(target=process_http_requests,
+    #                                           args=(_ipc_queue,))
+    _request_process = None

+    @classmethod
+    def start_child_process(self, http_retry_queue_path):
+        StatusReporter._request_process = threading.Thread(target=process_http_requests,
+                                                           args=(StatusReporter._ipc_queue, http_retry_queue_path))
+        StatusReporter._request_process.start()
+
+    @classmethod
+    def stop_child_process(self):
+        logging.info("Terminating status_reporter process")
+        #StatusReporter._request_process.terminate() # Triggers SIGTERM on the child process
+        StatusReporter._ipc_queue.put("shutdown") # Special trigger
+        StatusReporter._request_process.join()
+
+    @classmethod
+    def _send_http_request(self, request):
+        StatusReporter._ipc_queue.put(request)

     @classmethod
     def report_success_to_callback_url(self, callback_url, api_key, audio_metadata):
         ''' Report the extracted metadata and status of the successfully imported file
             to the callback URL (which should be the Airtime File Upload API)
         '''
+        put_payload = json.dumps(audio_metadata)
+        #r = requests.Request(method='PUT', url=callback_url, data=put_payload,
+        #                     auth=requests.auth.HTTPBasicAuth(api_key, ''))
+        '''
+        r = requests.Request(method='PUT', url=callback_url, data=put_payload,
+                             auth=requests.auth.HTTPBasicAuth(api_key, ''))
+
+        StatusReporter._send_http_request(r)
+        '''
+        StatusReporter._send_http_request(PicklableHttpRequest(method='PUT', url=callback_url,
+                                                               data=put_payload, api_key=api_key))
+        '''
+        try:
+            r.raise_for_status() # Raise an exception if there was an http error code returned
+        except requests.exceptions.RequestException:
+            StatusReporter._ipc_queue.put(r.prepare())
+        '''
+        '''
         # Encode the audio metadata as json and post it back to the callback_url
         put_payload = json.dumps(audio_metadata)
         logging.debug("sending http put with payload: " + put_payload)
@@ -25,6 +163,7 @@ class StatusReporter():
         #TODO: queue up failed requests and try them again later.
         r.raise_for_status() # Raise an exception if there was an http error code returned
+        '''

     @classmethod
     def report_failure_to_callback_url(self, callback_url, api_key, import_status, reason):
@@ -36,13 +175,19 @@ class StatusReporter():
         audio_metadata["import_status"] = import_status
         audio_metadata["comment"] = reason # hack attack
         put_payload = json.dumps(audio_metadata)
-        logging.debug("sending http put with payload: " + put_payload)
+        #logging.debug("sending http put with payload: " + put_payload)
+        '''
         r = requests.put(callback_url, data=put_payload,
                          auth=requests.auth.HTTPBasicAuth(api_key, ''),
                          timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
+        '''
+        StatusReporter._send_http_request(PicklableHttpRequest(method='PUT', url=callback_url,
+                                                               data=put_payload, api_key=api_key))
+        '''
         logging.debug("HTTP request returned status: " + str(r.status_code))
         logging.debug(r.text) # log the response body

         #TODO: queue up failed requests and try them again later.
         r.raise_for_status() # raise an exception if there was an http error code returned
+        '''
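
From a caller's perspective (MessageListener is the real caller), both report methods just enqueue a PicklableHttpRequest; nothing blocks on the HTTP round trip. An illustrative sequence, not from the commit; the URL, key, and metadata are made up, and import_status=2 matches the failure path in message_listener.py:

    StatusReporter.start_child_process('/tmp/airtime_analyzer_http_retries')

    # Success: PUT the extracted metadata back to the callback URL.
    StatusReporter.report_success_to_callback_url(
        'http://airtime.example/rest/media/1',           # made-up callback URL
        'some-api-key',                                  # made-up API key
        {'track_title': 'Example', 'import_status': 0})  # illustrative metadata

    # Failure: import_status=2 marks the upload as failed.
    StatusReporter.report_failure_to_callback_url(
        'http://airtime.example/rest/media/1', 'some-api-key',
        import_status=2, reason=u'An error occurred while importing this file')

    StatusReporter.stop_child_process()  # sends "shutdown" and persists pending retries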

bin/airtime_analyzer

@@ -9,6 +9,7 @@ import airtime_analyzer.airtime_analyzer as aa

 VERSION = "1.0"
 DEFAULT_CONFIG_PATH = '/etc/airtime/airtime.conf'
+DEFAULT_HTTP_RETRY_PATH = '/tmp/airtime_analyzer_http_retries'

 def run():
     '''Entry-point for this application'''
@@ -16,22 +17,30 @@ def run():
     parser = argparse.ArgumentParser()
     parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
     parser.add_argument("--debug", help="log full debugging output", action="store_true")
-    parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is /etc/airtime/airtime.conf)")
+    parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH)
+    parser.add_argument("--http-retry-queue-file", help="specify where incomplete HTTP requests will be serialized (default is %s)" % DEFAULT_HTTP_RETRY_PATH)
     args = parser.parse_args()

     check_if_media_monitor_is_running()

     #Default config file path
-    config_path = DEFAULT_CONFIG_PATH
+    rmq_config_path = DEFAULT_CONFIG_PATH
+    http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH
     if args.rmq_config_file:
-        config_path = args.rmq_config_file
+        rmq_config_path = args.rmq_config_file
+    if args.http_retry_queue_file:
+        http_retry_queue_path = args.http_retry_queue_file

     if args.daemon:
         with daemon.DaemonContext():
-            aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug)
+            aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path,
+                                     http_retry_queue_path=http_retry_queue_path,
+                                     debug=args.debug)
     else:
         # Run without daemonizing
-        aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug)
+        aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path,
+                                 http_retry_queue_path=http_retry_queue_path,
+                                 debug=args.debug)

 def check_if_media_monitor_is_running():
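
Putting the new flags together, an invocation overriding both settings might look like this (the paths shown are just the defaults defined above):

    $ airtime_analyzer --rmq-config-file /etc/airtime/airtime.conf \
                       --http-retry-queue-file /tmp/airtime_analyzer_http_retries --debug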