sintonia/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py

297 lines
12 KiB
Python

import os
import socket
import grp
import pwd
import logging
from multiprocessing import Process, Lock, Queue as mpQueue
import pyinotify
from pyinotify import WatchManager, Notifier, ProcessEvent
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
from airtimemetadata import AirtimeMetadata
from airtimefilemonitor.mediaconfig import AirtimeMediaConfig
class AirtimeProcessEvent(ProcessEvent):
    """
    pyinotify event handler for the Airtime media monitor.

    Reacts to inotify events on watched directories, renames newly imported
    audio files into the storage directory layout, and queues a description
    of every relevant change on ``self.file_events``.
    """

    def my_init(self, airtime_config=None):
        """
        Method automatically called from ProcessEvent.__init__(). Additional
        keyworded arguments passed to ProcessEvent.__init__() are then
        delegated to my_init().
        """
        self.logger = logging.getLogger()
        self.config = airtime_config

        self.supported_file_formats = ['mp3', 'ogg']

        # Bookkeeping used to correlate the separate inotify events that make
        # up a single logical operation:
        #   temp_files:    pathname -> None while a temp file exists, then
        #                  cookie -> pathname once it has been moved from.
        #   moved_files:   cookie -> old pathname of a file being moved.
        #   renamed_files: original pathname -> path this class moved it to.
        self.temp_files = {}
        self.moved_files = {}
        self.renamed_files = {}

        self.file_events = mpQueue()
        self.mask = pyinotify.ALL_EVENTS
        self.wm = WatchManager()
        self.md_manager = AirtimeMetadata()

        # NOTE(review): the four objects below are only bound to local names
        # and are never stored or used elsewhere in this class
        # (notifier_loop_callback reads notifier.connection instead).
        # Kept as-is to preserve behaviour; confirm whether the connection
        # opened here is still required.
        schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True)
        schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem")
        connection = BrokerConnection(self.config.cfg["rabbitmq_host"], self.config.cfg["rabbitmq_user"], self.config.cfg["rabbitmq_password"], "/")
        channel = connection.channel()

    def watch_directory(self, directory):
        """Recursively watch ``directory`` and return add_watch()'s result."""
        return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True)

    def is_parent_directory(self, filepath, directory):
        """
        Return True if ``filepath`` is ``directory`` itself or lies below it.

        The comparison is done on a path-separator boundary so that a sibling
        such as ``/srv/stor-backup`` is not mistaken for a child of
        ``/srv/stor``. A trailing separator on ``directory`` is ignored.
        """
        base = directory.rstrip(os.sep)
        return filepath == base or filepath.startswith(base + os.sep)

    def is_temp_file(self, filename):
        """
        Return True if the second-to-last "."-separated component of
        ``filename`` is a supported audio format (e.g. ``song.mp3.tmp``).
        """
        info = filename.split(".")
        # A name without any "." has a single component; indexing [-2] on it
        # would raise IndexError, so such names are simply not temp files.
        return len(info) >= 2 and info[-2] in self.supported_file_formats

    def is_audio_file(self, filename):
        """Return True if the text after the last "." is a supported format."""
        info = filename.split(".")
        return info[-1] in self.supported_file_formats

    def set_needed_file_permissions(self, item, is_dir):
        """
        Give ``item`` to user 'pypo' / group 'www-data' and open up its mode
        (02777 for directories, 0666 for files).

        Failures are logged and swallowed; the process umask is always
        restored.
        """
        omask = os.umask(0)
        try:
            uid = pwd.getpwnam('pypo')[2]
            gid = grp.getgrnam('www-data')[2]

            os.chown(item, uid, gid)

            if is_dir is True:
                os.chmod(item, 0o2777)
            else:
                os.chmod(item, 0o666)
        except Exception as e:
            self.logger.error("Failed to change file's owner/group/permissions.")
            self.logger.error(item)
            self.logger.error("Exception %s", e)
        finally:
            os.umask(omask)

    def ensure_dir(self, filepath):
        """
        Make sure the directory that will contain ``filepath`` exists, creating
        it (mode 02777) and putting a watch on it when it does not.

        Errors from os.makedirs() propagate to the caller.
        """
        directory = os.path.dirname(filepath)

        omask = os.umask(0)
        try:
            # True both when nothing exists at that path and when something
            # exists that is not a directory.
            if not os.path.isdir(directory):
                os.makedirs(directory, 0o2777)
                self.watch_directory(directory)
        finally:
            os.umask(omask)

    def move_file(self, source, dest):
        """
        Rename ``source`` to ``dest``. Failures are logged and swallowed; the
        process umask is always restored.
        """
        omask = os.umask(0)
        try:
            os.rename(source, dest)
        except Exception as e:
            self.logger.error("failed to move file.")
            self.logger.error("Exception %s", e)
        finally:
            os.umask(omask)

    def create_unique_filename(self, filepath, old_filepath):
        """
        Return a destination path for ``old_filepath`` that does not clash
        with a different file.

        If ``filepath`` does not exist it is returned unchanged. If it exists
        and has the same md5 as ``old_filepath`` it is also returned
        unchanged, so the file is moved onto the same path and no duplicate is
        created. Otherwise "(1)", "(2)", ... is inserted before the extension
        until an unused path is found.

        Any exception is logged and the current value of ``filepath`` is
        returned.
        """
        try:
            if os.path.exists(filepath):
                self.logger.info("Path %s exists", filepath)

                self.logger.info("Checking if md5s are the same.")
                md5_fp = self.md_manager.get_md5(filepath)
                md5_ofp = self.md_manager.get_md5(old_filepath)

                if md5_fp == md5_ofp:
                    self.logger.info("Md5s are the same, moving to same filepath.")
                    return filepath

                self.logger.info("Md5s aren't the same, appending to filepath.")
                file_dir = os.path.dirname(filepath)
                # splitext keeps dots inside the name ("Vol. 2-128kbps.mp3"
                # -> "Vol. 2-128kbps"); file_ext is in the format ".ext".
                filename, file_ext = os.path.splitext(os.path.basename(filepath))

                i = 1
                while True:
                    new_filepath = '%s/%s(%s)%s' % (file_dir, filename, i, file_ext)
                    self.logger.info("Trying %s", new_filepath)

                    if os.path.exists(new_filepath):
                        i = i + 1
                    else:
                        filepath = new_filepath
                        break

        except Exception as e:
            self.logger.error("Exception %s", e)

        return filepath

    def create_file_path(self, imported_filepath):
        """
        Build the path inside the storage directory where
        ``imported_filepath`` should live, based on its metadata, and make
        sure the containing directory exists.

        Returns a tuple ``(filepath, orig_md, is_recorded_show)``. Exceptions
        are logged and swallowed; ``filepath`` and/or ``orig_md`` are None if
        the failure happened before they could be determined.
        """
        storage_directory = self.config.storage_directory

        is_recorded_show = False
        # Initialised up front so the return statement below is always valid,
        # even when an exception is raised early in the try block.
        filepath = None
        orig_md = None

        try:
            #will be in the format .ext
            file_ext = os.path.splitext(imported_filepath)[1]
            file_ext = file_ext.encode('utf-8')

            orig_md = self.md_manager.get_md_from_file(imported_filepath)
            path_md = ['MDATA_KEY_TITLE', 'MDATA_KEY_CREATOR', 'MDATA_KEY_SOURCE', 'MDATA_KEY_TRACKNUMBER', 'MDATA_KEY_BITRATE']

            md = {}
            for m in path_md:
                if m not in orig_md:
                    md[m] = u'unknown'.encode('utf-8')
                else:
                    #get rid of any "/" which will interfere with the filepath.
                    if isinstance(orig_md[m], basestring):
                        md[m] = orig_md[m].replace("/", "-")
                    else:
                        md[m] = orig_md[m]

            #make sure all track numbers are at least 2 digits long in the filepath.
            if len(str(md['MDATA_KEY_TRACKNUMBER'])) == 1:
                md['MDATA_KEY_TRACKNUMBER'] = "0"+str(md['MDATA_KEY_TRACKNUMBER'])

            #format bitrate as 128kbps
            # NOTE(review): this keeps the first three characters of the
            # value, whatever it is (a missing bitrate yields "unkkbps");
            # confirm the units and width supplied by the metadata manager.
            md['MDATA_KEY_BITRATE'] = str(md['MDATA_KEY_BITRATE'])[0:3]+"kbps"

            #file is recorded by Airtime
            #/srv/airtime/stor/recorded/year/month/year-month-day-time-showname-bitrate.ext
            if(md['MDATA_KEY_CREATOR'] == "AIRTIMERECORDERSOURCEFABRIC".encode('utf-8')):
                #yyyy-mm-dd-hh-MM-ss
                y = orig_md['MDATA_KEY_YEAR'].split("-")
                filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "recorded".encode('utf-8'), y[0], y[1], orig_md['MDATA_KEY_YEAR'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
                is_recorded_show = True
            elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')):
                filepath = '%s/%s/%s/%s/%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
            else:
                filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TRACKNUMBER'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)

            self.logger.info('Created filepath: %s', filepath)
            filepath = self.create_unique_filename(filepath, imported_filepath)
            self.logger.info('Unique filepath: %s', filepath)
            self.ensure_dir(filepath)

        except Exception as e:
            self.logger.error('Exception: %s', e)

        return filepath, orig_md, is_recorded_show

    def process_IN_CREATE(self, event):
        """
        Handle creation of a file or directory.

        Temp files are remembered, audio files created inside the storage
        directory are moved to their metadata-derived path and announced as
        MODE_CREATE, audio files elsewhere are announced in place, and new
        directories inside the storage directory get their permissions fixed.
        """
        #self.logger.info("%s: %s", event.maskname, event.pathname)
        storage_directory = self.config.storage_directory

        if not event.dir:
            #file created is a tmp file which will be modified and then moved back to the original filename.
            if self.is_temp_file(event.name):
                self.temp_files[event.pathname] = None
            #This is a newly imported file.
            elif self.is_audio_file(event.pathname):
                if self.is_parent_directory(event.pathname, storage_directory):
                    self.set_needed_file_permissions(event.pathname, event.dir)
                    filepath, file_md, is_recorded_show = self.create_file_path(event.pathname)
                    if filepath is None:
                        # No destination could be determined; do not try to
                        # move the file or announce a path that is None.
                        self.logger.error("Could not create a storage path for %s", event.pathname)
                        return
                    self.move_file(event.pathname, filepath)
                    self.renamed_files[event.pathname] = filepath
                    self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show})
                else:
                    self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False})
        else:
            if self.is_parent_directory(event.pathname, storage_directory):
                self.set_needed_file_permissions(event.pathname, event.dir)

    def process_IN_MODIFY(self, event):
        """
        Announce MODE_MODIFY for a modified audio file, unless the path is one
        this class has itself renamed.
        """
        if not event.dir:
            #self.logger.info("%s: %s", event.maskname, event.pathname)
            if event.pathname in self.renamed_files:
                pass
            elif self.is_audio_file(event.name):
                self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY})

    def process_IN_MOVED_FROM(self, event):
        """
        Record the first half of a move, keyed by the event cookie, so that
        process_IN_MOVED_TO can recognise the matching second half.
        """
        #self.logger.info("%s: %s", event.maskname, event.pathname)
        if not event.dir:
            if event.pathname in self.temp_files:
                # Re-key the temp file entry from its pathname to the cookie.
                del self.temp_files[event.pathname]
                self.temp_files[event.cookie] = event.pathname
            elif event.pathname in self.renamed_files:
                pass
            else:
                self.moved_files[event.cookie] = event.pathname

    def process_IN_MOVED_TO(self, event):
        """
        Handle the second half of a move.

        A move that started from a known temp file is announced as
        MODE_MODIFY, a move that started from a watched path as MODE_MOVED,
        and a move performed by this class itself is only un-tracked. Anything
        else is treated as a new file arriving from an unwatched location.
        """
        #self.logger.info("%s: %s", event.maskname, event.pathname)
        #if stuff dropped in stor via a UI move must change file permissions.
        self.set_needed_file_permissions(event.pathname, event.dir)

        if not event.dir:
            if event.cookie in self.temp_files:
                del self.temp_files[event.cookie]
                self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY})
            elif event.cookie in self.moved_files:
                del self.moved_files[event.cookie]
                self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MOVED})
            elif hasattr(event, 'src_pathname') and event.src_pathname in self.renamed_files:
                del self.renamed_files[event.src_pathname]
                #self.logger.info("removing renamed path %s", event.src_pathname)
            else:
                # show dragged from unwatched folder into a watched folder.
                storage_directory = self.config.storage_directory
                if self.is_parent_directory(event.pathname, storage_directory):
                    filepath, file_md, is_recorded_show = self.create_file_path(event.pathname)
                    if filepath is None:
                        # No destination could be determined; do not try to
                        # move the file or announce a path that is None.
                        self.logger.error("Could not create a storage path for %s", event.pathname)
                        return
                    self.move_file(event.pathname, filepath)
                    self.renamed_files[event.pathname] = filepath
                    # NOTE(review): is_recorded_show returned above is ignored
                    # and False is always sent here, unlike process_IN_CREATE.
                    # Preserved as-is; confirm that this is intended.
                    self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False})
                else:
                    self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False})

    def process_IN_DELETE(self, event):
        """Log the event and announce MODE_DELETE for deleted files."""
        self.logger.info("%s: %s", event.maskname, event.pathname)
        if not event.dir:
            self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_DELETE})

    def process_default(self, event):
        """Ignore every event type that has no dedicated handler."""
        #self.logger.info("%s: %s", event.maskname, event.pathname)
        pass

    def notifier_loop_callback(self, notifier):
        """
        Per-iteration callback for the notifier loop.

        Starts watching any directory whose import process has finished, then
        drains pending broker events from ``notifier.connection``.
        """
        #put a watch on any fully imported watched directories.
        # Iterate over a snapshot of the keys because entries are deleted
        # from the dict inside the loop.
        for watched_directory in list(notifier.import_processes.keys()):
            process = notifier.import_processes[watched_directory]
            if not process.is_alive():
                self.watch_directory(watched_directory)
                del notifier.import_processes[watched_directory]

        #check for any events received from Airtime.
        try:
            notifier.connection.drain_events(timeout=0.1)
        #avoid logging a bunch of timeout messages.
        except socket.timeout:
            pass
        except Exception as e:
            self.logger.info("%s", e)