CC-4661: Listener Statistics

-backend part working
This commit is contained in:
Martin Konecny 2012-11-02 17:50:43 -04:00
parent 8b70136dd6
commit 6438b54a5f
7 changed files with 238 additions and 1 deletions

View File

@ -40,6 +40,8 @@ class ApiController extends Zend_Controller_Action
->addActionContext('get-files-without-replay-gain' , 'json')
->addActionContext('reload-metadata-group' , 'json')
->addActionContext('notify-webstream-data' , 'json')
->addActionContext('get-stream-parameters' , 'json')
->addActionContext('push-stream-stats' , 'json')
->initContext();
}
@ -957,4 +959,22 @@ class ApiController extends Zend_Controller_Action
$this->view->media_id = $media_id;
}
public function getStreamParametersAction() {
$streams = array("s1", "s2", "s3");
$stream_params = array();
foreach ($streams as $s) {
$stream_params[$s] =
Application_Model_StreamSetting::getStreamDataNormalized($s);
}
$this->view->stream_params = $stream_params;
}
public function pushStreamStatsAction() {
$request = $this->getRequest();
$data = json_decode($request->getParam("data"), true);
Application_Model_ListenerStat::insertDataPoints($data);
$this->view->data = $data;
}
}

View File

@ -17,4 +17,25 @@ SQL;
return $data;
}
}
public static function insertDataPoints($p_dataPoints) {
$timestamp_sql = "INSERT INTO cc_timestamp (timestamp) VALUES (:ts::TIMESTAMP) RETURNING id;";
$stats_sql = "INSERT INTO cc_listener_count (timestamp_id, listener_count)
VALUES (:timestamp_id, :listener_count)";
foreach ($p_dataPoints as $dp) {
$timestamp_id = Application_Common_Database::prepareAndExecute($timestamp_sql,
array('ts'=> $dp['timestamp']), "column");
Application_Common_Database::prepareAndExecute($stats_sql,
array('timestamp_id' => $timestamp_id,
'listener_count' => $dp["num_listeners"])
);
}
}
}

View File

