CC-5860: Analyzer HTTP requests can hang indefinitely
* Log a backtrace when sent the USR2 signal (kill -SIGUSR2 <pid>)
* Rigged up something to strace and restart when we see a request hanging
parent e1a0429939
commit d21cb596cd
@@ -4,6 +4,8 @@ import ConfigParser
import logging
import logging.handlers
import sys
+import signal
+import traceback
from functools import partial
from metadata_analyzer import MetadataAnalyzer
from replaygain_analyzer import ReplayGainAnalyzer
@@ -23,6 +25,9 @@ class AirtimeAnalyzerServer:

    def __init__(self, rmq_config_path, http_retry_queue_path, debug=False):

+        # Debug console. Access with 'kill -SIGUSR2 <PID>'
+        signal.signal(signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace())
+
        # Configure logging
        self.setup_logging(debug)
@@ -30,13 +35,13 @@ class AirtimeAnalyzerServer:
        rabbitmq_config = self.read_config_file(rmq_config_path)

        # Start up the StatusReporter process
-        StatusReporter.start_child_process(http_retry_queue_path)
+        StatusReporter.start_thread(http_retry_queue_path)

        # Start listening for RabbitMQ messages telling us about newly
-        # uploaded files.
+        # uploaded files. This blocks until we receive a shutdown signal.
        self._msg_listener = MessageListener(rabbitmq_config)

-        StatusReporter.stop_child_process()
+        StatusReporter.stop_thread()


    def setup_logging(self, debug):
@@ -81,4 +86,17 @@ class AirtimeAnalyzerServer:
            exit(-1)

        return config
+
+
+    @classmethod
+    def dump_stacktrace(cls):
+        ''' Dump a stacktrace for all threads '''
+        code = []
+        for threadId, stack in sys._current_frames().items():
+            code.append("\n# ThreadID: %s" % threadId)
+            for filename, lineno, name, line in traceback.extract_stack(stack):
+                code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
+                if line:
+                    code.append(" %s" % (line.strip()))
+        logging.info('\n'.join(code))
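For reference, the dump_stacktrace() recipe above is the usual sys._current_frames() plus traceback.extract_stack() idiom, and it can be tried on its own. Below is a minimal, self-contained sketch of the same pattern; it is not part of the commit, and the name dump_all_stacks and the demo main block are illustrative only.

import logging
import signal
import sys
import threading
import time
import traceback

logging.basicConfig(level=logging.INFO)

def dump_all_stacks(signum, frame):
    ''' Log a stack trace for every thread in this process. '''
    lines = []
    for thread_id, stack in sys._current_frames().items():
        lines.append("\n# ThreadID: %s" % thread_id)
        for filename, lineno, name, line in traceback.extract_stack(stack):
            lines.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                lines.append(" %s" % line.strip())
    logging.info('\n'.join(lines))

signal.signal(signal.SIGUSR2, dump_all_stacks)

if __name__ == '__main__':
    # Start a background thread so the dump shows more than one stack, then
    # idle in the main thread. From another shell: kill -SIGUSR2 <pid>
    threading.Thread(target=time.sleep, args=(3600,)).start()
    while True:
        time.sleep(1)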
@@ -3,8 +3,12 @@ import json
import logging
import collections
import Queue
import signal
import subprocess
import multiprocessing
import time
import sys
import traceback
import os
import pickle
import threading
@@ -84,10 +88,13 @@ def send_http_request(picklable_request, retry_queue):
    if not isinstance(picklable_request, PicklableHttpRequest):
        raise TypeError("picklable_request must be a PicklableHttpRequest. Was of type " + type(picklable_request).__name__)
    try:
-        prepared_request = picklable_request.create_request()
-        prepared_request = prepared_request.prepare()
+        t = threading.Timer(60, alert_hung_request)
+        t.start()
+        bare_request = picklable_request.create_request()
        s = requests.Session()
+        prepared_request = s.prepare_request(bare_request)
        r = s.send(prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
+        t.cancel() # Watchdog no longer needed.
        r.raise_for_status() # Raise an exception if there was an http error code returned
        logging.info("HTTP request sent successfully.")
    except requests.exceptions.RequestException as e:
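The change above wraps the blocking send in a watchdog: a threading.Timer is started before the request and cancelled once the request returns, so the callback only fires if the request hangs. Here is a minimal sketch of that pattern, not part of the codebase; the on_hang callback and fetch_with_watchdog function are hypothetical names, and the 60-second delay simply mirrors the commit.

import logging
import threading

import requests

def on_hang():
    # Fires only if the request below is still blocked after 60 seconds.
    logging.critical("HTTP request has been blocked for more than 60 seconds")

def fetch_with_watchdog(url):
    watchdog = threading.Timer(60, on_hang)
    watchdog.start()
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response
    finally:
        watchdog.cancel()  # Request finished (or raised), so the watchdog is no longer needed.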
@@ -105,6 +112,61 @@ def send_http_request(picklable_request, retry_queue):
        # that breaks our code. I don't want us pickling data that potentially
        # breaks airtime_analyzer.

+def alert_hung_request():
+    ''' Temporary function to alert our Airtime developers when we have a request that's
+        blocked indefinitely. We're working with the python requests developers to figure this
+        one out. (We need to strace airtime_analyzer to figure out where exactly it's blocked.)
+        There's some weird circumstance where this can happen, even though we specify a timeout.
+    '''
+    pid = os.getpid()
+
+    # Capture a list of the open file/socket handles so we can interpret the strace log
+    lsof_log = subprocess.check_output(["lsof -p %s" % str(pid)], shell=True)
+
+    strace_log = ""
+    # Run strace on us for 10 seconds
+    try:
+        subprocess.check_output(["timeout 10 strace -p %s -s 1000 -f -v -o /var/log/airtime/airtime_analyzer_strace.log -ff " % str(pid)],
+                                shell=True)
+    except subprocess.CalledProcessError as e: # When the timeout fires, it returns a crazy code
+        strace_log = e.output
+        pass
+
+    # Dump a traceback
+    code = []
+    for threadId, stack in sys._current_frames().items():
+        code.append("\n# ThreadID: %s" % threadId)
+        for filename, lineno, name, line in traceback.extract_stack(stack):
+            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
+            if line:
+                code.append(" %s" % (line.strip()))
+    stack_trace = ('\n'.join(code))
+
+    logging.critical(stack_trace)
+    logging.critical(strace_log)
+    logging.critical(lsof_log)
+    # Exit the program so that upstart respawns us
+    #sys.exit(-1) #deadlocks :(
+    subprocess.check_output(["kill -9 %s" % str(pid)], shell=True) # Ugh, avert your eyes
+
+'''
+request_running = False
+request_running_lock = threading.Lock()
+def is_request_running():
+    request_running_lock.acquire()
+    rr = request_running
+    request_running_lock.release()
+    return rr
+def set_request_running(is_running):
+    request_running_lock.acquire()
+    request_running = is_running
+    request_running_lock.release()
+def is_request_hung():
+'''
+
+
class StatusReporter():
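One note on the forced restart in alert_hung_request() above: sys.exit() raises SystemExit only in the thread that calls it, and this callback runs on a Timer thread, which is presumably why it failed to bring the process down and the author fell back to shelling out to kill -9. A small alternative sketch, not part of the commit, that forces the same respawn without spawning a subprocess:

import os
import signal

def force_respawn():
    # Send ourselves SIGKILL; the init system (upstart here) respawns the service.
    os.kill(os.getpid(), signal.SIGKILL)  # equivalent to `kill -9 <pid>`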
@@ -122,13 +184,13 @@ class StatusReporter():
    _request_process = None

    @classmethod
-    def start_child_process(self, http_retry_queue_path):
+    def start_thread(self, http_retry_queue_path):
        StatusReporter._request_process = threading.Thread(target=process_http_requests,
                                                           args=(StatusReporter._ipc_queue,http_retry_queue_path))
        StatusReporter._request_process.start()

    @classmethod
-    def stop_child_process(self):
+    def stop_thread(self):
        logging.info("Terminating status_reporter process")
        #StatusReporter._request_process.terminate() # Triggers SIGTERM on the child process
        StatusReporter._ipc_queue.put("shutdown") # Special trigger
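The StatusReporter change replaces the old child process with a worker thread that is told to stop via a sentinel message on its queue instead of terminate(). A minimal sketch of that thread-plus-sentinel shutdown pattern, with illustrative names (worker, handle) that are not the commit's API:

import threading
import Queue  # the module is named queue in Python 3

def handle(item):
    pass  # placeholder for real work, e.g. sending one HTTP request

def worker(ipc_queue):
    while True:
        item = ipc_queue.get()
        if item == "shutdown":  # sentinel message: leave the loop so the thread can exit
            break
        handle(item)

ipc_queue = Queue.Queue()
t = threading.Thread(target=worker, args=(ipc_queue,))
t.start()
ipc_queue.put("some work")
ipc_queue.put("shutdown")
t.join()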