# -*- coding: utf-8 -*-
import copy
import functools
import subprocess
import os
import math
import shutil
import re
import sys
import hashlib
import locale
import operator as op

from os.path import normpath
from itertools import takewhile
# reduce is no longer a builtin in python 3, it must be imported
try:
    from functools import reduce
except ImportError:
    pass

from configobj import ConfigObj

from media.monitor.exceptions import FailedToSetLocale, FailedToCreateDir

# Python 2 / Python 3 compatibility shims for text handling
try:
    _text_type = unicode
    _string_types = (basestring,)
except NameError:
    _text_type = str
    _string_types = (str,)

#supported_extensions = [u"mp3", u"ogg", u"oga"]
supported_extensions = [u"mp3", u"ogg", u"oga", u"flac", u"aac", u"wav"]

unicode_unknown = u'unknown'

# Metadata keys that are defaulted to unicode_unknown when empty or missing
# (see organized_path)
path_md = ['MDATA_KEY_TITLE', 'MDATA_KEY_CREATOR', 'MDATA_KEY_SOURCE',
           'MDATA_KEY_TRACKNUMBER', 'MDATA_KEY_BITRATE']


class LazyProperty(object):
    """
    meant to be used for lazy evaluation of an object attribute. property
    should represent non-mutable data, as it replaces itself.

    The wrapped function is called on first access through an instance; the
    result is then stored on the instance under the same name, so later
    lookups find the plain value and never reach this descriptor again.
    """
    def __init__(self, fget):
        self.fget = fget
        self.func_name = fget.__name__

    def __get__(self, obj, cls):
        # Accessed through the class rather than an instance: nothing to
        # evaluate
        if obj is None:
            return None
        value = self.fget(obj)
        setattr(obj, self.func_name, value)
        return value


class IncludeOnly(object):
    """
    A little decorator to help listeners only be called on extensions they
    support. Accepts any mix of extension strings and iterables of extension
    strings. Extensions are compared against the lower cased extension of the
    event's path, so they should be given in lower case.

    NOTE: this decorator only works on methods and not functions. Maybe fix
    this?
    """
    def __init__(self, *deco_args):
        self.exts = set()
        for arg in deco_args:
            # Strings must be tested first: a bare string is one extension,
            # not an iterable of one-character extensions
            if isinstance(arg, _string_types):
                self.exts.add(arg)
            elif hasattr(arg, '__iter__'):
                self.exts.update(arg)

    def __call__(self, func):
        @functools.wraps(func)
        def _wrap(moi, event, *args, **kwargs):
            ext = extension(event.pathname)
            # Directory events are always let through because we don't want
            # to skip directories. Filtered out events return None.
            if (ext.lower() in self.exts) or event.dir:
                return func(moi, event, *args, **kwargs)
        return _wrap


def partition(f, alist):
    """
    Partition is very similar to filter except that it also returns the
    elements for which f return false but in a tuple. f is called exactly once
    per element and both halves are always lists.

    >>> partition(lambda x : x > 3, [1,2,3,4,5,6])
    ([4, 5, 6], [1, 2, 3])
    """
    accepted, rejected = [], []
    for item in alist:
        (accepted if f(item) else rejected).append(item)
    return (accepted, rejected)


def is_file_supported(path):
    """
    Checks if a file's path(filename) extension matches the kind that we
    support note that this is case insensitive.

    >>> is_file_supported("test.mp3")
    True
    >>> is_file_supported("/bs/path/test.mP3")
    True
    >>> is_file_supported("test.txt")
    False
    """
    return extension(path).lower() in supported_extensions


# TODO : In the future we would like a better way to find out whether a show
# has been recorded
def is_airtime_recorded(md):
    """
    Takes a metadata dictionary and returns True if it belongs to a file that
    was recorded by Airtime.
    """
    if 'MDATA_KEY_CREATOR' not in md:
        return False
    return md['MDATA_KEY_CREATOR'] == u'Airtime Show Recorder'


def clean_empty_dirs(path):
    """
    walks path and deletes every empty directory it finds below it. path
    itself and its ancestors are never removed.
    """
    # TODO : test this function
    # The walk is bottom-up, so by the time a directory is examined its empty
    # children are already gone and a plain rmdir is enough. os.removedirs
    # must not be used here: it also prunes empty *parents*, which could
    # delete path itself and directories above it.
    for root, dirs, _ in os.walk(path.rstrip('/'), topdown=False):
        for name in dirs:
            full_path = os.path.join(root, name)
            # Symlinks to directories are listed in dirs but cannot be
            # removed with rmdir
            if os.path.islink(full_path) or not os.path.isdir(full_path):
                continue
            if not os.listdir(full_path):
                os.rmdir(full_path)


