CC-2080: Turn pypo-fetch and pypo-push into threads of the same process

First implementation
This commit is contained in:
martin 2011-03-20 19:34:43 -04:00
parent 6ba4881ff2
commit e64e30d6da
16 changed files with 172 additions and 266 deletions

View File

@ -1,3 +0,0 @@
#!/bin/sh
su -l pypo -c "tail -F /etc/service/pypo-fetch/log/main/current"

3
dev_tools/pypoless.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/sh
su -l pypo -c "less /etc/service/pypo/log/main/current"

3
dev_tools/pypotail.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/sh
su -l pypo -c "tail -F /etc/service/pypo/log/main/current"

View File

@ -39,7 +39,6 @@ AirtimeInstall::InstallPostgresScriptingLanguage();
echo "* Creating Database Tables".PHP_EOL; echo "* Creating Database Tables".PHP_EOL;
AirtimeInstall::CreateDatabaseTables(); AirtimeInstall::CreateDatabaseTables();
AirtimeInstall::MigrateTables(__DIR__);
echo "* Storage Directory Setup".PHP_EOL; echo "* Storage Directory Setup".PHP_EOL;
AirtimeInstall::SetupStorageDirectory($CC_CONFIG); AirtimeInstall::SetupStorageDirectory($CC_CONFIG);

View File

View File

View File

@ -1,14 +0,0 @@
#!/bin/sh
pypo_user="pypo"
export HOME="/home/pypo/"
# Location of pypo_cli.py Python script
pypo_path="/opt/pypo/bin/"
pypo_script="pypo-cli.py"
echo "*** Daemontools: starting daemon"
cd ${pypo_path}
exec 2>&1
# Note the -u when calling python! we need it to get unbuffered binary stdout and stderr
exec setuidgid ${pypo_user} \
python -u ${pypo_path}${pypo_script} \
-p
# EOF

View File

