feat(playout): stats collector using stream config

This commit is contained in:
jo 2022-08-10 17:35:06 +02:00 committed by Kyle Robbertze
parent b9368d1b7b
commit 314c70a208
4 changed files with 156 additions and 166 deletions

View File

@ -2,9 +2,10 @@ from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Union
from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_shared.config import IcecastOutput, ShoutcastOutput
from loguru import logger from loguru import logger
from lxml import etree from lxml import etree
from requests import Session from requests import Session
@ -14,20 +15,9 @@ from requests.exceptions import ( # pylint: disable=redefined-builtin
Timeout, Timeout,
) )
from ..config import Config
@dataclass AnyOutput = Union[IcecastOutput, ShoutcastOutput]
class Source:
stream_id: str
mount: str
@dataclass
class Server:
host: str
port: int
auth: Tuple[str, str]
sources: List[Source]
is_shoutcast: bool = False
@dataclass @dataclass
@ -48,73 +38,32 @@ class StatsCollector:
self._timeout = 30 self._timeout = 30
self._legacy_client = legacy_client self._legacy_client = legacy_client
def get_streams_grouped_by_server(self) -> List[Server]: def get_output_url(self, output: AnyOutput) -> str:
""" if output.kind == "icecast":
Get streams grouped by server to prevent duplicate requests. return f"http://{output.host}:{output.port}/admin/stats.xml"
""" return f"http://{output.host}:{output.port}/admin.cgi?sid=1&mode=viewxml"
dirty_streams: Dict[str, Dict[str, Any]]
dirty_streams = self._legacy_client.get_stream_parameters()["stream_params"]
servers: Dict[str, Server] = {} def collect_output_stats(
for stream_id, dirty_stream in dirty_streams.items(): self,
if dirty_stream["enable"].lower() != "true": output: AnyOutput,
continue ) -> Dict[str, Stats]:
source = Source(stream_id=stream_id, mount=dirty_stream["mount"]) response = self._session.get(
url=self.get_output_url(output),
server_id = f"{dirty_stream['host']}:{dirty_stream['port']}" auth=(output.admin_user, output.admin_password),
if server_id not in servers: timeout=self._timeout,
servers[server_id] = Server(
host=dirty_stream["host"],
port=dirty_stream["port"],
auth=(dirty_stream["admin_user"], dirty_stream["admin_pass"]),
sources=[source],
is_shoutcast=dirty_stream["output"] == "shoutcast",
)
else:
servers[server_id].sources.append(source)
return list(servers.values())
def report_server_error(self, server: Server, error: Exception):
self._legacy_client.update_stream_setting_table(
{source.stream_id: str(error) for source in server.sources}
) )
response.raise_for_status()
def collect_server_stats(self, server: Server) -> Dict[str, Stats]: root = etree.fromstring( # nosec
url = f"http://{server.host}:{server.port}/admin/stats.xml" response.content,
parser=etree.XMLParser(resolve_entities=False),
# Shoutcast specific url )
if server.is_shoutcast:
url = f"http://{server.host}:{server.port}/admin.cgi?sid=1&mode=viewxml"
try:
response = self._session.get(url, auth=server.auth, timeout=self._timeout)
response.raise_for_status()
except (
ConnectionError,
HTTPError,
Timeout,
) as exception:
logger.exception(exception)
self.report_server_error(server, exception)
return {}
try:
root = etree.fromstring( # nosec
response.content,
parser=etree.XMLParser(resolve_entities=False),
)
except etree.XMLSyntaxError as exception:
logger.exception(exception)
self.report_server_error(server, exception)
return {}
stats = {} stats = {}
# Shoutcast specific parsing # Shoutcast specific parsing
if server.is_shoutcast: if output.kind == "shoutcast":
listeners_el = root.find("CURRENTLISTENERS") listeners_el = root.find("CURRENTLISTENERS")
listeners = 0 if listeners_el is None else int(listeners_el.text) listeners = 0 if listeners_el is None else int(listeners_el.text)
@ -123,46 +72,71 @@ class StatsCollector:
) )
return stats return stats
mounts = [source.mount for source in server.sources] # Icecast specific parsing
for source in root.iterchildren("source"): for source in root.iterchildren("source"):
mount = source.attrib.get("mount") mount = source.attrib.get("mount")
if mount is None: if mount is None:
continue continue
mount = mount.lstrip("/")
if mount not in mounts:
continue
listeners_el = source.find("listeners") listeners_el = source.find("listeners")
listeners = 0 if listeners_el is None else int(listeners_el.text) listeners = 0 if listeners_el is None else int(listeners_el.text)
mount = mount.lstrip("/")
stats[mount] = Stats( stats[mount] = Stats(
listeners=listeners, listeners=listeners,
) )
return stats return stats
def collect(self, *, _timestamp: Optional[datetime] = None): def collect(
self,
outputs: List[AnyOutput],
*,
_timestamp: Optional[datetime] = None,
):
if _timestamp is None: if _timestamp is None:
_timestamp = datetime.utcnow() _timestamp = datetime.utcnow()
servers = self.get_streams_grouped_by_server()
stats: List[Dict[str, Any]] = [] stats: List[Dict[str, Any]] = []
stats_timestamp = _timestamp.strftime("%Y-%m-%d %H:%M:%S") stats_timestamp = _timestamp.strftime("%Y-%m-%d %H:%M:%S")
cache: Dict[str, Dict[str, Stats]] = {}
for server in servers: for output_id, output in enumerate(outputs, start=1):
server_stats = self.collect_server_stats(server) if (
if not server_stats: output.kind not in ("icecast", "shoutcast")
or not output.enabled
or output.admin_password is None
):
continue continue
stats.extend( output_url = self.get_output_url(output)
{ if output_url not in cache:
"timestamp": stats_timestamp, try:
"num_listeners": mount_stats.listeners, cache[output_url] = self.collect_output_stats(output)
"mount_name": mount, except (
} etree.XMLSyntaxError,
for mount, mount_stats in server_stats.items() ConnectionError,
) HTTPError,
Timeout,
) as exception:
logger.exception(exception)
self._legacy_client.update_stream_setting_table(
{output_id: str(exception)}
)
continue
output_stats = cache[output_url]
mount = "shoutcast" if output.kind == "shoutcast" else output.mount
if mount in output_stats:
stats.append(
{
"timestamp": stats_timestamp,
"num_listeners": output_stats[mount].listeners,
"mount_name": mount,
}
)
if stats: if stats:
self._legacy_client.push_stream_stats(stats) self._legacy_client.push_stream_stats(stats)
@ -172,15 +146,16 @@ class StatsCollectorThread(Thread):
name = "stats collector" name = "stats collector"
daemon = True daemon = True
def __init__(self, legacy_client: LegacyClient) -> None: def __init__(self, config: Config, legacy_client: LegacyClient) -> None:
super().__init__() super().__init__()
self._config = config
self._collector = StatsCollector(legacy_client) self._collector = StatsCollector(legacy_client)
def run(self): def run(self):
logger.info(f"starting {self.name}") logger.info(f"starting {self.name}")
while True: while True:
try: try:
self._collector.collect() self._collector.collect(self._config.stream.outputs.merged)
except Exception as exception: except Exception as exception:
logger.exception(exception) logger.exception(exception)
sleep(120) sleep(120)

