cc-1799 : Filesystem

huge speed increase with multi processes.
This commit is contained in:
Naomi Aro 2011-06-28 12:42:06 +02:00
parent 4c8e6a04ac
commit 4eff714f32
2 changed files with 21 additions and 12 deletions

View file

@ -37,11 +37,14 @@ try:
logger.info("Initializing event processor") logger.info("Initializing event processor")
pe = AirtimeProcessEvent(airtime_config=config) pe = AirtimeProcessEvent(airtime_config=config)
notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=1, airtime_config=config) notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config)
notifier.coalesce_events() notifier.coalesce_events()
p = Process(target=notifier.process_file_events, args=(pe.file_events,)) #create 5 worker processes
p.start() for i in range(5):
p = Process(target=notifier.process_file_events, args=(pe.multi_queue,))
p.daemon = True
p.start()
signal.signal(signal.SIGTERM, handleSigTERM) signal.signal(signal.SIGTERM, handleSigTERM)

View file

@ -32,7 +32,8 @@ class AirtimeProcessEvent(ProcessEvent):
self.temp_files = {} self.temp_files = {}
self.moved_files = {} self.moved_files = {}
self.renamed_files = {} self.renamed_files = {}
self.file_events = mpQueue() self.file_events = []
self.multi_queue = mpQueue()
self.mask = pyinotify.ALL_EVENTS self.mask = pyinotify.ALL_EVENTS
self.wm = WatchManager() self.wm = WatchManager()
self.md_manager = AirtimeMetadata() self.md_manager = AirtimeMetadata()
@ -208,9 +209,9 @@ class AirtimeProcessEvent(ProcessEvent):
filepath, file_md, is_recorded_show = self.create_file_path(event.pathname) filepath, file_md, is_recorded_show = self.create_file_path(event.pathname)
self.move_file(event.pathname, filepath) self.move_file(event.pathname, filepath)
self.renamed_files[event.pathname] = filepath self.renamed_files[event.pathname] = filepath
self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show})
else: else:
self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False})
else: else:
if self.is_parent_directory(event.pathname, storage_directory): if self.is_parent_directory(event.pathname, storage_directory):
@ -223,7 +224,7 @@ class AirtimeProcessEvent(ProcessEvent):
if event.pathname in self.renamed_files: if event.pathname in self.renamed_files:
pass pass
elif self.is_audio_file(event.name): elif self.is_audio_file(event.name):
self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY})
def process_IN_MOVED_FROM(self, event): def process_IN_MOVED_FROM(self, event):
#self.logger.info("%s: %s", event.maskname, event.pathname) #self.logger.info("%s: %s", event.maskname, event.pathname)
@ -243,11 +244,11 @@ class AirtimeProcessEvent(ProcessEvent):
if not event.dir: if not event.dir:
if event.cookie in self.temp_files: if event.cookie in self.temp_files:
del self.temp_files[event.cookie] del self.temp_files[event.cookie]
self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY})
elif event.cookie in self.moved_files: elif event.cookie in self.moved_files:
old_filepath = self.moved_files[event.cookie] old_filepath = self.moved_files[event.cookie]
del self.moved_files[event.cookie] del self.moved_files[event.cookie]
self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MOVED}) self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MOVED})
elif hasattr(event, 'src_pathname') and event.src_pathname in self.renamed_files: elif hasattr(event, 'src_pathname') and event.src_pathname in self.renamed_files:
del self.renamed_files[event.src_pathname] del self.renamed_files[event.src_pathname]
#self.logger.info("removing renamed path %s", event.src_pathname) #self.logger.info("removing renamed path %s", event.src_pathname)
@ -258,14 +259,14 @@ class AirtimeProcessEvent(ProcessEvent):
filepath, file_md, is_recorded_show = self.create_file_path(event.pathname) filepath, file_md, is_recorded_show = self.create_file_path(event.pathname)
self.move_file(event.pathname, filepath) self.move_file(event.pathname, filepath)
self.renamed_files[event.pathname] = filepath self.renamed_files[event.pathname] = filepath
self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False}) self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False})
else: else:
self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False})
def process_IN_DELETE(self, event): def process_IN_DELETE(self, event):
self.logger.info("%s: %s", event.maskname, event.pathname) self.logger.info("%s: %s", event.maskname, event.pathname)
if not event.dir: if not event.dir:
self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_DELETE}) self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_DELETE})
def process_default(self, event): def process_default(self, event):
#self.logger.info("%s: %s", event.maskname, event.pathname) #self.logger.info("%s: %s", event.maskname, event.pathname)
@ -273,6 +274,11 @@ class AirtimeProcessEvent(ProcessEvent):
def notifier_loop_callback(self, notifier): def notifier_loop_callback(self, notifier):
for event in self.file_events:
self.multi_queue.put(event)
self.file_events = []
#check for any events recieved from Airtime. #check for any events recieved from Airtime.
try: try:
notifier.connection.drain_events(timeout=0.1) notifier.connection.drain_events(timeout=0.1)