From 0f67db9ed707874f08bf885fc3d996af76eb1eb9 Mon Sep 17 00:00:00 2001 From: martin Date: Thu, 30 Jun 2011 17:23:31 -0400 Subject: [PATCH] cc-2419 media monitor import on start --- python_apps/media-monitor/MediaMonitor.py | 3 +- .../airtimemediamonitorbootstrap.py | 85 ++++++++++--------- .../airtimefilemonitor/airtimenotifier.py | 4 +- .../airtimefilemonitor/airtimeprocessevent.py | 66 +++++++++++--- 4 files changed, 103 insertions(+), 55 deletions(-) diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index eee4b8ca9..909371f75 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -40,7 +40,7 @@ try: logger.info("Initializing event processor") pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config) - notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config) + notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=0, airtime_config=config) notifier.coalesce_events() logger.info("Setting up monitor") @@ -52,6 +52,7 @@ try: storage_directory = response["stor"].encode('utf-8') logger.info("Storage Directory is: %s", storage_directory) config.storage_directory = storage_directory + config.imported_directory = storage_directory + '/imported' bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe) bootstrap.scan() diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py index cc22dc45d..63bc69ab6 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py @@ -1,4 +1,5 @@ import os +import time from subprocess import Popen, PIPE @@ -9,6 +10,7 @@ class AirtimeMediaMonitorBootstrap(): self.logger = logger self.multi_queue = multi_queue self.pe = pe + self.airtime_tmp = '/var/tmp/airtime' """ on bootup we want to scan all directories and look for files that @@ -21,41 +23,56 @@ class AirtimeMediaMonitorBootstrap(): for dir in directories: self.check_for_diff(dir) - def check_for_diff(self, dir): - airtime_tmp = '/var/tmp/airtime' - + def check_for_diff(self, dir): #set to hold new and/or modified files. We use a set to make it ok if files are added #twice. This is become some of the tests for new files return result sets that are not #mutually exclusive from each other. - modified_files = set() - - #find files that have been modified since the last time - #media-monitor process was running. - command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir - stdout = self.execCommandAndReturnStdOut(command) - self.logger.info("Files modified since last checkin: \n%s\n", stdout) - - new_files = stdout.split('\n') - - for file_path in new_files: - modified_files.add(file_path) + added_files = set() + removed_files = set() - if os.path.exists(airtime_tmp + '/.airtime_media_index') and False: + if os.path.exists(self.airtime_tmp + '/.airtime_media_index'): + + #find files that have been modified since the last time + #media-monitor process was running. + time_diff_sec = time.time() - os.path.getmtime(self.airtime_tmp + '/.airtime_media_index') + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -%d" % (dir, time_diff_sec/60+1) + self.logger.debug(command) + stdout = self.execCommandAndReturnStdOut(command) + self.logger.info("Files modified since last checkin: \n%s\n", stdout) + + new_files = stdout.split('\n') + + for file_path in new_files: + added_files.add(file_path) + + + + #a previous index exists, we can do a diff between this #file and the current state to see whether anything has #changed. self.logger.info("Previous index file found.") #find deleted files - command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, airtime_tmp) + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, self.airtime_tmp) self.execCommand(command) - command = "diff %s/.airtime_media_index.tmp %s/.airtime_media_index" % (airtime_tmp, airtime_tmp) + command = "diff -u %s/.airtime_media_index %s/.airtime_media_index.tmp" % (self.airtime_tmp, self.airtime_tmp) stdout = self.execCommandAndReturnStdOut(command) - self.logger.info("Deleted files since last checkin:\n%s\n", stdout) - - #TODO: notify about deleted files and files moved here + #remove first 3 lines from the diff output. + stdoutSplit = (stdout.split('\n'))[3:] + + self.logger.info("Changed files since last checkin:\n%s\n", "\n".join(stdoutSplit)) + + for line in stdoutSplit: + if len(line.strip(' ')) > 0: + if line[0] == '+': + added_files.add(line[1:]) + elif line[0] == '-': + removed_files.add(line[1:]) + + self.pe.write_index_file() else: #a previous index does not exist. Most likely means that #media monitor has never seen this directory before. Let's @@ -63,27 +80,20 @@ class AirtimeMediaMonitorBootstrap(): self.logger.info("Previous index file does not exist. Creating a new one") #create a new index file. - command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % dir - stdout = self.execCommandAndReturnStdOut(command) - self.logger.info("New files found: \n%s\n", stdout) - self.write_file(airtime_tmp + '/.airtime_media_index', stdout) + stdout = self.pe.write_index_file() new_files = stdout.split('\n') for file_path in new_files: - modified_files.add(file_path) + added_files.add(file_path) - self.logger.debug("set size: %d", len(modified_files)) - - for file_path in modified_files: + for file_path in added_files: if os.path.exists(file_path): self.pe.handle_created_file(False, os.path.basename(file_path), file_path) - - def write_file(self, file, string): - f = open(file, 'w') - f.write(string) - f.close() + for file_path in removed_files: + self.pe.handle_removed_file(file_path) + def execCommand(self, command): p = Popen(command, shell=True) sts = os.waitpid(p.pid, 0)[1] @@ -96,9 +106,4 @@ class AirtimeMediaMonitorBootstrap(): if p.returncode != 0: self.logger.warn("command \n%s\n return with a non-zero return value", command) return stdout - - -if __name__ == '__main__': - - mmb = AirtimeMediaMonitorBootstrap() - mmb.scan() + \ No newline at end of file diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py index b2dfe7d1b..fd29a5e5a 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py @@ -3,8 +3,6 @@ import time import os import logging -from multiprocessing import Process, Lock, Queue as mpQueue - # For RabbitMQ from kombu.connection import BrokerConnection from kombu.messaging import Exchange, Queue, Consumer, Producer @@ -26,6 +24,7 @@ class AirtimeNotifier(Notifier): self.md_manager = AirtimeMetadata() self.import_processes = {} self.watched_folders = [] + while not self.init_rabbit_mq(): self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") @@ -112,7 +111,6 @@ class AirtimeNotifier(Notifier): # -data # -is_recorded_show def update_airtime(self, d): - filepath = d['filepath'] mode = d['mode'] diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py index 994ad2c37..846cb3c41 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py @@ -4,6 +4,8 @@ import grp import pwd import logging +from subprocess import Popen, PIPE + import pyinotify from pyinotify import WatchManager, Notifier, ProcessEvent @@ -36,6 +38,11 @@ class AirtimeProcessEvent(ProcessEvent): self.mask = pyinotify.ALL_EVENTS self.wm = WatchManager() self.md_manager = AirtimeMetadata() + + #Set to "True" everytime we get a file event so + #that we can track of when we need to rewrite the + #index file + self.dirty = False #define which directories the pyinotify WatchManager should watch. def watch_directory(self, directory): @@ -177,7 +184,6 @@ class AirtimeProcessEvent(ProcessEvent): #yyyy-mm-dd-hh-MM-ss y = orig_md['MDATA_KEY_YEAR'].split("-") filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "recorded".encode('utf-8'), y[0], y[1], orig_md['MDATA_KEY_YEAR'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) - is_recorded_show = True elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')): filepath = '%s/%s/%s/%s/%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) else: @@ -191,7 +197,7 @@ class AirtimeProcessEvent(ProcessEvent): except Exception, e: self.logger.error('Exception: %s', e) - return filepath, is_recorded_show + return filepath #event.dir: True if the event was raised against a directory. #event.name @@ -229,10 +235,15 @@ class AirtimeProcessEvent(ProcessEvent): file_md = self.md_manager.get_md_from_file(pathname) if file_md is not None: - filepath, is_recorded_show = self.create_file_path(pathname, file_md) - self.move_file(pathname, filepath) - self.renamed_files[pathname] = filepath - self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) + is_recorded_show = 'MDATA_KEY_CREATOR' in file_md and \ + file_md['MDATA_KEY_CREATOR'] == "AIRTIMERECORDERSOURCEFABRIC".encode('utf-8') + if not self.is_parent_directory(pathname, self.config.imported_directory): + filepath = self.create_file_path(pathname, file_md) + self.move_file(pathname, filepath) + self.renamed_files[pathname] = filepath + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) + else: + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'data': file_md, 'is_recorded_show': is_recorded_show}) def process_IN_MODIFY(self, event): @@ -290,18 +301,51 @@ class AirtimeProcessEvent(ProcessEvent): def process_IN_DELETE(self, event): self.logger.info("%s: %s", event.maskname, event.pathname) if not event.dir: - self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_DELETE}) + self.handle_removed_file(event.pathname) + + def handle_removed_file(self, pathname): + self.logger.info("Deleting %s", pathname) + self.file_events.append({'filepath': pathname, 'mode': self.config.MODE_DELETE}) + def process_default(self, event): #self.logger.info("%s: %s", event.maskname, event.pathname) pass + + def execCommandAndReturnStdOut(self, command): + p = Popen(command, shell=True, stdout=PIPE) + stdout = p.communicate()[0] + if p.returncode != 0: + self.logger.warn("command \n%s\n return with a non-zero return value", command) + return stdout + + def write_index_file(self): + #create a new index file. + self.logger.debug("writing new index file") + command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % '/srv/airtime/stor' + stdout = self.execCommandAndReturnStdOut(command) + self.write_file('/var/tmp/airtime' + '/.airtime_media_index', stdout) + return stdout + + def write_file(self, file, string): + f = open(file, 'w') + f.write(string) + f.close() def notifier_loop_callback(self, notifier): + + if len(self.file_events) > 0: + for event in self.file_events: + self.multi_queue.put(event) - for event in self.file_events: - self.multi_queue.put(event) - - self.file_events = [] + self.dirty = True + self.file_events = [] + elif self.multi_queue.empty(): + #no file_events and queue is empty. This is a good time + #to write an index file. + if self.dirty: + self.write_index_file() + self.dirty = False #check for any events recieved from Airtime. try: