refactor(playout): remove date_interval_to_seconds function

Replace date_interval_to_seconds function with
either timedelta.total_seconds() or seconds_between().
This commit is contained in:
jo 2022-07-17 12:54:14 +02:00 committed by Kyle Robbertze
parent db3f9bed82
commit 28857fbf49
6 changed files with 26 additions and 74 deletions

View File

@ -1,10 +0,0 @@
def date_interval_to_seconds(interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (
interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10**6
) / float(10**6)
return seconds

View File

@ -389,7 +389,7 @@ class PypoFetch(Thread):
mimetypes.init(["%s/mime.types" % os.path.dirname(os.path.realpath(__file__))])
mime_ext = mimetypes.guess_extension(mime, strict=False)
length1 = pure.date_interval_to_seconds(end - start)
length1 = (end - start).total_seconds()
length2 = media_item["cue_out"] - media_item["cue_in"]
if abs(length2 - length1) > 1:

View File

@ -8,6 +8,8 @@ from threading import Thread
from loguru import logger
from .utils import seconds_between
def keyboardInterruptHandler(signum, frame):
logger.info("\nKeyboard Interrupt\n")
@ -45,11 +47,10 @@ class PypoLiqQueue(Thread):
media_item = schedule_deque.popleft()
self.pypo_liquidsoap.play(media_item)
if len(schedule_deque):
time_until_next_play = self.date_interval_to_seconds(
schedule_deque[0]["start"] - datetime.utcnow()
time_until_next_play = seconds_between(
datetime.utcnow(),
schedule_deque[0]["start"],
)
if time_until_next_play < 0:
time_until_next_play = 0
else:
time_until_next_play = None
else:
@ -63,27 +64,14 @@ class PypoLiqQueue(Thread):
schedule_deque.append(media_schedule[i])
if len(keys):
time_until_next_play = self.date_interval_to_seconds(
media_schedule[keys[0]]["start"] - datetime.utcnow()
time_until_next_play = seconds_between(
datetime.utcnow(),
media_schedule[keys[0]]["start"],
)
else:
time_until_next_play = None
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (
interval.microseconds
+ (interval.seconds + interval.days * 24 * 3600) * 10**6
) / float(10**6)
if seconds < 0:
seconds = 0
return seconds
def run(self):
try:
self.main()

View File

@ -5,6 +5,7 @@ from loguru import logger
from .events import EventKind
from .telnetliquidsoap import TelnetLiquidsoap
from .utils import seconds_between
class PypoLiquidsoap:
@ -209,36 +210,12 @@ class PypoLiquidsoap:
def modify_cue_point(self, link):
assert self.is_file(link)
tnow = datetime.utcnow()
lateness = seconds_between(link["start"], datetime.utcnow())
link_start = link["start"]
diff_td = tnow - link_start
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec > 0:
logger.debug(
"media item was supposed to start %s ago. Preparing to start..",
diff_sec,
)
original_cue_in_td = timedelta(seconds=float(link["cue_in"]))
link["cue_in"] = (
self.date_interval_to_seconds(original_cue_in_td) + diff_sec
)
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (
interval.microseconds
+ (interval.seconds + interval.days * 24 * 3600) * 10**6
) / float(10**6)
if seconds < 0:
seconds = 0
return seconds
if lateness > 0:
logger.debug(f"media item was supposed to start {lateness}s ago")
cue_in_orig = timedelta(seconds=float(link["cue_in"]))
link["cue_in"] = cue_in_orig.total_seconds() + lateness
def clear_all_queues(self):
self.telnet_liquidsoap.queue_clear_all()

View File

@ -85,8 +85,7 @@ class PypoPush(Thread):
logger.debug(f"ignoring ended media_item: {media_item}")
continue
diff_td = tnow - media_item["start"]
diff_sec = self.date_interval_to_seconds(diff_td)
diff_sec = (tnow - media_item["start"]).total_seconds()
if diff_sec >= 0:
logger.debug(f"adding media_item to present: {media_item}")
@ -97,18 +96,6 @@ class PypoPush(Thread):
return present, future
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (
interval.microseconds
+ (interval.seconds + interval.days * 24 * 3600) * 10**6
) / float(10**6)
return seconds
@ls_timeout
def stop_web_stream_all(self):
try:

View File

@ -0,0 +1,10 @@
from datetime import datetime
def seconds_between(base: datetime, target: datetime) -> float:
"""
Get seconds between base and target datetime.
Return 0 if target is older than base.
"""
return max(0, (target - base).total_seconds())