initial commit

This commit is contained in:
Rudi Grinberg 2012-07-09 16:27:37 -04:00 committed by Martin Konecny
parent 794cf2c845
commit 3b385df969
11 changed files with 294 additions and 0 deletions

View File

@ -0,0 +1,15 @@
from pydispatch import dispatcher
SIGNAL = 'my-first-signal'
def handle_event( sender ):
"""Simple event handler"""
print 'Signal was sent by', sender
dispatcher.connect( handle_event, signal=SIGNAL, sender=dispatcher.Any )
first_sender = object()
second_sender = {}
def main( ):
dispatcher.send( signal=SIGNAL, sender=first_sender )
dispatcher.send( signal=SIGNAL, sender=second_sender )
main()

View File

@ -0,0 +1,40 @@
import os
import mutagen
import abc
# Note: this isn't really good design...
# Anyone who expects a BaseEvent object should be able to handle any instances
# of its subclasses by the substitution principle. CLearly not the case with
# the DeleteFile subclass.
# It would be good if we could parameterize this class by the attribute
# that would contain the path to obtain the meta data. But it would be too much
# work for little reward
class HasMetaData(object):
__metaclass__ = abc.ABCMeta
def __init__(self, *args, **kwargs):
self.__metadata = None
self.__loaded = False
@property
def metadata(self):
if self.__loaded: return self.__metadata
else:
f = mutagen.File(self.path, easy=True)
self.__metadata = f
self.__loaded = True
return self.metadata
class BaseEvent(object):
__metaclass__ = abc.ABCMeta
def __init__(self, raw_event):
self.__raw_event = raw_event
self.path = os.path.normpath(raw_event.pathname)
super(BaseEvent, self).__init__()
def exists(self): return os.path.exists(self.path)
def __str__(self):
return "Event. Path: %s" % self.__raw_event.pathname
class OrganizeFile(BaseEvent, HasMetaData): pass
class NewFile(BaseEvent, HasMetaData): pass
class DeleteFile(BaseEvent): pass

View File

@ -0,0 +1,16 @@
from pydispatch import dispatcher
import abc
class Handler(object):
__metaclass__ = abc.ABCMeta
def __init__(self, signal, target):
self.target = target
self.signal = signal
def dummy(sender, event):
self.handle(sender,event)
dispatcher.connect(dummy, signal=signal, sender=dispatcher.Any, weak=False)
@abc.abstractmethod
def handle(self, sender, event): pass

View File

@ -0,0 +1,48 @@
import pyinotify
from pydispatch import dispatcher
import media.monitor.pure as mmp
from media.monitor.events import OrganizeFile, NewFile, DeleteFile
class IncludeOnly(object):
def __init__(self, *deco_args):
self.exts = set([])
for arg in deco_args:
if isinstance(arg,str): self.add(arg)
elif hasattr(arg, '__iter__'):
for x in arg: self.exts.add(x)
def __call__(self, func):
def _wrap(moi, event, *args, **kwargs):
ext = mmp.extension(event.pathname)
if ext in self.exts: func(moi, event, *args, **kwargs)
return _wrap
class BaseListener(object):
def my_init(self, signal):
self.signal = signal
class OrganizeListener(BaseListener, pyinotify.ProcessEvent):
# this class still don't handle the case where a dir was copied recursively
def process_IN_CLOSE_WRITE(self, event): self.process_to_organize(event)
# got cookie
def process_IN_MOVED_TO(self, event): self.process_to_organize(event)
@IncludeOnly(mmp.supported_extensions)
def process_to_organize(self, event):
dispatcher.send(signal=self.signal, sender=self, event=OrganizeFile(event))
class StoreWatchListener(BaseListener, pyinotify.ProcessEvent):
def process_IN_CLOSE_WRITE(self, event): self.process_create(event)
def process_IN_MOVE_TO(self, event): self.process_create(event)
def process_IN_MOVE_FROM(self, event): self.process_delete(event)
def process_IN_DELETE(self,event): self.process_delete(event)
@IncludeOnly(mmp.supported_extensions)
def process_create(self, event):
dispatcher.send(signal=self.signal, sender=self, event=NewFile(event))
@IncludeOnly(mmp.supported_extensions)
def process_delete(self, event):
dispatcher.send(signal=self.signal, sender=self, event=DeleteFile(event))

View File

@ -0,0 +1,8 @@
from media.monitor.handler import Handler
class Organizer(Handler):
def correct_path(self): pass
def handle(self, sender, event):
print("Handling event: %s" % str(event))

View File

@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
import copy
import os
supported_extensions = ["mp3", "ogg"]
unicode_unknown = u'unknown'
def is_airtime_show_recorder(md):
return md['MDATA_KEY_CREATOR'] == u'Airtime Show Recorder'
def extension(path):
"""
return extension of path, empty string otherwise. Prefer
to return empty string instead of None because of bad handling of "maybe"
types in python. I.e. interpreter won't enforce None checks on the programmer
>>> extension("testing.php")
'php'
>>> extension('/no/extension')
''
>>> extension('/path/extension.ml')
'ml'
"""
ext = path.split(".")
if len(ext) < 2: return ""
else: return ext[-1]
def apply_rules_dict(d, rules):
""" NOTE: this function isn't actually pure but probably should be... """
for k, rule in rules.iteritems():
if k in d: d[k] = rule(d[k])
def default_to(dictionary, keys, default):
""" NOTE: this function mutates dictionary as well. The name for this module
is terrible. Change it later."""
for k in keys:
if not (k in dictionary): dictionary[k] = default
def normalized_metadata(md):
""" consumes a dictionary of metadata and returns a new dictionary with the
formatted meta data """
new_md = copy.deepcopy(md)
# replace all slashes with dashes
for k,v in new_md.iteritems(): new_md[k] = v.replace('/','-')
# Specific rules that are applied in a per attribute basis
format_rules = {
# It's very likely that the following isn't strictly necessary. But the old
# code would cast MDATA_KEY_TRACKNUMBER to an integer as a byproduct of
# formatting the track number to 2 digits.
'MDATA_KEY_TRACKNUMBER' : lambda x: int(x),
'MDATA_KEY_BITRATE' : lambda x: str(x / 1000) + "kbps",
# note: you don't actually need the lambda here. It's only used for clarity
'MDATA_KEY_FILEPATH' : lambda x: os.path.normpath(x),
}
path_md = ['MDATA_KEY_TITLE', 'MDATA_KEY_CREATOR', 'MDATA_KEY_SOURCE',
'MDATA_KEY_TRACKNUMBER', 'MDATA_KEY_BITRATE']
# note that we could have saved a bit of code by rewriting new_md using
# defaultdict(lambda x: "unknown"). But it seems to be too implicit and
# could possibly lead to subtle bugs down the road. Plus the following
# approach gives us the flexibility to use different defaults for
# different attributes
default_to(dictionary=new_md, keys=path_md, default=unicode_unknown)
# should apply the format_rules last
apply_rules_dict(new_md, format_rules)
# In the case where the creator is 'Airtime Show Recorder' we would like to
# format the MDATA_KEY_TITLE slightly differently
# Note: I don't know why I'm doing a unicode string comparison here
# that part is copied from the original code
if is_airtime_show_recorder(md):
hour,minute,second,name = md['MDATA_KEY_TITLE'].split("-",4)
# We assume that MDATA_KEY_YEAR is always given for airtime recorded
# shows
new_md['MDATA_KEY_TITLE'] = '%s-%s-%s:%s:%s' % \
(name, new_md['MDATA_KEY_YEAR'], hour, minute, second)
# IMPORTANT: in the original code. MDATA_KEY_FILEPATH would also
# be set to the original path of the file for airtime recorded shows
# (before it was "organized"). We will skip this procedure for now
# because it's not clear why it was done
return new_md
def organized_path(self, old_path, root_path, normal_md):
"""
old_path - path where file is store at the moment <= maybe not necessary?
root_path - the parent directory where all organized files go
normal_md - original meta data of the file as given by mutagen AFTER being normalized
return value: new file path
"""
filepath = None
ext = extension(filepath)
# The blocks for each if statement look awfully similar. Perhaps there is a
# way to simplify this code
if is_airtime_show_recorder(normal_md):
fname = u'%s-%s-%s.%s' % ( normal_md['MDATA_KEY_YEAR'], normal_md['MDATA_KEY_TITLE'],
normal_md['MDATA_KEY_BITRATE'], ext )
yyyy, mm, _ = normal_md['MDATA_KEY_YEAR'].split('-',3)
path = os.path.join(root_path,"recorded", yyyy, mm)
filepath = os.path.join(path,fname)
elif normal_md['MDATA_KEY_TRACKNUMBER'] == unicode_unknown:
fname = u'%s-%s.%s' % (normal_md['MDATA_KEY_TITLE'], normal_md['MDATA_KEY_BITRATE'], ext)
path = os.path.join(root_path, "imported", normal_md['MDATA_KEY_CREATOR'],
normal_md['MDATA_KEY_SOURCE'] )
filepath = os.path.join(path, fname)
else: # The "normal" case
fname = u'%s-%s-%s.%s' % (normal_md['MDATA_KEY_TRACKNUMBER'], normal_md['MDATA_KEY_TITLE'],
normal_md['MDATA_KEY_BITRATE'], ext)
path = os.path.join(root_path, "imported", normal_md['MDATA_KEY_CREATOR'],
normal_md['MDATA_KEY_SOURCE'])
filepath = os.path.join(path, fname)
return filepath
if __name__ == '__main__':
import doctest
doctest.testmod()

View File

@ -0,0 +1,22 @@
# testing ground for the script
import pyinotify
from pydispatch import dispatcher
from media.monitor.listeners import OrganizeListener, StoreWatchListener
from media.monitor.organizer import Organizer
wm = pyinotify.WatchManager()
o1 = OrganizeListener(signal='org')
o2 = StoreWatchListener(signal='watch')
notifier = pyinotify.Notifier(wm)
wdd1 = wm.add_watch('/home/rudi/throwaway/fucking_around/organize', pyinotify.ALL_EVENTS, rec=True, auto_add=True, proc_fun=o1)
wdd2 = wm.add_watch('/home/rudi/throwaway/fucking_around/watch', pyinotify.ALL_EVENTS, rec=True, auto_add=True, proc_fun=o2)
def watch_event(sender, event):
print("Watch: Was sent by %s with %s" % (sender, event))
org = Organizer(signal='org', target='/home/rudi/throwaway/fucking_around/watch')
dispatcher.connect(watch_event, signal='watch', sender=dispatcher.Any)
notifier.loop()

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
import unittest
import media.monitor.pure as mmp
class TestMMP(unittest.TestCase):
def test_apply_rules(self):
sample_dict = {
'key' : 'val',
'test' : 'IT',
}
rules = {
'key' : lambda x : x.upper(),
'test' : lambda y : y.lower()
}
mmp.apply_rules_dict(sample_dict, rules)
self.assertEqual(sample_dict['key'], 'VAL')
self.assertEqual(sample_dict['test'], 'it')
def test_default_to(self):
sd = { }
def_keys = ['one','two','three']
mmp.default_to(dictionary=sd, keys=def_keys, default='DEF')
for k in def_keys: self.assertEqual( sd[k], 'DEF' )
def test_normalized_metadata(self):
pass
def test_organized_path(self):
pass
if __name__ == '__main__': unittest.main()