2021-06-03 15:20:39 +02:00
|
|
|
import collections
|
2014-03-06 04:43:47 +01:00
|
|
|
import json
|
2021-06-03 15:20:39 +02:00
|
|
|
import pickle
|
2020-01-16 15:32:51 +01:00
|
|
|
import queue
|
2021-06-03 15:20:39 +02:00
|
|
|
import threading
|
2014-05-27 00:59:28 +02:00
|
|
|
import time
|
|
|
|
import traceback
|
2020-01-16 15:32:51 +01:00
|
|
|
from urllib.parse import urlparse
|
2014-04-16 17:10:06 +02:00
|
|
|
|
2021-06-03 15:20:39 +02:00
|
|
|
import requests
|
2022-01-17 09:26:30 +01:00
|
|
|
from loguru import logger
|
2021-06-03 15:20:39 +02:00
|
|
|
|
2021-05-27 16:23:02 +02:00
|
|
|
|
2014-04-16 17:10:06 +02:00
|
|
|
class PicklableHttpRequest:
    """Plain-attribute description of an HTTP request.

    Instances hold only simple fields (method, url, data, api_key) so they can
    be pickled — e.g. persisted in the retry queue — and turned back into a
    real requests.Request on demand via create_request().
    """

    def __init__(self, method, url, data, api_key):
        self.method = method
        self.url = url
        self.data = data
        self.api_key = api_key

    def create_request(self):
        """Build a fresh requests.Request from the stored fields.

        The api_key is sent as the username of an HTTP Basic auth pair with an
        empty password.
        """
        basic_auth = requests.auth.HTTPBasicAuth(self.api_key, "")
        return requests.Request(
            method=self.method,
            url=self.url,
            data=self.data,
            auth=basic_auth,
        )
|
|
|
|
|
2014-04-16 17:10:06 +02:00
|
|
|
|
|
|
|
def process_http_requests(ipc_queue, http_retry_queue_path):
    """Runs in a separate thread and performs all the HTTP requests where we're
    reporting extracted audio file metadata or errors back to the Airtime web application.

    This process also checks every 5 seconds if there's failed HTTP requests that we
    need to retry. We retry failed HTTP requests so that we don't lose uploads if the
    web server is temporarily down.

    Args:
        ipc_queue: queue.Queue carrying PicklableHttpRequest objects, or the
            string "shutdown" as a stop sentinel.
        http_retry_queue_path: path of the pickle file used to persist failed
            requests across restarts.
    """
    # Store any failed requests (eg. due to web server errors or downtime) to be
    # retried later:
    retry_queue = collections.deque()
    shutdown = False

    # Unpickle retry_queue from disk so that we won't have lost any uploads
    # if airtime_analyzer is shut down while the web server is down or unreachable,
    # and there were failed HTTP requests pending, waiting to be retried.
    try:
        with open(http_retry_queue_path, "rb") as pickle_file:
            retry_queue = pickle.load(pickle_file)
    except FileNotFoundError:
        # No saved retry queue yet (first run, or clean previous shutdown).
        pass
    except OSError:
        # Any other filesystem error is unexpected; let it propagate.
        raise
    except Exception:
        # If we fail to unpickle a saved queue of failed HTTP requests, then we'll just log an error
        # and continue because those HTTP requests are lost anyways. The pickled file will be
        # overwritten the next time the analyzer is shut down too.
        logger.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)

    while True:
        try:
            while not shutdown:
                try:
                    request = ipc_queue.get(block=True, timeout=5)
                    if (
                        isinstance(request, str) and request == "shutdown"
                    ):  # Bit of a cheat
                        shutdown = True
                        break
                    if not isinstance(request, PicklableHttpRequest):
                        raise TypeError(
                            "request must be a PicklableHttpRequest. Was of type "
                            + type(request).__name__
                        )
                except queue.Empty:
                    # Timed out waiting for a new request; fall through so we
                    # can service the retry queue below.
                    request = None

                # If there's no new HTTP request we need to execute, let's check our "retry
                # queue" and see if there's any failed HTTP requests we can retry:
                if request:
                    send_http_request(request, retry_queue)
                else:
                    # Using a for loop instead of while so we only iterate over all the requests once!
                    # (send_http_request may re-append a failed request to retry_queue.)
                    for _ in range(len(retry_queue)):
                        request = retry_queue.popleft()
                        send_http_request(request, retry_queue)

            logger.info("Shutting down status_reporter")
            # Pickle retry_queue to disk so that we don't lose uploads if we're shut down
            # while the web server is down or unreachable.
            with open(http_retry_queue_path, "wb") as pickle_file:
                pickle.dump(retry_queue, pickle_file)
            return
        except Exception as e:  # Terrible top-level exception handler to prevent the thread from dying, just in case.
            if shutdown:
                return
            logger.exception("Unhandled exception in StatusReporter")
            logger.exception(e)
            logger.info("Restarting StatusReporter thread")
            time.sleep(2)  # Throttle it
|
2014-04-16 17:10:06 +02:00
|
|
|
|
|
|
|
|
|
|
|
def send_http_request(picklable_request, retry_queue):
    """Send one queued HTTP request and sort any failure into retry vs. give-up.

    Recoverable failures (connection errors, or HTTP errors while the web
    server itself appears broken) are appended to *retry_queue* for a later
    attempt; everything else is logged and dropped.

    Raises:
        TypeError: if picklable_request is not a PicklableHttpRequest.
    """
    if not isinstance(picklable_request, PicklableHttpRequest):
        raise TypeError(
            "picklable_request must be a PicklableHttpRequest. Was of type "
            f"{type(picklable_request).__name__}"
        )

    try:
        session = requests.Session()
        prepared_request = session.prepare_request(picklable_request.create_request())
        response = session.send(
            prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT
        )
        response.raise_for_status()  # Raise an exception if there was an http error code returned
        logger.info("HTTP request sent successfully.")
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 422:
            # Do no retry the request if there was a metadata validation error
            logger.error(
                f"HTTP request failed due to an HTTP exception. Exception was: {e}"
            )
        else:
            # The request failed with an error 500 probably, so let's check if Airtime and/or
            # the web server are broken. If not, then our request was probably causing an
            # error 500 in the media API (ie. a bug), so there's no point in retrying it.
            logger.error(f"HTTP request failed. Exception was: {e}")
            parsed_url = urlparse(e.response.request.url)
            base_url = parsed_url.scheme + "://" + parsed_url.netloc
            if is_web_server_broken(base_url):
                # If the web server is having problems, retry the request later:
                retry_queue.append(picklable_request)
            # Otherwise, if the request was bad, the request is never retried.
            # You will have to find these bad requests in logs or you'll be
            # notified by sentry.
    except requests.exceptions.ConnectionError as e:
        logger.error(
            f"HTTP request failed due to a connection error. Retrying later. {e}"
        )
        retry_queue.append(picklable_request)  # Retry it later
    except Exception as e:
        logger.error(f"HTTP request failed with unhandled exception. {e}")
        logger.error(traceback.format_exc())
        # Don't put the request into the retry queue, just give up on this one.
        # I'm doing this to protect against us getting some pathological request
        # that breaks our code. I don't want us pickling data that potentially
        # breaks airtime_analyzer.
|
2014-04-16 17:10:06 +02:00
|
|
|
|
2021-05-27 16:23:02 +02:00
|
|
|
|
2014-06-04 19:46:25 +02:00
|
|
|
def is_web_server_broken(url):
    """Do a naive test to check if the web server we're trying to access is down.

    We use this to try to differentiate between error 500s that are coming
    from (for example) a bug in the Airtime Media REST API and error 500s
    caused by Airtime or the webserver itself being broken temporarily.

    Returns:
        True if a plain GET of *url* raises or returns an HTTP error status,
        False otherwise.
    """
    try:
        # Bound the probe with the same timeout as regular requests so a
        # hung server can't block the reporter thread indefinitely.
        test_req = requests.get(url, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
        test_req.raise_for_status()
    except Exception:
        return True
    # The request worked fine, so the web server and Airtime are still up.
    return False
|
2014-06-04 19:46:25 +02:00
|
|
|
|
|
|
|
|
2021-05-27 16:23:02 +02:00
|
|
|
class StatusReporter:
    """Reports the extracted audio file metadata and job status back to the
    Airtime web application.

    All HTTP traffic is delegated to a background thread running
    process_http_requests(); callers only enqueue PicklableHttpRequest
    objects onto _ipc_queue and never block on the network.
    """

    # Timeout (in seconds) applied to the HTTP requests we send.
    _HTTP_REQUEST_TIMEOUT = 30

    # Queue used to hand PicklableHttpRequest objects (and the "shutdown"
    # sentinel string) to the background HTTP thread.
    _ipc_queue = queue.Queue()

    # Background thread running process_http_requests(); created lazily by
    # start_thread(). (Historically this was a multiprocessing.Process for
    # crash isolation; it is a plain thread now.)
    _http_thread = None

    @classmethod
    def start_thread(cls, http_retry_queue_path):
        """Spawn the background HTTP thread.

        Args:
            http_retry_queue_path: pickle file where failed requests are
                persisted so they survive a restart.
        """
        cls._http_thread = threading.Thread(
            target=process_http_requests,
            args=(cls._ipc_queue, http_retry_queue_path),
        )
        cls._http_thread.start()

    @classmethod
    def stop_thread(cls):
        """Ask the background thread to shut down and wait for it to finish."""
        logger.info("Terminating status_reporter process")
        cls._ipc_queue.put("shutdown")  # Special trigger
        cls._http_thread.join()

    @classmethod
    def _send_http_request(cls, request):
        """Queue *request* for the background thread to send."""
        cls._ipc_queue.put(request)

    @classmethod
    def report_success_to_callback_url(cls, callback_url, api_key, audio_metadata):
        """Report the extracted metadata and status of the successfully imported file
        to the callback URL (which should be the Airtime File Upload API)
        """
        # Encode the audio metadata as json and PUT it back to the callback_url:
        put_payload = json.dumps(audio_metadata)
        cls._send_http_request(
            PicklableHttpRequest(
                method="PUT", url=callback_url, data=put_payload, api_key=api_key
            )
        )

    @classmethod
    def report_failure_to_callback_url(
        cls, callback_url, api_key, import_status, reason
    ):
        """Report a failed file import, with a human-readable *reason*, to the
        callback URL.

        Raises:
            TypeError: if import_status is not an integer.
        """
        if not isinstance(import_status, int):
            raise TypeError(
                "import_status must be an integer. Was of type "
                + type(import_status).__name__
            )

        logger.debug("Reporting import failure to Airtime REST API...")
        audio_metadata = {
            "import_status": import_status,
            "comment": reason,  # hack attack
        }
        put_payload = json.dumps(audio_metadata)
        cls._send_http_request(
            PicklableHttpRequest(
                method="PUT", url=callback_url, data=put_payload, api_key=api_key
            )
        )
|