From 4708d76091a4f5b8027597d784bee59e252edad6 Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Mon, 13 Aug 2012 12:16:57 -0400 Subject: [PATCH] cc-4105: more event contractor bug fixes --- .../media-monitor2/media/monitor/airtime.py | 12 ++++----- .../media-monitor2/media/monitor/events.py | 11 +++++--- .../media/monitor/watchersyncer.py | 26 +++++++++---------- python_apps/media-monitor2/mm2.py | 1 + 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/python_apps/media-monitor2/media/monitor/airtime.py b/python_apps/media-monitor2/media/monitor/airtime.py index 9a43ee39c..96147ada5 100644 --- a/python_apps/media-monitor2/media/monitor/airtime.py +++ b/python_apps/media-monitor2/media/monitor/airtime.py @@ -72,11 +72,12 @@ class AirtimeMessageReceiver(Loggable): } self.cfg = cfg self.manager = manager + def message(self, msg): """ - This method is called by an AirtimeNotifier instance that consumes the - Rabbit MQ events that trigger this. The method return true when the - event was executed and false when it wasn't + This method is called by an AirtimeNotifier instance that + consumes the Rabbit MQ events that trigger this. The method + return true when the event was executed and false when it wasn't """ msg = copy.deepcopy(msg) if msg['event_type'] in self.dispatch_table: @@ -93,7 +94,6 @@ class AirtimeMessageReceiver(Loggable): def _execute_message(self,evt,message): self.dispatch_table[evt](message) - def __request_now_bootstrap(self, directory_id=None, directory=None): if (not directory_id) and (not directory): raise ValueError("You must provide either directory_id or \ @@ -177,8 +177,8 @@ class AirtimeMessageReceiver(Loggable): self.__request_now_bootstrap( directory=to_bootstrap ) def file_delete(self, msg): - # deletes should be requested only from imported folder but we don't - # verify that. + # deletes should be requested only from imported folder but we + # don't verify that. self.logger.info("Attempting to delete(maybe) '%s'" % msg['filepath']) if msg['delete']: if os.path.exists(msg['filepath']): diff --git a/python_apps/media-monitor2/media/monitor/events.py b/python_apps/media-monitor2/media/monitor/events.py index aae44fecb..1c4735d2b 100644 --- a/python_apps/media-monitor2/media/monitor/events.py +++ b/python_apps/media-monitor2/media/monitor/events.py @@ -57,12 +57,17 @@ class BaseEvent(Loggable): self.path = os.path.normpath(raw_event.pathname) else: self.path = raw_event self._pack_hook = lambda _ : _ # no op + self._morph_target = False # returns true if event was used to moprh + # into another event def exists(self): return os.path.exists(self.path) @LazyProperty def cookie(self): return getattr( self._raw_event, 'cookie', None ) - def __str__(self): return "Event. Path: %s" % self.__raw_event.pathname + def morph_target(self): return self._morph_target + + def __str__(self): + return "Event(%s). Path(%s)" % ( self.path, self.__class__.__name__) def is_dir_event(self): return self._raw_event.dir def add_safe_pack_hook(self,k): self._pack_hook = k @@ -83,12 +88,12 @@ class BaseEvent(Loggable): # nothing to see here, please move along def morph_into(self, evt): - self.logger.info("Morphing '%s' into '%s'" % (self.__class__.__name__, - evt.__class__.__name__)) + self.logger.info("Morphing %s into %s" % ( str(self), str(evt) ) ) self._raw_event = evt self.path = evt.path self.add_safe_pack_hook(evt._pack_hook) self.__class__ = evt.__class__ + evt._morph_target = True return self class FakePyinotify(object): diff --git a/python_apps/media-monitor2/media/monitor/watchersyncer.py b/python_apps/media-monitor2/media/monitor/watchersyncer.py index 95192b150..f7497b9b4 100644 --- a/python_apps/media-monitor2/media/monitor/watchersyncer.py +++ b/python_apps/media-monitor2/media/monitor/watchersyncer.py @@ -3,6 +3,7 @@ import threading import time import copy import traceback +from itertools import ifilter from media.monitor.handler import ReportHandler from media.monitor.log import Loggable @@ -27,13 +28,14 @@ class RequestSync(threading.Thread,Loggable): def run(self): self.logger.info("Attempting request with %d items." % len(self.requests)) - # Note that we must attach the appropriate mode to every response. Also - # Not forget to attach the 'is_record' to any requests that are related - # to recorded shows + # Note that we must attach the appropriate mode to every + # response. Also Not forget to attach the 'is_record' to any + # requests that are related to recorded shows # TODO : recorded shows aren't flagged right # Is this retry shit even necessary? Consider getting rid of this. packed_requests = [] - for request_event in self.requests: + for request_event in ifilter(lambda x: not x.morph_target(), + self.requests): try: for request in request_event.safe_pack(): if isinstance(request, BadSongFile): @@ -78,9 +80,9 @@ class TimeoutWatcher(threading.Thread,Loggable): # so that the people do not have to wait for the queue to fill up while True: time.sleep(self.timeout) - # If there is any requests left we launch em. - # Note that this isn't strictly necessary since RequestSync threads - # already chain themselves + # If there is any requests left we launch em. Note that this + # isn't strictly necessary since RequestSync threads already + # chain themselves if self.watcher.requests_in_queue(): self.logger.info("We got %d requests waiting to be launched" % self.watcher.requests_left_count()) @@ -121,10 +123,8 @@ class WatchSyncer(ReportHandler,Loggable): try: # If there is a strange bug anywhere in the code the next line # should be a suspect - if self.contractor.register( event ): - # only if the event was actually registered we add it. - # Otherwise some other event is mutated somewhere - self.push_queue( event ) + self.contractor.register( event ) + self.push_queue( event ) except BadSongFile as e: self.fatal_exception("Received bas song file '%s'" % e.path, e) except Exception as e: @@ -142,7 +142,7 @@ class WatchSyncer(ReportHandler,Loggable): if self.events_left_count() >= self.chunking_number: self.push_request() self.request_do() # Launch the request if nothing is running - self.__queue.add(elem) + self.__queue.append(elem) def flush_events(self): self.logger.info("Force flushing events...") @@ -189,7 +189,7 @@ class WatchSyncer(ReportHandler,Loggable): self.__requests.append(launch_request) self.__reset_queue() - def __reset_queue(self): self.__queue = set([]) + def __reset_queue(self): self.__queue = [] def __del__(self): # Ideally we would like to do a little more to ensure safe shutdown diff --git a/python_apps/media-monitor2/mm2.py b/python_apps/media-monitor2/mm2.py index 998a3901f..f1f794f17 100644 --- a/python_apps/media-monitor2/mm2.py +++ b/python_apps/media-monitor2/mm2.py @@ -35,6 +35,7 @@ def main(global_config, api_client_config): print("Unknown error reading configuration file: '%s'" % global_config) print(str(e)) + # TODO : use the logging config file. logfile = unicode( config['logpath'] ) setup_logging(logfile) log = get_logger()