sintonia/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py


import requests
import json
import logging
import collections
import Queue
import subprocess
import multiprocessing
import time
import sys
import traceback
import os
import pickle
import threading
from urlparse import urlparse
class PicklableHttpRequest:
    ''' A pickle-safe stand-in for a requests.Request. We store only primitive
        fields so the request can travel through a multiprocessing.Queue and be
        persisted to disk, then be rebuilt later with create_request().
    '''
    def __init__(self, method, url, data, api_key):
        self.method = method
        self.url = url
        self.data = data
        self.api_key = api_key

    def create_request(self):
        return requests.Request(method=self.method, url=self.url, data=self.data,
                                auth=requests.auth.HTTPBasicAuth(self.api_key, ''))
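
# For illustration, a minimal sketch of the pickle round trip that motivates this
# class. The URL and API key below are hypothetical, not part of the original module:
#
#   req = PicklableHttpRequest('PUT', 'http://airtime.example/rest/media/1',
#                              '{"import_status": 0}', 'some_api_key')
#   restored = pickle.loads(pickle.dumps(req))  # survives IPC and disk persistence
#   restored.create_request()                   # rebuilds the real requests.Request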
def process_http_requests(ipc_queue, http_retry_queue_path):
    ''' Runs in a separate thread (see StatusReporter.start_thread) and performs
        all the HTTP requests where we're reporting extracted audio file metadata
        or errors back to the Airtime web application.

        This loop also checks every 5 seconds if there are any failed HTTP
        requests that we need to retry. We retry failed HTTP requests so that we
        don't lose uploads if the web server is temporarily down.
    '''
    # Store any failed requests (eg. due to web server errors or downtime) to be
    # retried later:
    retry_queue = collections.deque()
    shutdown = False

    # Unpickle retry_queue from disk so that we won't have lost any uploads
    # if airtime_analyzer was shut down while the web server was down or
    # unreachable and there were failed HTTP requests pending, waiting to be
    # retried.
    try:
        with open(http_retry_queue_path, 'rb') as pickle_file:
            retry_queue = pickle.load(pickle_file)
    except IOError as e:
        if e.errno == 2: # ENOENT: no saved retry queue yet, which is normal on a fresh start
            pass
        else:
            raise
    except Exception:
        # If we fail to unpickle a saved queue of failed HTTP requests, we just log
        # an error and continue, because those HTTP requests are lost anyway. The
        # pickled file will be overwritten the next time the analyzer shuts down.
        logging.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)
    while not shutdown:
        try:
            request = ipc_queue.get(block=True, timeout=5)
            if isinstance(request, str) and request == "shutdown": # Bit of a cheat
                shutdown = True
                break
            if not isinstance(request, PicklableHttpRequest):
                raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__)
        except Queue.Empty:
            request = None

        # If there's no new HTTP request to execute, check our "retry queue" and
        # see if there are any failed HTTP requests we can retry:
        if request:
            send_http_request(request, retry_queue)
        else:
            # Using a for loop instead of a while loop so we only iterate over all
            # the requests once!
            for i in range(len(retry_queue)):
                request = retry_queue.popleft()
                send_http_request(request, retry_queue)

    logging.info("Shutting down status_reporter")
    # Pickle retry_queue to disk so that we don't lose uploads if we're shut down
    # while the web server is down or unreachable.
    with open(http_retry_queue_path, 'wb') as pickle_file:
        pickle.dump(retry_queue, pickle_file)
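
# For illustration, a minimal sketch of the IPC protocol this loop expects. The
# retry queue path and URL are hypothetical; only PicklableHttpRequest instances
# and the "shutdown" sentinel string should ever be put on the queue:
#
#   q = multiprocessing.Queue()
#   t = threading.Thread(target=process_http_requests, args=(q, '/tmp/retry.pickle'))
#   t.start()
#   q.put(PicklableHttpRequest('PUT', 'http://airtime.example/rest/media/1',
#                              '{}', 'some_api_key'))  # a unit of work
#   q.put("shutdown")  # ends the loop and pickles any pending retries to disk
#   t.join()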
def send_http_request(picklable_request, retry_queue):
    if not isinstance(picklable_request, PicklableHttpRequest):
        raise TypeError("picklable_request must be a PicklableHttpRequest. Was of type " + type(picklable_request).__name__)
    try:
        # Watchdog: if the request somehow blocks past 60 seconds (in spite of the
        # timeout below), dump diagnostics and restart the process.
        t = threading.Timer(60, alert_hung_request)
        t.start()
        bare_request = picklable_request.create_request()
        s = requests.Session()
        prepared_request = s.prepare_request(bare_request)
        r = s.send(prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
        t.cancel() # Watchdog no longer needed.
        r.raise_for_status() # Raise an exception if there was an HTTP error code returned
        logging.info("HTTP request sent successfully.")
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 422:
            # Do not retry the request if there was a metadata validation error
            logging.error("HTTP request failed due to an HTTP exception. Exception was: %s" % str(e))
        else:
            # The request probably failed with an error 500, so let's check whether
            # Airtime and/or the web server are broken. If they're not, then our
            # request was probably causing an error 500 in the media API (ie. a
            # bug), so there's no point in retrying it.
            logging.error("HTTP request failed. Exception was: %s" % str(e))
            parsed_url = urlparse(e.response.request.url)
            if is_web_server_broken(parsed_url.scheme + "://" + parsed_url.netloc):
                # If the web server is having problems, retry the request later:
                retry_queue.append(picklable_request)
                # Otherwise, if the request was bad, the request is never retried.
                # You will have to find these bad requests in the logs or you'll be
                # notified by Sentry.
    except requests.exceptions.ConnectionError as e:
        logging.error("HTTP request failed due to a connection error. Retrying later. %s" % str(e))
        retry_queue.append(picklable_request) # Retry it later
    except Exception as e:
        logging.error("HTTP request failed with unhandled exception. %s" % str(e))
        # Don't put the request into the retry queue; just give up on this one.
        # We do this to protect against getting some pathological request that
        # breaks our code. We don't want to pickle data that potentially breaks
        # airtime_analyzer.
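
# To summarize the retry policy implemented above:
#   HTTP 422 (metadata validation error) -> dropped, never retried
#   other HTTPError, web server broken   -> appended to retry_queue
#   other HTTPError, web server healthy  -> dropped (likely our bug; retrying won't help)
#   ConnectionError                      -> appended to retry_queue
#   any other exception                  -> dropped, to avoid pickling pathological requests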
def is_web_server_broken(url):
    ''' Do a naive test to check if the web server we're trying to access is down.
        We use this to try to differentiate between error 500s that are coming
        from (for example) a bug in the Airtime Media REST API and error 500s
        caused by Airtime or the web server itself being temporarily broken.
    '''
    try:
        test_req = requests.get(url)
        test_req.raise_for_status()
    except Exception:
        return True
    else:
        # The request worked fine, so the web server and Airtime are still up.
        return False
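
# Note that any failure counts as "broken" here: connection refused, DNS errors,
# or an HTTP error status at the root URL all return True. That coarse signal is
# all the retry logic in send_http_request() needs. For illustration, with a
# hypothetical host:
#
#   is_web_server_broken("http://airtime.example")  # True if unreachable or erroring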
def alert_hung_request():
    ''' Temporary function to alert the Airtime developers when we have a request
        that's blocked indefinitely. We're working with the python-requests
        developers to figure this one out. (We need to strace airtime_analyzer to
        figure out where exactly it's blocked.) There's some weird circumstance
        where this can happen, even though we specify a timeout.
    '''
    pid = os.getpid()

    # Capture a list of the open file/socket handles so we can interpret the
    # strace log
    lsof_log = subprocess.check_output(["lsof -p %s" % str(pid)], shell=True)

    strace_log = ""
    # Run strace on ourselves for 10 seconds
    try:
        subprocess.check_output(["timeout 10 strace -p %s -s 1000 -f -v -o /var/log/airtime/airtime_analyzer_strace.log -ff " % str(pid)],
                                shell=True)
    except subprocess.CalledProcessError as e: # When the timeout fires, it returns a non-zero exit code
        strace_log = e.output

    # Dump a traceback for every thread
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s" % threadId)
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    stack_trace = '\n'.join(code)

    logging.critical(stack_trace)
    logging.critical(strace_log)
    logging.critical(lsof_log)

    # Exit the program so that upstart respawns us
    #sys.exit(-1) # deadlocks :(
    subprocess.check_output(["kill -9 %s" % str(pid)], shell=True) # Ugh, avert your eyes
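
# The watchdog above is armed per request in send_http_request(), following the
# standard threading.Timer pattern:
#
#   t = threading.Timer(60, alert_hung_request)  # fires only if we hang past 60s
#   t.start()
#   ...                                          # perform the HTTP request
#   t.cancel()                                   # normal completion disarms it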
'''
request_running = False
request_running_lock = threading.Lock()

def is_request_running():
    request_running_lock.acquire()
    rr = request_running
    request_running_lock.release()
    return rr

def set_request_running(is_running):
    request_running_lock.acquire()
    request_running = is_running
    request_running_lock.release()

def is_request_hung():
'''
class StatusReporter():
    ''' Reports the extracted audio file metadata and job status back to the
        Airtime web application.
    '''
    _HTTP_REQUEST_TIMEOUT = 30

    ''' We originally used multiprocessing.Process here because we need a separate
        thread of control for this work anyway, and Python gives us process
        isolation for free (crash safety). The HTTP loop currently runs in a
        threading.Thread instead; see start_thread() below.
    '''
    _ipc_queue = multiprocessing.Queue()
    #_request_process = multiprocessing.Process(target=process_http_requests,
    #                                           args=(_ipc_queue,))
    _request_process = None

    @classmethod
    def start_thread(cls, http_retry_queue_path):
        StatusReporter._request_process = threading.Thread(target=process_http_requests,
                                                           args=(StatusReporter._ipc_queue, http_retry_queue_path))
        StatusReporter._request_process.start()
    @classmethod
    def stop_thread(cls):
        logging.info("Terminating status_reporter process")
        #StatusReporter._request_process.terminate() # Triggers SIGTERM on the child process
        StatusReporter._ipc_queue.put("shutdown") # Special trigger
        StatusReporter._request_process.join()

    @classmethod
    def _send_http_request(cls, request):
        StatusReporter._ipc_queue.put(request)
    @classmethod
    def report_success_to_callback_url(cls, callback_url, api_key, audio_metadata):
        ''' Report the extracted metadata and status of the successfully imported
            file to the callback URL (which should be the Airtime File Upload API)
        '''
        put_payload = json.dumps(audio_metadata)
        #r = requests.Request(method='PUT', url=callback_url, data=put_payload,
        #        auth=requests.auth.HTTPBasicAuth(api_key, ''))
        '''
        r = requests.Request(method='PUT', url=callback_url, data=put_payload,
                auth=requests.auth.HTTPBasicAuth(api_key, ''))

        StatusReporter._send_http_request(r)
        '''
        StatusReporter._send_http_request(PicklableHttpRequest(method='PUT', url=callback_url,
                                                               data=put_payload, api_key=api_key))
        '''
        try:
            r.raise_for_status() # Raise an exception if there was an http error code returned
        except requests.exceptions.RequestException:
            StatusReporter._ipc_queue.put(r.prepare())
        '''
        '''
        # Encode the audio metadata as json and post it back to the callback_url
        put_payload = json.dumps(audio_metadata)
        logging.debug("sending http put with payload: " + put_payload)
        r = requests.put(callback_url, data=put_payload,
                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
        logging.debug("HTTP request returned status: " + str(r.status_code))
        logging.debug(r.text) # log the response body

        #TODO: queue up failed requests and try them again later.
        r.raise_for_status() # Raise an exception if there was an http error code returned
        '''
    @classmethod
    def report_failure_to_callback_url(cls, callback_url, api_key, import_status, reason):
        if not isinstance(import_status, (int, long)):
            raise TypeError("import_status must be an integer. Was of type " + type(import_status).__name__)

        logging.debug("Reporting import failure to Airtime REST API...")
        audio_metadata = dict()
        audio_metadata["import_status"] = import_status
        audio_metadata["comment"] = reason # hack attack
        put_payload = json.dumps(audio_metadata)
        #logging.debug("sending http put with payload: " + put_payload)
        '''
        r = requests.put(callback_url, data=put_payload,
                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
        '''
        StatusReporter._send_http_request(PicklableHttpRequest(method='PUT', url=callback_url,
                                                               data=put_payload, api_key=api_key))
        '''
        logging.debug("HTTP request returned status: " + str(r.status_code))
        logging.debug(r.text) # log the response body
        #TODO: queue up failed requests and try them again later.
        r.raise_for_status() # raise an exception if there was an http error code returned
        '''
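
# For illustration, the typical lifecycle from the analyzer's side. The retry
# queue path, callback URL, API key, and metadata values are all hypothetical:
#
#   StatusReporter.start_thread('/var/tmp/airtime_analyzer_http_retries.pickle')
#   StatusReporter.report_success_to_callback_url(
#       'http://airtime.example/rest/media/1', 'some_api_key',
#       {'md5': 'abc123', 'mime': 'audio/mp3'})
#   StatusReporter.report_failure_to_callback_url(
#       'http://airtime.example/rest/media/1', 'some_api_key',
#       import_status=2, reason='Unsupported file type')
#   StatusReporter.stop_thread()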