CC-5227 - Sometimes Liquidsoap telnet is unresponsive which causes all Pypo threads to block

-initial commit
This commit is contained in:
Martin Konecny 2013-06-11 15:55:17 -04:00
parent 3391aa90c5
commit f164cf0ad4
5 changed files with 70 additions and 25 deletions

View File

@ -25,6 +25,7 @@ from recorder import Recorder
from listenerstat import ListenerStat from listenerstat import ListenerStat
from pypomessagehandler import PypoMessageHandler from pypomessagehandler import PypoMessageHandler
from pypoliquidsoap import PypoLiquidsoap from pypoliquidsoap import PypoLiquidsoap
from timeout import ls_timeout
from media.update.replaygainupdater import ReplayGainUpdater from media.update.replaygainupdater import ReplayGainUpdater
from media.update.silananalyzer import SilanAnalyzer from media.update.silananalyzer import SilanAnalyzer
@ -156,6 +157,7 @@ def keyboardInterruptHandler(signum, frame):
logger.info('\nKeyboard Interrupt\n') logger.info('\nKeyboard Interrupt\n')
sys.exit(0) sys.exit(0)
@ls_timeout
def liquidsoap_get_info(telnet_lock, host, port, logger): def liquidsoap_get_info(telnet_lock, host, port, logger):
logger.debug("Checking to see if Liquidsoap is running") logger.debug("Checking to see if Liquidsoap is running")
try: try:

View File

@ -19,6 +19,7 @@ from subprocess import Popen, PIPE
from api_clients import api_client from api_clients import api_client
from std_err_override import LogWriter from std_err_override import LogWriter
from timeout import ls_timeout
# configure logging # configure logging
@ -38,12 +39,9 @@ signal.signal(signal.SIGINT, keyboardInterruptHandler)
POLL_INTERVAL = 1800 POLL_INTERVAL = 1800
config_static = None
class PypoFetch(Thread): class PypoFetch(Thread):
def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config): def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config):
Thread.__init__(self) Thread.__init__(self)
global config_static
self.api_client = api_client.AirtimeApiClient() self.api_client = api_client.AirtimeApiClient()
self.fetch_queue = pypoFetch_q self.fetch_queue = pypoFetch_q
@ -51,7 +49,6 @@ class PypoFetch(Thread):
self.media_prepare_queue = media_q self.media_prepare_queue = media_q
self.last_update_schedule_timestamp = time.time() self.last_update_schedule_timestamp = time.time()
self.config = config self.config = config
config_static = config
self.listener_timeout = POLL_INTERVAL self.listener_timeout = POLL_INTERVAL
self.telnet_lock = telnet_lock self.telnet_lock = telnet_lock
@ -178,9 +175,17 @@ class PypoFetch(Thread):
self.pypo_liquidsoap.clear_queue_tracker() self.pypo_liquidsoap.clear_queue_tracker()
@ls_timeout
def restart_liquidsoap(self): def restart_liquidsoap(self):
try: try:
self.telnet_lock.acquire() """do not block - if we receive the lock then good - no other thread
will try communicating with Liquidsoap. If we don't receive, it may
mean some thread blocked and is still holding the lock. Restarting
Liquidsoap will cause that thread to release the lock as an Exception
will be thrown."""
self.telnet_lock.acquire(False)
self.logger.info("Restarting Liquidsoap") self.logger.info("Restarting Liquidsoap")
subprocess.call('/etc/init.d/airtime-liquidsoap restart', shell=True) subprocess.call('/etc/init.d/airtime-liquidsoap restart', shell=True)
@ -299,6 +304,7 @@ class PypoFetch(Thread):
self.logger.info("No change detected in setting...") self.logger.info("No change detected in setting...")
self.update_liquidsoap_connection_status() self.update_liquidsoap_connection_status()
@ls_timeout
def update_liquidsoap_connection_status(self): def update_liquidsoap_connection_status(self):
""" """
updates the status of Liquidsoap connection to the streaming server updates the status of Liquidsoap connection to the streaming server
@ -345,6 +351,7 @@ class PypoFetch(Thread):
self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time)) self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time))
@ls_timeout
def update_liquidsoap_stream_format(self, stream_format): def update_liquidsoap_stream_format(self, stream_format):
# Push stream metadata to liquidsoap # Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
@ -361,6 +368,7 @@ class PypoFetch(Thread):
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def update_liquidsoap_transition_fade(self, fade): def update_liquidsoap_transition_fade(self, fade):
# Push stream metadata to liquidsoap # Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
@ -377,6 +385,7 @@ class PypoFetch(Thread):
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def update_liquidsoap_station_name(self, station_name): def update_liquidsoap_station_name(self, station_name):
# Push stream metadata to liquidsoap # Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!

View File

@ -2,6 +2,7 @@
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
from configobj import ConfigObj
import sys import sys
import time import time
@ -21,7 +22,7 @@ from threading import Thread
from api_clients import api_client from api_clients import api_client
from std_err_override import LogWriter from std_err_override import LogWriter
from configobj import ConfigObj from timeout import ls_timeout
# configure logging # configure logging
@ -114,25 +115,6 @@ class PypoPush(Thread):
return present, future return present, future
def get_current_stream_id_from_liquidsoap(self):
response = "-1"
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
msg = 'dynamic_source.get_id\n'
tn.write(msg)
response = tn.read_until("\r\n").strip(" \r\n")
tn.write('exit\n')
tn.read_all()
except Exception, e:
self.logger.error("Error connecting to Liquidsoap: %s", e)
response = []
finally:
self.telnet_lock.release()
return response
#def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id): #def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id):
#correct = False #correct = False
#if media_item is None: #if media_item is None:
@ -162,6 +144,7 @@ class PypoPush(Thread):
return seconds return seconds
@ls_timeout
def stop_web_stream_all(self): def stop_web_stream_all(self):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()

View File

@ -1,4 +1,5 @@
import telnetlib import telnetlib
from timeout import ls_timeout
def create_liquidsoap_annotation(media): def create_liquidsoap_annotation(media):
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.
@ -38,6 +39,7 @@ class TelnetLiquidsoap:
else: else:
raise Exception("Unexpected list length returned: %s" % output) raise Exception("Unexpected list length returned: %s" % output)
@ls_timeout
def queue_clear_all(self): def queue_clear_all(self):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -55,6 +57,7 @@ class TelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def queue_remove(self, queue_id): def queue_remove(self, queue_id):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -72,6 +75,7 @@ class TelnetLiquidsoap:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def queue_push(self, queue_id, media_item): def queue_push(self, queue_id, media_item):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -98,6 +102,7 @@ class TelnetLiquidsoap:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def stop_web_stream_buffer(self): def stop_web_stream_buffer(self):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -120,6 +125,7 @@ class TelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def stop_web_stream_output(self): def stop_web_stream_output(self):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -138,6 +144,7 @@ class TelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def start_web_stream(self, media_item): def start_web_stream(self, media_item):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -160,6 +167,7 @@ class TelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def start_web_stream_buffer(self, media_item): def start_web_stream_buffer(self, media_item):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -182,6 +190,7 @@ class TelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def get_current_stream_id(self): def get_current_stream_id(self):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -201,6 +210,7 @@ class TelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def disconnect_source(self, sourcename): def disconnect_source(self, sourcename):
self.logger.debug('Disconnecting source: %s', sourcename) self.logger.debug('Disconnecting source: %s', sourcename)
command = "" command = ""
@ -221,6 +231,7 @@ class TelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def telnet_send(self, commands): def telnet_send(self, commands):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -265,6 +276,7 @@ class DummyTelnetLiquidsoap:
for i in range(4): for i in range(4):
self.liquidsoap_mock_queues["s"+str(i)] = [] self.liquidsoap_mock_queues["s"+str(i)] = []
@ls_timeout
def queue_push(self, queue_id, media_item): def queue_push(self, queue_id, media_item):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
@ -280,6 +292,7 @@ class DummyTelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
@ls_timeout
def queue_remove(self, queue_id): def queue_remove(self, queue_id):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()

View File

@ -0,0 +1,38 @@
import threading
import logging
def __timeout(func, timeout_duration, default, args, kwargs):
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = func(*args, **kwargs)
first_attempt = True
while True:
it = InterruptableThread()
it.start()
it.join(timeout_duration)
logger = logging.getLogger()
if it.isAlive():
"""Restart Liquidsoap and try the command one more time. If it
fails again then there is something critically wrong..."""
if first_attempt:
#restart liquidsoap
pass
else:
raise Exception("Thread did not terminate")
else:
return it.result
first_attempt = False
def ls_timeout(f, timeout=4, default=None):
def new_f(*args, **kwargs):
return __timeout(f, timeout, default, args, kwargs)
return new_f