libretime/python_apps/pypo/timeout.py

39 lines
1.1 KiB
Python

import threading
import pypofetch
def __timeout(func, timeout_duration, default, args, kwargs):
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = func(*args, **kwargs)
first_attempt = True
while True:
it = InterruptableThread()
it.start()
if not first_attempt:
timeout_duration = timeout_duration * 2
it.join(timeout_duration)
if it.isAlive():
"""Restart Liquidsoap and try the command one more time. If it
fails again then there is something critically wrong..."""
if first_attempt:
#restart liquidsoap
pypofetch.PypoFetch.ref.restart_liquidsoap()
else:
raise Exception("Thread did not terminate")
else:
return it.result
first_attempt = False
def ls_timeout(f, timeout=15, default=None):
def new_f(*args, **kwargs):
return __timeout(f, timeout, default, args, kwargs)
return new_f