diff --git a/playout/libretime_playout/history/stats.py b/playout/libretime_playout/history/stats.py
index 92e7cd054..2e56e95a4 100644
--- a/playout/libretime_playout/history/stats.py
+++ b/playout/libretime_playout/history/stats.py
@@ -2,9 +2,10 @@ from dataclasses import dataclass
from datetime import datetime
from threading import Thread
from time import sleep
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Union
from libretime_api_client.v1 import ApiClient as LegacyClient
+from libretime_shared.config import IcecastOutput, ShoutcastOutput
from loguru import logger
from lxml import etree
from requests import Session
@@ -14,20 +15,9 @@ from requests.exceptions import ( # pylint: disable=redefined-builtin
Timeout,
)
+from ..config import Config
-@dataclass
-class Source:
- stream_id: str
- mount: str
-
-
-@dataclass
-class Server:
- host: str
- port: int
- auth: Tuple[str, str]
- sources: List[Source]
- is_shoutcast: bool = False
+AnyOutput = Union[IcecastOutput, ShoutcastOutput]
@dataclass
@@ -48,73 +38,32 @@ class StatsCollector:
self._timeout = 30
self._legacy_client = legacy_client
- def get_streams_grouped_by_server(self) -> List[Server]:
- """
- Get streams grouped by server to prevent duplicate requests.
- """
- dirty_streams: Dict[str, Dict[str, Any]]
- dirty_streams = self._legacy_client.get_stream_parameters()["stream_params"]
+ def get_output_url(self, output: AnyOutput) -> str:
+ if output.kind == "icecast":
+ return f"http://{output.host}:{output.port}/admin/stats.xml"
+ return f"http://{output.host}:{output.port}/admin.cgi?sid=1&mode=viewxml"
- servers: Dict[str, Server] = {}
- for stream_id, dirty_stream in dirty_streams.items():
- if dirty_stream["enable"].lower() != "true":
- continue
+ def collect_output_stats(
+ self,
+ output: AnyOutput,
+ ) -> Dict[str, Stats]:
- source = Source(stream_id=stream_id, mount=dirty_stream["mount"])
-
- server_id = f"{dirty_stream['host']}:{dirty_stream['port']}"
- if server_id not in servers:
- servers[server_id] = Server(
- host=dirty_stream["host"],
- port=dirty_stream["port"],
- auth=(dirty_stream["admin_user"], dirty_stream["admin_pass"]),
- sources=[source],
- is_shoutcast=dirty_stream["output"] == "shoutcast",
- )
- else:
- servers[server_id].sources.append(source)
-
- return list(servers.values())
-
- def report_server_error(self, server: Server, error: Exception):
- self._legacy_client.update_stream_setting_table(
- {source.stream_id: str(error) for source in server.sources}
+ response = self._session.get(
+ url=self.get_output_url(output),
+ auth=(output.admin_user, output.admin_password),
+ timeout=self._timeout,
)
+ response.raise_for_status()
- def collect_server_stats(self, server: Server) -> Dict[str, Stats]:
- url = f"http://{server.host}:{server.port}/admin/stats.xml"
-
- # Shoutcast specific url
- if server.is_shoutcast:
- url = f"http://{server.host}:{server.port}/admin.cgi?sid=1&mode=viewxml"
-
- try:
- response = self._session.get(url, auth=server.auth, timeout=self._timeout)
- response.raise_for_status()
-
- except (
- ConnectionError,
- HTTPError,
- Timeout,
- ) as exception:
- logger.exception(exception)
- self.report_server_error(server, exception)
- return {}
-
- try:
- root = etree.fromstring( # nosec
- response.content,
- parser=etree.XMLParser(resolve_entities=False),
- )
- except etree.XMLSyntaxError as exception:
- logger.exception(exception)
- self.report_server_error(server, exception)
- return {}
+ root = etree.fromstring( # nosec
+ response.content,
+ parser=etree.XMLParser(resolve_entities=False),
+ )
stats = {}
# Shoutcast specific parsing
- if server.is_shoutcast:
+ if output.kind == "shoutcast":
listeners_el = root.find("CURRENTLISTENERS")
listeners = 0 if listeners_el is None else int(listeners_el.text)
@@ -123,46 +72,71 @@ class StatsCollector:
)
return stats
- mounts = [source.mount for source in server.sources]
+ # Icecast specific parsing
for source in root.iterchildren("source"):
mount = source.attrib.get("mount")
if mount is None:
continue
- mount = mount.lstrip("/")
- if mount not in mounts:
- continue
listeners_el = source.find("listeners")
listeners = 0 if listeners_el is None else int(listeners_el.text)
+ mount = mount.lstrip("/")
stats[mount] = Stats(
listeners=listeners,
)
return stats
- def collect(self, *, _timestamp: Optional[datetime] = None):
+ def collect(
+ self,
+ outputs: List[AnyOutput],
+ *,
+ _timestamp: Optional[datetime] = None,
+ ):
if _timestamp is None:
_timestamp = datetime.utcnow()
- servers = self.get_streams_grouped_by_server()
-
stats: List[Dict[str, Any]] = []
stats_timestamp = _timestamp.strftime("%Y-%m-%d %H:%M:%S")
+ cache: Dict[str, Dict[str, Stats]] = {}
- for server in servers:
- server_stats = self.collect_server_stats(server)
- if not server_stats:
+ for output_id, output in enumerate(outputs, start=1):
+ if (
+ output.kind not in ("icecast", "shoutcast")
+ or not output.enabled
+ or output.admin_password is None
+ ):
continue
- stats.extend(
- {
- "timestamp": stats_timestamp,
- "num_listeners": mount_stats.listeners,
- "mount_name": mount,
- }
- for mount, mount_stats in server_stats.items()
- )
+ output_url = self.get_output_url(output)
+ if output_url not in cache:
+ try:
+ cache[output_url] = self.collect_output_stats(output)
+ except (
+ etree.XMLSyntaxError,
+ ConnectionError,
+ HTTPError,
+ Timeout,
+ ) as exception:
+ logger.exception(exception)
+ self._legacy_client.update_stream_setting_table(
+ {output_id: str(exception)}
+ )
+ continue
+
+ output_stats = cache[output_url]
+
+ mount = "shoutcast" if output.kind == "shoutcast" else output.mount
+
+ if mount in output_stats:
+ stats.append(
+ {
+ "timestamp": stats_timestamp,
+ "num_listeners": output_stats[mount].listeners,
+ "mount_name": mount,
+ }
+ )
if stats:
self._legacy_client.push_stream_stats(stats)
@@ -172,15 +146,16 @@ class StatsCollectorThread(Thread):
name = "stats collector"
daemon = True
- def __init__(self, legacy_client: LegacyClient) -> None:
+ def __init__(self, config: Config, legacy_client: LegacyClient) -> None:
super().__init__()
+ self._config = config
self._collector = StatsCollector(legacy_client)
def run(self):
logger.info(f"starting {self.name}")
while True:
try:
- self._collector.collect()
+ self._collector.collect(self._config.stream.outputs.merged)
except Exception as exception:
logger.exception(exception)
sleep(120)
diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py
index d46ef9245..9fdc5feb5 100644
--- a/playout/libretime_playout/main.py
+++ b/playout/libretime_playout/main.py
@@ -135,7 +135,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
recorder_thread.daemon = True
recorder_thread.start()
- stats_collector_thread = StatsCollectorThread(legacy_client)
+ stats_collector_thread = StatsCollectorThread(config, legacy_client)
stats_collector_thread.start()
# Just sleep the main thread, instead of blocking on fetch_thread.join().
diff --git a/playout/tests/conftest.py b/playout/tests/conftest.py
index d9f24e4e2..cb30a3d62 100644
--- a/playout/tests/conftest.py
+++ b/playout/tests/conftest.py
@@ -3,7 +3,7 @@ import pytest
from libretime_playout.config import Config
-@pytest.fixture(scope="session")
+@pytest.fixture()
def config():
return Config(
**{
diff --git a/playout/tests/history/stats_test.py b/playout/tests/history/stats_test.py
index ec418ae0d..012db80af 100644
--- a/playout/tests/history/stats_test.py
+++ b/playout/tests/history/stats_test.py
@@ -1,90 +1,97 @@
from datetime import datetime
+from typing import List
from unittest.mock import Mock, call
import pytest
+from libretime_shared.config import IcecastOutput, ShoutcastOutput
+from lxml.etree import XMLSyntaxError
+from requests.exceptions import HTTPError
-from libretime_playout.history.stats import Server, Source, Stats, StatsCollector
+from libretime_playout.history.stats import AnyOutput, Stats, StatsCollector
from ..fixtures import icecast_stats, shoutcast_admin
-@pytest.fixture(name="server")
-def _server_fixture():
- return Server(
- host="example.com",
- port=8000,
- auth=("admin", "hackme"),
- sources=[
- Source("s1", "main.ogg"),
- ],
- )
+@pytest.fixture(name="outputs")
+def outputs_fixture():
+ default_output = {
+ "enabled": True,
+ "mount": "main.ogg",
+ "source_password": "hackme",
+ "admin_password": "hackme",
+ "audio": {"format": "ogg", "bitrate": 256},
+ }
+ return [
+ IcecastOutput(**default_output),
+ IcecastOutput(
+ **{
+ **default_output,
+ "mount": "main.mp3",
+ "audio": {"format": "mp3", "bitrate": 256},
+ }
+ ),
+ ]
-def test_stats_collector_collect_server_stats(requests_mock, server):
+def test_stats_collector_collect_server_stats(
+ requests_mock,
+ outputs: List[AnyOutput],
+):
requests_mock.get(
- "http://example.com:8000/admin/stats.xml",
+ "http://localhost:8000/admin/stats.xml",
content=icecast_stats.read_bytes(),
)
legacy_client = Mock()
collector = StatsCollector(legacy_client)
- assert collector.collect_server_stats(server) == {"main.ogg": Stats(listeners=2)}
+ result = collector.collect_output_stats(outputs[0])
+ assert result == {
+ "main.ogg": Stats(listeners=2),
+ "main.mp3": Stats(listeners=3),
+ }
legacy_client.assert_not_called()
-def test_stats_collector_collect_server_stats_unauthorized(requests_mock, server):
+def test_stats_collector_collect_server_stats_unauthorized(
+ requests_mock,
+ outputs: List[AnyOutput],
+):
requests_mock.get(
- "http://example.com:8000/admin/stats.xml",
+ "http://localhost:8000/admin/stats.xml",
status_code=401,
)
legacy_client = Mock()
collector = StatsCollector(legacy_client)
- assert not collector.collect_server_stats(server)
-
- legacy_client.assert_has_calls(
- [
- call.update_stream_setting_table(
- {
- "s1": "401 Client Error: None for url: http://example.com:8000/admin/stats.xml",
- }
- )
- ]
- )
+ with pytest.raises(HTTPError):
+ collector.collect_output_stats(outputs[0])
-def test_stats_collector_collect_server_stats_invalid_xml(requests_mock, server):
+def test_stats_collector_collect_server_stats_invalid_xml(
+ requests_mock,
+ outputs: List[AnyOutput],
+):
requests_mock.get(
- "http://example.com:8000/admin/stats.xml",
- content=b"""
-
- localhost
-
- """,
+ "http://localhost:8000/admin/stats.xml",
+ content=b"""localhost""",
)
legacy_client = Mock()
collector = StatsCollector(legacy_client)
- assert not collector.collect_server_stats(server)
-
- legacy_client.assert_has_calls(
- [
- call.update_stream_setting_table(
- {
- "s1": "Opening and ending tag mismatch: host line 3 and icestats, line 4, column 12 (, line 4)",
- }
- )
- ]
- )
+ with pytest.raises(XMLSyntaxError):
+ collector.collect_output_stats(outputs[0])
-def test_stats_collector_collect(requests_mock):
+def test_stats_collector_collect(
+ requests_mock,
+ outputs: List[AnyOutput],
+):
requests_mock.get(
- "http://example.com:8000/admin/stats.xml",
+ "http://localhost:8000/admin/stats.xml",
content=icecast_stats.read_bytes(),
)
requests_mock.get(
@@ -93,35 +100,38 @@ def test_stats_collector_collect(requests_mock):
)
legacy_client = Mock()
- default_stream = {
- "enable": "true",
- "output": "icecast",
- "host": "example.com",
- "port": 8000,
- "mount": "main.ogg",
- "admin_user": "admin",
- "admin_pass": "hackme",
- }
- legacy_client.get_stream_parameters.return_value = {
- "stream_params": {
- "s1": {**default_stream},
- "s2": {**default_stream, "enable": "false", "mount": "main.mp3"},
- "s3": {**default_stream, "mount": "unknown.mp3"},
- "s4": {
- **default_stream,
- "output": "shoutcast",
- "host": "shoutcast.com",
- "mount": "shout.mp3",
- },
- }
+ default_output = {
+ "source_password": "hackme",
+ "admin_password": "hackme",
}
+ outputs.extend(
+ [
+ IcecastOutput(
+ **{
+ **default_output,
+ "enabled": False,
+ "host": "example.com",
+ "mount": "disabled.ogg",
+ "audio": {"format": "ogg", "bitrate": 256},
+ }
+ ),
+ ShoutcastOutput(
+ **{
+ **default_output,
+ "enabled": True,
+ "kind": "shoutcast",
+ "host": "shoutcast.com",
+ "audio": {"format": "mp3", "bitrate": 256},
+ }
+ ),
+ ]
+ )
collector = StatsCollector(legacy_client)
- collector.collect(_timestamp=datetime(2022, 8, 9, 11, 19, 7))
+ collector.collect(outputs, _timestamp=datetime(2022, 8, 9, 11, 19, 7))
legacy_client.assert_has_calls(
[
- call.get_stream_parameters(),
call.push_stream_stats(
[
{
@@ -129,12 +139,17 @@ def test_stats_collector_collect(requests_mock):
"num_listeners": 2,
"mount_name": "main.ogg",
},
+ {
+ "timestamp": "2022-08-09 11:19:07",
+ "num_listeners": 3,
+ "mount_name": "main.mp3",
+ },
{
"timestamp": "2022-08-09 11:19:07",
"num_listeners": 1,
"mount_name": "shoutcast",
},
]
- ),
+ )
]
)