cc-4105: more refactorings, this time so that scan/watch diretories are always handled from one place

This commit is contained in:
Rudi Grinberg 2012-08-01 15:10:54 -04:00
parent 3b1583f620
commit ba78731f99
6 changed files with 83 additions and 62 deletions

View File

@ -92,14 +92,18 @@ class AirtimeMessageReceiver(Loggable):
def __request_now_bootstrap(self, directory_id=None, directory=None):
if (not directory_id) and (not directory):
raise ValueError("You must provide either directory_id or directory")
sdb = AirtimeDB(apc.AirtimeApiClient.create_right_config())
if directory_id == None: directory_id = sdb.directories[directory]
if directory_id in sdb.id_lookup:
d = sdb.id_lookup[directory_id]
bs = Bootstrapper(sdb, self.manager.watch_signal())
bs.flush_watch( directory=d, last_ran=time.time() )
else:
raise DirectoryIsNotListed(directory_id)
if directory_id == None: directory_id = sdb.to_id(directory)
if directory == None: directory = sdb.to_directory(directory_id)
try:
bs = Bootstrapper( sdb, self.manager.watch_signal() )
bs.flush_watch( directory=directory, last_ran=time.time() )
except Exception as e:
self.logger.info( "Exception bootstrapping: (dir,id)=(%s,%s)" % (directory, directory_id) )
self.logger.info( str(e) )
raise DirectoryIsNotListed(directory)
def supported_messages(self):
return self.dispatch_table.keys()
@ -148,28 +152,10 @@ class AirtimeMessageReceiver(Loggable):
def change_storage(self, msg):
new_storage_directory = msg['directory']
new_import = os.path.join(new_storage_directory, 'imported')
new_organize = os.path.join(new_storage_directory, 'organize')
for d in [new_import, new_organize]:
if os.path.exists(d):
self.logger.info("Changing storage to existing dir: '%s'" % d)
else:
try: os.makedirs(d)
except Exception as e:
self.logger.info("Could not create dir for storage '%s'" % d)
self.logger.info(str(e))
self.manager.change_storage_root(new_storage_directory)
if all([ os.path.exists(d) for d in [new_import, new_organize] ]):
self.manager.set_store_path(new_import)
try:
self.__request_now_bootstrap( directory=new_import )
except Exception as e:
self.logger.info("Did not bootstrap off directory '%s'. Probably not in airtime db" % new_import)
self.logger.info(str(e))
# set_organize_path should automatically flush new_organize
self.manager.set_organize_path(new_organize)
else:
self.logger.info("Change storage procedure failed, could not create directories")
for to_bootstrap in [ self.manager.get_recorded_path(), self.manager.get_imported_path() ]:
self.__request_now_bootstrap( directory=to_bootstrap )
def file_delete(self, msg):
# deletes should be requested only from imported folder but we don't

View File

@ -52,7 +52,7 @@ class Bootstrapper(Loggable):
for to_delete in db_songs.difference(songs):
dispatcher.send(signal=self.watch_signal, sender=self, event=DeleteFile(to_delete))
deleted += 1
self.logger.info( "Flushed watch directories. (modified, deleted) = (%d, %d)"
% (modded, deleted) )
self.logger.info( "Flushed watch directory(%s). (modified, deleted) = (%d, %d)"
% (directory, modded, deleted) )

View File

@ -1,8 +1,11 @@
import pyinotify
from media.monitor.events import PathChannel
from media.monitor.log import Loggable
from media.monitor.listeners import StoreWatchListener, OrganizeListener
from media.monitor.handler import ProblemFileHandler
from media.monitor.organizer import Organizer
import media.monitor.pure as mmp
class Manager(Loggable):
"""
@ -20,13 +23,16 @@ class Manager(Loggable):
self.watch_listener = StoreWatchListener(signal=self.watch_channel)
self.organize = {
'organize_path' : None,
'store_path' : None,
'imported_path' : None,
'recorded_path' : None,
'problem_files_path' : None,
# This guy doesn't need to be changed, always the same.
# Gets hooked by wm to different directories
'organize_listener' : OrganizeListener(signal=self.organize_channel),
# Also stays the same as long as its target, the directory
# which the "organized" files go to, isn't changed.
'organizer' : None,
'problem_handler' : None,
}
# A private mapping path => watch_descriptor
# we use the same dictionary for organize, watch, store wd events.
@ -60,6 +66,21 @@ class Manager(Loggable):
"""
return Organizer(channel=self.organize_channel,target_path=target_path)
def get_problem_files_path(self):
return self.organize['problem_files_path']
def set_problem_files_path(self, new_path):
self.organize['problem_files_path'] = new_path
self.organize['problem_handler'] = ProblemFileHandler( PathChannel(signal='badfile',path=new_path) )
def get_recorded_path(self):
return self.organize['recorded_path']
def set_recorded_path(self, new_path):
self.__remove_watch(self.organize['recorded_path'])
self.organize['recorded_path'] = new_path
self.__add_watch(new_path, self.watch_listener)
def get_organize_path(self):
"""
returns the current path that is being watched for organization
@ -80,20 +101,15 @@ class Manager(Loggable):
self.organize['organize_listener'].flush_events(new_path)
self.__add_watch(new_path, self.organize['organize_listener'])
organize_path = property(get_organize_path, set_organize_path)
def get_imported_path(self):
return self.organize['imported_path']
def get_store_path(self):
"""
returns the store_path (should be accessed through the property usually)
"""
return self.organize['store_path']
def set_store_path(self,new_path):
def set_imported_path(self,new_path):
"""
set the directory where organized files go to
"""
self.__remove_watch(self.organize['store_path'])
self.organize['store_path'] = new_path
self.__remove_watch(self.organize['imported_path'])
self.organize['imported_path'] = new_path
self.organize['organizer'] = self.__create_organizer(new_path)
# flush all the files in the new store_directory. this is done so that
# new files are added to the database. Note that we are not responsible
@ -102,7 +118,18 @@ class Manager(Loggable):
# self.watch_listener.flush_events(new_path)
self.__add_watch(new_path, self.watch_listener)
store_path = property(get_store_path, set_store_path)
def change_storage_root(self, store):
"""
hooks up all the directories for you. Problem, recorded, imported, organize.
"""
store_paths = mmp.expand_storage(store)
self.set_problem_files_path(store_paths['problem_files'])
self.set_imported_path(store_paths['imported'])
self.set_recorded_path(store_paths['recorded'])
self.set_organize_path(store_paths['organize'])
mmp.create_dir(store)
for p in store_paths.values():
mmp.create_dir(p)
def has_watch(self, path):
"""

