feat(playout): replace thread timeout with socket timeout
Prefer the lower level socket timeout feature, to the hand made threaded timeout. The thread timeout does not raise or log the errors that may occur during the communication with liquidsoap, and we should handle them instead.
This commit is contained in:
parent
6b6e8951d1
commit
4c63ef71fc
|
@ -15,7 +15,6 @@ from requests import RequestException
|
|||
from ..config import CACHE_DIR, POLL_INTERVAL, Config
|
||||
from ..liquidsoap.client import LiquidsoapClient
|
||||
from ..liquidsoap.models import Info, MessageFormatKind, StreamPreferences, StreamState
|
||||
from ..timeout import ls_timeout
|
||||
from .events import Events, FileEvent, FileEvents
|
||||
from .liquidsoap import PypoLiquidsoap
|
||||
from .schedule import get_schedule, receive_schedule
|
||||
|
@ -157,7 +156,6 @@ class PypoFetch(Thread):
|
|||
self.pypo_liquidsoap.clear_all_queues()
|
||||
self.pypo_liquidsoap.clear_queue_tracker()
|
||||
|
||||
@ls_timeout
|
||||
def update_liquidsoap_stream_format(
|
||||
self,
|
||||
stream_format: Union[MessageFormatKind, int],
|
||||
|
@ -167,21 +165,18 @@ class PypoFetch(Thread):
|
|||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def update_liquidsoap_message_offline(self, message_offline: str) -> None:
|
||||
try:
|
||||
self.liq_client.settings_update(message_offline=message_offline)
|
||||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def update_liquidsoap_transition_fade(self, fade: float) -> None:
|
||||
try:
|
||||
self.liq_client.settings_update(input_fade_transition=fade)
|
||||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def update_liquidsoap_station_name(self, station_name: str) -> None:
|
||||
try:
|
||||
self.liq_client.settings_update(station_name=station_name)
|
||||
|
|
|
@ -2,7 +2,6 @@ import logging
|
|||
from typing import List, Optional
|
||||
|
||||
from ..liquidsoap.client import LiquidsoapClient
|
||||
from ..timeout import ls_timeout
|
||||
from .events import FileEvent, WebStreamEvent
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -49,21 +48,18 @@ class TelnetLiquidsoap:
|
|||
self.liq_client = liq_client
|
||||
self.queues = queues
|
||||
|
||||
@ls_timeout
|
||||
def queue_clear_all(self):
|
||||
try:
|
||||
self.liq_client.queues_remove(*self.queues)
|
||||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def queue_remove(self, queue_id: int):
|
||||
try:
|
||||
self.liq_client.queues_remove(queue_id)
|
||||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def queue_push(self, queue_id: int, file_event: FileEvent):
|
||||
try:
|
||||
annotation = create_liquidsoap_annotation(file_event)
|
||||
|
@ -71,21 +67,18 @@ class TelnetLiquidsoap:
|
|||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def stop_web_stream_buffer(self):
|
||||
try:
|
||||
self.liq_client.web_stream_stop_buffer()
|
||||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def stop_web_stream_output(self):
|
||||
try:
|
||||
self.liq_client.web_stream_stop()
|
||||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def start_web_stream(self):
|
||||
try:
|
||||
self.liq_client.web_stream_start()
|
||||
|
@ -93,7 +86,6 @@ class TelnetLiquidsoap:
|
|||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def start_web_stream_buffer(self, event: WebStreamEvent):
|
||||
try:
|
||||
self.liq_client.web_stream_start_buffer(event.row_id, event.uri)
|
||||
|
@ -101,7 +93,6 @@ class TelnetLiquidsoap:
|
|||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def get_current_stream_id(self) -> str:
|
||||
try:
|
||||
return self.liq_client.web_stream_get_id()
|
||||
|
@ -109,7 +100,6 @@ class TelnetLiquidsoap:
|
|||
logger.exception(exception)
|
||||
return "-1"
|
||||
|
||||
@ls_timeout
|
||||
def disconnect_source(self, sourcename):
|
||||
if sourcename not in ("master_dj", "live_dj"):
|
||||
raise ValueError(f"invalid source name: {sourcename}")
|
||||
|
@ -120,7 +110,6 @@ class TelnetLiquidsoap:
|
|||
except OSError as exception:
|
||||
logger.exception(exception)
|
||||
|
||||
@ls_timeout
|
||||
def switch_source(self, sourcename, status):
|
||||
if sourcename not in ("master_dj", "live_dj", "scheduled_play"):
|
||||
raise ValueError(f"invalid source name: {sourcename}")
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
import threading
|
||||
|
||||
|
||||
def __timeout(func, timeout_duration, default, args, kwargs):
|
||||
class InterruptableThread(threading.Thread):
|
||||
name = "liquidsoap_timeout"
|
||||
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self)
|
||||
self.result = default
|
||||
|
||||
def run(self):
|
||||
self.result = func(*args, **kwargs)
|
||||
|
||||
while True:
|
||||
thread = InterruptableThread()
|
||||
thread.start()
|
||||
thread.join(timeout_duration)
|
||||
|
||||
if thread.is_alive():
|
||||
raise RuntimeError("Thread did not terminate")
|
||||
|
||||
return thread.result
|
||||
|
||||
|
||||
def ls_timeout(func, timeout=15, default=None):
|
||||
def new_f(*args, **kwargs):
|
||||
return __timeout(func, timeout, default, args, kwargs)
|
||||
|
||||
return new_f
|
Loading…
Reference in New Issue