diff --git a/python_apps/media-monitor2/media/monitor/airtime.py b/python_apps/media-monitor2/media/monitor/airtime.py index 50e0ad353..b6bf5b440 100644 --- a/python_apps/media-monitor2/media/monitor/airtime.py +++ b/python_apps/media-monitor2/media/monitor/airtime.py @@ -92,14 +92,18 @@ class AirtimeMessageReceiver(Loggable): def __request_now_bootstrap(self, directory_id=None, directory=None): + if (not directory_id) and (not directory): + raise ValueError("You must provide either directory_id or directory") sdb = AirtimeDB(apc.AirtimeApiClient.create_right_config()) - if directory_id == None: directory_id = sdb.directories[directory] - if directory_id in sdb.id_lookup: - d = sdb.id_lookup[directory_id] - bs = Bootstrapper(sdb, self.manager.watch_signal()) - bs.flush_watch( directory=d, last_ran=time.time() ) - else: - raise DirectoryIsNotListed(directory_id) + if directory_id == None: directory_id = sdb.to_id(directory) + if directory == None: directory = sdb.to_directory(directory_id) + try: + bs = Bootstrapper( sdb, self.manager.watch_signal() ) + bs.flush_watch( directory=directory, last_ran=time.time() ) + except Exception as e: + self.logger.info( "Exception bootstrapping: (dir,id)=(%s,%s)" % (directory, directory_id) ) + self.logger.info( str(e) ) + raise DirectoryIsNotListed(directory) def supported_messages(self): return self.dispatch_table.keys() @@ -148,28 +152,10 @@ class AirtimeMessageReceiver(Loggable): def change_storage(self, msg): new_storage_directory = msg['directory'] - new_import = os.path.join(new_storage_directory, 'imported') - new_organize = os.path.join(new_storage_directory, 'organize') - for d in [new_import, new_organize]: - if os.path.exists(d): - self.logger.info("Changing storage to existing dir: '%s'" % d) - else: - try: os.makedirs(d) - except Exception as e: - self.logger.info("Could not create dir for storage '%s'" % d) - self.logger.info(str(e)) + self.manager.change_storage_root(new_storage_directory) - if all([ os.path.exists(d) for d in [new_import, new_organize] ]): - self.manager.set_store_path(new_import) - try: - self.__request_now_bootstrap( directory=new_import ) - except Exception as e: - self.logger.info("Did not bootstrap off directory '%s'. Probably not in airtime db" % new_import) - self.logger.info(str(e)) - # set_organize_path should automatically flush new_organize - self.manager.set_organize_path(new_organize) - else: - self.logger.info("Change storage procedure failed, could not create directories") + for to_bootstrap in [ self.manager.get_recorded_path(), self.manager.get_imported_path() ]: + self.__request_now_bootstrap( directory=to_bootstrap ) def file_delete(self, msg): # deletes should be requested only from imported folder but we don't diff --git a/python_apps/media-monitor2/media/monitor/bootstrap.py b/python_apps/media-monitor2/media/monitor/bootstrap.py index 91d6dc00d..29b3c4ca4 100644 --- a/python_apps/media-monitor2/media/monitor/bootstrap.py +++ b/python_apps/media-monitor2/media/monitor/bootstrap.py @@ -52,7 +52,7 @@ class Bootstrapper(Loggable): for to_delete in db_songs.difference(songs): dispatcher.send(signal=self.watch_signal, sender=self, event=DeleteFile(to_delete)) deleted += 1 - self.logger.info( "Flushed watch directories. (modified, deleted) = (%d, %d)" - % (modded, deleted) ) + self.logger.info( "Flushed watch directory(%s). (modified, deleted) = (%d, %d)" + % (directory, modded, deleted) ) diff --git a/python_apps/media-monitor2/media/monitor/manager.py b/python_apps/media-monitor2/media/monitor/manager.py index 04aa39cd0..2226c9983 100644 --- a/python_apps/media-monitor2/media/monitor/manager.py +++ b/python_apps/media-monitor2/media/monitor/manager.py @@ -1,8 +1,11 @@ import pyinotify +from media.monitor.events import PathChannel from media.monitor.log import Loggable from media.monitor.listeners import StoreWatchListener, OrganizeListener +from media.monitor.handler import ProblemFileHandler from media.monitor.organizer import Organizer +import media.monitor.pure as mmp class Manager(Loggable): """ @@ -20,13 +23,16 @@ class Manager(Loggable): self.watch_listener = StoreWatchListener(signal=self.watch_channel) self.organize = { 'organize_path' : None, - 'store_path' : None, + 'imported_path' : None, + 'recorded_path' : None, + 'problem_files_path' : None, # This guy doesn't need to be changed, always the same. # Gets hooked by wm to different directories 'organize_listener' : OrganizeListener(signal=self.organize_channel), # Also stays the same as long as its target, the directory # which the "organized" files go to, isn't changed. 'organizer' : None, + 'problem_handler' : None, } # A private mapping path => watch_descriptor # we use the same dictionary for organize, watch, store wd events. @@ -60,6 +66,21 @@ class Manager(Loggable): """ return Organizer(channel=self.organize_channel,target_path=target_path) + def get_problem_files_path(self): + return self.organize['problem_files_path'] + + def set_problem_files_path(self, new_path): + self.organize['problem_files_path'] = new_path + self.organize['problem_handler'] = ProblemFileHandler( PathChannel(signal='badfile',path=new_path) ) + + def get_recorded_path(self): + return self.organize['recorded_path'] + + def set_recorded_path(self, new_path): + self.__remove_watch(self.organize['recorded_path']) + self.organize['recorded_path'] = new_path + self.__add_watch(new_path, self.watch_listener) + def get_organize_path(self): """ returns the current path that is being watched for organization @@ -80,20 +101,15 @@ class Manager(Loggable): self.organize['organize_listener'].flush_events(new_path) self.__add_watch(new_path, self.organize['organize_listener']) - organize_path = property(get_organize_path, set_organize_path) + def get_imported_path(self): + return self.organize['imported_path'] - def get_store_path(self): - """ - returns the store_path (should be accessed through the property usually) - """ - return self.organize['store_path'] - - def set_store_path(self,new_path): + def set_imported_path(self,new_path): """ set the directory where organized files go to """ - self.__remove_watch(self.organize['store_path']) - self.organize['store_path'] = new_path + self.__remove_watch(self.organize['imported_path']) + self.organize['imported_path'] = new_path self.organize['organizer'] = self.__create_organizer(new_path) # flush all the files in the new store_directory. this is done so that # new files are added to the database. Note that we are not responsible @@ -102,7 +118,18 @@ class Manager(Loggable): # self.watch_listener.flush_events(new_path) self.__add_watch(new_path, self.watch_listener) - store_path = property(get_store_path, set_store_path) + def change_storage_root(self, store): + """ + hooks up all the directories for you. Problem, recorded, imported, organize. + """ + store_paths = mmp.expand_storage(store) + self.set_problem_files_path(store_paths['problem_files']) + self.set_imported_path(store_paths['imported']) + self.set_recorded_path(store_paths['recorded']) + self.set_organize_path(store_paths['organize']) + mmp.create_dir(store) + for p in store_paths.values(): + mmp.create_dir(p) def has_watch(self, path): """ diff --git a/python_apps/media-monitor2/media/monitor/pure.py b/python_apps/media-monitor2/media/monitor/pure.py index 29f1ef366..dc946bf72 100644 --- a/python_apps/media-monitor2/media/monitor/pure.py +++ b/python_apps/media-monitor2/media/monitor/pure.py @@ -308,6 +308,15 @@ def import_organize(store): store = os.path.normpath(store) return os.path.join(store,'organize'), os.path.join(store,'imported') +def expand_storage(store): + store = os.path.normpath(store) + return { + 'organize' : os.path.join(store, 'organize'), + 'recorded' : os.path.join(store, 'recorded'), + 'problem_files' : os.path.join(store, 'problem_files'), + 'imported' : os.path.join(store, 'imported'), + } + def create_dir(path): """ will try and make sure that path exists at all costs. raises an exception diff --git a/python_apps/media-monitor2/media/monitor/syncdb.py b/python_apps/media-monitor2/media/monitor/syncdb.py index 43d92eb84..9a232e287 100644 --- a/python_apps/media-monitor2/media/monitor/syncdb.py +++ b/python_apps/media-monitor2/media/monitor/syncdb.py @@ -2,6 +2,7 @@ import os from media.monitor.log import Loggable from media.monitor.exceptions import NoDirectoryInAirtime +import media.monitor.pure as mmp from os.path import normpath class AirtimeDB(Loggable): @@ -27,6 +28,7 @@ class AirtimeDB(Loggable): self.dir_to_id = dict([ (v,k) for k,v in dirs_with_id.iteritems() ]) self.base_storage = dirs_setup[u'stor'] + self.storage_paths = mmp.expand_storage( self.base_storage ) self.base_id = self.dir_to_id[self.base_storage] # hack to get around annoying schema of airtime db @@ -37,11 +39,17 @@ class AirtimeDB(Loggable): # store... self.watched_directories = set([ os.path.normpath(p) for p in dirs_setup[u'watched_dirs'] ]) + def to_id(self, directory): + return self.dir_to_id[ directory ] + + def to_directory(self, dir_id): + return self.id_to_dir[ dir_id ] + def storage_path(self): return self.base_storage - def organize_path(self): return os.path.join(self.base_storage, 'organize') - def problem_path(self): return os.path.join(self.base_storage, 'problem_files') - def import_path(self): return os.path.join(self.base_storage, 'imported') - def recorded_path(self): return os.path.join(self.base_storage, 'recorded') + def organize_path(self): return self.storage_paths['organize'] + def problem_path(self): return self.storage_paths['problem_files'] + def import_path(self): return self.storage_paths['imported'] + def recorded_path(self): return self.storage_paths['recorded'] def list_watched(self): """ diff --git a/python_apps/media-monitor2/mm2.py b/python_apps/media-monitor2/mm2.py index 86af44b7d..b2bf974b1 100644 --- a/python_apps/media-monitor2/mm2.py +++ b/python_apps/media-monitor2/mm2.py @@ -5,14 +5,12 @@ import os from media.monitor.manager import Manager from media.monitor.bootstrap import Bootstrapper from media.monitor.log import get_logger -from media.monitor.events import PathChannel from media.monitor.config import MMConfig from media.monitor.toucher import ToucherThread from media.monitor.syncdb import AirtimeDB -from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale, NoConfigFile, FailedToCreateDir +from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale, NoConfigFile from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver from media.monitor.watchersyncer import WatchSyncer -from media.monitor.handler import ProblemFileHandler from media.monitor.eventdrainer import EventDrainer import media.monitor.pure as mmp @@ -50,10 +48,6 @@ except Exception as e: watch_syncer = WatchSyncer(signal='watch', chunking_number=config['chunking_number'], timeout=config['request_max_wait']) -try: - problem_handler = ProblemFileHandler( PathChannel(signal='badfile',path='/srv/airtime/stor/problem_files/') ) -except FailedToCreateDir as e: - log.info("Failed to create problem directory: '%s'" % e.path) apiclient = apc.AirtimeApiClient.create_right_config(log=log,config_path=global_config) @@ -64,12 +58,12 @@ sdb = AirtimeDB(apiclient) manager = Manager() +airtime_receiver = AirtimeMessageReceiver(config,manager) +airtime_notifier = AirtimeNotifier(config, airtime_receiver) + store = apiclient.setup_media_monitor() -organize_dir, import_dir = mmp.import_organize(store[u'stor']) -# Order matters here: -# TODO : add flushing -manager.set_store_path(import_dir) -manager.set_organize_path(organize_dir) +airtime_receiver.change_storage({ 'directory':store[u'stor'] }) + for watch_dir in store[u'watched_dirs']: if not os.path.exists(watch_dir): @@ -78,16 +72,13 @@ for watch_dir in store[u'watched_dirs']: except Exception as e: log.error("Could not create watch directory: '%s' (given from the database)." % watch_dir) if os.path.exists(watch_dir): - manager.add_watch_directory(watch_dir) + airtime_receiver.new_watch({ 'directory':watch_dir }) last_ran=config.last_ran() bs = Bootstrapper( db=sdb, watch_signal='watch' ) bs.flush_all( config.last_ran() ) -airtime_receiver = AirtimeMessageReceiver(config,manager) -airtime_notifier = AirtimeNotifier(config, airtime_receiver) - ed = EventDrainer(airtime_notifier.connection,interval=float(config['rmq_event_wait'])) # Launch the toucher that updates the last time when the script was ran every