From 7abe8824559937c744583833fe45ee96b5b3fd28 Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Fri, 27 Jul 2012 15:37:16 -0400 Subject: [PATCH] cc-4105-2: added retries on failed requests. added ignored files set like in the old media monitor --- .../application/controllers/ApiController.php | 5 ++-- airtime_mvc/application/models/StoredFile.php | 1 + python_apps/api_clients/api_client.py | 1 - .../media-monitor2/media/monitor/airtime.py | 9 +++---- .../media-monitor2/media/monitor/bootstrap.py | 2 +- .../media-monitor2/media/monitor/events.py | 8 +++---- .../media-monitor2/media/monitor/listeners.py | 24 ++++++++++++++++++- .../media-monitor2/media/monitor/manager.py | 3 +++ .../media/monitor/watchersyncer.py | 16 ++++++++++++- python_apps/media-monitor2/mm2.py | 4 ++++ 10 files changed, 58 insertions(+), 15 deletions(-) diff --git a/airtime_mvc/application/controllers/ApiController.php b/airtime_mvc/application/controllers/ApiController.php index 5a9c547ae..cbe7521e6 100644 --- a/airtime_mvc/application/controllers/ApiController.php +++ b/airtime_mvc/application/controllers/ApiController.php @@ -468,7 +468,8 @@ class ApiController extends Zend_Controller_Action ); } Application_Model_Preference::SetImportTimestamp(); - Logging::log("--->Mode: $mode and file: {$md['MDATA_KEY_FILEPATH']} "); + Logging::log("--->Mode: $mode || file: {$md['MDATA_KEY_FILEPATH']} "); + Logging::log( $md ); if ($mode == "create") { $filepath = $md['MDATA_KEY_FILEPATH']; $filepath = Application_Common_OsPath::normpath($filepath); @@ -540,7 +541,6 @@ class ApiController extends Zend_Controller_Action return $return_hash; } $return_hash['fileid'] = $file->getId(); - Logging::log("Have we returned jack shit???"); return $return_hash; } @@ -593,7 +593,6 @@ class ApiController extends Zend_Controller_Action $this->uploadRecordedActionParam($info_json['showinstanceid'],$info_json['fileid'],$dry_run=$dry); } } - Logging::log("returning response ><<><><><><><><"); die( json_encode($responses) ); } diff --git a/airtime_mvc/application/models/StoredFile.php b/airtime_mvc/application/models/StoredFile.php index 62c900907..5e44fbb6f 100644 --- a/airtime_mvc/application/models/StoredFile.php +++ b/airtime_mvc/application/models/StoredFile.php @@ -96,6 +96,7 @@ class Application_Model_StoredFile */ public function setMetadata($p_md=null) { + Logging::log("entered setMetadata"); if (is_null($p_md)) { $this->setDbColMetadata(); } else { diff --git a/python_apps/api_clients/api_client.py b/python_apps/api_clients/api_client.py index 04865b783..1c71abfe7 100644 --- a/python_apps/api_clients/api_client.py +++ b/python_apps/api_clients/api_client.py @@ -428,7 +428,6 @@ class AirtimeApiClient(): response = json.loads(response) return response except Exception, e: - import ipdb; ipdb.set_trace() logger.error('Exception: %s', e) logger.error("traceback: %s", traceback.format_exc()) raise diff --git a/python_apps/media-monitor2/media/monitor/airtime.py b/python_apps/media-monitor2/media/monitor/airtime.py index cf1ff7f01..671b27d00 100644 --- a/python_apps/media-monitor2/media/monitor/airtime.py +++ b/python_apps/media-monitor2/media/monitor/airtime.py @@ -12,6 +12,7 @@ from media.monitor.log import Loggable from media.monitor.syncdb import SyncDB from media.monitor.exceptions import DirectoryIsNotListed from media.monitor.bootstrap import Bootstrapper +from media.monitor.listeners import FileMediator from api_clients import api_client as apc @@ -33,14 +34,13 @@ class AirtimeNotifier(Loggable): self.logger.info("Initializing RabbitMQ message consumer...") schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True) schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem") - #self.connection = BrokerConnection(cfg["rabbitmq_host"], cfg["rabbitmq_user"], - #cfg["rabbitmq_password"], cfg["rabbitmq_vhost"]) - connection = BrokerConnection(cfg["rabbitmq_host"], cfg["rabbitmq_user"], + self.connection = BrokerConnection(cfg["rabbitmq_host"], cfg["rabbitmq_user"], cfg["rabbitmq_password"], cfg["rabbitmq_vhost"]) - channel = connection.channel() + channel = self.connection.channel() consumer = Consumer(channel, schedule_queue) consumer.register_callback(self.handle_message) consumer.consume() + self.logger.info("Initialized RabbitMQ consumer.") except Exception as e: self.logger.info("Failed to initialize RabbitMQ consumer") self.logger.error(e) @@ -181,6 +181,7 @@ class AirtimeMessageReceiver(Loggable): if os.path.exists(msg['filepath']): try: self.logger.info("Attempting to delete '%s'" % msg['filepath']) + FileMediator.ignore(msg['filepath']) os.unlink(msg['filepath']) except Exception as e: self.logger.info("Failed to delete '%s'" % msg['filepath']) diff --git a/python_apps/media-monitor2/media/monitor/bootstrap.py b/python_apps/media-monitor2/media/monitor/bootstrap.py index 5baff7cef..65c12afde 100644 --- a/python_apps/media-monitor2/media/monitor/bootstrap.py +++ b/python_apps/media-monitor2/media/monitor/bootstrap.py @@ -48,7 +48,7 @@ class Bootstrapper(Loggable): # Get all the files that are in the database but in the file # system. These are the files marked for deletions for to_delete in db_songs.difference(songs): - dispatcher.send(signal=self.watch_signal, sender=self, event=DeleteFile(f)) + dispatcher.send(signal=self.watch_signal, sender=self, event=DeleteFile(to_delete)) deleted += 1 self.logger.info( "Flushed watch directories. (modified, deleted) = (%d, %d)" % (modded, deleted) ) diff --git a/python_apps/media-monitor2/media/monitor/events.py b/python_apps/media-monitor2/media/monitor/events.py index 402a10621..c0c1d84ad 100644 --- a/python_apps/media-monitor2/media/monitor/events.py +++ b/python_apps/media-monitor2/media/monitor/events.py @@ -45,14 +45,14 @@ class NewFile(BaseEvent, HasMetaData): packs turns an event into a media monitor request """ req_dict = self.metadata.extract() - req_dict['mode'] = 'create' - req_dict['MDATA_KEY_FILEPATH'] = self.path + req_dict['mode'] = u'create' + req_dict['MDATA_KEY_FILEPATH'] = unicode( self.path ) return req_dict class DeleteFile(BaseEvent): def __init__(self, *args, **kwargs): super(DeleteFile, self).__init__(*args, **kwargs) def pack(self): req_dict = {} - req_dict['mode'] = 'delete' - req_dict['MDATA_KEY_FILEPATH'] = self.path + req_dict['mode'] = u'delete' + req_dict['MDATA_KEY_FILEPATH'] = unicode( self.path ) return req_dict diff --git a/python_apps/media-monitor2/media/monitor/listeners.py b/python_apps/media-monitor2/media/monitor/listeners.py index 767ee2917..aee41c4f3 100644 --- a/python_apps/media-monitor2/media/monitor/listeners.py +++ b/python_apps/media-monitor2/media/monitor/listeners.py @@ -5,7 +5,7 @@ from pydispatch import dispatcher import media.monitor.pure as mmp from media.monitor.pure import IncludeOnly from media.monitor.events import OrganizeFile, NewFile, DeleteFile -from media.monitor.log import Loggable +from media.monitor.log import Loggable, get_logger # We attempt to document a list of all special cases and hacks that the # following classes should be able to handle. @@ -33,6 +33,25 @@ from media.monitor.log import Loggable # OrganizeListener('watch_signal') <= wrong # OrganizeListener(signal='watch_signal') <= right +# This could easily be a module +class FileMediator(object): + ignored_set = set([]) + logger = get_logger() + @staticmethod + def is_ignored(path): return path in FileMediator.ignored_set + @staticmethod + def ignore(path): FileMediator.ignored_set.add(path) + @staticmethod + def unignore(path): FileMediator.ignored_set.remove(path) + +def mediate_ignored(fn): + def wrapped(self, event, *args,**kwargs): + if FileMediator.is_ignored(event.pathname): + FileMediator.logger.info("Ignoring: '%s' (once)" % event.pathname) + FileMediator.unignore(event.pathname) + else: fn(self, event, *args, **kwargs) + return wrapped + class BaseListener(object): def my_init(self, signal): self.signal = signal @@ -54,6 +73,7 @@ class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable): flushed += 1 self.logger.info("Flushed organized directory with %d files" % flushed) + @mediate_ignored @IncludeOnly(mmp.supported_extensions) def process_to_organize(self, event): dispatcher.send(signal=self.signal, sender=self, event=OrganizeFile(event)) @@ -65,10 +85,12 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent): def process_IN_MOVED_FROM(self, event): self.process_delete(event) def process_IN_DELETE(self,event): self.process_delete(event) + @mediate_ignored @IncludeOnly(mmp.supported_extensions) def process_create(self, event): dispatcher.send(signal=self.signal, sender=self, event=NewFile(event)) + @mediate_ignored @IncludeOnly(mmp.supported_extensions) def process_delete(self, event): dispatcher.send(signal=self.signal, sender=self, event=DeleteFile(event)) diff --git a/python_apps/media-monitor2/media/monitor/manager.py b/python_apps/media-monitor2/media/monitor/manager.py index ecef7332d..67e2a1ec4 100644 --- a/python_apps/media-monitor2/media/monitor/manager.py +++ b/python_apps/media-monitor2/media/monitor/manager.py @@ -143,3 +143,6 @@ class Manager(Loggable): block until we receive pyinotify events """ pyinotify.Notifier(self.wm).loop() + #import asyncore + #pyinotify.AsyncNotifier(self.wm).loop() + #asyncore.loop() diff --git a/python_apps/media-monitor2/media/monitor/watchersyncer.py b/python_apps/media-monitor2/media/monitor/watchersyncer.py index 1095b52c9..7c4d15c8e 100644 --- a/python_apps/media-monitor2/media/monitor/watchersyncer.py +++ b/python_apps/media-monitor2/media/monitor/watchersyncer.py @@ -16,6 +16,7 @@ class RequestSync(threading.Thread,Loggable): threading.Thread.__init__(self) self.watcher = watcher self.requests = requests + self.retries = 3 @LazyProperty def apiclient(self): @@ -28,7 +29,20 @@ class RequestSync(threading.Thread,Loggable): # Not forget to attach the 'is_record' to any requests that are related # to recorded shows # A simplistic request would like: - self.apiclient.send_media_monitor_requests([ req.pack() for req in self.requests ]) + # TODO : recorded shows aren't flagged right + packed_requests = [ req.pack() for req in self.requests ] + # Remove when finished debugging + def send_one(x): self.apiclient.send_media_monitor_requests( [x] ) + def make_req(): self.apiclient.send_media_monitor_requests( packed_requests ) + for try_index in range(0,self.retries): + try: make_req() + except ValueError: + self.logger.info("Api Controller is a piece of shit... will fix once I setup the damn debugger") + self.logger.info("Trying again...") + else: + self.logger.info("Request worked on the '%d' try" % (try_index + 1)) + break + else: self.logger.info("Failed to send request after '%d' tries..." % self.retries) self.watcher.flag_done() class TimeoutWatcher(threading.Thread,Loggable): diff --git a/python_apps/media-monitor2/mm2.py b/python_apps/media-monitor2/mm2.py index 6ebf2d4bf..8a7e5e937 100644 --- a/python_apps/media-monitor2/mm2.py +++ b/python_apps/media-monitor2/mm2.py @@ -11,6 +11,7 @@ from media.monitor.syncdb import SyncDB from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale, NoConfigFile from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver from media.monitor.watchersyncer import WatchSyncer +from media.monitor.eventdrainer import EventDrainer import media.monitor.pure as mmp from api_clients import api_client as apc @@ -93,6 +94,9 @@ bs.flush_all( config.last_ran() ) airtime_receiver = AirtimeMessageReceiver(config,manager) airtime_notifier = AirtimeNotifier(config, airtime_receiver) + +ed = EventDrainer(airtime_notifier.connection,interval=1) + # Launch the toucher that updates the last time when the script was ran every # n seconds. tt = ToucherThread(path=config['index_path'], interval=int(config['touch_interval']))