merge api client conflict
This commit is contained in:
commit
6f42291f8a
64 changed files with 22680 additions and 40 deletions
|
@ -121,3 +121,6 @@ notify_webstream_data = 'notify-webstream-data/api_key/%%api_key%%/media_id/%%me
|
|||
|
||||
notify_liquidsoap_started = 'rabbitmq-do-push/api_key/%%api_key%%/format/json'
|
||||
|
||||
get_stream_parameters = 'get-stream-parameters/api_key/%%api_key%%/format/json'
|
||||
|
||||
push_stream_stats = 'push-stream-stats/api_key/%%api_key%%/format/json'
|
||||
|
|
|
@ -388,6 +388,15 @@ class AirtimeApiClient(object):
|
|||
Update the server with the latest metadata we've received from the
|
||||
external webstream
|
||||
"""
|
||||
self.logger.info( self.notify_webstream_data.req(
|
||||
self.logger.info( self.services.notify_webstream_data.req(
|
||||
_post_data={'data':data}, media_id=str(media_id)).retry(5))
|
||||
|
||||
def get_stream_parameters(self):
|
||||
response = self.services.get_stream_parameters()
|
||||
self.logger.debug(response)
|
||||
return response
|
||||
|
||||
def push_stream_stats(self, data):
|
||||
# TODO : users of this method should do their own error handling
|
||||
response = self.services.push_stream_stats(_post_data={'data': json.dumps(data)})
|
||||
return response
|
||||
|
|
|
@ -11,40 +11,40 @@ from configobj import ConfigObj
|
|||
if os.geteuid() != 0:
|
||||
print "Please run this as root."
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def get_current_script_dir():
|
||||
current_script_dir = os.path.realpath(__file__)
|
||||
index = current_script_dir.rindex('/')
|
||||
return current_script_dir[0:index]
|
||||
|
||||
|
||||
def copy_dir(src_dir, dest_dir):
|
||||
if (os.path.exists(dest_dir)) and (dest_dir != "/"):
|
||||
shutil.rmtree(dest_dir)
|
||||
if not (os.path.exists(dest_dir)):
|
||||
#print "Copying directory "+os.path.realpath(src_dir)+" to "+os.path.realpath(dest_dir)
|
||||
shutil.copytree(src_dir, dest_dir)
|
||||
|
||||
|
||||
def create_dir(path):
|
||||
try:
|
||||
os.makedirs(path)
|
||||
except Exception, e:
|
||||
pass
|
||||
|
||||
|
||||
def get_rand_string(length=10):
|
||||
return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(length))
|
||||
|
||||
|
||||
def get_rand_string(length=10):
|
||||
return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(length))
|
||||
|
||||
|
||||
PATH_INI_FILE = '/etc/airtime/pypo.cfg'
|
||||
|
||||
try:
|
||||
try:
|
||||
# Absolute path this script is in
|
||||
current_script_dir = get_current_script_dir()
|
||||
|
||||
|
||||
if not os.path.exists(PATH_INI_FILE):
|
||||
shutil.copy('%s/../pypo.cfg'%current_script_dir, PATH_INI_FILE)
|
||||
|
||||
|
||||
try:
|
||||
os.remove("/etc/airtime/liquidsoap.cfg")
|
||||
except Exception, e:
|
||||
|
@ -59,7 +59,7 @@ try:
|
|||
except Exception, e:
|
||||
print 'Error loading config file: ', e
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
#copy monit files
|
||||
shutil.copy('%s/../../monit/monit-airtime-generic.cfg'%current_script_dir, '/etc/monit/conf.d/')
|
||||
subprocess.call('sed -i "s/\$admin_pass/%s/g" /etc/monit/conf.d/monit-airtime-generic.cfg' % get_rand_string(), shell=True)
|
||||
|
@ -80,13 +80,13 @@ try:
|
|||
create_dir(config['cache_dir'])
|
||||
create_dir(config['file_dir'])
|
||||
create_dir(config['tmp_dir'])
|
||||
|
||||
|
||||
create_dir(config["base_recorded_files"])
|
||||
|
||||
#copy files to bin dir
|
||||
copy_dir("%s/.."%current_script_dir, config["bin_dir"]+"/bin/")
|
||||
|
||||
# delete /usr/lib/airtime/pypo/bin/liquidsoap_scripts/liquidsoap.cfg
|
||||
# delete /usr/lib/airtime/pypo/bin/liquidsoap_scripts/liquidsoap.cfg
|
||||
# as we don't use it anymore.(CC-2552)
|
||||
os.remove(config["bin_dir"]+"/bin/liquidsoap_scripts/liquidsoap.cfg")
|
||||
|
||||
|
@ -102,7 +102,7 @@ try:
|
|||
|
||||
#copy log rotate script
|
||||
shutil.copy(config["bin_dir"]+"/bin/liquidsoap_scripts/airtime-liquidsoap.logrotate", "/etc/logrotate.d/airtime-liquidsoap")
|
||||
|
||||
|
||||
except Exception, e:
|
||||
print e
|
||||
|
||||
|
|
127
python_apps/pypo/listenerstat.py
Normal file
127
python_apps/pypo/listenerstat.py
Normal file
|
@ -0,0 +1,127 @@
|
|||
from threading import Thread
|
||||
import urllib2
|
||||
import xml.dom.minidom
|
||||
import base64
|
||||
from datetime import datetime
|
||||
import traceback
|
||||
import logging
|
||||
import time
|
||||
|
||||
from api_clients import api_client
|
||||
|
||||
class ListenerStat(Thread):
|
||||
def __init__(self, logger=None):
|
||||
Thread.__init__(self)
|
||||
self.api_client = api_client.AirtimeApiClient()
|
||||
if logger is None:
|
||||
self.logger = logging.getLogger()
|
||||
else:
|
||||
self.logger = logger
|
||||
|
||||
def get_node_text(self, nodelist):
|
||||
rc = []
|
||||
for node in nodelist:
|
||||
if node.nodeType == node.TEXT_NODE:
|
||||
rc.append(node.data)
|
||||
return ''.join(rc)
|
||||
|
||||
def get_stream_parameters(self):
|
||||
#[{"user":"", "password":"", "url":"", "port":""},{},{}]
|
||||
return self.api_client.get_stream_parameters()
|
||||
|
||||
|
||||
def get_icecast_xml(self, ip):
|
||||
encoded = base64.b64encode("%(admin_user)s:%(admin_password)s" % ip)
|
||||
|
||||
header = {"Authorization":"Basic %s" % encoded}
|
||||
self.logger.debug(ip)
|
||||
url = 'http://%(host)s:%(port)s/admin/stats.xml' % ip
|
||||
self.logger.debug(url)
|
||||
req = urllib2.Request(
|
||||
#assuming that the icecast stats path is /admin/stats.xml
|
||||
#need to fix this
|
||||
url=url,
|
||||
headers=header)
|
||||
|
||||
f = urllib2.urlopen(req)
|
||||
document = f.read()
|
||||
return document
|
||||
|
||||
|
||||
def get_icecast_stats(self, ip):
|
||||
document = self.get_icecast_xml(ip)
|
||||
dom = xml.dom.minidom.parseString(document)
|
||||
sources = dom.getElementsByTagName("source")
|
||||
|
||||
mount_stats = {}
|
||||
for s in sources:
|
||||
#drop the leading '/' character
|
||||
mount_name = s.getAttribute("mount")[1:]
|
||||
if mount_name == ip["mount"]:
|
||||
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
||||
listeners = s.getElementsByTagName("listeners")
|
||||
num_listeners = 0
|
||||
if len(listeners):
|
||||
num_listeners = self.get_node_text(listeners[0].childNodes)
|
||||
|
||||
mount_stats = {"timestamp":timestamp, \
|
||||
"num_listeners": num_listeners, \
|
||||
"mount_name": mount_name}
|
||||
return mount_stats
|
||||
|
||||
def get_stream_stats(self, stream_parameters):
|
||||
stats = []
|
||||
|
||||
#iterate over stream_parameters which is a list of dicts. Each dict
|
||||
#represents one Airtime stream (currently this limit is 3).
|
||||
#Note that there can be optimizations done, since if all three
|
||||
#streams are the same server, we will still initiate 3 separate
|
||||
#connections
|
||||
for k, v in stream_parameters.items():
|
||||
v["admin_user"] = "admin"
|
||||
v["admin_password"] = "hackme"
|
||||
if v["enable"] == 'true':
|
||||
stats.append(self.get_icecast_stats(v))
|
||||
#stats.append(get_shoutcast_stats(ip))
|
||||
|
||||
return stats
|
||||
|
||||
def push_stream_stats(self, stats):
|
||||
self.api_client.push_stream_stats(stats)
|
||||
|
||||
def run(self):
|
||||
#Wake up every 120 seconds and gather icecast statistics. Note that we
|
||||
#are currently querying the server every 2 minutes for list of
|
||||
#mountpoints as well. We could remove this query if we hooked into
|
||||
#rabbitmq events, and listened for these changes instead.
|
||||
while True:
|
||||
try:
|
||||
stream_parameters = self.get_stream_parameters()
|
||||
|
||||
stats = self.get_stream_stats(stream_parameters["stream_params"])
|
||||
self.logger.debug(stats)
|
||||
|
||||
self.push_stream_stats(stats)
|
||||
except Exception, e:
|
||||
top = traceback.format_exc()
|
||||
self.logger.error('Exception: %s', top)
|
||||
|
||||
time.sleep(120)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# create logger
|
||||
logger = logging.getLogger('std_out')
|
||||
logger.setLevel(logging.DEBUG)
|
||||
# create console handler and set level to debug
|
||||
#ch = logging.StreamHandler()
|
||||
#ch.setLevel(logging.DEBUG)
|
||||
# create formatter
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s')
|
||||
# add formatter to ch
|
||||
#ch.setFormatter(formatter)
|
||||
# add ch to logger
|
||||
#logger.addHandler(ch)
|
||||
|
||||
ls = ListenerStat(logger)
|
||||
ls.run()
|
|
@ -21,6 +21,7 @@ from pypopush import PypoPush
|
|||
from pypofetch import PypoFetch
|
||||
from pypofile import PypoFile
|
||||
from recorder import Recorder
|
||||
from listenerstat import ListenerStat
|
||||
from pypomessagehandler import PypoMessageHandler
|
||||
|
||||
from configobj import ConfigObj
|
||||
|
@ -209,6 +210,10 @@ if __name__ == '__main__':
|
|||
recorder.daemon = True
|
||||
recorder.start()
|
||||
|
||||
stat = ListenerStat()
|
||||
stat.daemon = True
|
||||
stat.start()
|
||||
|
||||
# all join() are commented out because we want to exit entire pypo
|
||||
# if pypofetch is exiting
|
||||
#pmh.join()
|
||||
|
|
|
@ -120,6 +120,10 @@ class PypoPush(Thread):
|
|||
|
||||
next_media_item_chain = self.get_next_schedule_chain(chains, datetime.utcnow())
|
||||
if next_media_item_chain is not None:
|
||||
try:
|
||||
chains.remove(next_media_item_chain)
|
||||
except ValueError, e:
|
||||
self.logger.error(str(e))
|
||||
chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
|
||||
time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow())
|
||||
self.logger.debug("Blocking %s seconds until show start", time_until_next_play)
|
||||
|
@ -405,8 +409,9 @@ class PypoPush(Thread):
|
|||
closest_chain = None
|
||||
for chain in chains:
|
||||
chain_start = datetime.strptime(chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
|
||||
chain_end = datetime.strptime(chain[-1]['end'], "%Y-%m-%d-%H-%M-%S")
|
||||
self.logger.debug("tnow %s, chain_start %s", tnow, chain_start)
|
||||
if (closest_start == None or chain_start < closest_start) and chain_start > tnow:
|
||||
if (closest_start == None or chain_start < closest_start) and (chain_start > tnow or (chain_start < tnow and chain_end > tnow)):
|
||||
closest_start = chain_start
|
||||
closest_chain = chain
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue