Merge branch 'mm_refactor_for_saas' of https://github.com/rgrinberg/Airtime into devel

This commit is contained in:
Martin Konecny 2012-11-15 13:37:45 -05:00
commit 5d03ed225d
19 changed files with 302 additions and 194 deletions

View File

@ -399,3 +399,4 @@ class AirtimeApiClient(object):
def push_stream_stats(self, data):
# TODO : users of this method should do their own error handling
response = self.services.push_stream_stats(_post_data={'data': json.dumps(data)})
return response

View File

@ -15,7 +15,7 @@ from media.monitor.exceptions import DirectoryIsNotListed
from media.monitor.bootstrap import Bootstrapper
from media.monitor.listeners import FileMediator
from api_clients import api_client as apc
from media.saas.thread import apc
class AirtimeNotifier(Loggable):
"""
@ -98,7 +98,7 @@ class AirtimeMessageReceiver(Loggable):
if (not directory_id) and (not directory):
raise ValueError("You must provide either directory_id or \
directory")
sdb = AirtimeDB(apc.AirtimeApiClient.create_right_config())
sdb = AirtimeDB(apc())
if directory : directory = os.path.normpath(directory)
if directory_id == None : directory_id = sdb.to_id(directory)
if directory == None : directory = sdb.to_directory(directory_id)

View File

@ -12,25 +12,21 @@ class MMConfig(object):
self.cfg = ConfigObj(path)
def __getitem__(self, key):
"""
We always return a copy of the config item to prevent callers from
doing any modifications through the returned objects methods
"""
""" We always return a copy of the config item to prevent
callers from doing any modifications through the returned
objects methods """
return copy.deepcopy(self.cfg[key])
def __setitem__(self, key, value):
"""
We use this method not to allow anybody to mess around with config file
any settings made should be done through MMConfig's instance methods
"""
""" We use this method not to allow anybody to mess around with
config file any settings made should be done through MMConfig's
instance methods """
raise ConfigAccessViolation(key)
def save(self): self.cfg.write()
def last_ran(self):
"""
Returns the last time media monitor was ran by looking at the time when
the file at 'index_path' was modified
"""
""" Returns the last time media monitor was ran by looking at
the time when the file at 'index_path' was modified """
return mmp.last_modified(self.cfg['index_path'])

View File

@ -23,7 +23,7 @@ class FailedToObtainLocale(Exception):
class CouldNotCreateIndexFile(Exception):
"""exception whenever index file cannot be created"""
def __init__(self, path, cause):
def __init__(self, path, cause=None):
self.path = path
self.cause = cause
def __str__(self): return "Failed to create touch file '%s'" % self.path

View File

@ -1,5 +1,4 @@
import pyinotify
import threading
import time
from pydispatch import dispatcher
@ -9,19 +8,19 @@ from media.monitor.log import Loggable
from media.monitor.listeners import StoreWatchListener, OrganizeListener
from media.monitor.handler import ProblemFileHandler
from media.monitor.organizer import Organizer
from media.saas.thread import InstanceInheritingThread
import media.monitor.pure as mmp
class ManagerTimeout(threading.Thread,Loggable):
"""
The purpose of this class is to flush the organize directory every 3
secnods. This used to be just a work around for cc-4235 but recently
became a permanent solution because it's "cheap" and reliable
"""
class ManagerTimeout(InstanceInheritingThread,Loggable):
""" The purpose of this class is to flush the organize directory
every 3 secnods. This used to be just a work around for cc-4235
but recently became a permanent solution because it's "cheap" and
reliable """
def __init__(self, manager, interval=1.5):
# TODO : interval should be read from config and passed here instead
# of just using the hard coded value
threading.Thread.__init__(self)
super(ManagerTimeout, self).__init__()
self.manager = manager
self.interval = interval
def run(self):
@ -30,19 +29,15 @@ class ManagerTimeout(threading.Thread,Loggable):
self.manager.flush_organize()
class Manager(Loggable):
"""
An abstraction over media monitors core pyinotify functions. These
include adding watched,store, organize directories, etc. Basically
composes over WatchManager from pyinotify
"""
""" An abstraction over media monitors core pyinotify functions.
These include adding watched,store, organize directories, etc.
Basically composes over WatchManager from pyinotify """
def __init__(self):
self.wm = pyinotify.WatchManager()
# These two instance variables are assumed to be constant
self.watch_channel = 'watch'
self.organize_channel = 'organize'
self.watch_listener = StoreWatchListener(signal = self.watch_channel)
# TODO : change this to a weak ref
# TODO : get rid of this hack once cc-4235 is fixed
self.__timeout_thread = ManagerTimeout(self)
self.__timeout_thread.daemon = True
self.__timeout_thread.start()
@ -76,23 +71,19 @@ class Manager(Loggable):
# through dedicated handler objects. Because we must have access to a
# manager instance. Hence we must slightly break encapsulation.
def watch_move(self, watch_dir, sender=None):
"""
handle 'watch move' events directly sent from listener
"""
""" handle 'watch move' events directly sent from listener """
self.logger.info("Watch dir '%s' has been renamed (hence removed)" %
watch_dir)
self.remove_watch_directory(normpath(watch_dir))
def watch_signal(self):
"""
Return the signal string our watch_listener is reading events from
"""
""" Return the signal string our watch_listener is reading
events from """
return self.watch_listener.signal
def __remove_watch(self,path):
"""
Remove path from being watched (first will check if 'path' is watched)
"""
""" Remove path from being watched (first will check if 'path'
is watched) """
# only delete if dir is actually being watched
if path in self.__wd_path:
wd = self.__wd_path[path]
@ -100,10 +91,8 @@ class Manager(Loggable):
del(self.__wd_path[path])
def __add_watch(self,path,listener):
"""
Start watching 'path' using 'listener'. First will check if directory
is being watched before adding another watch
"""
""" Start watching 'path' using 'listener'. First will check if
directory is being watched before adding another watch """
self.logger.info("Attempting to add listener to path '%s'" % path)
self.logger.info( 'Listener: %s' % str(listener) )
@ -114,9 +103,8 @@ class Manager(Loggable):
if wd: self.__wd_path[path] = wd.values()[0]
def __create_organizer(self, target_path, recorded_path):
"""
creates an organizer at new destination path or modifies the old one
"""
""" creates an organizer at new destination path or modifies the
old one """
# TODO : find a proper fix for the following hack
# We avoid creating new instances of organize because of the way
# it interacts with pydispatch. We must be careful to never have
@ -134,23 +122,17 @@ class Manager(Loggable):
recorded_path=recorded_path)
def get_problem_files_path(self):
"""
returns the path where problem files should go
"""
""" returns the path where problem files should go """
return self.organize['problem_files_path']
def set_problem_files_path(self, new_path):
"""
Set the path where problem files should go
"""
""" Set the path where problem files should go """
self.organize['problem_files_path'] = new_path
self.organize['problem_handler'] = \
ProblemFileHandler( PathChannel(signal='badfile',path=new_path) )
def get_recorded_path(self):
"""
returns the path of the recorded directory
"""
""" returns the path of the recorded directory """
return self.organize['recorded_path']
def set_recorded_path(self, new_path):
@ -160,17 +142,14 @@ class Manager(Loggable):
self.__add_watch(new_path, self.watch_listener)
def get_organize_path(self):
"""
returns the current path that is being watched for organization
"""
""" returns the current path that is being watched for
organization """
return self.organize['organize_path']
def set_organize_path(self, new_path):
"""
sets the organize path to be new_path. Under the current scheme there
is only one organize path but there is no reason why more cannot be
supported
"""
""" sets the organize path to be new_path. Under the current
scheme there is only one organize path but there is no reason
why more cannot be supported """
# if we are already organizing a particular directory we remove the
# watch from it first before organizing another directory
self.__remove_watch(self.organize['organize_path'])
@ -188,19 +167,15 @@ class Manager(Loggable):
return self.organize['imported_path']
def set_imported_path(self,new_path):
"""
set the directory where organized files go to.
"""
""" set the directory where organized files go to. """
self.__remove_watch(self.organize['imported_path'])
self.organize['imported_path'] = new_path
self.__create_organizer( new_path, self.organize['recorded_path'])
self.__add_watch(new_path, self.watch_listener)
def change_storage_root(self, store):
"""
hooks up all the directories for you. Problem, recorded, imported,
organize.
"""
""" hooks up all the directories for you. Problem, recorded,
imported, organize. """
store_paths = mmp.expand_storage(store)
# First attempt to make sure that all paths exist before adding any
# watches
@ -217,18 +192,14 @@ class Manager(Loggable):
mmp.create_dir(p)
def has_watch(self, path):
"""
returns true if the path is being watched or not. Any kind of watch:
organize, store, watched.
"""
""" returns true if the path is being watched or not. Any kind
of watch: organize, store, watched. """
return path in self.__wd_path
def add_watch_directory(self, new_dir):
"""
adds a directory to be "watched". "watched" directories are
""" adds a directory to be "watched". "watched" directories are
those that are being monitored by media monitor for airtime in
this context and not directories pyinotify calls watched
"""
this context and not directories pyinotify calls watched """
if self.has_watch(new_dir):
self.logger.info("Cannot add '%s' to watched directories. It's \
already being watched" % new_dir)
@ -237,9 +208,8 @@ class Manager(Loggable):
self.__add_watch(new_dir, self.watch_listener)
def remove_watch_directory(self, watch_dir):
"""
removes a directory from being "watched". Undoes add_watch_directory
"""
""" removes a directory from being "watched". Undoes
add_watch_directory """
if self.has_watch(watch_dir):
self.logger.info("Removing watched directory: '%s'", watch_dir)
self.__remove_watch(watch_dir)
@ -250,9 +220,7 @@ class Manager(Loggable):
self.logger.info( self.__wd_path )
def loop(self):
"""
block until we receive pyinotify events
"""
""" block until we receive pyinotify events """
notifier = pyinotify.Notifier(self.wm)
notifier.coalesce_events()
notifier.loop()

View File

@ -1,14 +1,12 @@
# -*- coding: utf-8 -*-
import threading
from media.monitor.exceptions import BadSongFile
from media.monitor.log import Loggable
import api_clients.api_client as ac
from media.saas.thread import apc, InstanceInheritingThread
class ThreadedRequestSync(threading.Thread, Loggable):
class ThreadedRequestSync(InstanceInheritingThread, Loggable):
def __init__(self, rs):
threading.Thread.__init__(self)
super(ThreadedRequestSync, self).__init__()
self.rs = rs
self.daemon = True
self.start()
@ -22,7 +20,7 @@ class RequestSync(Loggable):
for some number of times """
@classmethod
def create_with_api_client(cls, watcher, requests):
apiclient = ac.AirtimeApiClient.create_right_config()
apiclient = apc()
self = cls(watcher, requests, apiclient)
return self

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import threading
import time
import copy
@ -9,15 +8,16 @@ from media.monitor.exceptions import BadSongFile
from media.monitor.eventcontractor import EventContractor
from media.monitor.events import EventProxy
from media.monitor.request import ThreadedRequestSync, RequestSync
from media.saas.thread import InstanceInheritingThread
class TimeoutWatcher(threading.Thread,Loggable):
class TimeoutWatcher(InstanceInheritingThread,Loggable):
"""
The job of this thread is to keep an eye on WatchSyncer and force a
request whenever the requests go over time out
"""
def __init__(self, watcher, timeout=5):
self.logger.info("Created timeout thread...")
threading.Thread.__init__(self)
super(TimeoutWatcher, self).__init__()
self.watcher = watcher
self.timeout = timeout

View File

@ -0,0 +1,32 @@
import os
from media.monitor.exceptions import NoConfigFile
from media.monitor.pure import LazyProperty
from media.monitor.config import MMConfig
from api_clients.api_client import AirtimeApiClient
class AirtimeInstance(object):
""" AirtimeInstance is a class that abstracts away every airtime
instance by providing all the necessary objects required to interact
with the instance. ApiClient, configs, root_directory """
def __init__(self,name, root_path, config_paths):
""" name is an internal name only """
for cfg in ['api_client','media_monitor', 'logging']:
if cfg not in config_paths: raise NoConfigFile(config_paths)
elif not os.path.exists(config_paths[cfg]):
raise NoConfigFile(config_paths[cfg])
self.name = name
self.config_paths = config_paths
self.root_path = root_path
def __str__(self):
return "%s,%s(%s)" % (self.name, self.root_path, self.config_paths)
@LazyProperty
def api_client(self):
return AirtimeApiClient(config_path=self.config_paths['api_client'])
@LazyProperty
def mm_config(self):
return MMConfig(self.config_paths['media_monitor'])

View File

@ -0,0 +1,75 @@
import os
from media.saas.thread import InstanceThread, user, apc
from media.monitor.log import Loggable
from media.monitor.exceptions import CouldNotCreateIndexFile
from media.monitor.toucher import ToucherThread
from media.monitor.airtime import AirtimeNotifier, \
AirtimeMessageReceiver
from media.monitor.watchersyncer import WatchSyncer
from media.monitor.eventdrainer import EventDrainer
from media.monitor.manager import Manager
class MM2(InstanceThread, Loggable):
def index_create(self, index_create_attempt=False):
config = user().mm_config
if not index_create_attempt:
if not os.path.exists(config['index_path']):
self.logger.info("Attempting to create index file:...")
try:
with open(config['index_path'], 'w') as f: f.write(" ")
except Exception as e:
self.logger.info("Failed to create index file with exception: %s" \
% str(e))
else:
self.logger.info("Created index file, reloading configuration:")
self.index_create(index_create_attempt=True)
else:
self.logger.info("Already tried to create index. Will not try again ")
if not os.path.exists(config['index_path']):
raise CouldNotCreateIndexFile(config['index_path'])
def run(self):
self.index_create()
manager = Manager()
apiclient = apc()
config = user().mm_config
watch_syncer = WatchSyncer(signal='watch',
chunking_number=config['chunking_number'],
timeout=config['request_max_wait'])
airtime_receiver = AirtimeMessageReceiver(config,manager)
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
store = apiclient.setup_media_monitor()
self.logger.info(
"Initing with the following airtime response:%s" % str(store))
airtime_receiver.change_storage({ 'directory':store[u'stor'] })
for watch_dir in store[u'watched_dirs']:
if not os.path.exists(watch_dir):
# Create the watch_directory here
try: os.makedirs(watch_dir)
except Exception:
self.logger.error("Could not create watch directory: '%s' \
(given from the database)." % watch_dir)
if os.path.exists(watch_dir):
airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True)
else: self.logger.info("Failed to add watch on %s" % str(watch_dir))
ed = EventDrainer(airtime_notifier.connection,
interval=float(config['rmq_event_wait']))
# Launch the toucher that updates the last time when the script was
# ran every n seconds.
# TODO : verify that this does not interfere with bootstrapping because the
# toucher thread might update the last_ran variable too fast
tt = ToucherThread(path=config['index_path'],
interval=int(config['touch_interval']))
apiclient.register_component('media-monitor')
manager.loop()

