SAAS-602: airtime_analyzer deadlocks in S3 hostname lookup

* Stopped using multiprocess.subprocess because it's dangerous and
  unreliable due to Python bug 6721: http://bugs.python.org/issue6721
This commit is contained in:
Albert Santoni 2015-03-11 18:33:08 -04:00
parent 47e6879766
commit e182e73626
2 changed files with 15 additions and 3 deletions

View file

@ -3,6 +3,7 @@
import logging import logging
import threading import threading
import multiprocessing import multiprocessing
import Queue
import ConfigParser import ConfigParser
from metadata_analyzer import MetadataAnalyzer from metadata_analyzer import MetadataAnalyzer
from filemover_analyzer import FileMoverAnalyzer from filemover_analyzer import FileMoverAnalyzer
@ -45,8 +46,8 @@ class AnalyzerPipeline:
AnalyzerPipeline.python_logger_deadlock_workaround() AnalyzerPipeline.python_logger_deadlock_workaround()
try: try:
if not isinstance(queue, multiprocessing.queues.Queue): if not isinstance(queue, Queue.Queue):
raise TypeError("queue must be a multiprocessing.Queue()") raise TypeError("queue must be a Queue.Queue()")
if not isinstance(audio_file_path, unicode): if not isinstance(audio_file_path, unicode):
raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__ + " instead.") raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__ + " instead.")
if not isinstance(import_directory, unicode): if not isinstance(import_directory, unicode):

View file

@ -6,6 +6,7 @@ import select
import signal import signal
import logging import logging
import multiprocessing import multiprocessing
import Queue
from analyzer_pipeline import AnalyzerPipeline from analyzer_pipeline import AnalyzerPipeline
from status_reporter import StatusReporter from status_reporter import StatusReporter
from cloud_storage_uploader import CloudStorageUploader from cloud_storage_uploader import CloudStorageUploader
@ -212,6 +213,7 @@ class MessageListener:
@staticmethod @staticmethod
def spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config): def spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config):
''' Spawn a child process to analyze and import a new audio file. ''' ''' Spawn a child process to analyze and import a new audio file. '''
'''
q = multiprocessing.Queue() q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config)) args=(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config))
@ -223,7 +225,16 @@ class MessageListener:
logging.info(results) logging.info(results)
else: else:
raise Exception("Analyzer process terminated unexpectedly.") raise Exception("Analyzer process terminated unexpectedly.")
'''
q = Queue.Queue()
try:
AnalyzerPipeline.run_analysis(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config)
results = q.get()
except Exception as e:
logging.error("Analyzer pipeline exception", e)
pass
# Ensure our queue doesn't fill up and block due to unexpected behaviour. Defensive code. # Ensure our queue doesn't fill up and block due to unexpected behaviour. Defensive code.
while not q.empty(): while not q.empty():
q.get() q.get()