sintonia/playout/libretime_playout/history/stats.py

159 lines
4.7 KiB
Python
Raw Permalink Normal View History

2023-02-26 01:27:00 +01:00
import logging
from dataclasses import dataclass
from datetime import datetime
from threading import Thread
from time import sleep
from typing import Any, Dict, List, Optional, Union
2023-03-01 17:13:02 +01:00
import requests
from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_shared.config import IcecastOutput, ShoutcastOutput
from lxml import etree
from requests import Session
from ..config import Config
2023-02-26 01:27:00 +01:00
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Type alias covering the two output configuration types handled in this module.
AnyOutput = Union[IcecastOutput, ShoutcastOutput]
@dataclass
class Stats:
    """Listener statistics for a single stream mount."""

    # Listener count reported by the streaming server for this mount.
    listeners: int
# pylint: disable=too-few-public-methods
class StatsCollector:
    """
    Collect stats from Icecast and Shoutcast.

    Listener counts are fetched from the admin XML endpoint of each enabled
    output and pushed to the legacy API client.
    """

    _session: Session

    def __init__(self, legacy_client: LegacyClient):
        """
        Args:
            legacy_client: Client used to push the collected stats and to
                report per-output collection errors.
        """
        self._session = Session()
        # Timeout (in seconds) passed to every stats request.
        self._timeout = 30
        self._legacy_client = legacy_client

    def get_output_url(self, output: AnyOutput) -> str:
        """
        Return the admin stats URL for the given output.

        Icecast outputs use `/admin/stats.xml`; any other kind gets the
        Shoutcast `/admin.cgi?sid=1&mode=viewxml` endpoint.
        """
        if output.kind == "icecast":
            return f"http://{output.host}:{output.port}/admin/stats.xml"
        return f"http://{output.host}:{output.port}/admin.cgi?sid=1&mode=viewxml"

    @staticmethod
    def _parse_listeners(element: Optional[Any]) -> int:
        """
        Return the listener count stored in an XML element's text.

        A missing element, or an element without any text, counts as 0
        listeners instead of failing on `int(None)`.

        Raises:
            ValueError: If the element text is not an integer.
        """
        if element is None:
            return 0
        text = (element.text or "").strip()
        if not text:
            return 0
        return int(text)

    def collect_output_stats(
        self,
        output: AnyOutput,
    ) -> Dict[str, Stats]:
        """
        Fetch and parse the stats exposed by the server of the given output.

        Returns:
            A mapping of mount name to stats. Shoutcast servers yield a
            single entry keyed `"shoutcast"`; Icecast servers yield one entry
            per `<source>` child carrying a `mount` attribute, keyed by the
            mount with its leading slashes stripped.

        Raises:
            requests.exceptions.HTTPError: If the server answers with an
                error status (via `raise_for_status`).
            ValueError: If a listener count is not an integer.

        Other request or XML parsing errors propagate unchanged.
        """
        response = self._session.get(
            url=self.get_output_url(output),
            auth=(output.admin_user, output.admin_password or ""),
            timeout=self._timeout,
        )
        response.raise_for_status()

        # Entity resolution is disabled because the payload comes from a
        # remote server.
        root = etree.fromstring(  # nosec
            response.content,
            parser=etree.XMLParser(resolve_entities=False),
        )

        stats = {}

        # Shoutcast specific parsing
        if output.kind == "shoutcast":
            stats["shoutcast"] = Stats(
                listeners=self._parse_listeners(root.find("CURRENTLISTENERS")),
            )
            return stats

        # Icecast specific parsing
        for source in root.iterchildren("source"):
            mount = source.attrib.get("mount")
            if mount is None:
                continue

            stats[mount.lstrip("/")] = Stats(
                listeners=self._parse_listeners(source.find("listeners")),
            )

        return stats

    def collect(
        self,
        outputs: List[AnyOutput],
        *,
        _timestamp: Optional[datetime] = None,
    ) -> None:
        """
        Collect the listener stats of every eligible output and push them.

        Outputs that are not Icecast/Shoutcast, are disabled, or have no admin
        password are skipped. Each stats URL is requested at most once per
        successful fetch; outputs sharing a URL reuse the cached response.
        A failure to fetch or parse a server's stats is logged and reported
        through `update_stream_setting_table`, keyed by the 1-based position
        of the output in `outputs`, and does not stop the remaining outputs.

        Args:
            outputs: Outputs to collect stats for.
            _timestamp: Timestamp recorded with the stats; defaults to the
                current UTC time.
        """
        if _timestamp is None:
            # `datetime.utcnow()` is deprecated since Python 3.12; the aware
            # equivalent formats to the exact same string below.
            # pylint: disable-next=import-outside-toplevel
            from datetime import timezone

            _timestamp = datetime.now(timezone.utc)

        stats: List[Dict[str, Any]] = []
        stats_timestamp = _timestamp.strftime("%Y-%m-%d %H:%M:%S")

        cache: Dict[str, Dict[str, Stats]] = {}

        for output_id, output in enumerate(outputs, start=1):
            if (
                output.kind not in ("icecast", "shoutcast")
                or not output.enabled
                or output.admin_password is None
            ):
                continue

            output_url = self.get_output_url(output)
            if output_url not in cache:
                try:
                    cache[output_url] = self.collect_output_stats(output)
                except (
                    etree.XMLSyntaxError,
                    requests.exceptions.ConnectionError,
                    requests.exceptions.HTTPError,
                    requests.exceptions.Timeout,
                    # Malformed listener counts must only fail this output,
                    # not abort the collection for all the others.
                    ValueError,
                ) as exception:
                    logger.exception(exception)
                    self._legacy_client.update_stream_setting_table(
                        {output_id: str(exception)}
                    )
                    continue

            output_stats = cache[output_url]

            mount = "shoutcast" if output.kind == "shoutcast" else output.mount

            if mount in output_stats:
                stats.append(
                    {
                        "timestamp": stats_timestamp,
                        "num_listeners": output_stats[mount].listeners,
                        "mount_name": mount,
                    }
                )

        if stats:
            self._legacy_client.push_stream_stats(stats)
class StatsCollectorThread(Thread):
    """
    Background thread that periodically collects the stream outputs stats.

    A collection is attempted, then the thread sleeps 120 seconds before the
    next attempt, forever.
    """

    # Set at class level rather than passed to `Thread.__init__`.
    name = "stats collector"
    daemon = True

    def __init__(self, config: Config, legacy_client: LegacyClient) -> None:
        """
        Args:
            config: Configuration whose `stream.outputs.merged` is read on
                every collection.
            legacy_client: Client handed over to the `StatsCollector`.
        """
        super().__init__()
        self._config = config
        self._collector = StatsCollector(legacy_client)

    def run(self) -> None:
        """Collect the stats in an endless loop, logging any failure."""
        logger.info("starting %s", self.name)
        while True:
            try:
                self._collector.collect(self._config.stream.outputs.merged)
            # A failed collection must never kill the thread: log it and try
            # again after the usual delay.
            except Exception as exception:  # pylint: disable=broad-exception-caught
                logger.exception(exception)
            sleep(120)