diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index a44d6b26b..004455122 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -37,11 +37,14 @@ try: logger.info("Initializing event processor") pe = AirtimeProcessEvent(airtime_config=config) - notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=1, airtime_config=config) + notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config) notifier.coalesce_events() - p = Process(target=notifier.process_file_events, args=(pe.file_events,)) - p.start() + #create 5 worker processes + for i in range(5): + p = Process(target=notifier.process_file_events, args=(pe.multi_queue,)) + p.daemon = True + p.start() signal.signal(signal.SIGTERM, handleSigTERM) diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py index ffcea265c..7f7df9079 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py @@ -32,7 +32,8 @@ class AirtimeProcessEvent(ProcessEvent): self.temp_files = {} self.moved_files = {} self.renamed_files = {} - self.file_events = mpQueue() + self.file_events = [] + self.multi_queue = mpQueue() self.mask = pyinotify.ALL_EVENTS self.wm = WatchManager() self.md_manager = AirtimeMetadata() @@ -208,9 +209,9 @@ class AirtimeProcessEvent(ProcessEvent): filepath, file_md, is_recorded_show = self.create_file_path(event.pathname) self.move_file(event.pathname, filepath) self.renamed_files[event.pathname] = filepath - self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) else: - self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) else: if self.is_parent_directory(event.pathname, storage_directory): @@ -223,7 +224,7 @@ class AirtimeProcessEvent(ProcessEvent): if event.pathname in self.renamed_files: pass elif self.is_audio_file(event.name): - self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) + self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) def process_IN_MOVED_FROM(self, event): #self.logger.info("%s: %s", event.maskname, event.pathname) @@ -243,11 +244,11 @@ class AirtimeProcessEvent(ProcessEvent): if not event.dir: if event.cookie in self.temp_files: del self.temp_files[event.cookie] - self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) + self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) elif event.cookie in self.moved_files: old_filepath = self.moved_files[event.cookie] del self.moved_files[event.cookie] - self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MOVED}) + self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MOVED}) elif hasattr(event, 'src_pathname') and event.src_pathname in self.renamed_files: del self.renamed_files[event.src_pathname] #self.logger.info("removing renamed path %s", event.src_pathname) @@ -258,14 +259,14 @@ class AirtimeProcessEvent(ProcessEvent): filepath, file_md, is_recorded_show = self.create_file_path(event.pathname) self.move_file(event.pathname, filepath) self.renamed_files[event.pathname] = filepath - self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False}) + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False}) else: - self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) def process_IN_DELETE(self, event): self.logger.info("%s: %s", event.maskname, event.pathname) if not event.dir: - self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_DELETE}) + self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_DELETE}) def process_default(self, event): #self.logger.info("%s: %s", event.maskname, event.pathname) @@ -273,6 +274,11 @@ class AirtimeProcessEvent(ProcessEvent): def notifier_loop_callback(self, notifier): + for event in self.file_events: + self.multi_queue.put(event) + + self.file_events = [] + #check for any events recieved from Airtime. try: notifier.connection.drain_events(timeout=0.1)