feat(api-client): rewrite api-client v1 using abstract client

This commit is contained in:
jo 2023-03-21 14:15:32 +01:00 committed by Kyle Robbertze
parent 98aaa4214a
commit 6ab407a23a
4 changed files with 173 additions and 386 deletions

View File

@ -1,167 +0,0 @@
import logging
from time import sleep
import requests
from requests.auth import AuthBase
class UrlParamDict(dict):
def __missing__(self, key):
return "{" + key + "}"
class UrlException(Exception):
pass
class IncompleteUrl(UrlException):
def __init__(self, url):
super().__init__()
self.url = url
def __str__(self):
return f"Incomplete url: '{self.url}'"
class UrlBadParam(UrlException):
def __init__(self, url, param):
super().__init__()
self.url = url
self.param = param
def __str__(self):
return f"Bad param '{self.param}' passed into url: '{self.url}'"
# pylint: disable=too-few-public-methods
class KeyAuth(AuthBase):
def __init__(self, key):
self.key = key
def __call__(self, r):
r.headers["Authorization"] = f"Api-Key {self.key}"
return r
class ApcUrl:
"""A safe abstraction and testable for filling in parameters in
api_client.cfg"""
def __init__(self, base_url):
self.base_url = base_url
def params(self, **params):
temp_url = self.base_url
for k in params:
wrapped_param = "{" + k + "}"
if wrapped_param not in temp_url:
raise UrlBadParam(self.base_url, k)
temp_url = temp_url.format_map(UrlParamDict(**params))
return ApcUrl(temp_url)
def url(self):
if "{" in self.base_url:
raise IncompleteUrl(self.base_url)
return self.base_url
class ApiRequest:
API_HTTP_REQUEST_TIMEOUT = 30 # 30 second HTTP request timeout
def __init__(self, name, url, logger=None, api_key=None):
self.name = name
self.url = url
self.__req = None
if logger is None:
self.logger = logging
else:
self.logger = logger
self.auth = KeyAuth(api_key)
def __call__(self, *, _post_data=None, _put_data=None, params=None, **kwargs):
final_url = self.url.params(**kwargs).url()
self.logger.debug(final_url)
try:
if _post_data is not None:
res = requests.post(
final_url,
data=_post_data,
auth=self.auth,
timeout=ApiRequest.API_HTTP_REQUEST_TIMEOUT,
)
elif _put_data is not None:
res = requests.put(
final_url,
data=_put_data,
auth=self.auth,
timeout=ApiRequest.API_HTTP_REQUEST_TIMEOUT,
)
else:
res = requests.get(
final_url,
params=params,
auth=self.auth,
timeout=ApiRequest.API_HTTP_REQUEST_TIMEOUT,
)
# Check for bad HTTP status code
res.raise_for_status()
if "application/json" in res.headers["content-type"]:
return res.json()
return res
except requests.exceptions.Timeout:
self.logger.error("HTTP request to %s timed out", final_url)
raise
except requests.exceptions.HTTPError:
self.logger.error(
f"{res.request.method} {res.request.url} request failed '{res.status_code}':"
f"\nPayload: {res.request.body}"
f"\nResponse: {res.text}"
)
raise
def req(self, *args, **kwargs):
self.__req = lambda: self(*args, **kwargs)
return self
def retry(self, count, delay=5):
"""Try to send request n times. If after n times it fails then
we finally raise exception"""
for _ in range(0, count - 1):
try:
return self.__req()
except requests.exceptions.RequestException:
sleep(delay)
return self.__req()
class RequestProvider:
"""
Creates the available ApiRequest instance
"""
def __init__(self, base_url: str, api_key: str, endpoints: dict):
self.requests = {}
self.url = ApcUrl(base_url + "/{action}")
# Now we must discover the possible actions
for action_name, action_value in endpoints.items():
new_url = self.url.params(action=action_value)
if "{api_key}" in action_value:
new_url = new_url.params(api_key=api_key)
self.requests[action_name] = ApiRequest(
action_name, new_url, api_key=api_key
)
def available_requests(self):
return list(self.requests.keys())
def __contains__(self, request):
return request in self.requests
def __getattr__(self, attr):
if attr in self:
return self.requests[attr]
return super().__getattribute__(attr)

