cc-4105: major code cleanup

This commit is contained in:
Rudi Grinberg 2012-08-09 16:15:53 -04:00
parent 57a8a6a7f7
commit 38e8c3871e
6 changed files with 96 additions and 63 deletions

View File

@ -24,19 +24,22 @@ from api_clients import api_client as apc
class AirtimeNotifier(Loggable):
"""
AirtimeNotifier is responsible for interecepting RabbitMQ messages and feeding them to the
event_handler object it was initialized with. The only thing it does to the messages is parse
them from json
AirtimeNotifier is responsible for interecepting RabbitMQ messages and
feeding them to the event_handler object it was initialized with. The only
thing it does to the messages is parse them from json
"""
def __init__(self, cfg, message_receiver):
self.cfg = cfg
try:
self.handler = message_receiver
self.logger.info("Initializing RabbitMQ message consumer...")
schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem")
self.connection = BrokerConnection(cfg["rabbitmq_host"], cfg["rabbitmq_user"],
cfg["rabbitmq_password"], cfg["rabbitmq_vhost"])
schedule_exchange = Exchange("airtime-media-monitor", "direct",
durable=True, auto_delete=True)
schedule_queue = Queue("media-monitor", exchange=schedule_exchange,
key="filesystem")
self.connection = BrokerConnection(cfg["rabbitmq_host"],
cfg["rabbitmq_user"], cfg["rabbitmq_password"],
cfg["rabbitmq_vhost"])
channel = self.connection.channel()
consumer = Consumer(channel, schedule_queue)
consumer.register_callback(self.handle_message)
@ -72,9 +75,9 @@ class AirtimeMessageReceiver(Loggable):
self.manager = manager
def message(self, msg):
"""
This method is called by an AirtimeNotifier instance that consumes the Rabbit MQ events
that trigger this. The method return true when the event was executed and false when it
wasn't
This method is called by an AirtimeNotifier instance that consumes the
Rabbit MQ events that trigger this. The method return true when the
event was executed and false when it wasn't
"""
msg = copy.deepcopy(msg)
if msg['event_type'] in self.dispatch_table:
@ -84,7 +87,8 @@ class AirtimeMessageReceiver(Loggable):
self._execute_message(evt,msg)
return True
else:
self.logger.info("Received invalid message with 'event_type': '%s'" % msg['event_type'])
self.logger.info("Received invalid message with 'event_type': '%s'"
% msg['event_type'])
self.logger.info("Message details: %s" % str(msg))
return False
def _execute_message(self,evt,message):
@ -93,7 +97,8 @@ class AirtimeMessageReceiver(Loggable):
def __request_now_bootstrap(self, directory_id=None, directory=None):
if (not directory_id) and (not directory):
raise ValueError("You must provide either directory_id or directory")
raise ValueError("You must provide either directory_id or \
directory")
sdb = AirtimeDB(apc.AirtimeApiClient.create_right_config())
if directory: directory = os.path.normpath(directory)
if directory_id == None: directory_id = sdb.to_id(directory)
@ -102,7 +107,8 @@ class AirtimeMessageReceiver(Loggable):
bs = Bootstrapper( sdb, self.manager.watch_signal() )
bs.flush_watch( directory=directory, last_ran=time.time() )
except Exception as e:
self.logger.info( "Exception bootstrapping: (dir,id)=(%s,%s)" % (directory, directory_id) )
self.logger.info( "Exception bootstrapping: (dir,id)=(%s,%s)" %
(directory, directory_id) )
self.logger.info( str(e) )
self.logger.error( traceback.format_exc() )
raise DirectoryIsNotListed(directory)
@ -111,7 +117,8 @@ class AirtimeMessageReceiver(Loggable):
return self.dispatch_table.keys()
def md_update(self, msg):
self.logger.info("Updating metadata for: '%s'" % msg['MDATA_KEY_FILEPATH'])
self.logger.info("Updating metadata for: '%s'" %
msg['MDATA_KEY_FILEPATH'])
md_path = msg['MDATA_KEY_FILEPATH']
try:
Metadata.write_unsafe(path=md_path, md=msg)
@ -119,15 +126,18 @@ class AirtimeMessageReceiver(Loggable):
self.logger.info("Cannot find metadata file: '%s'" % e.path)
except Exception as e:
# TODO : add md_path to problem path or something?
self.logger.info("Unknown error when writing metadata to: '%s'" % md_path)
self.logger.info("Unknown error when writing metadata to: '%s'" %
md_path)
self.logger.info( traceback.format_exc() )
def new_watch(self, msg):
self.logger.info("Creating watch for directory: '%s'" % msg['directory'])
self.logger.info("Creating watch for directory: '%s'" %
msg['directory'])
if not os.path.exists(msg['directory']):
try: os.makedirs(msg['directory'])
except Exception as e:
self.logger.info("Failed to create watched dir '%s'" % msg['directory'])
self.logger.info("Failed to create watched dir '%s'" %
msg['directory'])
self.logger.info(str(e))
else: self.new_watch(msg)
else:
@ -138,11 +148,13 @@ class AirtimeMessageReceiver(Loggable):
self.manager.add_watch_directory(msg['directory'])
def remove_watch(self, msg):
self.logger.info("Removing watch from directory: '%s'" % msg['directory'])
self.logger.info("Removing watch from directory: '%s'" %
msg['directory'])
self.manager.remove_watch_directory(msg['directory'])
def rescan_watch(self, msg):
self.logger.info("Trying to rescan watched directory: '%s'" % msg['directory'])
self.logger.info("Trying to rescan watched directory: '%s'" %
msg['directory'])
try:
# id is always an integer but in the dictionary the key is always a
# string
@ -161,7 +173,8 @@ class AirtimeMessageReceiver(Loggable):
new_storage_directory = msg['directory']
self.manager.change_storage_root(new_storage_directory)
for to_bootstrap in [ self.manager.get_recorded_path(), self.manager.get_imported_path() ]:
for to_bootstrap in [ self.manager.get_recorded_path(),
self.manager.get_imported_path() ]:
self.__request_now_bootstrap( directory=to_bootstrap )
def file_delete(self, msg):
@ -171,20 +184,23 @@ class AirtimeMessageReceiver(Loggable):
if msg['delete']:
if os.path.exists(msg['filepath']):
try:
self.logger.info("Attempting to delete '%s'" % msg['filepath'])
self.logger.info("Attempting to delete '%s'" %
msg['filepath'])
FileMediator.ignore(msg['filepath'])
os.unlink(msg['filepath'])
if not os.path.exists(msg['filepath']):
self.logger.info("Successfully deleted: '%s'" % msg['filepath'])
self.logger.info("Successfully deleted: '%s'" %
msg['filepath'])
except Exception as e:
self.logger.info("Failed to delete '%s'" % msg['filepath'])
self.logger.info("Error: " % str(e))
else:
self.logger.info("Attempting to delete file '%s' that does not exist. Full request coming:"
% msg['filepath'])
self.logger.info("Attempting to delete file '%s' that does not \
exist. Full request coming:" % msg['filepath'])
self.logger.info(msg)
else:
self.logger.info("No clippy confirmation, ignoring event. Out of curiousity we will print some details.")
self.logger.info("No clippy confirmation, ignoring event. \
Out of curiousity we will print some details.")
self.logger.info(msg)

View File

@ -12,10 +12,7 @@ appname = 'mediamonitor2'
class Loggable(object):
__metaclass__ = abc.ABCMeta
@LazyProperty
def logger(self):
# TODO : Clean this up
if not hasattr(self,"_logger"): self._logger = logging.getLogger(appname)
return self._logger
def logger(self): return logging.getLogger(appname)
def unexpected_exception(self,e):
self.fatal_exception("'Unexpected' exception has occured:", e)

View File

@ -30,7 +30,8 @@ class Manager(Loggable):
'problem_files_path' : None,
# This guy doesn't need to be changed, always the same.
# Gets hooked by wm to different directories
'organize_listener' : OrganizeListener(signal=self.organize_channel),
'organize_listener' : OrganizeListener(signal=
self.organize_channel),
# Also stays the same as long as its target, the directory
# which the "organized" files go to, isn't changed.
'organizer' : None,
@ -61,14 +62,15 @@ class Manager(Loggable):
return self.watch_listener.signal
def __remove_watch(self,path):
if path in self.__wd_path: # only delete if dir is actually being watched
# only delete if dir is actually being watched
if path in self.__wd_path:
wd = self.__wd_path[path]
self.wm.rm_watch(wd, rec=True)
del(self.__wd_path[path])
def __add_watch(self,path,listener):
wd = self.wm.add_watch(path, pyinotify.ALL_EVENTS, rec=True, auto_add=True,
proc_fun=listener)
wd = self.wm.add_watch(path, pyinotify.ALL_EVENTS, rec=True,
auto_add=True, proc_fun=listener)
self.__wd_path[path] = wd.values()[0]
def __create_organizer(self, target_path, recorded_path):
@ -77,14 +79,16 @@ class Manager(Loggable):
adding the channel/signal as a parameter to the original constructor
every time
"""
return Organizer(channel=self.organize_channel,target_path=target_path, recorded_path=recorded_path)
return Organizer(channel=self.organize_channel,target_path=target_path,
recorded_path=recorded_path)
def get_problem_files_path(self):
return self.organize['problem_files_path']
def set_problem_files_path(self, new_path):
self.organize['problem_files_path'] = new_path
self.organize['problem_handler'] = ProblemFileHandler( PathChannel(signal='badfile',path=new_path) )
self.organize['problem_handler'] = \
ProblemFileHandler( PathChannel(signal='badfile',path=new_path) )
def get_recorded_path(self):
return self.organize['recorded_path']
@ -92,7 +96,8 @@ class Manager(Loggable):
def set_recorded_path(self, new_path):
self.__remove_watch(self.organize['recorded_path'])
self.organize['recorded_path'] = new_path
self.organize['organizer'] = self.__create_organizer(self.organize['imported_path'], new_path)
self.organize['organizer'] = self.__create_organizer(
self.organize['imported_path'], new_path)
self.__add_watch(new_path, self.watch_listener)
def get_organize_path(self):
@ -103,8 +108,9 @@ class Manager(Loggable):
def set_organize_path(self, new_path):
"""
sets the organize path to be new_path. Under the current scheme there is
only one organize path but there is no reason why more cannot be supported
sets the organize path to be new_path. Under the current scheme there
is only one organize path but there is no reason why more cannot be
supported
"""
# if we are already organizing a particular directory we remove the
# watch from it first before organizing another directory
@ -124,12 +130,14 @@ class Manager(Loggable):
"""
self.__remove_watch(self.organize['imported_path'])
self.organize['imported_path'] = new_path
self.organize['organizer'] = self.__create_organizer(new_path, self.organize['recorded_path'])
self.organize['organizer'] = self.__create_organizer(
new_path, self.organize['recorded_path'])
self.__add_watch(new_path, self.watch_listener)
def change_storage_root(self, store):
"""
hooks up all the directories for you. Problem, recorded, imported, organize.
hooks up all the directories for you. Problem, recorded, imported,
organize.
"""
store_paths = mmp.expand_storage(store)
self.set_problem_files_path(store_paths['problem_files'])
@ -168,8 +176,8 @@ class Manager(Loggable):
self.logger.info("Removing watched directory: '%s'", watch_dir)
self.__remove_watch(watch_dir)
else:
self.logger.info("'%s' is not being watched, hence cannot be removed"
% watch_dir)
self.logger.info("'%s' is not being watched, hence cannot be \
removed" % watch_dir)
def pyinotify(self):
return pyinotify.Notifier(self.wm)

View File

@ -21,16 +21,20 @@ class Organizer(ReportHandler,Loggable):
def handle(self, sender, event):
"""
Intercept events where a new file has been added to the organize
directory and place it in the correct path (starting with self.target_path)
directory and place it in the correct path (starting with
self.target_path)
"""
try:
# We must select the target_path based on whether file was recorded
# by airtime or not.
# Do we need to "massage" the path using mmp.organized_path?
target_path = self.recorded_path if event.metadata.is_recorded() else self.target_path
new_path = mmp.organized_path(event.path, target_path, event.metadata.extract())
target_path = self.recorded_path if event.metadata.is_recorded() \
else self.target_path
new_path = mmp.organized_path(event.path, target_path,
event.metadata.extract())
mmp.magic_move(event.path, new_path)
self.logger.info('Organized: "%s" into "%s"' % (event.path, new_path))
self.logger.info('Organized: "%s" into "%s"' %
(event.path, new_path))
except BadSongFile as e:
self.report_problem_file(event=event, exception=e)
# probably general error in mmp.magic.move...

View File

@ -12,17 +12,18 @@ class AirtimeDB(Loggable):
def reload_directories(self):
"""
this is the 'real' constructor, should be called if you ever want the class reinitialized.
there's not much point to doing it yourself however, you should just create a new AirtimeDB
instance.
this is the 'real' constructor, should be called if you ever want the
class reinitialized. there's not much point to doing it yourself
however, you should just create a new AirtimeDB instance.
"""
# dirs_setup is a dict with keys:
# u'watched_dirs' and u'stor' which point to lists of corresponding
# dirs
dirs_setup = self.apc.setup_media_monitor()
dirs_setup[u'stor'] = normpath( dirs_setup[u'stor'] )
dirs_setup[u'watched_dirs'] = map(normpath, dirs_setup[u'watched_dirs'] )
dirs_with_id = dict([ (k,normpath(v)) for k,v in self.apc.list_all_watched_dirs()['dirs'].iteritems() ])
dirs_setup[u'watched_dirs'] = map(normpath, dirs_setup[u'watched_dirs'])
dirs_with_id = dict([ (k,normpath(v)) for k,v in
self.apc.list_all_watched_dirs()['dirs'].iteritems() ])
self.id_to_dir = dirs_with_id
self.dir_to_id = dict([ (v,k) for k,v in dirs_with_id.iteritems() ])
@ -37,7 +38,8 @@ class AirtimeDB(Loggable):
# We don't know from the x_to_y dict which directory is watched or
# store...
self.watched_directories = set([ os.path.normpath(p) for p in dirs_setup[u'watched_dirs'] ])
self.watched_directories = set([ os.path.normpath(p) for p in
dirs_setup[u'watched_dirs'] ])
def to_id(self, directory):
return self.dir_to_id[ directory ]
@ -69,26 +71,30 @@ class AirtimeDB(Loggable):
def dir_id_get_files(self, dir_id):
base_dir = self.id_to_dir[ dir_id ]
return set(( os.path.join(base_dir,p) for p in self.apc.list_all_db_files( dir_id ) ))
return set(( os.path.join(base_dir,p) for p in
self.apc.list_all_db_files( dir_id ) ))
def directory_get_files(self, directory):
"""
returns all the files(recursively) in a directory. a directory is an "actual" directory
path instead of its id. This is super hacky because you create one request for the
recorded directory and one for the imported directory even though they're the same dir
in the database so you get files for both dirs in 1 request...
returns all the files(recursively) in a directory. a directory is an
"actual" directory path instead of its id. This is super hacky because
you create one request for the recorded directory and one for the
imported directory even though they're the same dir in the database so
you get files for both dirs in 1 request...
"""
normal_dir = os.path.normpath(unicode(directory))
if normal_dir not in self.dir_to_id:
raise NoDirectoryInAirtime( normal_dir, self.dir_to_id )
all_files = self.dir_id_get_files( self.dir_to_id[normal_dir] )
if normal_dir == self.recorded_path():
all_files = [ p for p in all_files if mmp.sub_path( self.recorded_path(), p ) ]
all_files = [ p for p in all_files if
mmp.sub_path( self.recorded_path(), p ) ]
elif normal_dir == self.import_path():
all_files = [ p for p in all_files if mmp.sub_path( self.import_path(), p ) ]
all_files = [ p for p in all_files if
mmp.sub_path( self.import_path(), p ) ]
elif normal_dir == self.storage_path():
self.logger.info("Warning, you're getting all files in '%s' which includes imported + record"
% normal_dir)
self.logger.info("Warning, you're getting all files in '%s' which \
includes imported + record" % normal_dir)
return set(all_files)

View File

@ -10,7 +10,8 @@ class Toucher(Loggable):
def __call__(self):
try: mmp.fondle(self.path)
except Exception as e:
self.logger.info("Failed to touch file: '%s'. Logging exception." % self.path)
self.logger.info("Failed to touch file: '%s'. Logging exception." %
self.path)
self.logger.info(str(e))
#http://code.activestate.com/lists/python-ideas/8982/
@ -21,7 +22,8 @@ import threading
class RepeatTimer(threading.Thread):
def __init__(self, interval, callable, args=[], kwargs={}):
threading.Thread.__init__(self)
# interval_current shows number of milliseconds in currently triggered <tick>
# interval_current shows number of milliseconds in currently triggered
# <tick>
self.interval_current = interval
# interval_new shows number of milliseconds for next <tick>
self.interval_new = interval