Merge branch 'devel' of dev.sourcefabric.org:airtime into devel

This commit is contained in:
james 2011-07-04 09:31:35 -04:00
commit 77656c6499
7 changed files with 231 additions and 62 deletions

View file

@ -3,20 +3,22 @@ import time
import logging import logging
import logging.config import logging.config
import sys import sys
import os
import signal import signal
from multiprocessing import Process from multiprocessing import Process, Queue as mpQueue
from airtimefilemonitor.airtimenotifier import AirtimeNotifier from airtimefilemonitor.airtimenotifier import AirtimeNotifier
from airtimefilemonitor.airtimeprocessevent import AirtimeProcessEvent from airtimefilemonitor.airtimeprocessevent import AirtimeProcessEvent
from airtimefilemonitor.mediaconfig import AirtimeMediaConfig from airtimefilemonitor.mediaconfig import AirtimeMediaConfig
from airtimefilemonitor.airtimemediamonitorbootstrap import AirtimeMediaMonitorBootstrap
def handleSigTERM(signum, frame): def handleSigTERM(signum, frame):
logger = logging.getLogger() logger = logging.getLogger()
logger.info("Main Process Shutdown, TERM signal caught. %d") logger.info("Main Process Shutdown, TERM signal caught.")
for p in processes: for p in processes:
p.terminate()
logger.info("Killed process. %d", p.pid) logger.info("Killed process. %d", p.pid)
p.terminate()
sys.exit(0) sys.exit(0)
@ -26,27 +28,21 @@ try:
logging.config.fileConfig("logging.cfg") logging.config.fileConfig("logging.cfg")
except Exception, e: except Exception, e:
print 'Error configuring logging: ', e print 'Error configuring logging: ', e
sys.exit() sys.exit(1)
logger = logging.getLogger() logger = logging.getLogger()
processes = [] processes = []
try: try:
config = AirtimeMediaConfig() config = AirtimeMediaConfig(logger)
multi_queue = mpQueue()
logger.info("Initializing event processor") logger.info("Initializing event processor")
pe = AirtimeProcessEvent(airtime_config=config) pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config)
notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config) notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=0, airtime_config=config)
notifier.coalesce_events() notifier.coalesce_events()
#create 5 worker processes
for i in range(5):
p = Process(target=notifier.process_file_events, args=(pe.multi_queue,))
processes.append(p)
p.start()
signal.signal(signal.SIGTERM, handleSigTERM)
logger.info("Setting up monitor") logger.info("Setting up monitor")
response = None response = None
while response is None: while response is None:
@ -56,19 +52,28 @@ try:
storage_directory = response["stor"].encode('utf-8') storage_directory = response["stor"].encode('utf-8')
logger.info("Storage Directory is: %s", storage_directory) logger.info("Storage Directory is: %s", storage_directory)
config.storage_directory = storage_directory config.storage_directory = storage_directory
config.imported_directory = storage_directory + '/imported'
bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe)
bootstrap.scan()
#create 5 worker processes
for i in range(5):
p = Process(target=notifier.process_file_events, args=(multi_queue,))
processes.append(p)
p.start()
wdd = pe.watch_directory(storage_directory) wdd = pe.watch_directory(storage_directory)
logger.info("Added watch to %s", storage_directory) logger.info("Added watch to %s", storage_directory)
logger.info("wdd result %s", wdd[storage_directory]) logger.info("wdd result %s", wdd[storage_directory])
#register signal before process forks and exits.
signal.signal(signal.SIGTERM, handleSigTERM)
notifier.loop(callback=pe.notifier_loop_callback) notifier.loop(callback=pe.notifier_loop_callback)
for p in processes:
p.join()
except KeyboardInterrupt: except KeyboardInterrupt:
notifier.stop() notifier.stop()
logger.info("Keyboard Interrupt")
except Exception, e: except Exception, e:
notifier.stop() #notifier.stop()
logger.error('Exception: %s', e) logger.error('Exception: %s', e)

View file

@ -17,13 +17,13 @@ DAEMON=/usr/lib/airtime/media-monitor/airtime-media-monitor
PIDFILE=/var/run/airtime-media-monitor.pid PIDFILE=/var/run/airtime-media-monitor.pid
start () { start () {
monit monitor airtime-media-monitor >/dev/null 2>&1 #monit monitor airtime-media-monitor >/dev/null 2>&1
start-stop-daemon --start --background --quiet --chuid $USERID:$GROUPID --make-pidfile --pidfile $PIDFILE --startas $DAEMON start-stop-daemon --start --background --quiet --chuid $USERID:$GROUPID --make-pidfile --pidfile $PIDFILE --startas $DAEMON
} }
stop () { stop () {
# Send TERM after 5 seconds, wait at most 30 seconds. # Send TERM after 5 seconds, wait at most 30 seconds.
monit unmonitor airtime-media-monitor >/dev/null 2>&1 #monit unmonitor airtime-media-monitor >/dev/null 2>&1
start-stop-daemon --stop --oknodo --retry TERM/5/0/30 --quiet --pidfile $PIDFILE start-stop-daemon --stop --oknodo --retry TERM/5/0/30 --quiet --pidfile $PIDFILE
rm -f $PIDFILE rm -f $PIDFILE
} }

View file

@ -0,0 +1,109 @@
import os
import time
from subprocess import Popen, PIPE
class AirtimeMediaMonitorBootstrap():
def __init__(self, logger, multi_queue, pe):
self.logger = logger
self.multi_queue = multi_queue
self.pe = pe
self.airtime_tmp = '/var/tmp/airtime'
"""
on bootup we want to scan all directories and look for files that
weren't there or files that changed before media-monitor process
went offline. We can do this by doing a hash of the directory metadata.
"""
def scan(self):
directories = ['/srv/airtime/stor']
for dir in directories:
self.check_for_diff(dir)
def check_for_diff(self, dir):
#set to hold new and/or modified files. We use a set to make it ok if files are added
#twice. This is become some of the tests for new files return result sets that are not
#mutually exclusive from each other.
added_files = set()
removed_files = set()
if os.path.exists(self.airtime_tmp + '/.airtime_media_index'):
#find files that have been modified since the last time
#media-monitor process was running.
time_diff_sec = time.time() - os.path.getmtime(self.airtime_tmp + '/.airtime_media_index')
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -%d" % (dir, time_diff_sec/60+1)
self.logger.debug(command)
stdout = self.execCommandAndReturnStdOut(command)
self.logger.info("Files modified since last checkin: \n%s\n", stdout)
new_files = stdout.split('\n')
for file_path in new_files:
added_files.add(file_path)
#a previous index exists, we can do a diff between this
#file and the current state to see whether anything has
#changed.
self.logger.info("Previous index file found.")
#find deleted files
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, self.airtime_tmp)
self.execCommand(command)
command = "diff -u %s/.airtime_media_index %s/.airtime_media_index.tmp" % (self.airtime_tmp, self.airtime_tmp)
stdout = self.execCommandAndReturnStdOut(command)
#remove first 3 lines from the diff output.
stdoutSplit = (stdout.split('\n'))[3:]
self.logger.info("Changed files since last checkin:\n%s\n", "\n".join(stdoutSplit))
for line in stdoutSplit:
if len(line.strip(' ')) > 1:
if line[0] == '+':
added_files.add(line[1:])
elif line[0] == '-':
removed_files.add(line[1:])
self.pe.write_index_file()
else:
#a previous index does not exist. Most likely means that
#media monitor has never seen this directory before. Let's
#notify airtime server about each of these files
self.logger.info("Previous index file does not exist. Creating a new one")
#create a new index file.
stdout = self.pe.write_index_file()
new_files = stdout.split('\n')
for file_path in new_files:
added_files.add(file_path)
for file_path in removed_files:
self.pe.handle_removed_file(file_path)
for file_path in added_files:
if os.path.exists(file_path):
self.pe.handle_created_file(False, os.path.basename(file_path), file_path)
def execCommand(self, command):
p = Popen(command, shell=True)
sts = os.waitpid(p.pid, 0)[1]
if sts != 0:
self.logger.warn("command \n%s\n return with a non-zero return value", command)
def execCommandAndReturnStdOut(self, command):
p = Popen(command, shell=True, stdout=PIPE)
stdout = p.communicate()[0]
if p.returncode != 0:
self.logger.warn("command \n%s\n return with a non-zero return value", command)
return stdout

View file

@ -3,8 +3,6 @@ import time
import os import os
import logging import logging
from multiprocessing import Process, Lock, Queue as mpQueue
# For RabbitMQ # For RabbitMQ
from kombu.connection import BrokerConnection from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer from kombu.messaging import Exchange, Queue, Consumer, Producer
@ -27,6 +25,7 @@ class AirtimeNotifier(Notifier):
self.import_processes = {} self.import_processes = {}
self.watched_folders = [] self.watched_folders = []
while not self.init_rabbit_mq(): while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5) time.sleep(5)
@ -104,8 +103,14 @@ class AirtimeNotifier(Notifier):
mm.watch_directory(new_storage_directory) mm.watch_directory(new_storage_directory)
#update airtime with information about files discovered in our
#watched directories. Pass in a dict() object with the following
#attributes:
# -filepath
# -mode
# -data
# -is_recorded_show
def update_airtime(self, d): def update_airtime(self, d):
filepath = d['filepath'] filepath = d['filepath']
mode = d['mode'] mode = d['mode']

View file

@ -4,7 +4,7 @@ import grp
import pwd import pwd
import logging import logging
from multiprocessing import Process, Lock, Queue as mpQueue from subprocess import Popen, PIPE
import pyinotify import pyinotify
from pyinotify import WatchManager, Notifier, ProcessEvent from pyinotify import WatchManager, Notifier, ProcessEvent
@ -18,7 +18,7 @@ from airtimefilemonitor.mediaconfig import AirtimeMediaConfig
class AirtimeProcessEvent(ProcessEvent): class AirtimeProcessEvent(ProcessEvent):
def my_init(self, airtime_config=None): def my_init(self, queue, airtime_config=None):
""" """
Method automatically called from ProcessEvent.__init__(). Additional Method automatically called from ProcessEvent.__init__(). Additional
keyworded arguments passed to ProcessEvent.__init__() are then keyworded arguments passed to ProcessEvent.__init__() are then
@ -34,11 +34,12 @@ class AirtimeProcessEvent(ProcessEvent):
self.gui_replaced = {} self.gui_replaced = {}
self.renamed_files = {} self.renamed_files = {}
self.file_events = [] self.file_events = []
self.multi_queue = mpQueue() self.multi_queue = queue
self.mask = pyinotify.ALL_EVENTS self.mask = pyinotify.ALL_EVENTS
self.wm = WatchManager() self.wm = WatchManager()
self.md_manager = AirtimeMetadata() self.md_manager = AirtimeMetadata()
#define which directories the pyinotify WatchManager should watch.
def watch_directory(self, directory): def watch_directory(self, directory):
return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True) return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True)
@ -139,6 +140,7 @@ class AirtimeProcessEvent(ProcessEvent):
return filepath return filepath
#create path in /srv/airtime/stor/imported/[song-metadata]
def create_file_path(self, imported_filepath, orig_md): def create_file_path(self, imported_filepath, orig_md):
storage_directory = self.config.storage_directory storage_directory = self.config.storage_directory
@ -177,7 +179,6 @@ class AirtimeProcessEvent(ProcessEvent):
#yyyy-mm-dd-hh-MM-ss #yyyy-mm-dd-hh-MM-ss
y = orig_md['MDATA_KEY_YEAR'].split("-") y = orig_md['MDATA_KEY_YEAR'].split("-")
filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "recorded".encode('utf-8'), y[0], y[1], orig_md['MDATA_KEY_YEAR'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "recorded".encode('utf-8'), y[0], y[1], orig_md['MDATA_KEY_YEAR'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
is_recorded_show = True
elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')): elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')):
filepath = '%s/%s/%s/%s/%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) filepath = '%s/%s/%s/%s/%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
else: else:
@ -191,34 +192,53 @@ class AirtimeProcessEvent(ProcessEvent):
except Exception, e: except Exception, e:
self.logger.error('Exception: %s', e) self.logger.error('Exception: %s', e)
return filepath, is_recorded_show return filepath
#event.dir: True if the event was raised against a directory.
#event.name
#event.pathname: pathname (str): Concatenation of 'path' and 'name'.
def process_IN_CREATE(self, event): def process_IN_CREATE(self, event):
#self.logger.info("%s: %s", event.maskname, event.pathname) self.logger.debug("PROCESS_IN_CREATE")
self.handle_created_file(event.dir, event.name, event.pathname)
def handle_created_file(self, dir, name, pathname):
self.logger.debug("dir: %s, name: %s, pathname: %s ", dir, name, pathname)
storage_directory = self.config.storage_directory storage_directory = self.config.storage_directory
if not dir:
if not event.dir:
#file created is a tmp file which will be modified and then moved back to the original filename. #file created is a tmp file which will be modified and then moved back to the original filename.
if self.is_temp_file(event.name) : if self.is_temp_file(name) :
self.temp_files[event.pathname] = None self.temp_files[pathname] = None
#This is a newly imported file. #This is a newly imported file.
elif self.is_audio_file(event.pathname): elif self.is_audio_file(pathname):
if self.is_parent_directory(event.pathname, storage_directory): if self.is_parent_directory(pathname, storage_directory):
self.set_needed_file_permissions(event.pathname, event.dir) self.set_needed_file_permissions(pathname, dir)
file_md = self.md_manager.get_md_from_file(event.pathname)
if file_md is not None: self.process_new_file(pathname)
filepath, is_recorded_show = self.create_file_path(event.pathname, file_md)
self.move_file(event.pathname, filepath)
self.renamed_files[event.pathname] = filepath
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show})
else: else:
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'is_recorded_show': False})
else: else:
if self.is_parent_directory(event.pathname, storage_directory): if self.is_parent_directory(pathname, storage_directory):
self.set_needed_file_permissions(event.pathname, event.dir) self.set_needed_file_permissions(pathname, dir)
def process_new_file(self, pathname):
self.logger.info("Processing new file: %s", pathname)
file_md = self.md_manager.get_md_from_file(pathname)
if file_md is not None:
is_recorded_show = 'MDATA_KEY_CREATOR' in file_md and \
file_md['MDATA_KEY_CREATOR'] == "AIRTIMERECORDERSOURCEFABRIC".encode('utf-8')
if not self.is_parent_directory(pathname, self.config.imported_directory):
filepath = self.create_file_path(pathname, file_md)
self.move_file(pathname, filepath)
self.renamed_files[pathname] = filepath
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show})
else:
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'data': file_md, 'is_recorded_show': is_recorded_show})
def process_IN_MODIFY(self, event): def process_IN_MODIFY(self, event):
@ -276,18 +296,48 @@ class AirtimeProcessEvent(ProcessEvent):
def process_IN_DELETE(self, event): def process_IN_DELETE(self, event):
self.logger.info("%s: %s", event.maskname, event.pathname) self.logger.info("%s: %s", event.maskname, event.pathname)
if not event.dir: if not event.dir:
self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_DELETE}) self.handle_removed_file(event.pathname)
def handle_removed_file(self, pathname):
self.logger.info("Deleting %s", pathname)
self.file_events.append({'filepath': pathname, 'mode': self.config.MODE_DELETE})
def process_default(self, event): def process_default(self, event):
#self.logger.info("%s: %s", event.maskname, event.pathname) #self.logger.info("%s: %s", event.maskname, event.pathname)
pass pass
def execCommandAndReturnStdOut(self, command):
p = Popen(command, shell=True, stdout=PIPE)
stdout = p.communicate()[0]
if p.returncode != 0:
self.logger.warn("command \n%s\n return with a non-zero return value", command)
return stdout
def write_index_file(self):
#create a new index file.
self.logger.debug("writing new index file")
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % '/srv/airtime/stor'
stdout = self.execCommandAndReturnStdOut(command)
self.write_file('/var/tmp/airtime' + '/.airtime_media_index', stdout)
return stdout
def write_file(self, file, string):
f = open(file, 'w')
f.write(string)
f.close()
def notifier_loop_callback(self, notifier): def notifier_loop_callback(self, notifier):
for event in self.file_events: if len(self.file_events) > 0:
self.multi_queue.put(event) for event in self.file_events:
self.multi_queue.put(event)
self.file_events = []
self.file_events = []
#no file_events and queue is empty. This is a good time
#to write an index file.
self.write_index_file()
#check for any events recieved from Airtime. #check for any events recieved from Airtime.
try: try:

View file

@ -9,14 +9,14 @@ class AirtimeMediaConfig:
MODE_MOVED = "moved" MODE_MOVED = "moved"
MODE_DELETE = "delete" MODE_DELETE = "delete"
def __init__(self): def __init__(self, logger):
# loading config file # loading config file
try: try:
config = ConfigObj('/etc/airtime/media-monitor.cfg') config = ConfigObj('/etc/airtime/media-monitor.cfg')
self.cfg = config self.cfg = config
except Exception, e: except Exception, e:
print 'Error loading config: ', e logger.info('Error loading config: ', e)
sys.exit() sys.exit()
self.storage_directory = None self.storage_directory = None

View file

@ -13,10 +13,10 @@
with pidfile "/var/run/airtime-liquidsoap.pid" with pidfile "/var/run/airtime-liquidsoap.pid"
start program = "/etc/init.d/airtime-playout start" with timeout 10 seconds start program = "/etc/init.d/airtime-playout start" with timeout 10 seconds
stop program = "/etc/init.d/airtime-playout stop" stop program = "/etc/init.d/airtime-playout stop"
check process airtime-media-monitor # check process airtime-media-monitor
with pidfile "/var/run/airtime-media-monitor.pid" # with pidfile "/var/run/airtime-media-monitor.pid"
start program = "/etc/init.d/airtime-media-monitor start" with timeout 10 seconds # start program = "/etc/init.d/airtime-media-monitor start" with timeout 10 seconds
stop program = "/etc/init.d/airtime-media-monitor stop" # stop program = "/etc/init.d/airtime-media-monitor stop"
check process airtime-show-recorder check process airtime-show-recorder
with pidfile "/var/run/airtime-show-recorder.pid" with pidfile "/var/run/airtime-show-recorder.pid"
start program = "/etc/init.d/airtime-show-recorder start" with timeout 10 seconds start program = "/etc/init.d/airtime-show-recorder start" with timeout 10 seconds