refactor(analyzer): fix linting errors (#2029)
parent b465629977
commit 1b93b7645e
7 changed files with 51 additions and 74 deletions
@@ -4,11 +4,11 @@ import pickle
 import queue
 import threading
 import time
-import traceback
 from urllib.parse import urlparse
 
 import requests
 from loguru import logger
+from requests.exceptions import HTTPError
 
 
 class PicklableHttpRequest:
@@ -48,17 +48,14 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
     try:
         with open(http_retry_queue_path, "rb") as pickle_file:
             retry_queue = pickle.load(pickle_file)
-    except OSError as e:
-        if e.errno == 2:
-            pass
-        else:
-            raise e
-    except Exception as e:
+    except OSError as exception:
+        if exception.errno != 2:
+            raise exception
+    except Exception:
         # If we fail to unpickle a saved queue of failed HTTP requests, then we'll just log an error
         # and continue because those HTTP requests are lost anyways. The pickled file will be
         # overwritten the next time the analyzer is shut down too.
-        logger.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)
-        pass
+        logger.error(f"Failed to unpickle {http_retry_queue_path}. Continuing...")
 
     while True:
         try:
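The reworked OSError handler above flattens the old branch-and-pass form into a single inverted check; errno 2 is ENOENT, meaning the retry file simply does not exist yet. A minimal sketch of the same pattern spelled with the named constant (the helper name is illustrative, not from this codebase):

import errno
import pickle


def load_retry_queue(path):
    """Illustrative helper: a missing pickle file is fine, anything else re-raises."""
    try:
        with open(path, "rb") as pickle_file:
            return pickle.load(pickle_file)
    except OSError as exception:
        if exception.errno != errno.ENOENT:
            raise
        return None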
@@ -70,11 +67,6 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
             ):  # Bit of a cheat
                 shutdown = True
                 break
-            if not isinstance(request, PicklableHttpRequest):
-                raise TypeError(
-                    "request must be a PicklableHttpRequest. Was of type "
-                    + type(request).__name__
-                )
         except queue.Empty:
             request = None
 
@@ -84,7 +76,7 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
                 send_http_request(request, retry_queue)
             else:
                 # Using a for loop instead of while so we only iterate over all the requests once!
-                for i in range(len(retry_queue)):
+                for _ in range(len(retry_queue)):
                     request = retry_queue.popleft()
                     send_http_request(request, retry_queue)
 
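The fixed-length loop kept above matters because failed requests are appended back while the queue is being drained; snapshotting len() up front means each item is visited exactly once per pass. A quick sketch assuming retry_queue is a collections.deque, as the popleft calls suggest:

from collections import deque

retry_queue = deque(["req-a", "req-b", "req-c"])

# Snapshot the length first: items appended back during the pass (failed
# retries) are not picked up again until the next pass.
for _ in range(len(retry_queue)):
    request = retry_queue.popleft()
    succeeded = False  # stand-in for send_http_request(...) failing
    if not succeeded:
        retry_queue.append(request)

print(list(retry_queue))  # ['req-a', 'req-b', 'req-c'] - one visit each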
@@ -94,55 +86,49 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
                 with open(http_retry_queue_path, "wb") as pickle_file:
                     pickle.dump(retry_queue, pickle_file)
                 return
-        except Exception as e:  # Terrible top-level exception handler to prevent the thread from dying, just in case.
+        except Exception as exception:  # Terrible top-level exception handler to prevent the thread from dying, just in case.
             if shutdown:
                 return
-            logger.exception("Unhandled exception in StatusReporter")
-            logger.exception(e)
+            logger.exception(f"Unhandled exception in StatusReporter {exception}")
             logger.info("Restarting StatusReporter thread")
             time.sleep(2)  # Throttle it
 
 
-def send_http_request(picklable_request, retry_queue):
-    if not isinstance(picklable_request, PicklableHttpRequest):
-        raise TypeError(
-            "picklable_request must be a PicklableHttpRequest. Was of type "
-            + type(picklable_request).__name__
-        )
+def send_http_request(picklable_request: PicklableHttpRequest, retry_queue):
     try:
         bare_request = picklable_request.create_request()
-        s = requests.Session()
-        prepared_request = s.prepare_request(bare_request)
-        r = s.send(prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
-        r.raise_for_status()  # Raise an exception if there was an http error code returned
+        session = requests.Session()
+        prepared_request = session.prepare_request(bare_request)
+        resp = session.send(
+            prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT
+        )
+        resp.raise_for_status()  # Raise an exception if there was an http error code returned
         logger.info("HTTP request sent successfully.")
-    except requests.exceptions.HTTPError as e:
-        if e.response.status_code == 422:
+    except requests.exceptions.HTTPError as exception:
+        if exception.response.status_code == 422:
             # Do no retry the request if there was a metadata validation error
-            logger.error(
-                "HTTP request failed due to an HTTP exception. Exception was: %s"
-                % str(e)
+            logger.exception(
+                f"HTTP request failed due to an HTTP exception: {exception}"
             )
         else:
             # The request failed with an error 500 probably, so let's check if Airtime and/or
             # the web server are broken. If not, then our request was probably causing an
             # error 500 in the media API (ie. a bug), so there's no point in retrying it.
-            logger.error("HTTP request failed. Exception was: %s" % str(e))
-            parsed_url = urlparse(e.response.request.url)
+            logger.exception(f"HTTP request failed: {exception}")
+            parsed_url = urlparse(exception.response.request.url)
             if is_web_server_broken(parsed_url.scheme + "://" + parsed_url.netloc):
                 # If the web server is having problems, retry the request later:
                 retry_queue.append(picklable_request)
             # Otherwise, if the request was bad, the request is never retried.
             # You will have to find these bad requests in logs or you'll be
             # notified by sentry.
-    except requests.exceptions.ConnectionError as e:
-        logger.error(
-            "HTTP request failed due to a connection error. Retrying later. %s" % str(e)
+    except requests.exceptions.ConnectionError as exception:
+        logger.exception(
+            f"HTTP request failed due to a connection error. Retrying later. {exception}"
        )
        retry_queue.append(picklable_request)  # Retry it later
-    except Exception as e:
-        logger.error("HTTP request failed with unhandled exception. %s" % str(e))
-        logger.error(traceback.format_exc())
+    except Exception as exception:
+        logger.exception(f"HTTP request failed with unhandled exception. {exception}")
         # Don't put the request into the retry queue, just give up on this one.
         # I'm doing this to protect against us getting some pathological request
         # that breaks our code. I don't want us pickling data that potentially
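The rewritten send path keeps requests' prepare-then-send flow and switches the handlers to logger.exception, which in loguru records the message together with the active traceback (hence the removed traceback.format_exc call and import). A minimal, self-contained sketch of that flow (the URL, payload, and timeout below are placeholders):

import requests

session = requests.Session()
bare_request = requests.Request("POST", "https://example.com/api", json={"ok": True})
prepared_request = session.prepare_request(bare_request)

try:
    resp = session.send(prepared_request, timeout=30)
    resp.raise_for_status()  # raise HTTPError if a 4xx/5xx status came back
except requests.exceptions.HTTPError as exception:
    print(f"HTTP request failed: {exception}")  # e.g. retry or drop, as above
except requests.exceptions.ConnectionError as exception:
    print(f"Connection failed, retry later: {exception}")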
@@ -158,11 +144,9 @@ def is_web_server_broken(url):
     try:
         test_req = requests.get(url)
         test_req.raise_for_status()
-    except Exception as e:
+    except HTTPError:
         return True
     else:
         # The request worked fine, so the web server and Airtime are still up.
         return False
-    return False
 
 
 class StatusReporter:
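For reference on the narrowed handler above: raise_for_status() raises requests.exceptions.HTTPError for 4xx/5xx responses, while connection-level failures raise requests.exceptions.ConnectionError, a sibling under RequestException rather than a subclass. A minimal illustration:

import requests
from requests.exceptions import ConnectionError, HTTPError, RequestException

# Both exception types share RequestException as a base, but neither derives
# from the other, so an `except HTTPError:` clause only covers responses that
# actually came back with an error status code.
assert issubclass(HTTPError, RequestException)
assert issubclass(ConnectionError, RequestException)
assert not issubclass(ConnectionError, HTTPError)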
@@ -176,7 +160,7 @@ class StatusReporter:
     _http_thread = None
 
     @classmethod
-    def start_thread(self, http_retry_queue_path):
+    def start_thread(cls, http_retry_queue_path):
         StatusReporter._http_thread = threading.Thread(
             target=process_http_requests,
             args=(StatusReporter._ipc_queue, http_retry_queue_path),
@@ -184,18 +168,18 @@ class StatusReporter:
         StatusReporter._http_thread.start()
 
     @classmethod
-    def stop_thread(self):
+    def stop_thread(cls):
         logger.info("Terminating status_reporter process")
         StatusReporter._ipc_queue.put("shutdown")
         StatusReporter._http_thread.join()
 
     @classmethod
-    def _send_http_request(self, request):
+    def _send_http_request(cls, request):
         StatusReporter._ipc_queue.put(request)
 
     @classmethod
     def report_success(
-        self,
+        cls,
         callback_url: str,
         callback_api_key: str,
         metadata: dict,
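The self to cls renames follow the usual @classmethod convention, where the first parameter receives the class object itself rather than an instance. A tiny standalone illustration (the class here is made up for the example):

class Reporter:
    _queue = []

    @classmethod
    def enqueue(cls, item):
        # `cls` is the class object (Reporter or a subclass), not an instance,
        # so class-level state is reachable without instantiating anything.
        cls._queue.append(item)


Reporter.enqueue("hello")
print(Reporter._queue)  # ['hello']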
@@ -215,18 +199,12 @@ class StatusReporter:
 
     @classmethod
     def report_failure(
-        self,
+        cls,
         callback_url,
         callback_api_key,
-        import_status,
+        import_status: int,
         reason,
     ):
-        if not isinstance(import_status, int):
-            raise TypeError(
-                "import_status must be an integer. Was of type "
-                + type(import_status).__name__
-            )
-
         logger.debug("Reporting import failure to Airtime REST API...")
         audio_metadata = {}
         audio_metadata["import_status"] = import_status
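The removed runtime isinstance guard is presumably superseded by the new import_status: int annotation, which linters and type checkers can enforce before the code runs. A minimal sketch of that idea (the function below is illustrative, not the project's API):

def report_failure(import_status: int, reason: str) -> None:
    # A checker such as mypy reports report_failure("oops", "x") at analysis
    # time, whereas the old guard raised TypeError only at runtime.
    print(f"import_status={import_status}, reason={reason}")


report_failure(2, "file rejected")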