diff --git a/playout/libretime_playout/history/__init__.py b/playout/libretime_playout/history/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/playout/libretime_playout/history/stats.py b/playout/libretime_playout/history/stats.py new file mode 100644 index 000000000..92e7cd054 --- /dev/null +++ b/playout/libretime_playout/history/stats.py @@ -0,0 +1,186 @@ +from dataclasses import dataclass +from datetime import datetime +from threading import Thread +from time import sleep +from typing import Any, Dict, List, Optional, Tuple + +from libretime_api_client.v1 import ApiClient as LegacyClient +from loguru import logger +from lxml import etree +from requests import Session +from requests.exceptions import ( # pylint: disable=redefined-builtin + ConnectionError, + HTTPError, + Timeout, +) + + +@dataclass +class Source: + stream_id: str + mount: str + + +@dataclass +class Server: + host: str + port: int + auth: Tuple[str, str] + sources: List[Source] + is_shoutcast: bool = False + + +@dataclass +class Stats: + listeners: int + + +# pylint: disable=too-few-public-methods +class StatsCollector: + """ + Collect stats from Icecast and Shoutcast. + """ + + _session: Session + + def __init__(self, legacy_client: LegacyClient): + self._session = Session() + self._timeout = 30 + self._legacy_client = legacy_client + + def get_streams_grouped_by_server(self) -> List[Server]: + """ + Get streams grouped by server to prevent duplicate requests. + """ + dirty_streams: Dict[str, Dict[str, Any]] + dirty_streams = self._legacy_client.get_stream_parameters()["stream_params"] + + servers: Dict[str, Server] = {} + for stream_id, dirty_stream in dirty_streams.items(): + if dirty_stream["enable"].lower() != "true": + continue + + source = Source(stream_id=stream_id, mount=dirty_stream["mount"]) + + server_id = f"{dirty_stream['host']}:{dirty_stream['port']}" + if server_id not in servers: + servers[server_id] = Server( + host=dirty_stream["host"], + port=dirty_stream["port"], + auth=(dirty_stream["admin_user"], dirty_stream["admin_pass"]), + sources=[source], + is_shoutcast=dirty_stream["output"] == "shoutcast", + ) + else: + servers[server_id].sources.append(source) + + return list(servers.values()) + + def report_server_error(self, server: Server, error: Exception): + self._legacy_client.update_stream_setting_table( + {source.stream_id: str(error) for source in server.sources} + ) + + def collect_server_stats(self, server: Server) -> Dict[str, Stats]: + url = f"http://{server.host}:{server.port}/admin/stats.xml" + + # Shoutcast specific url + if server.is_shoutcast: + url = f"http://{server.host}:{server.port}/admin.cgi?sid=1&mode=viewxml" + + try: + response = self._session.get(url, auth=server.auth, timeout=self._timeout) + response.raise_for_status() + + except ( + ConnectionError, + HTTPError, + Timeout, + ) as exception: + logger.exception(exception) + self.report_server_error(server, exception) + return {} + + try: + root = etree.fromstring( # nosec + response.content, + parser=etree.XMLParser(resolve_entities=False), + ) + except etree.XMLSyntaxError as exception: + logger.exception(exception) + self.report_server_error(server, exception) + return {} + + stats = {} + + # Shoutcast specific parsing + if server.is_shoutcast: + listeners_el = root.find("CURRENTLISTENERS") + listeners = 0 if listeners_el is None else int(listeners_el.text) + + stats["shoutcast"] = Stats( + listeners=listeners, + ) + return stats + + mounts = [source.mount for source in server.sources] + for source in root.iterchildren("source"): + mount = source.attrib.get("mount") + if mount is None: + continue + mount = mount.lstrip("/") + if mount not in mounts: + continue + + listeners_el = source.find("listeners") + listeners = 0 if listeners_el is None else int(listeners_el.text) + + stats[mount] = Stats( + listeners=listeners, + ) + + return stats + + def collect(self, *, _timestamp: Optional[datetime] = None): + if _timestamp is None: + _timestamp = datetime.utcnow() + + servers = self.get_streams_grouped_by_server() + + stats: List[Dict[str, Any]] = [] + stats_timestamp = _timestamp.strftime("%Y-%m-%d %H:%M:%S") + + for server in servers: + server_stats = self.collect_server_stats(server) + if not server_stats: + continue + + stats.extend( + { + "timestamp": stats_timestamp, + "num_listeners": mount_stats.listeners, + "mount_name": mount, + } + for mount, mount_stats in server_stats.items() + ) + + if stats: + self._legacy_client.push_stream_stats(stats) + + +class StatsCollectorThread(Thread): + name = "stats collector" + daemon = True + + def __init__(self, legacy_client: LegacyClient) -> None: + super().__init__() + self._collector = StatsCollector(legacy_client) + + def run(self): + logger.info(f"starting {self.name}") + while True: + try: + self._collector.collect() + except Exception as exception: + logger.exception(exception) + sleep(120) diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index e810a11b3..0eedfa79c 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -21,6 +21,7 @@ from libretime_shared.logging import level_from_name, setup_logger from loguru import logger from .config import CACHE_DIR, RECORD_DIR, Config +from .history.stats import StatsCollectorThread from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION, parse_liquidsoap_version from .message_handler import PypoMessageHandler from .player.fetch import PypoFetch @@ -28,7 +29,6 @@ from .player.file import PypoFile from .player.liquidsoap import PypoLiquidsoap from .player.push import PypoPush from .recorder import Recorder -from .stats import ListenerStat from .timeout import ls_timeout @@ -196,9 +196,8 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ recorder.daemon = True recorder.start() - stat = ListenerStat(config, legacy_client) - stat.daemon = True - stat.start() + stats_collector = StatsCollectorThread(legacy_client) + stats_collector.start() # Just sleep the main thread, instead of blocking on pf.join(). # This allows CTRL-C to work! diff --git a/playout/libretime_playout/stats.py b/playout/libretime_playout/stats.py deleted file mode 100644 index 952441bc1..000000000 --- a/playout/libretime_playout/stats.py +++ /dev/null @@ -1,156 +0,0 @@ -import base64 -import time -import urllib.error -import urllib.parse -import urllib.request -from datetime import datetime -from threading import Thread - -import defusedxml.minidom -from libretime_api_client.v1 import ApiClient as LegacyClient -from loguru import logger - -from .config import Config - - -class ListenerStat(Thread): - - HTTP_REQUEST_TIMEOUT = 30 # 30 second HTTP request timeout - - def __init__(self, config: Config, legacy_client: LegacyClient): - Thread.__init__(self) - self.config = config - self.legacy_client = legacy_client - - def get_node_text(self, nodelist): - rc = [] - for node in nodelist: - if node.nodeType == node.TEXT_NODE: - rc.append(node.data) - return "".join(rc) - - def get_stream_parameters(self): - # [{"user":"", "password":"", "url":"", "port":""},{},{}] - return self.legacy_client.get_stream_parameters() - - def get_stream_server_xml(self, ip, url, is_shoutcast=False): - auth_string = "%(admin_user)s:%(admin_pass)s" % ip - encoded = base64.b64encode(auth_string.encode("utf-8")) - - header = {"Authorization": "Basic %s" % encoded.decode("ascii")} - - if is_shoutcast: - # user agent is required for shoutcast auth, otherwise it returns 404. - user_agent = "Mozilla/5.0 (Linux; rv:22.0) Gecko/20130405 Firefox/22.0" - header["User-Agent"] = user_agent - - req = urllib.request.Request( - # assuming that the icecast stats path is /admin/stats.xml - # need to fix this - url=url, - headers=header, - ) - - resp = urllib.request.urlopen(req, timeout=ListenerStat.HTTP_REQUEST_TIMEOUT) - document = resp.read() - - return document - - def get_icecast_stats(self, ip): - document = None - if "airtime.pro" in ip["host"].lower(): - url = "http://%(host)s:%(port)s/stats.xsl" % ip - document = self.get_stream_server_xml(ip, url) - else: - url = "http://%(host)s:%(port)s/admin/stats.xml" % ip - document = self.get_stream_server_xml(ip, url) - dom = defusedxml.minidom.parseString(document) - sources = dom.getElementsByTagName("source") - - mount_stats = None - for source in sources: - # drop the leading '/' character - mount_name = source.getAttribute("mount")[1:] - if mount_name == ip["mount"]: - timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") - listeners = source.getElementsByTagName("listeners") - num_listeners = 0 - if len(listeners): - num_listeners = self.get_node_text(listeners[0].childNodes) - - mount_stats = { - "timestamp": timestamp, - "num_listeners": num_listeners, - "mount_name": mount_name, - } - - return mount_stats - - def get_shoutcast_stats(self, ip): - url = "http://%(host)s:%(port)s/admin.cgi?sid=1&mode=viewxml" % ip - document = self.get_stream_server_xml(ip, url, is_shoutcast=True) - dom = defusedxml.minidom.parseString(document) - current_listeners = dom.getElementsByTagName("CURRENTLISTENERS") - - timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") - num_listeners = 0 - if current_listeners: - num_listeners = self.get_node_text(current_listeners[0].childNodes) - - mount_stats = { - "timestamp": timestamp, - "num_listeners": num_listeners, - "mount_name": "shoutcast", - } - - return mount_stats - - def get_stream_stats(self, stream_parameters): - stats = [] - - # iterate over stream_parameters which is a list of dicts. Each dict - # represents one Airtime stream (currently this limit is 3). - # Note that there can be optimizations done, since if all three - # streams are the same server, we will still initiate 3 separate - # connections - for k, v in stream_parameters.items(): - if v["enable"] == "true": - try: - if v["output"] == "icecast": - mount_stats = self.get_icecast_stats(v) - if mount_stats: - stats.append(mount_stats) - else: - stats.append(self.get_shoutcast_stats(v)) - self.update_listener_stat_error(k, "OK") - except Exception as exception: - try: - self.update_listener_stat_error(k, str(exception)) - except Exception as exception2: - logger.exception(exception2) - - return stats - - def push_stream_stats(self, stats): - self.legacy_client.push_stream_stats(stats) - - def update_listener_stat_error(self, stream_id, error): - data = {stream_id: error} - self.legacy_client.update_stream_setting_table(data) - - def run(self): - # Wake up every 120 seconds and gather icecast statistics. Note that we - # are currently querying the server every 2 minutes for list of - # mountpoints as well. We could remove this query if we hooked into - # rabbitmq events, and listened for these changes instead. - while True: - try: - stream_parameters = self.get_stream_parameters() - stats = self.get_stream_stats(stream_parameters["stream_params"]) - - if stats: - self.push_stream_stats(stats) - except Exception as exception: - logger.exception(exception) - - time.sleep(120) diff --git a/playout/packages.ini b/playout/packages.ini index e07f39972..1a9847a76 100644 --- a/playout/packages.ini +++ b/playout/packages.ini @@ -2,6 +2,7 @@ [python] python3 = buster, bullseye, bionic, focal, jammy python3-pip = buster, bullseye, bionic, focal, jammy +python3-lxml = bullseye, focal, jammy [liquidsoap] # https://github.com/savonet/liquidsoap/blob/main/CHANGES.md diff --git a/playout/pyproject.toml b/playout/pyproject.toml index 7de38496a..17fdd3f74 100644 --- a/playout/pyproject.toml +++ b/playout/pyproject.toml @@ -1,5 +1,8 @@ [tool.pylint.messages_control] -extension-pkg-whitelist = "pydantic" +extension-pkg-whitelist = [ + "lxml.etree", + "pydantic", +] disable = [ "missing-class-docstring", "missing-function-docstring", diff --git a/playout/requirements.txt b/playout/requirements.txt index 9fef3ce3c..3a06dccab 100644 --- a/playout/requirements.txt +++ b/playout/requirements.txt @@ -1,8 +1,9 @@ # Please do not edit this file, edit the setup.py file! # This file is auto-generated by tools/extract_requirements.py. backports.zoneinfo>=0.2.1,<0.3;python_version<'3.9' -defusedxml>=0.6.0,<0.8 +dataclasses>=0.8,<0.9;python_version<'3.7' kombu==4.6.11 +lxml>=4.5.0,<4.10.0 mutagen>=1.45.1,<1.46 python-dateutil>=2.8.1,<2.9 requests>=2.25.1,<2.29 diff --git a/playout/setup.py b/playout/setup.py index 5f47845a6..b85d24969 100644 --- a/playout/setup.py +++ b/playout/setup.py @@ -24,8 +24,9 @@ setup( python_requires=">=3.6", install_requires=[ "backports.zoneinfo>=0.2.1,<0.3;python_version<'3.9'", - "defusedxml>=0.6.0,<0.8", + "dataclasses>=0.8,<0.9;python_version<'3.7'", "kombu==4.6.11", + "lxml>=4.5.0,<4.10.0", "mutagen>=1.45.1,<1.46", "python-dateutil>=2.8.1,<2.9", "requests>=2.25.1,<2.29", diff --git a/playout/tests/__init__.py b/playout/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/playout/tests/fixtures/__init__.py b/playout/tests/fixtures/__init__.py new file mode 100644 index 000000000..c94703c11 --- /dev/null +++ b/playout/tests/fixtures/__init__.py @@ -0,0 +1,6 @@ +from pathlib import Path + +fixture_path = Path(__file__).parent + +icecast_stats = fixture_path / "icecast_stats.xml" +shoutcast_admin = fixture_path / "shoutcast_admin.xml" diff --git a/playout/tests/fixtures/icecast_stats.xml b/playout/tests/fixtures/icecast_stats.xml new file mode 100644 index 000000000..c9a378e44 --- /dev/null +++ b/playout/tests/fixtures/icecast_stats.xml @@ -0,0 +1,74 @@ + + + icemaster@radio.org + 3935 + 7 + 4201 + 14 + localhost + 117 + 5 + Moon + Icecast 2.4.4 + Tue, 15 Mar 2022 18:29:12 +0100 + 2022-03-15T18:29:12+0100 + 2 + 0 + 2 + 2 + 0 + 0 + + channels=2;samplerate=44100;bitrate=320 + 320 + 2 + various + 7 + 3 + http://localhost:8800/main.mp3 + unlimited + 1 + 44100 + Main (mp3 320kbps) + Radio + audio/mpeg + https://www.radio.org + 2 + 192.168.100.20 + Tue, 15 Mar 2022 18:29:19 +0100 + 2022-03-15T18:29:19+0100 + Robert Glasper Experiment/Lupe Fiasco/Bilal - Always Shine + 6110388200 + 20338244727 + Liquidsoap/1.4.4 (Unix; OCaml 4.10.0) + + + 256000 + 2 + channels=2;quality=0.8;samplerate=44100 + 44100 + 2 + various + 256 + 4 + 2 + http://localhost:8800/main.ogg + unlimited + 1 + 0.8 + 44100 + Main (ogg 256kbps) + Radio + application/ogg + https://www.radio.org + 2 + 192.168.100.20 + Tue, 15 Mar 2022 18:29:19 +0100 + 2022-03-15T18:29:19+0100 + Vorbis + Robert Glasper Experiment/Lupe Fiasco/Bilal - Always Shine + 4499297657 + 9051758982 + Liquidsoap/1.4.4 (Unix; OCaml 4.10.0) + + diff --git a/playout/tests/fixtures/shoutcast_admin.xml b/playout/tests/fixtures/shoutcast_admin.xml new file mode 100644 index 000000000..5220a4a1b --- /dev/null +++ b/playout/tests/fixtures/shoutcast_admin.xml @@ -0,0 +1,8 @@ + + + 1 + 0 + 32 + 0 + 0 + diff --git a/playout/tests/history/__init__.py b/playout/tests/history/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/playout/tests/history/stats_test.py b/playout/tests/history/stats_test.py new file mode 100644 index 000000000..ec418ae0d --- /dev/null +++ b/playout/tests/history/stats_test.py @@ -0,0 +1,140 @@ +from datetime import datetime +from unittest.mock import Mock, call + +import pytest + +from libretime_playout.history.stats import Server, Source, Stats, StatsCollector + +from ..fixtures import icecast_stats, shoutcast_admin + + +@pytest.fixture(name="server") +def _server_fixture(): + return Server( + host="example.com", + port=8000, + auth=("admin", "hackme"), + sources=[ + Source("s1", "main.ogg"), + ], + ) + + +def test_stats_collector_collect_server_stats(requests_mock, server): + requests_mock.get( + "http://example.com:8000/admin/stats.xml", + content=icecast_stats.read_bytes(), + ) + + legacy_client = Mock() + + collector = StatsCollector(legacy_client) + assert collector.collect_server_stats(server) == {"main.ogg": Stats(listeners=2)} + + legacy_client.assert_not_called() + + +def test_stats_collector_collect_server_stats_unauthorized(requests_mock, server): + requests_mock.get( + "http://example.com:8000/admin/stats.xml", + status_code=401, + ) + + legacy_client = Mock() + + collector = StatsCollector(legacy_client) + assert not collector.collect_server_stats(server) + + legacy_client.assert_has_calls( + [ + call.update_stream_setting_table( + { + "s1": "401 Client Error: None for url: http://example.com:8000/admin/stats.xml", + } + ) + ] + ) + + +def test_stats_collector_collect_server_stats_invalid_xml(requests_mock, server): + requests_mock.get( + "http://example.com:8000/admin/stats.xml", + content=b""" + + localhost + + """, + ) + + legacy_client = Mock() + + collector = StatsCollector(legacy_client) + assert not collector.collect_server_stats(server) + + legacy_client.assert_has_calls( + [ + call.update_stream_setting_table( + { + "s1": "Opening and ending tag mismatch: host line 3 and icestats, line 4, column 12 (, line 4)", + } + ) + ] + ) + + +def test_stats_collector_collect(requests_mock): + requests_mock.get( + "http://example.com:8000/admin/stats.xml", + content=icecast_stats.read_bytes(), + ) + requests_mock.get( + "http://shoutcast.com:8000/admin.cgi?sid=1&mode=viewxml", + content=shoutcast_admin.read_bytes(), + ) + + legacy_client = Mock() + default_stream = { + "enable": "true", + "output": "icecast", + "host": "example.com", + "port": 8000, + "mount": "main.ogg", + "admin_user": "admin", + "admin_pass": "hackme", + } + legacy_client.get_stream_parameters.return_value = { + "stream_params": { + "s1": {**default_stream}, + "s2": {**default_stream, "enable": "false", "mount": "main.mp3"}, + "s3": {**default_stream, "mount": "unknown.mp3"}, + "s4": { + **default_stream, + "output": "shoutcast", + "host": "shoutcast.com", + "mount": "shout.mp3", + }, + } + } + + collector = StatsCollector(legacy_client) + collector.collect(_timestamp=datetime(2022, 8, 9, 11, 19, 7)) + + legacy_client.assert_has_calls( + [ + call.get_stream_parameters(), + call.push_stream_stats( + [ + { + "timestamp": "2022-08-09 11:19:07", + "num_listeners": 2, + "mount_name": "main.ogg", + }, + { + "timestamp": "2022-08-09 11:19:07", + "num_listeners": 1, + "mount_name": "shoutcast", + }, + ] + ), + ] + )