@ -36,7 +36,7 @@ def create_user(username):
os.system("adduser --system --quiet --group --shell /bin/bash "+username) os.system("adduser --system --quiet --group --shell /bin/bash "+username)
#set pypo password #set pypo password
p = os.popen('/usr/bin/passwd pypo', 'w') p = os.popen('/usr/bin/passwd pypo 2>&1 1>/dev/null', 'w')
p.write('pypo\n') p.write('pypo\n')
p.write('pypo\n') p.write('pypo\n')
p.close() p.close()
@ -103,23 +103,15 @@ try:
os.system("chmod -R 755 "+BASE_PATH) os.system("chmod -R 755 "+BASE_PATH)
os.system("chown -R pypo:pypo "+BASE_PATH) os.system("chown -R pypo:pypo "+BASE_PATH)
print "Installing daemontool script pypo-fetch" print "Installing pypo daemon"
create_path("/etc/service/pypo-fetch") create_path("/etc/service/pypo")
create_path("/etc/service/pypo-fetch/log") create_path("/etc/service/pypo/log")
shutil.copy("%s/pypo-daemontools-fetch.sh"%current_script_dir, "/etc/service/pypo-fetch/run") shutil.copy("%s/pypo-daemontools.sh"%current_script_dir, "/etc/service/pypo/run")
shutil.copy("%s/pypo-daemontools-logger.sh"%current_script_dir, "/etc/service/pypo-fetch/log/run") shutil.copy("%s/pypo-daemontools-logger.sh"%current_script_dir, "/etc/service/pypo/log/run")
os.system("chmod -R 755 /etc/service/pypo-fetch") os.system("chmod -R 755 /etc/service/pypo")
os.system("chown -R pypo:pypo /etc/service/pypo-fetch") os.system("chown -R pypo:pypo /etc/service/pypo")
print "Installing daemontool script pypo-push" print "Installing liquidsoap daemon"
create_path("/etc/service/pypo-push")
create_path("/etc/service/pypo-push/log")
shutil.copy("%s/pypo-daemontools-push.sh"%current_script_dir, "/etc/service/pypo-push/run")
shutil.copy("%s/pypo-daemontools-logger.sh"%current_script_dir, "/etc/service/pypo-push/log/run")
os.system("chmod -R 755 /etc/service/pypo-push")
os.system("chown -R pypo:pypo /etc/service/pypo-push")
print "Installing daemontool script pypo-liquidsoap"
create_path("/etc/service/pypo-liquidsoap") create_path("/etc/service/pypo-liquidsoap")
create_path("/etc/service/pypo-liquidsoap/log") create_path("/etc/service/pypo-liquidsoap/log")
shutil.copy("%s/pypo-daemontools-liquidsoap.sh"%current_script_dir, "/etc/service/pypo-liquidsoap/run") shutil.copy("%s/pypo-daemontools-liquidsoap.sh"%current_script_dir, "/etc/service/pypo-liquidsoap/run")
@ -134,16 +126,12 @@ try:
found = True found = True
p = Popen('svstat /etc/service/pypo-fetch', shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) p = Popen('svstat /etc/service/pypo', shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
output = p.stdout.read() output = p.stdout.read()
if (output.find("unable to open supervise/ok: file does not exist") >= 0): if (output.find("unable to open supervise/ok: file does not exist") >= 0):
found = False found = False
print output print output
p = Popen('svstat /etc/service/pypo-push', shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
output = p.stdout.read()
print output
p = Popen('svstat /etc/service/pypo-liquidsoap', shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) p = Popen('svstat /etc/service/pypo-liquidsoap', shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
output = p.stdout.read() output = p.stdout.read()
print output print output
@ -152,6 +140,7 @@ try:
print "Pypo install has completed, but daemontools is not running, please make sure you have it installed and then reboot." print "Pypo install has completed, but daemontools is not running, please make sure you have it installed and then reboot."
except Exception, e: except Exception, e:
print "exception:" + str(e) print "exception:" + str(e)
sys.exit(1)

View File

@ -9,12 +9,9 @@ if os.geteuid() != 0:
sys.exit(1) sys.exit(1)
try: try:
print "Starting daemontool script pypo-fetch" print "Starting daemontool script pypo"
os.system("svc -u /etc/service/pypo-fetch") os.system("svc -u /etc/service/pypo")
print "Starting daemontool script pypo-push"
os.system("svc -u /etc/service/pypo-push")
print "Starting daemontool script pypo-liquidsoap" print "Starting daemontool script pypo-liquidsoap"
os.system("svc -u /etc/service/pypo-liquidsoap") os.system("svc -u /etc/service/pypo-liquidsoap")

View File

@ -9,12 +9,9 @@ if os.geteuid() != 0:
sys.exit(1) sys.exit(1)
try: try:
print "Stopping daemontool script pypo-fetch" print "Stopping daemontool script pypo"
os.system("svc -dx /etc/service/pypo-fetch 2>/dev/null") os.system("svc -dx /etc/service/pypo 2>/dev/null")
print "Stopping daemontool script pypo-push"
os.system("svc -dx /etc/service/pypo-push 2>/dev/null")
print "Stopping daemontool script pypo-liquidsoap" print "Stopping daemontool script pypo-liquidsoap"
os.system("svc -dx /etc/service/pypo-liquidsoap 2>/dev/null") os.system("svc -dx /etc/service/pypo-liquidsoap 2>/dev/null")
os.system("killall liquidsoap") os.system("killall liquidsoap")

View File

@ -37,12 +37,9 @@ try:
print "Removing pypo files" print "Removing pypo files"
remove_path(BASE_PATH) remove_path(BASE_PATH)
print "Removing daemontool script pypo-fetch" print "Removing daemontool script pypo"
remove_path("rm -rf /etc/service/pypo-fetch") remove_path("rm -rf /etc/service/pypo")
print "Removing daemontool script pypo-push"
remove_path("rm -rf /etc/service/pypo-push")
print "Removing daemontool script pypo-liquidsoap" print "Removing daemontool script pypo-liquidsoap"
remove_path("rm -rf /etc/service/pypo-liquidsoap") remove_path("rm -rf /etc/service/pypo-liquidsoap")

View File

@ -1,22 +1,27 @@
[loggers] [loggers]
keys=root keys=root,fetch,push
[handlers] [handlers]
keys=consoleHandler,fileHandlerERROR,fileHandlerDEBUG,nullHandler keys=consoleHandler
[formatters] [formatters]
keys=simpleFormatter keys=simpleFormatter
[logger_root] [logger_root]
level=DEBUG level=DEBUG
handlers=consoleHandler,fileHandlerERROR,fileHandlerDEBUG handlers=consoleHandler
[logger_libs] [logger_fetch]
handlers=nullHandler
level=DEBUG level=DEBUG
qualname="process" handlers=consoleHandler
qualname=fetch
propagate=0 propagate=0
[logger_push]
level=DEBUG
handlers=consoleHandler
qualname=push
propagate=0
[handler_consoleHandler] [handler_consoleHandler]
class=StreamHandler class=StreamHandler
@ -24,37 +29,6 @@ level=DEBUG
formatter=simpleFormatter formatter=simpleFormatter
args=(sys.stdout,) args=(sys.stdout,)
[handler_fileHandlerERROR]
class=FileHandler
level=WARNING
formatter=simpleFormatter
args=("./error.log",)
[handler_fileHandlerDEBUG]
class=FileHandler
level=DEBUG
formatter=simpleFormatter
args=("./debug.log",)
[handler_nullHandler]
class=FileHandler
level=DEBUG
formatter=simpleFormatter
args=("/dev/null",)
[formatter_simpleFormatter] [formatter_simpleFormatter]
format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s
datefmt= datefmt=
## multitail color sheme
## pyml / python
# colorscheme:pyml:www.obp.net
# cs_re:blue:\[[^ ]*\]
# cs_re:red:CRITICAL:*
# cs_re:red,black,blink:ERROR:*
# cs_re:blue:NOTICE:*
# cs_re:cyan:INFO:*
# cs_re:green:DEBUG:*

View File

@ -27,6 +27,7 @@ import logging.config
#import string #import string
#import operator #import operator
#import inspect #import inspect
from Queue import Queue
from pypopush import PypoPush from pypopush import PypoPush
from pypofetch import PypoFetch from pypofetch import PypoFetch
@ -66,10 +67,6 @@ logging.config.fileConfig("logging.cfg")
# loading config file # loading config file
try: try:
config = ConfigObj('config.cfg') config = ConfigObj('config.cfg')
POLL_INTERVAL = float(config['poll_interval'])
PUSH_INTERVAL = float(config['push_interval'])
LS_HOST = config['ls_host']
LS_PORT = config['ls_port']
except Exception, e: except Exception, e:
print 'Error loading config file: ', e print 'Error loading config file: ', e
sys.exit() sys.exit()
@ -135,41 +132,23 @@ if __name__ == '__main__':
g.selfcheck() g.selfcheck()
logger = logging.getLogger() logger = logging.getLogger()
loops = 0
if options.test: if options.test:
g.test_api() g.test_api()
sys.exit() sys.exit()
q = Queue()
if options.fetch_scheduler:
pf = PypoFetch()
while True:
try: pf.fetch('scheduler')
except Exception, e:
print e
sys.exit()
if (loops%2 == 0): pp = PypoPush(q)
logger.info("heartbeat") pp.start()
loops += 1
time.sleep(POLL_INTERVAL)
if options.push_scheduler: pf = PypoFetch(q)
pp = PypoPush() pf.start()
while True:
try: pp.push('scheduler')
except Exception, e:
print 'PUSH ERROR!! WILL EXIT NOW:('
print e
sys.exit()
if (loops%60 == 0): pp.join()
logger.info("heartbeat") pf.join()
loops += 1
time.sleep(PUSH_INTERVAL)
"""
if options.check: if options.check:
try: g.check_schedule() try: g.check_schedule()
except Exception, e: except Exception, e:
@ -179,4 +158,4 @@ if __name__ == '__main__':
try: pf.cleanup('scheduler') try: pf.cleanup('scheduler')
except Exception, e: except Exception, e:
print e print e
sys.exit() """

View File

@ -9,35 +9,39 @@ import random
import string import string
import json import json
import telnetlib import telnetlib
import math
from threading import Thread
from api_clients import api_client from api_clients import api_client
from util import CueFile from util import CueFile
from configobj import ConfigObj from configobj import ConfigObj
# configure logging
logging.config.fileConfig("logging.cfg")
# loading config file # loading config file
try: try:
config = ConfigObj('config.cfg') config = ConfigObj('config.cfg')
POLL_INTERVAL = float(config['poll_interval'])
PUSH_INTERVAL = 0.5
#PUSH_INTERVAL = float(config['push_interval'])
LS_HOST = config['ls_host'] LS_HOST = config['ls_host']
LS_PORT = config['ls_port'] LS_PORT = config['ls_port']
POLL_INTERVAL = 5
except Exception, e: except Exception, e:
print 'Error loading config file: ', e print 'Error loading config file: ', e
sys.exit() sys.exit()
class PypoFetch: class PypoFetch(Thread):
def __init__(self): def __init__(self, q):
Thread.__init__(self)
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
self.cue_file = CueFile() self.cue_file = CueFile()
self.set_export_source('scheduler') self.set_export_source('scheduler')
self.queue = q
def set_export_source(self, export_source): def set_export_source(self, export_source):
self.export_source = export_source self.export_source = export_source
self.cache_dir = config["cache_dir"] + self.export_source + '/' self.cache_dir = config["cache_dir"] + self.export_source + '/'
self.schedule_file = self.cache_dir + 'schedule.pickle'
self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle"
""" """
Fetching part of pypo Fetching part of pypo
@ -48,10 +52,8 @@ class PypoFetch:
- runs the cleanup routine, to get rid of unused cashed files - runs the cleanup routine, to get rid of unused cashed files
""" """
def fetch(self, export_source): def fetch(self, export_source):
""" #wrapper script for fetching the whole schedule (in json)
wrapper script for fetching the whole schedule (in json) logger = logging.getLogger('fetch')
"""
logger = logging.getLogger()
try: os.mkdir(self.cache_dir) try: os.mkdir(self.cache_dir)
except Exception, e: pass except Exception, e: pass
@ -65,30 +67,29 @@ class PypoFetch:
except Exception, e: logger.error("%s", e) except Exception, e: logger.error("%s", e)
# prepare the playlists # prepare the playlists
if config["cue_style"] == 'pre': try:
try: self.prepare_playlists_cue() playlists = self.prepare_playlists()
except Exception, e: logger.error("%s", e) except Exception, e: logger.error("%s", e)
elif config["cue_style"] == 'otf':
try: self.prepare_playlists(self.export_source)
except Exception, e: logger.error("%s", e) scheduled_data = dict()
scheduled_data['playlists'] = playlists
scheduled_data['schedule'] = self.schedule
self.queue.put(scheduled_data)
# cleanup # cleanup
try: self.cleanup(self.export_source) try: self.cleanup(self.export_source)
except Exception, e: logger.error("%s", e) except Exception, e: logger.error("%s", e)
def get_schedule(self): def get_schedule(self):
logger = logging.getLogger() logger = logging.getLogger('fetch')
status, response = self.api_client.get_schedule() status, response = self.api_client.get_schedule()
if status == 1: if status == 1:
logger.info("dump serialized schedule to %s", self.schedule_file)
schedule = response['playlists'] schedule = response['playlists']
stream_metadata = response['stream_metadata'] stream_metadata = response['stream_metadata']
try: try:
schedule_file = open(self.schedule_file, "w") self.schedule = schedule
pickle.dump(schedule, schedule_file)
schedule_file.close()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
#encode in latin-1 due to telnet protocol not supporting utf-8 #encode in latin-1 due to telnet protocol not supporting utf-8
@ -96,7 +97,7 @@ class PypoFetch:
tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1')) tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1'))
tn.write('exit\n') tn.write('exit\n')
logger.debug(tn.read_all()) tn.read_all()
except Exception, e: except Exception, e:
logger.error("Exception %s", e) logger.error("Exception %s", e)
@ -104,45 +105,22 @@ class PypoFetch:
return status return status
#TODO this is a duplicate function!!!
def load_schedule(self):
logger = logging.getLogger()
schedule = None
# create the file if it doesnt exist
if (not os.path.exists(self.schedule_file)):
logger.debug('creating file ' + self.schedule_file)
open(self.schedule_file, 'w').close()
else:
# load the schedule from cache
#logger.debug('loading schedule file '+self.schedule_file)
try:
schedule_file = open(self.schedule_file, "r")
schedule = pickle.load(schedule_file)
schedule_file.close()
except Exception, e:
logger.error('%s', e)
return schedule
""" """
Alternative version of playout preparation. Every playlist entry is Alternative version of playout preparation. Every playlist entry is
pre-cued if neccessary (cue_in/cue_out != 0) and stored in the pre-cued if neccessary (cue_in/cue_out != 0) and stored in the
playlist folder. playlist folder.
file is eg 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3 file is eg 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3
""" """
def prepare_playlists_cue(self): def prepare_playlists(self):
logger = logging.getLogger() logger = logging.getLogger('fetch')
# Load schedule from disk schedule = self.schedule
schedule = self.load_schedule() playlists = dict()
# Dont do anything if schedule is empty # Dont do anything if schedule is empty
if (not schedule): if not schedule:
logger.debug("Schedule is empty.") logger.debug("Schedule is empty.")
return return playlists
scheduleKeys = sorted(schedule.iterkeys()) scheduleKeys = sorted(schedule.iterkeys())
@ -173,14 +151,10 @@ class PypoFetch:
elif int(playlist['subtype']) > 0 and int(playlist['subtype']) < 5: elif int(playlist['subtype']) > 0 and int(playlist['subtype']) < 5:
ls_playlist = self.handle_media_file(playlist, pkey) ls_playlist = self.handle_media_file(playlist, pkey)
# write playlist file playlists[pkey] = ls_playlist
plfile = open(self.cache_dir + str(pkey) + '/list.lsp', "w")
plfile.write(json.dumps(ls_playlist))
plfile.close()
logger.info('ls playlist file written to %s', self.cache_dir + str(pkey) + '/list.lsp')
except Exception, e: except Exception, e:
logger.info("%s", e) logger.info("%s", e)
return playlists
def handle_media_file(self, playlist, pkey): def handle_media_file(self, playlist, pkey):
""" """
@ -189,7 +163,7 @@ class PypoFetch:
""" """
ls_playlist = [] ls_playlist = []
logger = logging.getLogger() logger = logging.getLogger('fetch')
for media in playlist['medias']: for media in playlist['medias']:
logger.debug("Processing track %s", media['uri']) logger.debug("Processing track %s", media['uri'])
@ -252,7 +226,7 @@ class PypoFetch:
def handle_remote_file(self, media, dst, do_cue): def handle_remote_file(self, media, dst, do_cue):
logger = logging.getLogger() logger = logging.getLogger('fetch')
if do_cue == False: if do_cue == False:
if os.path.isfile(dst): if os.path.isfile(dst):
logger.debug("file already in cache: %s", dst) logger.debug("file already in cache: %s", dst)
@ -301,7 +275,7 @@ class PypoFetch:
Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR" Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR"
and deletes them. and deletes them.
""" """
logger = logging.getLogger() logger = logging.getLogger('fetch')
offset = 3600 * int(config["cache_for"]) offset = 3600 * int(config["cache_for"])
now = time.time() now = time.time()
@ -323,3 +297,18 @@ class PypoFetch:
print e print e
logger.error("%s", e) logger.error("%s", e)
def run(self):
loops = 0
heartbeat_period = math.floor(30/POLL_INTERVAL)
logger = logging.getLogger('fetch')
while True:
if loops % heartbeat_period == 0:
logger.info("heartbeat")
loops = 0
try: self.fetch('scheduler')
except Exception, e:
logger.error('Pypo Fetch Error, exiting: %s', e)
sys.exit()
time.sleep(POLL_INTERVAL)
loops += 1

View File

@ -7,29 +7,37 @@ import pickle
import telnetlib import telnetlib
import calendar import calendar
import json import json
import math
from threading import Thread
from api_clients import api_client from api_clients import api_client
from util import CueFile from util import CueFile
from configobj import ConfigObj from configobj import ConfigObj
# configure logging
logging.config.fileConfig("logging.cfg")
# loading config file # loading config file
try: try:
config = ConfigObj('config.cfg') config = ConfigObj('config.cfg')
POLL_INTERVAL = float(config['poll_interval'])
PUSH_INTERVAL = 0.5
#PUSH_INTERVAL = float(config['push_interval'])
LS_HOST = config['ls_host'] LS_HOST = config['ls_host']
LS_PORT = config['ls_port'] LS_PORT = config['ls_port']
PUSH_INTERVAL = 2
except Exception, e: except Exception, e:
print 'Error loading config file: ', e logger.error('Error loading config file %s', e)
sys.exit() sys.exit()
class PypoPush: class PypoPush(Thread):
def __init__(self): def __init__(self, q):
Thread.__init__(self)
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
self.cue_file = CueFile() self.cue_file = CueFile()
self.set_export_source('scheduler') self.set_export_source('scheduler')
self.queue = q
self.schedule = dict()
self.playlists = dict()
""" """
push_ahead2 MUST be < push_ahead. The difference in these two values push_ahead2 MUST be < push_ahead. The difference in these two values
@ -42,7 +50,6 @@ class PypoPush:
def set_export_source(self, export_source): def set_export_source(self, export_source):
self.export_source = export_source self.export_source = export_source
self.cache_dir = config["cache_dir"] + self.export_source + '/' self.cache_dir = config["cache_dir"] + self.export_source + '/'
self.schedule_file = self.cache_dir + 'schedule.pickle'
self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle" self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle"
""" """
@ -52,41 +59,46 @@ class PypoPush:
then liquidsoap is asked (via telnet) to reload and immediately play it. then liquidsoap is asked (via telnet) to reload and immediately play it.
""" """
def push(self, export_source): def push(self, export_source):
logger = logging.getLogger() logger = logging.getLogger('push')
self.schedule = self.load_schedule() if not self.queue.empty():
playedItems = self.load_schedule_tracker() scheduled_data = self.queue.get()
self.schedule = scheduled_data['schedule']
self.playlists = scheduled_data['playlists']
tcoming = time.localtime(time.time() + self.push_ahead) schedule = self.schedule
tcoming2 = time.localtime(time.time() + self.push_ahead2) playlists = self.playlists
str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5])
str_tcoming2_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming2[0], tcoming2[1], tcoming2[2], tcoming2[3], tcoming2[4], tcoming2[5])
currently_on_air = False currently_on_air = False
if self.schedule == None: if schedule:
logger.warn('Unable to loop schedule - maybe write in progress?') playedItems = self.load_schedule_tracker()
logger.warn('Will try again in next loop.')
else: timenow = time.time()
for pkey in self.schedule: tcoming = time.localtime(timenow + self.push_ahead)
str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5])
tcoming2 = time.localtime(timenow + self.push_ahead2)
str_tcoming2_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming2[0], tcoming2[1], tcoming2[2], tcoming2[3], tcoming2[4], tcoming2[5])
tnow = time.localtime(timenow)
str_tnow_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tnow[0], tnow[1], tnow[2], tnow[3], tnow[4], tnow[5])
for pkey in schedule:
plstart = pkey[0:19] plstart = pkey[0:19]
start = self.schedule[pkey]['start'] start = schedule[pkey]['start']
end = self.schedule[pkey]['end'] end = schedule[pkey]['end']
playedFlag = (pkey in playedItems) and playedItems[pkey].get("played", 0) playedFlag = (pkey in playedItems) and playedItems[pkey].get("played", 0)
if plstart == str_tcoming_s or (plstart < str_tcoming_s and plstart > str_tcoming2_s and not playedFlag): if plstart == str_tcoming_s or (plstart < str_tcoming_s and plstart > str_tcoming2_s and not playedFlag):
logger.debug('Preparing to push playlist scheduled at: %s', pkey) logger.debug('Preparing to push playlist scheduled at: %s', pkey)
playlist = self.schedule[pkey] playlist = schedule[pkey]
ptype = playlist['subtype']
currently_on_air = True currently_on_air = True
# We have a match, replace the current playlist and # We have a match, replace the current playlist and
# force liquidsoap to refresh. # force liquidsoap to refresh.
if (self.push_liquidsoap(pkey, self.schedule, ptype) == 1): if (self.push_liquidsoap(pkey, schedule, playlists) == 1):
logger.debug("Pushed to liquidsoap, updating 'played' status.") logger.debug("Pushed to liquidsoap, updating 'played' status.")
# Marked the current playlist as 'played' in the schedule tracker # Marked the current playlist as 'played' in the schedule tracker
# so it is not called again in the next push loop. # so it is not called again in the next push loop.
@ -100,39 +112,27 @@ class PypoPush:
# Call API to update schedule states # Call API to update schedule states
logger.debug("Doing callback to server to update 'played' status.") logger.debug("Doing callback to server to update 'played' status.")
self.api_client.notify_scheduled_item_start_playing(pkey, self.schedule) self.api_client.notify_scheduled_item_start_playing(pkey, schedule)
if self.schedule != None: start = schedule[pkey]['start']
tnow = time.localtime(time.time()) end = schedule[pkey]['end']
str_tnow_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tnow[0], tnow[1], tnow[2], tnow[3], tnow[4], tnow[5])
for pkey in self.schedule:
start = self.schedule[pkey]['start']
end = self.schedule[pkey]['end']
if start <= str_tnow_s and str_tnow_s < end: if start <= str_tnow_s and str_tnow_s < end:
currently_on_air = True currently_on_air = True
else:
logger.debug('Empty schedule')
if not currently_on_air: if not currently_on_air:
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(LS_HOST, LS_PORT)
tn.write('source.skip\n'.encode('latin-1')) tn.write('source.skip\n')
tn.write('exit\n') tn.write('exit\n')
tn.read_all() tn.read_all()
#logger.info('source.skip')
#logger.debug(tn.read_all())
def push_liquidsoap(self, pkey, schedule, ptype): def push_liquidsoap(self, pkey, schedule, playlists):
logger = logging.getLogger() logger = logging.getLogger('push')
src = self.cache_dir + str(pkey) + '/list.lsp'
try: try:
if True == os.access(src, os.R_OK): playlist = playlists[pkey]
logger.debug('OK - Can read playlist file')
pl_file = open(src, "r")
file_content = pl_file.read()
pl_file.close()
logger.debug('file content: %s' % (file_content))
playlist = json.loads(file_content)
#strptime returns struct_time in local time #strptime returns struct_time in local time
#mktime takes a time_struct and returns a floating point #mktime takes a time_struct and returns a floating point
@ -180,43 +180,24 @@ class PypoPush:
except Exception, e: except Exception, e:
logger.error('%s', e) logger.error('%s', e)
status = 0 status = 0
return status return status
def load_schedule(self):
logger = logging.getLogger()
schedule = None
# create the file if it doesnt exist
if (not os.path.exists(self.schedule_file)):
logger.debug('creating file ' + self.schedule_file)
open(self.schedule_file, 'w').close()
else:
# load the schedule from cache
#logger.debug('loading schedule file '+self.schedule_file)
try:
schedule_file = open(self.schedule_file, "r")
schedule = pickle.load(schedule_file)
schedule_file.close()
except Exception, e:
logger.error('%s', e)
return schedule
def load_schedule_tracker(self): def load_schedule_tracker(self):
logger = logging.getLogger() logger = logging.getLogger('push')
logger.debug('load_schedule_tracker')
playedItems = dict() playedItems = dict()
# create the file if it doesnt exist # create the file if it doesnt exist
if (not os.path.exists(self.schedule_tracker_file)): if (not os.path.exists(self.schedule_tracker_file)):
logger.debug('creating file ' + self.schedule_tracker_file) try:
schedule_tracker = open(self.schedule_tracker_file, 'w') logger.debug('creating file ' + self.schedule_tracker_file)
pickle.dump(playedItems, schedule_tracker) schedule_tracker = open(self.schedule_tracker_file, 'w')
schedule_tracker.close() pickle.dump(playedItems, schedule_tracker)
schedule_tracker.close()
except Exception, e:
logger.error('Error creating schedule tracker file: %s', e)
else: else:
logger.debug('schedule tracker file exists, opening: ' + self.schedule_tracker_file)
try: try:
schedule_tracker = open(self.schedule_tracker_file, "r") schedule_tracker = open(self.schedule_tracker_file, "r")
playedItems = pickle.load(schedule_tracker) playedItems = pickle.load(schedule_tracker)
@ -226,3 +207,18 @@ class PypoPush:
return playedItems return playedItems
def run(self):
loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL)
logger = logging.getLogger('push')
while True:
if loops % heartbeat_period == 0:
logger.info("heartbeat")
loops = 0
try: self.push('scheduler')
except Exception, e:
logger.error('Pypo Push Error, exiting: %s', e)
sys.exit()
time.sleep(PUSH_INTERVAL)
loops += 1