-fading between sources in liquidsoap

-cleaned up pypo-cli
This commit is contained in:
martin 2011-03-02 16:43:46 -05:00
parent 3f2d908f1a
commit 64d15669ac
3 changed files with 195 additions and 469 deletions

View File

@ -7,7 +7,7 @@
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-1.9.xsd"> http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-1.9.xsd">
<!-- <!--
<changeSet id="1" author="bob"> <changeSet id="2011-03-01-16-30-00" author="bob">
<createTable tableName="department"> <createTable tableName="department">
<column name="id" type="int"> <column name="id" type="int">
<constraints primaryKey="true" nullable="false"/> <constraints primaryKey="true" nullable="false"/>

View File

@ -5,20 +5,6 @@
Python part of radio playout (pypo) Python part of radio playout (pypo)
The main functions are "fetch" (./pypo_cli.py -f) and "push" (./pypo_cli.py -p) The main functions are "fetch" (./pypo_cli.py -f) and "push" (./pypo_cli.py -p)
There are two layers: scheduler & daypart (fallback)
The daypart is a fallback-layer generated by the playlists daypart-settings
(eg a playlist creator can say that the list is good for Monday and Tues,
between 14:00 and 16:00). So if there is nothing in the schedule, pypo will
still play something (instead of silence..) This layer is optional.
It is there so that you dont have a fallback player which plays the same 100
tracks over and over again.
Attention & ToDos
- liquidsoap does not like mono files! So we have to make sure that only files with
2 channels are fed to LiquidSoap
(solved: current = audio_to_stereo(current) - maybe not with ultimate performance)
""" """
# python defaults (debian default) # python defaults (debian default)
@ -51,16 +37,11 @@ from util import *
from api_clients import * from api_clients import *
PYPO_VERSION = '0.2' PYPO_VERSION = '0.2'
PYPO_MEDIA_SKIP = 1
PYPO_MEDIA_LIVE_SESSION = 2
PYPO_MEDIA_STREAM = 3
PYPO_MEDIA_FILE_URL = 4
PYPO_MEDIA_FILE_LOCAL = 5
# Set up command-line options # Set up command-line options
parser = OptionParser() parser = OptionParser()
# help screeen / info # help screen / info
usage = "%prog [options]" + " - python playout system" usage = "%prog [options]" + " - python playout system"
parser = OptionParser(usage=usage) parser = OptionParser(usage=usage)
@ -71,11 +52,7 @@ parser.add_option("-t", "--test", help="Do a test to make sure everything is wor
parser.add_option("-f", "--fetch-scheduler", help="Fetch the schedule from server. This is a polling process that runs forever.", default=False, action="store_true", dest="fetch_scheduler") parser.add_option("-f", "--fetch-scheduler", help="Fetch the schedule from server. This is a polling process that runs forever.", default=False, action="store_true", dest="fetch_scheduler")
parser.add_option("-p", "--push-scheduler", help="Push the schedule to Liquidsoap. This is a polling process that runs forever.", default=False, action="store_true", dest="push_scheduler") parser.add_option("-p", "--push-scheduler", help="Push the schedule to Liquidsoap. This is a polling process that runs forever.", default=False, action="store_true", dest="push_scheduler")
parser.add_option("-F", "--fetch-daypart", help="Fetch from daypart - scheduler (loop, interval in config file)", default=False, action="store_true", dest="fetch_daypart")
parser.add_option("-P", "--push-daypart", help="Push daypart to Liquidsoap (loop, interval in config file)", default=False, action="store_true", dest="push_daypart")
parser.add_option("-b", "--cleanup", help="Cleanup", default=False, action="store_true", dest="cleanup") parser.add_option("-b", "--cleanup", help="Cleanup", default=False, action="store_true", dest="cleanup")
#parser.add_option("-j", "--jingles", help="Get new jingles from server, comma separated list if jingle-id's as argument", metavar="LIST")
parser.add_option("-c", "--check", help="Check the cached schedule and exit", default=False, action="store_true", dest="check") parser.add_option("-c", "--check", help="Check the cached schedule and exit", default=False, action="store_true", dest="check")
# parse options # parse options
@ -95,45 +72,45 @@ try:
except Exception, e: except Exception, e:
print 'Error loading config file: ', e print 'Error loading config file: ', e
sys.exit() sys.exit()
class Global: class Global:
def __init__(self): def __init__(self):
print print
def selfcheck(self): def selfcheck(self):
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
if (not self.api_client.is_server_compatible()): if (not self.api_client.is_server_compatible()):
sys.exit() sys.exit()
class Playout: class Playout:
def __init__(self): def __init__(self):
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
self.cue_file = CueFile() self.cue_file = CueFile()
self.silence_file = config["file_dir"] + 'basic/silence.mp3' self.set_export_source('scheduler')
""" """
push_ahead2 MUST be < push_ahead. The difference in these two values push_ahead2 MUST be < push_ahead. The difference in these two values
gives the number of seconds of the window of opportunity for the scheduler gives the number of seconds of the window of opportunity for the scheduler
to catch when a playlist is to be played. to catch when a playlist is to be played.
""" """
self.push_ahead = 15 self.push_ahead = 15
self.push_ahead2 = 10 self.push_ahead2 = 10
self.range_updated = False self.range_updated = False
def test_api(self): def test_api(self):
self.api_client.test() self.api_client.test()
def set_export_source(self, export_source): def set_export_source(self, export_source):
self.export_source = export_source self.export_source = export_source
self.cache_dir = config["cache_dir"] + self.export_source + '/' self.cache_dir = config["cache_dir"] + self.export_source + '/'
self.schedule_file = self.cache_dir + 'schedule.pickle' self.schedule_file = self.cache_dir + 'schedule.pickle'
self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle" self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle"
""" """
Fetching part of pypo Fetching part of pypo
- Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
@ -141,38 +118,22 @@ class Playout:
- playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied
to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
- runs the cleanup routine, to get rid of unused cashed files - runs the cleanup routine, to get rid of unused cashed files
""" """
def fetch(self, export_source): def fetch(self, export_source):
""" """
wrapper script for fetching the whole schedule (in json) wrapper script for fetching the whole schedule (in json)
""" """
logger = logging.getLogger() logger = logging.getLogger()
self.set_export_source(export_source)
try: os.mkdir(self.cache_dir) try: os.mkdir(self.cache_dir)
except Exception, e: pass except Exception, e: pass
"""
Trigger daypart range-generation. (Only if daypart-instance)
"""
if self.export_source == 'daypart':
print '******************************'
print '*** TRIGGER DAYPART UPDATE ***'
print '******************************'
try:
self.generate_range_dp()
except Exception, e:
logger.error("%s", e)
# get schedule # get schedule
try: try:
while self.get_schedule() != 1: while self.get_schedule() != 1:
logger.warning("failed to read from export url") logger.warning("failed to read from export url")
time.sleep(1) time.sleep(1)
except Exception, e: logger.error("%s", e) except Exception, e: logger.error("%s", e)
# prepare the playlists # prepare the playlists
@ -186,38 +147,11 @@ class Playout:
# cleanup # cleanup
try: self.cleanup(self.export_source) try: self.cleanup(self.export_source)
except Exception, e: logger.error("%s", e) except Exception, e: logger.error("%s", e)
#logger.info("fetch loop completed")
"""
This is actually a bit ugly (again feel free to improve!!)
The generate_range_dp function should be called once a day,
we do this at 18h. The hour before the state is set back to 'False'
"""
def generate_range_dp(self):
logger = logging.getLogger()
logger.debug("trying to trigger daypart update")
tnow = time.localtime(time.time())
if (tnow[3] == 16):
self.range_updated = False
if (tnow[3] == 17 and self.range_updated == False):
try:
print self.api_client.generate_range_dp()
logger.info("daypart updated")
self.range_updated = True
except Exception, e:
print e
def get_schedule(self): def get_schedule(self):
logger = logging.getLogger() logger = logging.getLogger()
status, response = self.api_client.get_schedule() status, response = self.api_client.get_schedule()
if status == 1: if status == 1:
logger.info("dump serialized schedule to %s", self.schedule_file) logger.info("dump serialized schedule to %s", self.schedule_file)
schedule = response['playlists'] schedule = response['playlists']
@ -225,23 +159,23 @@ class Playout:
schedule_file = open(self.schedule_file, "w") schedule_file = open(self.schedule_file, "w")
pickle.dump(schedule, schedule_file) pickle.dump(schedule, schedule_file)
schedule_file.close() schedule_file.close()
except Exception, e: except Exception, e:
logger.critical("Exception %s", e) logger.critical("Exception %s", e)
status = 0 status = 0
return status return status
""" """
Alternative version of playout preparation. Every playlist entry is Alternative version of playout preparation. Every playlist entry is
pre-cued if neccessary (cue_in/cue_out != 0) and stored in the pre-cued if neccessary (cue_in/cue_out != 0) and stored in the
playlist folder. playlist folder.
file is eg 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3 file is eg 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3
""" """
def prepare_playlists_cue(self): def prepare_playlists_cue(self):
logger = logging.getLogger() logger = logging.getLogger()
# Load schedule from disk # Load schedule from disk
schedule = self.load_schedule() schedule = self.load_schedule()
@ -251,20 +185,18 @@ class Playout:
return return
scheduleKeys = sorted(schedule.iterkeys()) scheduleKeys = sorted(schedule.iterkeys())
try: try:
for pkey in scheduleKeys: for pkey in scheduleKeys:
logger.info("found playlist at %s", pkey) logger.info("found playlist at %s", pkey)
playlist = schedule[pkey] playlist = schedule[pkey]
# create playlist directory # create playlist directory
try: try:
os.mkdir(self.cache_dir + str(pkey)) os.mkdir(self.cache_dir + str(pkey))
except Exception, e: except Exception, e:
pass pass
ls_playlist = '';
logger.debug('*****************************************') logger.debug('*****************************************')
logger.debug('pkey: ' + str(pkey)) logger.debug('pkey: ' + str(pkey))
logger.debug('cached at : ' + self.cache_dir + str(pkey)) logger.debug('cached at : ' + self.cache_dir + str(pkey))
@ -275,88 +207,33 @@ class Playout:
logger.debug('source id: ' + str(playlist['x_ident'])) logger.debug('source id: ' + str(playlist['x_ident']))
logger.debug('*****************************************') logger.debug('*****************************************')
# Creating an API call like the next two lines would make this more flexible
# mediaType = api_client.get_media_type(playlist)
# if (mediaType == PYPO_MEDIA_SKIP):
if int(playlist['played']) == 1: if int(playlist['played']) == 1:
logger.info("playlist %s already played / sent to liquidsoap, so will ignore it", pkey) logger.info("playlist %s already played / sent to liquidsoap, so will ignore it", pkey)
elif int(playlist['subtype']) == 5:
ls_playlist = self.handle_live_session(playlist, pkey, ls_playlist)
elif int(playlist['subtype']) == 6:
ls_playlist = self.handle_live_cast(playlist, pkey, ls_playlist)
elif int(playlist['subtype']) > 0 and int(playlist['subtype']) < 5: elif int(playlist['subtype']) > 0 and int(playlist['subtype']) < 5:
ls_playlist = self.handle_media_file(playlist, pkey, ls_playlist) ls_playlist = self.handle_media_file(playlist, pkey)
# write playlist file # write playlist file
plfile = open(self.cache_dir + str(pkey) + '/list.lsp', "w") plfile = open(self.cache_dir + str(pkey) + '/list.lsp', "w")
plfile.write(ls_playlist) plfile.write(json.dumps(ls_playlist))
plfile.close() plfile.close()
logger.info('ls playlist file written to %s', self.cache_dir + str(pkey) + '/list.lsp') logger.info('ls playlist file written to %s', self.cache_dir + str(pkey) + '/list.lsp')
except Exception, e: except Exception, e:
logger.info("%s", e) logger.info("%s", e)
def handle_live_session(self, playlist, pkey, ls_playlist): def handle_media_file(self, playlist, pkey):
"""
This is a live session, so silence is scheduled.
Maybe not the most elegant solution :)
It adds 20 times 30min silence to the playlist
Silence file has to be in <file_dir>/basic/silence.mp3
"""
logger = logging.getLogger()
logger.debug("found %s seconds of live/studio session at %s", pkey, playlist['duration'])
if os.path.isfile(self.silence_file):
logger.debug('file stored at: %s' + self.silence_file)
for i in range (0, 19):
ls_playlist += self.silence_file + "\n"
else:
print 'Could not find silence file!'
print 'File is expected to be at: ' + self.silence_file
logger.critical('File is expected to be at: %s', self.silence_file)
sys.exit()
return ls_playlist
def handle_live_cast(self, playlist, pkey, ls_playlist):
"""
This is a live-cast session
Create a silence list. (could eg also be a fallback list..)
"""
logger = logging.getLogger()
logger.debug("found %s seconds of live-cast session at %s", pkey, playlist['duration'])
if os.path.isfile(self.silence_file):
logger.debug('file stored at: %s' + self.silence_file)
for i in range (0, 19):
ls_playlist += self.silence_file + "\n"
else:
print 'Could not find silence file!'
print 'File is expected to be at: ' + self.silence_file
logger.critical('File is expected to be at: %s', self.silence_file)
sys.exit()
return ls_playlist
def handle_media_file(self, playlist, pkey, ls_playlist):
""" """
This handles both remote and local files. This handles both remote and local files.
Returns an updated ls_playlist string. Returns an updated ls_playlist string.
""" """
ls_playlist = []
logger = logging.getLogger() logger = logging.getLogger()
for media in playlist['medias']: for media in playlist['medias']:
logger.debug("Processing track %s", media['uri']) logger.debug("Processing track %s", media['uri'])
fileExt = os.path.splitext(media['uri'])[1] fileExt = os.path.splitext(media['uri'])[1]
try: try:
if str(media['cue_in']) == '0' and str(media['cue_out']) == '0': if str(media['cue_in']) == '0' and str(media['cue_out']) == '0':
logger.debug('No cue in/out detected for this file') logger.debug('No cue in/out detected for this file')
@ -367,21 +244,21 @@ class Playout:
dst = "%s%s/%s_cue_%s-%s%s" % \ dst = "%s%s/%s_cue_%s-%s%s" % \
(self.cache_dir, str(pkey), str(media['id']), str(float(media['cue_in']) / 1000), str(float(media['cue_out']) / 1000), str(fileExt)) (self.cache_dir, str(pkey), str(media['id']), str(float(media['cue_in']) / 1000), str(float(media['cue_out']) / 1000), str(fileExt))
do_cue = True do_cue = True
# check if it is a remote file, if yes download # check if it is a remote file, if yes download
if media['uri'][0:4] == 'http': if media['uri'][0:4] == 'http':
self.handle_remote_file(media, dst, do_cue) self.handle_remote_file(media, dst, do_cue)
else: else:
# Assume local file logger.debug("invalid media uri: %s", media['uri'])
self.handle_local_file(media, dst, do_cue)
if True == os.access(dst, os.R_OK): if True == os.access(dst, os.R_OK):
# check filesize (avoid zero-byte files) # check filesize (avoid zero-byte files)
try: fsize = os.path.getsize(dst) try: fsize = os.path.getsize(dst)
except Exception, e: except Exception, e:
logger.error("%s", e) logger.error("%s", e)
fsize = 0 fsize = 0
if fsize > 0: if fsize > 0:
pl_entry = \ pl_entry = \
'annotate:export_source="%s",media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",schedule_table_id="%s":%s'\ 'annotate:export_source="%s",media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",schedule_table_id="%s":%s'\
@ -394,128 +271,69 @@ class Playout:
Tracks are only added to the playlist if they are accessible Tracks are only added to the playlist if they are accessible
on the file system and larger than 0 bytes. on the file system and larger than 0 bytes.
So this can lead to playlists shorter than expectet. So this can lead to playlists shorter than expectet.
(there is a hardware silence detector for this cases...) (there is a hardware silence detector for this cases...)
""" """
ls_playlist += pl_entry + "\n" entry = dict()
entry['type'] = 'file'
entry['annotate'] = pl_entry
ls_playlist.append(entry)
logger.debug("everything ok, adding %s to playlist", pl_entry) logger.debug("everything ok, adding %s to playlist", pl_entry)
else: else:
print 'zero-file: ' + dst + ' from ' + media['uri'] print 'zero-file: ' + dst + ' from ' + media['uri']
logger.warning("zero-size file - skiping %s. will not add it to playlist", dst) logger.warning("zero-size file - skipping %s. will not add it to playlist", dst)
else: else:
logger.warning("something went wrong. file %s not available. will not add it to playlist", dst) logger.warning("something went wrong. file %s not available. will not add it to playlist", dst)
except Exception, e: logger.info("%s", e) except Exception, e: logger.info("%s", e)
return ls_playlist return ls_playlist
def handle_remote_file(self, media, dst, do_cue): def handle_remote_file(self, media, dst, do_cue):
logger = logging.getLogger() logger = logging.getLogger()
if do_cue == False: if do_cue == False:
if os.path.isfile(dst): if os.path.isfile(dst):
logger.debug("file already in cache: %s", dst) logger.debug("file already in cache: %s", dst)
else: else:
logger.debug("try to download %s", media['uri']) logger.debug("try to download %s", media['uri'])
self.api_client.get_media(media['uri'], dst) self.api_client.get_media(media['uri'], dst)
else: else:
if os.path.isfile(dst): if os.path.isfile(dst):
logger.debug("file already in cache: %s", dst) logger.debug("file already in cache: %s", dst)
else: else:
logger.debug("try to download and cue %s", media['uri']) logger.debug("try to download and cue %s", media['uri'])
fileExt = os.path.splitext(media['uri'])[1] fileExt = os.path.splitext(media['uri'])[1]
dst_tmp = config["tmp_dir"] + "".join([random.choice(string.letters) for i in xrange(10)]) + fileExt dst_tmp = config["tmp_dir"] + "".join([random.choice(string.letters) for i in xrange(10)]) + fileExt
self.api_client.get_media(media['uri'], dst_tmp) self.api_client.get_media(media['uri'], dst_tmp)
# cue # cue
logger.debug("STARTING CUE") logger.debug("STARTING CUE")
debugDst = self.cue_file.cue(dst_tmp, dst, float(media['cue_in']) / 1000, float(media['cue_out']) / 1000) debugDst = self.cue_file.cue(dst_tmp, dst, float(media['cue_in']) / 1000, float(media['cue_out']) / 1000)
logger.debug(debugDst) logger.debug(debugDst)
logger.debug("END CUE") logger.debug("END CUE")
if True == os.access(dst, os.R_OK): if True == os.access(dst, os.R_OK):
try: fsize = os.path.getsize(dst) try: fsize = os.path.getsize(dst)
except Exception, e: except Exception, e:
logger.error("%s", e) logger.error("%s", e)
fsize = 0 fsize = 0
if fsize > 0: if fsize > 0:
logger.debug('try to remove temporary file: %s' + dst_tmp) logger.debug('try to remove temporary file: %s' + dst_tmp)
try: os.remove(dst_tmp) try: os.remove(dst_tmp)
except Exception, e: except Exception, e:
logger.error("%s", e) logger.error("%s", e)
else: else:
logger.warning('something went wrong cueing: %s - using uncued file' + dst) logger.warning('something went wrong cueing: %s - using uncued file' + dst)
try: os.rename(dst_tmp, dst) try: os.rename(dst_tmp, dst)
except Exception, e:
logger.error("%s", e)
def handle_local_file(self, media, dst, do_cue):
"""
Handle files on NAS. Pre-cueing not implemented at the moment.
(not needed by openbroadcast, feel free to add this)
Here's an implementation for locally stored files.
Works the same as with remote files, just replaced API-download with
file copy.
"""
logger = logging.getLogger()
if do_cue == False:
if os.path.isfile(dst):
logger.debug("file already in cache: %s", dst)
else:
logger.debug("try to copy file to cache %s", media['uri'])
try:
shutil.copy(media['uri'], dst)
logger.info("copied %s to %s", media['uri'], dst)
except Exception, e:
logger.error("%s", e)
else:
if os.path.isfile(dst):
logger.debug("file already in cache: %s", dst)
else:
logger.debug("try to copy and cue %s", media['uri'])
print '***'
dst_tmp = config["tmp_dir"] + "".join([random.choice(string.letters) for i in xrange(10)])
print dst_tmp
print '***'
try:
shutil.copy(media['uri'], dst_tmp)
logger.info("copied %s to %s", media['uri'], dst_tmp)
except Exception, e:
logger.error("%s", e)
# cue
print "STARTING CUE"
print self.cue_file.cue(dst_tmp, dst, float(media['cue_in']) / 1000, float(media['cue_out']) / 1000)
print "END CUE"
if True == os.access(dst, os.R_OK):
try: fsize = os.path.getsize(dst)
except Exception, e: except Exception, e:
logger.error("%s", e) logger.error("%s", e)
fsize = 0
if fsize > 0:
logger.debug('try to remove temporary file: %s' + dst_tmp)
try: os.remove(dst_tmp)
except Exception, e:
logger.error("%s", e)
else:
logger.warning('something went wrong cueing: %s - using uncued file' + dst)
try: os.rename(dst_tmp, dst)
except Exception, e:
logger.error("%s", e)
def cleanup(self, export_source): def cleanup(self, export_source):
""" """
@ -524,7 +342,6 @@ class Playout:
""" """
logger = logging.getLogger() logger = logging.getLogger()
self.set_export_source(export_source)
offset = 3600 * int(config["cache_for"]) offset = 3600 * int(config["cache_for"])
now = time.time() now = time.time()
@ -532,11 +349,6 @@ class Playout:
for dir in d: for dir in d:
try: try:
timestamp = time.mktime(time.strptime(dir, "%Y-%m-%d-%H-%M-%S")) timestamp = time.mktime(time.strptime(dir, "%Y-%m-%d-%H-%M-%S"))
#logger.debug('dir : %s', (dir))
#logger.debug('age : %s', (round((now - timestamp),1)))
#logger.debug('delete in : %ss', (round((offset - (now - timestamp)),1)))
#logger.debug('Folder "Age": %s - %s', round((((now - offset) - timestamp) / 60), 2), os.path.join(r, dir))
if (now - timestamp) > offset: if (now - timestamp) > offset:
try: try:
logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp) logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp)
@ -548,48 +360,39 @@ class Playout:
logger.info('sucessfully removed %s', os.path.join(r, dir)) logger.info('sucessfully removed %s', os.path.join(r, dir))
except Exception, e: except Exception, e:
print e print e
logger.error("%s", e) logger.error("%s", e)
""" """
The Push Loop - the push loop periodically (minimal 1/2 of the playlist-grid) The Push Loop - the push loop periodically (minimal 1/2 of the playlist-grid)
checks if there is a playlist that should be scheduled at the current time. checks if there is a playlist that should be scheduled at the current time.
If yes, the temporary liquidsoap playlist gets replaced with the corresponding one, If yes, the temporary liquidsoap playlist gets replaced with the corresponding one,
then liquidsoap is asked (via telnet) to reload and immediately play it. then liquidsoap is asked (via telnet) to reload and immediately play it.
""" """
def push(self, export_source): def push(self, export_source):
logger = logging.getLogger() logger = logging.getLogger()
self.set_export_source(export_source)
#try:
# dummy = self.schedule
# logger.debug('schedule already loaded')
#except Exception, e:
# self.schedule = self.push_init(self.export_source)
self.schedule = self.load_schedule() self.schedule = self.load_schedule()
playedItems = self.load_schedule_tracker() playedItems = self.load_schedule_tracker()
tcoming = time.localtime(time.time() + self.push_ahead) tcoming = time.localtime(time.time() + self.push_ahead)
tcoming2 = time.localtime(time.time() + self.push_ahead2) tcoming2 = time.localtime(time.time() + self.push_ahead2)
tnow = time.localtime(time.time()) tnow = time.localtime(time.time())
str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5]) str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5])
str_tcoming2_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming2[0], tcoming2[1], tcoming2[2], tcoming2[3], tcoming2[4], tcoming2[5]) str_tcoming2_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming2[0], tcoming2[1], tcoming2[2], tcoming2[3], tcoming2[4], tcoming2[5])
if self.schedule == None: if self.schedule == None:
logger.warn('Unable to loop schedule - maybe write in progress?') logger.warn('Unable to loop schedule - maybe write in progress?')
logger.warn('Will try again in next loop.') logger.warn('Will try again in next loop.')
else: else:
for pkey in self.schedule: for pkey in self.schedule:
playedFlag = (pkey in playedItems) and playedItems[pkey].get("played", 0) playedFlag = (pkey in playedItems) and playedItems[pkey].get("played", 0)
if pkey[0:19] == str_tcoming_s or (pkey[0:19] < str_tcoming_s and pkey[0:19] > str_tcoming2_s and not playedFlag): if pkey[0:19] == str_tcoming_s or (pkey[0:19] < str_tcoming_s and pkey[0:19] > str_tcoming2_s and not playedFlag):
logger.debug('Preparing to push playlist scheduled at: %s', pkey) logger.debug('Preparing to push playlist scheduled at: %s', pkey)
playlist = self.schedule[pkey] playlist = self.schedule[pkey]
ptype = playlist['subtype'] ptype = playlist['subtype']
# We have a match, replace the current playlist and # We have a match, replace the current playlist and
@ -598,41 +401,41 @@ class Playout:
logger.debug("Pushed to liquidsoap, updating 'played' status.") logger.debug("Pushed to liquidsoap, updating 'played' status.")
# Marked the current playlist as 'played' in the schedule tracker # Marked the current playlist as 'played' in the schedule tracker
# so it is not called again in the next push loop. # so it is not called again in the next push loop.
# Write changes back to tracker file. # Write changes back to tracker file.
playedItems[pkey] = playlist playedItems[pkey] = playlist
playedItems[pkey]['played'] = 1 playedItems[pkey]['played'] = 1
schedule_tracker = open(self.schedule_tracker_file, "w") schedule_tracker = open(self.schedule_tracker_file, "w")
pickle.dump(playedItems, schedule_tracker) pickle.dump(playedItems, schedule_tracker)
schedule_tracker.close() schedule_tracker.close()
logger.debug("Wrote schedule to disk: "+str(playedItems)) logger.debug("Wrote schedule to disk: "+str(json.dumps(playedItems)))
# Call API to update schedule states # Call API to update schedule states
logger.debug("Doing callback to server to update 'played' status.") logger.debug("Doing callback to server to update 'played' status.")
self.api_client.notify_scheduled_item_start_playing(pkey, self.schedule) self.api_client.notify_scheduled_item_start_playing(pkey, self.schedule)
def load_schedule(self): def load_schedule(self):
logger = logging.getLogger() logger = logging.getLogger()
schedule = None schedule = None
# create the file if it doesnt exist # create the file if it doesnt exist
if (not os.path.exists(self.schedule_file)): if (not os.path.exists(self.schedule_file)):
logger.debug('creating file ' + self.schedule_file) logger.debug('creating file ' + self.schedule_file)
open(self.schedule_file, 'w').close() open(self.schedule_file, 'w').close()
else: else:
# load the schedule from cache # load the schedule from cache
#logger.debug('loading schedule file '+self.schedule_file) #logger.debug('loading schedule file '+self.schedule_file)
try: try:
schedule_file = open(self.schedule_file, "r") schedule_file = open(self.schedule_file, "r")
schedule = pickle.load(schedule_file) schedule = pickle.load(schedule_file)
schedule_file.close() schedule_file.close()
except Exception, e: except Exception, e:
logger.error('%s', e) logger.error('%s', e)
return schedule return schedule
def load_schedule_tracker(self): def load_schedule_tracker(self):
logger = logging.getLogger() logger = logging.getLogger()
playedItems = dict() playedItems = dict()
@ -642,138 +445,89 @@ class Playout:
logger.debug('creating file ' + self.schedule_tracker_file) logger.debug('creating file ' + self.schedule_tracker_file)
schedule_tracker = open(self.schedule_tracker_file, 'w') schedule_tracker = open(self.schedule_tracker_file, 'w')
pickle.dump(playedItems, schedule_tracker) pickle.dump(playedItems, schedule_tracker)
schedule_tracker.close() schedule_tracker.close()
else: else:
try: try:
#logger.debug('loading schedule tracker file '+ self.schedule_tracker_file)
schedule_tracker = open(self.schedule_tracker_file, "r") schedule_tracker = open(self.schedule_tracker_file, "r")
playedItems = pickle.load(schedule_tracker) playedItems = pickle.load(schedule_tracker)
schedule_tracker.close() schedule_tracker.close()
except Exception, e: except Exception, e:
logger.error('Unable to load schedule tracker file: %s', e) logger.error('Unable to load schedule tracker file: %s', e)
return playedItems return playedItems
def push_liquidsoap(self, pkey, schedule, ptype): def push_liquidsoap(self, pkey, schedule, ptype):
logger = logging.getLogger() logger = logging.getLogger()
src = self.cache_dir + str(pkey) + '/list.lsp' src = self.cache_dir + str(pkey) + '/list.lsp'
try: try:
if True == os.access(src, os.R_OK): if True == os.access(src, os.R_OK):
logger.debug('OK - Can read playlist file') logger.debug('OK - Can read playlist file')
pl_file = open(src, "r") pl_file = open(src, "r")
file_content = pl_file.read()
pl_file.close()
logger.debug('file content: %s' % (file_content))
playlist = json.loads(file_content)
#strptime returns struct_time in local time #strptime returns struct_time in local time
#mktime takes a time_struct and returns a floating point #mktime takes a time_struct and returns a floating point
#gmtime Convert a time expressed in seconds since the epoch to a struct_time in UTC #gmtime Convert a time expressed in seconds since the epoch to a struct_time in UTC
#mktime: expresses the time in local time, not UTC. It returns a floating point number, for compatibility with time(). #mktime: expresses the time in local time, not UTC. It returns a floating point number, for compatibility with time().
epoch_start = calendar.timegm(time.gmtime(time.mktime(time.strptime(pkey, '%Y-%m-%d-%H-%M-%S')))) epoch_start = calendar.timegm(time.gmtime(time.mktime(time.strptime(pkey, '%Y-%m-%d-%H-%M-%S'))))
#Return the time as a floating point number expressed in seconds since the epoch, in UTC. #Return the time as a floating point number expressed in seconds since the epoch, in UTC.
epoch_now = time.time() epoch_now = time.time()
logger.debug("Epoch start: " + str(epoch_start)) logger.debug("Epoch start: " + str(epoch_start))
logger.debug("Epoch now: " + str(epoch_now)) logger.debug("Epoch now: " + str(epoch_now))
sleep_time = epoch_start - epoch_now; sleep_time = epoch_start - epoch_now;
if sleep_time < 0: if sleep_time < 0:
sleep_time = 0 sleep_time = 0
logger.debug('sleeping for %s s' % (sleep_time)) logger.debug('sleeping for %s s' % (sleep_time))
time.sleep(sleep_time) time.sleep(sleep_time)
tn = telnetlib.Telnet(LS_HOST, 1234) tn = telnetlib.Telnet(LS_HOST, 1234)
# Get any extra information for liquidsoap (which will be sent back to us)
liquidsoap_data = self.api_client.get_liquidsoap_data(pkey, schedule)
logger.debug("Sending additional telnet data to liquidsoap: "+str(liquidsoap_data["schedule_id"]))
#skip the currently playing song if any. #skip the currently playing song if any.
logger.debug("source.skip\n") logger.debug("source.skip\n")
tn.write("source.skip\n") tn.write("source.skip\n")
# Get any extra information for liquidsoap (which will be sent back to us)
liquidsoap_data = self.api_client.get_liquidsoap_data(pkey, schedule)
#Sending schedule table row id string. #Sending schedule table row id string.
logger.debug("vars.pypo_data %s\n"%(str(liquidsoap_data["schedule_id"]))) logger.debug("vars.pypo_data %s\n"%(str(liquidsoap_data["schedule_id"])))
tn.write("vars.pypo_data %s\n"%(str(liquidsoap_data["schedule_id"]))) tn.write("vars.pypo_data %s\n"%(str(liquidsoap_data["schedule_id"])))
for line in pl_file.readlines(): for item in playlist:
line = line.strip() annotate = str(item['annotate'])
logger.debug(line) logger.debug(annotate)
tn.write('queue.push %s' % (line)) tn.write('queue.push %s' % (annotate))
tn.write("\n") tn.write("\n")
tn.write("exit\n") tn.write("exit\n")
logger.debug(tn.read_all()) logger.debug(tn.read_all())
"""
tn = telnetlib.Telnet(LS_HOST, 1234)
# Get any extra information for liquidsoap (which will be sent back to us)
liquidsoap_data = self.api_client.get_liquidsoap_data(pkey, schedule)
logger.debug("Sending additional data to liquidsoap: "+liquidsoap_data)
#Sending JSON string. Example: {"schedule_id":"13"}
tn.write("vars.pypo_data "+liquidsoap_data+"\n")
tn.write(self.export_source + '.flip')
tn.write("\n")
tn.write("exit\n")
tn.read_all()
"""
status = 1 status = 1
except Exception, e: except Exception, e:
logger.error('%s', e) logger.error('%s', e)
status = 0 status = 0
return status return status
"""
Updates the jingles. Give comma separated list of jingle tracks.
NOTE: commented out because it needs to be converted to use the API client. - Paul
"""
#def update_jingles(self, options):
# print 'jingles'
#
# jingle_list = string.split(options, ',')
# print jingle_list
# for media_id in jingle_list:
# # api path maybe should not be hard-coded
# src = API_BASE + 'api/pypo/get_media/' + str(media_id)
# print src
# # include the hourly jungles for the moment
# dst = "%s%s/%s.mp3" % (config["file_dir"], 'jingles/hourly', str(media_id))
# print dst
#
# try:
# print '** urllib auth with: ',
# print self.api_auth
# opener = urllib.URLopener()
# opener.retrieve (src, dst, False, self.api_auth)
# logger.info("downloaded %s to %s", src, dst)
# except Exception, e:
# print e
# logger.error("%s", e)
def check_schedule(self, export_source): def check_schedule(self, export_source):
logger = logging.getLogger() logger = logging.getLogger()
self.set_export_source(export_source)
try: try:
schedule_file = open(self.schedule_file, "r") schedule_file = open(self.schedule_file, "r")
schedule = pickle.load(schedule_file) schedule = pickle.load(schedule_file)
schedule_file.close() schedule_file.close()
except Exception, e: except Exception, e:
logger.error("%s", e) logger.error("%s", e)
schedule = None schedule = None
@ -789,112 +543,68 @@ class Playout:
print 'duration: ' + str(playlist['duration']) print 'duration: ' + str(playlist['duration'])
print 'source id: ' + str(playlist['x_ident']) print 'source id: ' + str(playlist['x_ident'])
print '-----------------------------------------' print '-----------------------------------------'
for media in playlist['medias']: for media in playlist['medias']:
print media print media
print print
if __name__ == '__main__': if __name__ == '__main__':
print
print '###########################################' print '###########################################'
print '# *** pypo *** #' print '# *** pypo *** #'
print '# Liquidsoap + External Scheduler #' print '# Liquidsoap + External Scheduler #'
print '# Playout System #' print '# Playout System #'
print '###########################################' print '###########################################'
print
# initialize # initialize
g = Global() g = Global()
g.selfcheck() g.selfcheck()
po = Playout() po = Playout()
while True:
logger = logging.getLogger()
loops = 0
run = True if options.test:
while run == True: po.test_api()
logger = logging.getLogger() sys.exit()
loops = 0 while options.fetch_scheduler:
try: po.fetch('scheduler')
if options.test: except Exception, e:
po.test_api() print e
sys.exit()
if (loops%2 == 0):
logger.info("heartbeat\n\n\n\n")
loops += 1
time.sleep(POLL_INTERVAL)
while options.push_scheduler:
po.push('scheduler')
try: po.push('scheduler')
except Exception, e:
print 'PUSH ERROR!! WILL EXIT NOW:('
print e
sys.exit()
if (loops%60 == 0):
logger.info("heartbeat")
loops += 1
time.sleep(PUSH_INTERVAL)
while options.check:
try: po.check_schedule()
except Exception, e:
print e
sys.exit()
while options.cleanup:
try: po.cleanup('scheduler')
except Exception, e:
print e
sys.exit()
sys.exit() sys.exit()
while options.fetch_scheduler:
try: po.fetch('scheduler')
except Exception, e:
print e
sys.exit()
#print 'ZZzZzZzzzzZZZz.... sleeping for ' + str(POLL_INTERVAL) + ' seconds'
#logger.info('fetch loop %s - ZZzZzZzzzzZZZz.... sleeping for %s seconds', loops, POLL_INTERVAL)
if (loops%2 == 0):
logger.info("heartbeat\n\n\n\n")
loops += 1
time.sleep(POLL_INTERVAL)
while options.fetch_daypart:
try: po.fetch('daypart')
except Exception, e:
print e
sys.exit()
#print 'ZZzZzZzzzzZZZz.... sleeping for ' + str(POLL_INTERVAL) + ' seconds'
#logger.info('fetch loop %s - ZZzZzZzzzzZZZz.... sleeping for %s seconds', loops, POLL_INTERVAL)
loops += 1
time.sleep(POLL_INTERVAL)
while options.push_scheduler:
po.push('scheduler')
try: po.push('scheduler')
except Exception, e:
print 'PUSH ERROR!! WILL EXIT NOW:('
print e
sys.exit()
if (loops%20 == 0):
logger.info("heartbeat")
#logger.info('push loop %s - ZZzZzZzzzzZZZz.... sleeping for %s seconds', loops, PUSH_INTERVAL)
loops += 1
time.sleep(PUSH_INTERVAL)
while options.push_daypart:
po.push('daypart')
try: po.push('daypart')
except Exception, e:
print 'PUSH ERROR!! WILL EXIT NOW:('
print e
sys.exit()
#logger.info('push loop %s - ZZzZzZzzzzZZZz.... sleeping for %s seconds', loops, PUSH_INTERVAL)
loops += 1
time.sleep(PUSH_INTERVAL)
#while options.jingles:
# try: po.update_jingles(options.jingles)
# except Exception, e:
# print e
# sys.exit()
while options.check:
try: po.check_schedule()
except Exception, e:
print e
sys.exit()
while options.cleanup:
try: po.cleanup('scheduler')
except Exception, e:
print e
sys.exit()
sys.exit()