View File

@ -135,7 +135,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
recorder_thread.daemon = True recorder_thread.daemon = True
recorder_thread.start() recorder_thread.start()
stats_collector_thread = StatsCollectorThread(legacy_client) stats_collector_thread = StatsCollectorThread(config, legacy_client)
stats_collector_thread.start() stats_collector_thread.start()
# Just sleep the main thread, instead of blocking on fetch_thread.join(). # Just sleep the main thread, instead of blocking on fetch_thread.join().

View File

@ -3,7 +3,7 @@ import pytest
from libretime_playout.config import Config from libretime_playout.config import Config
@pytest.fixture(scope="session") @pytest.fixture()
def config(): def config():
return Config( return Config(
**{ **{

View File

@ -1,90 +1,97 @@
from datetime import datetime from datetime import datetime
from typing import List
from unittest.mock import Mock, call from unittest.mock import Mock, call
import pytest import pytest
from libretime_shared.config import IcecastOutput, ShoutcastOutput
from lxml.etree import XMLSyntaxError
from requests.exceptions import HTTPError
from libretime_playout.history.stats import Server, Source, Stats, StatsCollector from libretime_playout.history.stats import AnyOutput, Stats, StatsCollector
from ..fixtures import icecast_stats, shoutcast_admin from ..fixtures import icecast_stats, shoutcast_admin
@pytest.fixture(name="server") @pytest.fixture(name="outputs")
def _server_fixture(): def outputs_fixture():
return Server( default_output = {
host="example.com", "enabled": True,
port=8000, "mount": "main.ogg",
auth=("admin", "hackme"), "source_password": "hackme",
sources=[ "admin_password": "hackme",
Source("s1", "main.ogg"), "audio": {"format": "ogg", "bitrate": 256},
], }
) return [
IcecastOutput(**default_output),
IcecastOutput(
**{
**default_output,
"mount": "main.mp3",
"audio": {"format": "mp3", "bitrate": 256},
}
),
]
def test_stats_collector_collect_server_stats(requests_mock, server): def test_stats_collector_collect_server_stats(
requests_mock,
outputs: List[AnyOutput],
):
requests_mock.get( requests_mock.get(
"http://example.com:8000/admin/stats.xml", "http://localhost:8000/admin/stats.xml",
content=icecast_stats.read_bytes(), content=icecast_stats.read_bytes(),
) )
legacy_client = Mock() legacy_client = Mock()
collector = StatsCollector(legacy_client) collector = StatsCollector(legacy_client)
assert collector.collect_server_stats(server) == {"main.ogg": Stats(listeners=2)} result = collector.collect_output_stats(outputs[0])
assert result == {
"main.ogg": Stats(listeners=2),
"main.mp3": Stats(listeners=3),
}
legacy_client.assert_not_called() legacy_client.assert_not_called()
def test_stats_collector_collect_server_stats_unauthorized(requests_mock, server): def test_stats_collector_collect_server_stats_unauthorized(
requests_mock,
outputs: List[AnyOutput],
):
requests_mock.get( requests_mock.get(
"http://example.com:8000/admin/stats.xml", "http://localhost:8000/admin/stats.xml",
status_code=401, status_code=401,
) )
legacy_client = Mock() legacy_client = Mock()
collector = StatsCollector(legacy_client) collector = StatsCollector(legacy_client)
assert not collector.collect_server_stats(server) with pytest.raises(HTTPError):
collector.collect_output_stats(outputs[0])
legacy_client.assert_has_calls(
[
call.update_stream_setting_table(
{
"s1": "401 Client Error: None for url: http://example.com:8000/admin/stats.xml",
}
)
]
)
def test_stats_collector_collect_server_stats_invalid_xml(requests_mock, server): def test_stats_collector_collect_server_stats_invalid_xml(
requests_mock,
outputs: List[AnyOutput],
):
requests_mock.get( requests_mock.get(
"http://example.com:8000/admin/stats.xml", "http://localhost:8000/admin/stats.xml",
content=b"""<?xml version="1.0"?> content=b"""<?xml version="1.0"?><icestats><host>localhost</icestats>""",
<icestats>
<host>localhost
</icestats>
""",
) )
legacy_client = Mock() legacy_client = Mock()
collector = StatsCollector(legacy_client) collector = StatsCollector(legacy_client)
assert not collector.collect_server_stats(server) with pytest.raises(XMLSyntaxError):
collector.collect_output_stats(outputs[0])
legacy_client.assert_has_calls(
[
call.update_stream_setting_table(
{
"s1": "Opening and ending tag mismatch: host line 3 and icestats, line 4, column 12 (<string>, line 4)",
}
)
]
)
def test_stats_collector_collect(requests_mock): def test_stats_collector_collect(
requests_mock,
outputs: List[AnyOutput],
):
requests_mock.get( requests_mock.get(
"http://example.com:8000/admin/stats.xml", "http://localhost:8000/admin/stats.xml",
content=icecast_stats.read_bytes(), content=icecast_stats.read_bytes(),
) )
requests_mock.get( requests_mock.get(
@ -93,35 +100,38 @@ def test_stats_collector_collect(requests_mock):
) )
legacy_client = Mock() legacy_client = Mock()
default_stream = { default_output = {
"enable": "true", "source_password": "hackme",
"output": "icecast", "admin_password": "hackme",
"host": "example.com",
"port": 8000,
"mount": "main.ogg",
"admin_user": "admin",
"admin_pass": "hackme",
}
legacy_client.get_stream_parameters.return_value = {
"stream_params": {
"s1": {**default_stream},
"s2": {**default_stream, "enable": "false", "mount": "main.mp3"},
"s3": {**default_stream, "mount": "unknown.mp3"},
"s4": {
**default_stream,
"output": "shoutcast",
"host": "shoutcast.com",
"mount": "shout.mp3",
},
}
} }
outputs.extend(
[
IcecastOutput(
**{
**default_output,
"enabled": False,
"host": "example.com",
"mount": "disabled.ogg",
"audio": {"format": "ogg", "bitrate": 256},
}
),
ShoutcastOutput(
**{
**default_output,
"enabled": True,
"kind": "shoutcast",
"host": "shoutcast.com",
"audio": {"format": "mp3", "bitrate": 256},
}
),
]
)
collector = StatsCollector(legacy_client) collector = StatsCollector(legacy_client)
collector.collect(_timestamp=datetime(2022, 8, 9, 11, 19, 7)) collector.collect(outputs, _timestamp=datetime(2022, 8, 9, 11, 19, 7))
legacy_client.assert_has_calls( legacy_client.assert_has_calls(
[ [
call.get_stream_parameters(),
call.push_stream_stats( call.push_stream_stats(
[ [
{ {
@ -129,12 +139,17 @@ def test_stats_collector_collect(requests_mock):
"num_listeners": 2, "num_listeners": 2,
"mount_name": "main.ogg", "mount_name": "main.ogg",
}, },
{
"timestamp": "2022-08-09 11:19:07",
"num_listeners": 3,
"mount_name": "main.mp3",
},
{ {
"timestamp": "2022-08-09 11:19:07", "timestamp": "2022-08-09 11:19:07",
"num_listeners": 1, "num_listeners": 1,
"mount_name": "shoutcast", "mount_name": "shoutcast",
}, },
] ]
), )
] ]
) )