feat(playout): rewrite stats collector (#2028)

- Replace defusedxml with lxml
This commit is contained in:
Jonas L 2022-08-09 21:14:19 +02:00 committed by GitHub
parent 02c16de2ab
commit 4019367abc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 426 additions and 163 deletions

View File

@ -0,0 +1,186 @@
from dataclasses import dataclass
from datetime import datetime
from threading import Thread
from time import sleep
from typing import Any, Dict, List, Optional, Tuple
from libretime_api_client.v1 import ApiClient as LegacyClient
from loguru import logger
from lxml import etree
from requests import Session
from requests.exceptions import ( # pylint: disable=redefined-builtin
ConnectionError,
HTTPError,
Timeout,
)
@dataclass
class Source:
    """A single enabled stream output that is published on a streaming server."""

    # Key of the stream in the "stream_params" mapping of the legacy API.
    stream_id: str
    # Mount point of the stream, compared against the server stats without
    # a leading "/".
    mount: str
@dataclass
class Server:
    """A streaming server together with every source configured on it."""

    host: str
    port: int
    # (admin user, admin password) pair passed as HTTP auth when querying
    # the server stats.
    auth: Tuple[str, str]
    # Sources sharing this host/port, so the server is only queried once.
    sources: List[Source]
    # Selects the Shoutcast stats url and parsing instead of the Icecast ones.
    is_shoutcast: bool = False
@dataclass
class Stats:
    """Listener statistics collected for a single mount."""

    # Number of listeners reported by the streaming server.
    listeners: int
# pylint: disable=too-few-public-methods
class StatsCollector:
    """
    Collect stats from Icecast and Shoutcast.

    The enabled streams are fetched from the legacy API and grouped by
    server, each server is queried once over HTTP, and the resulting
    listener counts are pushed back through the legacy API.
    """

    _session: Session

    def __init__(self, legacy_client: LegacyClient):
        self._session = Session()
        # Timeout passed to every stats request.
        self._timeout = 30
        self._legacy_client = legacy_client

    def get_streams_grouped_by_server(self) -> List[Server]:
        """
        Get streams grouped by server to prevent duplicate requests.

        Streams whose "enable" setting is not "true" (case insensitive) are
        ignored. Streams sharing the same "host:port" end up as sources of
        the same Server; the server settings (auth, output type) are taken
        from the first stream seen for that server.
        """
        dirty_streams: Dict[str, Dict[str, Any]]
        dirty_streams = self._legacy_client.get_stream_parameters()["stream_params"]

        servers: Dict[str, Server] = {}
        for stream_id, dirty_stream in dirty_streams.items():
            # str() keeps the check working if the flag is a real boolean
            # instead of the "true"/"false" strings.
            if str(dirty_stream["enable"]).lower() != "true":
                continue

            source = Source(stream_id=stream_id, mount=dirty_stream["mount"])

            server_id = f"{dirty_stream['host']}:{dirty_stream['port']}"
            if server_id not in servers:
                servers[server_id] = Server(
                    host=dirty_stream["host"],
                    port=dirty_stream["port"],
                    auth=(dirty_stream["admin_user"], dirty_stream["admin_pass"]),
                    sources=[source],
                    is_shoutcast=dirty_stream["output"] == "shoutcast",
                )
            else:
                servers[server_id].sources.append(source)

        return list(servers.values())

    def report_server_error(self, server: Server, error: Exception):
        """
        Report the error message for every stream configured on the server.
        """
        self._legacy_client.update_stream_setting_table(
            {source.stream_id: str(error) for source in server.sources}
        )

    @staticmethod
    def _parse_listeners(element: Optional[Any]) -> int:
        """
        Return the listeners count held by an XML element.

        A missing element counts as 0 listeners. An empty or non numeric
        element is logged and also counts as 0, so a single malformed value
        cannot abort the whole collection.
        """
        if element is None:
            return 0
        try:
            return int(element.text)
        except (TypeError, ValueError):
            logger.warning(f"invalid listeners count: {element.text!r}")
            return 0

    def collect_server_stats(self, server: Server) -> Dict[str, Stats]:
        """
        Fetch and parse the stats of a single server.

        Returns a mapping of mount name to Stats. For Shoutcast the mapping
        has a single "shoutcast" entry. When the request or the XML parsing
        fails, the error is logged, reported for every stream of the server,
        and an empty mapping is returned.
        """
        url = f"http://{server.host}:{server.port}/admin/stats.xml"

        # Shoutcast specific url
        if server.is_shoutcast:
            url = f"http://{server.host}:{server.port}/admin.cgi?sid=1&mode=viewxml"

        try:
            response = self._session.get(url, auth=server.auth, timeout=self._timeout)
            response.raise_for_status()

        except (
            ConnectionError,
            HTTPError,
            Timeout,
        ) as exception:
            logger.exception(exception)
            self.report_server_error(server, exception)
            return {}

        try:
            # Entities are not resolved since the document comes from a
            # remote server.
            root = etree.fromstring(  # nosec
                response.content,
                parser=etree.XMLParser(resolve_entities=False),
            )
        except etree.XMLSyntaxError as exception:
            logger.exception(exception)
            self.report_server_error(server, exception)
            return {}

        stats: Dict[str, Stats] = {}

        # Shoutcast specific parsing
        if server.is_shoutcast:
            stats["shoutcast"] = Stats(
                listeners=self._parse_listeners(root.find("CURRENTLISTENERS")),
            )
            return stats

        # Only the mounts configured on this server are of interest.
        mounts = {source.mount for source in server.sources}
        for source_el in root.iterchildren("source"):
            mount = source_el.attrib.get("mount")
            if mount is None:
                continue
            # The server reports mounts with a leading "/".
            mount = mount.lstrip("/")
            if mount not in mounts:
                continue

            stats[mount] = Stats(
                listeners=self._parse_listeners(source_el.find("listeners")),
            )

        return stats

    def collect(self, *, _timestamp: Optional[datetime] = None):
        """
        Collect the stats of every server and push them in a single call.

        Every entry of a collection shares the same timestamp, formatted as
        "%Y-%m-%d %H:%M:%S". `_timestamp` overrides the default
        `datetime.utcnow()` value. Nothing is pushed when no stats were
        collected.
        """
        if _timestamp is None:
            _timestamp = datetime.utcnow()

        servers = self.get_streams_grouped_by_server()

        stats: List[Dict[str, Any]] = []
        stats_timestamp = _timestamp.strftime("%Y-%m-%d %H:%M:%S")

        for server in servers:
            server_stats = self.collect_server_stats(server)
            if not server_stats:
                continue

            stats.extend(
                {
                    "timestamp": stats_timestamp,
                    "num_listeners": mount_stats.listeners,
                    "mount_name": mount,
                }
                for mount, mount_stats in server_stats.items()
            )

        if stats:
            self._legacy_client.push_stream_stats(stats)
class StatsCollectorThread(Thread):
    """Daemon thread running a StatsCollector collection in an endless loop."""

    name = "stats collector"
    daemon = True

    def __init__(self, legacy_client: LegacyClient) -> None:
        super().__init__()
        self._collector = StatsCollector(legacy_client)

    def run(self):
        """Collect the stats, then sleep 120 seconds, forever."""
        logger.info(f"starting {self.name}")
        collect = self._collector.collect
        while True:
            try:
                collect()
            except Exception as error:
                # A failed collection is only logged, the loop keeps going.
                logger.exception(error)
            sleep(120)

View File

@ -21,6 +21,7 @@ from libretime_shared.logging import level_from_name, setup_logger
from loguru import logger from loguru import logger
from .config import CACHE_DIR, RECORD_DIR, Config from .config import CACHE_DIR, RECORD_DIR, Config
from .history.stats import StatsCollectorThread
from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION, parse_liquidsoap_version from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION, parse_liquidsoap_version
from .message_handler import PypoMessageHandler from .message_handler import PypoMessageHandler
from .player.fetch import PypoFetch from .player.fetch import PypoFetch
@ -28,7 +29,6 @@ from .player.file import PypoFile
from .player.liquidsoap import PypoLiquidsoap from .player.liquidsoap import PypoLiquidsoap
from .player.push import PypoPush from .player.push import PypoPush
from .recorder import Recorder from .recorder import Recorder
from .stats import ListenerStat
from .timeout import ls_timeout from .timeout import ls_timeout
@ -196,9 +196,8 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
recorder.daemon = True recorder.daemon = True
recorder.start() recorder.start()
stat = ListenerStat(config, legacy_client) stats_collector = StatsCollectorThread(legacy_client)
stat.daemon = True stats_collector.start()
stat.start()
# Just sleep the main thread, instead of blocking on pf.join(). # Just sleep the main thread, instead of blocking on pf.join().
# This allows CTRL-C to work! # This allows CTRL-C to work!

View File

@ -1,156 +0,0 @@
import base64
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from threading import Thread
import defusedxml.minidom
from libretime_api_client.v1 import ApiClient as LegacyClient
from loguru import logger
from .config import Config
class ListenerStat(Thread):
    """
    Thread gathering listener statistics from the Icecast and Shoutcast
    servers of the enabled streams, and pushing them to the legacy API.
    """

    HTTP_REQUEST_TIMEOUT = 30  # 30 second HTTP request timeout

    def __init__(self, config: Config, legacy_client: LegacyClient):
        Thread.__init__(self)
        self.config = config
        self.legacy_client = legacy_client

    def get_node_text(self, nodelist):
        """Return the concatenated data of the text nodes found in nodelist."""
        return "".join(
            node.data for node in nodelist if node.nodeType == node.TEXT_NODE
        )

    def get_stream_parameters(self):
        """Return the stream parameters provided by the legacy API."""
        # [{"user":"", "password":"", "url":"", "port":""},{},{}]
        return self.legacy_client.get_stream_parameters()

    def get_stream_server_xml(self, ip, url, is_shoutcast=False):
        """
        Fetch the url using HTTP basic auth and return the raw response body.

        The credentials are read from the "admin_user" and "admin_pass" keys
        of `ip`. Errors raised by urllib are not handled here.
        """
        auth_string = "%(admin_user)s:%(admin_pass)s" % ip
        encoded = base64.b64encode(auth_string.encode("utf-8"))

        header = {"Authorization": "Basic %s" % encoded.decode("ascii")}

        if is_shoutcast:
            # user agent is required for shoutcast auth, otherwise it returns 404.
            user_agent = "Mozilla/5.0 (Linux; rv:22.0) Gecko/20130405 Firefox/22.0"
            header["User-Agent"] = user_agent

        req = urllib.request.Request(
            # assuming that the icecast stats path is /admin/stats.xml
            # need to fix this
            url=url,
            headers=header,
        )

        # The context manager closes the response once the body is read,
        # so the connection is not left open.
        with urllib.request.urlopen(
            req, timeout=ListenerStat.HTTP_REQUEST_TIMEOUT
        ) as resp:
            return resp.read()

    def get_icecast_stats(self, ip):
        """
        Return the stats dict of the mount configured in `ip`, or None when
        the server does not report that mount.
        """
        # Hosts containing "airtime.pro" are queried on a different path.
        if "airtime.pro" in ip["host"].lower():
            url = "http://%(host)s:%(port)s/stats.xsl" % ip
        else:
            url = "http://%(host)s:%(port)s/admin/stats.xml" % ip
        document = self.get_stream_server_xml(ip, url)

        dom = defusedxml.minidom.parseString(document)
        sources = dom.getElementsByTagName("source")

        mount_stats = None
        for source in sources:
            # drop the leading '/' character
            mount_name = source.getAttribute("mount")[1:]
            if mount_name == ip["mount"]:
                timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
                listeners = source.getElementsByTagName("listeners")
                num_listeners = 0
                if len(listeners):
                    num_listeners = self.get_node_text(listeners[0].childNodes)

                mount_stats = {
                    "timestamp": timestamp,
                    "num_listeners": num_listeners,
                    "mount_name": mount_name,
                }

        return mount_stats

    def get_shoutcast_stats(self, ip):
        """Return the stats dict of a Shoutcast server, named "shoutcast"."""
        url = "http://%(host)s:%(port)s/admin.cgi?sid=1&mode=viewxml" % ip
        document = self.get_stream_server_xml(ip, url, is_shoutcast=True)

        dom = defusedxml.minidom.parseString(document)
        current_listeners = dom.getElementsByTagName("CURRENTLISTENERS")

        timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        num_listeners = 0
        if current_listeners:
            num_listeners = self.get_node_text(current_listeners[0].childNodes)

        mount_stats = {
            "timestamp": timestamp,
            "num_listeners": num_listeners,
            "mount_name": "shoutcast",
        }

        return mount_stats

    def get_stream_stats(self, stream_parameters):
        """
        Return the list of stats dicts of every enabled stream.

        The status of each enabled stream is reported to the legacy API:
        "OK" on success, the exception message otherwise.
        """
        stats = []

        # iterate over stream_parameters which is a list of dicts. Each dict
        # represents one Airtime stream (currently this limit is 3).
        # Note that there can be optimizations done, since if all three
        # streams are the same server, we will still initiate 3 separate
        # connections
        for k, v in stream_parameters.items():
            if v["enable"] == "true":
                try:
                    if v["output"] == "icecast":
                        mount_stats = self.get_icecast_stats(v)
                        if mount_stats:
                            stats.append(mount_stats)
                    else:
                        stats.append(self.get_shoutcast_stats(v))
                    self.update_listener_stat_error(k, "OK")
                except Exception as exception:
                    # Reporting the failure is best effort: a failing
                    # report is only logged.
                    try:
                        self.update_listener_stat_error(k, str(exception))
                    except Exception as exception2:
                        logger.exception(exception2)

        return stats

    def push_stream_stats(self, stats):
        """Push the collected stats to the legacy API."""
        self.legacy_client.push_stream_stats(stats)

    def update_listener_stat_error(self, stream_id, error):
        """Report the status message of a single stream to the legacy API."""
        data = {stream_id: error}
        self.legacy_client.update_stream_setting_table(data)

    def run(self):
        # Wake up every 120 seconds and gather icecast statistics. Note that we
        # are currently querying the server every 2 minutes for list of
        # mountpoints as well. We could remove this query if we hooked into
        # rabbitmq events, and listened for these changes instead.
        while True:
            try:
                stream_parameters = self.get_stream_parameters()
                stats = self.get_stream_stats(stream_parameters["stream_params"])

                if stats:
                    self.push_stream_stats(stats)
            except Exception as exception:
                logger.exception(exception)

            time.sleep(120)

View File

@ -2,6 +2,7 @@
[python] [python]
python3 = buster, bullseye, bionic, focal, jammy python3 = buster, bullseye, bionic, focal, jammy
python3-pip = buster, bullseye, bionic, focal, jammy python3-pip = buster, bullseye, bionic, focal, jammy
python3-lxml = bullseye, focal, jammy
[liquidsoap] [liquidsoap]
# https://github.com/savonet/liquidsoap/blob/main/CHANGES.md # https://github.com/savonet/liquidsoap/blob/main/CHANGES.md

View File

@ -1,5 +1,8 @@
[tool.pylint.messages_control] [tool.pylint.messages_control]
extension-pkg-whitelist = "pydantic" extension-pkg-whitelist = [
"lxml.etree",
"pydantic",
]
disable = [ disable = [
"missing-class-docstring", "missing-class-docstring",
"missing-function-docstring", "missing-function-docstring",

View File

@ -1,8 +1,9 @@
# Please do not edit this file, edit the setup.py file! # Please do not edit this file, edit the setup.py file!
# This file is auto-generated by tools/extract_requirements.py. # This file is auto-generated by tools/extract_requirements.py.
backports.zoneinfo>=0.2.1,<0.3;python_version<'3.9' backports.zoneinfo>=0.2.1,<0.3;python_version<'3.9'
defusedxml>=0.6.0,<0.8 dataclasses>=0.8,<0.9;python_version<'3.7'
kombu==4.6.11 kombu==4.6.11
lxml>=4.5.0,<4.10.0
mutagen>=1.45.1,<1.46 mutagen>=1.45.1,<1.46
python-dateutil>=2.8.1,<2.9 python-dateutil>=2.8.1,<2.9
requests>=2.25.1,<2.29 requests>=2.25.1,<2.29

View File

@ -24,8 +24,9 @@ setup(
python_requires=">=3.6", python_requires=">=3.6",
install_requires=[ install_requires=[
"backports.zoneinfo>=0.2.1,<0.3;python_version<'3.9'", "backports.zoneinfo>=0.2.1,<0.3;python_version<'3.9'",
"defusedxml>=0.6.0,<0.8", "dataclasses>=0.8,<0.9;python_version<'3.7'",
"kombu==4.6.11", "kombu==4.6.11",
"lxml>=4.5.0,<4.10.0",
"mutagen>=1.45.1,<1.46", "mutagen>=1.45.1,<1.46",
"python-dateutil>=2.8.1,<2.9", "python-dateutil>=2.8.1,<2.9",
"requests>=2.25.1,<2.29", "requests>=2.25.1,<2.29",

View File

6
playout/tests/fixtures/__init__.py vendored Normal file
View File

@ -0,0 +1,6 @@
from pathlib import Path
# Directory holding this module and the fixture files next to it.
fixture_path = Path(__file__).parent

# Sample stats documents, one per supported server type.
icecast_stats = fixture_path.joinpath("icecast_stats.xml")
shoutcast_admin = fixture_path.joinpath("shoutcast_admin.xml")

View File

@ -0,0 +1,74 @@
<?xml version="1.0"?>
<icestats>
<admin>icemaster@radio.org</admin>
<client_connections>3935</client_connections>
<clients>7</clients>
<connections>4201</connections>
<file_connections>14</file_connections>
<host>localhost</host>
<listener_connections>117</listener_connections>
<listeners>5</listeners>
<location>Moon</location>
<server_id>Icecast 2.4.4</server_id>
<server_start>Tue, 15 Mar 2022 18:29:12 +0100</server_start>
<server_start_iso8601>2022-03-15T18:29:12+0100</server_start_iso8601>
<source_client_connections>2</source_client_connections>
<source_relay_connections>0</source_relay_connections>
<source_total_connections>2</source_total_connections>
<sources>2</sources>
<stats>0</stats>
<stats_connections>0</stats_connections>
<source mount="/main.mp3">
<audio_info>channels=2;samplerate=44100;bitrate=320</audio_info>
<bitrate>320</bitrate>
<channels>2</channels>
<genre>various</genre>
<listener_peak>7</listener_peak>
<listeners>3</listeners>
<listenurl>http://localhost:8800/main.mp3</listenurl>
<max_listeners>unlimited</max_listeners>
<public>1</public>
<samplerate>44100</samplerate>
<server_description>Main (mp3 320kbps)</server_description>
<server_name>Radio</server_name>
<server_type>audio/mpeg</server_type>
<server_url>https://www.radio.org</server_url>
<slow_listeners>2</slow_listeners>
<source_ip>192.168.100.20</source_ip>
<stream_start>Tue, 15 Mar 2022 18:29:19 +0100</stream_start>
<stream_start_iso8601>2022-03-15T18:29:19+0100</stream_start_iso8601>
<title>Robert Glasper Experiment/Lupe Fiasco/Bilal - Always Shine</title>
<total_bytes_read>6110388200</total_bytes_read>
<total_bytes_sent>20338244727</total_bytes_sent>
<user_agent>Liquidsoap/1.4.4 (Unix; OCaml 4.10.0)</user_agent>
</source>
<source mount="/main.ogg">
<audio_bitrate>256000</audio_bitrate>
<audio_channels>2</audio_channels>
<audio_info>channels=2;quality=0.8;samplerate=44100</audio_info>
<audio_samplerate>44100</audio_samplerate>
<channels>2</channels>
<genre>various</genre>
<ice-bitrate>256</ice-bitrate>
<listener_peak>4</listener_peak>
<listeners>2</listeners>
<listenurl>http://localhost:8800/main.ogg</listenurl>
<max_listeners>unlimited</max_listeners>
<public>1</public>
<quality>0.8</quality>
<samplerate>44100</samplerate>
<server_description>Main (ogg 256kbps)</server_description>
<server_name>Radio</server_name>
<server_type>application/ogg</server_type>
<server_url>https://www.radio.org</server_url>
<slow_listeners>2</slow_listeners>
<source_ip>192.168.100.20</source_ip>
<stream_start>Tue, 15 Mar 2022 18:29:19 +0100</stream_start>
<stream_start_iso8601>2022-03-15T18:29:19+0100</stream_start_iso8601>
<subtype>Vorbis</subtype>
<title>Robert Glasper Experiment/Lupe Fiasco/Bilal - Always Shine</title>
<total_bytes_read>4499297657</total_bytes_read>
<total_bytes_sent>9051758982</total_bytes_sent>
<user_agent>Liquidsoap/1.4.4 (Unix; OCaml 4.10.0)</user_agent>
</source>
</icestats>

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<SHOUTCASTSERVER>
<CURRENTLISTENERS>1</CURRENTLISTENERS>
<PEAKLISTENERS>0</PEAKLISTENERS>
<MAXLISTENERS>32</MAXLISTENERS>
<UNIQUELISTENERS>0</UNIQUELISTENERS>
<AVERAGETIME>0</AVERAGETIME>
</SHOUTCASTSERVER>

View File

View File

@ -0,0 +1,140 @@
from datetime import datetime
from unittest.mock import Mock, call
import pytest
from libretime_playout.history.stats import Server, Source, Stats, StatsCollector
from ..fixtures import icecast_stats, shoutcast_admin
@pytest.fixture(name="server")
def _server_fixture():
    """Provide a server on example.com:8000 with a single "main.ogg" source."""
    only_source = Source("s1", "main.ogg")
    return Server(
        host="example.com",
        port=8000,
        auth=("admin", "hackme"),
        sources=[only_source],
    )
def test_stats_collector_collect_server_stats(requests_mock, server):
    """Only the configured mount is returned, and no error is reported."""
    payload = icecast_stats.read_bytes()
    requests_mock.get("http://example.com:8000/admin/stats.xml", content=payload)

    legacy_client = Mock()
    result = StatsCollector(legacy_client).collect_server_stats(server)

    assert result == {"main.ogg": Stats(listeners=2)}
    legacy_client.assert_not_called()
def test_stats_collector_collect_server_stats_unauthorized(requests_mock, server):
    """A 401 response yields no stats and is reported for the stream."""
    requests_mock.get("http://example.com:8000/admin/stats.xml", status_code=401)

    legacy_client = Mock()
    result = StatsCollector(legacy_client).collect_server_stats(server)

    assert not result

    expected_report = {
        "s1": "401 Client Error: None for url: http://example.com:8000/admin/stats.xml",
    }
    legacy_client.assert_has_calls(
        [call.update_stream_setting_table(expected_report)]
    )
def test_stats_collector_collect_server_stats_invalid_xml(requests_mock, server):
    """A malformed document yields no stats and the parser error is reported."""
    # The <host> tag is never closed.
    broken_document = b"""<?xml version="1.0"?>
<icestats>
<host>localhost
</icestats>
"""
    requests_mock.get(
        "http://example.com:8000/admin/stats.xml",
        content=broken_document,
    )

    legacy_client = Mock()
    result = StatsCollector(legacy_client).collect_server_stats(server)

    assert not result

    expected_report = {
        "s1": "Opening and ending tag mismatch: host line 3 and icestats, line 4, column 12 (<string>, line 4)",
    }
    legacy_client.assert_has_calls(
        [call.update_stream_setting_table(expected_report)]
    )
def test_stats_collector_collect(requests_mock):
    """
    Stats of the enabled and known mounts of every server are pushed in a
    single call, all with the provided timestamp.
    """
    requests_mock.get(
        "http://example.com:8000/admin/stats.xml",
        content=icecast_stats.read_bytes(),
    )
    requests_mock.get(
        "http://shoutcast.com:8000/admin.cgi?sid=1&mode=viewxml",
        content=shoutcast_admin.read_bytes(),
    )

    default_stream = {
        "enable": "true",
        "output": "icecast",
        "host": "example.com",
        "port": 8000,
        "mount": "main.ogg",
        "admin_user": "admin",
        "admin_pass": "hackme",
    }
    stream_params = {
        # Enabled stream with a mount known by the server.
        "s1": dict(default_stream),
        # Disabled stream.
        "s2": {**default_stream, "enable": "false", "mount": "main.mp3"},
        # Enabled stream with a mount missing from the server stats.
        "s3": {**default_stream, "mount": "unknown.mp3"},
        # Stream on a shoutcast server.
        "s4": {
            **default_stream,
            "output": "shoutcast",
            "host": "shoutcast.com",
            "mount": "shout.mp3",
        },
    }

    legacy_client = Mock()
    legacy_client.get_stream_parameters.return_value = {
        "stream_params": stream_params
    }

    collector = StatsCollector(legacy_client)
    collector.collect(_timestamp=datetime(2022, 8, 9, 11, 19, 7))

    expected_stats = [
        {
            "timestamp": "2022-08-09 11:19:07",
            "num_listeners": 2,
            "mount_name": "main.ogg",
        },
        {
            "timestamp": "2022-08-09 11:19:07",
            "num_listeners": 1,
            "mount_name": "shoutcast",
        },
    ]
    legacy_client.assert_has_calls(
        [
            call.get_stream_parameters(),
            call.push_stream_stats(expected_stats),
        ]
    )