From 65e647263f99cb247adb8b4bb77d9658efc5eae8 Mon Sep 17 00:00:00 2001 From: Naomi Aro Date: Thu, 23 Jun 2011 17:33:05 +0200 Subject: [PATCH 1/2] cc-1799 Human Readable File System refactoring media monitor, separate process for file event processing. --- .../application/controllers/ApiController.php | 36 +- .../controllers/PluploadController.php | 4 - airtime_mvc/application/models/StoredFile.php | 31 +- python_apps/api_clients/api_client.py | 22 +- python_apps/media-monitor/MediaMonitor.py | 616 +----------------- .../airtimefilemonitor/__init__.py | 0 .../airtimefilemonitor/airtimemetadata.py | 141 ++++ .../airtimefilemonitor/airtimenotifier.py | 134 ++++ .../airtimefilemonitor/airtimeprocessevent.py | 270 ++++++++ .../airtimefilemonitor/mediaconfig.py | 24 + 10 files changed, 616 insertions(+), 662 deletions(-) create mode 100644 python_apps/media-monitor/airtimefilemonitor/__init__.py create mode 100644 python_apps/media-monitor/airtimefilemonitor/airtimemetadata.py create mode 100644 python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py create mode 100644 python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py create mode 100644 python_apps/media-monitor/airtimefilemonitor/mediaconfig.py diff --git a/airtime_mvc/application/controllers/ApiController.php b/airtime_mvc/application/controllers/ApiController.php index f1c8bd63a..700632542 100644 --- a/airtime_mvc/application/controllers/ApiController.php +++ b/airtime_mvc/application/controllers/ApiController.php @@ -54,7 +54,7 @@ class ApiController extends Zend_Controller_Action * Allows remote client to download requested media file. * * @return void - * + * */ public function getMediaAction() { @@ -374,39 +374,7 @@ class ApiController extends Zend_Controller_Action exit; } - $plupload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload"; - - //need to make sure plupload dir exists so we can watch it. - if(!file_exists($plupload_dir)) { - @mkdir($plupload_dir, 0755); - } - $this->view->stor = $CC_CONFIG['storageDir']; - $this->view->plupload = $plupload_dir; - } - - public function mediaItemStatusAction() { - global $CC_CONFIG; - - $api_key = $this->_getParam('api_key'); - if (!in_array($api_key, $CC_CONFIG["apiKey"])) - { - header('HTTP/1.0 401 Unauthorized'); - print 'You are not allowed to access this resource.'; - exit; - } - - $md5 = $this->_getParam('md5'); - $file = StoredFile::RecallByMd5($md5); - - //New file added to Airtime - if (is_null($file)) { - $this->view->airtime_status = 0; - } - else { - $this->view->airtime_status = 1; - } - } public function reloadMetadataAction() { @@ -471,7 +439,7 @@ class ApiController extends Zend_Controller_Action $filepath = $md['MDATA_KEY_FILEPATH']; $filepath = str_replace("\\", "", $filepath); $file->setFilePath($filepath); - $file->setMetadata($md); + //$file->setMetadata($md); } } else if ($mode == "delete") { diff --git a/airtime_mvc/application/controllers/PluploadController.php b/airtime_mvc/application/controllers/PluploadController.php index d8fc10c08..8b6068f51 100644 --- a/airtime_mvc/application/controllers/PluploadController.php +++ b/airtime_mvc/application/controllers/PluploadController.php @@ -27,10 +27,6 @@ class PluploadController extends Zend_Controller_Action $upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload"; $res = StoredFile::uploadFile($upload_dir); - if (isset($res)) { - die('{"jsonrpc" : "2.0", "id" : '.$file->getMessage().' }'); - } - die('{"jsonrpc" : "2.0"}'); } } diff --git a/airtime_mvc/application/models/StoredFile.php b/airtime_mvc/application/models/StoredFile.php index 90c0152c8..a3337db38 100644 --- a/airtime_mvc/application/models/StoredFile.php +++ b/airtime_mvc/application/models/StoredFile.php @@ -95,9 +95,9 @@ class StoredFile { { global $CC_CONFIG, $CC_DBC; $sql = "SELECT count(*) as cnt FROM ".$CC_CONFIG["filesTable"]." WHERE state='ready'"; - return $CC_DBC->GetOne($sql); + return $CC_DBC->GetOne($sql); } - + /** * Set multiple metadata values using database columns as indexes. * @@ -717,7 +717,7 @@ class StoredFile { $fileName = isset($_REQUEST["name"]) ? $_REQUEST["name"] : ''; // Clean the fileName for security reasons - //$fileName = preg_replace('/[^\w\._]+/', '', $fileName); + $fileName = preg_replace('/[^\w\._]+/', '', $fileName); // Create target dir if (!file_exists($p_targetDir)) @@ -791,34 +791,19 @@ class StoredFile { die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": ' . $duplicate->getMessage() .'}}'); } else { - if (file_exists($duplicate->getRealFilePath())) { + if (file_exists($duplicate->getFilePath())) { $duplicateName = $duplicate->getMetadataValue(UI_MDATA_KEY_TITLE); die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": "An identical audioclip named ' . $duplicateName . ' already exists in the storage server."}}'); - } - else { - $res = $duplicate->replaceFile($audio_file); - if (PEAR::isError($res)) { - die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": ' . $duplicate->getMessage() .'}}'); - } - return $duplicate; } } } - else { - $storDir = MusicDir::getStorDir(); - $stor = $storDir->getDirectory(); + $storDir = MusicDir::getStorDir(); + $stor = $storDir->getDirectory(); - $audio_stor = $stor . DIRECTORY_SEPARATOR . $fileName; + $audio_stor = $stor . DIRECTORY_SEPARATOR . $fileName; - $md = array(); - $md['MDATA_KEY_MD5'] = $md5; - $md['MDATA_KEY_FILEPATH'] = $audio_stor; - $md['MDATA_KEY_TITLE'] = $fileName; - - StoredFile::Insert($md); - $r = @rename($audio_file, $audio_stor); - } + $r = @copy($audio_file, $audio_stor); } diff --git a/python_apps/api_clients/api_client.py b/python_apps/api_clients/api_client.py index abf58917c..568b933c0 100644 --- a/python_apps/api_clients/api_client.py +++ b/python_apps/api_clients/api_client.py @@ -360,25 +360,6 @@ class AirTimeApiClient(ApiClientInterface): return response - def check_media_status(self, md5): - logger = logging.getLogger() - - response = None - try: - url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["media_status_url"]) - url = url.replace("%%api_key%%", self.config["api_key"]) - url = url.replace("%%md5%%", md5) - logger.debug(url) - - response = urllib.urlopen(url) - response = json.loads(response.read()) - logger.info("Json Media Status %s", response) - - except Exception, e: - logger.error("Exception: %s", e) - - return response - def update_media_metadata(self, md, mode): logger = logging.getLogger() response = None @@ -387,14 +368,13 @@ class AirTimeApiClient(ApiClientInterface): url = url.replace("%%api_key%%", self.config["api_key"]) url = url.replace("%%mode%%", mode) - logger.debug(url) data = urllib.urlencode(md) req = urllib2.Request(url, data) response = urllib2.urlopen(req).read() - logger.info("update media %s", response) response = json.loads(response) + logger.info("update media %s", response) except Exception, e: response = None diff --git a/python_apps/media-monitor/MediaMonitor.py b/python_apps/media-monitor/MediaMonitor.py index ef9c22935..c78cd2bc2 100644 --- a/python_apps/media-monitor/MediaMonitor.py +++ b/python_apps/media-monitor/MediaMonitor.py @@ -1,602 +1,56 @@ #!/usr/local/bin/python -import logging -import logging.config import json import time -import datetime -import os +import logging +import logging.config import sys -import hashlib -import json -import shutil -import math -import socket -import grp -import pwd +import os -from collections import deque - -from subprocess import Popen, PIPE, STDOUT - -from configobj import ConfigObj - -import mutagen -import pyinotify from pyinotify import WatchManager, Notifier, ProcessEvent +from multiprocessing import Process, Lock, Queue as mpQueue -# For RabbitMQ -from kombu.connection import BrokerConnection -from kombu.messaging import Exchange, Queue, Consumer, Producer -from api_clients import api_client - -from multiprocessing import Process, Lock - -MODE_CREATE = "create" -MODE_MODIFY = "modify" -MODE_MOVED = "moved" -MODE_DELETE = "delete" - -global storage_directory -global plupload_directory - -# configure logging -try: - logging.config.fileConfig("logging.cfg") -except Exception, e: - print 'Error configuring logging: ', e - sys.exit() - -# loading config file -try: - config = ConfigObj('/etc/airtime/media-monitor.cfg') -except Exception, e: - logger = logging.getLogger(); - logger.error('Error loading config file: %s', e) - sys.exit() - -""" -list of supported easy tags in mutagen version 1.20 -['albumartistsort', 'musicbrainz_albumstatus', 'lyricist', 'releasecountry', 'date', 'performer', 'musicbrainz_albumartistid', 'composer', 'encodedby', 'tracknumber', 'musicbrainz_albumid', 'album', 'asin', 'musicbrainz_artistid', 'mood', 'copyright', 'author', 'media', 'length', 'version', 'artistsort', 'titlesort', 'discsubtitle', 'website', 'musicip_fingerprint', 'conductor', 'compilation', 'barcode', 'performer:*', 'composersort', 'musicbrainz_discid', 'musicbrainz_albumtype', 'genre', 'isrc', 'discnumber', 'musicbrainz_trmid', 'replaygain_*_gain', 'musicip_puid', 'artist', 'title', 'bpm', 'musicbrainz_trackid', 'arranger', 'albumsort', 'replaygain_*_peak', 'organization'] -""" - -class MetadataExtractor: - - def __init__(self): - - self.airtime2mutagen = {\ - "MDATA_KEY_TITLE": "title",\ - "MDATA_KEY_CREATOR": "artist",\ - "MDATA_KEY_SOURCE": "album",\ - "MDATA_KEY_GENRE": "genre",\ - "MDATA_KEY_MOOD": "mood",\ - "MDATA_KEY_TRACKNUMBER": "tracknumber",\ - "MDATA_KEY_BPM": "bpm",\ - "MDATA_KEY_LABEL": "organization",\ - "MDATA_KEY_COMPOSER": "composer",\ - "MDATA_KEY_ENCODER": "encodedby",\ - "MDATA_KEY_CONDUCTOR": "conductor",\ - "MDATA_KEY_YEAR": "date",\ - "MDATA_KEY_URL": "website",\ - "MDATA_KEY_ISRC": "isrc",\ - "MDATA_KEY_COPYRIGHT": "copyright",\ - } - - self.mutagen2airtime = {\ - "title": "MDATA_KEY_TITLE",\ - "artist": "MDATA_KEY_CREATOR",\ - "album": "MDATA_KEY_SOURCE",\ - "genre": "MDATA_KEY_GENRE",\ - "mood": "MDATA_KEY_MOOD",\ - "tracknumber": "MDATA_KEY_TRACKNUMBER",\ - "bpm": "MDATA_KEY_BPM",\ - "organization": "MDATA_KEY_LABEL",\ - "composer": "MDATA_KEY_COMPOSER",\ - "encodedby": "MDATA_KEY_ENCODER",\ - "conductor": "MDATA_KEY_CONDUCTOR",\ - "date": "MDATA_KEY_YEAR",\ - "website": "MDATA_KEY_URL",\ - "isrc": "MDATA_KEY_ISRC",\ - "copyright": "MDATA_KEY_COPYRIGHT",\ - } - - self.logger = logging.getLogger('root') - - def get_md5(self, filepath): - f = open(filepath, 'rb') - m = hashlib.md5() - m.update(f.read()) - md5 = m.hexdigest() - - return md5 - - ## mutagen_length is in seconds with the format (d+).dd - ## return format hh:mm:ss.uuu - def format_length(self, mutagen_length): - t = float(mutagen_length) - h = int(math.floor(t/3600)) - t = t % 3600 - m = int(math.floor(t/60)) - - s = t % 60 - # will be ss.uuu - s = str(s) - s = s[:6] - - length = "%s:%s:%s" % (h, m, s) - - return length - - def save_md_to_file(self, m): - try: - airtime_file = mutagen.File(m['MDATA_KEY_FILEPATH'], easy=True) - - for key in m.keys() : - if key in self.airtime2mutagen: - value = m[key] - if ((value is not None) and (len(str(value)) > 0)): - airtime_file[self.airtime2mutagen[key]] = str(value) - #self.logger.info('setting %s = %s ', key, str(value)) - - - airtime_file.save() - except Exception, e: - self.logger.error('Trying to save md') - self.logger.error('Exception: %s', e) - self.logger.error('Filepath %s', m['MDATA_KEY_FILEPATH']) - - def get_md_from_file(self, filepath): - - self.logger.info("getting info about file %s", filepath) - - md = {} - md5 = self.get_md5(filepath) - md['MDATA_KEY_MD5'] = md5 - - file_info = mutagen.File(filepath, easy=True) - - self.logger.info(file_info) - - #check if file has any metadata - if file_info is not None: - for key in file_info.keys() : - if key in self.mutagen2airtime : - md[self.mutagen2airtime[key]] = file_info[key][0] - - if 'MDATA_KEY_TITLE' not in md: - #get rid of file extention from original name, name might have more than 1 '.' in it. - original_name = os.path.basename(filepath) - original_name = original_name.split(".")[0:-1] - original_name = ''.join(original_name) - md['MDATA_KEY_TITLE'] = original_name - - #incase track number is in format u'4/11' - if 'MDATA_KEY_TRACKNUMBER' in md: - if isinstance(md['MDATA_KEY_TRACKNUMBER'], basestring): - md['MDATA_KEY_TRACKNUMBER'] = md['MDATA_KEY_TRACKNUMBER'].split("/")[0] - - md['MDATA_KEY_BITRATE'] = file_info.info.bitrate - md['MDATA_KEY_SAMPLERATE'] = file_info.info.sample_rate - md['MDATA_KEY_DURATION'] = self.format_length(file_info.info.length) - md['MDATA_KEY_MIME'] = file_info.mime[0] - - if "mp3" in md['MDATA_KEY_MIME']: - md['MDATA_KEY_FTYPE'] = "audioclip" - elif "vorbis" in md['MDATA_KEY_MIME']: - md['MDATA_KEY_FTYPE'] = "audioclip" - - #do this so object can be urlencoded properly. - for key in md.keys(): - if(isinstance(md[key], basestring)): - md[key] = md[key].encode('utf-8') - - return md - - -class AirtimeNotifier(Notifier): - - def __init__(self, watch_manager, default_proc_fun=None, read_freq=0, threshold=0, timeout=None): - Notifier.__init__(self, watch_manager, default_proc_fun, read_freq, threshold, timeout) - - schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True) - schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem") - self.connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], "/") - channel = self.connection.channel() - consumer = Consumer(channel, schedule_queue) - consumer.register_callback(self.handle_message) - consumer.consume() - - self.logger = logging.getLogger('root') - self.api_client = api_client.api_client_factory(config) - self.md_manager = MetadataExtractor() - self.import_processes = {} - self.watched_folders = [] - self.watches_to_remove = [] - - def handle_message(self, body, message): - # ACK the message to take it off the queue - message.ack() - - self.logger.info("Received md from RabbitMQ: " + body) - m = json.loads(message.body) - - if m['event_type'] == "md_update": - self.logger.info("AIRTIME NOTIFIER md update event") - self.md_manager.save_md_to_file(m) - - elif m['event_type'] == "new_watch": - self.logger.info("AIRTIME NOTIFIER add watched folder event " + m['directory']) - #start a new process to walk through this folder and add the files to Airtime. - p = Process(target=self.walk_newly_watched_directory, args=(m['directory'],)) - p.start() - self.import_processes[m['directory']] = p - #add this new folder to our list of watched folders - self.watched_folders.append(m['directory']) - - elif m['event_type'] == "remove_watch": - watched_directory = m['directory'].encode('utf-8') - - mm = self.proc_fun() - wd = mm.wm.get_wd(watched_directory) - self.logger.info("Removing watch on: %s wd %s", watched_directory, wd) - mm.wm.rm_watch(wd, rec=True) - - elif m['event_type'] == "change_stor": - global storage_directory - new_storage_directory = m['directory'].encode('utf-8') - - mm = self.proc_fun() - - wd = mm.wm.get_wd(storage_directory) - self.logger.info("Removing watch on: %s wd %s", storage_directory, wd) - mm.wm.rm_watch(wd, rec=True) - - mm.set_needed_file_permissions(new_storage_directory, True) - mm.move_file(storage_directory, new_storage_directory) - storage_directory = new_storage_directory - - mm.watch_directory(new_storage_directory) - - - def update_airtime(self, d): - - filepath = d['filepath'] - mode = d['mode'] - - data = None - md = {} - md['MDATA_KEY_FILEPATH'] = filepath - - if (os.path.exists(filepath) and (mode == MODE_CREATE)): - mutagen = self.md_manager.get_md_from_file(filepath) - md.update(mutagen) - data = md - elif (os.path.exists(filepath) and (mode == MODE_MODIFY)): - mutagen = self.md_manager.get_md_from_file(filepath) - md.update(mutagen) - data = md - elif (mode == MODE_MOVED): - mutagen = self.md_manager.get_md_from_file(filepath) - md.update(mutagen) - data = md - elif (mode == MODE_DELETE): - data = md - - if data is not None: - self.logger.info("Updating Change to Airtime " + filepath) - response = None - while response is None: - response = self.api_client.update_media_metadata(data, mode) - time.sleep(5) - - def walk_newly_watched_directory(self, directory): - - mm = self.proc_fun() - - for (path, dirs, files) in os.walk(directory): - for filename in files: - full_filepath = path+"/"+filename - - if mm.is_audio_file(full_filepath): - self.update_airtime({'filepath': full_filepath, 'mode': MODE_CREATE}) - - -class MediaMonitor(ProcessEvent): - - def my_init(self): - """ - Method automatically called from ProcessEvent.__init__(). Additional - keyworded arguments passed to ProcessEvent.__init__() are then - delegated to my_init(). - """ - self.api_client = api_client.api_client_factory(config) - self.supported_file_formats = ['mp3', 'ogg'] - self.logger = logging.getLogger('root') - self.temp_files = {} - self.moved_files = {} - self.file_events = deque() - self.mask = pyinotify.ALL_EVENTS - self.wm = WatchManager() - self.md_manager = MetadataExtractor() - - schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True) - schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem") - connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], "/") - channel = connection.channel() - - def watch_directory(self, directory): - return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True) - - def is_parent_directory(self, filepath, directory): - return (directory == filepath[0:len(directory)]) - - def set_needed_file_permissions(self, item, is_dir): - - try: - omask = os.umask(0) - - uid = pwd.getpwnam('pypo')[2] - gid = grp.getgrnam('www-data')[2] - - os.chown(item, uid, gid) - - if is_dir is True: - os.chmod(item, 02777) - else: - os.chmod(item, 0666) - - except Exception, e: - self.logger.error("Failed to change file's owner/group/permissions.") - self.logger.error(item) - finally: - os.umask(omask) - - def ensure_dir(self, filepath): - directory = os.path.dirname(filepath) - - try: - omask = os.umask(0) - if ((not os.path.exists(directory)) or ((os.path.exists(directory) and not os.path.isdir(directory)))): - os.makedirs(directory, 02777) - self.watch_directory(directory) - finally: - os.umask(omask) - - def move_file(self, source, dest): - - try: - omask = os.umask(0) - os.rename(source, dest) - except Exception, e: - self.logger.error("failed to move file.") - finally: - os.umask(omask) - - def create_unique_filename(self, filepath): - - try: - if(os.path.exists(filepath)): - self.logger.info("Path %s exists", filepath) - file_dir = os.path.dirname(filepath) - filename = os.path.basename(filepath).split(".")[0] - #will be in the format .ext - file_ext = os.path.splitext(filepath)[1] - i = 1; - while(True): - new_filepath = '%s/%s(%s)%s' % (file_dir, filename, i, file_ext) - self.logger.error("Trying %s", new_filepath) - - if(os.path.exists(new_filepath)): - i = i+1; - else: - filepath = new_filepath - break - - except Exception, e: - self.logger.error("Exception %s", e) - - return filepath - - def create_file_path(self, imported_filepath): - - global storage_directory - - try: - #get rid of file extention from original name, name might have more than 1 '.' in it. - original_name = os.path.basename(imported_filepath) - original_name = original_name.split(".")[0:-1] - original_name = ''.join(original_name) - - #will be in the format .ext - file_ext = os.path.splitext(imported_filepath)[1] - file_ext = file_ext.encode('utf-8') - md = self.md_manager.get_md_from_file(imported_filepath) - - path_md = ['MDATA_KEY_TITLE', 'MDATA_KEY_CREATOR', 'MDATA_KEY_SOURCE', 'MDATA_KEY_TRACKNUMBER', 'MDATA_KEY_BITRATE'] - - self.logger.info('Getting md') - - for m in path_md: - if m not in md: - md[m] = u'unknown'.encode('utf-8') - else: - #get rid of any "/" which will interfere with the filepath. - if isinstance(md[m], basestring): - md[m] = md[m].replace("/", "-") - - self.logger.info(md) - - self.logger.info('Starting filepath creation') - - filepath = None - if (md['MDATA_KEY_TITLE'] == u'unknown'.encode('utf-8')): - self.logger.info('unknown title') - filepath = '%s/%s/%s/%s-%s%s' % (storage_directory.encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], original_name, md['MDATA_KEY_BITRATE'], file_ext) - elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')): - self.logger.info('unknown track number') - filepath = '%s/%s/%s/%s-%s%s' % (storage_directory.encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) - else: - self.logger.info('full metadata') - filepath = '%s/%s/%s/%s-%s-%s%s' % (storage_directory.encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TRACKNUMBER'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) - - self.logger.info(u'Created filepath: %s', filepath) - filepath = self.create_unique_filename(filepath) - self.logger.info(u'Unique filepath: %s', filepath) - self.ensure_dir(filepath) - - except Exception, e: - self.logger.error('Exception: %s', e) - - return filepath - - def is_temp_file(self, filename): - info = filename.split(".") - - if(info[-2] in self.supported_file_formats): - return True - else: - return False - - def is_audio_file(self, filename): - info = filename.split(".") - - if(info[-1] in self.supported_file_formats): - return True - else: - return False - - def process_IN_CREATE(self, event): - self.logger.info("%s: %s", event.maskname, event.pathname) - if not event.dir: - #file created is a tmp file which will be modified and then moved back to the original filename. - if self.is_temp_file(event.name) : - self.temp_files[event.pathname] = None - #This is a newly imported file. - else : - global plupload_directory - #files that have been added through plupload have a placeholder already put in Airtime's database. - if not self.is_parent_directory(event.pathname, plupload_directory): - if self.is_audio_file(event.pathname): - self.set_needed_file_permissions(event.pathname, event.dir) - md5 = self.md_manager.get_md5(event.pathname) - response = self.api_client.check_media_status(md5) - - #this file is new, md5 does not exist in Airtime. - if(response['airtime_status'] == 0): - global storage_directory - if self.is_parent_directory(event.pathname, storage_directory): - filepath = self.create_file_path(event.pathname) - self.move_file(event.pathname, filepath) - self.file_events.append({'mode': MODE_CREATE, 'filepath': filepath}) - else: - self.file_events.append({'mode': MODE_CREATE, 'filepath': event.pathname}) - - else: - self.set_needed_file_permissions(event.pathname, event.dir) - - - def process_IN_MODIFY(self, event): - if not event.dir: - self.logger.info("%s: %s", event.maskname, event.pathname) - global plupload_directory - #files that have been added through plupload have a placeholder already put in Airtime's database. - if not self.is_parent_directory(event.pathname, plupload_directory): - if self.is_audio_file(event.name) : - self.file_events.append({'filepath': event.pathname, 'mode': MODE_MODIFY}) - - def process_IN_MOVED_FROM(self, event): - self.logger.info("%s: %s", event.maskname, event.pathname) - if not event.dir: - if event.pathname in self.temp_files: - del self.temp_files[event.pathname] - self.temp_files[event.cookie] = event.pathname - else: - self.moved_files[event.cookie] = event.pathname - - def process_IN_MOVED_TO(self, event): - self.logger.info("%s: %s", event.maskname, event.pathname) - #if stuff dropped in stor via a UI move must change file permissions. - self.set_needed_file_permissions(event.pathname, event.dir) - if not event.dir: - if event.cookie in self.temp_files: - del self.temp_files[event.cookie] - self.file_events.append({'filepath': event.pathname, 'mode': MODE_MODIFY}) - elif event.cookie in self.moved_files: - old_filepath = self.moved_files[event.cookie] - del self.moved_files[event.cookie] - - global plupload_directory - if self.is_parent_directory(old_filepath, plupload_directory): - #file renamed from /tmp/plupload does not have a path in our naming scheme yet. - md_filepath = self.create_file_path(event.pathname) - #move the file a second time to its correct Airtime naming schema. - self.move_file(event.pathname, md_filepath) - self.file_events.append({'filepath': md_filepath, 'mode': MODE_MOVED}) - else: - self.file_events.append({'filepath': event.pathname, 'mode': MODE_MOVED}) - - else: - #TODO need to pass in if md5 exists to this file creation function, identical files will just replace current files not have a (1) etc. - #file has been most likely dropped into stor folder from an unwatched location. (from gui, mv command not cp) - global storage_directory - if self.is_parent_directory(event.pathname, storage_directory): - md_filepath = self.create_file_path(event.pathname) - self.move_file(event.pathname, md_filepath) - self.file_events.append({'mode': MODE_CREATE, 'filepath': md_filepath}) - else: - self.file_events.append({'mode': MODE_CREATE, 'filepath': event.pathname}) - - def process_IN_DELETE(self, event): - if not event.dir: - self.logger.info("%s: %s", event.maskname, event.pathname) - self.file_events.append({'filepath': event.pathname, 'mode': MODE_DELETE}) - - def process_default(self, event): - self.logger.info("%s: %s", event.maskname, event.pathname) - - def notifier_loop_callback(self, notifier): - - for watched_directory in notifier.import_processes.keys(): - process = notifier.import_processes[watched_directory] - if not process.is_alive(): - self.watch_directory(watched_directory) - del notifier.import_processes[watched_directory] - - while len(self.file_events) > 0: - self.logger.info("Processing a file event update to Airtime.") - file_info = self.file_events.popleft() - notifier.update_airtime(file_info) - - try: - notifier.connection.drain_events(timeout=1) - #avoid logging a bunch of timeout messages. - except socket.timeout: - pass - except Exception, e: - self.logger.info("%s", e) +from airtimefilemonitor.airtimenotifier import AirtimeNotifier +from airtimefilemonitor.airtimeprocessevent import AirtimeProcessEvent +from airtimefilemonitor.mediaconfig import AirtimeMediaConfig if __name__ == '__main__': + # configure logging try: - logger = logging.getLogger('root') - mm = MediaMonitor() + logging.config.fileConfig("logging.cfg") + except Exception, e: + print 'Error configuring logging: ', e + sys.exit() + logger = logging.getLogger() + + try: + config = AirtimeMediaConfig() + + logger.info("Initializing event processor") + + pe = AirtimeProcessEvent(airtime_config=config) + + notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=1, airtime_config=config) + notifier.coalesce_events() + + p = Process(target=notifier.process_file_events, args=(pe.file_events,)) + p.daemon = True + p.start() + + logger.info("Setting up monitor") response = None while response is None: - response = mm.api_client.setup_media_monitor() + response = notifier.api_client.setup_media_monitor() time.sleep(5) storage_directory = response["stor"].encode('utf-8') - plupload_directory = response["plupload"].encode('utf-8') + logger.info("Storage Directory is: %s", storage_directory) + config.storage_directory = storage_directory - wdd = mm.watch_directory(storage_directory) + wdd = pe.watch_directory(storage_directory) logger.info("Added watch to %s", storage_directory) logger.info("wdd result %s", wdd[storage_directory]) - wdd = mm.watch_directory(plupload_directory) - logger.info("Added watch to %s", plupload_directory) - logger.info("wdd result %s", wdd[plupload_directory]) - - notifier = AirtimeNotifier(mm.wm, mm, read_freq=int(config["check_filesystem_events"]), timeout=1) - notifier.coalesce_events() #notifier.loop(callback=mm.notifier_loop_callback) @@ -604,9 +58,11 @@ if __name__ == '__main__': if(notifier.check_events(1)): notifier.read_events() notifier.process_events() - mm.notifier_loop_callback(notifier) + pe.notifier_loop_callback(notifier) except KeyboardInterrupt: notifier.stop() except Exception, e: logger.error('Exception: %s', e) + finally: + p.terminate() diff --git a/python_apps/media-monitor/airtimefilemonitor/__init__.py b/python_apps/media-monitor/airtimefilemonitor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimemetadata.py b/python_apps/media-monitor/airtimefilemonitor/airtimemetadata.py new file mode 100644 index 000000000..be62d9f71 --- /dev/null +++ b/python_apps/media-monitor/airtimefilemonitor/airtimemetadata.py @@ -0,0 +1,141 @@ +import os +import hashlib +import mutagen +import logging +import math + +""" +list of supported easy tags in mutagen version 1.20 +['albumartistsort', 'musicbrainz_albumstatus', 'lyricist', 'releasecountry', 'date', 'performer', 'musicbrainz_albumartistid', 'composer', 'encodedby', 'tracknumber', 'musicbrainz_albumid', 'album', 'asin', 'musicbrainz_artistid', 'mood', 'copyright', 'author', 'media', 'length', 'version', 'artistsort', 'titlesort', 'discsubtitle', 'website', 'musicip_fingerprint', 'conductor', 'compilation', 'barcode', 'performer:*', 'composersort', 'musicbrainz_discid', 'musicbrainz_albumtype', 'genre', 'isrc', 'discnumber', 'musicbrainz_trmid', 'replaygain_*_gain', 'musicip_puid', 'artist', 'title', 'bpm', 'musicbrainz_trackid', 'arranger', 'albumsort', 'replaygain_*_peak', 'organization'] +""" + +class AirtimeMetadata: + + def __init__(self): + + self.airtime2mutagen = {\ + "MDATA_KEY_TITLE": "title",\ + "MDATA_KEY_CREATOR": "artist",\ + "MDATA_KEY_SOURCE": "album",\ + "MDATA_KEY_GENRE": "genre",\ + "MDATA_KEY_MOOD": "mood",\ + "MDATA_KEY_TRACKNUMBER": "tracknumber",\ + "MDATA_KEY_BPM": "bpm",\ + "MDATA_KEY_LABEL": "organization",\ + "MDATA_KEY_COMPOSER": "composer",\ + "MDATA_KEY_ENCODER": "encodedby",\ + "MDATA_KEY_CONDUCTOR": "conductor",\ + "MDATA_KEY_YEAR": "date",\ + "MDATA_KEY_URL": "website",\ + "MDATA_KEY_ISRC": "isrc",\ + "MDATA_KEY_COPYRIGHT": "copyright",\ + } + + self.mutagen2airtime = {\ + "title": "MDATA_KEY_TITLE",\ + "artist": "MDATA_KEY_CREATOR",\ + "album": "MDATA_KEY_SOURCE",\ + "genre": "MDATA_KEY_GENRE",\ + "mood": "MDATA_KEY_MOOD",\ + "tracknumber": "MDATA_KEY_TRACKNUMBER",\ + "bpm": "MDATA_KEY_BPM",\ + "organization": "MDATA_KEY_LABEL",\ + "composer": "MDATA_KEY_COMPOSER",\ + "encodedby": "MDATA_KEY_ENCODER",\ + "conductor": "MDATA_KEY_CONDUCTOR",\ + "date": "MDATA_KEY_YEAR",\ + "website": "MDATA_KEY_URL",\ + "isrc": "MDATA_KEY_ISRC",\ + "copyright": "MDATA_KEY_COPYRIGHT",\ + } + + self.logger = logging.getLogger() + + def get_md5(self, filepath): + f = open(filepath, 'rb') + m = hashlib.md5() + m.update(f.read()) + md5 = m.hexdigest() + + return md5 + + ## mutagen_length is in seconds with the format (d+).dd + ## return format hh:mm:ss.uuu + def format_length(self, mutagen_length): + t = float(mutagen_length) + h = int(math.floor(t/3600)) + t = t % 3600 + m = int(math.floor(t/60)) + + s = t % 60 + # will be ss.uuu + s = str(s) + s = s[:6] + + length = "%s:%s:%s" % (h, m, s) + + return length + + def save_md_to_file(self, m): + try: + airtime_file = mutagen.File(m['MDATA_KEY_FILEPATH'], easy=True) + + for key in m.keys() : + if key in self.airtime2mutagen: + value = m[key] + if ((value is not None) and (len(str(value)) > 0)): + airtime_file[self.airtime2mutagen[key]] = str(value) + + + airtime_file.save() + except Exception, e: + self.logger.error('Trying to save md') + self.logger.error('Exception: %s', e) + self.logger.error('Filepath %s', m['MDATA_KEY_FILEPATH']) + + def get_md_from_file(self, filepath): + + self.logger.info("getting info about file %s", filepath) + + md = {} + md5 = self.get_md5(filepath) + md['MDATA_KEY_MD5'] = md5 + + file_info = mutagen.File(filepath, easy=True) + + self.logger.info(file_info) + + #check if file has any metadata + if file_info is not None: + for key in file_info.keys() : + if key in self.mutagen2airtime : + md[self.mutagen2airtime[key]] = file_info[key][0] + + if 'MDATA_KEY_TITLE' not in md: + #get rid of file extention from original name, name might have more than 1 '.' in it. + original_name = os.path.basename(filepath) + original_name = original_name.split(".")[0:-1] + original_name = ''.join(original_name) + md['MDATA_KEY_TITLE'] = original_name + + #incase track number is in format u'4/11' + if 'MDATA_KEY_TRACKNUMBER' in md: + if isinstance(md['MDATA_KEY_TRACKNUMBER'], basestring): + md['MDATA_KEY_TRACKNUMBER'] = md['MDATA_KEY_TRACKNUMBER'].split("/")[0] + + md['MDATA_KEY_BITRATE'] = file_info.info.bitrate + md['MDATA_KEY_SAMPLERATE'] = file_info.info.sample_rate + md['MDATA_KEY_DURATION'] = self.format_length(file_info.info.length) + md['MDATA_KEY_MIME'] = file_info.mime[0] + + if "mp3" in md['MDATA_KEY_MIME']: + md['MDATA_KEY_FTYPE'] = "audioclip" + elif "vorbis" in md['MDATA_KEY_MIME']: + md['MDATA_KEY_FTYPE'] = "audioclip" + + #do this so object can be urlencoded properly. + for key in md.keys(): + if(isinstance(md[key], basestring)): + md[key] = md[key].encode('utf-8') + + return md diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py new file mode 100644 index 000000000..0401d7f8d --- /dev/null +++ b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py @@ -0,0 +1,134 @@ +import json +import time +import os +import logging + +from multiprocessing import Process, Lock, Queue as mpQueue + +# For RabbitMQ +from kombu.connection import BrokerConnection +from kombu.messaging import Exchange, Queue, Consumer, Producer + +import pyinotify +from pyinotify import WatchManager, Notifier, ProcessEvent + +from api_clients import api_client +from airtimemetadata import AirtimeMetadata + +class AirtimeNotifier(Notifier): + + def __init__(self, watch_manager, default_proc_fun=None, read_freq=0, threshold=0, timeout=None, airtime_config=None): + Notifier.__init__(self, watch_manager, default_proc_fun, read_freq, threshold, timeout) + + self.logger = logging.getLogger() + self.config = airtime_config + self.api_client = api_client.api_client_factory(self.config.cfg) + self.md_manager = AirtimeMetadata() + self.import_processes = {} + self.watched_folders = [] + + schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True) + schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem") + self.connection = BrokerConnection(self.config.cfg["rabbitmq_host"], self.config.cfg["rabbitmq_user"], self.config.cfg["rabbitmq_password"], "/") + channel = self.connection.channel() + consumer = Consumer(channel, schedule_queue) + consumer.register_callback(self.handle_message) + consumer.consume() + + def handle_message(self, body, message): + # ACK the message to take it off the queue + message.ack() + + self.logger.info("Received md from RabbitMQ: " + body) + m = json.loads(message.body) + + if m['event_type'] == "md_update": + self.logger.info("AIRTIME NOTIFIER md update event") + self.md_manager.save_md_to_file(m) + + elif m['event_type'] == "new_watch": + self.logger.info("AIRTIME NOTIFIER add watched folder event " + m['directory']) + #start a new process to walk through this folder and add the files to Airtime. + p = Process(target=self.walk_newly_watched_directory, args=(m['directory'],)) + p.start() + self.import_processes[m['directory']] = p + #add this new folder to our list of watched folders + self.watched_folders.append(m['directory']) + + elif m['event_type'] == "remove_watch": + watched_directory = m['directory'].encode('utf-8') + + mm = self.proc_fun() + wd = mm.wm.get_wd(watched_directory) + self.logger.info("Removing watch on: %s wd %s", watched_directory, wd) + mm.wm.rm_watch(wd, rec=True) + + elif m['event_type'] == "change_stor": + storage_directory = self.config.storage_directory + new_storage_directory = m['directory'].encode('utf-8') + + mm = self.proc_fun() + + wd = mm.wm.get_wd(storage_directory) + self.logger.info("Removing watch on: %s wd %s", storage_directory, wd) + mm.wm.rm_watch(wd, rec=True) + + mm.set_needed_file_permissions(new_storage_directory, True) + mm.move_file(storage_directory, new_storage_directory) + self.config.storage_directory = new_storage_directory + + mm.watch_directory(new_storage_directory) + + + def update_airtime(self, d): + + filepath = d['filepath'] + mode = d['mode'] + + md = {} + md['MDATA_KEY_FILEPATH'] = filepath + + if 'data' in d: + file_md = d['data'] + md.update(file_md) + else: + file_md = None + data = None + + + if (os.path.exists(filepath) and (mode == self.config.MODE_CREATE)): + if file_md is None: + mutagen = self.md_manager.get_md_from_file(filepath) + md.update(mutagen) + data = md + elif (os.path.exists(filepath) and (mode == self.config.MODE_MODIFY)): + mutagen = self.md_manager.get_md_from_file(filepath) + md.update(mutagen) + data = md + elif (mode == self.config.MODE_MOVED): + md['MDATA_KEY_MD5'] = self.md_manager.get_md5(filepath) + data = md + elif (mode == self.config.MODE_DELETE): + data = md + + if data is not None: + self.api_client.update_media_metadata(data, mode) + + def process_file_events(self, queue): + + while True: + event = queue.get() + self.logger.info("received event %s", event); + self.update_airtime(event) + + def walk_newly_watched_directory(self, directory): + + mm = self.proc_fun() + + for (path, dirs, files) in os.walk(directory): + for filename in files: + full_filepath = path+"/"+filename + + if mm.is_audio_file(full_filepath): + self.update_airtime({'filepath': full_filepath, 'mode': self.config.MODE_CREATE}) + diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py new file mode 100644 index 000000000..3c1bfbd7f --- /dev/null +++ b/python_apps/media-monitor/airtimefilemonitor/airtimeprocessevent.py @@ -0,0 +1,270 @@ +import os +import socket +import grp +import pwd +import logging + +from multiprocessing import Process, Lock, Queue as mpQueue + +import pyinotify +from pyinotify import WatchManager, Notifier, ProcessEvent + +# For RabbitMQ +from kombu.connection import BrokerConnection +from kombu.messaging import Exchange, Queue, Consumer, Producer + +from airtimemetadata import AirtimeMetadata +from airtimefilemonitor.mediaconfig import AirtimeMediaConfig + +class AirtimeProcessEvent(ProcessEvent): + + def my_init(self, airtime_config=None): + """ + Method automatically called from ProcessEvent.__init__(). Additional + keyworded arguments passed to ProcessEvent.__init__() are then + delegated to my_init(). + """ + + self.logger = logging.getLogger() + self.config = airtime_config + + self.supported_file_formats = ['mp3', 'ogg'] + self.temp_files = {} + self.moved_files = {} + self.file_events = mpQueue() + self.mask = pyinotify.ALL_EVENTS + self.wm = WatchManager() + self.md_manager = AirtimeMetadata() + + schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True) + schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem") + connection = BrokerConnection(self.config.cfg["rabbitmq_host"], self.config.cfg["rabbitmq_user"], self.config.cfg["rabbitmq_password"], "/") + channel = connection.channel() + + def watch_directory(self, directory): + return self.wm.add_watch(directory, self.mask, rec=True, auto_add=True) + + def is_parent_directory(self, filepath, directory): + return (directory == filepath[0:len(directory)]) + + def is_temp_file(self, filename): + info = filename.split(".") + + if(info[-2] in self.supported_file_formats): + return True + else: + return False + + def is_audio_file(self, filename): + info = filename.split(".") + + if(info[-1] in self.supported_file_formats): + return True + else: + return False + + def set_needed_file_permissions(self, item, is_dir): + + try: + omask = os.umask(0) + + uid = pwd.getpwnam('pypo')[2] + gid = grp.getgrnam('www-data')[2] + + os.chown(item, uid, gid) + + if is_dir is True: + os.chmod(item, 02777) + else: + os.chmod(item, 0666) + + except Exception, e: + self.logger.error("Failed to change file's owner/group/permissions.") + self.logger.error(item) + finally: + os.umask(omask) + + def ensure_dir(self, filepath): + directory = os.path.dirname(filepath) + + try: + omask = os.umask(0) + if ((not os.path.exists(directory)) or ((os.path.exists(directory) and not os.path.isdir(directory)))): + os.makedirs(directory, 02777) + self.watch_directory(directory) + finally: + os.umask(omask) + + def move_file(self, source, dest): + + try: + omask = os.umask(0) + os.rename(source, dest) + except Exception, e: + self.logger.error("failed to move file.") + finally: + os.umask(omask) + + #checks if path exists already in stor. If the path exists and the md5s are the same just moves file to same path anyway to avoid duplicates in the system. + def create_unique_filename(self, filepath, old_filepath): + + try: + if(os.path.exists(filepath)): + self.logger.info("Path %s exists", filepath) + + self.logger.info("Checking if md5s are the same.") + md5_fp = self.md_manager.get_md5(filepath) + md5_ofp = self.md_manager.get_md5(old_filepath) + + if(md5_fp == md5_ofp): + self.logger.info("Md5s are the same, moving to same filepath.") + return filepath + + self.logger.info("Md5s aren't the same, appending to filepath.") + file_dir = os.path.dirname(filepath) + filename = os.path.basename(filepath).split(".")[0] + #will be in the format .ext + file_ext = os.path.splitext(filepath)[1] + i = 1; + while(True): + new_filepath = '%s/%s(%s)%s' % (file_dir, filename, i, file_ext) + self.logger.error("Trying %s", new_filepath) + + if(os.path.exists(new_filepath)): + i = i+1; + else: + filepath = new_filepath + break + + except Exception, e: + self.logger.error("Exception %s", e) + + return filepath + + def create_file_path(self, imported_filepath): + + storage_directory = self.config.storage_directory + + try: + #will be in the format .ext + file_ext = os.path.splitext(imported_filepath)[1] + file_ext = file_ext.encode('utf-8') + + orig_md = self.md_manager.get_md_from_file(imported_filepath) + path_md = ['MDATA_KEY_TITLE', 'MDATA_KEY_CREATOR', 'MDATA_KEY_SOURCE', 'MDATA_KEY_TRACKNUMBER', 'MDATA_KEY_BITRATE'] + + md = {} + for m in path_md: + if m not in orig_md: + md[m] = u'unknown'.encode('utf-8') + else: + #get rid of any "/" which will interfere with the filepath. + if isinstance(orig_md[m], basestring): + md[m] = orig_md[m].replace("/", "-") + else: + md[m] = orig_md[m] + + filepath = None + if(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')): + filepath = '%s/%s/%s/%s-%s%s' % (storage_directory, md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) + else: + filepath = '%s/%s/%s/%s-%s-%s%s' % (storage_directory, md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TRACKNUMBER'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext) + + self.logger.info('Created filepath: %s', filepath) + filepath = self.create_unique_filename(filepath, imported_filepath) + self.logger.info('Unique filepath: %s', filepath) + self.ensure_dir(filepath) + + except Exception, e: + self.logger.error('Exception: %s', e) + + return filepath, orig_md + + def process_IN_CREATE(self, event): + + self.logger.info("%s: %s", event.maskname, event.pathname) + storage_directory = self.config.storage_directory + + if not event.dir: + #file created is a tmp file which will be modified and then moved back to the original filename. + if self.is_temp_file(event.name) : + self.temp_files[event.pathname] = None + #This is a newly imported file. + else : + if self.is_audio_file(event.pathname): + if self.is_parent_directory(event.pathname, storage_directory): + self.set_needed_file_permissions(event.pathname, event.dir) + filepath, file_md = self.create_file_path(event.pathname) + self.move_file(event.pathname, filepath) + self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md}) + else: + self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname}) + + else: + if self.is_parent_directory(event.pathname, storage_directory): + self.set_needed_file_permissions(event.pathname, event.dir) + + + def process_IN_MODIFY(self, event): + if not event.dir: + self.logger.info("%s: %s", event.maskname, event.pathname) + if self.is_audio_file(event.name) : + self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) + + def process_IN_MOVED_FROM(self, event): + self.logger.info("%s: %s", event.maskname, event.pathname) + if not event.dir: + if event.pathname in self.temp_files: + del self.temp_files[event.pathname] + self.temp_files[event.cookie] = event.pathname + else: + self.moved_files[event.cookie] = event.pathname + + def process_IN_MOVED_TO(self, event): + self.logger.info("%s: %s", event.maskname, event.pathname) + #if stuff dropped in stor via a UI move must change file permissions. + self.set_needed_file_permissions(event.pathname, event.dir) + if not event.dir: + if event.cookie in self.temp_files: + del self.temp_files[event.cookie] + self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY}) + elif event.cookie in self.moved_files: + old_filepath = self.moved_files[event.cookie] + del self.moved_files[event.cookie] + self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_MOVED}) + else: + storage_directory = self.config.storage_directory + if self.is_parent_directory(event.pathname, storage_directory): + filepath, file_md = self.create_file_path(event.pathname) + self.move_file(event.pathname, filepath) + self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md}) + else: + self.file_events.put({'mode': self.config.MODE_CREATE, 'filepath': event.pathname}) + + def process_IN_DELETE(self, event): + self.logger.info("%s: %s", event.maskname, event.pathname) + if not event.dir: + self.file_events.put({'filepath': event.pathname, 'mode': self.config.MODE_DELETE}) + + def process_default(self, event): + #self.logger.info("%s: %s", event.maskname, event.pathname) + pass + + def notifier_loop_callback(self, notifier): + + #put a watch on any fully imported watched directories. + for watched_directory in notifier.import_processes.keys(): + process = notifier.import_processes[watched_directory] + if not process.is_alive(): + self.watch_directory(watched_directory) + del notifier.import_processes[watched_directory] + + #check for any events recieved from Airtime. + try: + notifier.connection.drain_events(timeout=0.1) + #avoid logging a bunch of timeout messages. + except socket.timeout: + pass + except Exception, e: + self.logger.info("%s", e) + diff --git a/python_apps/media-monitor/airtimefilemonitor/mediaconfig.py b/python_apps/media-monitor/airtimefilemonitor/mediaconfig.py new file mode 100644 index 000000000..d1a4e04ae --- /dev/null +++ b/python_apps/media-monitor/airtimefilemonitor/mediaconfig.py @@ -0,0 +1,24 @@ +import sys + +from configobj import ConfigObj + +class AirtimeMediaConfig: + + MODE_CREATE = "create" + MODE_MODIFY = "modify" + MODE_MOVED = "moved" + MODE_DELETE = "delete" + + def __init__(self): + + # loading config file + try: + config = ConfigObj('/etc/airtime/media-monitor.cfg') + self.cfg = config + except Exception, e: + print 'Error loading config: ', e + sys.exit() + + self.storage_directory = None + + From e725e4fbe4efd024f3aef648ffd6f1349046375f Mon Sep 17 00:00:00 2001 From: Naomi Aro Date: Thu, 23 Jun 2011 17:51:38 +0200 Subject: [PATCH 2/2] cc-1799 Human Filesystem storage dir should be created first so then it's realpath can be inserted into the database. --- .../application/controllers/PluploadController.php | 2 +- airtime_mvc/application/models/StoredFile.php | 10 ++++------ install/include/airtime-install.php | 6 +++--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/airtime_mvc/application/controllers/PluploadController.php b/airtime_mvc/application/controllers/PluploadController.php index 8b6068f51..f52de7370 100644 --- a/airtime_mvc/application/controllers/PluploadController.php +++ b/airtime_mvc/application/controllers/PluploadController.php @@ -25,7 +25,7 @@ class PluploadController extends Zend_Controller_Action public function uploadAction() { $upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload"; - $res = StoredFile::uploadFile($upload_dir); + StoredFile::uploadFile($upload_dir); die('{"jsonrpc" : "2.0"}'); } diff --git a/airtime_mvc/application/models/StoredFile.php b/airtime_mvc/application/models/StoredFile.php index a45fc72d2..951591d47 100644 --- a/airtime_mvc/application/models/StoredFile.php +++ b/airtime_mvc/application/models/StoredFile.php @@ -800,12 +800,10 @@ class StoredFile { if (PEAR::isError($duplicate)) { die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": ' . $duplicate->getMessage() .'}}'); } - else { - if (file_exists($duplicate->getFilePath())) { - $duplicateName = $duplicate->getMetadataValue(UI_MDATA_KEY_TITLE); - die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": "An identical audioclip named ' . $duplicateName . ' already exists in the storage server."}}'); - } - } + if (file_exists($duplicate->getFilePath())) { + $duplicateName = $duplicate->getMetadataValue(UI_MDATA_KEY_TITLE); + die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": "An identical audioclip named ' . $duplicateName . ' already exists in the storage server."}}'); + } } $storDir = MusicDir::getStorDir(); diff --git a/install/include/airtime-install.php b/install/include/airtime-install.php index 2cda6da54..400b98f4d 100644 --- a/install/include/airtime-install.php +++ b/install/include/airtime-install.php @@ -66,7 +66,7 @@ if(isset($version) && ($version != false) && ($version < AIRTIME_VERSION)) { echo "It appears you already have a version of Airtime installed.\n"; echo "Upgrading is currently disabled for Airtime 1.9.0-beta1.\n"; exit(2); - + require_once("airtime-upgrade.php"); //Make sure to exit with non-zero error code so that airtime-install @@ -130,6 +130,8 @@ require_once(AirtimeInstall::GetAirtimeSrcDir().'/application/configs/conf.php') echo "* Airtime Version: ".AIRTIME_VERSION.PHP_EOL; +AirtimeInstall::InstallStorageDirectory(); + if ($db_install) { if($newInstall) { // This is called with "system" so that we can pass in a parameter. See the file itself @@ -141,8 +143,6 @@ if ($db_install) { } } -AirtimeInstall::InstallStorageDirectory(); - AirtimeInstall::CreateSymlinksToUtils(); AirtimeInstall::CreateZendPhpLogFile();