cc-4105: added mechanism to remove duplicate events

This commit is contained in:
Rudi Grinberg 2012-08-10 18:30:07 -04:00
parent 745221f77c
commit 076a9c2296
6 changed files with 83 additions and 35 deletions

View File

@ -16,7 +16,17 @@ rabbitmq_password = 'guest'
rabbitmq_vhost = '/'
############################################
# Media-Monitor preferences #
# Media-Monitor preferences #
############################################
check_filesystem_events = 5 #how long to queue up events performed on the files themselves.
check_airtime_events = 30 #how long to queue metadata input from airtime.
check_filesystem_events = 5 #how long to queue up events performed on the files themselves.
check_airtime_events = 30 #how long to queue metadata input from airtime.
# MM2 only:
touch_interval = 5
chunking_number = 450
request_max_wait = 3.0
rmq_event_wait = 0.5
logpath = '/var/log/airtime/media-monitor/mm2.log'

View File

@ -0,0 +1,30 @@
from media.monitor.log import Loggable
from media.monitor.events import DeleteFile
class EventContractor(Loggable):
def __init__(self):
self.store = {}
def event_registered(self, evt):
return evt.path in self.store
def get_old_event(self, evt):
return evt.store[ evt.path ]
def register(self, evt):
if self.event_registered(evt):
old_e = self.get_old_event(evt)
# If two events are of the same type we can safely discard the old
# one
if evt.__class__ == old_e.__class__:
old_e.morph_into(evt)
# delete overrides any other event
elif isinstance(evt, DeleteFile):
old_e.morph_into(evt)
else:
evt.add_safe_pack_hook( lambda : self.__unregister(evt) )
self.store[ evt.path ] = evt
# events are unregistered automatically no need to screw around with them
def __unregister(self, evt):
del self.store[evt.path]

View File

@ -56,6 +56,7 @@ class BaseEvent(Loggable):
self._raw_event = raw_event
self.path = os.path.normpath(raw_event.pathname)
else: self.path = raw_event
self._pack_hook = lambda _ : _ # no op
def exists(self): return os.path.exists(self.path)
@LazyProperty
def cookie(self):
@ -64,6 +65,8 @@ class BaseEvent(Loggable):
def __str__(self): return "Event. Path: %s" % self.__raw_event.pathname
def is_dir_event(self): return self._raw_event.dir
def add_safe_pack_hook(self,k): self._pack_hook = k
# As opposed to unsafe_pack...
def safe_pack(self):
"""
@ -73,19 +76,18 @@ class BaseEvent(Loggable):
"""
# pack will only throw an exception if it processes one file but this
# is a little bit hacky
try: return self.pack()
try:
return self.pack()
self._pack_hook()
except BadSongFile as e: return [e]
# nothing to see here, please move along
def morph_into(self, evt):
"""
'Morphing' should preserve the self.cookie invariant. I.e.
either should either stay an integer or stay none.
"""
self.logger.info("Morphing '%s' into '%s'" % (self.__class__.__name__,
evt.__class__.__name__))
self._raw_event = evt
self.path = evt.path
self.add_safe_pack_hook(evt._pack_hook)
self.__class__ = evt.__class__
return self
@ -138,9 +140,19 @@ class MoveFile(BaseEvent, HasMetaData):
req_dict['MDATA_KEY_FILEPATH'] = unicode( self.path )
return [req_dict]
class ModifyFile(BaseEvent, HasMetaData):
def __init__(self, *args, **kwargs):
super(ModifyFile, self).__init__(*args, **kwargs)
def pack(self):
req_dict = self.metadata.extract()
req_dict['mode'] = u'modify'
# path to directory that is to be removed
req_dict['MDATA_KEY_FILEPATH'] = unicode( self.path )
return [req_dict]
def map_events(directory, constructor):
# -unknown-path should not appear in the path here but more testing might
# be necessary
# -unknown-path should not appear in the path here but more testing
# might be necessary
for f in mmp.walk_supported(directory, clean_empties=False):
try:
for e in constructor( FakePyinotify(f) ).pack(): yield e
@ -167,13 +179,3 @@ class DeleteDirWatch(BaseEvent):
req_dict['MDATA_KEY_FILEPATH'] = unicode( self.path + "/" )
return [req_dict]
class ModifyFile(BaseEvent, HasMetaData):
def __init__(self, *args, **kwargs):
super(ModifyFile, self).__init__(*args, **kwargs)
def pack(self):
req_dict = self.metadata.extract()
req_dict['mode'] = u'modify'
# path to directory that is to be removed
req_dict['MDATA_KEY_FILEPATH'] = unicode( self.path )
return [req_dict]

View File

@ -8,6 +8,7 @@ from media.monitor.handler import ReportHandler
from media.monitor.log import Loggable
from media.monitor.exceptions import BadSongFile
from media.monitor.pure import LazyProperty
from media.monitor.eventcontractor import EventContractor
import api_clients.api_client as ac
@ -31,8 +32,21 @@ class RequestSync(threading.Thread,Loggable):
# to recorded shows
# TODO : recorded shows aren't flagged right
# Is this retry shit even necessary? Consider getting rid of this.
packed_requests = []
for request_event in self.requests:
try:
for request in request_event.safe_pack():
if isinstance(request, BadSongFile):
self.logger.info("Bad song file: '%s'" % request.path)
else: packed_requests.append(request)
except BadSongFile as e:
self.logger.info("This should never occur anymore!!!")
self.logger.info("Bad song file: '%s'" % e.path)
except Exception as e:
self.logger.info("An evil exception occured")
self.logger.error( traceback.format_exc() )
def make_req():
self.apiclient.send_media_monitor_requests( self.requests )
self.apiclient.send_media_monitor_requests( packed_requests )
for try_index in range(0,self.retries):
try: make_req()
# most likely we did not get json response as we expected
@ -89,6 +103,7 @@ class WatchSyncer(ReportHandler,Loggable):
self.request_running = False
# we don't actually use this "private" instance variable anywhere
self.__current_thread = None
self.contractor = EventContractor()
tc = TimeoutWatcher(self, self.timeout)
tc.daemon = True
tc.start()
@ -103,7 +118,11 @@ class WatchSyncer(ReportHandler,Loggable):
self.logger.info("Received event '%s'. Path: '%s'" % \
( event.__class__.__name__,
getattr(event,'path','No path exists') ))
try: self.push_queue( event )
try:
self.push_queue( event )
# If there is a strange bug anywhere in the code the next line
# should be a suspect
self.contractor.register( event )
except BadSongFile as e:
self.fatal_exception("Received bas song file '%s'" % e.path, e)
except Exception as e:
@ -160,22 +179,9 @@ class WatchSyncer(ReportHandler,Loggable):
self.logger.info("WatchSyncer : Unleashing request")
# want to do request asyncly and empty the queue
requests = copy.copy(self.__queue)
packed_requests = []
for request_event in requests:
try:
for request in request_event.safe_pack():
if isinstance(request, BadSongFile):
self.logger.info("Bad song file: '%s'" % request.path)
else: packed_requests.append(request)
except BadSongFile as e:
self.logger.info("This should never occur anymore!!!")
self.logger.info("Bad song file: '%s'" % e.path)
except Exception as e:
self.logger.info("An evil exception occured")
self.logger.error( traceback.format_exc() )
def launch_request():
# Need shallow copy here
t = RequestSync(watcher=self, requests=packed_requests)
t = RequestSync(watcher=self, requests=requests)
t.start()
self.__current_thread = t
self.__requests.append(launch_request)

View File

@ -36,6 +36,7 @@ def main(global_config, api_client_config):
print(str(e))
logfile = unicode( config['logpath'] )
setup_logging(logfile)
log = get_logger()
log.info("Attempting to set the locale...")
@ -118,5 +119,4 @@ if __name__ == '__main__':
print("'%s' must exist" % args[k])
sys.exit(0)
print("Running mm1.99")
setup_logging(args['--log'])
main(args['--config'],args['--apiclient'])