cc-4105: fixed first big event contracting bug

This commit is contained in:
Rudi Grinberg 2012-08-13 11:19:51 -04:00
parent 3ce2555a35
commit f2fea19c85
2 changed files with 26 additions and 8 deletions

View File

@ -2,6 +2,10 @@ from media.monitor.log import Loggable
from media.monitor.events import DeleteFile
class EventContractor(Loggable):
"""
This class is responsible for "contracting" events together to ease the
load on airtime. It does this by morphing old events into newer ones
"""
def __init__(self):
self.store = {}
@ -12,18 +16,28 @@ class EventContractor(Loggable):
return self.store[ evt.path ]
def register(self, evt):
"""
Returns true if event was actually registered. This means that
no old events were touched. On the other hand returns false if
some other event in the storage was morphed into this newer one.
Which should mean that the old event should be discarded.
"""
if self.event_registered(evt):
old_e = self.get_old_event(evt)
# TODO : Perhaps there are other events that we can "contract"
# together
# If two events are of the same type we can safely discard the old
# one
if evt.__class__ == old_e.__class__:
old_e.morph_into(evt)
return False
# delete overrides any other event
elif isinstance(evt, DeleteFile):
old_e.morph_into(evt)
else:
evt.add_safe_pack_hook( lambda : self.__unregister(evt) )
self.store[ evt.path ] = evt
return False
evt.add_safe_pack_hook( lambda : self.__unregister(evt) )
self.store[ evt.path ] = evt
return True # We actually added something, hence we return true.
# events are unregistered automatically no need to screw around with them
def __unregister(self, evt):

View File

@ -96,7 +96,7 @@ class WatchSyncer(ReportHandler,Loggable):
#self.signal = signal
self.timeout = float(timeout)
self.chunking_number = int(chunking_number)
self.__queue = []
self.__reset_queue()
# Even though we are not blocking on the http requests, we are still
# trying to send the http requests in order
self.__requests = []
@ -119,10 +119,12 @@ class WatchSyncer(ReportHandler,Loggable):
( event.__class__.__name__,
getattr(event,'path','No path exists') ))
try:
self.push_queue( event )
# If there is a strange bug anywhere in the code the next line
# should be a suspect
self.contractor.register( event )
if self.contractor.register( event ):
# only if the event was actually registered we add it.
# Otherwise some other event is mutated somewhere
self.push_queue( event )
except BadSongFile as e:
self.fatal_exception("Received bas song file '%s'" % e.path, e)
except Exception as e:
@ -140,7 +142,7 @@ class WatchSyncer(ReportHandler,Loggable):
if self.events_left_count() >= self.chunking_number:
self.push_request()
self.request_do() # Launch the request if nothing is running
self.__queue.append(elem)
self.__queue.add(elem)
def flush_events(self):
self.logger.info("Force flushing events...")
@ -185,7 +187,9 @@ class WatchSyncer(ReportHandler,Loggable):
t.start()
self.__current_thread = t
self.__requests.append(launch_request)
self.__queue = []
self.__reset_queue()
def __reset_queue(self): self.__queue = set([])
def __del__(self):
# Ideally we would like to do a little more to ensure safe shutdown