feat(api-client): rewrite api-client v2

This commit is contained in:
jo 2022-07-20 16:33:03 +02:00 committed by Kyle Robbertze
parent 067b35e9bb
commit cf9b08906b
4 changed files with 112 additions and 62 deletions

View file

@ -1,47 +1,33 @@
###############################################################################
# This file holds the implementations for all the API clients.
#
# If you want to develop a new client, here are some suggestions: Get the fetch
# methods working first, then the push, then the liquidsoap notifier. You will
# probably want to create a script on your server side to automatically
# schedule a playlist one minute from the current time.
###############################################################################
import logging
from ._config import Config
from ._utils import RequestProvider
LIBRETIME_API_VERSION = "2.0"
from ._client import AbstractApiClient, Response
api_endpoints = {}
class ApiClient(AbstractApiClient):
VERSION = "2.0"
api_endpoints["version_url"] = "version/"
api_endpoints["schedule_url"] = "schedule/"
api_endpoints["webstream_url"] = "webstreams/{id}/"
api_endpoints["show_instance_url"] = "show-instances/{id}/"
api_endpoints["show_url"] = "shows/{id}/"
api_endpoints["file_url"] = "files/{id}/"
api_endpoints["file_download_url"] = "files/{id}/download/"
def __init__(self, base_url: str, api_key: str):
super().__init__(base_url=base_url)
self.session.headers.update({"Authorization": f"Api-Key {api_key}"})
def get_version(self, **kwargs) -> Response:
return self._request("GET", "/api/v2/version", **kwargs)
class ApiClient:
API_BASE = "/api/v2"
def get_show(self, item_id: int, **kwargs) -> Response:
return self._request("GET", f"/api/v2/shows/{item_id}", **kwargs)
def __init__(self, logger=None, config_path="/etc/libretime/config.yml"):
self.logger = logger or logging
def get_show_instance(self, item_id: int, **kwargs) -> Response:
return self._request("GET", f"/api/v2/show-instances/{item_id}", **kwargs)
config = Config(filepath=config_path)
self.base_url = config.general.public_url
self.api_key = config.general.api_key
def list_schedule(self, **kwargs) -> Response:
return self._request("GET", "/api/v2/schedule", **kwargs)
self.services = RequestProvider(
base_url=self.base_url + self.API_BASE,
api_key=self.api_key,
endpoints=api_endpoints,
)
def get_webstream(self, item_id: int, **kwargs) -> Response:
return self._request("GET", f"/api/v2/webstreams/{item_id}", **kwargs)
def update_file(self, file_id, payload):
data = self.services.file_url(id=file_id)
data.update(payload)
return self.services.file_url(id=file_id, _put_data=data)
def get_file(self, item_id: int, **kwargs) -> Response:
return self._request("GET", f"/api/v2/files/{item_id}", **kwargs)
def update_file(self, item_id: int, **kwargs) -> Response:
return self._request("PATCH", f"/api/v2/files/{item_id}", **kwargs)
def download_file(self, item_id: int, **kwargs) -> Response:
return self._request("GET", f"/api/v2/files/{item_id}/download", **kwargs)