From 314c70a2081e72227244c6c29b011d589291dd69 Mon Sep 17 00:00:00 2001 From: jo Date: Wed, 10 Aug 2022 17:35:06 +0200 Subject: [PATCH] feat(playout): stats collector using stream config --- playout/libretime_playout/history/stats.py | 161 +++++++++------------ playout/libretime_playout/main.py | 2 +- playout/tests/conftest.py | 2 +- playout/tests/history/stats_test.py | 157 +++++++++++--------- 4 files changed, 156 insertions(+), 166 deletions(-) diff --git a/playout/libretime_playout/history/stats.py b/playout/libretime_playout/history/stats.py index 92e7cd054..2e56e95a4 100644 --- a/playout/libretime_playout/history/stats.py +++ b/playout/libretime_playout/history/stats.py @@ -2,9 +2,10 @@ from dataclasses import dataclass from datetime import datetime from threading import Thread from time import sleep -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Union from libretime_api_client.v1 import ApiClient as LegacyClient +from libretime_shared.config import IcecastOutput, ShoutcastOutput from loguru import logger from lxml import etree from requests import Session @@ -14,20 +15,9 @@ from requests.exceptions import ( # pylint: disable=redefined-builtin Timeout, ) +from ..config import Config -@dataclass -class Source: - stream_id: str - mount: str - - -@dataclass -class Server: - host: str - port: int - auth: Tuple[str, str] - sources: List[Source] - is_shoutcast: bool = False +AnyOutput = Union[IcecastOutput, ShoutcastOutput] @dataclass @@ -48,73 +38,32 @@ class StatsCollector: self._timeout = 30 self._legacy_client = legacy_client - def get_streams_grouped_by_server(self) -> List[Server]: - """ - Get streams grouped by server to prevent duplicate requests. - """ - dirty_streams: Dict[str, Dict[str, Any]] - dirty_streams = self._legacy_client.get_stream_parameters()["stream_params"] + def get_output_url(self, output: AnyOutput) -> str: + if output.kind == "icecast": + return f"http://{output.host}:{output.port}/admin/stats.xml" + return f"http://{output.host}:{output.port}/admin.cgi?sid=1&mode=viewxml" - servers: Dict[str, Server] = {} - for stream_id, dirty_stream in dirty_streams.items(): - if dirty_stream["enable"].lower() != "true": - continue + def collect_output_stats( + self, + output: AnyOutput, + ) -> Dict[str, Stats]: - source = Source(stream_id=stream_id, mount=dirty_stream["mount"]) - - server_id = f"{dirty_stream['host']}:{dirty_stream['port']}" - if server_id not in servers: - servers[server_id] = Server( - host=dirty_stream["host"], - port=dirty_stream["port"], - auth=(dirty_stream["admin_user"], dirty_stream["admin_pass"]), - sources=[source], - is_shoutcast=dirty_stream["output"] == "shoutcast", - ) - else: - servers[server_id].sources.append(source) - - return list(servers.values()) - - def report_server_error(self, server: Server, error: Exception): - self._legacy_client.update_stream_setting_table( - {source.stream_id: str(error) for source in server.sources} + response = self._session.get( + url=self.get_output_url(output), + auth=(output.admin_user, output.admin_password), + timeout=self._timeout, ) + response.raise_for_status() - def collect_server_stats(self, server: Server) -> Dict[str, Stats]: - url = f"http://{server.host}:{server.port}/admin/stats.xml" - - # Shoutcast specific url - if server.is_shoutcast: - url = f"http://{server.host}:{server.port}/admin.cgi?sid=1&mode=viewxml" - - try: - response = self._session.get(url, auth=server.auth, timeout=self._timeout) - response.raise_for_status() - - except ( - ConnectionError, - HTTPError, - Timeout, - ) as exception: - logger.exception(exception) - self.report_server_error(server, exception) - return {} - - try: - root = etree.fromstring( # nosec - response.content, - parser=etree.XMLParser(resolve_entities=False), - ) - except etree.XMLSyntaxError as exception: - logger.exception(exception) - self.report_server_error(server, exception) - return {} + root = etree.fromstring( # nosec + response.content, + parser=etree.XMLParser(resolve_entities=False), + ) stats = {} # Shoutcast specific parsing - if server.is_shoutcast: + if output.kind == "shoutcast": listeners_el = root.find("CURRENTLISTENERS") listeners = 0 if listeners_el is None else int(listeners_el.text) @@ -123,46 +72,71 @@ class StatsCollector: ) return stats - mounts = [source.mount for source in server.sources] + # Icecast specific parsing for source in root.iterchildren("source"): mount = source.attrib.get("mount") if mount is None: continue - mount = mount.lstrip("/") - if mount not in mounts: - continue listeners_el = source.find("listeners") listeners = 0 if listeners_el is None else int(listeners_el.text) + mount = mount.lstrip("/") stats[mount] = Stats( listeners=listeners, ) return stats - def collect(self, *, _timestamp: Optional[datetime] = None): + def collect( + self, + outputs: List[AnyOutput], + *, + _timestamp: Optional[datetime] = None, + ): if _timestamp is None: _timestamp = datetime.utcnow() - servers = self.get_streams_grouped_by_server() - stats: List[Dict[str, Any]] = [] stats_timestamp = _timestamp.strftime("%Y-%m-%d %H:%M:%S") + cache: Dict[str, Dict[str, Stats]] = {} - for server in servers: - server_stats = self.collect_server_stats(server) - if not server_stats: + for output_id, output in enumerate(outputs, start=1): + if ( + output.kind not in ("icecast", "shoutcast") + or not output.enabled + or output.admin_password is None + ): continue - stats.extend( - { - "timestamp": stats_timestamp, - "num_listeners": mount_stats.listeners, - "mount_name": mount, - } - for mount, mount_stats in server_stats.items() - ) + output_url = self.get_output_url(output) + if output_url not in cache: + try: + cache[output_url] = self.collect_output_stats(output) + except ( + etree.XMLSyntaxError, + ConnectionError, + HTTPError, + Timeout, + ) as exception: + logger.exception(exception) + self._legacy_client.update_stream_setting_table( + {output_id: str(exception)} + ) + continue + + output_stats = cache[output_url] + + mount = "shoutcast" if output.kind == "shoutcast" else output.mount + + if mount in output_stats: + stats.append( + { + "timestamp": stats_timestamp, + "num_listeners": output_stats[mount].listeners, + "mount_name": mount, + } + ) if stats: self._legacy_client.push_stream_stats(stats) @@ -172,15 +146,16 @@ class StatsCollectorThread(Thread): name = "stats collector" daemon = True - def __init__(self, legacy_client: LegacyClient) -> None: + def __init__(self, config: Config, legacy_client: LegacyClient) -> None: super().__init__() + self._config = config self._collector = StatsCollector(legacy_client) def run(self): logger.info(f"starting {self.name}") while True: try: - self._collector.collect() + self._collector.collect(self._config.stream.outputs.merged) except Exception as exception: logger.exception(exception) sleep(120) diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index d46ef9245..9fdc5feb5 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -135,7 +135,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ recorder_thread.daemon = True recorder_thread.start() - stats_collector_thread = StatsCollectorThread(legacy_client) + stats_collector_thread = StatsCollectorThread(config, legacy_client) stats_collector_thread.start() # Just sleep the main thread, instead of blocking on fetch_thread.join(). diff --git a/playout/tests/conftest.py b/playout/tests/conftest.py index d9f24e4e2..cb30a3d62 100644 --- a/playout/tests/conftest.py +++ b/playout/tests/conftest.py @@ -3,7 +3,7 @@ import pytest from libretime_playout.config import Config -@pytest.fixture(scope="session") +@pytest.fixture() def config(): return Config( **{ diff --git a/playout/tests/history/stats_test.py b/playout/tests/history/stats_test.py index ec418ae0d..012db80af 100644 --- a/playout/tests/history/stats_test.py +++ b/playout/tests/history/stats_test.py @@ -1,90 +1,97 @@ from datetime import datetime +from typing import List from unittest.mock import Mock, call import pytest +from libretime_shared.config import IcecastOutput, ShoutcastOutput +from lxml.etree import XMLSyntaxError +from requests.exceptions import HTTPError -from libretime_playout.history.stats import Server, Source, Stats, StatsCollector +from libretime_playout.history.stats import AnyOutput, Stats, StatsCollector from ..fixtures import icecast_stats, shoutcast_admin -@pytest.fixture(name="server") -def _server_fixture(): - return Server( - host="example.com", - port=8000, - auth=("admin", "hackme"), - sources=[ - Source("s1", "main.ogg"), - ], - ) +@pytest.fixture(name="outputs") +def outputs_fixture(): + default_output = { + "enabled": True, + "mount": "main.ogg", + "source_password": "hackme", + "admin_password": "hackme", + "audio": {"format": "ogg", "bitrate": 256}, + } + return [ + IcecastOutput(**default_output), + IcecastOutput( + **{ + **default_output, + "mount": "main.mp3", + "audio": {"format": "mp3", "bitrate": 256}, + } + ), + ] -def test_stats_collector_collect_server_stats(requests_mock, server): +def test_stats_collector_collect_server_stats( + requests_mock, + outputs: List[AnyOutput], +): requests_mock.get( - "http://example.com:8000/admin/stats.xml", + "http://localhost:8000/admin/stats.xml", content=icecast_stats.read_bytes(), ) legacy_client = Mock() collector = StatsCollector(legacy_client) - assert collector.collect_server_stats(server) == {"main.ogg": Stats(listeners=2)} + result = collector.collect_output_stats(outputs[0]) + assert result == { + "main.ogg": Stats(listeners=2), + "main.mp3": Stats(listeners=3), + } legacy_client.assert_not_called() -def test_stats_collector_collect_server_stats_unauthorized(requests_mock, server): +def test_stats_collector_collect_server_stats_unauthorized( + requests_mock, + outputs: List[AnyOutput], +): requests_mock.get( - "http://example.com:8000/admin/stats.xml", + "http://localhost:8000/admin/stats.xml", status_code=401, ) legacy_client = Mock() collector = StatsCollector(legacy_client) - assert not collector.collect_server_stats(server) - - legacy_client.assert_has_calls( - [ - call.update_stream_setting_table( - { - "s1": "401 Client Error: None for url: http://example.com:8000/admin/stats.xml", - } - ) - ] - ) + with pytest.raises(HTTPError): + collector.collect_output_stats(outputs[0]) -def test_stats_collector_collect_server_stats_invalid_xml(requests_mock, server): +def test_stats_collector_collect_server_stats_invalid_xml( + requests_mock, + outputs: List[AnyOutput], +): requests_mock.get( - "http://example.com:8000/admin/stats.xml", - content=b""" - - localhost - - """, + "http://localhost:8000/admin/stats.xml", + content=b"""localhost""", ) legacy_client = Mock() collector = StatsCollector(legacy_client) - assert not collector.collect_server_stats(server) - - legacy_client.assert_has_calls( - [ - call.update_stream_setting_table( - { - "s1": "Opening and ending tag mismatch: host line 3 and icestats, line 4, column 12 (, line 4)", - } - ) - ] - ) + with pytest.raises(XMLSyntaxError): + collector.collect_output_stats(outputs[0]) -def test_stats_collector_collect(requests_mock): +def test_stats_collector_collect( + requests_mock, + outputs: List[AnyOutput], +): requests_mock.get( - "http://example.com:8000/admin/stats.xml", + "http://localhost:8000/admin/stats.xml", content=icecast_stats.read_bytes(), ) requests_mock.get( @@ -93,35 +100,38 @@ def test_stats_collector_collect(requests_mock): ) legacy_client = Mock() - default_stream = { - "enable": "true", - "output": "icecast", - "host": "example.com", - "port": 8000, - "mount": "main.ogg", - "admin_user": "admin", - "admin_pass": "hackme", - } - legacy_client.get_stream_parameters.return_value = { - "stream_params": { - "s1": {**default_stream}, - "s2": {**default_stream, "enable": "false", "mount": "main.mp3"}, - "s3": {**default_stream, "mount": "unknown.mp3"}, - "s4": { - **default_stream, - "output": "shoutcast", - "host": "shoutcast.com", - "mount": "shout.mp3", - }, - } + default_output = { + "source_password": "hackme", + "admin_password": "hackme", } + outputs.extend( + [ + IcecastOutput( + **{ + **default_output, + "enabled": False, + "host": "example.com", + "mount": "disabled.ogg", + "audio": {"format": "ogg", "bitrate": 256}, + } + ), + ShoutcastOutput( + **{ + **default_output, + "enabled": True, + "kind": "shoutcast", + "host": "shoutcast.com", + "audio": {"format": "mp3", "bitrate": 256}, + } + ), + ] + ) collector = StatsCollector(legacy_client) - collector.collect(_timestamp=datetime(2022, 8, 9, 11, 19, 7)) + collector.collect(outputs, _timestamp=datetime(2022, 8, 9, 11, 19, 7)) legacy_client.assert_has_calls( [ - call.get_stream_parameters(), call.push_stream_stats( [ { @@ -129,12 +139,17 @@ def test_stats_collector_collect(requests_mock): "num_listeners": 2, "mount_name": "main.ogg", }, + { + "timestamp": "2022-08-09 11:19:07", + "num_listeners": 3, + "mount_name": "main.mp3", + }, { "timestamp": "2022-08-09 11:19:07", "num_listeners": 1, "mount_name": "shoutcast", }, ] - ), + ) ] )