diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index ab81ec8b3..71228769a 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -6,11 +6,16 @@ import sys import os import signal +from api_clients import api_client + from multiprocessing import Process, Queue as mpQueue +from pyinotify import WatchManager + from airtimefilemonitor.airtimenotifier import AirtimeNotifier from airtimefilemonitor.airtimeprocessevent import AirtimeProcessEvent from airtimefilemonitor.mediaconfig import AirtimeMediaConfig +from airtimefilemonitor.workerprocess import MediaMonitorWorkerProcess from airtimefilemonitor.airtimemediamonitorbootstrap import AirtimeMediaMonitorBootstrap def handleSigTERM(signum, frame): @@ -35,18 +40,12 @@ processes = [] try: config = AirtimeMediaConfig(logger) - - multi_queue = mpQueue() - logger.info("Initializing event processor") - pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config) - - notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=0, airtime_config=config) - notifier.coalesce_events() + api_client = api_client.api_client_factory(config.cfg) logger.info("Setting up monitor") response = None while response is None: - response = notifier.api_client.setup_media_monitor() + response = api_client.setup_media_monitor() time.sleep(5) storage_directory = response["stor"].encode('utf-8') @@ -54,12 +53,25 @@ try: config.storage_directory = storage_directory config.imported_directory = storage_directory + '/imported' + multi_queue = mpQueue() + logger.info("Initializing event processor") +except Exception, e: + logger.error('Exception: %s', e) + +try: + wm = WatchManager() + pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config, wm=wm) + + notifier = AirtimeNotifier(wm, pe, read_freq=1, timeout=0, airtime_config=config, api_client=api_client) + notifier.coalesce_events() + bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe, config) bootstrap.scan() - + #create 5 worker processes + wp = MediaMonitorWorkerProcess() for i in range(5): - p = Process(target=notifier.process_file_events, args=(multi_queue,)) + p = Process(target=wp.process_file_events, args=(multi_queue, notifier)) processes.append(p) p.start() diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py index 311888746..95b201898 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimemediamonitorbootstrap.py @@ -19,7 +19,7 @@ class AirtimeMediaMonitorBootstrap(): """ on bootup we want to scan all directories and look for files that weren't there or files that changed before media-monitor process - went offline. We can do this by doing a hash of the directory metadata. + went offline. """ def scan(self): directories = self.get_list_of_watched_dirs(); @@ -39,7 +39,7 @@ class AirtimeMediaMonitorBootstrap(): def check_for_diff(self, dir_id, dir): #set to hold new and/or modified files. We use a set to make it ok if files are added - #twice. This is become some of the tests for new files return result sets that are not + #twice. This is because some of the tests for new files return result sets that are not #mutually exclusive from each other. new_and_modified_files = set() removed_files = set() diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py index 361e8248d..382024202 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py @@ -8,19 +8,19 @@ from kombu.connection import BrokerConnection from kombu.messaging import Exchange, Queue, Consumer, Producer import pyinotify -from pyinotify import WatchManager, Notifier, ProcessEvent +from pyinotify import Notifier -from api_clients import api_client +#from api_clients import api_client from airtimemetadata import AirtimeMetadata class AirtimeNotifier(Notifier): - def __init__(self, watch_manager, default_proc_fun=None, read_freq=0, threshold=0, timeout=None, airtime_config=None): + def __init__(self, watch_manager, default_proc_fun=None, read_freq=0, threshold=0, timeout=None, airtime_config=None, api_client=None): Notifier.__init__(self, watch_manager, default_proc_fun, read_freq, threshold, timeout) self.logger = logging.getLogger() self.config = airtime_config - self.api_client = api_client.api_client_factory(self.config.cfg) + self.api_client = api_client self.md_manager = AirtimeMetadata() self.import_processes = {} self.watched_folders = [] @@ -155,15 +155,6 @@ class AirtimeNotifier(Notifier): self.api_client.update_media_metadata(md, mode) - #this function is run in its own process, and continuously - #checks the queue for any new file events. - def process_file_events(self, queue): - - while True: - event = queue.get() - self.logger.info("received event %s", event) - self.update_airtime(event) - def walk_newly_watched_directory(self, directory): mm = self.proc_fun() diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py index 08eac7117..ac10a7201 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py @@ -3,11 +3,12 @@ import socket import grp import pwd import logging +import time from subprocess import Popen, PIPE import pyinotify -from pyinotify import WatchManager, Notifier, ProcessEvent +from pyinotify import ProcessEvent # For RabbitMQ from kombu.connection import BrokerConnection @@ -18,7 +19,7 @@ from airtimefilemonitor.mediaconfig import AirtimeMediaConfig class AirtimeProcessEvent(ProcessEvent): - def my_init(self, queue, airtime_config=None): + def my_init(self, queue, airtime_config=None, wm=None): """ Method automatically called from ProcessEvent.__init__(). Additional keyworded arguments passed to ProcessEvent.__init__() are then @@ -30,13 +31,16 @@ class AirtimeProcessEvent(ProcessEvent): self.supported_file_formats = ['mp3', 'ogg'] self.temp_files = {} + self.renamed_files = {} + """ self.moved_files = {} self.gui_replaced = {} - self.renamed_files = {} + """ + self.cookies_IN_MOVED_FROM = {} self.file_events = [] self.multi_queue = queue self.mask = pyinotify.ALL_EVENTS - self.wm = WatchManager() + self.wm = wm self.md_manager = AirtimeMetadata() #define which directories the pyinotify WatchManager should watch. @@ -95,9 +99,12 @@ class AirtimeProcessEvent(ProcessEvent): try: omask = os.umask(0) - if ((not os.path.exists(directory)) or ((os.path.exists(directory) and not os.path.isdir(directory)))): + if not os.path.exists(directory): os.makedirs(directory, 02777) - self.watch_directory(directory) + #self.watch_directory(directory) + elif not os.path.isdir(directory): + #path exists but it is a file not a directory! + self.logger.error("path %s exists, but it is not a directory!!!") finally: os.umask(omask) @@ -111,7 +118,8 @@ class AirtimeProcessEvent(ProcessEvent): finally: os.umask(omask) - #checks if path exists already in stor. If the path exists and the md5s are the same just moves file to same path anyway to avoid duplicates in the system. + #checks if path exists already in stor. If the path exists and the md5s are the + #same just overwrite. def create_unique_filename(self, filepath, old_filepath): try: @@ -148,7 +156,7 @@ class AirtimeProcessEvent(ProcessEvent): return filepath #create path in /srv/airtime/stor/imported/[song-metadata] - def create_file_path(self, imported_filepath, orig_md): + def create_file_path(self, original_path, orig_md): storage_directory = self.config.storage_directory @@ -156,7 +164,7 @@ class AirtimeProcessEvent(ProcessEvent): try: #will be in the format .ext - file_ext = os.path.splitext(imported_filepath)[1] + file_ext = os.path.splitext(original_path)[1] file_ext = file_ext.encode('utf-8') path_md = ['MDATA_KEY_TITLE', 'MDATA_KEY_CREATOR', 'MDATA_KEY_SOURCE', 'MDATA_KEY_TRACKNUMBER', 'MDATA_KEY_BITRATE'] @@ -189,10 +197,10 @@ class AirtimeProcessEvent(ProcessEvent): elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')): filepath = '%s/%s/%s/%s/%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) else: - filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TRACKNUMBER'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) + filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TRACKNUMBER'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) self.logger.info('Created filepath: %s', filepath) - filepath = self.create_unique_filename(filepath, imported_filepath) + filepath = self.create_unique_filename(filepath, original_path) self.logger.info('Unique filepath: %s', filepath) self.ensure_dir(filepath) @@ -202,30 +210,32 @@ class AirtimeProcessEvent(ProcessEvent): return filepath #event.dir: True if the event was raised against a directory. - #event.name + #event.name: filename #event.pathname: pathname (str): Concatenation of 'path' and 'name'. def process_IN_CREATE(self, event): + self.logger.debug("PROCESS_IN_CREATE: %s", event) self.handle_created_file(event.dir, event.name, event.pathname) def handle_created_file(self, dir, name, pathname): - self.logger.debug("PROCESS_IN_CREATE") self.logger.debug("dir: %s, name: %s, pathname: %s ", dir, name, pathname) storage_directory = self.config.storage_directory if not dir: - #file created is a tmp file which will be modified and then moved back to the original filename. + #event is because of a created file if self.is_temp_file(name) : + #file created is a tmp file which will be modified and then moved back to the original filename. self.temp_files[pathname] = None - #This is a newly imported file. elif self.is_audio_file(pathname): if self.is_parent_directory(pathname, storage_directory): + #file was created in /srv/airtime/stor. Need to process and copy + #to /srv/airtime/stor/imported self.set_needed_file_permissions(pathname, dir) - self.process_new_file(pathname) else: self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'is_recorded_show': False}) else: + #event is because of a created directory if self.is_parent_directory(pathname, storage_directory): self.set_needed_file_permissions(pathname, dir) @@ -237,16 +247,20 @@ class AirtimeProcessEvent(ProcessEvent): if file_md is not None: is_recorded_show = 'MDATA_KEY_CREATOR' in file_md and \ file_md['MDATA_KEY_CREATOR'] == "AIRTIMERECORDERSOURCEFABRIC".encode('utf-8') - if not self.is_parent_directory(pathname, self.config.imported_directory): - filepath = self.create_file_path(pathname, file_md) - self.move_file(pathname, filepath) - self.renamed_files[pathname] = filepath - self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) - else: - self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'data': file_md, 'is_recorded_show': is_recorded_show}) - + #if not self.is_parent_directory(pathname, self.config.imported_directory): + #file has not been "imported" yet. Need to move this file to /srv/airtime/stor/imported + filepath = self.create_file_path(pathname, file_md) + self.move_file(pathname, filepath) + self.renamed_files[pathname] = filepath + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show}) + #else: + # self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'data': file_md, 'is_recorded_show': is_recorded_show}) + else: + self.logger.warn("File %s, has invalid metadata", pathname) + def process_IN_MODIFY(self, event): + self.logger.info("process_IN_MODIFY: %s", event) self.handle_modified_file(event.dir, event.pathname, event.name) def handle_modified_file(self, dir, pathname, name): @@ -257,9 +271,16 @@ class AirtimeProcessEvent(ProcessEvent): elif self.is_audio_file(name): self.file_events.append({'filepath': pathname, 'mode': self.config.MODE_MODIFY}) + #if a file is moved somewhere, this callback is run. With details about + #where the file is being moved from. The corresponding process_IN_MOVED_TO + #callback is only called if the destination of the file is also in a watched + #directory. def process_IN_MOVED_FROM(self, event): - self.logger.info("%s: %s", event.maskname, event.pathname) + self.logger.info("process_IN_MOVED_FROM: %s", event) if not event.dir: + self.cookies_IN_MOVED_FROM[event.cookie] = (event, time.time()) + + """ if "goutputstream" in event.pathname: self.gui_replaced[event.cookie] = None elif event.pathname in self.temp_files: @@ -269,12 +290,34 @@ class AirtimeProcessEvent(ProcessEvent): pass else: self.moved_files[event.cookie] = event.pathname + """ def process_IN_MOVED_TO(self, event): - self.logger.info("%s: %s", event.maskname, event.pathname) + self.logger.info("process_IN_MOVED_TO: %s", event) #if stuff dropped in stor via a UI move must change file permissions. self.set_needed_file_permissions(event.pathname, event.dir) if not event.dir: + if event.cookie in self.cookies_IN_MOVED_FROM: + #files original location was also in a watched directory, in this case + #we won't try to create a new file name, and move the file to the appropriate + #location. We'll just assume the user knows what he is doing. + del self.cookies_IN_MOVED_FROM[event.cookie] + self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MOVED}) + else: + storage_directory = self.config.storage_directory + if self.is_parent_directory(event.pathname, storage_directory): + #show dragged from unwatched directory into the storage directory + file_md = self.md_manager.get_md_from_file(event.pathname) + if file_md is not None: + filepath = self.create_file_path(event.pathname, file_md) + self.move_file(event.pathname, filepath) + #self.renamed_files[event.pathname] = filepath + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False}) + else: + #show dragged from unwatched folder into a watched folder. + self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) + + """ if event.cookie in self.temp_files: del self.temp_files[event.cookie] self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) @@ -299,9 +342,10 @@ class AirtimeProcessEvent(ProcessEvent): self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False}) else: self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False}) + """ def process_IN_DELETE(self, event): - self.logger.info("%s: %s", event.maskname, event.pathname) + self.logger.info("process_IN_DELETE: %s", event) if not event.dir: self.handle_removed_file(event.pathname) @@ -311,7 +355,7 @@ class AirtimeProcessEvent(ProcessEvent): def process_default(self, event): - #self.logger.info("%s: %s", event.maskname, event.pathname) + #self.logger.info("PROCESS_DEFAULT: %s", event) pass def execCommandAndReturnStdOut(self, command): @@ -332,6 +376,22 @@ class AirtimeProcessEvent(ProcessEvent): self.multi_queue.put(event) self.file_events = [] + + for k, pair in self.cookies_IN_MOVED_FROM.items(): + event = pair[0] + timestamp = pair[1] + + timestamp_now = time.time() + + if timestamp_now - timestamp > 5: + #in_moved_from event didn't have a corresponding + #in_moved_to event in the last 5 seconds. + #This means the file was moved to outside of the + #watched directories. Let's handle this by deleting + #it from the Airtime directory. + del self.cookies_IN_MOVED_FROM[k] + self.handle_removed_file(event.pathname) + #check for any events recieved from Airtime. try: diff --git a/python_apps/media-monitor/airtimefilemonitor/workerprocess.py b/python_apps/media-monitor/airtimefilemonitor/workerprocess.py new file mode 100644 index 000000000..434c79907 --- /dev/null +++ b/python_apps/media-monitor/airtimefilemonitor/workerprocess.py @@ -0,0 +1,12 @@ +class MediaMonitorWorkerProcess: + + #this function is run in its own process, and continuously + #checks the queue for any new file events. + def process_file_events(self, queue, notifier): + + while True: + event = queue.get() + notifier.logger.info("received event %s", event) + notifier.update_airtime(event) + + \ No newline at end of file