cc-1799 : Filesystem

running Notifier as a daemon, allows parallel processing of event queue at all times.
This commit is contained in:
Naomi Aro 2011-06-28 11:39:54 +02:00
parent 8b533bbd21
commit 4c8e6a04ac
4 changed files with 48 additions and 67 deletions

View file

@ -22,56 +22,48 @@ def handleSigTERM(signum, frame):
sys.exit(0) sys.exit(0)
if __name__ == '__main__': # configure logging
try:
logging.config.fileConfig("logging.cfg")
except Exception, e:
print 'Error configuring logging: ', e
sys.exit()
# configure logging logger = logging.getLogger()
try: p = None
logging.config.fileConfig("logging.cfg")
except Exception, e:
print 'Error configuring logging: ', e
sys.exit()
logger = logging.getLogger() try:
p = None config = AirtimeMediaConfig()
logger.info("Initializing event processor")
pe = AirtimeProcessEvent(airtime_config=config)
try: notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=1, airtime_config=config)
config = AirtimeMediaConfig() notifier.coalesce_events()
logger.info("Initializing event processor")
pe = AirtimeProcessEvent(airtime_config=config)
notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=1, airtime_config=config) p = Process(target=notifier.process_file_events, args=(pe.file_events,))
notifier.coalesce_events() p.start()
p = Process(target=notifier.process_file_events, args=(pe.file_events,)) signal.signal(signal.SIGTERM, handleSigTERM)
p.daemon = True
p.start()
signal.signal(signal.SIGTERM, handleSigTERM) logger.info("Setting up monitor")
response = None
while response is None:
response = notifier.api_client.setup_media_monitor()
time.sleep(5)
logger.info("Setting up monitor") storage_directory = response["stor"].encode('utf-8')
response = None logger.info("Storage Directory is: %s", storage_directory)
while response is None: config.storage_directory = storage_directory
response = notifier.api_client.setup_media_monitor()
time.sleep(5)
storage_directory = response["stor"].encode('utf-8') wdd = pe.watch_directory(storage_directory)
logger.info("Storage Directory is: %s", storage_directory) logger.info("Added watch to %s", storage_directory)
config.storage_directory = storage_directory logger.info("wdd result %s", wdd[storage_directory])
wdd = pe.watch_directory(storage_directory) notifier.loop(daemonize=True, callback=pe.notifier_loop_callback, pid_file='/var/run/airtime-notifier.pid', stdout='/var/log/airtime/media-monitor/media-monitor.log')
logger.info("Added watch to %s", storage_directory)
logger.info("wdd result %s", wdd[storage_directory])
#notifier.loop(callback=mm.notifier_loop_callback) except KeyboardInterrupt:
notifier.stop()
while True: except Exception, e:
if(notifier.check_events(1)): notifier.stop()
notifier.read_events() logger.error('Exception: %s', e)
notifier.process_events()
pe.notifier_loop_callback(notifier)
except KeyboardInterrupt:
notifier.stop()
except Exception, e:
logger.error('Exception: %s', e)

View file

@ -66,12 +66,10 @@ class AirtimeNotifier(Notifier):
elif m['event_type'] == "new_watch": elif m['event_type'] == "new_watch":
self.logger.info("AIRTIME NOTIFIER add watched folder event " + m['directory']) self.logger.info("AIRTIME NOTIFIER add watched folder event " + m['directory'])
#start a new process to walk through this folder and add the files to Airtime. self.walk_newly_watched_directory(m['directory'])
p = Process(target=self.walk_newly_watched_directory, args=(m['directory'],))
p.start() mm = self.proc_fun()
self.import_processes[m['directory']] = p mm.watch_directory(m['directory'])
#add this new folder to our list of watched folders
self.watched_folders.append(m['directory'])
elif m['event_type'] == "remove_watch": elif m['event_type'] == "remove_watch":
watched_directory = m['directory'].encode('utf-8') watched_directory = m['directory'].encode('utf-8')
@ -163,5 +161,7 @@ class AirtimeNotifier(Notifier):
full_filepath = path+"/"+filename full_filepath = path+"/"+filename
if mm.is_audio_file(full_filepath): if mm.is_audio_file(full_filepath):
self.update_airtime({'filepath': full_filepath, 'mode': self.config.MODE_CREATE, 'is_recorded_show': False}) self.logger.info("importing %s", full_filepath)
event = {'filepath': full_filepath, 'mode': self.config.MODE_CREATE, 'is_recorded_show': False}
mm.file_events.put(event)

View file

@ -37,11 +37,6 @@ class AirtimeProcessEvent(ProcessEvent):
self.wm = WatchManager() self.wm = WatchManager()
self.md_manager = AirtimeMetadata() self.md_manager = AirtimeMetadata()
schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem")
connection = BrokerConnection(self.config.cfg["rabbitmq_host"], self.config.cfg["rabbitmq_user"], self.config.cfg["rabbitmq_password"], "/")
channel = connection.channel()
def watch_directory(self, directory): def watch_directory(self, directory):
return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True) return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True)
@ -278,13 +273,6 @@ class AirtimeProcessEvent(ProcessEvent):
def notifier_loop_callback(self, notifier): def notifier_loop_callback(self, notifier):
#put a watch on any fully imported watched directories.
for watched_directory in notifier.import_processes.keys():
process = notifier.import_processes[watched_directory]
if not process.is_alive():
self.watch_directory(watched_directory)
del notifier.import_processes[watched_directory]
#check for any events recieved from Airtime. #check for any events recieved from Airtime.
try: try:
notifier.connection.drain_events(timeout=0.1) notifier.connection.drain_events(timeout=0.1)

View file

@ -29,6 +29,7 @@ try:
os.system("/etc/init.d/airtime-media-monitor stop") os.system("/etc/init.d/airtime-media-monitor stop")
os.system("rm -f /etc/init.d/airtime-media-monitor") os.system("rm -f /etc/init.d/airtime-media-monitor")
os.system("rm -f /var/run/airtime-notifier.pid")
os.system("update-rc.d -f airtime-media-monitor remove >/dev/null 2>&1") os.system("update-rc.d -f airtime-media-monitor remove >/dev/null 2>&1")
print "Removing log directories" print "Removing log directories"