Merge branch 'cc-4661-listener-statistics' of dev.sourcefabric.org:airtime into cc-4661-listener-statistics

James 2012-11-02 18:25:23 -04:00
commit a93b588a09
8 changed files with 253 additions and 14 deletions


@@ -40,6 +40,8 @@ class ApiController extends Zend_Controller_Action
                ->addActionContext('get-files-without-replay-gain' , 'json')
                ->addActionContext('reload-metadata-group' , 'json')
                ->addActionContext('notify-webstream-data' , 'json')
                ->addActionContext('get-stream-parameters' , 'json')
                ->addActionContext('push-stream-stats' , 'json')
                ->initContext();
    }
@@ -957,4 +959,22 @@ class ApiController extends Zend_Controller_Action
        $this->view->media_id = $media_id;
    }

    public function getStreamParametersAction() {
        $streams = array("s1", "s2", "s3");
        $stream_params = array();
        foreach ($streams as $s) {
            $stream_params[$s] =
                Application_Model_StreamSetting::getStreamDataNormalized($s);
        }
        $this->view->stream_params = $stream_params;
    }

    public function pushStreamStatsAction() {
        $request = $this->getRequest();
        $data = json_decode($request->getParam("data"), true);

        Application_Model_ListenerStat::insertDataPoints($data);
        $this->view->data = $data;
    }
}
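For orientation, a rough sketch (not part of the commit) of the JSON body that get-stream-parameters renders via getStreamParametersAction and getStreamDataNormalized below; the setting keys match the sX_* suffixes used elsewhere in this commit, but the values are illustrative assumptions only.

# Hypothetical shape of the get-stream-parameters response; values are examples.
example_response = {
    "stream_params": {
        "s1": {"enable": "true", "type": "icecast", "host": "127.0.0.1",
               "port": "8000", "mount": "airtime_128"},
        "s2": {"enable": "false"},
        "s3": {"enable": "false"},
    }
}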


@@ -35,4 +35,27 @@ SQL;
        }

        return $out;
    }

    public static function insertDataPoints($p_dataPoints) {

        $timestamp_sql = "INSERT INTO cc_timestamp (timestamp) VALUES (:ts::TIMESTAMP) RETURNING id;";
        $stats_sql = "INSERT INTO cc_listener_count (timestamp_id, listener_count, mount_name)
            VALUES (:timestamp_id, :listener_count, :mount_name)";

        foreach ($p_dataPoints as $dp) {
            $timestamp_id = Application_Common_Database::prepareAndExecute($timestamp_sql,
                array('ts' => $dp['timestamp']), "column");

            Application_Common_Database::prepareAndExecute($stats_sql,
                array('timestamp_id' => $timestamp_id,
                    'listener_count' => $dp["num_listeners"],
                    'mount_name' => $dp["mount_name"],
                )
            );
        }
    }
}
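A rough sketch (not part of the commit) of the decoded "data" parameter that insertDataPoints consumes; it mirrors the dict built in listenerstat.py's get_icecast_stats further down, with one entry per mount, and the values here are illustrative. Each entry produces one row in cc_timestamp and one in cc_listener_count.

# Hypothetical example of the decoded data points list; values are made up.
example_data_points = [
    {"timestamp": "2012-11-02 22:25:00",   # UTC, "%Y-%m-%d %H:%M:%S"
     "num_listeners": 5,
     "mount_name": "airtime_128"},
]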


@@ -129,6 +129,35 @@ class Application_Model_StreamSetting
        return $data;
    }

    /* Similar to getStreamData, but removes all sX prefixes to
     * make data easier to iterate over */
    public static function getStreamDataNormalized($p_streamId)
    {
        $con = Propel::getConnection();
        $streamId = pg_escape_string($p_streamId);
        $sql = "SELECT * "
            ."FROM cc_stream_setting "
            ."WHERE keyname LIKE '{$streamId}_%'";

        $stmt = $con->prepare($sql);

        if ($stmt->execute()) {
            $rows = $stmt->fetchAll();
        } else {
            $msg = implode(',', $stmt->errorInfo());
            throw new Exception("Error: $msg");
        }

        $data = array();
        foreach ($rows as $row) {
            list($id, $key) = explode("_", $row["keyname"], 2);
            $data[$key] = $row["value"];
        }

        return $data;
    }

    public static function getStreamSetting()
    {
        $con = Propel::getConnection();
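As a rough illustration of the normalization (shown as a simple mapping; the row values are assumptions, not taken from the commit): getStreamDataNormalized("s1") strips the sX_ prefix from cc_stream_setting keynames, so the caller can iterate by bare setting name.

# Hypothetical before/after for getStreamDataNormalized("s1"); values are examples.
raw_rows   = {"s1_enable": "true", "s1_host": "127.0.0.1", "s1_port": "8000", "s1_mount": "airtime_128"}
normalized = {"enable": "true", "host": "127.0.0.1", "port": "8000", "mount": "airtime_128"}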


@@ -120,3 +120,6 @@ notify_webstream_data = 'notify-webstream-data/api_key/%%api_key%%/media_id/%%me
notify_liquidsoap_started = 'rabbitmq-do-push/api_key/%%api_key%%/format/json'
get_stream_parameters = 'get-stream-parameters/api_key/%%api_key%%/format/json'
push_stream_stats = 'push-stream-stats/api_key/%%api_key%%/format/json'
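For reference, a sketch of how api_client.py below expands one of these entries; base_url, base_port, api_base and the key value here are placeholder assumptions, not values from this commit.

# Hypothetical expansion of the push_stream_stats template; config values are examples.
config = {"base_url": "localhost", "base_port": "80", "api_base": "api",
          "push_stream_stats": "push-stream-stats/api_key/%%api_key%%/format/json",
          "api_key": "SOMEKEY"}
url = "http://%(base_url)s:%(base_port)s/%(api_base)s/%(push_stream_stats)s/" % config
url = url.replace("%%api_key%%", config["api_key"])
# -> http://localhost:80/api/push-stream-stats/api_key/SOMEKEY/format/json/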


@@ -737,3 +737,35 @@ class AirtimeApiClient(object):
            self.logger.info(self.get_response_from_server(request, attempts = 5))
        except Exception, e:
            self.logger.error("Exception: %s", e)

    def get_stream_parameters(self):
        response = None
        try:
            url = "http://%(base_url)s:%(base_port)s/%(api_base)s/%(get_stream_parameters)s/" % (self.config)
            url = url.replace("%%api_key%%", self.config["api_key"])
            self.logger.debug(url)
            request = urllib2.Request(url)

            response = self.get_response_from_server(request, attempts = 5)
            self.logger.debug(response)
            response = json.loads(response)
        except Exception, e:
            self.logger.error("Exception: %s", e)

        return response

    def push_stream_stats(self, data):
        try:
            url = "http://%(base_url)s:%(base_port)s/%(api_base)s/%(push_stream_stats)s/" \
                    % (self.config)
            url = url.replace("%%api_key%%", self.config["api_key"])

            json_data = json.dumps(data)
            encoded_data = urllib.urlencode({'data': json_data})
            request = urllib2.Request(url, encoded_data)

            print self.get_response_from_server(request)
        except Exception, e:
            self.logger.error("Exception: %s", e)
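A minimal usage sketch (not part of the commit) of the two new client methods; listenerstat.py below drives the same round trip from its polling loop, and the stats entry values here are examples only.

# Hypothetical round trip using the new AirtimeApiClient methods.
from api_clients import api_client

client = api_client.AirtimeApiClient()
params = client.get_stream_parameters()   # {"stream_params": {"s1": {...}, ...}}
stats = [{"timestamp": "2012-11-02 22:25:00",
          "num_listeners": 5,
          "mount_name": "airtime_128"}]
client.push_stream_stats(stats)           # POSTed urlencoded as the "data" parameter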


@@ -11,40 +11,40 @@ from configobj import ConfigObj
if os.geteuid() != 0:
    print "Please run this as root."
    sys.exit(1)

def get_current_script_dir():
    current_script_dir = os.path.realpath(__file__)
    index = current_script_dir.rindex('/')
    return current_script_dir[0:index]

def copy_dir(src_dir, dest_dir):
    if (os.path.exists(dest_dir)) and (dest_dir != "/"):
        shutil.rmtree(dest_dir)
    if not (os.path.exists(dest_dir)):
        #print "Copying directory "+os.path.realpath(src_dir)+" to "+os.path.realpath(dest_dir)
        shutil.copytree(src_dir, dest_dir)

def create_dir(path):
    try:
        os.makedirs(path)
    except Exception, e:
        pass

def get_rand_string(length=10):
    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(length))

def get_rand_string(length=10):
    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(length))

PATH_INI_FILE = '/etc/airtime/pypo.cfg'

try:
    # Absolute path this script is in
    current_script_dir = get_current_script_dir()

    if not os.path.exists(PATH_INI_FILE):
        shutil.copy('%s/../pypo.cfg'%current_script_dir, PATH_INI_FILE)

    try:
        os.remove("/etc/airtime/liquidsoap.cfg")
    except Exception, e:
@@ -59,7 +59,7 @@ try:
    except Exception, e:
        print 'Error loading config file: ', e
        sys.exit(1)

    #copy monit files
    shutil.copy('%s/../../monit/monit-airtime-generic.cfg'%current_script_dir, '/etc/monit/conf.d/')
    subprocess.call('sed -i "s/\$admin_pass/%s/g" /etc/monit/conf.d/monit-airtime-generic.cfg' % get_rand_string(), shell=True)
@@ -80,13 +80,13 @@ try:
    create_dir(config['cache_dir'])
    create_dir(config['file_dir'])
    create_dir(config['tmp_dir'])
    create_dir(config["base_recorded_files"])

    #copy files to bin dir
    copy_dir("%s/.."%current_script_dir, config["bin_dir"]+"/bin/")

    # delete /usr/lib/airtime/pypo/bin/liquidsoap_scripts/liquidsoap.cfg
    # as we don't use it anymore.(CC-2552)
    os.remove(config["bin_dir"]+"/bin/liquidsoap_scripts/liquidsoap.cfg")
@@ -102,7 +102,7 @@ try:
    #copy log rotate script
    shutil.copy(config["bin_dir"]+"/bin/liquidsoap_scripts/airtime-liquidsoap.logrotate", "/etc/logrotate.d/airtime-liquidsoap")

except Exception, e:
    print e


@@ -0,0 +1,127 @@
from threading import Thread

import urllib2
import xml.dom.minidom
import base64
from datetime import datetime
import traceback
import logging
import time

from api_clients import api_client

class ListenerStat(Thread):
    def __init__(self, logger=None):
        Thread.__init__(self)
        self.api_client = api_client.AirtimeApiClient()
        if logger is None:
            self.logger = logging.getLogger()
        else:
            self.logger = logger

    def get_node_text(self, nodelist):
        rc = []
        for node in nodelist:
            if node.nodeType == node.TEXT_NODE:
                rc.append(node.data)
        return ''.join(rc)

    def get_stream_parameters(self):
        #[{"user":"", "password":"", "url":"", "port":""},{},{}]
        return self.api_client.get_stream_parameters()

    def get_icecast_xml(self, ip):
        encoded = base64.b64encode("%(admin_user)s:%(admin_password)s" % ip)
        header = {"Authorization":"Basic %s" % encoded}
        self.logger.debug(ip)
        url = 'http://%(host)s:%(port)s/admin/stats.xml' % ip
        self.logger.debug(url)
        req = urllib2.Request(
                #assuming that the icecast stats path is /admin/stats.xml
                #need to fix this
                url=url,
                headers=header)

        f = urllib2.urlopen(req)
        document = f.read()
        return document

    def get_icecast_stats(self, ip):
        document = self.get_icecast_xml(ip)
        dom = xml.dom.minidom.parseString(document)
        sources = dom.getElementsByTagName("source")

        mount_stats = {}
        for s in sources:
            #drop the leading '/' character
            mount_name = s.getAttribute("mount")[1:]
            if mount_name == ip["mount"]:
                timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

                listeners = s.getElementsByTagName("listeners")
                num_listeners = 0
                if len(listeners):
                    num_listeners = self.get_node_text(listeners[0].childNodes)

                mount_stats = {"timestamp":timestamp, \
                        "num_listeners": num_listeners, \
                        "mount_name": mount_name}

        return mount_stats

    def get_stream_stats(self, stream_parameters):
        stats = []

        #iterate over stream_parameters which is a list of dicts. Each dict
        #represents one Airtime stream (currently this limit is 3).
        #Note that there can be optimizations done, since if all three
        #streams are the same server, we will still initiate 3 separate
        #connections
        for k, v in stream_parameters.items():
            v["admin_user"] = "admin"
            v["admin_password"] = "hackme"
            if v["enable"] == 'true':
                stats.append(self.get_icecast_stats(v))
                #stats.append(get_shoutcast_stats(ip))

        return stats

    def push_stream_stats(self, stats):
        self.api_client.push_stream_stats(stats)

    def run(self):
        #Wake up every 120 seconds and gather icecast statistics. Note that we
        #are currently querying the server every 2 minutes for list of
        #mountpoints as well. We could remove this query if we hooked into
        #rabbitmq events, and listened for these changes instead.
        while True:
            try:
                stream_parameters = self.get_stream_parameters()
                stats = self.get_stream_stats(stream_parameters["stream_params"])
                self.logger.debug(stats)

                self.push_stream_stats(stats)
            except Exception, e:
                top = traceback.format_exc()
                self.logger.error('Exception: %s', top)

            time.sleep(120)

if __name__ == "__main__":
    # create logger
    logger = logging.getLogger('std_out')
    logger.setLevel(logging.DEBUG)
    # create console handler and set level to debug
    #ch = logging.StreamHandler()
    #ch.setLevel(logging.DEBUG)
    # create formatter
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s')
    # add formatter to ch
    #ch.setFormatter(formatter)
    # add ch to logger
    #logger.addHandler(ch)

    ls = ListenerStat(logger)
    ls.run()
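To make the parsing in get_icecast_stats easier to follow, here is a trimmed sketch (not part of the commit) of the stats.xml shape the code assumes; a real Icecast /admin/stats.xml carries many more fields, and only the mount attribute and the listeners element are read.

# Hypothetical, trimmed stats.xml sample and the same extraction the class performs.
import xml.dom.minidom

sample = """<icestats>
  <source mount="/airtime_128">
    <listeners>5</listeners>
  </source>
</icestats>"""

dom = xml.dom.minidom.parseString(sample)
for s in dom.getElementsByTagName("source"):
    mount_name = s.getAttribute("mount")[1:]              # drop the leading '/'
    listeners = s.getElementsByTagName("listeners")
    num = listeners[0].childNodes[0].data if listeners else 0
    print mount_name, num                                  # -> airtime_128 5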


@@ -21,6 +21,7 @@ from pypopush import PypoPush
from pypofetch import PypoFetch
from pypofile import PypoFile
from recorder import Recorder
from listenerstat import ListenerStat
from pypomessagehandler import PypoMessageHandler

from configobj import ConfigObj
@@ -209,6 +210,10 @@ if __name__ == '__main__':
    recorder.daemon = True
    recorder.start()

    stat = ListenerStat()
    stat.daemon = True
    stat.start()

    # all join() are commented out because we want to exit entire pypo
    # if pypofetch is exiting
    #pmh.join()