cc-1799 Human Readable File System
refactoring media monitor, separate process for file event processing.
This commit is contained in:
parent
d642a48e8c
commit
65e647263f
10 changed files with 616 additions and 662 deletions
|
@ -0,0 +1,270 @@
|
|||
import os
|
||||
import socket
|
||||
import grp
|
||||
import pwd
|
||||
import logging
|
||||
|
||||
from multiprocessing import Process, Lock, Queue as mpQueue
|
||||
|
||||
import pyinotify
|
||||
from pyinotify import WatchManager, Notifier, ProcessEvent
|
||||
|
||||
# For RabbitMQ
|
||||
from kombu.connection import BrokerConnection
|
||||
from kombu.messaging import Exchange, Queue, Consumer, Producer
|
||||
|
||||
from airtimemetadata import AirtimeMetadata
|
||||
from airtimefilemonitor.mediaconfig import AirtimeMediaConfig
|
||||
|
||||
class AirtimeProcessEvent(ProcessEvent):
|
||||
|
||||
def my_init(self, airtime_config=None):
|
||||
"""
|
||||
Method automatically called from ProcessEvent.__init__(). Additional
|
||||
keyworded arguments passed to ProcessEvent.__init__() are then
|
||||
delegated to my_init().
|
||||
"""
|
||||
|
||||
self.logger = logging.getLogger()
|
||||
self.config = airtime_config
|
||||
|
||||
self.supported_file_formats = ['mp3', 'ogg']
|
||||
self.temp_files = {}
|
||||
self.moved_files = {}
|
||||
self.file_events = mpQueue()
|
||||
self.mask = pyinotify.ALL_EVENTS
|
||||
self.wm = WatchManager()
|
||||
self.md_manager = AirtimeMetadata()
|
||||
|
||||
schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True)
|
||||
schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem")
|
||||
connection = BrokerConnection(self.config.cfg["rabbitmq_host"], self.config.cfg["rabbitmq_user"], self.config.cfg["rabbitmq_password"], "/")
|
||||
channel = connection.channel()
|
||||
|
||||
def watch_directory(self, directory):
|
||||
return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True)
|
||||
|
||||
def is_parent_directory(self, filepath, directory):
|
||||
return (directory == filepath[0:len(directory)])
|
||||
|
||||
def is_temp_file(self, filename):
|
||||
info = filename.split(".")
|
||||
|
||||
if(info[-2] in self.supported_file_formats):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def is_audio_file(self, filename):
|
||||
info = filename.split(".")
|
||||
|
||||
if(info[-1] in self.supported_file_formats):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def set_needed_file_permissions(self, item, is_dir):
|
||||
|
||||
try:
|
||||
omask = os.umask(0)
|
||||
|
||||
uid = pwd.getpwnam('pypo')[2]
|
||||
gid = grp.getgrnam('www-data')[2]
|
||||
|
||||
os.chown(item, uid, gid)
|
||||
|
||||
if is_dir is True:
|
||||
os.chmod(item, 02777)
|
||||
else:
|
||||
os.chmod(item, 0666)
|
||||
|
||||
except Exception, e:
|
||||
self.logger.error("Failed to change file's owner/group/permissions.")
|
||||
self.logger.error(item)
|
||||
finally:
|
||||
os.umask(omask)
|
||||
|
||||
def ensure_dir(self, filepath):
|
||||
directory = os.path.dirname(filepath)
|
||||
|
||||
try:
|
||||
omask = os.umask(0)
|
||||
if ((not os.path.exists(directory)) or ((os.path.exists(directory) and not os.path.isdir(directory)))):
|
||||
os.makedirs(directory, 02777)
|
||||
self.watch_directory(directory)
|
||||
finally:
|
||||
os.umask(omask)
|
||||
|
||||
def move_file(self, source, dest):
|
||||
|
||||
try:
|
||||
omask = os.umask(0)
|
||||
os.rename(source, dest)
|
||||
except Exception, e:
|
||||
self.logger.error("failed to move file.")
|
||||
finally:
|
||||
os.umask(omask)
|
||||
|
||||
#checks if path exists already in stor. If the path exists and the md5s are the same just moves file to same path anyway to avoid duplicates in the system.
|
||||
def create_unique_filename(self, filepath, old_filepath):
|
||||
|
||||
try:
|
||||
if(os.path.exists(filepath)):
|
||||
self.logger.info("Path %s exists", filepath)
|
||||
|
||||
self.logger.info("Checking if md5s are the same.")
|
||||
md5_fp = self.md_manager.get_md5(filepath)
|
||||
md5_ofp = self.md_manager.get_md5(old_filepath)
|
||||
|
||||
if(md5_fp == md5_ofp):
|
||||
self.logger.info("Md5s are the same, moving to same filepath.")
|
||||
return filepath
|
||||
|
||||
self.logger.info("Md5s aren't the same, appending to filepath.")
|
||||
file_dir = os.path.dirname(filepath)
|
||||
filename = os.path.basename(filepath).split(".")[0]
|
||||
#will be in the format .ext
|
||||
file_ext = os.path.splitext(filepath)[1]
|
||||
i = 1;
|
||||
while(True):
|
||||
new_filepath = '%s/%s(%s)%s' % (file_dir, filename, i, file_ext)
|
||||
self.logger.error("Trying %s", new_filepath)
|
||||
|
||||
if(os.path.exists(new_filepath)):
|
||||
i = i+1;
|
||||
else:
|
||||
filepath = new_filepath
|
||||
break
|
||||
|
||||
except Exception, e:
|
||||
self.logger.error("Exception %s", e)
|
||||
|
||||
return filepath
|
||||
|
||||
def create_file_path(self, imported_filepath):
|
||||
|
||||
storage_directory = self.config.storage_directory
|
||||
|
||||
try:
|
||||
#will be in the format .ext
|
||||
file_ext = os.path.splitext(imported_filepath)[1]
|
||||
file_ext = file_ext.encode('utf-8')
|
||||
|
||||
orig_md = self.md_manager.get_md_from_file(imported_filepath)
|
||||
path_md = ['MDATA_KEY_TITLE', 'MDATA_KEY_CREATOR', 'MDATA_KEY_SOURCE', 'MDATA_KEY_TRACKNUMBER', 'MDATA_KEY_BITRATE']
|
||||
|
||||
md = {}
|
||||
for m in path_md:
|
||||
if m not in orig_md:
|
||||
md[m] = u'unknown'.encode('utf-8')
|
||||
else:
|
||||
#get rid of any "/" which will interfere with the filepath.
|
||||
if isinstance(orig_md[m], basestring):
|
||||
md[m] = orig_md[m].replace("/", "-")
|
||||
else:
|
||||
md[m] = orig_md[m]
|
||||
|
||||
filepath = None
|
||||
if(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')):
|
||||
filepath = '%s/%s/%s/%s-%s%s' % (storage_directory, md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
|
||||
else:
|
||||
filepath = '%s/%s/%s/%s-%s-%s%s' % (storage_directory, md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TRACKNUMBER'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
|
||||
|
||||
self.logger.info('Created filepath: %s', filepath)
|
||||
filepath = self.create_unique_filename(filepath, imported_filepath)
|
||||
self.logger.info('Unique filepath: %s', filepath)
|
||||
self.ensure_dir(filepath)
|
||||
|
||||
except Exception, e:
|
||||
self.logger.error('Exception: %s', e)
|
||||
|
||||
return filepath, orig_md
|
||||
|
||||
def process_IN_CREATE(self, event):
|
||||
|
||||
self.logger.info("%s: %s", event.maskname, event.pathname)
|
||||
storage_directory = self.config.storage_directory
|
||||
|
||||
if not event.dir:
|
||||
#file created is a tmp file which will be modified and then moved back to the original filename.
|
||||
if self.is_temp_file(event.name) :
|
||||
self.temp_files[event.pathname] = None
|
||||
#This is a newly imported file.
|
||||
else :
|
||||
if self.is_audio_file(event.pathname):
|
||||
if self.is_parent_directory(event.pathname, storage_directory):
|
||||
self.set_needed_file_permissions(event.pathname, event.dir)
|
||||
filepath, file_md = self.create_file_path(event.pathname)
|
||||
self.move_file(event.pathname, filepath)
|
||||
self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md})
|
||||
else:
|
||||
self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname})
|
||||
|
||||
else:
|
||||
if self.is_parent_directory(event.pathname, storage_directory):
|
||||
self.set_needed_file_permissions(event.pathname, event.dir)
|
||||
|
||||
|
||||
def process_IN_MODIFY(self, event):
|
||||
if not event.dir:
|
||||
self.logger.info("%s: %s", event.maskname, event.pathname)
|
||||
if self.is_audio_file(event.name) :
|
||||
self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY})
|
||||
|
||||
def process_IN_MOVED_FROM(self, event):
|
||||
self.logger.info("%s: %s", event.maskname, event.pathname)
|
||||
if not event.dir:
|
||||
if event.pathname in self.temp_files:
|
||||
del self.temp_files[event.pathname]
|
||||
self.temp_files[event.cookie] = event.pathname
|
||||
else:
|
||||
self.moved_files[event.cookie] = event.pathname
|
||||
|
||||
def process_IN_MOVED_TO(self, event):
|
||||
self.logger.info("%s: %s", event.maskname, event.pathname)
|
||||
#if stuff dropped in stor via a UI move must change file permissions.
|
||||
self.set_needed_file_permissions(event.pathname, event.dir)
|
||||
if not event.dir:
|
||||
if event.cookie in self.temp_files:
|
||||
del self.temp_files[event.cookie]
|
||||
self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY})
|
||||
elif event.cookie in self.moved_files:
|
||||
old_filepath = self.moved_files[event.cookie]
|
||||
del self.moved_files[event.cookie]
|
||||
self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MOVED})
|
||||
else:
|
||||
storage_directory = self.config.storage_directory
|
||||
if self.is_parent_directory(event.pathname, storage_directory):
|
||||
filepath, file_md = self.create_file_path(event.pathname)
|
||||
self.move_file(event.pathname, filepath)
|
||||
self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md})
|
||||
else:
|
||||
self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname})
|
||||
|
||||
def process_IN_DELETE(self, event):
|
||||
self.logger.info("%s: %s", event.maskname, event.pathname)
|
||||
if not event.dir:
|
||||
self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_DELETE})
|
||||
|
||||
def process_default(self, event):
|
||||
#self.logger.info("%s: %s", event.maskname, event.pathname)
|
||||
pass
|
||||
|
||||
def notifier_loop_callback(self, notifier):
|
||||
|
||||
#put a watch on any fully imported watched directories.
|
||||
for watched_directory in notifier.import_processes.keys():
|
||||
process = notifier.import_processes[watched_directory]
|
||||
if not process.is_alive():
|
||||
self.watch_directory(watched_directory)
|
||||
del notifier.import_processes[watched_directory]
|
||||
|
||||
#check for any events recieved from Airtime.
|
||||
try:
|
||||
notifier.connection.drain_events(timeout=0.1)
|
||||
#avoid logging a bunch of timeout messages.
|
||||
except socket.timeout:
|
||||
pass
|
||||
except Exception, e:
|
||||
self.logger.info("%s", e)
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue