# This file holds the implementations for all the API clients.
# If you want to develop a new client, here are some suggestions:
# Get the fetch methods working first, then the push, then the liquidsoap notifier.
# You will probably want to create a script on your server side to automatically
# schedule a playlist one minute from the current time.
import sys
import time
import urllib
import urllib2
import logging
import json
import os
from urlparse import urlparse
import base64
from configobj import ConfigObj
import string
import hashlib
def api_client_factory(config, logger=None):
if logger != None:
temp_logger = logger
temp_logger = logging.getLogger()
if config["api_client"] == "airtime":
return AirTimeApiClient(temp_logger)
temp_logger.info('API Client "'+config["api_client"]+'" not supported. Please check your config file.\n')
def to_unicode(obj, encoding='utf-8'):
if isinstance(obj, basestring):
if not isinstance(obj, unicode):
obj = unicode(obj, encoding)
return obj
def encode_to(obj, encoding='utf-8'):
if isinstance(obj, unicode):
obj = obj.encode(encoding)
return obj
def convert_dict_value_to_utf8(md):
#list comprehension to convert all values of md to utf-8
return dict([(item[0], encode_to(item[1], "utf-8")) for item in md.items()])
class ApiClientInterface:
# Implementation: optional
# Called from: beginning of all scripts
# Should exit the program if this version of pypo is not compatible with
# 3rd party software.
def is_server_compatible(self, verbose = True):
# Implementation: Required
# Called from: fetch loop
# This is the main method you need to implement when creating a new API client.
# start and end are for testing purposes.
# start and end are strings in the format YYYY-DD-MM-hh-mm-ss
def get_schedule(self, start=None, end=None):
return 0, []
# Implementation: Required
# Called from: fetch loop
# This downloads the media from the server.
def get_media(self, src, dst):
# Implementation: optional
# You dont actually have to implement this function for the liquidsoap playout to work.
# Called from: pypo_notify.py
# This is a callback from liquidsoap, we use this to notify about the
# currently playing *song*. We get passed a JSON string which we handed to
# liquidsoap in get_liquidsoap_data().
def notify_media_item_start_playing(self, data, media_id):
# Implementation: optional
# You dont actually have to implement this function for the liquidsoap playout to work.
def generate_range_dp(self):
# Implementation: optional
# Called from: push loop
# Return a dict of extra info you want to pass to liquidsoap
# You will be able to use this data in update_start_playing
def get_liquidsoap_data(self, pkey, schedule):
def get_shows_to_record(self):
def upload_recorded_show(self):
def check_media_status(self, md5):
def update_media_metadata(self, md):
def list_all_db_files(self, dir_id):
def list_all_watched_dirs(self):
def add_watched_dir(self):
def remove_watched_dir(self):
def set_storage_dir(self):
def register_component(self):
def notify_liquidsoap_error(self, error_msg, stream_id):
def notify_liquidsoap_connection(self, stream_id):
# Put here whatever tests you want to run to make sure your API is working
def test(self):
#def get_media_type(self, playlist):
# nil
# Airtime API Client
class AirTimeApiClient(ApiClientInterface):
def __init__(self, logger=None):
if logger != None:
self.logger = logger
self.logger = logging.getLogger()
# loading config file
self.config = ConfigObj('/etc/airtime/api_client.cfg')
except Exception, e:
self.logger.error('Error loading config file: %s', e)
def get_response_from_server(self, url):
logger = self.logger
successful_response = False
while not successful_response:
response = urllib2.urlopen(url).read()
successful_response = True
except IOError, e:
logger.error('Error Authenticating with remote server: %s', e)
except Exception, e:
logger.error('Couldn\'t connect to remote server. Is it running?')
logger.error("%s" % e)
if not successful_response:
logger.error("Error connecting to server, waiting 5 seconds and trying again.")
return response
def __get_airtime_version(self, verbose = True):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["version_url"])
logger.debug("Trying to contact %s", url)
url = url.replace("%%api_key%%", self.config["api_key"])
version = -1
response = None
data = self.get_response_from_server(url)
logger.debug("Data: %s", data)
response_json = json.loads(data)
version = response_json['version']
logger.debug("Airtime Version %s detected", version)
except Exception, e:
logger.error("Unable to detect Airtime Version - %s", e)
return -1
return version
def test(self):
logger = self.logger
status, items = self.get_schedule('2010-01-01-00-00-00', '2011-01-01-00-00-00')
schedule = items["playlists"]
logger.debug("Number of playlists found: %s", str(len(schedule)))
count = 1
for pkey in sorted(schedule.iterkeys()):
logger.debug("Playlist #%s",str(count))
playlist = schedule[pkey]
for item in playlist["medias"]:
filename = urlparse(item["uri"])
filename = filename.query[5:]
self.get_media(item["uri"], filename)
def is_server_compatible(self, verbose = True):
logger = self.logger
version = self.__get_airtime_version(verbose)
if (version == -1):
if (verbose):
logger.info('Unable to get Airtime version number.\n')
return False
elif (version[0:3] != AIRTIME_VERSION[0:3]):
if (verbose):
logger.info('Airtime version found: ' + str(version))
logger.info('pypo is at version ' +AIRTIME_VERSION+' and is not compatible with this version of Airtime.\n')
return False
if (verbose):
logger.info('Airtime version: ' + str(version))
logger.info('pypo is at version ' +AIRTIME_VERSION+' and is compatible with this version of Airtime.')
return True
def get_schedule(self, start=None, end=None):
logger = self.logger
# Construct the URL
export_url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["export_url"])
logger.info("Fetching schedule from %s", export_url)
export_url = export_url.replace('%%api_key%%', self.config["api_key"])
response = ""
response_json = self.get_response_from_server(export_url)
response = json.loads(response_json)
success = True
except Exception, e:
success = False
return success, response
def get_media(self, uri, dst):
logger = self.logger
src = uri + "/api_key/%%api_key%%"
logger.info("try to download from %s to %s", src, dst)
src = src.replace("%%api_key%%", self.config["api_key"])
# check if file exists already before downloading again
filename, headers = urllib.urlretrieve(src, dst)
except Exception, e:
logger.error("%s", e)
This is a callback from liquidsoap, we use this to notify about the
currently playing *song*. We get passed a JSON string which we handed to
liquidsoap in get_liquidsoap_data().
def notify_media_item_start_playing(self, data, media_id):
logger = self.logger
response = ''
schedule_id = data
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["update_start_playing_url"])
url = url.replace("%%media_id%%", str(media_id))
url = url.replace("%%schedule_id%%", str(schedule_id))
url = url.replace("%%api_key%%", self.config["api_key"])
response = self.get_response_from_server(url)
response = json.loads(response)
logger.info("API-Status %s", response['status'])
logger.info("API-Message %s", response['message'])
except Exception, e:
logger.error("Exception: %s", e)
return response
def get_liquidsoap_data(self, pkey, schedule):
logger = self.logger
playlist = schedule[pkey]
data = dict()
data["schedule_id"] = playlist['id']
except Exception, e:
data["schedule_id"] = 0
return data
def get_shows_to_record(self):
logger = self.logger
response = None
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["show_schedule_url"])
url = url.replace("%%api_key%%", self.config["api_key"])
response = self.get_response_from_server(url)
response = json.loads(response)
logger.info("shows %s", response)
except Exception, e:
logger.error("Exception: %s", e)
response = None
return response
def upload_recorded_show(self, data, headers):
logger = self.logger
response = ''
retries = int(self.config["upload_retries"])
retries_wait = int(self.config["upload_wait"])
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["upload_file_url"])
url = url.replace("%%api_key%%", self.config["api_key"])
for i in range(0, retries):
logger.debug("Upload attempt: %s", i+1)
request = urllib2.Request(url, data, headers)
response = urllib2.urlopen(request).read().strip()
logger.info("uploaded show result %s", response)
except urllib2.HTTPError, e:
logger.error("Http error code: %s", e.code)
except urllib2.URLError, e:
logger.error("Server is down: %s", e.args)
except Exception, e:
logger.error("Exception: %s", e)
#wait some time before next retry
return response
def check_live_stream_auth(self, username, password, dj_type):
#logger = logging.getLogger()
response = ''
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["check_live_stream_auth"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%username%%", username)
url = url.replace("%%djtype%%", dj_type)
url = url.replace("%%password%%", password)
response = self.get_response_from_server(url)
response = json.loads(response)
except Exception, e:
import traceback
top = traceback.format_exc()
print "Exception: %s", e
print "traceback: %s", top
response = None
return response
def setup_media_monitor(self):
logger = self.logger
response = None
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["media_setup_url"])
url = url.replace("%%api_key%%", self.config["api_key"])
response = self.get_response_from_server(url)
response = json.loads(response)
logger.info("Connected to Airtime Server. Json Media Storage Dir: %s", response)
except Exception, e:
response = None
logger.error("Exception: %s", e)
return response
def update_media_metadata(self, md, mode, is_record=False):
logger = self.logger
response = None
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["update_media_url"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%mode%%", mode)
md = convert_dict_value_to_utf8(md)
data = urllib.urlencode(md)
req = urllib2.Request(url, data)
response = self.get_response_from_server(req)
logger.info("update media %s, filepath: %s, mode: %s", response, md['MDATA_KEY_FILEPATH'], mode)
response = json.loads(response)
if("error" not in response and is_record):
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["upload_recorded"])
url = url.replace("%%fileid%%", str(response[u'id']))
url = url.replace("%%showinstanceid%%", str(md['MDATA_KEY_TRACKNUMBER']))
url = url.replace("%%api_key%%", self.config["api_key"])
response = self.get_response_from_server(url)
response = json.loads(response)
logger.info("associate recorded %s", response)
except Exception, e:
response = None
import traceback
top = traceback.format_exc()
logger.error('Exception: %s', e)
logger.error("traceback: %s", top)
return response
#returns a list of all db files for a given directory in JSON format:
#{"files":["path/to/file1", "path/to/file2"]}
#Note that these are relative paths to the given directory. The full
#path is not returned.
def list_all_db_files(self, dir_id):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["list_all_db_files"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%dir_id%%", dir_id)
response = self.get_response_from_server(url)
response = json.loads(response)
except Exception, e:
response = None
logger.error("Exception: %s", e)
return response
def list_all_watched_dirs(self):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["list_all_watched_dirs"])
url = url.replace("%%api_key%%", self.config["api_key"])
response = self.get_response_from_server(url)
response = json.loads(response)
except Exception, e:
response = None
logger.error("Exception: %s", e)
return response
def add_watched_dir(self, path):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["add_watched_dir"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%path%%", base64.b64encode(path))
response = self.get_response_from_server(url)
response = json.loads(response)
except Exception, e:
response = None
logger.error("Exception: %s", e)
return response
def remove_watched_dir(self, path):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["remove_watched_dir"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%path%%", base64.b64encode(path))
response = self.get_response_from_server(url)
response = json.loads(response)
except Exception, e:
response = None
logger.error("Exception: %s", e)
return response
def set_storage_dir(self, path):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["set_storage_dir"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%path%%", base64.b64encode(path))
response = self.get_response_from_server(url)
response = json.loads(response)
except Exception, e:
response = None
logger.error("Exception: %s", e)
return response
def get_stream_setting(self):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["get_stream_setting"])
url = url.replace("%%api_key%%", self.config["api_key"])
response = self.get_response_from_server(url)
response = json.loads(response)
except Exception, e:
response = None
logger.error("Exception: %s", e)
return response
Purpose of this method is to contact the server with a "Hey its me!" message.
This will allow the server to register the component's (component = media-monitor, pypo etc.)
ip address, and later use it to query monit via monit's http service, or download log files
via a http server.
def register_component(self, component):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["register_component"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%component%%", component)
except Exception, e:
logger.error("Exception: %s", e)
def notify_liquidsoap_status(self, msg, stream_id, time):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["update_liquidsoap_status"])
url = url.replace("%%api_key%%", self.config["api_key"])
msg = msg.replace('/', ' ')
encoded_msg = urllib.quote(msg, '')
url = url.replace("%%msg%%", encoded_msg)
url = url.replace("%%stream_id%%", stream_id)
url = url.replace("%%boot_time%%", time)
response = self.get_response_from_server(url)
except Exception, e:
logger.error("Exception: %s", e)
def notify_source_status(self, sourcename, status):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["update_source_status"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%sourcename%%", sourcename)
url = url.replace("%%status%%", status)
response = self.get_response_from_server(url)
except Exception, e:
logger.error("Exception: %s", e)
This function updates status of mounted file system information on airtime
def update_file_system_mount(self, added_dir, removed_dir):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["update_fs_mount"])
url = url.replace("%%api_key%%", self.config["api_key"])
added_data_string = string.join(added_dir, ',')
removed_data_string = string.join(removed_dir, ',')
map = [("added_dir", added_data_string),("removed_dir",removed_data_string)]
data = urllib.urlencode(map)
req = urllib2.Request(url, data)
response = self.get_response_from_server(req)
logger.info("update file system mount: %s", json.loads(response))
except Exception, e:
import traceback
top = traceback.format_exc()
logger.error('Exception: %s', e)
logger.error("traceback: %s", top)
When watched dir is missing(unplugged or something) on boot up, this function will get called
and will call appropriate function on Airtime.
def handle_watched_dir_missing(self, dir):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["handle_watched_dir_missing"])
url = url.replace("%%api_key%%", self.config["api_key"])
url = url.replace("%%dir%%", base64.b64encode(dir))
response = self.get_response_from_server(url)
logger.info("update file system mount: %s", json.loads(response))
except Exception, e:
import traceback
top = traceback.format_exc()
logger.error('Exception: %s', e)
logger.error("traceback: %s", top)
Retrive infomations needed on bootstrap time
def get_bootstrap_info(self):
logger = self.logger
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["get_bootstrap_info"])
url = url.replace("%%api_key%%", self.config["api_key"])
response = self.get_response_from_server(url)
response = json.loads(response)
logger.info("Bootstrap info retrieved %s", response)
except Exception, e:
response = None
import traceback
top = traceback.format_exc()
logger.error('Exception: %s', e)
logger.error("traceback: %s", top)
return response