From 89dd251782d52341e07c760117723b40be467e57 Mon Sep 17 00:00:00 2001 From: martin Date: Tue, 28 Jun 2011 18:01:30 -0400 Subject: [PATCH 1/9] cc-2419: media monitor import on start -initial check-in --- python_apps/media-monitor/MediaMonitor.py | 26 ++++--- .../airtime-media-monitor-init-d | 4 +- .../airtimemediamonitorbootstrap.py | 74 +++++++++++++++++++ .../airtimefilemonitor/mediaconfig.py | 4 +- python_apps/monit/airtime-monit.cfg | 8 +- 5 files changed, 98 insertions(+), 18 deletions(-) create mode 100644 python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index 172af7f6a..cd56ff707 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -3,6 +3,7 @@ import time import logging import logging.config import sys +import os import signal from multiprocessing import Process @@ -10,13 +11,14 @@ from multiprocessing import Process from airtimefilemonitor.airtimenotifier import AirtimeNotifier from airtimefilemonitor.airtimeprocessevent import AirtimeProcessEvent from airtimefilemonitor.mediaconfig import AirtimeMediaConfig +from airtimefilemonitor.airtimemediamonitorbootstrap import AirtimeMediaMonitorBootstrap def handleSigTERM(signum, frame): logger = logging.getLogger() - logger.info("Main Process Shutdown, TERM signal caught. %d") + logger.info("Main Process Shutdown, TERM signal caught.") for p in processes: - p.terminate() logger.info("Killed process. %d", p.pid) + p.terminate() sys.exit(0) @@ -26,13 +28,17 @@ try: logging.config.fileConfig("logging.cfg") except Exception, e: print 'Error configuring logging: ', e - sys.exit() + sys.exit(1) logger = logging.getLogger() processes = [] try: - config = AirtimeMediaConfig() + config = AirtimeMediaConfig(logger) + + bootstrap = AirtimeMediaMonitorBootstrap(logger) + bootstrap.scan() + logger.info("Initializing event processor") pe = AirtimeProcessEvent(airtime_config=config) @@ -45,8 +51,6 @@ try: processes.append(p) p.start() - signal.signal(signal.SIGTERM, handleSigTERM) - logger.info("Setting up monitor") response = None while response is None: @@ -61,14 +65,16 @@ try: logger.info("Added watch to %s", storage_directory) logger.info("wdd result %s", wdd[storage_directory]) + + #register signal before process forks and exits. + signal.signal(signal.SIGTERM, handleSigTERM) notifier.loop(callback=pe.notifier_loop_callback) - for p in processes: - p.join() + except KeyboardInterrupt: notifier.stop() + logger.info("Keyboard Interrupt") except Exception, e: - notifier.stop() + #notifier.stop() logger.error('Exception: %s', e) - diff --git a/python_apps/media-monitor/airtime-media-monitor-init-d b/python_apps/media-monitor/airtime-media-monitor-init-d index 8bf311161..38dc92895 100755 --- a/python_apps/media-monitor/airtime-media-monitor-init-d +++ b/python_apps/media-monitor/airtime-media-monitor-init-d @@ -17,13 +17,13 @@ DAEMON=/usr/lib/airtime/media-monitor/airtime-media-monitor PIDFILE=/var/run/airtime-media-monitor.pid start () { - monit monitor airtime-media-monitor >/dev/null 2>&1 + #monit monitor airtime-media-monitor >/dev/null 2>&1 start-stop-daemon --start --background --quiet --chuid $USERID:$GROUPID --make-pidfile --pidfile $PIDFILE --startas $DAEMON } stop () { # Send TERM after 5 seconds, wait at most 30 seconds. - monit unmonitor airtime-media-monitor >/dev/null 2>&1 + #monit unmonitor airtime-media-monitor >/dev/null 2>&1 start-stop-daemon --stop --oknodo --retry TERM/5/0/30 --quiet --pidfile $PIDFILE rm -f $PIDFILE } diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py new file mode 100644 index 000000000..96399a006 --- /dev/null +++ b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py @@ -0,0 +1,74 @@ +import os + +from subprocess import Popen, PIPE + + +class AirtimeMediaMonitorBootstrap(): + + def __init__(self, logger): + self.logger = logger + + """ + on bootup we want to scan all directories and look for files that + weren't there or files that changed before media-monitor process + went offline. We can do this by doing a hash of the directory metadata. + """ + def scan(self): + directories = ['/srv/airtime/stor'] + + for dir in directories: + self.check_for_diff(dir) + + def check_for_diff(self, dir): + airtime_tmp = '/var/tmp/airtime' + + if os.path.exists(airtime_tmp + '/.airtime_media_index'): + #a previous index exists, we can do a diff between this + #file and the current state to see whether anything has + #changed. + self.logger.info("Previous index file found.") + + + #find files that have been modified since the last time + #media-monitor process was running. + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir + stdout = self.execCommandAndReturnStdOut(command) + self.logger.info("Files modified since last checkin: \n%s\n", stdout) + + + #find deleted files + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, airtime_tmp) + self.execCommand(command) + + command = "diff %s/.airtime_media_index.tmp %s/.airtime_media_index" % (airtime_tmp, airtime_tmp) + stdout = self.execCommandAndReturnStdOut(command) + self.logger.info("Deleted files since last checkin:\n%s\n", stdout) + + else: + #a previous index does not exist. Most likely means that + #media monitor has never seen this directory before. Let's + #notify airtime server about each of these files + self.logger.info("Previous index file does not exist. Creating a new one") + + #create a new index file. + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index" % (dir, airtime_tmp) + self.execCommand(command) + + def execCommand(self, command): + p = Popen(command, shell=True) + sts = os.waitpid(p.pid, 0)[1] + if sts != 0: + self.logger.warn("command \n%s\n return with a non-zero return value", command) + + def execCommandAndReturnStdOut(self, command): + p = Popen(command, shell=True, stdout=PIPE) + stdout = p.communicate()[0] + if p.returncode != 0: + self.logger.warn("command \n%s\n return with a non-zero return value", command) + return stdout + + +if __name__ == '__main__': + + mmb = AirtimeMediaMonitorBootstrap() + mmb.scan() \ No newline at end of file diff --git a/python_apps/media-monitor/airtimefilemonitor/mediaconfig.py b/python_apps/media-monitor/airtimefilemonitor/mediaconfig.py index d1a4e04ae..2b1197e24 100644 --- a/python_apps/media-monitor/airtimefilemonitor/mediaconfig.py +++ b/python_apps/media-monitor/airtimefilemonitor/mediaconfig.py @@ -9,14 +9,14 @@ class AirtimeMediaConfig: MODE_MOVED = "moved" MODE_DELETE = "delete" - def __init__(self): + def __init__(self, logger): # loading config file try: config = ConfigObj('/etc/airtime/media-monitor.cfg') self.cfg = config except Exception, e: - print 'Error loading config: ', e + logger.info('Error loading config: ', e) sys.exit() self.storage_directory = None diff --git a/python_apps/monit/airtime-monit.cfg b/python_apps/monit/airtime-monit.cfg index 647fffd10..22f05e2ca 100644 --- a/python_apps/monit/airtime-monit.cfg +++ b/python_apps/monit/airtime-monit.cfg @@ -13,10 +13,10 @@ with pidfile "/var/run/airtime-liquidsoap.pid" start program = "/etc/init.d/airtime-playout start" with timeout 10 seconds stop program = "/etc/init.d/airtime-playout stop" - check process airtime-media-monitor - with pidfile "/var/run/airtime-media-monitor.pid" - start program = "/etc/init.d/airtime-media-monitor start" with timeout 10 seconds - stop program = "/etc/init.d/airtime-media-monitor stop" +# check process airtime-media-monitor +# with pidfile "/var/run/airtime-media-monitor.pid" +# start program = "/etc/init.d/airtime-media-monitor start" with timeout 10 seconds +# stop program = "/etc/init.d/airtime-media-monitor stop" check process airtime-show-recorder with pidfile "/var/run/airtime-show-recorder.pid" start program = "/etc/init.d/airtime-show-recorder start" with timeout 10 seconds From 3335ff703a38a155303ac5823292c23899bca095 Mon Sep 17 00:00:00 2001 From: martin Date: Tue, 28 Jun 2011 18:01:30 -0400 Subject: [PATCH 2/9] cc-2419: media monitor import on start -initial check-in --- python_apps/media-monitor/MediaMonitor.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index cd56ff707..12bbc46ab 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -65,12 +65,9 @@ try: logger.info("Added watch to %s", storage_directory) logger.info("wdd result %s", wdd[storage_directory]) - #register signal before process forks and exits. signal.signal(signal.SIGTERM, handleSigTERM) notifier.loop(callback=pe.notifier_loop_callback) - - except KeyboardInterrupt: notifier.stop() From 4ab7523a84173aef768a8e97feee8416108c61e4 Mon Sep 17 00:00:00 2001 From: martin Date: Wed, 29 Jun 2011 17:26:42 -0400 Subject: [PATCH 3/9] CC-2419: Media monitor does not import files that already existed in /srv/airtime/stor -work in progress --- python_apps/media-monitor/MediaMonitor.py | 12 ++++---- .../airtimemediamonitorbootstrap.py | 28 +++++++++++-------- .../airtimefilemonitor/airtimenotifier.py | 7 +++++ .../airtimefilemonitor/airtimeprocessevent.py | 25 +++++++++-------- 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index 12bbc46ab..d8520321b 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -6,7 +6,7 @@ import sys import os import signal -from multiprocessing import Process +from multiprocessing import Process, Queue as mpQueue from airtimefilemonitor.airtimenotifier import AirtimeNotifier from airtimefilemonitor.airtimeprocessevent import AirtimeProcessEvent @@ -36,18 +36,20 @@ processes = [] try: config = AirtimeMediaConfig(logger) - bootstrap = AirtimeMediaMonitorBootstrap(logger) - bootstrap.scan() + multi_queue = mpQueue() + bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue) + bootstrap.scan() + logger.info("Initializing event processor") - pe = AirtimeProcessEvent(airtime_config=config) + pe = AirtimeProcessEvent(multi_queue, airtime_config=config) notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config) notifier.coalesce_events() #create 5 worker processes for i in range(5): - p = Process(target=notifier.process_file_events, args=(pe.multi_queue,)) + p = Process(target=notifier.process_file_events, args=(multi_queue,)) processes.append(p) p.start() diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py index 96399a006..70f5a8f40 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py @@ -5,8 +5,9 @@ from subprocess import Popen, PIPE class AirtimeMediaMonitorBootstrap(): - def __init__(self, logger): + def __init__(self, logger, multi_queue): self.logger = logger + self.multi_queue = multi_queue """ on bootup we want to scan all directories and look for files that @@ -21,21 +22,21 @@ class AirtimeMediaMonitorBootstrap(): def check_for_diff(self, dir): airtime_tmp = '/var/tmp/airtime' + + #find files that have been modified since the last time + #media-monitor process was running. + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir + stdout = self.execCommandAndReturnStdOut(command) + self.logger.info("Files modified since last checkin: \n%s\n", stdout) + + #TODO: notify about modified and newly created files (not including copied files) if os.path.exists(airtime_tmp + '/.airtime_media_index'): #a previous index exists, we can do a diff between this #file and the current state to see whether anything has #changed. self.logger.info("Previous index file found.") - - - #find files that have been modified since the last time - #media-monitor process was running. - command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir - stdout = self.execCommandAndReturnStdOut(command) - self.logger.info("Files modified since last checkin: \n%s\n", stdout) - - + #find deleted files command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, airtime_tmp) self.execCommand(command) @@ -43,6 +44,8 @@ class AirtimeMediaMonitorBootstrap(): command = "diff %s/.airtime_media_index.tmp %s/.airtime_media_index" % (airtime_tmp, airtime_tmp) stdout = self.execCommandAndReturnStdOut(command) self.logger.info("Deleted files since last checkin:\n%s\n", stdout) + + #TODO: notify about deleted files and files moved here else: #a previous index does not exist. Most likely means that @@ -53,6 +56,9 @@ class AirtimeMediaMonitorBootstrap(): #create a new index file. command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index" % (dir, airtime_tmp) self.execCommand(command) + + #TODO: notify about all files in this directory. + self.multi_queue.put(event) def execCommand(self, command): p = Popen(command, shell=True) @@ -71,4 +77,4 @@ class AirtimeMediaMonitorBootstrap(): if __name__ == '__main__': mmb = AirtimeMediaMonitorBootstrap() - mmb.scan() \ No newline at end of file + mmb.scan() diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py index ee41c249e..b2dfe7d1b 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py @@ -104,6 +104,13 @@ class AirtimeNotifier(Notifier): mm.watch_directory(new_storage_directory) + #update airtime with information about files discovered in our + #watched directories. Pass in a dict() object with the following + #attributes: + # -filepath + # -mode + # -data + # -is_recorded_show def update_airtime(self, d): filepath = d['filepath'] diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py index 7ec3c941d..a736d85b1 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py @@ -4,8 +4,6 @@ import grp import pwd import logging -from multiprocessing import Process, Lock, Queue as mpQueue - import pyinotify from pyinotify import WatchManager, Notifier, ProcessEvent @@ -18,7 +16,7 @@ from airtimefilemonitor.mediaconfig import AirtimeMediaConfig class AirtimeProcessEvent(ProcessEvent): - def my_init(self, airtime_config=None): + def my_init(self, queue, airtime_config=None): """ Method automatically called from ProcessEvent.__init__(). Additional keyworded arguments passed to ProcessEvent.__init__() are then @@ -34,11 +32,12 @@ class AirtimeProcessEvent(ProcessEvent): self.gui_replaced = {} self.renamed_files = {} self.file_events = [] - self.multi_queue = mpQueue() + self.multi_queue = queue self.mask = pyinotify.ALL_EVENTS self.wm = WatchManager() self.md_manager = AirtimeMetadata() + #define which directories the pyinotify WatchManager should watch. def watch_directory(self, directory): return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True) @@ -206,13 +205,8 @@ class AirtimeProcessEvent(ProcessEvent): elif self.is_audio_file(event.pathname): if self.is_parent_directory(event.pathname, storage_directory): self.set_needed_file_permissions(event.pathname, event.dir) - file_md = self.md_manager.get_md_from_file(event.pathname) - - if file_md is not None: - filepath, is_recorded_show = self.create_file_path(event.pathname, file_md) - self.move_file(event.pathname, filepath) - self.renamed_files[event.pathname] = filepath - self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) + + self.process_new_file(event.pathname) else: self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) @@ -220,7 +214,16 @@ class AirtimeProcessEvent(ProcessEvent): if self.is_parent_directory(event.pathname, storage_directory): self.set_needed_file_permissions(event.pathname, event.dir) + def process_new_file(pathname): + file_md = self.md_manager.get_md_from_file(pathname) + if file_md is not None: + filepath, is_recorded_show = self.create_file_path(pathname, file_md) + self.move_file(pathname, filepath) + self.renamed_files[pathname] = filepath + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) + + def process_IN_MODIFY(self, event): if not event.dir: self.logger.info("%s: %s", event.maskname, event.pathname) From 4f804b17c722af179bb9804115bf2fff2dc073ac Mon Sep 17 00:00:00 2001 From: martin Date: Tue, 28 Jun 2011 18:01:30 -0400 Subject: [PATCH 4/9] cc-2419: media monitor import on start -initial check-in --- python_apps/media-monitor/MediaMonitor.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index d8520321b..ec02392ee 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -36,11 +36,9 @@ processes = [] try: config = AirtimeMediaConfig(logger) - multi_queue = mpQueue() - - bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue) + bootstrap = AirtimeMediaMonitorBootstrap(logger) bootstrap.scan() - + logger.info("Initializing event processor") pe = AirtimeProcessEvent(multi_queue, airtime_config=config) From 397007efee1bd42d5042123cb20d59014058ec0f Mon Sep 17 00:00:00 2001 From: martin Date: Tue, 28 Jun 2011 18:01:30 -0400 Subject: [PATCH 5/9] cc-2419: media monitor import on start -initial check-in --- python_apps/media-monitor/MediaMonitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index ec02392ee..407d2abcf 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -37,7 +37,7 @@ try: config = AirtimeMediaConfig(logger) bootstrap = AirtimeMediaMonitorBootstrap(logger) - bootstrap.scan() + bootstrap.scan() logger.info("Initializing event processor") pe = AirtimeProcessEvent(multi_queue, airtime_config=config) From 778b645a692d3fcb20735ebd1ea6b5234dc88098 Mon Sep 17 00:00:00 2001 From: martin Date: Wed, 29 Jun 2011 17:26:42 -0400 Subject: [PATCH 6/9] CC-2419: Media monitor does not import files that already existed in /srv/airtime/stor -work in progress --- python_apps/media-monitor/MediaMonitor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index 407d2abcf..f9d4e9643 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -36,9 +36,12 @@ processes = [] try: config = AirtimeMediaConfig(logger) - bootstrap = AirtimeMediaMonitorBootstrap(logger) - bootstrap.scan() + multi_queue = mpQueue() + + bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue) + bootstrap.scan() + logger.info("Initializing event processor") pe = AirtimeProcessEvent(multi_queue, airtime_config=config) From 851d5c12b25cebf9735443c06309cff1d9fbaf63 Mon Sep 17 00:00:00 2001 From: martin Date: Thu, 30 Jun 2011 11:56:48 -0400 Subject: [PATCH 7/9] cc-2419: media monitor import on start -files importing working... --- python_apps/media-monitor/MediaMonitor.py | 32 +++++++-------- .../airtimemediamonitorbootstrap.py | 40 +++++++++++++++---- .../airtimefilemonitor/airtimeprocessevent.py | 39 +++++++++++------- 3 files changed, 72 insertions(+), 39 deletions(-) diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index f9d4e9643..eee4b8ca9 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -37,33 +37,31 @@ try: config = AirtimeMediaConfig(logger) multi_queue = mpQueue() - - - bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue) - bootstrap.scan() - logger.info("Initializing event processor") - pe = AirtimeProcessEvent(multi_queue, airtime_config=config) + pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config) notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config) notifier.coalesce_events() + + logger.info("Setting up monitor") + response = None + while response is None: + response = notifier.api_client.setup_media_monitor() + time.sleep(5) + + storage_directory = response["stor"].encode('utf-8') + logger.info("Storage Directory is: %s", storage_directory) + config.storage_directory = storage_directory + + bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe) + bootstrap.scan() #create 5 worker processes for i in range(5): p = Process(target=notifier.process_file_events, args=(multi_queue,)) processes.append(p) p.start() - - logger.info("Setting up monitor") - response = None - while response is None: - response = notifier.api_client.setup_media_monitor() - time.sleep(5) - - storage_directory = response["stor"].encode('utf-8') - logger.info("Storage Directory is: %s", storage_directory) - config.storage_directory = storage_directory - + wdd = pe.watch_directory(storage_directory) logger.info("Added watch to %s", storage_directory) logger.info("wdd result %s", wdd[storage_directory]) diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py index 70f5a8f40..cc22dc45d 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py @@ -5,9 +5,10 @@ from subprocess import Popen, PIPE class AirtimeMediaMonitorBootstrap(): - def __init__(self, logger, multi_queue): + def __init__(self, logger, multi_queue, pe): self.logger = logger self.multi_queue = multi_queue + self.pe = pe """ on bootup we want to scan all directories and look for files that @@ -22,16 +23,24 @@ class AirtimeMediaMonitorBootstrap(): def check_for_diff(self, dir): airtime_tmp = '/var/tmp/airtime' + + #set to hold new and/or modified files. We use a set to make it ok if files are added + #twice. This is become some of the tests for new files return result sets that are not + #mutually exclusive from each other. + modified_files = set() #find files that have been modified since the last time #media-monitor process was running. command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir stdout = self.execCommandAndReturnStdOut(command) self.logger.info("Files modified since last checkin: \n%s\n", stdout) - - #TODO: notify about modified and newly created files (not including copied files) + + new_files = stdout.split('\n') + + for file_path in new_files: + modified_files.add(file_path) - if os.path.exists(airtime_tmp + '/.airtime_media_index'): + if os.path.exists(airtime_tmp + '/.airtime_media_index') and False: #a previous index exists, we can do a diff between this #file and the current state to see whether anything has #changed. @@ -54,11 +63,26 @@ class AirtimeMediaMonitorBootstrap(): self.logger.info("Previous index file does not exist. Creating a new one") #create a new index file. - command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index" % (dir, airtime_tmp) - self.execCommand(command) + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % dir + stdout = self.execCommandAndReturnStdOut(command) + self.logger.info("New files found: \n%s\n", stdout) + self.write_file(airtime_tmp + '/.airtime_media_index', stdout) + + new_files = stdout.split('\n') + + for file_path in new_files: + modified_files.add(file_path) + + self.logger.debug("set size: %d", len(modified_files)) - #TODO: notify about all files in this directory. - self.multi_queue.put(event) + for file_path in modified_files: + if os.path.exists(file_path): + self.pe.handle_created_file(False, os.path.basename(file_path), file_path) + + def write_file(self, file, string): + f = open(file, 'w') + f.write(string) + f.close() def execCommand(self, command): p = Popen(command, shell=True) diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py index a736d85b1..994ad2c37 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py @@ -138,6 +138,7 @@ class AirtimeProcessEvent(ProcessEvent): return filepath + #create path in /srv/airtime/stor/imported/[song-metadata] def create_file_path(self, imported_filepath, orig_md): storage_directory = self.config.storage_directory @@ -192,29 +193,39 @@ class AirtimeProcessEvent(ProcessEvent): return filepath, is_recorded_show + #event.dir: True if the event was raised against a directory. + #event.name + #event.pathname: pathname (str): Concatenation of 'path' and 'name'. def process_IN_CREATE(self, event): - #self.logger.info("%s: %s", event.maskname, event.pathname) - storage_directory = self.config.storage_directory + self.logger.debug("PROCESS_IN_CREATE") + self.handle_created_file(event.dir, event.name, event.pathname) - if not event.dir: + + def handle_created_file(self, dir, name, pathname): + + self.logger.debug("dir: %s, name: %s, pathname: %s ", dir, name, pathname) + storage_directory = self.config.storage_directory + if not dir: #file created is a tmp file which will be modified and then moved back to the original filename. - if self.is_temp_file(event.name) : - self.temp_files[event.pathname] = None + if self.is_temp_file(name) : + self.temp_files[pathname] = None #This is a newly imported file. - elif self.is_audio_file(event.pathname): - if self.is_parent_directory(event.pathname, storage_directory): - self.set_needed_file_permissions(event.pathname, event.dir) + elif self.is_audio_file(pathname): + if self.is_parent_directory(pathname, storage_directory): + self.set_needed_file_permissions(pathname, dir) - self.process_new_file(event.pathname) + self.process_new_file(pathname) else: - self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'is_recorded_show': False}) else: - if self.is_parent_directory(event.pathname, storage_directory): - self.set_needed_file_permissions(event.pathname, event.dir) - - def process_new_file(pathname): + if self.is_parent_directory(pathname, storage_directory): + self.set_needed_file_permissions(pathname, dir) + + + def process_new_file(self, pathname): + self.logger.info("Processing new file: %s", pathname) file_md = self.md_manager.get_md_from_file(pathname) if file_md is not None: From 0f67db9ed707874f08bf885fc3d996af76eb1eb9 Mon Sep 17 00:00:00 2001 From: martin Date: Thu, 30 Jun 2011 17:23:31 -0400 Subject: [PATCH 8/9] cc-2419 media monitor import on start --- python_apps/media-monitor/MediaMonitor.py | 3 +- .../airtimemediamonitorbootstrap.py | 85 ++++++++++--------- .../airtimefilemonitor/airtimenotifier.py | 4 +- .../airtimefilemonitor/airtimeprocessevent.py | 66 +++++++++++--- 4 files changed, 103 insertions(+), 55 deletions(-) diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index eee4b8ca9..909371f75 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -40,7 +40,7 @@ try: logger.info("Initializing event processor") pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config) - notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config) + notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=0, airtime_config=config) notifier.coalesce_events() logger.info("Setting up monitor") @@ -52,6 +52,7 @@ try: storage_directory = response["stor"].encode('utf-8') logger.info("Storage Directory is: %s", storage_directory) config.storage_directory = storage_directory + config.imported_directory = storage_directory + '/imported' bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe) bootstrap.scan() diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py index cc22dc45d..63bc69ab6 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py @@ -1,4 +1,5 @@ import os +import time from subprocess import Popen, PIPE @@ -9,6 +10,7 @@ class AirtimeMediaMonitorBootstrap(): self.logger = logger self.multi_queue = multi_queue self.pe = pe + self.airtime_tmp = '/var/tmp/airtime' """ on bootup we want to scan all directories and look for files that @@ -21,41 +23,56 @@ class AirtimeMediaMonitorBootstrap(): for dir in directories: self.check_for_diff(dir) - def check_for_diff(self, dir): - airtime_tmp = '/var/tmp/airtime' - + def check_for_diff(self, dir): #set to hold new and/or modified files. We use a set to make it ok if files are added #twice. This is become some of the tests for new files return result sets that are not #mutually exclusive from each other. - modified_files = set() - - #find files that have been modified since the last time - #media-monitor process was running. - command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir - stdout = self.execCommandAndReturnStdOut(command) - self.logger.info("Files modified since last checkin: \n%s\n", stdout) - - new_files = stdout.split('\n') - - for file_path in new_files: - modified_files.add(file_path) + added_files = set() + removed_files = set() - if os.path.exists(airtime_tmp + '/.airtime_media_index') and False: + if os.path.exists(self.airtime_tmp + '/.airtime_media_index'): + + #find files that have been modified since the last time + #media-monitor process was running. + time_diff_sec = time.time() - os.path.getmtime(self.airtime_tmp + '/.airtime_media_index') + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -%d" % (dir, time_diff_sec/60+1) + self.logger.debug(command) + stdout = self.execCommandAndReturnStdOut(command) + self.logger.info("Files modified since last checkin: \n%s\n", stdout) + + new_files = stdout.split('\n') + + for file_path in new_files: + added_files.add(file_path) + + + + #a previous index exists, we can do a diff between this #file and the current state to see whether anything has #changed. self.logger.info("Previous index file found.") #find deleted files - command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, airtime_tmp) + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, self.airtime_tmp) self.execCommand(command) - command = "diff %s/.airtime_media_index.tmp %s/.airtime_media_index" % (airtime_tmp, airtime_tmp) + command = "diff -u %s/.airtime_media_index %s/.airtime_media_index.tmp" % (self.airtime_tmp, self.airtime_tmp) stdout = self.execCommandAndReturnStdOut(command) - self.logger.info("Deleted files since last checkin:\n%s\n", stdout) - - #TODO: notify about deleted files and files moved here + #remove first 3 lines from the diff output. + stdoutSplit = (stdout.split('\n'))[3:] + + self.logger.info("Changed files since last checkin:\n%s\n", "\n".join(stdoutSplit)) + + for line in stdoutSplit: + if len(line.strip(' ')) > 0: + if line[0] == '+': + added_files.add(line[1:]) + elif line[0] == '-': + removed_files.add(line[1:]) + + self.pe.write_index_file() else: #a previous index does not exist. Most likely means that #media monitor has never seen this directory before. Let's @@ -63,27 +80,20 @@ class AirtimeMediaMonitorBootstrap(): self.logger.info("Previous index file does not exist. Creating a new one") #create a new index file. - command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % dir - stdout = self.execCommandAndReturnStdOut(command) - self.logger.info("New files found: \n%s\n", stdout) - self.write_file(airtime_tmp + '/.airtime_media_index', stdout) + stdout = self.pe.write_index_file() new_files = stdout.split('\n') for file_path in new_files: - modified_files.add(file_path) + added_files.add(file_path) - self.logger.debug("set size: %d", len(modified_files)) - - for file_path in modified_files: + for file_path in added_files: if os.path.exists(file_path): self.pe.handle_created_file(False, os.path.basename(file_path), file_path) - - def write_file(self, file, string): - f = open(file, 'w') - f.write(string) - f.close() + for file_path in removed_files: + self.pe.handle_removed_file(file_path) + def execCommand(self, command): p = Popen(command, shell=True) sts = os.waitpid(p.pid, 0)[1] @@ -96,9 +106,4 @@ class AirtimeMediaMonitorBootstrap(): if p.returncode != 0: self.logger.warn("command \n%s\n return with a non-zero return value", command) return stdout - - -if __name__ == '__main__': - - mmb = AirtimeMediaMonitorBootstrap() - mmb.scan() + \ No newline at end of file diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py index b2dfe7d1b..fd29a5e5a 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py @@ -3,8 +3,6 @@ import time import os import logging -from multiprocessing import Process, Lock, Queue as mpQueue - # For RabbitMQ from kombu.connection import BrokerConnection from kombu.messaging import Exchange, Queue, Consumer, Producer @@ -26,6 +24,7 @@ class AirtimeNotifier(Notifier): self.md_manager = AirtimeMetadata() self.import_processes = {} self.watched_folders = [] + while not self.init_rabbit_mq(): self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") @@ -112,7 +111,6 @@ class AirtimeNotifier(Notifier): # -data # -is_recorded_show def update_airtime(self, d): - filepath = d['filepath'] mode = d['mode'] diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py index 994ad2c37..846cb3c41 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py @@ -4,6 +4,8 @@ import grp import pwd import logging +from subprocess import Popen, PIPE + import pyinotify from pyinotify import WatchManager, Notifier, ProcessEvent @@ -36,6 +38,11 @@ class AirtimeProcessEvent(ProcessEvent): self.mask = pyinotify.ALL_EVENTS self.wm = WatchManager() self.md_manager = AirtimeMetadata() + + #Set to "True" everytime we get a file event so + #that we can track of when we need to rewrite the + #index file + self.dirty = False #define which directories the pyinotify WatchManager should watch. def watch_directory(self, directory): @@ -177,7 +184,6 @@ class AirtimeProcessEvent(ProcessEvent): #yyyy-mm-dd-hh-MM-ss y = orig_md['MDATA_KEY_YEAR'].split("-") filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "recorded".encode('utf-8'), y[0], y[1], orig_md['MDATA_KEY_YEAR'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) - is_recorded_show = True elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')): filepath = '%s/%s/%s/%s/%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) else: @@ -191,7 +197,7 @@ class AirtimeProcessEvent(ProcessEvent): except Exception, e: self.logger.error('Exception: %s', e) - return filepath, is_recorded_show + return filepath #event.dir: True if the event was raised against a directory. #event.name @@ -229,10 +235,15 @@ class AirtimeProcessEvent(ProcessEvent): file_md = self.md_manager.get_md_from_file(pathname) if file_md is not None: - filepath, is_recorded_show = self.create_file_path(pathname, file_md) - self.move_file(pathname, filepath) - self.renamed_files[pathname] = filepath - self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) + is_recorded_show = 'MDATA_KEY_CREATOR' in file_md and \ + file_md['MDATA_KEY_CREATOR'] == "AIRTIMERECORDERSOURCEFABRIC".encode('utf-8') + if not self.is_parent_directory(pathname, self.config.imported_directory): + filepath = self.create_file_path(pathname, file_md) + self.move_file(pathname, filepath) + self.renamed_files[pathname] = filepath + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) + else: + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'data': file_md, 'is_recorded_show': is_recorded_show}) def process_IN_MODIFY(self, event): @@ -290,18 +301,51 @@ class AirtimeProcessEvent(ProcessEvent): def process_IN_DELETE(self, event): self.logger.info("%s: %s", event.maskname, event.pathname) if not event.dir: - self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_DELETE}) + self.handle_removed_file(event.pathname) + + def handle_removed_file(self, pathname): + self.logger.info("Deleting %s", pathname) + self.file_events.append({'filepath': pathname, 'mode': self.config.MODE_DELETE}) + def process_default(self, event): #self.logger.info("%s: %s", event.maskname, event.pathname) pass + + def execCommandAndReturnStdOut(self, command): + p = Popen(command, shell=True, stdout=PIPE) + stdout = p.communicate()[0] + if p.returncode != 0: + self.logger.warn("command \n%s\n return with a non-zero return value", command) + return stdout + + def write_index_file(self): + #create a new index file. + self.logger.debug("writing new index file") + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % '/srv/airtime/stor' + stdout = self.execCommandAndReturnStdOut(command) + self.write_file('/var/tmp/airtime' + '/.airtime_media_index', stdout) + return stdout + + def write_file(self, file, string): + f = open(file, 'w') + f.write(string) + f.close() def notifier_loop_callback(self, notifier): + + if len(self.file_events) > 0: + for event in self.file_events: + self.multi_queue.put(event) - for event in self.file_events: - self.multi_queue.put(event) - - self.file_events = [] + self.dirty = True + self.file_events = [] + elif self.multi_queue.empty(): + #no file_events and queue is empty. This is a good time + #to write an index file. + if self.dirty: + self.write_index_file() + self.dirty = False #check for any events recieved from Airtime. try: From d4e4090429edd5e50bb9266989278c9a19c233f2 Mon Sep 17 00:00:00 2001 From: martin Date: Thu, 30 Jun 2011 18:00:46 -0400 Subject: [PATCH 9/9] cc-2419: media monitor import on start -send removed file list before added file list --- .../airtimemediamonitorbootstrap.py | 8 ++++---- .../airtimefilemonitor/airtimeprocessevent.py | 12 ++---------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py index 63bc69ab6..d6efe9a10 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py @@ -66,7 +66,7 @@ class AirtimeMediaMonitorBootstrap(): self.logger.info("Changed files since last checkin:\n%s\n", "\n".join(stdoutSplit)) for line in stdoutSplit: - if len(line.strip(' ')) > 0: + if len(line.strip(' ')) > 1: if line[0] == '+': added_files.add(line[1:]) elif line[0] == '-': @@ -87,12 +87,12 @@ class AirtimeMediaMonitorBootstrap(): for file_path in new_files: added_files.add(file_path) + for file_path in removed_files: + self.pe.handle_removed_file(file_path) + for file_path in added_files: if os.path.exists(file_path): self.pe.handle_created_file(False, os.path.basename(file_path), file_path) - - for file_path in removed_files: - self.pe.handle_removed_file(file_path) def execCommand(self, command): p = Popen(command, shell=True) diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py index 846cb3c41..f22ec7993 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py @@ -38,11 +38,6 @@ class AirtimeProcessEvent(ProcessEvent): self.mask = pyinotify.ALL_EVENTS self.wm = WatchManager() self.md_manager = AirtimeMetadata() - - #Set to "True" everytime we get a file event so - #that we can track of when we need to rewrite the - #index file - self.dirty = False #define which directories the pyinotify WatchManager should watch. def watch_directory(self, directory): @@ -338,14 +333,11 @@ class AirtimeProcessEvent(ProcessEvent): for event in self.file_events: self.multi_queue.put(event) - self.dirty = True + self.file_events = [] - elif self.multi_queue.empty(): #no file_events and queue is empty. This is a good time #to write an index file. - if self.dirty: - self.write_index_file() - self.dirty = False + self.write_index_file() #check for any events recieved from Airtime. try: