Merge branch 'devel' of dev.sourcefabric.org:airtime into devel

This commit is contained in:
James 2012-11-20 15:05:20 -05:00
commit d69faef560
40 changed files with 969 additions and 572 deletions

View File

@ -1116,7 +1116,6 @@ class Application_Model_Preference
} else {
/*For now we just have this hack for debugging. We should not
rely on this crappy behaviour in case of failure*/
Logging::info("Pref: $pref_param");
Logging::warn("Index $x does not exist preferences");
Logging::warn("Defaulting to identity and printing preferences");
Logging::warn($ds);

View File

@ -321,7 +321,7 @@ SQL;
ws.description AS file_album_title,
ws.length AS file_length,
't'::BOOL AS file_exists,
NULL as file_mime
ws.mime as file_mime
SQL;
$streamJoin = <<<SQL
cc_schedule AS sched

View File

@ -402,8 +402,9 @@ function setupUI() {
$(".repeat_tracks_help_icon").qtip({
content: {
text: "If your criteria is too strict, Airtime may not be able to fill up the desired smart block length." +
" Hence, if you check this option, tracks will be used more than once."
text: "The desired block length will not be reached if Airtime cannot find " +
"enough unique tracks to match your criteria. Enable this option if you wish to allow " +
"tracks to be added multiple times to the smart block."
},
hide: {
delay: 500,

View File

@ -25,7 +25,8 @@ echo "----------------------------------------------------"
dist=`lsb_release -is`
code=`lsb_release -cs`
if [ "$dist" = "Debian" ]; then
#enable squeeze backports to get lame packages
if [ "$dist" = "Debian" -a "$code" = "squeeze" ]; then
set +e
grep -E "deb http://backports.debian.org/debian-backports squeeze-backports main" /etc/apt/sources.list
returncode=$?

View File

@ -28,7 +28,8 @@ echo "----------------------------------------------------"
dist=`lsb_release -is`
code=`lsb_release -cs`
if [ "$dist" -eq "Debian" ]; then
#enable squeeze backports to get lame packages
if [ "$dist" = "Debian" -a "$code" = "squeeze" ]; then
grep "deb http://backports.debian.org/debian-backports squeeze-backports main" /etc/apt/sources.list
if [ "$?" -ne "0" ]; then
echo "deb http://backports.debian.org/debian-backports squeeze-backports main" >> /etc/apt/sources.list

View File

@ -127,24 +127,6 @@ class RequestProvider(object):
class AirtimeApiClient(object):
# This is a little hacky fix so that I don't have to pass the config object
# everywhere where AirtimeApiClient needs to be initialized
default_config = None
# the purpose of this custom constructor is to remember which config file
# it was called with. So that after the initial call:
# AirtimeApiClient.create_right_config('/path/to/config')
# All subsequence calls to create_right_config will be with that config
# file
@staticmethod
def create_right_config(log=None,config_path=None):
if config_path: AirtimeApiClient.default_config = config_path
elif (not AirtimeApiClient.default_config):
raise ValueError("Cannot slip config_path attribute when it has \
never been passed yet")
return AirtimeApiClient( logger=None,
config_path=AirtimeApiClient.default_config )
def __init__(self, logger=None,config_path='/etc/airtime/api_client.cfg'):
if logger is None: self.logger = logging
else: self.logger = logger
@ -399,3 +381,4 @@ class AirtimeApiClient(object):
def push_stream_stats(self, data):
# TODO : users of this method should do their own error handling
response = self.services.push_stream_stats(_post_data={'data': json.dumps(data)})
return response

View File

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
import re
from media.saas.launcher import setup_logger, setup_global, MM2
from media.saas.airtimeinstance import AirtimeInstance
from os.path import isdir, join, abspath, exists
from os import listdir
def list_dirs(d): return (x for x in listdir(d) if isdir(join(d,x)))
def filter_instance(d): return bool(re.match('.+\d+$',d))
def get_name(p): return re.match('.+/(\d+)$',p).group(1)
def filter_instances(l): return (x for x in l if filter_instance(x))
def autoscan_instances(main_cfg):
root = main_cfg['instance_root']
instances = []
for instance_machine in list_dirs(root):
instance_machine = join(root, instance_machine)
for instance_root in filter_instances(list_dirs(instance_machine)):
full_path = abspath(join(instance_machine,instance_root))
ai = AirtimeInstance.root_make(get_name(full_path), full_path)
instances.append(ai)
return instances
def verify_exists(p):
if not exists(p): raise Exception("%s must exist" % p)
def main(main_cfg):
log_config, log_path = main_cfg['log_config'], main_cfg['log_path']
verify_exists(log_config)
log = setup_logger(log_config, log_path)
setup_global(log)
for instance in autoscan_instances(main_cfg):
print("Launching instance: %s" % str(instance))
MM2(instance).start()
print("Launched all instances")
if __name__ == '__main__':
root = '/home/rudi/reps/Airtime/python_apps/media-monitor2'
default = {
'log_path' : join(root, 'test.log'), # config for log
'log_config' : join(root, 'configs/logging.cfg'), # where to log
# root dir of all instances
'instance_root' : join(root, 'saas_stub')
}
main(default)

View File

@ -0,0 +1,32 @@
[database]
host = localhost
dbname = airtime
dbuser = airtime
dbpass = airtime
[rabbitmq]
host = 127.0.0.1
port = 5672
user = guest
password = guest
vhost = /
[general]
api_key = I6EUOJM0D1EIGSMZ9T70
web_server_user = www-data
airtime_dir = /usr/share/airtime
base_url = localhost
base_port = 80
base_dir = ''
;How many hours ahead of time should Airtime playout engine (PYPO)
;cache scheduled media files.
cache_ahead_hours = 1
[monit]
monit_user = guest
monit_password = airtime
[soundcloud]
connection_retries = 3
time_between_retries = 60

View File

@ -0,0 +1,126 @@
bin_dir = "/usr/lib/airtime/api_clients"
#############################
## Common
#############################
# Value needed to access the API
api_key = 'I6EUOJM0D1EIGSMZ9T70'
# Path to the base of the API
api_base = 'api'
# URL to get the version number of the server API
version_url = 'version/api_key/%%api_key%%'
#URL to register a components IP Address with the central web server
register_component = 'register-component/format/json/api_key/%%api_key%%/component/%%component%%'
# Hostname
host = 'localhost'
base_port = 80
base_dir = ''
#############################
## Config for Media Monitor
#############################
# URL to setup the media monitor
media_setup_url = 'media-monitor-setup/format/json/api_key/%%api_key%%'
# Tell Airtime the file id associated with a show instance.
upload_recorded = 'upload-recorded/format/json/api_key/%%api_key%%/fileid/%%fileid%%/showinstanceid/%%showinstanceid%%'
# URL to tell Airtime to update file's meta data
update_media_url = 'reload-metadata/format/json/api_key/%%api_key%%/mode/%%mode%%'
# URL to tell Airtime we want a listing of all files it knows about
list_all_db_files = 'list-all-files/format/json/api_key/%%api_key%%/dir_id/%%dir_id%%/all/%%all%%'
# URL to tell Airtime we want a listing of all dirs its watching (including the stor dir)
list_all_watched_dirs = 'list-all-watched-dirs/format/json/api_key/%%api_key%%'
# URL to tell Airtime we want to add watched directory
add_watched_dir = 'add-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%'
# URL to tell Airtime we want to add watched directory
remove_watched_dir = 'remove-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%'
# URL to tell Airtime we want to add watched directory
set_storage_dir = 'set-storage-dir/format/json/api_key/%%api_key%%/path/%%path%%'
# URL to tell Airtime about file system mount change
update_fs_mount = 'update-file-system-mount/format/json/api_key/%%api_key%%'
# URL to commit multiple updates from media monitor at the same time
reload_metadata_group = 'reload-metadata-group/format/json/api_key/%%api_key%%'
# URL to tell Airtime about file system mount change
handle_watched_dir_missing = 'handle-watched-dir-missing/format/json/api_key/%%api_key%%/dir/%%dir%%'
#############################
## Config for Recorder
#############################
# URL to get the schedule of shows set to record
show_schedule_url = 'recorded-shows/format/json/api_key/%%api_key%%'
# URL to upload the recorded show's file to Airtime
upload_file_url = 'upload-file/format/json/api_key/%%api_key%%'
# URL to commit multiple updates from media monitor at the same time
#number of retries to upload file if connection problem
upload_retries = 3
#time to wait between attempts to upload file if connection problem (in seconds)
upload_wait = 60
################################################################################
# Uncomment *one of the sets* of values from the API clients below, and comment
# out all the others.
################################################################################
#############################
## Config for Pypo
#############################
# Schedule export path.
# %%from%% - starting date/time in the form YYYY-MM-DD-hh-mm
# %%to%% - starting date/time in the form YYYY-MM-DD-hh-mm
export_url = 'schedule/api_key/%%api_key%%'
get_media_url = 'get-media/file/%%file%%/api_key/%%api_key%%'
# Update whether a schedule group has begun playing.
update_item_url = 'notify-schedule-group-play/api_key/%%api_key%%/schedule_id/%%schedule_id%%'
# Update whether an audio clip is currently playing.
update_start_playing_url = 'notify-media-item-start-play/api_key/%%api_key%%/media_id/%%media_id%%/'
# URL to tell Airtime we want to get stream setting
get_stream_setting = 'get-stream-setting/format/json/api_key/%%api_key%%/'
#URL to update liquidsoap status
update_liquidsoap_status = 'update-liquidsoap-status/format/json/api_key/%%api_key%%/msg/%%msg%%/stream_id/%%stream_id%%/boot_time/%%boot_time%%'
#URL to check live stream auth
check_live_stream_auth = 'check-live-stream-auth/format/json/api_key/%%api_key%%/username/%%username%%/password/%%password%%/djtype/%%djtype%%'
#URL to update source status
update_source_status = 'update-source-status/format/json/api_key/%%api_key%%/sourcename/%%sourcename%%/status/%%status%%'
get_bootstrap_info = 'get-bootstrap-info/format/json/api_key/%%api_key%%'
get_files_without_replay_gain = 'get-files-without-replay-gain/api_key/%%api_key%%/dir_id/%%dir_id%%'
update_replay_gain_value = 'update-replay-gain-value/api_key/%%api_key%%'
notify_webstream_data = 'notify-webstream-data/api_key/%%api_key%%/media_id/%%media_id%%/format/json'
notify_liquidsoap_started = 'rabbitmq-do-push/api_key/%%api_key%%/format/json'
get_stream_parameters = 'get-stream-parameters/api_key/%%api_key%%/format/json'
push_stream_stats = 'push-stream-stats/api_key/%%api_key%%/format/json'

View File

@ -0,0 +1,32 @@
[loggers]
keys= root,notifier,metadata
[handlers]
keys=fileOutHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=DEBUG
handlers=fileOutHandler
[logger_notifier]
level=DEBUG
handlers=fileOutHandler
qualname=notifier
[logger_metadata]
level=DEBUG
handlers=fileOutHandler
qualname=metadata
[handler_fileOutHandler]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=("/var/log/airtime/media-monitor/media-monitor.log", 'a', 10000000, 5,)
[formatter_simpleFormatter]
format=%(asctime)s %(levelname)s - [%(threadName)s] [%(filename)s : %(funcName)s()] : LINE %(lineno)d - %(message)s
datefmt=

View File

@ -0,0 +1,31 @@
api_client = "airtime"
# where the binary files live
bin_dir = '/usr/lib/airtime/media-monitor'
# where the logging files live
log_dir = '/var/log/airtime/media-monitor'
############################################
# RabbitMQ settings #
############################################
rabbitmq_host = 'localhost'
rabbitmq_user = 'guest'
rabbitmq_password = 'guest'
rabbitmq_vhost = '/'
############################################
# Media-Monitor preferences #
############################################
check_filesystem_events = 5 #how long to queue up events performed on the files themselves.
check_airtime_events = 30 #how long to queue metadata input from airtime.
# MM2 only:
touch_interval = 5
chunking_number = 450
request_max_wait = 3.0
rmq_event_wait = 0.1
logpath = '/var/log/airtime/media-monitor/media-monitor.log'
index_path = '/var/tmp/airtime/media-monitor/last_index'

View File

@ -13,9 +13,8 @@ from media.monitor.log import Loggable
from media.monitor.syncdb import AirtimeDB
from media.monitor.exceptions import DirectoryIsNotListed
from media.monitor.bootstrap import Bootstrapper
from media.monitor.listeners import FileMediator
from api_clients import api_client as apc
from media.saas.thread import apc, user
class AirtimeNotifier(Loggable):
"""
@ -98,7 +97,7 @@ class AirtimeMessageReceiver(Loggable):
if (not directory_id) and (not directory):
raise ValueError("You must provide either directory_id or \
directory")
sdb = AirtimeDB(apc.AirtimeApiClient.create_right_config())
sdb = AirtimeDB(apc())
if directory : directory = os.path.normpath(directory)
if directory_id == None : directory_id = sdb.to_id(directory)
if directory == None : directory = sdb.to_directory(directory_id)
@ -192,7 +191,7 @@ class AirtimeMessageReceiver(Loggable):
# request that we'd normally get form pyinotify. But right
# now event contractor would take care of this sort of
# thing anyway so this might not be necessary after all
FileMediator.ignore(msg['filepath'])
user().file_mediator.ignore(msg['filepath'])
os.unlink(msg['filepath'])
# Verify deletion:
if not os.path.exists(msg['filepath']):

View File

@ -2,6 +2,7 @@ import os
from pydispatch import dispatcher
from media.monitor.events import NewFile, DeleteFile, ModifyFile
from media.monitor.log import Loggable
from media.saas.thread import getsig
import media.monitor.pure as mmp
class Bootstrapper(Loggable):
@ -16,7 +17,7 @@ class Bootstrapper(Loggable):
watch_signal - the signals should send events for every file on.
"""
self.db = db
self.watch_signal = watch_signal
self.watch_signal = getsig(watch_signal)
def flush_all(self, last_ran):
"""

View File

@ -12,25 +12,21 @@ class MMConfig(object):
self.cfg = ConfigObj(path)
def __getitem__(self, key):
"""
We always return a copy of the config item to prevent callers from
doing any modifications through the returned objects methods
"""
""" We always return a copy of the config item to prevent
callers from doing any modifications through the returned
objects methods """
return copy.deepcopy(self.cfg[key])
def __setitem__(self, key, value):
"""
We use this method not to allow anybody to mess around with config file
any settings made should be done through MMConfig's instance methods
"""
""" We use this method not to allow anybody to mess around with
config file any settings made should be done through MMConfig's
instance methods """
raise ConfigAccessViolation(key)
def save(self): self.cfg.write()
def last_ran(self):
"""
Returns the last time media monitor was ran by looking at the time when
the file at 'index_path' was modified
"""
""" Returns the last time media monitor was ran by looking at
the time when the file at 'index_path' was modified """
return mmp.last_modified(self.cfg['index_path'])

View File

@ -3,11 +3,11 @@ import os
import abc
import re
import media.monitor.pure as mmp
import media.monitor.owners as owners
from media.monitor.pure import LazyProperty
from media.monitor.metadata import Metadata
from media.monitor.log import Loggable
from media.monitor.exceptions import BadSongFile
from media.saas.thread import getsig, user
class PathChannel(object):
"""
@ -15,34 +15,26 @@ class PathChannel(object):
used as a named tuple
"""
def __init__(self, signal, path):
self.signal = signal
self.signal = getsig(signal)
self.path = path
# TODO : Move this to it's file. Also possible unsingleton and use it as a
# simple module just like m.m.owners
class EventRegistry(object):
"""
This class's main use is to keep track all events with a cookie attribute.
This is done mainly because some events must be 'morphed' into other events
because we later detect that they are move events instead of delete events.
"""
registry = {}
@staticmethod
def register(evt): EventRegistry.registry[evt.cookie] = evt
@staticmethod
def unregister(evt): del EventRegistry.registry[evt.cookie]
@staticmethod
def registered(evt): return evt.cookie in EventRegistry.registry
@staticmethod
def matching(evt):
event = EventRegistry.registry[evt.cookie]
""" This class's main use is to keep track all events with a cookie
attribute. This is done mainly because some events must be 'morphed'
into other events because we later detect that they are move events
instead of delete events. """
def __init__(self):
self.registry = {}
def register(self,evt): self.registry[evt.cookie] = evt
def unregister(self,evt): del self.registry[evt.cookie]
def registered(self,evt): return evt.cookie in self.registry
def matching(self,evt):
event = self.registry[evt.cookie]
# Want to disallow accessing the same event twice
EventRegistry.unregister(event)
self.unregister(event)
return event
def __init__(self,*args,**kwargs):
raise Exception("You can instantiate this class. Must only use class \
methods")
class EventProxy(Loggable):
"""
@ -101,7 +93,7 @@ class BaseEvent(Loggable):
self._raw_event = raw_event
self.path = os.path.normpath(raw_event.pathname)
else: self.path = raw_event
self.owner = owners.get_owner(self.path)
self.owner = user().owner.get_owner(self.path)
owner_re = re.search('stor/imported/(?P<owner>\d+)/', self.path)
if owner_re:
self.logger.info("matched path: %s" % self.path)
@ -152,7 +144,7 @@ class BaseEvent(Loggable):
ret = self.pack()
# Remove owner of this file only after packing. Otherwise packing
# will not serialize the owner correctly into the airtime request
owners.remove_file_owner(self.path)
user().owner.remove_file_owner(self.path)
return ret
except BadSongFile as e: return [e]
except Exception as e:

View File

@ -23,7 +23,7 @@ class FailedToObtainLocale(Exception):
class CouldNotCreateIndexFile(Exception):
"""exception whenever index file cannot be created"""
def __init__(self, path, cause):
def __init__(self, path, cause=None):
self.path = path
self.cause = cause
def __str__(self): return "Failed to create touch file '%s'" % self.path

View File

@ -3,6 +3,7 @@ from pydispatch import dispatcher
import abc
from media.monitor.log import Loggable
from media.saas.thread import getsig
import media.monitor.pure as mmp
# Defines the handle interface
@ -21,10 +22,10 @@ class ReportHandler(Handles):
"""
__metaclass__ = abc.ABCMeta
def __init__(self, signal, weak=False):
self.signal = signal
self.report_signal = "badfile"
self.signal = getsig(signal)
self.report_signal = getsig("badfile")
def dummy(sender, event): self.handle(sender,event)
dispatcher.connect(dummy, signal=signal, sender=dispatcher.Any,
dispatcher.connect(dummy, signal=self.signal, sender=dispatcher.Any,
weak=weak)
def report_problem_file(self, event, exception=None):
@ -38,7 +39,7 @@ class ProblemFileHandler(Handles, Loggable):
"""
def __init__(self, channel, **kwargs):
self.channel = channel
self.signal = self.channel.signal
self.signal = getsig(self.channel.signal)
self.problem_dir = self.channel.path
def dummy(sender, event, exception):
self.handle(sender, event, exception)

View File

@ -6,38 +6,31 @@ from functools import wraps
import media.monitor.pure as mmp
from media.monitor.pure import IncludeOnly
from media.monitor.events import OrganizeFile, NewFile, MoveFile, DeleteFile, \
DeleteDir, EventRegistry, MoveDir,\
DeleteDir, MoveDir,\
DeleteDirWatch
from media.monitor.log import Loggable, get_logger
from media.monitor.log import Loggable
from media.saas.thread import getsig, user
# Note: Because of the way classes that inherit from pyinotify.ProcessEvent
# interact with constructors. you should only instantiate objects from them
# using keyword arguments. For example:
# OrganizeListener('watch_signal') <= wrong
# OrganizeListener(signal='watch_signal') <= right
class FileMediator(object):
"""
FileMediator is used an intermediate mechanism that filters out certain
events.
"""
ignored_set = set([]) # for paths only
logger = get_logger()
@staticmethod
def is_ignored(path): return path in FileMediator.ignored_set
@staticmethod
def ignore(path): FileMediator.ignored_set.add(path)
@staticmethod
def unignore(path): FileMediator.ignored_set.remove(path)
class FileMediator(Loggable):
""" FileMediator is used an intermediate mechanism that filters out
certain events. """
def __init__(self) : self.ignored_set = set([]) # for paths only
def is_ignored(self,path) : return path in self.ignored_set
def ignore(self, path) : self.ignored_set.add(path)
def unignore(self, path) : self.ignored_set.remove(path)
def mediate_ignored(fn):
@wraps(fn)
def wrapped(self, event, *args,**kwargs):
event.pathname = unicode(event.pathname, "utf-8")
if FileMediator.is_ignored(event.pathname):
FileMediator.logger.info("Ignoring: '%s' (once)" % event.pathname)
FileMediator.unignore(event.pathname)
if user().file_mediator.is_ignored(event.pathname):
user().file_mediator.logger.info("Ignoring: '%s' (once)" % event.pathname)
user().file_mediator.unignore(event.pathname)
else: return fn(self, event, *args, **kwargs)
return wrapped
@ -45,7 +38,7 @@ class BaseListener(object):
def __str__(self):
return "Listener(%s), Signal(%s)" % \
(self.__class__.__name__, self. signal)
def my_init(self, signal): self.signal = signal
def my_init(self, signal): self.signal = getsig(signal)
class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable):
def process_IN_CLOSE_WRITE(self, event):
@ -66,25 +59,25 @@ class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable):
self.logger.info("Bootstrapping: File in 'organize' directory: \
'%s'" % f)
if not mmp.file_locked(f):
dispatcher.send(signal=self.signal, sender=self,
dispatcher.send(signal=getsig(self.signal), sender=self,
event=OrganizeFile(f))
flushed += 1
#self.logger.info("Flushed organized directory with %d files" % flushed)
@IncludeOnly(mmp.supported_extensions)
def process_to_organize(self, event):
dispatcher.send(signal=self.signal, sender=self,
dispatcher.send(signal=getsig(self.signal), sender=self,
event=OrganizeFile(event))
class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
def process_IN_CLOSE_WRITE(self, event):
self.process_create(event)
def process_IN_MOVED_TO(self, event):
if EventRegistry.registered(event):
if user().event_registry.registered(event):
# We need this trick because we don't how to "expand" dir events
# into file events until we know for sure if we deleted or moved
morph = MoveDir(event) if event.dir else MoveFile(event)
EventRegistry.matching(event).morph_into(morph)
user().event_registry.matching(event).morph_into(morph)
else: self.process_create(event)
def process_IN_MOVED_FROM(self, event):
# Is either delete dir or delete file
@ -92,7 +85,7 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
# evt can be none whenever event points that a file that would be
# ignored by @IncludeOnly
if hasattr(event,'cookie') and (evt != None):
EventRegistry.register(evt)
user().event_registry.register(evt)
def process_IN_DELETE(self,event): self.process_delete(event)
def process_IN_MOVE_SELF(self, event):
if '-unknown-path' in event.pathname:
@ -101,14 +94,14 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
def delete_watch_dir(self, event):
e = DeleteDirWatch(event)
dispatcher.send(signal='watch_move', sender=self, event=e)
dispatcher.send(signal=self.signal, sender=self, event=e)
dispatcher.send(signal=getsig('watch_move'), sender=self, event=e)
dispatcher.send(signal=getsig(self.signal), sender=self, event=e)
@mediate_ignored
@IncludeOnly(mmp.supported_extensions)
def process_create(self, event):
evt = NewFile(event)
dispatcher.send(signal=self.signal, sender=self, event=evt)
dispatcher.send(signal=getsig(self.signal), sender=self, event=evt)
return evt
@mediate_ignored
@ -117,13 +110,13 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
evt = None
if event.dir : evt = DeleteDir(event)
else : evt = DeleteFile(event)
dispatcher.send(signal=self.signal, sender=self, event=evt)
dispatcher.send(signal=getsig(self.signal), sender=self, event=evt)
return evt
@mediate_ignored
def process_delete_dir(self, event):
evt = DeleteDir(event)
dispatcher.send(signal=self.signal, sender=self, event=evt)
dispatcher.send(signal=getsig(self.signal), sender=self, event=evt)
return evt
def flush_events(self, path):
@ -138,6 +131,6 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
added = 0
for f in mmp.walk_supported(path, clean_empties=False):
added += 1
dispatcher.send( signal=self.signal, sender=self, event=NewFile(f) )
dispatcher.send( signal=getsig(self.signal), sender=self, event=NewFile(f) )
self.logger.info( "Flushed watch directory. added = %d" % added )

View File

@ -1,5 +1,4 @@
import pyinotify
import threading
import time
from pydispatch import dispatcher
@ -9,19 +8,19 @@ from media.monitor.log import Loggable
from media.monitor.listeners import StoreWatchListener, OrganizeListener
from media.monitor.handler import ProblemFileHandler
from media.monitor.organizer import Organizer
from media.saas.thread import InstanceInheritingThread, getsig
import media.monitor.pure as mmp
class ManagerTimeout(threading.Thread,Loggable):
"""
The purpose of this class is to flush the organize directory every 3
secnods. This used to be just a work around for cc-4235 but recently
became a permanent solution because it's "cheap" and reliable
"""
class ManagerTimeout(InstanceInheritingThread,Loggable):
""" The purpose of this class is to flush the organize directory
every 3 secnods. This used to be just a work around for cc-4235
but recently became a permanent solution because it's "cheap" and
reliable """
def __init__(self, manager, interval=1.5):
# TODO : interval should be read from config and passed here instead
# of just using the hard coded value
threading.Thread.__init__(self)
super(ManagerTimeout, self).__init__()
self.manager = manager
self.interval = interval
def run(self):
@ -30,19 +29,17 @@ class ManagerTimeout(threading.Thread,Loggable):
self.manager.flush_organize()
class Manager(Loggable):
"""
An abstraction over media monitors core pyinotify functions. These
include adding watched,store, organize directories, etc. Basically
composes over WatchManager from pyinotify
"""
# NOTE : this massive class is a source of many problems of mm and
# is in dire need of breaking up and refactoring.
""" An abstraction over media monitors core pyinotify functions.
These include adding watched,store, organize directories, etc.
Basically composes over WatchManager from pyinotify """
def __init__(self):
self.wm = pyinotify.WatchManager()
# These two instance variables are assumed to be constant
self.watch_channel = 'watch'
self.organize_channel = 'organize'
self.watch_channel = getsig('watch')
self.organize_channel = getsig('organize')
self.watch_listener = StoreWatchListener(signal = self.watch_channel)
# TODO : change this to a weak ref
# TODO : get rid of this hack once cc-4235 is fixed
self.__timeout_thread = ManagerTimeout(self)
self.__timeout_thread.daemon = True
self.__timeout_thread.start()
@ -57,11 +54,11 @@ class Manager(Loggable):
self.organize_channel),
}
def dummy(sender, event): self.watch_move( event.path, sender=sender )
dispatcher.connect(dummy, signal='watch_move', sender=dispatcher.Any,
weak=False)
dispatcher.connect(dummy, signal=getsig('watch_move'),
sender=dispatcher.Any, weak=False)
def subwatch_add(sender, directory):
self.__add_watch(directory, self.watch_listener)
dispatcher.connect(subwatch_add, signal='add_subwatch',
dispatcher.connect(subwatch_add, signal=getsig('add_subwatch'),
sender=dispatcher.Any, weak=False)
# A private mapping path => watch_descriptor
# we use the same dictionary for organize, watch, store wd events.
@ -76,23 +73,19 @@ class Manager(Loggable):
# through dedicated handler objects. Because we must have access to a
# manager instance. Hence we must slightly break encapsulation.
def watch_move(self, watch_dir, sender=None):
"""
handle 'watch move' events directly sent from listener
"""
""" handle 'watch move' events directly sent from listener """
self.logger.info("Watch dir '%s' has been renamed (hence removed)" %
watch_dir)
self.remove_watch_directory(normpath(watch_dir))
def watch_signal(self):
"""
Return the signal string our watch_listener is reading events from
"""
return self.watch_listener.signal
""" Return the signal string our watch_listener is reading
events from """
return getsig(self.watch_listener.signal)
def __remove_watch(self,path):
"""
Remove path from being watched (first will check if 'path' is watched)
"""
""" Remove path from being watched (first will check if 'path'
is watched) """
# only delete if dir is actually being watched
if path in self.__wd_path:
wd = self.__wd_path[path]
@ -100,10 +93,8 @@ class Manager(Loggable):
del(self.__wd_path[path])
def __add_watch(self,path,listener):
"""
Start watching 'path' using 'listener'. First will check if directory
is being watched before adding another watch
"""
""" Start watching 'path' using 'listener'. First will check if
directory is being watched before adding another watch """
self.logger.info("Attempting to add listener to path '%s'" % path)
self.logger.info( 'Listener: %s' % str(listener) )
@ -114,9 +105,8 @@ class Manager(Loggable):
if wd: self.__wd_path[path] = wd.values()[0]
def __create_organizer(self, target_path, recorded_path):
"""
creates an organizer at new destination path or modifies the old one
"""
""" creates an organizer at new destination path or modifies the
old one """
# TODO : find a proper fix for the following hack
# We avoid creating new instances of organize because of the way
# it interacts with pydispatch. We must be careful to never have
@ -134,23 +124,18 @@ class Manager(Loggable):
recorded_path=recorded_path)
def get_problem_files_path(self):
"""
returns the path where problem files should go
"""
""" returns the path where problem files should go """
return self.organize['problem_files_path']
def set_problem_files_path(self, new_path):
"""
Set the path where problem files should go
"""
""" Set the path where problem files should go """
self.organize['problem_files_path'] = new_path
self.organize['problem_handler'] = \
ProblemFileHandler( PathChannel(signal='badfile',path=new_path) )
ProblemFileHandler( PathChannel(signal=getsig('badfile'),
path=new_path) )
def get_recorded_path(self):
"""
returns the path of the recorded directory
"""
""" returns the path of the recorded directory """
return self.organize['recorded_path']
def set_recorded_path(self, new_path):
@ -160,17 +145,14 @@ class Manager(Loggable):
self.__add_watch(new_path, self.watch_listener)
def get_organize_path(self):
"""
returns the current path that is being watched for organization
"""
""" returns the current path that is being watched for
organization """
return self.organize['organize_path']
def set_organize_path(self, new_path):
"""
sets the organize path to be new_path. Under the current scheme there
is only one organize path but there is no reason why more cannot be
supported
"""
""" sets the organize path to be new_path. Under the current
scheme there is only one organize path but there is no reason
why more cannot be supported """
# if we are already organizing a particular directory we remove the
# watch from it first before organizing another directory
self.__remove_watch(self.organize['organize_path'])
@ -188,19 +170,15 @@ class Manager(Loggable):
return self.organize['imported_path']
def set_imported_path(self,new_path):
"""
set the directory where organized files go to.
"""
""" set the directory where organized files go to. """
self.__remove_watch(self.organize['imported_path'])
self.organize['imported_path'] = new_path
self.__create_organizer( new_path, self.organize['recorded_path'])
self.__add_watch(new_path, self.watch_listener)
def change_storage_root(self, store):
"""
hooks up all the directories for you. Problem, recorded, imported,
organize.
"""
""" hooks up all the directories for you. Problem, recorded,
imported, organize. """
store_paths = mmp.expand_storage(store)
# First attempt to make sure that all paths exist before adding any
# watches
@ -217,18 +195,14 @@ class Manager(Loggable):
mmp.create_dir(p)
def has_watch(self, path):
"""
returns true if the path is being watched or not. Any kind of watch:
organize, store, watched.
"""
""" returns true if the path is being watched or not. Any kind
of watch: organize, store, watched. """
return path in self.__wd_path
def add_watch_directory(self, new_dir):
"""
adds a directory to be "watched". "watched" directories are
""" adds a directory to be "watched". "watched" directories are
those that are being monitored by media monitor for airtime in
this context and not directories pyinotify calls watched
"""
this context and not directories pyinotify calls watched """
if self.has_watch(new_dir):
self.logger.info("Cannot add '%s' to watched directories. It's \
already being watched" % new_dir)
@ -237,9 +211,8 @@ class Manager(Loggable):
self.__add_watch(new_dir, self.watch_listener)
def remove_watch_directory(self, watch_dir):
"""
removes a directory from being "watched". Undoes add_watch_directory
"""
""" removes a directory from being "watched". Undoes
add_watch_directory """
if self.has_watch(watch_dir):
self.logger.info("Removing watched directory: '%s'", watch_dir)
self.__remove_watch(watch_dir)
@ -250,9 +223,7 @@ class Manager(Loggable):
self.logger.info( self.__wd_path )
def loop(self):
"""
block until we receive pyinotify events
"""
""" block until we receive pyinotify events """
notifier = pyinotify.Notifier(self.wm)
notifier.coalesce_events()
notifier.loop()

View File

@ -1,12 +1,12 @@
# -*- coding: utf-8 -*-
import media.monitor.pure as mmp
import media.monitor.owners as owners
from media.monitor.handler import ReportHandler
from media.monitor.log import Loggable
from media.monitor.exceptions import BadSongFile
from media.monitor.events import OrganizeFile
from pydispatch import dispatcher
from os.path import dirname
from media.saas.thread import getsig, user
import os.path
class Organizer(ReportHandler,Loggable):
@ -36,7 +36,7 @@ class Organizer(ReportHandler,Loggable):
self.channel = channel
self.target_path = target_path
self.recorded_path = recorded_path
super(Organizer, self).__init__(signal=self.channel, weak=False)
super(Organizer, self).__init__(signal=getsig(self.channel), weak=False)
def handle(self, sender, event):
""" Intercept events where a new file has been added to the
@ -63,7 +63,7 @@ class Organizer(ReportHandler,Loggable):
def new_dir_watch(d):
# TODO : rewrite as return lambda : dispatcher.send(...
def cb():
dispatcher.send(signal="add_subwatch", sender=self,
dispatcher.send(signal=getsig("add_subwatch"), sender=self,
directory=d)
return cb
@ -74,7 +74,7 @@ class Organizer(ReportHandler,Loggable):
# backwards way is bewcause we are unable to encode the owner id
# into the file itself so that the StoreWatchListener listener can
# detect it from the file
owners.add_file_owner(new_path, owner_id )
user().owner.add_file_owner(new_path, owner_id )
self.logger.info('Organized: "%s" into "%s"' %
(event.path, new_path))

View File

@ -1,44 +1,40 @@
# -*- coding: utf-8 -*-
from media.monitor.log import get_logger
log = get_logger()
# hash: 'filepath' => owner_id
owners = {}
from media.monitor.log import Loggable
def reset_owners():
""" Wipes out all file => owner associations """
global owners
owners = {}
class Owner(Loggable):
def __init__(self):
# hash: 'filepath' => owner_id
self.owners = {}
def get_owner(self,f):
""" Get the owner id of the file 'f' """
o = self.owners[f] if f in self.owners else -1
self.logger.info("Received owner for %s. Owner: %s" % (f, o))
return o
def get_owner(f):
""" Get the owner id of the file 'f' """
o = owners[f] if f in owners else -1
log.info("Received owner for %s. Owner: %s" % (f, o))
return o
def add_file_owner(f,owner):
""" Associate file f with owner. If owner is -1 then do we will not record
it because -1 means there is no owner. Returns True if f is being stored
after the function. False otherwise. """
if owner == -1: return False
if f in owners:
if owner != owners[f]: # check for fishiness
log.info("Warning ownership of file '%s' changed from '%d' to '%d'"
% (f, owners[f], owner))
else: return True
owners[f] = owner
return True
def has_owner(f):
""" True if f is owned by somebody. False otherwise. """
return f in owners
def remove_file_owner(f):
""" Try and delete any association made with file f. Returns true if
the the association was actually deleted. False otherwise. """
if f in owners:
del owners[f]
def add_file_owner(self,f,owner):
""" Associate file f with owner. If owner is -1 then do we will not record
it because -1 means there is no owner. Returns True if f is being stored
after the function. False otherwise. """
if owner == -1: return False
if f in self.owners:
if owner != self.owners[f]: # check for fishiness
self.logger.info("Warning ownership of file '%s' changed from '%d' to '%d'"
% (f, self.owners[f], owner))
else: return True
self.owners[f] = owner
return True
else: return False
def has_owner(self,f):
""" True if f is owned by somebody. False otherwise. """
return f in self.owners
def remove_file_owner(self,f):
""" Try and delete any association made with file f. Returns true if
the the association was actually deleted. False otherwise. """
if f in self.owners:
del self.owners[f]
return True
else: return False

View File

@ -1,14 +1,12 @@
# -*- coding: utf-8 -*-
import threading
from media.monitor.exceptions import BadSongFile
from media.monitor.log import Loggable
import api_clients.api_client as ac
from media.saas.thread import apc, InstanceInheritingThread
class ThreadedRequestSync(threading.Thread, Loggable):
class ThreadedRequestSync(InstanceInheritingThread, Loggable):
def __init__(self, rs):
threading.Thread.__init__(self)
super(ThreadedRequestSync, self).__init__()
self.rs = rs
self.daemon = True
self.start()
@ -22,7 +20,7 @@ class RequestSync(Loggable):
for some number of times """
@classmethod
def create_with_api_client(cls, watcher, requests):
apiclient = ac.AirtimeApiClient.create_right_config()
apiclient = apc()
self = cls(watcher, requests, apiclient)
return self

View File

@ -2,7 +2,8 @@
import os
from media.monitor.log import Loggable
from media.monitor.exceptions import NoDirectoryInAirtime
from os.path import normpath
from media.saas.thread import user
from os.path import normpath, join
import media.monitor.pure as mmp
class AirtimeDB(Loggable):
@ -11,17 +12,20 @@ class AirtimeDB(Loggable):
if reload_now: self.reload_directories()
def reload_directories(self):
"""
this is the 'real' constructor, should be called if you ever want the
class reinitialized. there's not much point to doing it yourself
however, you should just create a new AirtimeDB instance.
"""
""" this is the 'real' constructor, should be called if you ever
want the class reinitialized. there's not much point to doing
it yourself however, you should just create a new AirtimeDB
instance. """
saas = user().root_path
# dirs_setup is a dict with keys:
# u'watched_dirs' and u'stor' which point to lists of corresponding
# dirs
dirs_setup = self.apc.setup_media_monitor()
dirs_setup[u'stor'] = normpath( dirs_setup[u'stor'] )
dirs_setup[u'watched_dirs'] = map(normpath, dirs_setup[u'watched_dirs'])
dirs_setup[u'stor'] = normpath( join(saas, dirs_setup[u'stor'] ) )
dirs_setup[u'watched_dirs'] = map(lambda p: normpath(join(saas,p)),
dirs_setup[u'watched_dirs'])
dirs_with_id = dict([ (k,normpath(v)) for k,v in
self.apc.list_all_watched_dirs()['dirs'].iteritems() ])
@ -42,15 +46,11 @@ class AirtimeDB(Loggable):
dirs_setup[u'watched_dirs'] ])
def to_id(self, directory):
"""
directory path -> id
"""
""" directory path -> id """
return self.dir_to_id[ directory ]
def to_directory(self, dir_id):
"""
id -> directory path
"""
""" id -> directory path """
return self.id_to_dir[ dir_id ]
def storage_path(self) : return self.base_storage
@ -60,37 +60,31 @@ class AirtimeDB(Loggable):
def recorded_path(self) : return self.storage_paths['recorded']
def list_watched(self):
"""
returns all watched directories as a list
"""
""" returns all watched directories as a list """
return list(self.watched_directories)
def list_storable_paths(self):
"""
returns a list of all the watched directories in the datatabase.
(Includes the imported directory and the recorded directory)
"""
""" returns a list of all the watched directories in the
datatabase. (Includes the imported directory and the recorded
directory) """
l = self.list_watched()
l.append(self.import_path())
l.append(self.recorded_path())
return l
def dir_id_get_files(self, dir_id, all_files=True):
"""
Get all files in a directory with id dir_id
"""
""" Get all files in a directory with id dir_id """
base_dir = self.id_to_dir[ dir_id ]
return set(( os.path.join(base_dir,p) for p in
return set(( join(base_dir,p) for p in
self.apc.list_all_db_files( dir_id, all_files ) ))
def directory_get_files(self, directory, all_files=True):
"""
returns all the files(recursively) in a directory. a directory is an
"actual" directory path instead of its id. This is super hacky because
you create one request for the recorded directory and one for the
imported directory even though they're the same dir in the database so
you get files for both dirs in 1 request...
"""
""" returns all the files(recursively) in a directory. a
directory is an "actual" directory path instead of its id. This
is super hacky because you create one request for the recorded
directory and one for the imported directory even though they're
the same dir in the database so you get files for both dirs in 1
request... """
normal_dir = os.path.normpath(unicode(directory))
if normal_dir not in self.dir_to_id:
raise NoDirectoryInAirtime( normal_dir, self.dir_to_id )

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import threading
import time
import copy
@ -9,15 +8,16 @@ from media.monitor.exceptions import BadSongFile
from media.monitor.eventcontractor import EventContractor
from media.monitor.events import EventProxy
from media.monitor.request import ThreadedRequestSync, RequestSync
from media.saas.thread import InstanceInheritingThread, getsig
class TimeoutWatcher(threading.Thread,Loggable):
class TimeoutWatcher(InstanceInheritingThread,Loggable):
"""
The job of this thread is to keep an eye on WatchSyncer and force a
request whenever the requests go over time out
"""
def __init__(self, watcher, timeout=5):
self.logger.info("Created timeout thread...")
threading.Thread.__init__(self)
super(TimeoutWatcher, self).__init__()
self.watcher = watcher
self.timeout = timeout
@ -52,7 +52,7 @@ class WatchSyncer(ReportHandler,Loggable):
tc = TimeoutWatcher(self, self.timeout)
tc.daemon = True
tc.start()
super(WatchSyncer, self).__init__(signal=signal)
super(WatchSyncer, self).__init__(signal=getsig(signal))
def handle(self, sender, event):
"""

View File

@ -0,0 +1,68 @@
import os
from os.path import join
from media.monitor.exceptions import NoConfigFile
from media.monitor.pure import LazyProperty
from media.monitor.config import MMConfig
from media.monitor.owners import Owner
from media.monitor.events import EventRegistry
from media.monitor.listeners import FileMediator
from api_clients.api_client import AirtimeApiClient
# poor man's phantom types...
class SignalString(str): pass
class AirtimeInstance(object):
""" AirtimeInstance is a class that abstracts away every airtime
instance by providing all the necessary objects required to interact
with the instance. ApiClient, configs, root_directory """
@classmethod
def root_make(cls, name, root):
cfg = {
'api_client' : join(root, 'etc/airtime/api_client.cfg'),
'media_monitor' : join(root, 'etc/airtime/media-monitor.cfg'),
}
return cls(name, root, cfg)
def __init__(self,name, root_path, config_paths):
""" name is an internal name only """
for cfg in ['api_client','media_monitor']:
if cfg not in config_paths: raise NoConfigFile(config_paths)
elif not os.path.exists(config_paths[cfg]):
raise NoConfigFile(config_paths[cfg])
self.name = name
self.config_paths = config_paths
self.root_path = root_path
def signal(self, sig):
if isinstance(sig, SignalString): return sig
else: return SignalString("%s_%s" % (self.name, sig))
def __str__(self):
return "%s,%s(%s)" % (self.name, self.root_path, self.config_paths)
@LazyProperty
def api_client(self):
return AirtimeApiClient(config_path=self.config_paths['api_client'])
@LazyProperty
def mm_config(self):
return MMConfig(self.config_paths['media_monitor'])
# NOTE to future code monkeys:
# I'm well aware that I'm using the shitty service locator pattern
# instead of normal constructor injection as I should be. The reason
# for this is that I found these issues a little too close to the
# end of my tenure. It's highly recommended to rewrite this crap
# using proper constructor injection if you ever have the time
@LazyProperty
def owner(self): return Owner()
@LazyProperty
def event_registry(self): return EventRegistry()
@LazyProperty
def file_mediator(self): return FileMediator()

View File

@ -0,0 +1,125 @@
import os, sys
import logging
import logging.config
import media.monitor.pure as mmp
from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale
from media.monitor.log import get_logger, setup_logging
from std_err_override import LogWriter
from media.saas.thread import InstanceThread, user, apc, getsig
from media.monitor.log import Loggable
from media.monitor.exceptions import CouldNotCreateIndexFile
from media.monitor.toucher import ToucherThread
from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver
from media.monitor.watchersyncer import WatchSyncer
from media.monitor.eventdrainer import EventDrainer
from media.monitor.manager import Manager
from media.monitor.syncdb import AirtimeDB
from media.saas.airtimeinstance import AirtimeInstance
class MM2(InstanceThread, Loggable):
def index_create(self, index_create_attempt=False):
config = user().mm_config
if not index_create_attempt:
if not os.path.exists(config['index_path']):
self.logger.info("Attempting to create index file:...")
try:
with open(config['index_path'], 'w') as f: f.write(" ")
except Exception as e:
self.logger.info("Failed to create index file with exception: %s" \
% str(e))
else:
self.logger.info("Created index file, reloading configuration:")
self.index_create(index_create_attempt=True)
else:
self.logger.info("Already tried to create index. Will not try again ")
if not os.path.exists(config['index_path']):
raise CouldNotCreateIndexFile(config['index_path'])
def run(self):
self.index_create()
manager = Manager()
apiclient = apc()
config = user().mm_config
watch_syncer = WatchSyncer(signal=getsig('watch'),
chunking_number=config['chunking_number'],
timeout=config['request_max_wait'])
airtime_receiver = AirtimeMessageReceiver(config,manager)
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
adb = AirtimeDB(apiclient)
store = {
u'stor' : adb.storage_path(),
u'watched_dirs' : adb.list_watched(),
}
self.logger.info("initializing mm with directories: %s" % str(store))
self.logger.info(
"Initing with the following airtime response:%s" % str(store))
airtime_receiver.change_storage({ 'directory':store[u'stor'] })
for watch_dir in store[u'watched_dirs']:
if not os.path.exists(watch_dir):
# Create the watch_directory here
try: os.makedirs(watch_dir)
except Exception:
self.logger.error("Could not create watch directory: '%s' \
(given from the database)." % watch_dir)
if os.path.exists(watch_dir):
airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True)
else: self.logger.info("Failed to add watch on %s" % str(watch_dir))
ed = EventDrainer(airtime_notifier.connection,
interval=float(config['rmq_event_wait']))
# Launch the toucher that updates the last time when the script was
# ran every n seconds.
# TODO : verify that this does not interfere with bootstrapping because the
# toucher thread might update the last_ran variable too fast
tt = ToucherThread(path=config['index_path'],
interval=int(config['touch_interval']))
apiclient.register_component('media-monitor')
manager.loop()
def launch_instance(name, root, global_cfg, apc_cfg):
cfg = {
'api_client' : apc_cfg,
'media_monitor' : global_cfg,
}
ai = AirtimeInstance(name, root, cfg)
MM2(ai).start()
def setup_global(log):
""" setup unicode and other stuff """
log.info("Attempting to set the locale...")
try: mmp.configure_locale(mmp.get_system_locale())
except FailedToSetLocale as e:
log.info("Failed to set the locale...")
sys.exit(1)
except FailedToObtainLocale as e:
log.info("Failed to obtain the locale form the default path: \
'/etc/default/locale'")
sys.exit(1)
except Exception as e:
log.info("Failed to set the locale for unknown reason. \
Logging exception.")
log.info(str(e))
def setup_logger(log_config, logpath):
logging.config.fileConfig(log_config)
#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)
logger = logging.getLogger()
LogWriter.override_std_err(logger)
logfile = unicode(logpath)
setup_logging(logfile)
log = get_logger()
return log

View File

@ -0,0 +1,27 @@
import threading
class UserlessThread(Exception):
def __str__():
return "Current thread: %s is not an instance of InstanceThread \
of InstanceInheritingThread" % str(threading.current_thread())
class HasUser(object):
def user(self): return self._user
class InstanceThread(threading.Thread, HasUser):
def __init__(self,user, *args, **kwargs):
super(InstanceThread, self).__init__(*args, **kwargs)
self._user = user
class InstanceInheritingThread(threading.Thread, HasUser):
def __init__(self, *args, **kwargs):
self._user = threading.current_thread().user()
super(InstanceInheritingThread, self).__init__(*args, **kwargs)
def user():
try: return threading.current_thread().user()
except AttributeError: raise UserlessThread()
def apc(): return user().api_client
def getsig(s): return user().signal(s)

View File

@ -1,141 +1,15 @@
# -*- coding: utf-8 -*-
import sys
import os
import logging
import logging.config
from media.saas.launcher import setup_global, launch_instance, setup_logger
from media.monitor.config import MMConfig
from media.monitor.manager import Manager
from media.monitor.bootstrap import Bootstrapper
from media.monitor.log import get_logger, setup_logging
from media.monitor.config import MMConfig
from media.monitor.toucher import ToucherThread
from media.monitor.syncdb import AirtimeDB
from media.monitor.exceptions import FailedToObtainLocale, \
FailedToSetLocale, \
NoConfigFile
from media.monitor.airtime import AirtimeNotifier, \
AirtimeMessageReceiver
from media.monitor.watchersyncer import WatchSyncer
from media.monitor.eventdrainer import EventDrainer
from media.update.replaygainupdater import ReplayGainUpdater
from std_err_override import LogWriter
import media.monitor.pure as mmp
from api_clients import api_client as apc
def main(global_config, api_client_config, log_config,
index_create_attempt=False):
for cfg in [global_config, api_client_config]:
if not os.path.exists(cfg): raise NoConfigFile(cfg)
# MMConfig is a proxy around ConfigObj instances. it does not allow
# itself users of MMConfig instances to modify any config options
# directly through the dictionary. Users of this object muse use the
# correct methods designated for modification
try: config = MMConfig(global_config)
except NoConfigFile as e:
print("Cannot run mediamonitor2 without configuration file.")
print("Current config path: '%s'" % global_config)
sys.exit(1)
except Exception as e:
print("Unknown error reading configuration file: '%s'" % global_config)
print(str(e))
logging.config.fileConfig(log_config)
#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)
logger = logging.getLogger()
LogWriter.override_std_err(logger)
logfile = unicode( config['logpath'] )
setup_logging(logfile)
log = get_logger()
if not index_create_attempt:
if not os.path.exists(config['index_path']):
log.info("Attempting to create index file:...")
try:
with open(config['index_path'], 'w') as f: f.write(" ")
except Exception as e:
log.info("Failed to create index file with exception: %s" \
% str(e))
else:
log.info("Created index file, reloading configuration:")
main( global_config, api_client_config, log_config,
index_create_attempt=True )
else:
log.info("Already tried to create index. Will not try again ")
if not os.path.exists(config['index_path']):
log.info("Index file does not exist. Terminating")
log.info("Attempting to set the locale...")
try:
mmp.configure_locale(mmp.get_system_locale())
except FailedToSetLocale as e:
log.info("Failed to set the locale...")
sys.exit(1)
except FailedToObtainLocale as e:
log.info("Failed to obtain the locale form the default path: \
'/etc/default/locale'")
sys.exit(1)
except Exception as e:
log.info("Failed to set the locale for unknown reason. \
Logging exception.")
log.info(str(e))
watch_syncer = WatchSyncer(signal='watch',
chunking_number=config['chunking_number'],
timeout=config['request_max_wait'])
apiclient = apc.AirtimeApiClient.create_right_config(log=log,
config_path=api_client_config)
ReplayGainUpdater.start_reply_gain(apiclient)
sdb = AirtimeDB(apiclient)
manager = Manager()
airtime_receiver = AirtimeMessageReceiver(config,manager)
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
store = apiclient.setup_media_monitor()
log.info("Initing with the following airtime response:%s" % str(store))
airtime_receiver.change_storage({ 'directory':store[u'stor'] })
for watch_dir in store[u'watched_dirs']:
if not os.path.exists(watch_dir):
# Create the watch_directory here
try: os.makedirs(watch_dir)
except Exception as e:
log.error("Could not create watch directory: '%s' \
(given from the database)." % watch_dir)
if os.path.exists(watch_dir):
airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True)
else: log.info("Failed to add watch on %s" % str(watch_dir))
bs = Bootstrapper( db=sdb, watch_signal='watch' )
ed = EventDrainer(airtime_notifier.connection,
interval=float(config['rmq_event_wait']))
# Launch the toucher that updates the last time when the script was
# ran every n seconds.
# TODO : verify that this does not interfere with bootstrapping because the
# toucher thread might update the last_ran variable too fast
tt = ToucherThread(path=config['index_path'],
interval=int(config['touch_interval']))
apiclient.register_component('media-monitor')
return manager.loop()
def main(global_config, api_client_config, log_config):
""" function to run hosted install """
mm_config = MMConfig(global_config)
log = setup_logger( log_config, mm_config['logpath'] )
setup_global(log)
launch_instance('hosted_install', '/', global_config, api_client_config)
__doc__ = """
Usage:
@ -148,9 +22,6 @@ Options:
--log=<path> log config at <path>
"""
def main_loop():
while True: pass
if __name__ == '__main__':
from docopt import docopt
args = docopt(__doc__,version="mm1.99")
@ -160,5 +31,4 @@ if __name__ == '__main__':
sys.exit(0)
print("Running mm1.99")
main(args['--config'],args['--apiclient'],args['--log'])
#gevent.joinall([ gevent.spawn(main_loop) ])

View File

@ -0,0 +1,21 @@
import unittest
from copy import deepcopy
from media.saas.airtimeinstance import AirtimeInstance, NoConfigFile
class TestAirtimeInstance(unittest.TestCase):
def setUp(self):
self.cfg = {
'api_client' : 'tests/test_instance.py',
'media_monitor' : 'tests/test_instance.py',
'logging' : 'tests/test_instance.py',
}
def test_init_good(self):
AirtimeInstance("/root", self.cfg)
self.assertTrue(True)
def test_init_bad(self):
cfg = deepcopy(self.cfg)
cfg['api_client'] = 'bs'
with self.assertRaises(NoConfigFile):
AirtimeInstance("/root", cfg)

View File

@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
import unittest
import time
from media.saas.thread import InstanceThread, InstanceInheritingThread
# ugly but necessary for 2.7
signal = False
signal2 = False
class TestInstanceThread(unittest.TestCase):
def test_user_inject(self):
global signal
signal = False
u = "rudi"
class T(InstanceThread):
def run(me):
global signal
super(T, me).run()
signal = True
self.assertEquals(u, me.user())
t = T(u, name="test_user_inject")
t.daemon = True
t.start()
time.sleep(0.2)
self.assertTrue(signal)
def test_inheriting_thread(utest):
global signal2
u = "testing..."
class TT(InstanceInheritingThread):
def run(self):
global signal2
utest.assertEquals(self.user(), u)
signal2 = True
class T(InstanceThread):
def run(self):
super(T, self).run()
child_thread = TT(name="child thread")
child_thread.daemon = True
child_thread.start()
parent_thread = T(u, name="Parent instance thread")
parent_thread.daemon = True
parent_thread.start()
time.sleep(0.2)
utest.assertTrue(signal2)
def test_different_user(utest):
u1, u2 = "ru", "di"
class T(InstanceThread):
def run(self):
super(T, self).run()
for u in [u1, u2]:
t = T(u)
t.daemon = True
t.start()
utest.assertEquals(t.user(), u)
if __name__ == '__main__': unittest.main()

View File

@ -254,29 +254,15 @@ def output_to(output_type, type, bitrate, host, port, pass, mount_point, url, de
if user == "" then
user_ref := "source"
end
description_ref = ref description
if description == "" then
description_ref := "N/A"
end
genre_ref = ref genre
if genre == "" then
genre_ref := "N/A"
end
url_ref = ref url
if url == "" then
url_ref := "N/A"
end
output.shoutcast_mono = output.shoutcast(id = "shoutcast_stream_#{stream}",
host = host,
port = port,
password = pass,
fallible = true,
url = !url_ref,
genre = !genre_ref,
name = !description_ref,
url = url,
genre = genre,
name = description,
user = !user_ref,
on_error = on_error,
on_connect = on_connect)
@ -286,9 +272,9 @@ def output_to(output_type, type, bitrate, host, port, pass, mount_point, url, de
port = port,
password = pass,
fallible = true,
url = !url_ref,
genre = !genre_ref,
name = !description_ref,
url = url,
genre = genre,
name = description,
user = !user_ref,
on_error = on_error,
on_connect = on_connect)
@ -390,13 +376,6 @@ def add_skip_command(s)
"skip",fun(s) -> begin log("source.skip") skip(s) end)
end
dyn_out = output.icecast(%wav,
host="localhost",
port=8999,
password=stream_harbor_pass,
mount="test-harbor",
fallible=true)
def set_dynamic_source_id(id) =
current_dyn_id := id
string_of(!current_dyn_id)
@ -406,123 +385,159 @@ def get_dynamic_source_id() =
string_of(!current_dyn_id)
end
#cc-4633
# Function to create a playlist source and output it.
def create_dynamic_source(uri) =
# The playlist source
s = audio_to_stereo(input.http(buffer=2., max=12., uri))
# The output
active_dyn_out = dyn_out(s)
# NOTE
# A few values are hardcoded and may be dependent:
# - the delay in gracetime is linked with the buffer duration of input.http
# (delay should be a bit less than buffer)
# - crossing duration should be less than buffer length
# (at best, a higher duration will be ineffective)
# We register both source and output
# in the list of sources
dyn_sources :=
list.append([(!current_dyn_id, s),(!current_dyn_id, active_dyn_out)], !dyn_sources)
# HTTP input with "restart" command that waits for "stop" to be effected
# before "start" command is issued. Optionally it takes a new URL to play,
# which makes it a convenient replacement for "url".
# In the future, this may become a core feature of the HTTP input.
# TODO If we stop and restart quickly several times in a row,
# the data bursts accumulate and create buffer overflow.
# Flushing the buffer on restart could be a good idea, but
# it would also create an interruptions while the buffer is
# refilling... on the other hand, this would avoid having to
# fade using both cross() and switch().
def input.http_restart(~id,~initial_url="http://dummy/url")
source = input.http(buffer=5.,max=15.,id=id,autostart=false,initial_url)
def stopped()
"stopped" == list.hd(server.execute("#{id}.status"))
end
server.register(namespace=id,
"restart",
usage="restart [url]",
fun (url) -> begin
if url != "" then
log(string_of(server.execute("#{id}.url #{url}")))
end
log(string_of(server.execute("#{id}.stop")))
add_timeout(0.5,
{ if stopped() then
log(string_of(server.execute("#{id}.start"))) ;
(-1.)
else 0.5 end})
"OK"
end)
# Dummy output should be useless if HTTP stream is meant
# to be listened to immediately. Otherwise, apply it.
#
# output.dummy(fallible=true,source)
source
notify([("schedule_table_id", !current_dyn_id)])
"Done!"
end
# Transitions between URL changes in HTTP streams.
def cross_http(~debug=true,~http_input_id,source)
# A function to destroy a dynamic source
def destroy_dynamic_source(id) =
# We need to find the source in the list,
# remove it and destroy it. Currently, the language
# lacks some nice operators for that so we do it
# the functional way
id = http_input_id
last_url = ref ""
change = ref false
# This function is executed on every item in the list
# of dynamic sources
def parse_list(ret, current_element) =
# ret is of the form: (matching_sources, remaining_sources)
# We extract those two:
matching_sources = fst(ret)
remaining_sources = snd(ret)
# current_element is of the form: ("uri", source) so
# we check the first element
current_id = fst(current_element)
if current_id == id then
# In this case, we add the source to the list of
# matched sources
(list.append( [snd(current_element)],
matching_sources),
remaining_sources)
else
# In this case, we put the element in the list of remaining
# sources
(matching_sources,
list.append([current_element],
remaining_sources))
def on_m(m)
notify_stream(m)
changed = m["source_url"] != !last_url
log("URL now #{m['source_url']} (change: #{changed})")
if changed then
if !last_url != "" then change := true end
last_url := m["source_url"]
end
end
# Now we execute the function:
result = list.fold(parse_list, ([], []), !dyn_sources)
matching_sources = fst(result)
remaining_sources = snd(result)
# We store the remaining sources in dyn_sources
dyn_sources := remaining_sources
# We use both metadata and status to know about the current URL.
# Using only metadata may be more precise is crazy corner cases,
# but it's also asking too much: the metadata may not pass through
# before the crosser is instantiated.
# Using only status in crosser misses some info, eg. on first URL.
source = on_metadata(on_m,source)
# If no source matched, we return an error
if list.length(matching_sources) == 0 then
"Error: no matching sources!"
else
# We stop all sources
list.iter(source.shutdown, matching_sources)
# And return
"Done!"
cross_d = 3.
def crosser(a,b)
url = list.hd(server.execute('#{id}.url'))
status = list.hd(server.execute('#{id}.status'))
on_m([("source_url",url)])
if debug then
log("New track inside HTTP stream")
log(" status: #{status}")
log(" need to cross: #{!change}")
log(" remaining #{source.remaining(a)} sec before, \
#{source.remaining(b)} sec after")
end
if !change then
change := false
# In principle one should avoid crossing on a live stream
# it'd be okay to do it here (eg. use add instead of sequence)
# because it's only once per URL, but be cautious.
sequence([fade.out(duration=cross_d,a),fade.in(b)])
else
# This is done on tracks inside a single stream.
# Do NOT cross here or you'll gradually empty the buffer!
sequence([a,b])
end
end
# Setting conservative=true would mess with the delayed switch below
cross(duration=cross_d,conservative=false,crosser,source)
end
# Custom fallback between http and default source with fading of
# beginning and end of HTTP stream.
# It does not take potential URL changes into account, as long as
# they do not interrupt streaming (thanks to the HTTP buffer).
def http_fallback(~http_input_id,~http,~default)
id = http_input_id
# We use a custom switching predicate to trigger switching (and thus,
# transitions) before the end of a track (rather, end of HTTP stream).
# It is complexified because we don't want to trigger switching when
# HTTP disconnects for just an instant, when changing URL: for that
# we use gracetime below.
# A function to destroy a dynamic source
def destroy_dynamic_source_all() =
# We need to find the source in the list,
# remove it and destroy it. Currently, the language
# lacks some nice operators for that so we do it
# the functional way
# This function is executed on every item in the list
# of dynamic sources
def parse_list(ret, current_element) =
# ret is of the form: (matching_sources, remaining_sources)
# We extract those two:
matching_sources = fst(ret)
remaining_sources = snd(ret)
# current_element is of the form: ("uri", source) so
# we check the first element
current_uri = fst(current_element)
# in this case, we add the source to the list of
# matched sources
(list.append( [snd(current_element)],
matching_sources),
remaining_sources)
def gracetime(~delay=3.,f)
last_true = ref 0.
{ if f() then
last_true := gettimeofday()
true
else
gettimeofday() < !last_true+delay
end }
end
# now we execute the function:
result = list.fold(parse_list, ([], []), !dyn_sources)
matching_sources = fst(result)
remaining_sources = snd(result)
# we store the remaining sources in dyn_sources
dyn_sources := remaining_sources
# if no source matched, we return an error
if list.length(matching_sources) == 0 then
"error: no matching sources!"
else
# we stop all sources
list.iter(source.shutdown, matching_sources)
# And return
"Done!"
def connected()
status = list.hd(server.execute("#{id}.status"))
not(list.mem(status,["polling","stopped"]))
end
connected = gracetime(connected)
def to_live(a,b) =
log("TRANSITION to live")
add(normalize=false,
[fade.initial(b),fade.final(a)])
end
def to_static(a,b) =
log("TRANSITION to static")
sequence([fade.out(a),fade.initial(b)])
end
switch(
track_sensitive=false,
transitions=[to_live,to_static],
[(# make sure it is connected, and not buffering
{connected() and source.is_ready(http) and !webstream_enabled}, http),
({true},default)])
end

View File

@ -7,11 +7,11 @@ set("server.telnet", true)
set("server.telnet.port", 1234)
#Dynamic source list
dyn_sources = ref []
#dyn_sources = ref []
webstream_enabled = ref false
time = ref string_of(gettimeofday())
queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5))
queue = audio_to_stereo(id="queue_src", mksafe(request.equeue(id="queue", length=0.5)))
queue = cue_cut(queue)
queue = amplify(1., override="replay_gain", queue)
@ -35,15 +35,19 @@ s2_namespace = ref ''
s3_namespace = ref ''
just_switched = ref false
stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20'))
#stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20'))
%include "ls_lib.liq"
web_stream = input.harbor("test-harbor", port=8999, password=stream_harbor_pass)
web_stream = on_metadata(notify_stream, web_stream)
output.dummy(fallible=true, web_stream)
#web_stream = input.harbor("test-harbor", port=8999, password=stream_harbor_pass)
#web_stream = on_metadata(notify_stream, web_stream)
#output.dummy(fallible=true, web_stream)
http = input.http_restart(id="http")
http = cross_http(http_input_id="http",http)
stream_queue = http_fallback(http_input_id="http",http=http,default=queue)
# the crossfade function controls fade in/out
queue = crossfade_airtime(queue)
queue = on_metadata(notify, queue)
@ -51,10 +55,10 @@ queue = map_metadata(update=false, append_title, queue)
output.dummy(fallible=true, queue)
stream_queue = switch(id="stream_queue_switch", track_sensitive=false,
transitions=[transition, transition],
[({!webstream_enabled},web_stream),
({true}, queue)])
#stream_queue = switch(id="stream_queue_switch", track_sensitive=false,
# transitions=[transition, transition],
# [({!webstream_enabled},web_stream),
# ({true}, queue)])
ignore(output.dummy(stream_queue, fallible=true))
@ -92,32 +96,33 @@ server.register(namespace="dynamic_source",
fun (s) -> begin log("dynamic_source.output_stop") webstream_enabled := false "disabled" end)
server.register(namespace="dynamic_source",
description="Set the cc_schedule row id",
description="Set the streams cc_schedule row id",
usage="id <id>",
"id",
fun (s) -> begin log("dynamic_source.id") set_dynamic_source_id(s) end)
server.register(namespace="dynamic_source",
description="Get the cc_schedule row id",
description="Get the streams cc_schedule row id",
usage="get_id",
"get_id",
fun (s) -> begin log("dynamic_source.get_id") get_dynamic_source_id() end)
server.register(namespace="dynamic_source",
description="Start a new dynamic source.",
usage="start <uri>",
"read_start",
fun (uri) -> begin log("dynamic_source.read_start") create_dynamic_source(uri) end)
server.register(namespace="dynamic_source",
description="Stop a dynamic source.",
usage="stop <id>",
"read_stop",
fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source(s) end)
server.register(namespace="dynamic_source",
description="Stop a dynamic source.",
usage="stop <id>",
"read_stop_all",
fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source_all() end)
#server.register(namespace="dynamic_source",
# description="Start a new dynamic source.",
# usage="start <uri>",
# "read_start",
# fun (uri) -> begin log("dynamic_source.read_start") begin_stream_read(uri) end)
#server.register(namespace="dynamic_source",
# description="Stop a dynamic source.",
# usage="stop <id>",
# "read_stop",
# fun (s) -> begin log("dynamic_source.read_stop") stop_stream_read(s) end)
#server.register(namespace="dynamic_source",
# description="Stop a dynamic source.",
# usage="stop <id>",
# "read_stop_all",
# fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source_all() end)
default = amplify(id="silence_src", 0.00001, noise())
default = rewrite_metadata([("artist","Airtime"), ("title", "offline")], default)

View File

View File

@ -3,12 +3,11 @@ from threading import Thread
import traceback
import os
import time
import logging
from media.update import replaygain
from media.monitor.log import Loggable
class ReplayGainUpdater(Thread, Loggable):
class ReplayGainUpdater(Thread):
"""
The purpose of the class is to query the server for a list of files which
do not have a ReplayGain value calculated. This class will iterate over the
@ -30,6 +29,7 @@ class ReplayGainUpdater(Thread, Loggable):
def __init__(self,apc):
Thread.__init__(self)
self.api_client = apc
self.logger = logging.getLogger()
def main(self):
raw_response = self.api_client.list_all_watched_dirs()

View File

@ -24,6 +24,8 @@ from recorder import Recorder
from listenerstat import ListenerStat
from pypomessagehandler import PypoMessageHandler
from media.update.replaygainupdater import ReplayGainUpdater
from configobj import ConfigObj
# custom imports
@ -174,6 +176,9 @@ if __name__ == '__main__':
sys.exit()
api_client = api_client.AirtimeApiClient()
ReplayGainUpdater.start_reply_gain(api_client)
api_client.register_component("pypo")
pypoFetch_q = Queue()

View File

@ -478,8 +478,8 @@ class PypoPush(Thread):
self.logger.debug(msg)
tn.write(msg)
#example: dynamic_source.read_start http://87.230.101.24:80/top100station.mp3
msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1')
#msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1')
msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1')
self.logger.debug(msg)
tn.write(msg)
@ -520,7 +520,8 @@ class PypoPush(Thread):
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
msg = 'dynamic_source.read_stop_all xxx\n'
#msg = 'dynamic_source.read_stop_all xxx\n'
msg = 'http.stop\n'
self.logger.debug(msg)
tn.write(msg)
@ -546,7 +547,8 @@ class PypoPush(Thread):
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = 'dynamic_source.read_stop %s\n' % media_item['row_id']
#msg = 'dynamic_source.read_stop %s\n' % media_item['row_id']
msg = 'http.stop\n'
self.logger.debug(msg)
tn.write(msg)

View File

@ -27,14 +27,14 @@ def printUsage():
print " -m mount (default: test) "
print " -h show help menu"
def find_liquidsoap_binary():
"""
Starting with Airtime 2.0, we don't know the exact location of the Liquidsoap
binary because it may have been installed through a debian package. Let's find
the location of this binary.
"""
rv = subprocess.call("which airtime-liquidsoap > /dev/null", shell=True)
if rv == 0:
return "airtime-liquidsoap"
@ -78,7 +78,7 @@ for o, a in optlist:
mount = a
try:
print "Protocol: %s " % stream_type
print "Host: %s" % host
print "Port: %s" % port
@ -86,35 +86,35 @@ try:
print "Password: %s" % password
if stream_type == "icecast":
print "Mount: %s\n" % mount
url = "http://%s:%s/%s" % (host, port, mount)
print "Outputting to %s streaming server. You should be able to hear a monotonous tone on '%s'. Press ctrl-c to quit." % (stream_type, url)
liquidsoap_exe = find_liquidsoap_binary()
if liquidsoap_exe is None:
raise Exception("Liquidsoap not found!")
if stream_type == "icecast":
command = "%s 'output.icecast(%%vorbis, host = \"%s\", port = %s, user= \"%s\", password = \"%s\", mount=\"%s\", sine())'" % (liquidsoap_exe, host, port, user, password, mount)
else:
command = "%s /usr/lib/airtime/pypo/bin/liquidsoap_scripts/library/pervasives.liq 'output.shoutcast(%%mp3, host=\"%s\", port = %s, user= \"%s\", password = \"%s\", sine())'" \
% (liquidsoap_exe, host, port, user, password)
if not verbose:
command += " 2>/dev/null | grep \"failed\""
else:
print command
#print command
rv = subprocess.call(command, shell=True)
#if we reach this point, it means that our subprocess exited without the user
#doing a keyboard interrupt. This means there was a problem outputting to the
#doing a keyboard interrupt. This means there was a problem outputting to the
#stream server. Print appropriate message.
print "There was an error with your stream configuration. Please review your configuration " + \
"and run this program again. Use the -h option for help"
except KeyboardInterrupt, ki:
print "\nExiting"
except Exception, e: