Merge branch 'cc-5709-airtime-analyzer' into cc-5709-airtime-analyzer-cloud-storage

Conflicts:
	python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py
drigato 2014-10-22 17:16:46 -04:00
commit 70ff67374b
7 changed files with 66 additions and 44 deletions


@@ -41,16 +41,17 @@ class FileMoverAnalyzer(Analyzer):
         #Import the file over to it's final location.
         # TODO: Also, handle the case where the move fails and write some code
         # to possibly move the file to problem_files.
-        max_dir_len = 32
-        max_file_len = 32
+        max_dir_len = 48
+        max_file_len = 48
         final_file_path = import_directory
+        orig_file_basename, orig_file_extension = os.path.splitext(original_filename)
         if metadata.has_key("artist_name"):
             final_file_path += "/" + metadata["artist_name"][0:max_dir_len] # truncating with array slicing
         if metadata.has_key("album_title"):
-            final_file_path += "/" + metadata["album_title"][0:max_dir_len]
-        final_file_path += "/" + original_filename[0:max_file_len]
+            final_file_path += "/" + metadata["album_title"][0:max_dir_len]
+        # Note that orig_file_extension includes the "." already
+        final_file_path += "/" + orig_file_basename[0:max_file_len] + orig_file_extension
 
         #Ensure any redundant slashes are stripped
         final_file_path = os.path.normpath(final_file_path)
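
The point of this change is that the artist/album directory names and the destination filename are truncated independently, and the truncation now applies to the basename only, so the file extension survives. A standalone sketch of the same path-building idea, assuming a plain-dict metadata argument and a hypothetical helper name (this is not the analyzer's real API):

    import os

    def build_import_path(import_directory, original_filename, metadata,
                          max_dir_len=48, max_file_len=48):
        # Illustrative helper mirroring the hunk above, not FileMoverAnalyzer itself.
        final_file_path = import_directory
        basename, extension = os.path.splitext(original_filename)  # extension keeps its "."
        if "artist_name" in metadata:
            final_file_path += "/" + metadata["artist_name"][0:max_dir_len]  # truncate by slicing
        if "album_title" in metadata:
            final_file_path += "/" + metadata["album_title"][0:max_dir_len]
        # Truncate only the basename so ".mp3", ".ogg", etc. are never cut off.
        final_file_path += "/" + basename[0:max_file_len] + extension
        return os.path.normpath(final_file_path)  # strip redundant slashes

For example, build_import_path("/srv/imported", "a_very_long_track_title.mp3", {"artist_name": "Some Artist"}) keeps the ".mp3" suffix even if the basename has to be cut down to 48 characters.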


@@ -128,8 +128,13 @@ class MessageListener:
     def disconnect_from_messaging_server(self):
         '''Stop consuming RabbitMQ messages and disconnect'''
-        self._channel.stop_consuming()
-        self._connection.close()
+        # If you try to close a connection that's already closed, you're going to have a bad time.
+        # We're breaking EAFP because this can be called multiple times depending on exception
+        # handling flow here.
+        if not self._channel.is_closed and not self._channel.is_closing:
+            self._channel.stop_consuming()
+        if not self._connection.is_closed and not self._connection.is_closing:
+            self._connection.close()
 
     def graceful_shutdown(self, signum, frame):
         '''Disconnect and break out of the message listening loop'''
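
The guarded teardown above makes disconnect_from_messaging_server() safe to call more than once, which the new comment says can happen depending on the surrounding exception-handling flow. A minimal sketch of the same idempotent-shutdown idea, assuming the underlying client is pika (which the RabbitMQ channel/connection attributes above suggest) and using the is_open property rather than the is_closed/is_closing pair checked in the hunk; class and method names here are illustrative only:

    import pika

    class MessagingClient:
        def __init__(self, amqp_url="amqp://guest:guest@localhost:5672/%2F"):
            self._connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
            self._channel = self._connection.channel()

        def disconnect(self):
            # Each step is guarded, so calling disconnect() twice is harmless.
            if self._channel.is_open:
                self._channel.stop_consuming()
            if self._connection.is_open:
                self._connection.close()

Which properties exist (is_open, is_closed, is_closing) varies across pika releases, so the exact guard should match the version being pinned.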


@@ -38,9 +38,9 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
     # retried later:
     retry_queue = collections.deque()
     shutdown = False
 
-    # Unpickle retry_queue from disk so that we won't have lost any uploads
-    # if airtime_analyzer is shut down while the web server is down or unreachable,
+    # Unpickle retry_queue from disk so that we won't have lost any uploads
+    # if airtime_analyzer is shut down while the web server is down or unreachable,
     # and there were failed HTTP requests pending, waiting to be retried.
     try:
         with open(http_retry_queue_path, 'rb') as pickle_file:
@@ -57,33 +57,42 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
         logging.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)
         pass
 
-    while not shutdown:
+    while True:
         try:
-            request = ipc_queue.get(block=True, timeout=5)
-            if isinstance(request, str) and request == "shutdown": # Bit of a cheat
-                shutdown = True
-                break
-            if not isinstance(request, PicklableHttpRequest):
-                raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__)
-        except Queue.Empty:
-            request = None
-
-        # If there's no new HTTP request we need to execute, let's check our "retry
-        # queue" and see if there's any failed HTTP requests we can retry:
-        if request:
-            send_http_request(request, retry_queue)
-        else:
-            # Using a for loop instead of while so we only iterate over all the requests once!
-            for i in range(len(retry_queue)):
-                request = retry_queue.popleft()
-                send_http_request(request, retry_queue)
+            while not shutdown:
+                try:
+                    request = ipc_queue.get(block=True, timeout=5)
+                    if isinstance(request, str) and request == "shutdown": # Bit of a cheat
+                        shutdown = True
+                        break
+                    if not isinstance(request, PicklableHttpRequest):
+                        raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__)
+                except Queue.Empty:
+                    request = None
+
+                # If there's no new HTTP request we need to execute, let's check our "retry
+                # queue" and see if there's any failed HTTP requests we can retry:
+                if request:
+                    send_http_request(request, retry_queue)
+                else:
+                    # Using a for loop instead of while so we only iterate over all the requests once!
+                    for i in range(len(retry_queue)):
+                        request = retry_queue.popleft()
+                        send_http_request(request, retry_queue)
+
+            logging.info("Shutting down status_reporter")
+            # Pickle retry_queue to disk so that we don't lose uploads if we're shut down while
+            # while the web server is down or unreachable.
+            with open(http_retry_queue_path, 'wb') as pickle_file:
+                pickle.dump(retry_queue, pickle_file)
+        except Exception as e: # Terrible top-level exception handler to prevent the thread from dying, just in case.
+            if shutdown:
+                return
+            logging.exception("Unhandled exception in StatusReporter")
+            logging.exception(e)
+            logging.info("Restarting StatusReporter thread")
+            time.sleep(2) # Throttle it
-    logging.info("Shutting down status_reporter")
-    # Pickle retry_queue to disk so that we don't lose uploads if we're shut down while
-    # while the web server is down or unreachable.
-    with open(http_retry_queue_path, 'wb') as pickle_file:
-        pickle.dump(retry_queue, pickle_file)
 
 def send_http_request(picklable_request, retry_queue):
     if not isinstance(picklable_request, PicklableHttpRequest):
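
Taken together, the two status_reporter changes give the thread a keep-alive shape: an outer loop that catches any unhandled exception, logs it, sleeps briefly, and starts over, with the pending-retry deque pickled to disk on shutdown and unpickled again on startup. A condensed, self-contained sketch of that shape, assuming a hypothetical run_worker()/handle_request() pair standing in for process_http_requests() and send_http_request(), using the Python 3 queue module in place of the Python 2 Queue above, and with an explicit return after the final pickle so the sketch terminates cleanly:

    import collections
    import logging
    import pickle
    import queue   # "Queue" in the Python 2 code above
    import time

    def run_worker(ipc_queue, retry_queue_path, handle_request):
        # Restore any retries saved by a previous run; start empty if there are none.
        try:
            with open(retry_queue_path, 'rb') as f:
                retry_queue = pickle.load(f)
        except (IOError, OSError, EOFError, pickle.PickleError):
            retry_queue = collections.deque()
        shutdown = False
        while True:                      # outer loop: restart the worker after any crash
            try:
                while not shutdown:
                    try:
                        request = ipc_queue.get(block=True, timeout=5)
                        if request == "shutdown":
                            shutdown = True
                            break
                    except queue.Empty:
                        request = None
                    if request:
                        handle_request(request, retry_queue)
                    else:
                        # Idle tick: drain the retry queue once, re-queueing any failures.
                        for _ in range(len(retry_queue)):
                            handle_request(retry_queue.popleft(), retry_queue)
                # Persist pending retries so they survive shutdown (or the next crash).
                with open(retry_queue_path, 'wb') as f:
                    pickle.dump(retry_queue, f)
                return
            except Exception as e:
                if shutdown:
                    return
                logging.exception(e)     # log it, but keep the thread alive
                time.sleep(2)            # throttle restart attempts

The sketch keeps the same trade-off as the original: a blanket except around the whole loop is ugly, but it prevents one bad request from silently killing the reporting thread.
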
@@ -134,11 +143,11 @@ def is_web_server_broken(url):
         test_req = requests.get(url)
         test_req.raise_for_status()
     except Exception as e:
-        return true
+        return True
     else:
         # The request worked fine, so the web server and Airtime are still up.
-        return false
-    return false
+        return False
+    return False
 
 def alert_hung_request():
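
The only real change in this hunk is capitalization: true and false are not Python names, so the old lines would have raised NameError the first time either branch of the check ran. Written out on its own, the naive check reads as below; the explicit timeout is an addition here, not something the original passes:

    import requests

    def is_web_server_broken(url):
        # Treat any failure of the request itself (connection refused, DNS error,
        # timeout, or a non-2xx status) as "the web server is broken".
        try:
            resp = requests.get(url, timeout=5)
            resp.raise_for_status()
        except Exception:
            return True
        return False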


@@ -9,14 +9,16 @@ respawn
 setuid www-data
 setgid www-data
 
-expect fork
+#expect fork
 
 env LANG='en_US.UTF-8'
 env LC_ALL='en_US.UTF-8'
 
-script
-    airtime_analyzer
-end script
+#script
+# airtime_analyzer
+#end script
+
+exec airtime_analyzer