CC-3336: Refactor schedule API used by pypo

Martin Konecny 2012-02-27 13:52:35 -05:00
parent a53d856e8d
commit 11f31effca
4 changed files with 222 additions and 299 deletions
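
For orientation, a hedged sketch (not part of the commit itself) of the schedule payload shape that the refactored API appears to hand to pypo, pieced together from the PHP and Python changes below. Key names come from the diff; the concrete values and the URL are invented for illustration, and fields such as row_id are omitted.

    # Illustration only: shape of the JSON schedule pypo now receives.
    # "media" is keyed by start time in the %Y-%m-%d-%H-%M-%S format that pypo compares as strings.
    schedule_data = {
        "status": {},
        "media": {
            "2012-02-27-14-00-00": {
                "id": "a1b2c3",            # gunid of the stored file
                "uri": "http://airtime.example/api/get-media/file/a1b2c3.mp3",  # invented URL
                "fade_in": 500,            # milliseconds (WallTimeToMillisecs)
                "fade_out": 500,
                "cue_in": 0.0,             # seconds (CalculateLengthInSeconds)
                "cue_out": 180.5,
                "start": "2012-02-27-14-00-00",
                "end": "2012-02-27-14-03-00",
            },
        },
    }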

View File

@@ -425,46 +425,22 @@ class Application_Model_Schedule {
 $rows = array();
 $sql = "SELECT st.file_id AS file_id,"
-." st.starts AS starts,"
-." st.ends AS ends,"
+." st.starts AS start,"
+." st.ends AS end,"
 ." si.starts as show_start,"
 ." si.ends as show_end"
 ." FROM $CC_CONFIG[scheduleTable] as st"
 ." LEFT JOIN $CC_CONFIG[showInstances] as si"
 ." ON st.instance_id = si.id"
-." ORDER BY starts";
-/*
-$sql = "SELECT pt.creator as creator"
-." st.file_id AS file_id"
-." st.starts AS starts"
-." st.ends AS ends"
-." st.name as show_name"
-." si.starts as show_start"
-." si.ends as show_end"
-." FROM $CC_CONFIG[scheduleTable] as st"
-." LEFT JOIN $CC_CONFIG[showInstances] as si"
-." ON st.instance_id = si.id"
-." LEFT JOIN $CC_CONFIG[showTable] as sh"
-." ON si.show_id = sh.id"
-//The next line ensures we only get songs that haven't ended yet
-." WHERE (st.ends >= TIMESTAMP '$p_currentDateTime')"
-." AND (st.ends <= TIMESTAMP '$p_toDateTime')"
-//next line makes sure that we aren't returning items that
-//are past the show's scheduled timeslot.
-." AND (st.starts < si.ends)"
-." ORDER BY starts";
-* */
+." ORDER BY start";
+Logging::log($sql);
 $rows = $CC_DBC->GetAll($sql);
-if (!PEAR::isError($rows)) {
-foreach ($rows as &$row) {
-$row["start"] = $row["starts"];
-$row["end"] = $row["ends"];
-}
+if (PEAR::isError($rows)) {
+return null;
 }
 return $rows;
 }
@@ -508,13 +484,16 @@ class Application_Model_Schedule {
 $data = array();
 $utcTimeZone = new DateTimeZone("UTC");
+$data["status"] = array();
+$data["media"] = array();
 foreach ($items as $item){
 $storedFile = Application_Model_StoredFile::Recall($item["file_id"]);
 $uri = $storedFile->getFileUrlUsingConfigAddress();
 $showEndDateTime = new DateTime($item["show_end"], $utcTimeZone);
-$trackEndDateTime = new DateTime($item["ends"], $utcTimeZone);
+$trackEndDateTime = new DateTime($item["end"], $utcTimeZone);
 /* Note: cue_out and end are always the same. */
 /* TODO: Not all tracks will have "show_end" */
@@ -525,16 +504,16 @@ class Application_Model_Schedule {
 $item["cue_out"] = $item["cue_out"] - $diff;
 }
-$starts = Application_Model_Schedule::AirtimeTimeToPypoTime($item["starts"]);
-$data[$starts] = array(
+$start = Application_Model_Schedule::AirtimeTimeToPypoTime($item["start"]);
+$data["media"][$start] = array(
 'id' => $storedFile->getGunid(),
 'uri' => $uri,
 'fade_in' => Application_Model_Schedule::WallTimeToMillisecs($item["fade_in"]),
 'fade_out' => Application_Model_Schedule::WallTimeToMillisecs($item["fade_out"]),
 'cue_in' => Application_Model_DateHelper::CalculateLengthInSeconds($item["cue_in"]),
 'cue_out' => Application_Model_DateHelper::CalculateLengthInSeconds($item["cue_out"]),
-'start' => $starts,
-'end' => Application_Model_Schedule::AirtimeTimeToPypoTime($item["ends"])
+'start' => $start,
+'end' => Application_Model_Schedule::AirtimeTimeToPypoTime($item["end"])
 );
 }

View File

@@ -261,15 +261,15 @@ class AirTimeApiClient(ApiClientInterface):
 export_url = export_url.replace('%%api_key%%', self.config["api_key"])
 response = ""
-status = 0
 try:
 response_json = self.get_response_from_server(export_url)
 response = json.loads(response_json)
-status = response['check']
+success = True
 except Exception, e:
 logger.error(e)
+success = False
-return status, response
+return success, response
 def get_media(self, uri, dst):

View File

@@ -43,6 +43,8 @@ class PypoFetch(Thread):
 self.fetch_queue = pypoFetch_q
 self.push_queue = pypoPush_q
+self.logger = logging.getLogger();
 self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
 logger.debug("Cache dir %s", self.cache_dir)
 try:
@@ -52,12 +54,8 @@ class PypoFetch(Thread):
 except Exception, e:
 logger.error(e)
 self.schedule_data = []
-logger = logging.getLogger('fetch')
-logger.info("PypoFetch: init complete")
+self.logger.info("PypoFetch: init complete")
 """
 Handle a message from RabbitMQ, put it into our yucky global var.
@@ -65,53 +63,51 @@ class PypoFetch(Thread):
 """
 def handle_message(self, message):
 try:
-logger = logging.getLogger('fetch')
-logger.info("Received event from Pypo Message Handler: %s" % message)
+self.logger.info("Received event from Pypo Message Handler: %s" % message)
 m = json.loads(message)
 command = m['event_type']
-logger.info("Handling command: " + command)
+self.logger.info("Handling command: " + command)
 if command == 'update_schedule':
 self.schedule_data = m['schedule']
-self.process_schedule(self.schedule_data, "scheduler", False)
+self.process_schedule(self.schedule_data, False)
 elif command == 'update_stream_setting':
-logger.info("Updating stream setting...")
+self.logger.info("Updating stream setting...")
 self.regenerateLiquidsoapConf(m['setting'])
 elif command == 'update_stream_format':
-logger.info("Updating stream format...")
+self.logger.info("Updating stream format...")
 self.update_liquidsoap_stream_format(m['stream_format'])
 elif command == 'update_station_name':
-logger.info("Updating station name...")
+self.logger.info("Updating station name...")
 self.update_liquidsoap_station_name(m['station_name'])
 elif command == 'cancel_current_show':
-logger.info("Cancel current show command received...")
+self.logger.info("Cancel current show command received...")
 self.stop_current_show()
 except Exception, e:
 import traceback
 top = traceback.format_exc()
-logger.error('Exception: %s', e)
-logger.error("traceback: %s", top)
-logger.error("Exception in handling Message Handler message: %s", e)
+self.logger.error('Exception: %s', e)
+self.logger.error("traceback: %s", top)
+self.logger.error("Exception in handling Message Handler message: %s", e)
 def stop_current_show(self):
-logger = logging.getLogger('fetch')
-logger.debug('Notifying Liquidsoap to stop playback.')
+self.logger.debug('Notifying Liquidsoap to stop playback.')
 try:
 tn = telnetlib.Telnet(LS_HOST, LS_PORT)
 tn.write('source.skip\n')
 tn.write('exit\n')
 tn.read_all()
 except Exception, e:
-logger.debug(e)
-logger.debug('Could not connect to liquidsoap')
+self.logger.debug(e)
+self.logger.debug('Could not connect to liquidsoap')
 def regenerateLiquidsoapConf(self, setting):
-logger = logging.getLogger('fetch')
 existing = {}
 # create a temp file
 fh = open('/etc/airtime/liquidsoap.cfg', 'r')
-logger.info("Reading existing config...")
+self.logger.info("Reading existing config...")
 # read existing conf file and build dict
 while 1:
 line = fh.readline()
@@ -141,7 +137,7 @@ class PypoFetch(Thread):
 #restart flag
 restart = False
-logger.info("Looking for changes...")
+self.logger.info("Looking for changes...")
 # look for changes
 for s in setting:
 if "output_sound_device" in s[u'keyname'] or "icecast_vorbis_metadata" in s[u'keyname']:
@@ -149,13 +145,13 @@ class PypoFetch(Thread):
 state_change_restart[stream] = False
 # This is the case where restart is required no matter what
 if (existing[s[u'keyname']] != s[u'value']):
-logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
+self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
 restart = True;
 else:
 stream, dump = s[u'keyname'].split('_',1)
 if "_output" in s[u'keyname']:
 if (existing[s[u'keyname']] != s[u'value']):
-logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
+self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
 restart = True;
 state_change_restart[stream] = True
 elif ( s[u'value'] != 'disabled'):
@@ -167,22 +163,22 @@ class PypoFetch(Thread):
 if stream not in change:
 change[stream] = False
 if not (s[u'value'] == existing[s[u'keyname']]):
-logger.info("Keyname: %s, Curent value: %s, New Value: %s", s[u'keyname'], existing[s[u'keyname']], s[u'value'])
+self.logger.info("Keyname: %s, Curent value: %s, New Value: %s", s[u'keyname'], existing[s[u'keyname']], s[u'value'])
 change[stream] = True
 # set flag change for sound_device alway True
-logger.info("Change:%s, State_Change:%s...", change, state_change_restart)
+self.logger.info("Change:%s, State_Change:%s...", change, state_change_restart)
 for k, v in state_change_restart.items():
 if k == "sound_device" and v:
 restart = True
 elif v and change[k]:
-logger.info("'Need-to-restart' state detected for %s...", k)
+self.logger.info("'Need-to-restart' state detected for %s...", k)
 restart = True
 # rewrite
 if restart:
 fh = open('/etc/airtime/liquidsoap.cfg', 'w')
-logger.info("Rewriting liquidsoap.cfg...")
+self.logger.info("Rewriting liquidsoap.cfg...")
 fh.write("################################################\n")
 fh.write("# THIS FILE IS AUTO GENERATED. DO NOT CHANGE!! #\n")
 fh.write("################################################\n")
@@ -204,17 +200,16 @@ class PypoFetch(Thread):
 fh.close()
 # restarting pypo.
 # we could just restart liquidsoap but it take more time somehow.
-logger.info("Restarting pypo...")
+self.logger.info("Restarting pypo...")
 sys.exit(0)
 else:
-logger.info("No change detected in setting...")
+self.logger.info("No change detected in setting...")
 self.update_liquidsoap_connection_status()
 """
 updates the status of liquidsoap connection to the streaming server
 This fucntion updates the bootup time variable in liquidsoap script
 """
 def update_liquidsoap_connection_status(self):
-logger = logging.getLogger('fetch')
 tn = telnetlib.Telnet(LS_HOST, LS_PORT)
 # update the boot up time of liquidsoap. Since liquidsoap is not restarting,
 # we are manually adjusting the bootup time variable so the status msg will get
@@ -232,7 +227,7 @@ class PypoFetch(Thread):
 # streamin info is in the form of:
 # eg. s1:true,2:true,3:false
 streams = stream_info.split(",")
-logger.info(streams)
+self.logger.info(streams)
 fake_time = current_time + 1
 for s in streams:
@@ -246,33 +241,31 @@ class PypoFetch(Thread):
 # Push stream metadata to liquidsoap
 # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
 try:
-logger = logging.getLogger('fetch')
-logger.info(LS_HOST)
-logger.info(LS_PORT)
+self.logger.info(LS_HOST)
+self.logger.info(LS_PORT)
 tn = telnetlib.Telnet(LS_HOST, LS_PORT)
 command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
-logger.info(command)
+self.logger.info(command)
 tn.write(command)
 tn.write('exit\n')
 tn.read_all()
 except Exception, e:
-logger.error("Exception %s", e)
+self.logger.error("Exception %s", e)
 def update_liquidsoap_station_name(self, station_name):
 # Push stream metadata to liquidsoap
 # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
 try:
-logger = logging.getLogger('fetch')
-logger.info(LS_HOST)
-logger.info(LS_PORT)
+self.logger.info(LS_HOST)
+self.logger.info(LS_PORT)
 tn = telnetlib.Telnet(LS_HOST, LS_PORT)
 command = ('vars.station_name %s\n' % station_name).encode('utf-8')
-logger.info(command)
+self.logger.info(command)
 tn.write(command)
 tn.write('exit\n')
 tn.read_all()
 except Exception, e:
-logger.error("Exception %s", e)
+self.logger.error("Exception %s", e)
 """
 Process the schedule
@@ -283,165 +276,162 @@ class PypoFetch(Thread):
 - runs the cleanup routine, to get rid of unused cached files
 """
 def process_schedule(self, schedule_data, bootstrapping):
-logger = logging.getLogger('fetch')
-playlists = schedule_data["playlists"]
+media = schedule_data["media"]
 # Download all the media and put playlists in liquidsoap "annotate" format
 try:
-liquidsoap_playlists = self.prepare_playlists(playlists, bootstrapping)
-except Exception, e: logger.error("%s", e)
+media = self.prepare_media(media, bootstrapping)
+except Exception, e: self.logger.error("%s", e)
 # Send the data to pypo-push
 scheduled_data = dict()
-scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists
-scheduled_data['schedule'] = playlists
-self.push_queue.put(scheduled_data)
+scheduled_data['liquidsoap_annotation_queue'] = liquidsoap_annotation_queue
+self.push_queue.put(media)
+"""
 # cleanup
 try: self.cleanup()
-except Exception, e: logger.error("%s", e)
+except Exception, e: self.logger.error("%s", e)
+"""
-"""
-In this function every audio file is cut as necessary (cue_in/cue_out != 0)
-and stored in a playlist folder.
-file is e.g. 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3
-"""
-def prepare_playlists(self, playlists, bootstrapping):
-logger = logging.getLogger('fetch')
-liquidsoap_playlists = dict()
-# Dont do anything if playlists is empty
-if not playlists:
-logger.debug("Schedule is empty.")
-return liquidsoap_playlists
-scheduleKeys = sorted(playlists.iterkeys())
-try:
-for pkey in scheduleKeys:
-logger.info("Playlist starting at %s", pkey)
-playlist = playlists[pkey]
-# create playlist directory
-try:
-os.mkdir(self.cache_dir + str(pkey))
-except Exception, e:
-logger.warning(e)
-ls_playlist = self.handle_media_file(playlist, pkey, bootstrapping)
-liquidsoap_playlists[pkey] = ls_playlist
-except Exception, e:
-logger.error("%s", e)
-return liquidsoap_playlists
-"""
-Download and cache the media files.
-This handles both remote and local files.
-Returns an updated ls_playlist string.
-"""
-def handle_media_file(self, playlist, pkey, bootstrapping):
-logger = logging.getLogger('fetch')
-ls_playlist = []
-dtnow = datetime.utcnow()
-str_tnow_s = dtnow.strftime('%Y-%m-%d-%H-%M-%S')
-sortedKeys = sorted(playlist['medias'].iterkeys())
-for key in sortedKeys:
-media = playlist['medias'][key]
-logger.debug("Processing track %s", media['uri'])
-if bootstrapping:
-start = media['start']
-end = media['end']
-if end <= str_tnow_s:
-continue
-elif start <= str_tnow_s and str_tnow_s < end:
-#song is currently playing and we just started pypo. Maybe there
-#was a power outage? Let's restart playback of this song.
-start_split = map(int, start.split('-'))
-media_start = datetime(start_split[0], start_split[1], start_split[2], start_split[3], start_split[4], start_split[5], 0, None)
-logger.debug("Found media item that started at %s.", media_start)
-delta = dtnow - media_start #we get a TimeDelta object from this operation
-logger.info("Starting media item at %d second point", delta.seconds)
-media['cue_in'] = delta.seconds + 10
-td = timedelta(seconds=10)
-playlist['start'] = (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S')
-logger.info("Crash detected, setting playlist to restart at %s", (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S'))
-fileExt = os.path.splitext(media['uri'])[1]
-try:
-dst = os.path.join(self.cache_dir, pkey, media['id']+fileExt)
-# download media file
-self.handle_remote_file(media, dst)
-if True == os.access(dst, os.R_OK):
-# check filesize (avoid zero-byte files)
-try: fsize = os.path.getsize(dst)
-except Exception, e:
-logger.error("%s", e)
-fsize = 0
-if fsize > 0:
-pl_entry = \
-'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
-% (media['id'], 0, \
-float(media['fade_in']) / 1000, \
-float(media['fade_out']) / 1000, \
-float(media['cue_in']), \
-float(media['cue_out']), \
-media['row_id'], dst)
-"""
-Tracks are only added to the playlist if they are accessible
-on the file system and larger than 0 bytes.
-So this can lead to playlists shorter than expectet.
-(there is a hardware silence detector for this cases...)
-"""
-entry = dict()
-entry['type'] = 'file'
-entry['annotate'] = pl_entry
-entry['show_name'] = playlist['show_name']
-ls_playlist.append(entry)
-else:
-logger.warning("zero-size file - skipping %s. will not add it to playlist at %s", media['uri'], dst)
-else:
-logger.warning("something went wrong. file %s not available. will not add it to playlist", dst)
-except Exception, e: logger.info("%s", e)
-return ls_playlist
+def prepare_media(self, media, bootstrapping):
+"""
+Iterate through the list of media items in "media" and
+download them.
+"""
+try:
+mediaKeys = sorted(media.iterkeys())
+for mkey in mediaKeys:
+self.logger.debug("Media item starting at %s", mkey)
+media_item = media[mkey]
+if bootstrapping:
+check_for_crash(media_item)
+# create playlist directory
+try:
+"""
+Extract year, month, date from mkey
+"""
+y_m_d = mkey[0:10]
+download_dir = os.mkdir(os.path.join(self.cache_dir, y_m_d))
+fileExt = os.path.splitext(media_item['uri'])[1]
+dst = os.path.join(download_dir, media_item['id']+fileExt)
+except Exception, e:
+self.logger.warning(e)
+if self.handle_media_file(media_item, dst):
+entry = create_liquidsoap_annotation(media_item, dst)
+#entry['show_name'] = playlist['show_name']
+entry['show_name'] = "TODO"
+media_item["annotation"] = entry
+except Exception, e:
+self.logger.error("%s", e)
+return media
+def create_liquidsoap_annotation(media, dst):
+pl_entry = \
+'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
+% (media['id'], 0, \
+float(media['fade_in']) / 1000, \
+float(media['fade_out']) / 1000, \
+float(media['cue_in']), \
+float(media['cue_out']), \
+media['row_id'], dst)
+"""
+Tracks are only added to the playlist if they are accessible
+on the file system and larger than 0 bytes.
+So this can lead to playlists shorter than expectet.
+(there is a hardware silence detector for this cases...)
+"""
+entry = dict()
+entry['type'] = 'file'
+entry['annotate'] = pl_entry
+return entry
+def check_for_crash(media_item):
+start = media_item['start']
+end = media_item['end']
+dtnow = datetime.utcnow()
+str_tnow_s = dtnow.strftime('%Y-%m-%d-%H-%M-%S')
+if start <= str_tnow_s and str_tnow_s < end:
+#song is currently playing and we just started pypo. Maybe there
+#was a power outage? Let's restart playback of this song.
+start_split = map(int, start.split('-'))
+media_start = datetime(start_split[0], start_split[1], start_split[2], start_split[3], start_split[4], start_split[5], 0, None)
+self.logger.debug("Found media item that started at %s.", media_start)
+delta = dtnow - media_start #we get a TimeDelta object from this operation
+self.logger.info("Starting media item at %d second point", delta.seconds)
+"""
+Set the cue_in. This is used by Liquidsoap to determine at what point in the media
+item it should start playing. If the cue_in happens to be > cue_out, then make cue_in = cue_out
+"""
+media_item['cue_in'] = delta.seconds + 10 if delta.seconds + 10 < media_item['cue_out'] else media_item['cue_out']
+"""
+Set the start time, which is used by pypo-push to determine when a media item is scheduled.
+Pushing the start time into the future will ensure pypo-push will push this to Liquidsoap.
+"""
+td = timedelta(seconds=10)
+media_item['start'] = (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S')
+self.logger.info("Crash detected, setting playlist to restart at %s", (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S'))
+def handle_media_file(self, media_item, dst):
+"""
+Download and cache the media item.
+"""
+self.logger.debug("Processing track %s", media_item['uri'])
+try:
+#blocking function to download the media item
+self.download_file(media_item, dst)
+if os.access(dst, os.R_OK):
+# check filesize (avoid zero-byte files)
+try:
+fsize = os.path.getsize(dst)
+if fsize > 0:
+return True
+except Exception, e:
+self.logger.error("%s", e)
+fsize = 0
+else:
+self.logger.warning("Cannot read file %s.", dst)
+except Exception, e:
+self.logger.info("%s", e)
+return False
 """
 Download a file from a remote server and store it in the cache.
 """
-def handle_remote_file(self, media, dst):
-logger = logging.getLogger('fetch')
+def download_file(self, media_item, dst):
 if os.path.isfile(dst):
 pass
-#logger.debug("file already in cache: %s", dst)
+#self.logger.debug("file already in cache: %s", dst)
 else:
-logger.debug("try to download %s", media['uri'])
-self.api_client.get_media(media['uri'], dst)
+self.logger.debug("try to download %s", media_item['uri'])
+self.api_client.get_media(media_item['uri'], dst)
 """
 Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR"
 and deletes them.
 """
 def cleanup(self):
-logger = logging.getLogger('fetch')
 offset = 3600 * int(config["cache_for"])
 now = time.time()
@@ -451,78 +441,44 @@ class PypoFetch(Thread):
 timestamp = calendar.timegm(time.strptime(dir, "%Y-%m-%d-%H-%M-%S"))
 if (now - timestamp) > offset:
 try:
-logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp)
+self.logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp)
 shutil.rmtree(os.path.join(r, dir))
 except Exception, e:
-logger.error("%s", e)
+self.logger.error("%s", e)
 pass
 else:
-logger.info('sucessfully removed %s', os.path.join(r, dir))
+self.logger.info('sucessfully removed %s', os.path.join(r, dir))
 except Exception, e:
-logger.error(e)
+self.logger.error(e)
 def main(self):
-logger = logging.getLogger('fetch')
 try: os.mkdir(self.cache_dir)
 except Exception, e: pass
 # Bootstrap: since we are just starting up, we need to grab the
 # most recent schedule. After that we can just wait for updates.
-status, self.schedule_data = self.api_client.get_schedule()
-if status == 1:
-logger.info("Bootstrap schedule received: %s", self.schedule_data)
-self.process_schedule(self.schedule_data, "scheduler", True)
+success, self.schedule_data = self.api_client.get_schedule()
+if success:
+self.logger.info("Bootstrap schedule received: %s", self.schedule_data)
+self.process_schedule(self.schedule_data, True)
 loops = 1
 while True:
-logger.info("Loop #%s", loops)
+self.logger.info("Loop #%s", loops)
 try:
-try:
-"""
-our simple_queue.get() requires a timeout, in which case we
-fetch the Airtime schedule manually. It is important to fetch
-the schedule periodically because if we didn't, we would only
-get schedule updates via RabbitMq if the user was constantly
-using the Airtime interface.
-If the user is not using the interface, RabbitMq messages are not
-sent, and we will have very stale (or non-existent!) data about the
-schedule.
-Currently we are checking every 3600 seconds (1 hour)
-"""
-message = self.fetch_queue.get(block=True, timeout=3600)
-self.handle_message(message)
-except Empty, e:
-"""
-Queue timeout. Fetching data manually
-"""
-raise
-except Exception, e:
-"""
-sleep 5 seconds so that we don't spin inside this
-while loop and eat all the CPU
-"""
-time.sleep(5)
-"""
-There is a problem with the RabbitMq messenger service. Let's
-log the error and get the schedule via HTTP polling
-"""
-logger.error("Exception, %s", e)
-raise
+message = self.fetch_queue.get(block=True, timeout=3600)
+self.handle_message(message)
 except Exception, e:
 """
 There is a problem with the RabbitMq messenger service. Let's
 log the error and get the schedule via HTTP polling
 """
-logger.error("Exception, %s", e)
+self.logger.error("Exception, %s", e)
 status, self.schedule_data = self.api_client.get_schedule()
 if status == 1:
-self.process_schedule(self.schedule_data, "scheduler", False)
+self.process_schedule(self.schedule_data, False)
 loops += 1
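
For reference, a hedged example (all concrete values invented) of the Liquidsoap annotate entry that the new create_liquidsoap_annotation builds from one such media item; the cache path assumes a cache_dir of /var/tmp/airtime/pypo/cache with the scheduler/YYYY-MM-DD layout used above, and fade times are the millisecond values divided by 1000:

    annotate:media_id="a1b2c3",liq_start_next="0",liq_fade_in="0.5",liq_fade_out="0.5",liq_cue_in="0.0",liq_cue_out="180.5",schedule_table_id="42":/var/tmp/airtime/pypo/cache/scheduler/2012-02-27/a1b2c3.mp3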

View File

@@ -36,11 +36,10 @@ class PypoPush(Thread):
 self.api_client = api_client.api_client_factory(config)
 self.queue = q
-self.schedule = dict()
-self.playlists = dict()
+self.media = dict()
 self.liquidsoap_state_play = True
-self.push_ahead = 10
+self.push_ahead = 30
 """
 The Push Loop - the push loop periodically checks if there is a playlist
@@ -56,35 +55,30 @@ class PypoPush(Thread):
 if not self.queue.empty():
 # make sure we get the latest schedule
 while not self.queue.empty():
-scheduled_data = self.queue.get()
+self.media = self.queue.get()
 logger.debug("Received data from pypo-fetch")
-self.schedule = scheduled_data['schedule']
-self.playlists = scheduled_data['liquidsoap_playlists']
-logger.debug('schedule %s' % json.dumps(self.schedule))
-logger.debug('playlists %s' % json.dumps(self.playlists))
+logger.debug('media %s' % json.dumps(self.media))
-schedule = self.schedule
-playlists = self.playlists
+media = self.media
 currently_on_air = False
-if schedule:
+if media:
 tnow = time.gmtime(timenow)
 tcoming = time.gmtime(timenow + self.push_ahead)
 str_tnow_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tnow[0], tnow[1], tnow[2], tnow[3], tnow[4], tnow[5])
 str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5])
-for pkey in schedule:
-plstart = schedule[pkey]['start'][0:19]
-if str_tnow_s <= plstart and plstart < str_tcoming_s:
-logger.debug('Preparing to push playlist scheduled at: %s', pkey)
-playlist = schedule[pkey]
-# We have a match, replace the current playlist and
-# force liquidsoap to refresh.
-if (self.push_liquidsoap(pkey, schedule, playlists) == 1):
+for media_item in media:
+item_start = media_item['start'][0:19]
+if str_tnow_s <= item_start and item_start < str_tcoming_s:
+"""
+If the media item starts in the next 30 seconds, push it to the queue.
+"""
+logger.debug('Preparing to push media item scheduled at: %s', pkey)
+if self.push_to_liquidsoap(media_item):
 logger.debug("Pushed to liquidsoap, updating 'played' status.")
 currently_on_air = True
@@ -93,33 +87,31 @@ class PypoPush(Thread):
 # Call API to update schedule states
 logger.debug("Doing callback to server to update 'played' status.")
 self.api_client.notify_scheduled_item_start_playing(pkey, schedule)
-show_start = schedule[pkey]['show_start']
-show_end = schedule[pkey]['show_end']
-if show_start <= str_tnow_s and str_tnow_s < show_end:
-currently_on_air = True
-"""
-If currently_on_air = False but liquidsoap_state_play = True then it means that Liquidsoap may
-still be playing audio even though the show has ended ('currently_on_air = False' means no show is scheduled)
-See CC-3231.
-This is a temporary solution for Airtime 2.0
-"""
-if not currently_on_air and self.liquidsoap_state_play:
-logger.debug('Notifying Liquidsoap to stop playback.')
-try:
-tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-tn.write('source.skip\n')
-tn.write('exit\n')
-tn.read_all()
-except Exception, e:
-logger.debug(e)
-logger.debug('Could not connect to liquidsoap')
-self.liquidsoap_state_play = False
-def push_liquidsoap(self, pkey, schedule, playlists):
+def push_to_liquidsoap(self, media_item):
+if media_item["starts"] == self.last_end_time:
+"""
+this media item is attached to the end of the last
+track, so let's push it now so that Liquidsoap can start playing
+it immediately after (and prepare crossfades if need be).
+"""
+tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+tn.write(str('queue.push %s\n' % media_item["annotation"].encode('utf-8')))
+#TODO: vars.pypo_data
+#TODO: vars.show_name
+tn.write("exit\n")
+self.last_end_time = media_item["end"]
+else:
+"""
+this media item does not start right after a current playing track.
+We need to sleep, and then wake up when this track starts.
+"""
+return False
+def push_liquidsoap_old(self, pkey, schedule, playlists):
 logger = logging.getLogger('push')
 try:
@@ -127,10 +119,6 @@ class PypoPush(Thread):
 plstart = schedule[pkey]['start'][0:19]
 #strptime returns struct_time in local time
-#mktime takes a time_struct and returns a floating point
-#gmtime Convert a time expressed in seconds since the epoch to a struct_time in UTC
-#mktime: expresses the time in local time, not UTC. It returns a floating point number, for compatibility with time().
 epoch_start = calendar.timegm(time.strptime(plstart, '%Y-%m-%d-%H-%M-%S'))
 #Return the time as a floating point number expressed in seconds since the epoch, in UTC.
@@ -186,7 +174,7 @@ class PypoPush(Thread):
 if loops % heartbeat_period == 0:
 logger.info("heartbeat")
 loops = 0
-try: self.push('scheduler')
+try: self.push()
 except Exception, e:
 logger.error('Pypo Push Exception: %s', e)
 time.sleep(PUSH_INTERVAL)
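
A standalone, hedged sketch (not from the commit) of the push-ahead check that the new push loop relies on: because the %Y-%m-%d-%H-%M-%S strings are zero-padded and ordered most-significant-first, plain string comparison matches chronological order, so an item is pushed only when its start key falls within the next push_ahead seconds. The names PUSH_AHEAD and starts_soon are illustrative, not identifiers from the codebase.

    import time

    PUSH_AHEAD = 30  # seconds, matching self.push_ahead = 30 in the new PypoPush

    def starts_soon(item_start, now=None):
        # item_start is a "%Y-%m-%d-%H-%M-%S" key as produced by AirtimeTimeToPypoTime
        now = time.time() if now is None else now
        str_tnow = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime(now))
        str_tcoming = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime(now + PUSH_AHEAD))
        # zero-padded, most-significant-first strings compare chronologically
        return str_tnow <= item_start < str_tcoming

    # e.g. starts_soon("2012-02-27-14-00-00") is True only during the 30 s before that instant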