Merge branch 'saas' into saas-embed-player
commit b48d10a45e
@@ -66,10 +66,10 @@ class AirtimeAnalyzerServer:
         rootLogger = logging.getLogger()
         rootLogger.setLevel(self._log_level)
 
-        fileHandler = logging.handlers.RotatingFileHandler(filename=self._LOG_PATH, maxBytes=1024*1024*30,
-                backupCount=8)
-        fileHandler.setFormatter(logFormatter)
-        rootLogger.addHandler(fileHandler)
+        #fileHandler = logging.handlers.RotatingFileHandler(filename=self._LOG_PATH, maxBytes=1024*1024*30,
+        #       backupCount=8)
+        #fileHandler.setFormatter(logFormatter)
+        #rootLogger.addHandler(fileHandler)
 
         consoleHandler = logging.StreamHandler()
         consoleHandler.setFormatter(logFormatter)
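The net effect of this hunk is that the analyzer stops writing its own rotating log file and logs to the console only. A minimal sketch of the setup that remains (the format string and log level here are assumptions; logFormatter and self._log_level come from code outside this hunk):

import logging

# Console-only logging, roughly what is left after the file handler is commented out.
logFormatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")  # format string is assumed
rootLogger = logging.getLogger()
rootLogger.setLevel(logging.INFO)  # stands in for self._log_level

consoleHandler = logging.StreamHandler()  # writes to stderr by default
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)

logging.info("airtime_analyzer logging to console only")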
@@ -88,7 +88,7 @@ class AnalyzerPipeline:
             queue.put(metadata)
         except UnplayableFileError as e:
             logging.exception(e)
-            metadata["import_status"] = IMPORT_STATUS_FAILED
+            metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
             metadata["reason"] = "The file could not be played."
             raise e
         except Exception as e:
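The one-line change qualifies the constant with the class name. Assuming run_analysis is a staticmethod and IMPORT_STATUS_FAILED is a class attribute (suggested but not shown by this diff), the bare name would raise NameError inside the method; a minimal illustration with an assumed value:

class AnalyzerPipeline(object):
    IMPORT_STATUS_FAILED = 2  # the value is an assumption; only the name appears in this diff

    @staticmethod
    def run_analysis(metadata):
        # The bare name would raise NameError here, because class attributes are
        # not part of a method's enclosing scope in Python:
        #     metadata["import_status"] = IMPORT_STATUS_FAILED
        metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
        return metadata

print(AnalyzerPipeline.run_analysis({}))  # {'import_status': 2}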
@@ -99,13 +99,10 @@ def send_http_request(picklable_request, retry_queue):
     if not isinstance(picklable_request, PicklableHttpRequest):
         raise TypeError("picklable_request must be a PicklableHttpRequest. Was of type " + type(picklable_request).__name__)
     try:
-        #t = threading.Timer(60, alert_hung_request)
-        #t.start()
         bare_request = picklable_request.create_request()
         s = requests.Session()
         prepared_request = s.prepare_request(bare_request)
         r = s.send(prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT, verify=False) # SNI is a pain in the ass
-        #t.cancel() # Watchdog no longer needed.
         r.raise_for_status() # Raise an exception if there was an http error code returned
         logging.info("HTTP request sent successfully.")
     except requests.exceptions.HTTPError as e:
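With the threading.Timer watchdog lines dropped, the request relies on the requests timeout alone. A rough sketch of the prepare-and-send pattern used here (the function name, URL, and timeout value are placeholders, not taken from the codebase):

import logging
import requests

def send_with_timeout(url, payload, timeout=30):
    # Mirror the prepare_request/send flow in send_http_request() above.
    bare_request = requests.Request('POST', url, data=payload)
    s = requests.Session()
    prepared_request = s.prepare_request(bare_request)
    r = s.send(prepared_request, timeout=timeout)  # bounds connect/read time, not total wall time
    r.raise_for_status()  # raise on 4xx/5xx status codes
    logging.info("HTTP request sent successfully.")
    return r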
@@ -152,63 +149,6 @@ def is_web_server_broken(url):
         return False
 
 
-def alert_hung_request():
-    ''' Temporary function to alert our Airtime developers when we have a request that's
-        blocked indefinitely. We're working with the python requests developers to figure this
-        one out. (We need to strace airtime_analyzer to figure out where exactly it's blocked.)
-        There's some weird circumstance where this can happen, even though we specify a timeout.
-    '''
-    pid = os.getpid()
-
-    # Capture a list of the open file/socket handles so we can interpret the strace log
-    lsof_log = subprocess.check_output(["lsof -p %s" % str(pid)], shell=True)
-
-    strace_log = ""
-    # Run strace on us for 10 seconds
-    try:
-        subprocess.check_output(["timeout 10 strace -p %s -s 1000 -f -v -o /var/log/airtime/airtime_analyzer_strace.log -ff " % str(pid)],
-                shell=True)
-    except subprocess.CalledProcessError as e: # When the timeout fires, it returns a crazy code
-        strace_log = e.output
-        pass
-
-
-    # Dump a traceback
-    code = []
-    for threadId, stack in sys._current_frames().items():
-        code.append("\n# ThreadID: %s" % threadId)
-        for filename, lineno, name, line in traceback.extract_stack(stack):
-            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
-            if line:
-                code.append("  %s" % (line.strip()))
-    stack_trace = ('\n'.join(code))
-
-    logging.critical(stack_trace)
-    logging.critical(strace_log)
-    logging.critical(lsof_log)
-    # Exit the program so that upstart respawns us
-    #sys.exit(-1) #deadlocks :(
-    subprocess.check_output(["kill -9 %s" % str(pid)], shell=True) # Ugh, avert your eyes
-
-'''
-request_running = False
-request_running_lock = threading.Lock()
-def is_request_running():
-    request_running_lock.acquire()
-    rr = request_running
-    request_running_lock.release()
-    return rr
-def set_request_running(is_running):
-    request_running_lock.acquire()
-    request_running = is_running
-    request_running_lock.release()
-def is_request_hung():
-    '''
-
-
-
-
-
 class StatusReporter():
     ''' Reports the extracted audio file metadata and job status back to the
         Airtime web application.
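The deleted watchdog leaned on one generally useful trick: dumping a stack trace for every live thread via sys._current_frames(). A standalone sketch of that idiom, without the removed strace/lsof plumbing or the kill -9 at the end:

import logging
import sys
import traceback

def dump_all_thread_stacks():
    # Collect a formatted traceback for every live thread, much as the removed
    # alert_hung_request() did before logging the result.
    lines = []
    for thread_id, frame in sys._current_frames().items():
        lines.append("\n# ThreadID: %s" % thread_id)
        for filename, lineno, name, line in traceback.extract_stack(frame):
            lines.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                lines.append("  %s" % line.strip())
    logging.critical('\n'.join(lines))

dump_all_thread_stacks()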
@@ -21,15 +21,16 @@ def teardown():
 
 def test_basic():
     filename = os.path.basename(DEFAULT_AUDIO_FILE)
-    q = multiprocessing.Queue()
+    q = Queue.Queue()
     #cloud_storage_config_path = '/etc/airtime-saas/production/cloud_storage.conf'
     #cloud_storage_config = config_file.read_config_file(cloud_storage_config_path)
     cloud_storage_config = SafeConfigParser()
     cloud_storage_config.add_section("current_backend")
     cloud_storage_config.set("current_backend", "storage_backend", "file")
     file_prefix = u''
+    storage_backend = "file"
     #This actually imports the file into the "./Test Artist" directory.
-    AnalyzerPipeline.run_analysis(q, DEFAULT_AUDIO_FILE, u'.', filename, file_prefix, cloud_storage_config)
+    AnalyzerPipeline.run_analysis(q, DEFAULT_AUDIO_FILE, u'.', filename, storage_backend, file_prefix, cloud_storage_config)
     metadata = q.get()
     assert metadata['track_title'] == u'Test Title'
     assert metadata['artist_name'] == u'Test Artist'
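Two details of the updated test are worth noting: the cloud storage config is built in memory with SafeConfigParser rather than read from the /etc/airtime-saas/ path, and a plain Queue.Queue replaces multiprocessing.Queue, which works only if the pipeline runs in the calling process (an assumption this diff does not show). A Python 2 sketch of that setup:

# Python 2 sketch, matching the test's imports: build the config in memory
# and pass results through an in-process queue.
import Queue
from ConfigParser import SafeConfigParser

cloud_storage_config = SafeConfigParser()
cloud_storage_config.add_section("current_backend")
cloud_storage_config.set("current_backend", "storage_backend", "file")
assert cloud_storage_config.get("current_backend", "storage_backend") == "file"

q = Queue.Queue()  # sufficient only when producer and consumer share one process
q.put({u"track_title": u"Test Title"})
print(q.get()[u"track_title"])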
@@ -47,13 +48,13 @@ def test_wrong_type_queue_param():
 
 @raises(TypeError)
 def test_wrong_type_string_param2():
-    AnalyzerPipeline.run_analysis(multiprocessing.queues.Queue(), '', u'', u'')
+    AnalyzerPipeline.run_analysis(Queue.Queue(), '', u'', u'')
 
 @raises(TypeError)
 def test_wrong_type_string_param3():
-    AnalyzerPipeline.run_analysis(multiprocessing.queues.Queue(), u'', '', u'')
+    AnalyzerPipeline.run_analysis(Queue.Queue(), u'', '', u'')
 
 @raises(TypeError)
 def test_wrong_type_string_param4():
-    AnalyzerPipeline.run_analysis(multiprocessing.queues.Queue(), u'', u'', '')
+    AnalyzerPipeline.run_analysis(Queue.Queue(), u'', u'', '')
 
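These tests rely on nose's @raises decorator to assert that run_analysis rejects wrongly typed arguments. A minimal, self-contained version of the same pattern (validate_args is a hypothetical stand-in for AnalyzerPipeline.run_analysis):

# Python 2 sketch of the nose @raises pattern used above.
from nose.tools import raises

def validate_args(queue, audio_file_path):  # hypothetical, for illustration only
    if not isinstance(audio_file_path, unicode):
        raise TypeError("audio_file_path must be unicode. Was of type " +
                        type(audio_file_path).__name__)

@raises(TypeError)
def test_rejects_non_unicode_path():
    validate_args(None, 'not-unicode')  # str where unicode is required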