From 6438b54a5fe6d6dd8a1744a0afb2caa98b5fceb0 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Fri, 2 Nov 2012 17:50:43 -0400 Subject: [PATCH] CC-4661: Listener Statistics -backend part working --- .../application/controllers/ApiController.php | 20 +++ .../application/models/ListenerStat.php | 23 +++- .../application/models/StreamSetting.php | 29 ++++ python_apps/api_clients/api_client.cfg | 3 + python_apps/api_clients/api_client.py | 32 +++++ python_apps/pypo/listenerstat.py | 127 ++++++++++++++++++ python_apps/pypo/pypocli.py | 5 + 7 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 python_apps/pypo/listenerstat.py diff --git a/airtime_mvc/application/controllers/ApiController.php b/airtime_mvc/application/controllers/ApiController.php index 1a1876ce9..1b47a924c 100644 --- a/airtime_mvc/application/controllers/ApiController.php +++ b/airtime_mvc/application/controllers/ApiController.php @@ -40,6 +40,8 @@ class ApiController extends Zend_Controller_Action ->addActionContext('get-files-without-replay-gain' , 'json') ->addActionContext('reload-metadata-group' , 'json') ->addActionContext('notify-webstream-data' , 'json') + ->addActionContext('get-stream-parameters' , 'json') + ->addActionContext('push-stream-stats' , 'json') ->initContext(); } @@ -957,4 +959,22 @@ class ApiController extends Zend_Controller_Action $this->view->media_id = $media_id; } + public function getStreamParametersAction() { + $streams = array("s1", "s2", "s3"); + $stream_params = array(); + foreach ($streams as $s) { + $stream_params[$s] = + Application_Model_StreamSetting::getStreamDataNormalized($s); + } + $this->view->stream_params = $stream_params; + } + + public function pushStreamStatsAction() { + $request = $this->getRequest(); + $data = json_decode($request->getParam("data"), true); + + Application_Model_ListenerStat::insertDataPoints($data); + $this->view->data = $data; + } + } diff --git a/airtime_mvc/application/models/ListenerStat.php b/airtime_mvc/application/models/ListenerStat.php index bedd82453..9ba8b0f8b 100644 --- a/airtime_mvc/application/models/ListenerStat.php +++ b/airtime_mvc/application/models/ListenerStat.php @@ -17,4 +17,25 @@ SQL; return $data; } -} \ No newline at end of file + + public static function insertDataPoints($p_dataPoints) { + + + $timestamp_sql = "INSERT INTO cc_timestamp (timestamp) VALUES (:ts::TIMESTAMP) RETURNING id;"; + $stats_sql = "INSERT INTO cc_listener_count (timestamp_id, listener_count) + VALUES (:timestamp_id, :listener_count)"; + foreach ($p_dataPoints as $dp) { + $timestamp_id = Application_Common_Database::prepareAndExecute($timestamp_sql, + array('ts'=> $dp['timestamp']), "column"); + + Application_Common_Database::prepareAndExecute($stats_sql, + array('timestamp_id' => $timestamp_id, + 'listener_count' => $dp["num_listeners"]) + ); + } + + } + + + +} diff --git a/airtime_mvc/application/models/StreamSetting.php b/airtime_mvc/application/models/StreamSetting.php index cdd7747b7..f1b1e372c 100644 --- a/airtime_mvc/application/models/StreamSetting.php +++ b/airtime_mvc/application/models/StreamSetting.php @@ -129,6 +129,35 @@ class Application_Model_StreamSetting return $data; } + /* Similar to getStreamData, but removes all sX prefixes to + * make data easier to iterate over */ + public static function getStreamDataNormalized($p_streamId) + { + $con = Propel::getConnection(); + $streamId = pg_escape_string($p_streamId); + $sql = "SELECT * " + ."FROM cc_stream_setting " + ."WHERE keyname LIKE '{$streamId}_%'"; + + $stmt = $con->prepare($sql); + + if ($stmt->execute()) { + $rows = $stmt->fetchAll(); + } else { + $msg = implode(',', $stmt->errorInfo()); + throw new Exception("Error: $msg"); + } + + $data = array(); + + foreach ($rows as $row) { + list($id, $key) = explode("_", $row["keyname"], 2); + $data[$key] = $row["value"]; + } + + return $data; + } + public static function getStreamSetting() { $con = Propel::getConnection(); diff --git a/python_apps/api_clients/api_client.cfg b/python_apps/api_clients/api_client.cfg index efdcc763f..1aeafac65 100644 --- a/python_apps/api_clients/api_client.cfg +++ b/python_apps/api_clients/api_client.cfg @@ -120,3 +120,6 @@ notify_webstream_data = 'notify-webstream-data/api_key/%%api_key%%/media_id/%%me notify_liquidsoap_started = 'rabbitmq-do-push/api_key/%%api_key%%/format/json' +get_stream_parameters = 'get-stream-parameters/api_key/%%api_key%%/format/json' + +push_stream_stats = 'push-stream-stats/api_key/%%api_key%%/format/json' diff --git a/python_apps/api_clients/api_client.py b/python_apps/api_clients/api_client.py index 5ca176ec2..ca1f37cd8 100644 --- a/python_apps/api_clients/api_client.py +++ b/python_apps/api_clients/api_client.py @@ -737,3 +737,35 @@ class AirtimeApiClient(object): self.logger.info(self.get_response_from_server(request, attempts = 5)) except Exception, e: self.logger.error("Exception: %s", e) + + + def get_stream_parameters(self): + response = None + try: + url = "http://%(base_url)s:%(base_port)s/%(api_base)s/%(get_stream_parameters)s/" % (self.config) + url = url.replace("%%api_key%%", self.config["api_key"]) + self.logger.debug(url) + request = urllib2.Request(url) + + response = self.get_response_from_server(request, attempts = 5) + self.logger.debug(response) + + response = json.loads(response) + except Exception, e: + self.logger.error("Exception: %s", e) + + return response + + def push_stream_stats(self, data): + try: + url = "http://%(base_url)s:%(base_port)s/%(api_base)s/%(push_stream_stats)s/" \ + % (self.config) + url = url.replace("%%api_key%%", self.config["api_key"]) + json_data = json.dumps(data) + encoded_data = urllib.urlencode({'data': json_data}) + request = urllib2.Request(url, encoded_data) + print self.get_response_from_server(request) + + except Exception, e: + self.logger.error("Exception: %s", e) + diff --git a/python_apps/pypo/listenerstat.py b/python_apps/pypo/listenerstat.py new file mode 100644 index 000000000..983d31f93 --- /dev/null +++ b/python_apps/pypo/listenerstat.py @@ -0,0 +1,127 @@ +from threading import Thread +import urllib2 +import xml.dom.minidom +import base64 +from datetime import datetime +import traceback +import logging +import time + +from api_clients import api_client + +class ListenerStat(Thread): + def __init__(self, logger=None): + Thread.__init__(self) + self.api_client = api_client.AirtimeApiClient() + if logger is None: + self.logger = logging.getLogger() + else: + self.logger = logger + + def get_node_text(self, nodelist): + rc = [] + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc.append(node.data) + return ''.join(rc) + + def get_stream_parameters(self): + #[{"user":"", "password":"", "url":"", "port":""},{},{}] + return self.api_client.get_stream_parameters() + + + def get_icecast_xml(self, ip): + encoded = base64.b64encode("%(admin_user)s:%(admin_password)s" % ip) + + header = {"Authorization":"Basic %s" % encoded} + self.logger.debug(ip) + url = 'http://%(host)s:%(port)s/admin/stats.xml' % ip + self.logger.debug(url) + req = urllib2.Request( + #assuming that the icecast stats path is /admin/stats.xml + #need to fix this + url=url, + headers=header) + + f = urllib2.urlopen(req) + document = f.read() + return document + + + def get_icecast_stats(self, ip): + document = self.get_icecast_xml(ip) + dom = xml.dom.minidom.parseString(document) + sources = dom.getElementsByTagName("source") + + mount_stats = {} + for s in sources: + #drop the leading '/' character + mount_name = s.getAttribute("mount")[1:] + if mount_name == ip["mount"]: + timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + listeners = s.getElementsByTagName("listeners") + num_listeners = 0 + if len(listeners): + num_listeners = self.get_node_text(listeners[0].childNodes) + + mount_stats = {"timestamp":timestamp, \ + "num_listeners": num_listeners, \ + "mount_name": mount_name} + return mount_stats + + def get_stream_stats(self, stream_parameters): + stats = [] + + #iterate over stream_parameters which is a list of dicts. Each dict + #represents one Airtime stream (currently this limit is 3). + #Note that there can be optimizations done, since if all three + #streams are the same server, we will still initiate 3 separate + #connections + for k, v in stream_parameters.items(): + v["admin_user"] = "admin" + v["admin_password"] = "hackme" + if v["enable"] == 'true': + stats.append(self.get_icecast_stats(v)) + #stats.append(get_shoutcast_stats(ip)) + + return stats + + def push_stream_stats(self, stats): + self.api_client.push_stream_stats(stats) + + def run(self): + #Wake up every 120 seconds and gather icecast statistics. Note that we + #are currently querying the server every 2 minutes for list of + #mountpoints as well. We could remove this query if we hooked into + #rabbitmq events, and listened for these changes instead. + while True: + try: + stream_parameters = self.get_stream_parameters() + + stats = self.get_stream_stats(stream_parameters["stream_params"]) + self.logger.debug(stats) + + self.push_stream_stats(stats) + time.sleep(120) + except Exception, e: + top = traceback.format_exc() + self.logger.error('Exception: %s', top) + time.sleep(120) + + +if __name__ == "__main__": + # create logger + logger = logging.getLogger('std_out') + logger.setLevel(logging.DEBUG) + # create console handler and set level to debug + #ch = logging.StreamHandler() + #ch.setLevel(logging.DEBUG) + # create formatter + formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s') + # add formatter to ch + #ch.setFormatter(formatter) + # add ch to logger + #logger.addHandler(ch) + + ls = ListenerStat(logger) + ls.run() diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 8a1370a79..1b51a13f8 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -21,6 +21,7 @@ from pypopush import PypoPush from pypofetch import PypoFetch from pypofile import PypoFile from recorder import Recorder +from listenerstat import ListenerStat from pypomessagehandler import PypoMessageHandler from configobj import ConfigObj @@ -209,6 +210,10 @@ if __name__ == '__main__': recorder.daemon = True recorder.start() + stat = ListenerStat() + stat.daemon = True + stat.start() + # all join() are commented out because we want to exit entire pypo # if pypofetch is exiting #pmh.join()