View File

@ -1,108 +1,150 @@
import json import json
import logging import logging
import urllib.parse from functools import wraps
from time import sleep
from libretime_shared.config import BaseConfig, GeneralConfig from requests.exceptions import RequestException
from ._utils import RequestProvider from ._client import AbstractApiClient, Response
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Config(BaseConfig): def retry_decorator(max_retries: int = 5):
general: GeneralConfig def retry_request(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = max_retries
while True:
try:
return func(*args, **kwargs)
except RequestException as exception:
logger.warning(exception)
retries -= 1
if retries <= 0:
break
sleep(2.0)
return None
return wrapper
return retry_request
AIRTIME_API_VERSION = "1.1" class BaseApiClient(AbstractApiClient):
def __init__(self, base_url: str, api_key: str):
super().__init__(base_url=base_url)
self.session.headers.update({"Authorization": f"Api-Key {api_key}"})
self.session.params.update({"format": "json"}) # type: ignore[union-attr]
def version(self, **kwargs) -> Response:
return self._request(
"GET",
"/api/version",
**kwargs,
)
api_endpoints = {} def register_component(self, component: str, **kwargs) -> Response:
return self._request(
"GET",
"/api/register-component",
params={"component": component},
**kwargs,
)
# URL to get the version number of the server API def notify_media_item_start_play(self, media_id, **kwargs) -> Response:
api_endpoints["version_url"] = "version/api_key/{api_key}" return self._request(
# URL to register a components IP Address with the central web server "GET",
api_endpoints[ "/api/notify-media-item-start-play",
"register_component" params={"media_id": media_id},
] = "register-component/format/json/api_key/{api_key}/component/{component}" **kwargs,
)
# pypo def update_liquidsoap_status(self, msg, stream_id, boot_time, **kwargs) -> Response:
api_endpoints[ return self._request(
"update_start_playing_url" "POST",
] = "notify-media-item-start-play/api_key/{api_key}/media_id/{media_id}/" "/api/update-liquidsoap-status",
api_endpoints[ params={"stream_id": stream_id, "boot_time": boot_time},
"update_liquidsoap_status" data={"msg_post": msg},
] = "update-liquidsoap-status/format/json/api_key/{api_key}/msg/{msg}/stream_id/{stream_id}/boot_time/{boot_time}" **kwargs,
api_endpoints[ )
"update_source_status"
] = "update-source-status/format/json/api_key/{api_key}/sourcename/{sourcename}/status/{status}" def update_source_status(self, sourcename, status, **kwargs) -> Response:
api_endpoints[ return self._request(
"check_live_stream_auth" "GET",
] = "check-live-stream-auth/format/json/api_key/{api_key}/username/{username}/password/{password}/djtype/{djtype}" "/api/update-source-status",
api_endpoints[ params={"sourcename": sourcename, "status": status},
"notify_webstream_data" **kwargs,
] = "notify-webstream-data/api_key/{api_key}/media_id/{media_id}/format/json" )
api_endpoints[
"notify_liquidsoap_started" def check_live_stream_auth(self, username, password, djtype, **kwargs) -> Response:
] = "rabbitmq-do-push/api_key/{api_key}/format/json" return self._request(
api_endpoints["push_stream_stats"] = "push-stream-stats/api_key/{api_key}/format/json" "GET",
api_endpoints[ "/api/check-live-stream-auth",
"update_stream_setting_table" params={"username": username, "password": password, "djtype": djtype},
] = "update-stream-setting-table/api_key/{api_key}/format/json" **kwargs,
api_endpoints[ )
"update_metadata_on_tunein"
] = "update-metadata-on-tunein/api_key/{api_key}" def notify_webstream_data(self, media_id, data, **kwargs) -> Response:
return self._request(
"POST",
"/api/notify-webstream-data",
params={"media_id": media_id},
data={"data": data}, # Data is already a json formatted string
**kwargs,
)
def rabbitmq_do_push(self, **kwargs) -> Response:
return self._request(
"GET",
"/api/rabbitmq-do-push",
**kwargs,
)
def push_stream_stats(self, data, **kwargs) -> Response:
return self._request(
"POST",
"/api/push-stream-stats",
data={"data": json.dumps(data)},
**kwargs,
)
def update_stream_setting_table(self, data, **kwargs) -> Response:
return self._request(
"POST",
"/api/update-stream-setting-table",
data={"data": json.dumps(data)},
**kwargs,
)
def update_metadata_on_tunein(self, **kwargs) -> Response:
return self._request(
"GET",
"/api/update-metadata-on-tunein",
**kwargs,
)
class ApiClient: class ApiClient:
API_BASE = "/api" def __init__(self, base_url: str, api_key: str):
UPLOAD_RETRIES = 3 self._base_client = BaseApiClient(base_url=base_url, api_key=api_key)
UPLOAD_WAIT = 60
def __init__(self, config_path="/etc/libretime/config.yml"): def version(self):
config = Config(config_path)
self.base_url = config.general.public_url
self.api_key = config.general.api_key
self.services = RequestProvider(
base_url=self.base_url + self.API_BASE,
api_key=self.api_key,
endpoints=api_endpoints,
)
def __get_api_version(self):
try: try:
return self.services.version_url()["api_version"] resp = self._base_client.version()
except Exception as exception: payload = resp.json()
logger.exception(exception) return payload["api_version"]
except RequestException:
return -1 return -1
def is_server_compatible(self, verbose=True):
api_version = self.__get_api_version()
if api_version == -1:
if verbose:
logger.info("Unable to get Airtime API version number.\n")
return False
if api_version[0:3] != AIRTIME_API_VERSION[0:3]:
if verbose:
logger.info("Airtime API version found: %s", str(api_version))
logger.info(
"pypo is only compatible with API version: %s", AIRTIME_API_VERSION
)
return False
if verbose:
logger.info("Airtime API version found: %s", str(api_version))
logger.info(
"pypo is only compatible with API version: %s", AIRTIME_API_VERSION
)
return True
def notify_liquidsoap_started(self): def notify_liquidsoap_started(self):
try: try:
self.services.notify_liquidsoap_started() self._base_client.rabbitmq_do_push()
except Exception as exception: except RequestException:
logger.exception(exception) pass
def notify_media_item_start_playing(self, media_id): def notify_media_item_start_playing(self, media_id):
""" """
@ -111,18 +153,18 @@ class ApiClient:
which we handed to liquidsoap in get_liquidsoap_data(). which we handed to liquidsoap in get_liquidsoap_data().
""" """
try: try:
return self.services.update_start_playing_url(media_id=media_id) return self._base_client.notify_media_item_start_play(media_id=media_id)
except Exception as exception: except RequestException:
logger.exception(exception)
return None return None
def check_live_stream_auth(self, username, password, dj_type): def check_live_stream_auth(self, username, password, dj_type):
try: try:
return self.services.check_live_stream_auth( return self._base_client.check_live_stream_auth(
username=username, password=password, djtype=dj_type username=username,
password=password,
djtype=dj_type,
) )
except Exception as exception: except RequestException:
logger.exception(exception)
return {} return {}
def register_component(self, component): def register_component(self, component):
@ -133,56 +175,42 @@ class ApiClient:
to query monit via monit's http service, or download log files via a to query monit via monit's http service, or download log files via a
http server. http server.
""" """
return self.services.register_component(component=component) return self._base_client.register_component(component=component)
@retry_decorator()
def notify_liquidsoap_status(self, msg, stream_id, time): def notify_liquidsoap_status(self, msg, stream_id, time):
try: self._base_client.update_liquidsoap_status(
# encoded_msg is no longer used server_side!! msg=msg,
encoded_msg = urllib.parse.quote("dummy")
self.services.update_liquidsoap_status.req(
_post_data={"msg_post": msg},
msg=encoded_msg,
stream_id=stream_id, stream_id=stream_id,
boot_time=time, boot_time=time,
).retry(5) )
except Exception as exception:
logger.exception(exception)
@retry_decorator()
def notify_source_status(self, sourcename, status): def notify_source_status(self, sourcename, status):
try: return self._base_client.update_source_status(
return self.services.update_source_status.req( sourcename=sourcename,
sourcename=sourcename, status=status status=status,
).retry(5) )
except Exception as exception:
logger.exception(exception)
@retry_decorator()
def notify_webstream_data(self, data, media_id): def notify_webstream_data(self, data, media_id):
""" """
Update the server with the latest metadata we've received from the Update the server with the latest metadata we've received from the
external webstream external webstream
""" """
logger.info( return self._base_client.notify_webstream_data(
self.services.notify_webstream_data.req( data=data,
_post_data={"data": data}, media_id=str(media_id) media_id=str(media_id),
).retry(5)
) )
def push_stream_stats(self, data): def push_stream_stats(self, data):
# TODO : users of this method should do their own error handling return self._base_client.push_stream_stats(data=data)
response = self.services.push_stream_stats(
_post_data={"data": json.dumps(data)}
)
return response
def update_stream_setting_table(self, data): def update_stream_setting_table(self, data):
try: try:
response = self.services.update_stream_setting_table( return self._base_client.update_stream_setting_table(data=data)
_post_data={"data": json.dumps(data)} except RequestException:
) return None
return response
except Exception as exception:
logger.exception(exception)
def update_metadata_on_tunein(self): def update_metadata_on_tunein(self):
self.services.update_metadata_on_tunein() self._base_client.update_metadata_on_tunein()

View File

@ -1,95 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from libretime_api_client._utils import (
ApcUrl,
ApiRequest,
IncompleteUrl,
RequestProvider,
UrlBadParam,
)
@pytest.mark.parametrize(
"url, params, expected",
[
("one/two/three", {}, "one/two/three"),
("/testing/{key}", {"key": "aaa"}, "/testing/aaa"),
(
"/more/{key_a}/{key_b}/testing",
{"key_a": "aaa", "key_b": "bbb"},
"/more/aaa/bbb/testing",
),
],
)
def test_apc_url(url: str, params: dict, expected: str):
found = ApcUrl(url)
assert found.base_url == url
assert found.params(**params).url() == expected
def test_apc_url_bad_param():
url = ApcUrl("/testing/{key}")
with pytest.raises(UrlBadParam):
url.params(bad_key="testing")
def test_apc_url_incomplete():
url = ApcUrl("/{one}/{two}/three").params(two="testing")
with pytest.raises(IncompleteUrl):
url.url()
def test_api_request_init():
req = ApiRequest("request_name", ApcUrl("/test/ing"))
assert req.name == "request_name"
def test_api_request_call_json():
return_value = {"ok": "ok"}
read = MagicMock()
read.headers = {"content-type": "application/json"}
read.json = MagicMock(return_value=return_value)
with patch("requests.get") as mock_method:
mock_method.return_value = read
request = ApiRequest("mm", ApcUrl("http://localhost/testing"))()
assert request == return_value
def test_api_request_call_html():
return_value = "<html><head></head><body></body></html>"
read = MagicMock()
read.headers = {"content-type": "application/html"}
read.text = MagicMock(return_value=return_value)
with patch("requests.get") as mock_method:
mock_method.return_value = read
request = ApiRequest("mm", ApcUrl("http://localhost/testing"))()
assert request.text() == return_value
def test_request_provider_init():
request_provider = RequestProvider(
base_url="http://localhost/test",
api_key="test_key",
endpoints={},
)
assert len(request_provider.available_requests()) == 0
def test_request_provider_contains():
endpoints = {
"upload_recorded": "/1/",
}
request_provider = RequestProvider(
base_url="http://localhost/test",
api_key="test_key",
endpoints=endpoints,
)
for endpoint in endpoints:
assert endpoint in request_provider.requests

View File

@ -0,0 +1,21 @@
import pytest
from libretime_api_client.v1 import ApiClient
@pytest.mark.parametrize(
"base_url",
[
("http://localhost:8080"),
("http://localhost:8080/base"),
],
)
def test_api_client(requests_mock, base_url):
api_client = ApiClient(base_url=base_url, api_key="test-key")
requests_mock.get(
f"{base_url}/api/version",
json={"api_version": "1.0.0"},
)
assert api_client.version() == "1.0.0"