cc-4105: more event contractor bug fixes

This commit is contained in:
Rudi Grinberg 2012-08-13 12:16:57 -04:00
parent f2fea19c85
commit 4708d76091
4 changed files with 28 additions and 22 deletions

View File

@ -72,11 +72,12 @@ class AirtimeMessageReceiver(Loggable):
}
self.cfg = cfg
self.manager = manager
def message(self, msg):
"""
This method is called by an AirtimeNotifier instance that consumes the
Rabbit MQ events that trigger this. The method return true when the
event was executed and false when it wasn't
This method is called by an AirtimeNotifier instance that
consumes the Rabbit MQ events that trigger this. The method
returns true when the event was executed and false when it wasn't
"""
msg = copy.deepcopy(msg)
if msg['event_type'] in self.dispatch_table:
@ -93,7 +94,6 @@ class AirtimeMessageReceiver(Loggable):
def _execute_message(self,evt,message):
self.dispatch_table[evt](message)
def __request_now_bootstrap(self, directory_id=None, directory=None):
if (not directory_id) and (not directory):
raise ValueError("You must provide either directory_id or \
@ -177,8 +177,8 @@ class AirtimeMessageReceiver(Loggable):
self.__request_now_bootstrap( directory=to_bootstrap )
def file_delete(self, msg):
# deletes should be requested only from imported folder but we don't
# verify that.
# deletes should be requested only from imported folder but we
# don't verify that.
self.logger.info("Attempting to delete(maybe) '%s'" % msg['filepath'])
if msg['delete']:
if os.path.exists(msg['filepath']):

View File

@ -57,12 +57,17 @@ class BaseEvent(Loggable):
self.path = os.path.normpath(raw_event.pathname)
else: self.path = raw_event
self._pack_hook = lambda _ : _ # no op
self._morph_target = False # returns true if event was used to morph
# into another event
def exists(self): return os.path.exists(self.path)
@LazyProperty
def cookie(self):
return getattr( self._raw_event, 'cookie', None )
def __str__(self): return "Event. Path: %s" % self.__raw_event.pathname
def morph_target(self): return self._morph_target
def __str__(self):
return "Event(%s). Path(%s)" % ( self.path, self.__class__.__name__)
def is_dir_event(self): return self._raw_event.dir
def add_safe_pack_hook(self,k): self._pack_hook = k
@ -83,12 +88,12 @@ class BaseEvent(Loggable):
# nothing to see here, please move along
def morph_into(self, evt):
self.logger.info("Morphing '%s' into '%s'" % (self.__class__.__name__,
evt.__class__.__name__))
self.logger.info("Morphing %s into %s" % ( str(self), str(evt) ) )
self._raw_event = evt
self.path = evt.path
self.add_safe_pack_hook(evt._pack_hook)
self.__class__ = evt.__class__
evt._morph_target = True
return self
class FakePyinotify(object):

View File

@ -3,6 +3,7 @@ import threading
import time
import copy
import traceback
from itertools import ifilter
from media.monitor.handler import ReportHandler
from media.monitor.log import Loggable
@ -27,13 +28,14 @@ class RequestSync(threading.Thread,Loggable):
def run(self):
self.logger.info("Attempting request with %d items." %
len(self.requests))
# Note that we must attach the appropriate mode to every response. Also
# Not forget to attach the 'is_record' to any requests that are related
# to recorded shows
# Note that we must attach the appropriate mode to every
# response. Also do not forget to attach the 'is_record' to any
# requests that are related to recorded shows
# TODO : recorded shows aren't flagged right
# Is this retry shit even necessary? Consider getting rid of this.
packed_requests = []
for request_event in self.requests:
for request_event in ifilter(lambda x: not x.morph_target(),
self.requests):
try:
for request in request_event.safe_pack():
if isinstance(request, BadSongFile):
@ -78,9 +80,9 @@ class TimeoutWatcher(threading.Thread,Loggable):
# so that the people do not have to wait for the queue to fill up
while True:
time.sleep(self.timeout)
# If there is any requests left we launch em.
# Note that this isn't strictly necessary since RequestSync threads
# already chain themselves
# If there are any requests left we launch them. Note that this
# isn't strictly necessary since RequestSync threads already
# chain themselves
if self.watcher.requests_in_queue():
self.logger.info("We got %d requests waiting to be launched" %
self.watcher.requests_left_count())
@ -121,10 +123,8 @@ class WatchSyncer(ReportHandler,Loggable):
try:
# If there is a strange bug anywhere in the code the next line
# should be a suspect
if self.contractor.register( event ):
# only if the event was actually registered we add it.
# Otherwise some other event is mutated somewhere
self.push_queue( event )
self.contractor.register( event )
self.push_queue( event )
except BadSongFile as e:
self.fatal_exception("Received bad song file '%s'" % e.path, e)
except Exception as e:
@ -142,7 +142,7 @@ class WatchSyncer(ReportHandler,Loggable):
if self.events_left_count() >= self.chunking_number:
self.push_request()
self.request_do() # Launch the request if nothing is running
self.__queue.add(elem)
self.__queue.append(elem)
def flush_events(self):
self.logger.info("Force flushing events...")
@ -189,7 +189,7 @@ class WatchSyncer(ReportHandler,Loggable):
self.__requests.append(launch_request)
self.__reset_queue()
def __reset_queue(self): self.__queue = set([])
def __reset_queue(self): self.__queue = []
def __del__(self):
# Ideally we would like to do a little more to ensure safe shutdown

View File

@ -35,6 +35,7 @@ def main(global_config, api_client_config):
print("Unknown error reading configuration file: '%s'" % global_config)
print(str(e))
# TODO : use the logging config file.
logfile = unicode( config['logpath'] )
setup_logging(logfile)
log = get_logger()