def extension(path):
    """
    return extension of path, empty string otherwise. Prefer to return empty
    string instead of None because of bad handling of "maybe" types in python.
    I.e. interpreter won't enforce None checks on the programmer

    Only the last path component is examined, so dots in directory names are
    not mistaken for an extension.

    >>> extension("testing.php")
    'php'
    >>> extension("a.b.c.d.php")
    'php'
    >>> extension('/no/extension')
    ''
    >>> extension('/path/extension.ml')
    'ml'
    >>> extension('/dotted.dir/no_extension')
    ''
    """
    parts = os.path.basename(path).split(".")
    if len(parts) < 2:
        return ""
    return parts[-1]


def no_extension_basename(path):
    """
    returns the extensionsless basename of a filepath (as a unicode string)

    >>> no_extension_basename("/home/test.mp3") == u'test'
    True
    >>> no_extension_basename("/home/test") == u'test'
    True
    >>> no_extension_basename('blah.ml') == u'blah'
    True
    >>> no_extension_basename('a.b.c.d.mp3') == u'a.b.c.d'
    True
    """
    base = _text_type(os.path.basename(path))
    if extension(base) == "":
        return base
    return '.'.join(base.split(".")[0:-1])


def walk_supported(directory, clean_empties=False):
    """
    A small generator wrapper around os.walk to only give us files that
    support the extensions we are considering. When clean_empties is True we
    recursively delete empty directories left over in directory after the
    walk.
    """
    for root, _, files in os.walk(directory):
        for name in files:
            if is_file_supported(name):
                yield os.path.join(root, name)
    if clean_empties:
        clean_empty_dirs(directory)


def magic_move(old, new, after_dir_make=lambda : None):
    """
    Moves path old to new and constructs the necessary to directories for new
    along the way. after_dir_make is a callback with no arguments that is
    invoked before the move takes place.
    """
    new_dir = os.path.dirname(new)
    # new_dir is '' when new has no directory component; makedirs('') would
    # raise in that case
    if new_dir and not os.path.exists(new_dir):
        os.makedirs(new_dir)
    # We need this crusty hack because anytime a directory is created we must
    # re-add it with add_watch otherwise putting files in it will not trigger
    # pyinotify events
    # NOTE(review): the original indentation was lost; the callback is assumed
    # to run unconditionally - confirm against version control
    after_dir_make()
    shutil.move(old, new)


def move_to_dir(dir_path, file_path):
    """
    moves a file at file_path into dir_path/basename(filename)
    """
    bs = os.path.basename(file_path)
    magic_move(file_path, os.path.join(dir_path, bs))


def apply_rules_dict(d, rules):
    """
    Consumes a dictionary of rules that maps some keys to lambdas which it
    applies to every matching element in d and returns a new dictionary with
    the rules applied. If a rule returns none then it's not applied
    """
    new_d = copy.deepcopy(d)
    for k, rule in rules.items():
        if k in d:
            new_val = rule(d[k])
            if new_val is not None:
                new_d[k] = new_val
    return new_d


def default_to_f(dictionary, keys, default, condition):
    """
    Returns a copy of 'dictionary' where every key k in 'keys' for which
    condition(dictionary=..., key=k) is true has been set to 'default'.
    """
    new_d = copy.deepcopy(dictionary)
    for k in keys:
        if condition(dictionary=new_d, key=k):
            new_d[k] = default
    return new_d


def default_to(dictionary, keys, default):
    """
    Checks if the list of keys 'keys' exists in 'dictionary'. If not then it
    returns a new dictionary with all those missing keys defaults to 'default'
    """
    cnd = lambda dictionary, key: key not in dictionary
    return default_to_f(dictionary, keys, default, cnd)


def remove_whitespace(dictionary):
    """
    Returns a copy of dictionary without the entries whose value is an empty
    or whitespace only string. Values without a strip method are kept.
    """
    nd = copy.deepcopy(dictionary)
    # Collect first, delete afterwards: a dictionary must not change size
    # while it is being iterated
    bad_keys = [k for k, v in nd.items()
                if hasattr(v, 'strip') and not v.strip()]
    for bad_key in bad_keys:
        del nd[bad_key]
    return nd


def parse_int(s):
    """
    Tries very hard to get some sort of integer result from s: returns the
    leading run of digits of s as a string. Returns None when s does not
    start with a digit or is not a string at all.

    >>> parse_int("123")
    '123'
    >>> parse_int("123saf")
    '123'
    >>> parse_int("asdf") is None
    True
    >>> parse_int(None) is None
    True
    """
    try:
        digits = ''.join(takewhile(lambda c: c.isdigit(), s))
    except (TypeError, AttributeError):
        # s is not iterable, or does not iterate over characters
        return None
    return digits if digits else None