View File

@ -308,6 +308,15 @@ def import_organize(store):
store = os.path.normpath(store)
return os.path.join(store,'organize'), os.path.join(store,'imported')
def expand_storage(store):
store = os.path.normpath(store)
return {
'organize' : os.path.join(store, 'organize'),
'recorded' : os.path.join(store, 'recorded'),
'problem_files' : os.path.join(store, 'problem_files'),
'imported' : os.path.join(store, 'imported'),
}
def create_dir(path):
"""
will try and make sure that path exists at all costs. raises an exception

View File

@ -2,6 +2,7 @@
import os
from media.monitor.log import Loggable
from media.monitor.exceptions import NoDirectoryInAirtime
import media.monitor.pure as mmp
from os.path import normpath
class AirtimeDB(Loggable):
@ -27,6 +28,7 @@ class AirtimeDB(Loggable):
self.dir_to_id = dict([ (v,k) for k,v in dirs_with_id.iteritems() ])
self.base_storage = dirs_setup[u'stor']
self.storage_paths = mmp.expand_storage( self.base_storage )
self.base_id = self.dir_to_id[self.base_storage]
# hack to get around annoying schema of airtime db
@ -37,11 +39,17 @@ class AirtimeDB(Loggable):
# store...
self.watched_directories = set([ os.path.normpath(p) for p in dirs_setup[u'watched_dirs'] ])
def to_id(self, directory):
return self.dir_to_id[ directory ]
def to_directory(self, dir_id):
return self.id_to_dir[ dir_id ]
def storage_path(self): return self.base_storage
def organize_path(self): return os.path.join(self.base_storage, 'organize')
def problem_path(self): return os.path.join(self.base_storage, 'problem_files')
def import_path(self): return os.path.join(self.base_storage, 'imported')
def recorded_path(self): return os.path.join(self.base_storage, 'recorded')
def organize_path(self): return self.storage_paths['organize']
def problem_path(self): return self.storage_paths['problem_files']
def import_path(self): return self.storage_paths['imported']
def recorded_path(self): return self.storage_paths['recorded']
def list_watched(self):
"""

View File

@ -5,14 +5,12 @@ import os
from media.monitor.manager import Manager
from media.monitor.bootstrap import Bootstrapper
from media.monitor.log import get_logger
from media.monitor.events import PathChannel
from media.monitor.config import MMConfig
from media.monitor.toucher import ToucherThread
from media.monitor.syncdb import AirtimeDB
from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale, NoConfigFile, FailedToCreateDir
from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale, NoConfigFile
from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver
from media.monitor.watchersyncer import WatchSyncer
from media.monitor.handler import ProblemFileHandler
from media.monitor.eventdrainer import EventDrainer
import media.monitor.pure as mmp
@ -50,10 +48,6 @@ except Exception as e:
watch_syncer = WatchSyncer(signal='watch',
chunking_number=config['chunking_number'],
timeout=config['request_max_wait'])
try:
problem_handler = ProblemFileHandler( PathChannel(signal='badfile',path='/srv/airtime/stor/problem_files/') )
except FailedToCreateDir as e:
log.info("Failed to create problem directory: '%s'" % e.path)
apiclient = apc.AirtimeApiClient.create_right_config(log=log,config_path=global_config)
@ -64,12 +58,12 @@ sdb = AirtimeDB(apiclient)
manager = Manager()
airtime_receiver = AirtimeMessageReceiver(config,manager)
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
store = apiclient.setup_media_monitor()
organize_dir, import_dir = mmp.import_organize(store[u'stor'])
# Order matters here:
# TODO : add flushing
manager.set_store_path(import_dir)
manager.set_organize_path(organize_dir)
airtime_receiver.change_storage({ 'directory':store[u'stor'] })
for watch_dir in store[u'watched_dirs']:
if not os.path.exists(watch_dir):
@ -78,16 +72,13 @@ for watch_dir in store[u'watched_dirs']:
except Exception as e:
log.error("Could not create watch directory: '%s' (given from the database)." % watch_dir)
if os.path.exists(watch_dir):
manager.add_watch_directory(watch_dir)
airtime_receiver.new_watch({ 'directory':watch_dir })
last_ran=config.last_ran()
bs = Bootstrapper( db=sdb, watch_signal='watch' )
bs.flush_all( config.last_ran() )
airtime_receiver = AirtimeMessageReceiver(config,manager)
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
ed = EventDrainer(airtime_notifier.connection,interval=float(config['rmq_event_wait']))
# Launch the toucher that updates the last time when the script was ran every