View File

@ -5,7 +5,7 @@ set("log.file.path", log_file)
set("log.stdout", true) set("log.stdout", true)
set("server.telnet", true) set("server.telnet", true)
queue = request.queue(id="queue") queue = request.queue(id="queue", conservative=true)
queue = audio_to_stereo(queue) queue = audio_to_stereo(queue)
pypo_data = ref '0' pypo_data = ref '0'
@ -23,12 +23,23 @@ def crossfade(s)
cross(fader,s) cross(fader,s)
end end
# Define a transition that fades out the
# old source, adds a single, and then
# plays the new source
def to_live(old,new) =
# Fade out old source
old = fade.final(old)
# Compose this in sequence with
# the new source
sequence([old,new])
end
# Add a skip function to a source # Add a skip function to a source
# when it does not have one # when it does not have one
# by default # by default
def add_skip_command(s) = def add_skip_command(s)
# A command to skip # A command to skip
def skip(_) = def skip(_)
source.skip(s) source.skip(s)
"Done!" "Done!"
end end
@ -42,7 +53,7 @@ end
server.register(namespace="vars", "pypo_data", fun (s) -> begin pypo_data := s "Done" end) server.register(namespace="vars", "pypo_data", fun (s) -> begin pypo_data := s "Done" end)
server.register(namespace="vars", "web_stream_enabled", fun (s) -> begin web_stream_enabled := (s == "true") string_of(!web_stream_enabled) end) server.register(namespace="vars", "web_stream_enabled", fun (s) -> begin web_stream_enabled := (s == "true") string_of(!web_stream_enabled) end)
default = single("/opt/pypo/files/basic/silence.mp3") default = single(conservative=true, "/opt/pypo/files/basic/silence.mp3")
default = rewrite_metadata([("artist","Airtime"), ("title", "offline")],default) default = rewrite_metadata([("artist","Airtime"), ("title", "offline")],default)
s = fallback(track_sensitive=false, [queue, default]) s = fallback(track_sensitive=false, [queue, default])
@ -53,9 +64,14 @@ s = crossfade(s)
# Attach a skip command to the source s: # Attach a skip command to the source s:
add_skip_command(s) add_skip_command(s)
web_stream_source = input.http(id="web_stream", autostart = false, "") web_stream_source = input.http(id="web_stream", autostart = false, buffer=0.5, max=20., "")
#once the stream is started, give it a sink so that liquidsoap doesn't
#create buffer overflow warnings in the log file.
output.dummy(fallible=true, web_stream_source)
s = switch(track_sensitive = false, s = switch(track_sensitive = false,
transitions=[to_live,to_live],
[ [
({ !web_stream_enabled }, web_stream_source), ({ !web_stream_enabled }, web_stream_source),
({ true }, s) ({ true }, s)