def normalized_metadata(md, original_path):
    """
    consumes a dictionary of metadata and returns a new dictionary with the
    formatted meta data. We also consume original_path because we must set
    MDATA_KEY_TITLE based on it when the metadata carries no title. md itself
    is not modified.
    """
    new_md = copy.deepcopy(md)
    # replace all slashes with dashes
    #for k,v in new_md.iteritems(): new_md[k] = unicode(v).replace('/','-')
    # Specific rules that are applied in a per attribute basis
    format_rules = {
        'MDATA_KEY_TRACKNUMBER' : parse_int,
        'MDATA_KEY_FILEPATH'    : os.path.normpath,
        #'MDATA_KEY_MIME'        : lambda x: x.replace('-','/'),
        'MDATA_KEY_BPM'         : lambda x: x[0:8],
    }

    new_md = remove_whitespace(new_md) # remove whitespace fields
    # Format all the fields in format_rules
    new_md = apply_rules_dict(new_md, format_rules)
    # set filetype to audioclip by default
    new_md = default_to(dictionary=new_md, keys=['MDATA_KEY_FTYPE'],
                        default=u'audioclip')

    # Try to parse bpm but delete the whole key if that fails
    if 'MDATA_KEY_BPM' in new_md:
        bpm = parse_int(new_md['MDATA_KEY_BPM'])
        if bpm is None:
            del new_md['MDATA_KEY_BPM']
        else:
            new_md['MDATA_KEY_BPM'] = bpm

    if is_airtime_recorded(new_md):
        #hour,minute,second,name = new_md['MDATA_KEY_TITLE'].split("-",3)
        #new_md['MDATA_KEY_TITLE'] = u'%s-%s-%s:%s:%s' % \
            #(name, new_md['MDATA_KEY_YEAR'], hour, minute, second)
        # We changed show recorder to output correct metadata for recorded
        # shows
        pass
    else:
        # Read title from filename if it does not exist
        default_title = no_extension_basename(original_path)
        if re.match(".+-%s-.+$" % unicode_unknown, default_title):
            default_title = u''
        new_md = default_to(dictionary=new_md, keys=['MDATA_KEY_TITLE'],
                            default=default_title)
        new_md['MDATA_KEY_TITLE'] = re.sub(r'-\d+kbps$', u'',
                                           new_md['MDATA_KEY_TITLE'])

    # TODO : find out what this substitution is needed for
    # NOTE(review): the original indentation was lost; this clean-up is
    # assumed to apply to recorded and regular files alike - confirm against
    # version control. The key check avoids a KeyError when no title is set.
    if 'MDATA_KEY_TITLE' in new_md:
        new_md['MDATA_KEY_TITLE'] = re.sub(r'-?%s-?' % unicode_unknown, u'',
                                           new_md['MDATA_KEY_TITLE'])
    # band aid until enterprise metadata framework is working
    return new_md


# NOTE(review): organized_path(old_path, root_path, orig_md) starts at this
# point of the file. Its body is cut off in the source that was available for
# this rewrite, so it is deliberately not reproduced here; its opening lines
# are kept below, verbatim, as a marker. Restore the function from version
# control.
#
# def organized_path(old_path, root_path, orig_md):
#     """ old_path - path where file is store at the moment <= maybe not necessary?
root_path - the parent directory where all organized files go orig_md - original meta data of the file as given by mutagen AFTER being normalized return value: new file path """ filepath = None ext = extension(old_path) def default_f(dictionary, key): if key in dictionary: return len(dictionary[key]) == 0 else: return True # We set some metadata elements to a default "unknown" value because we use # these fields to create a path hence they cannot be empty Here "normal" # means normalized only for organized path # MDATA_KEY_BITRATE is in bytes/second i.e. (256000) we want to turn this # into 254kbps # Some metadata elements cannot be empty, hence we default them to some # value just so that we can create a correct path normal_md = default_to_f(orig_md, path_md, unicode_unknown, default_f) try: formatted = str(int(normal_md['MDATA_KEY_BITRATE']) / 1000) normal_md['MDATA_KEY_BITRATE'] = formatted + 'kbps' except: normal_md['MDATA_KEY_BITRATE'] = unicode_unknown if is_airtime_recorded(normal_md): # normal_md['MDATA_KEY_TITLE'] = 'show_name-yyyy-mm-dd-hh:mm:ss' r = "(?P.+)-(?P\d+-\d+-\d+)-(?P