CC-1665: Scheduled stream rebroadcasting and recording
-cancelling a webstream works
This commit is contained in:
parent
9cc152c0ae
commit
fcce4bf64c
|
@ -49,6 +49,7 @@ class PypoPush(Thread):
|
||||||
|
|
||||||
self.pushed_objects = {}
|
self.pushed_objects = {}
|
||||||
self.logger = logging.getLogger('push')
|
self.logger = logging.getLogger('push')
|
||||||
|
self.current_stream_info = None
|
||||||
|
|
||||||
def main(self):
|
def main(self):
|
||||||
loops = 0
|
loops = 0
|
||||||
|
@ -73,38 +74,54 @@ class PypoPush(Thread):
|
||||||
|
|
||||||
tnow = datetime.utcnow()
|
tnow = datetime.utcnow()
|
||||||
current_event_chain, original_chain = self.get_current_chain(chains, tnow)
|
current_event_chain, original_chain = self.get_current_chain(chains, tnow)
|
||||||
if len(current_event_chain) > 0 and len(liquidsoap_queue_approx) == 0:
|
|
||||||
#Something is scheduled but Liquidsoap is not playing anything!
|
if len(current_event_chain) > 0:
|
||||||
#Need to schedule it immediately..this might happen if Liquidsoap crashed.
|
|
||||||
try:
|
try:
|
||||||
chains.remove(original_chain)
|
chains.remove(original_chain)
|
||||||
except ValueError, e:
|
except ValueError, e:
|
||||||
self.logger.error(str(e))
|
self.logger.error(str(e))
|
||||||
|
if len(liquidsoap_queue_approx) == 0 and current_event_chain[0]['type'] == 'file':
|
||||||
|
#Something is scheduled but Liquidsoap is not playing anything!
|
||||||
|
#Need to schedule it immediately..this might happen if Liquidsoap crashed.
|
||||||
|
self.modify_cue_point(current_event_chain[0])
|
||||||
|
next_media_item_chain = current_event_chain
|
||||||
|
time_until_next_play = 0
|
||||||
|
#sleep for 0.2 seconds to give pypo-file time to copy.
|
||||||
|
time.sleep(0.2)
|
||||||
|
continue
|
||||||
|
if not self.current_stream_info and current_event_chain[0]['type'] == 'stream':
|
||||||
|
#a stream is schedule but Liquidsoap is not playing it. Need to start it.
|
||||||
|
next_media_item_chain = current_event_chain
|
||||||
|
time_until_next_play = 0
|
||||||
|
continue
|
||||||
|
|
||||||
self.modify_cue_point(current_event_chain[0])
|
#At this point we know that Liquidsoap is playing something, and that something
|
||||||
next_media_item_chain = current_event_chain
|
#is scheduled. We need to verify whether the schedule we just received matches
|
||||||
time_until_next_play = 0
|
#what Liquidsoap is playing, and if not, correct it.
|
||||||
#sleep for 0.2 seconds to give pypo-file time to copy.
|
media_chain = filter(lambda item: (item["type"] == "file"), current_event_chain)
|
||||||
time.sleep(0.2)
|
stream_chain = filter(lambda item: (item["type"] == "stream"), current_event_chain)
|
||||||
|
self.handle_new_schedule(media_schedule, liquidsoap_queue_approx, media_chain, stream_chain)
|
||||||
|
|
||||||
|
|
||||||
|
#At this point everything in the present has been taken care of and Liquidsoap
|
||||||
|
#is playing whatever is scheduled.
|
||||||
|
#Now we need to prepare ourselves for future scheduled events.
|
||||||
|
#
|
||||||
|
next_media_item_chain = self.get_next_schedule_chain(chains, tnow)
|
||||||
|
|
||||||
|
self.logger.debug("Next schedule chain: %s", next_media_item_chain)
|
||||||
|
if next_media_item_chain is not None:
|
||||||
|
try:
|
||||||
|
chains.remove(next_media_item_chain)
|
||||||
|
except ValueError, e:
|
||||||
|
self.logger.error(str(e))
|
||||||
|
|
||||||
|
chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
|
||||||
|
time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow())
|
||||||
|
self.logger.debug("Blocking %s seconds until show start", time_until_next_play)
|
||||||
else:
|
else:
|
||||||
media_chain = filter(lambda item: (item["type"] == "file"), current_event_chain)
|
self.logger.debug("Blocking indefinitely since no show scheduled")
|
||||||
self.handle_new_media_schedule(media_schedule, liquidsoap_queue_approx, media_chain)
|
time_until_next_play = None
|
||||||
|
|
||||||
next_media_item_chain = self.get_next_schedule_chain(chains, tnow)
|
|
||||||
|
|
||||||
self.logger.debug("Next schedule chain: %s", next_media_item_chain)
|
|
||||||
if next_media_item_chain is not None:
|
|
||||||
try:
|
|
||||||
chains.remove(next_media_item_chain)
|
|
||||||
except ValueError, e:
|
|
||||||
self.logger.error(str(e))
|
|
||||||
|
|
||||||
chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
|
|
||||||
time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow())
|
|
||||||
self.logger.debug("Blocking %s seconds until show start", time_until_next_play)
|
|
||||||
else:
|
|
||||||
self.logger.debug("Blocking indefinitely since no show scheduled")
|
|
||||||
time_until_next_play = None
|
|
||||||
except Empty, e:
|
except Empty, e:
|
||||||
#We only get here when a new chain of tracks are ready to be played.
|
#We only get here when a new chain of tracks are ready to be played.
|
||||||
self.push_to_liquidsoap(next_media_item_chain)
|
self.push_to_liquidsoap(next_media_item_chain)
|
||||||
|
@ -166,7 +183,7 @@ class PypoPush(Thread):
|
||||||
|
|
||||||
return liquidsoap_queue_approx
|
return liquidsoap_queue_approx
|
||||||
|
|
||||||
def handle_new_media_schedule(self, media_schedule, liquidsoap_queue_approx, media_chain):
|
def handle_new_schedule(self, media_schedule, liquidsoap_queue_approx, media_chain, stream_chain):
|
||||||
"""
|
"""
|
||||||
This function's purpose is to gracefully handle situations where
|
This function's purpose is to gracefully handle situations where
|
||||||
Liquidsoap already has a track in its queue, but the schedule
|
Liquidsoap already has a track in its queue, but the schedule
|
||||||
|
@ -175,6 +192,18 @@ class PypoPush(Thread):
|
||||||
queue.
|
queue.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if self.current_stream_info:
|
||||||
|
if len(stream_chain) == 0:
|
||||||
|
#Liquidsoap is rebroadcasting a webstream, but there is no stream
|
||||||
|
#in the schedule. Let's stop streaming.
|
||||||
|
self.stop_web_stream(self.current_stream_info)
|
||||||
|
elif self.current_stream_info['uri'] != stream_chain[0]['uri']:
|
||||||
|
#Liquidsoap is rebroadcasting a webstream and a webstream is scheduled
|
||||||
|
#to play, but they are not the same!
|
||||||
|
self.stop_web_stream(self.current_stream_info)
|
||||||
|
self.start_web_stream(stream_chain[0])
|
||||||
|
|
||||||
|
|
||||||
problem_at_iteration = self.find_removed_items(media_schedule, liquidsoap_queue_approx)
|
problem_at_iteration = self.find_removed_items(media_schedule, liquidsoap_queue_approx)
|
||||||
|
|
||||||
if problem_at_iteration is not None:
|
if problem_at_iteration is not None:
|
||||||
|
@ -388,6 +417,7 @@ class PypoPush(Thread):
|
||||||
tn.write("exit\n")
|
tn.write("exit\n")
|
||||||
self.logger.debug(tn.read_all())
|
self.logger.debug(tn.read_all())
|
||||||
|
|
||||||
|
self.current_stream_info = media_item
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error(str(e))
|
self.logger.error(str(e))
|
||||||
finally:
|
finally:
|
||||||
|
@ -406,6 +436,7 @@ class PypoPush(Thread):
|
||||||
tn.write("exit\n")
|
tn.write("exit\n")
|
||||||
self.logger.debug(tn.read_all())
|
self.logger.debug(tn.read_all())
|
||||||
|
|
||||||
|
self.current_stream_info = None
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error(str(e))
|
self.logger.error(str(e))
|
||||||
finally:
|
finally:
|
||||||
|
|
Loading…
Reference in New Issue