Merge branch 'cc-5709-airtime-analyzer' into saas
This commit is contained in:
commit
363d022b0c
|
@ -120,8 +120,13 @@ class MessageListener:
|
||||||
|
|
||||||
def disconnect_from_messaging_server(self):
|
def disconnect_from_messaging_server(self):
|
||||||
'''Stop consuming RabbitMQ messages and disconnect'''
|
'''Stop consuming RabbitMQ messages and disconnect'''
|
||||||
self._channel.stop_consuming()
|
# If you try to close a connection that's already closed, you're going to have a bad time.
|
||||||
self._connection.close()
|
# We're breaking EAFP because this can be called multiple times depending on exception
|
||||||
|
# handling flow here.
|
||||||
|
if not self._channel.is_closed and not self._channel.is_closing:
|
||||||
|
self._channel.stop_consuming()
|
||||||
|
if not self._connection.is_closed and not self._connection.is_closing:
|
||||||
|
self._connection.close()
|
||||||
|
|
||||||
def graceful_shutdown(self, signum, frame):
|
def graceful_shutdown(self, signum, frame):
|
||||||
'''Disconnect and break out of the message listening loop'''
|
'''Disconnect and break out of the message listening loop'''
|
||||||
|
|
|
@ -38,9 +38,9 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
|
||||||
# retried later:
|
# retried later:
|
||||||
retry_queue = collections.deque()
|
retry_queue = collections.deque()
|
||||||
shutdown = False
|
shutdown = False
|
||||||
|
|
||||||
# Unpickle retry_queue from disk so that we won't have lost any uploads
|
# Unpickle retry_queue from disk so that we won't have lost any uploads
|
||||||
# if airtime_analyzer is shut down while the web server is down or unreachable,
|
# if airtime_analyzer is shut down while the web server is down or unreachable,
|
||||||
# and there were failed HTTP requests pending, waiting to be retried.
|
# and there were failed HTTP requests pending, waiting to be retried.
|
||||||
try:
|
try:
|
||||||
with open(http_retry_queue_path, 'rb') as pickle_file:
|
with open(http_retry_queue_path, 'rb') as pickle_file:
|
||||||
|
@ -57,33 +57,42 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
|
||||||
logging.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)
|
logging.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
while True:
|
||||||
while not shutdown:
|
|
||||||
try:
|
try:
|
||||||
request = ipc_queue.get(block=True, timeout=5)
|
while not shutdown:
|
||||||
if isinstance(request, str) and request == "shutdown": # Bit of a cheat
|
try:
|
||||||
shutdown = True
|
request = ipc_queue.get(block=True, timeout=5)
|
||||||
break
|
if isinstance(request, str) and request == "shutdown": # Bit of a cheat
|
||||||
if not isinstance(request, PicklableHttpRequest):
|
shutdown = True
|
||||||
raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__)
|
break
|
||||||
except Queue.Empty:
|
if not isinstance(request, PicklableHttpRequest):
|
||||||
request = None
|
raise TypeError("request must be a PicklableHttpRequest. Was of type " + type(request).__name__)
|
||||||
|
except Queue.Empty:
|
||||||
# If there's no new HTTP request we need to execute, let's check our "retry
|
request = None
|
||||||
# queue" and see if there's any failed HTTP requests we can retry:
|
|
||||||
if request:
|
# If there's no new HTTP request we need to execute, let's check our "retry
|
||||||
send_http_request(request, retry_queue)
|
# queue" and see if there's any failed HTTP requests we can retry:
|
||||||
else:
|
if request:
|
||||||
# Using a for loop instead of while so we only iterate over all the requests once!
|
send_http_request(request, retry_queue)
|
||||||
for i in range(len(retry_queue)):
|
else:
|
||||||
request = retry_queue.popleft()
|
# Using a for loop instead of while so we only iterate over all the requests once!
|
||||||
send_http_request(request, retry_queue)
|
for i in range(len(retry_queue)):
|
||||||
|
request = retry_queue.popleft()
|
||||||
|
send_http_request(request, retry_queue)
|
||||||
|
|
||||||
|
logging.info("Shutting down status_reporter")
|
||||||
|
# Pickle retry_queue to disk so that we don't lose uploads if we're shut down while
|
||||||
|
# while the web server is down or unreachable.
|
||||||
|
with open(http_retry_queue_path, 'wb') as pickle_file:
|
||||||
|
pickle.dump(retry_queue, pickle_file)
|
||||||
|
except Exception as e: # Terrible top-level exception handler to prevent the thread from dying, just in case.
|
||||||
|
if shutdown:
|
||||||
|
return
|
||||||
|
logging.exception("Unhandled exception in StatusReporter")
|
||||||
|
logging.exception(e)
|
||||||
|
logging.info("Restarting StatusReporter thread")
|
||||||
|
time.sleep(2) # Throttle it
|
||||||
|
|
||||||
logging.info("Shutting down status_reporter")
|
|
||||||
# Pickle retry_queue to disk so that we don't lose uploads if we're shut down while
|
|
||||||
# while the web server is down or unreachable.
|
|
||||||
with open(http_retry_queue_path, 'wb') as pickle_file:
|
|
||||||
pickle.dump(retry_queue, pickle_file)
|
|
||||||
|
|
||||||
def send_http_request(picklable_request, retry_queue):
|
def send_http_request(picklable_request, retry_queue):
|
||||||
if not isinstance(picklable_request, PicklableHttpRequest):
|
if not isinstance(picklable_request, PicklableHttpRequest):
|
||||||
|
|
Loading…
Reference in New Issue