@ -129,6 +129,35 @@ class Application_Model_StreamSetting
return $data;
}
/* Similar to getStreamData, but removes all sX prefixes to
* make data easier to iterate over */
public static function getStreamDataNormalized($p_streamId)
{
$con = Propel::getConnection();
$streamId = pg_escape_string($p_streamId);
$sql = "SELECT * "
."FROM cc_stream_setting "
."WHERE keyname LIKE '{$streamId}_%'";
$stmt = $con->prepare($sql);
if ($stmt->execute()) {
$rows = $stmt->fetchAll();
} else {
$msg = implode(',', $stmt->errorInfo());
throw new Exception("Error: $msg");
}
$data = array();
foreach ($rows as $row) {
list($id, $key) = explode("_", $row["keyname"], 2);
$data[$key] = $row["value"];
}
return $data;
}
public static function getStreamSetting()
{
$con = Propel::getConnection();

View File

@ -120,3 +120,6 @@ notify_webstream_data = 'notify-webstream-data/api_key/%%api_key%%/media_id/%%me
notify_liquidsoap_started = 'rabbitmq-do-push/api_key/%%api_key%%/format/json'
get_stream_parameters = 'get-stream-parameters/api_key/%%api_key%%/format/json'
push_stream_stats = 'push-stream-stats/api_key/%%api_key%%/format/json'

View File

@ -737,3 +737,35 @@ class AirtimeApiClient(object):
self.logger.info(self.get_response_from_server(request, attempts = 5))
except Exception, e:
self.logger.error("Exception: %s", e)
def get_stream_parameters(self):
response = None
try:
url = "http://%(base_url)s:%(base_port)s/%(api_base)s/%(get_stream_parameters)s/" % (self.config)
url = url.replace("%%api_key%%", self.config["api_key"])
self.logger.debug(url)
request = urllib2.Request(url)
response = self.get_response_from_server(request, attempts = 5)
self.logger.debug(response)
response = json.loads(response)
except Exception, e:
self.logger.error("Exception: %s", e)
return response
def push_stream_stats(self, data):
try:
url = "http://%(base_url)s:%(base_port)s/%(api_base)s/%(push_stream_stats)s/" \
% (self.config)
url = url.replace("%%api_key%%", self.config["api_key"])
json_data = json.dumps(data)
encoded_data = urllib.urlencode({'data': json_data})
request = urllib2.Request(url, encoded_data)
print self.get_response_from_server(request)
except Exception, e:
self.logger.error("Exception: %s", e)

View File

@ -0,0 +1,127 @@
from threading import Thread
import urllib2
import xml.dom.minidom
import base64
from datetime import datetime
import traceback
import logging
import time
from api_clients import api_client
class ListenerStat(Thread):
def __init__(self, logger=None):
Thread.__init__(self)
self.api_client = api_client.AirtimeApiClient()
if logger is None:
self.logger = logging.getLogger()
else:
self.logger = logger
def get_node_text(self, nodelist):
rc = []
for node in nodelist:
if node.nodeType == node.TEXT_NODE:
rc.append(node.data)
return ''.join(rc)
def get_stream_parameters(self):
#[{"user":"", "password":"", "url":"", "port":""},{},{}]
return self.api_client.get_stream_parameters()
def get_icecast_xml(self, ip):
encoded = base64.b64encode("%(admin_user)s:%(admin_password)s" % ip)
header = {"Authorization":"Basic %s" % encoded}
self.logger.debug(ip)
url = 'http://%(host)s:%(port)s/admin/stats.xml' % ip
self.logger.debug(url)
req = urllib2.Request(
#assuming that the icecast stats path is /admin/stats.xml
#need to fix this
url=url,
headers=header)
f = urllib2.urlopen(req)
document = f.read()
return document
def get_icecast_stats(self, ip):
document = self.get_icecast_xml(ip)
dom = xml.dom.minidom.parseString(document)
sources = dom.getElementsByTagName("source")
mount_stats = {}
for s in sources:
#drop the leading '/' character
mount_name = s.getAttribute("mount")[1:]
if mount_name == ip["mount"]:
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
listeners = s.getElementsByTagName("listeners")
num_listeners = 0
if len(listeners):
num_listeners = self.get_node_text(listeners[0].childNodes)
mount_stats = {"timestamp":timestamp, \
"num_listeners": num_listeners, \
"mount_name": mount_name}
return mount_stats
def get_stream_stats(self, stream_parameters):
stats = []
#iterate over stream_parameters which is a list of dicts. Each dict
#represents one Airtime stream (currently this limit is 3).
#Note that there can be optimizations done, since if all three
#streams are the same server, we will still initiate 3 separate
#connections
for k, v in stream_parameters.items():
v["admin_user"] = "admin"
v["admin_password"] = "hackme"
if v["enable"] == 'true':
stats.append(self.get_icecast_stats(v))
#stats.append(get_shoutcast_stats(ip))
return stats
def push_stream_stats(self, stats):
self.api_client.push_stream_stats(stats)
def run(self):
#Wake up every 120 seconds and gather icecast statistics. Note that we
#are currently querying the server every 2 minutes for list of
#mountpoints as well. We could remove this query if we hooked into
#rabbitmq events, and listened for these changes instead.
while True:
try:
stream_parameters = self.get_stream_parameters()
stats = self.get_stream_stats(stream_parameters["stream_params"])
self.logger.debug(stats)
self.push_stream_stats(stats)
time.sleep(120)
except Exception, e:
top = traceback.format_exc()
self.logger.error('Exception: %s', top)
time.sleep(120)
if __name__ == "__main__":
# create logger
logger = logging.getLogger('std_out')
logger.setLevel(logging.DEBUG)
# create console handler and set level to debug
#ch = logging.StreamHandler()
#ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s')
# add formatter to ch
#ch.setFormatter(formatter)
# add ch to logger
#logger.addHandler(ch)
ls = ListenerStat(logger)
ls.run()

View File

@ -21,6 +21,7 @@ from pypopush import PypoPush
from pypofetch import PypoFetch
from pypofile import PypoFile
from recorder import Recorder
from listenerstat import ListenerStat
from pypomessagehandler import PypoMessageHandler
from configobj import ConfigObj
@ -209,6 +210,10 @@ if __name__ == '__main__':
recorder.daemon = True
recorder.start()
stat = ListenerStat()
stat.daemon = True
stat.start()
# all join() are commented out because we want to exit entire pypo
# if pypofetch is exiting
#pmh.join()