View File

@ -0,0 +1,25 @@
import threading
class UserlessThread(Exception):
def __str__():
return "Current thread: %s is not an instance of InstanceThread \
of InstanceInheritingThread" % str(threading.current_thread())
class HasUser(object):
def user(self): return self._user
class InstanceThread(threading.Thread, HasUser):
def __init__(self,user, *args, **kwargs):
super(InstanceThread, self).__init__(*args, **kwargs)
self._user = user
class InstanceInheritingThread(threading.Thread, HasUser):
def __init__(self, *args, **kwargs):
self._user = threading.current_thread().user()
super(InstanceInheritingThread, self).__init__(*args, **kwargs)
def user():
try: return threading.current_thread().user()
except AttributeError: raise UserlessThread()
def apc(): return user().api_client

View File

@ -5,11 +5,9 @@ import logging
import logging.config
from media.monitor.manager import Manager
from media.monitor.bootstrap import Bootstrapper
from media.monitor.log import get_logger, setup_logging
from media.monitor.config import MMConfig
from media.monitor.toucher import ToucherThread
from media.monitor.syncdb import AirtimeDB
from media.monitor.exceptions import FailedToObtainLocale, \
FailedToSetLocale, \
NoConfigFile
@ -17,65 +15,28 @@ from media.monitor.airtime import AirtimeNotifier, \
AirtimeMessageReceiver
from media.monitor.watchersyncer import WatchSyncer
from media.monitor.eventdrainer import EventDrainer
from media.update.replaygainupdater import ReplayGainUpdater
from std_err_override import LogWriter
from media.saas.launcher import MM2
from media.saas.airtimeinstance import AirtimeInstance
import media.monitor.pure as mmp
from api_clients import api_client as apc
def main(global_config, api_client_config, log_config,
index_create_attempt=False):
for cfg in [global_config, api_client_config]:
if not os.path.exists(cfg): raise NoConfigFile(cfg)
# MMConfig is a proxy around ConfigObj instances. it does not allow
# itself users of MMConfig instances to modify any config options
# directly through the dictionary. Users of this object muse use the
# correct methods designated for modification
try: config = MMConfig(global_config)
except NoConfigFile as e:
print("Cannot run mediamonitor2 without configuration file.")
print("Current config path: '%s'" % global_config)
sys.exit(1)
except Exception as e:
print("Unknown error reading configuration file: '%s'" % global_config)
print(str(e))
def setup_logger(log_config, logpath):
logging.config.fileConfig(log_config)
#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)
logger = logging.getLogger()
LogWriter.override_std_err(logger)
logfile = unicode( config['logpath'] )
logfile = unicode(logpath)
setup_logging(logfile)
log = get_logger()
return log
if not index_create_attempt:
if not os.path.exists(config['index_path']):
log.info("Attempting to create index file:...")
try:
with open(config['index_path'], 'w') as f: f.write(" ")
except Exception as e:
log.info("Failed to create index file with exception: %s" \
% str(e))
else:
log.info("Created index file, reloading configuration:")
main( global_config, api_client_config, log_config,
index_create_attempt=True )
else:
log.info("Already tried to create index. Will not try again ")
if not os.path.exists(config['index_path']):
log.info("Index file does not exist. Terminating")
def setup_global(log):
""" setup unicode and other stuff """
log.info("Attempting to set the locale...")
try:
mmp.configure_locale(mmp.get_system_locale())
try: mmp.configure_locale(mmp.get_system_locale())
except FailedToSetLocale as e:
log.info("Failed to set the locale...")
sys.exit(1)
@ -88,54 +49,20 @@ def main(global_config, api_client_config, log_config,
Logging exception.")
log.info(str(e))
watch_syncer = WatchSyncer(signal='watch',
chunking_number=config['chunking_number'],
timeout=config['request_max_wait'])
apiclient = apc.AirtimeApiClient.create_right_config(log=log,
def main(global_config, api_client_config, log_config):
cfg = {
'api_client' : api_client_config,
'media_monitor' : global_config,
'logging' : log_config,
}
ai = AirtimeInstance('hosted_install', '/', cfg)
log = setup_logger( log_config, ai.mm_config['logpath'] )
setup_global(log)
apc.AirtimeApiClient.create_right_config(log=log,
config_path=api_client_config)
ReplayGainUpdater.start_reply_gain(apiclient)
sdb = AirtimeDB(apiclient)
manager = Manager()
airtime_receiver = AirtimeMessageReceiver(config,manager)
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
store = apiclient.setup_media_monitor()
log.info("Initing with the following airtime response:%s" % str(store))
airtime_receiver.change_storage({ 'directory':store[u'stor'] })
for watch_dir in store[u'watched_dirs']:
if not os.path.exists(watch_dir):
# Create the watch_directory here
try: os.makedirs(watch_dir)
except Exception as e:
log.error("Could not create watch directory: '%s' \
(given from the database)." % watch_dir)
if os.path.exists(watch_dir):
airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True)
else: log.info("Failed to add watch on %s" % str(watch_dir))
bs = Bootstrapper( db=sdb, watch_signal='watch' )
ed = EventDrainer(airtime_notifier.connection,
interval=float(config['rmq_event_wait']))
# Launch the toucher that updates the last time when the script was
# ran every n seconds.
# TODO : verify that this does not interfere with bootstrapping because the
# toucher thread might update the last_ran variable too fast
tt = ToucherThread(path=config['index_path'],
interval=int(config['touch_interval']))
apiclient.register_component('media-monitor')
return manager.loop()
apc.AirtimeApiClient(api_client_config)
mm = MM2(ai)
mm.start()
__doc__ = """
Usage:
@ -148,9 +75,6 @@ Options:
--log=<path> log config at <path>
"""
def main_loop():
while True: pass
if __name__ == '__main__':
from docopt import docopt
args = docopt(__doc__,version="mm1.99")
@ -160,5 +84,4 @@ if __name__ == '__main__':
sys.exit(0)
print("Running mm1.99")
main(args['--config'],args['--apiclient'],args['--log'])
#gevent.joinall([ gevent.spawn(main_loop) ])

View File

@ -0,0 +1,21 @@
import unittest
from copy import deepcopy
from media.saas.airtimeinstance import AirtimeInstance, NoConfigFile
class TestAirtimeInstance(unittest.TestCase):
def setUp(self):
self.cfg = {
'api_client' : 'tests/test_instance.py',
'media_monitor' : 'tests/test_instance.py',
'logging' : 'tests/test_instance.py',
}
def test_init_good(self):
AirtimeInstance("/root", self.cfg)
self.assertTrue(True)
def test_init_bad(self):
cfg = deepcopy(self.cfg)
cfg['api_client'] = 'bs'
with self.assertRaises(NoConfigFile):
AirtimeInstance("/root", cfg)

View File

@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
import unittest
import time
from media.saas.thread import InstanceThread, InstanceInheritingThread
# ugly but necessary for 2.7
signal = False
signal2 = False
class TestInstanceThread(unittest.TestCase):
def test_user_inject(self):
global signal
signal = False
u = "rudi"
class T(InstanceThread):
def run(me):
global signal
super(T, me).run()
signal = True
self.assertEquals(u, me.user())
t = T(u, name="test_user_inject")
t.daemon = True
t.start()
time.sleep(0.2)
self.assertTrue(signal)
def test_inheriting_thread(utest):
global signal2
u = "testing..."
class TT(InstanceInheritingThread):
def run(self):
global signal2
utest.assertEquals(self.user(), u)
signal2 = True
class T(InstanceThread):
def run(self):
super(T, self).run()
child_thread = TT(name="child thread")
child_thread.daemon = True
child_thread.start()
parent_thread = T(u, name="Parent instance thread")
parent_thread.daemon = True
parent_thread.start()
time.sleep(0.2)
utest.assertTrue(signal2)
def test_different_user(utest):
u1, u2 = "ru", "di"
class T(InstanceThread):
def run(self):
super(T, self).run()
for u in [u1, u2]:
t = T(u)
t.daemon = True
t.start()
utest.assertEquals(t.user(), u)
if __name__ == '__main__': unittest.main()

View File

View File

@ -3,12 +3,11 @@ from threading import Thread
import traceback
import os
import time
import logging
from media.update import replaygain
from media.monitor.log import Loggable
class ReplayGainUpdater(Thread, Loggable):
class ReplayGainUpdater(Thread):
"""
The purpose of the class is to query the server for a list of files which
do not have a ReplayGain value calculated. This class will iterate over the
@ -30,6 +29,7 @@ class ReplayGainUpdater(Thread, Loggable):
def __init__(self,apc):
Thread.__init__(self)
self.api_client = apc
self.logger = logging.getLogger()
def main(self):
raw_response = self.api_client.list_all_watched_dirs()

View File

@ -24,6 +24,8 @@ from recorder import Recorder
from listenerstat import ListenerStat
from pypomessagehandler import PypoMessageHandler
from media.update.replaygainupdater import ReplayGainUpdater
from configobj import ConfigObj
# custom imports
@ -174,6 +176,9 @@ if __name__ == '__main__':
sys.exit()
api_client = api_client.AirtimeApiClient()
ReplayGainUpdater.start_reply_gain(api_client)
api_client.register_component("pypo")
pypoFetch_q = Queue()