diff --git a/airtime_mvc/application/models/Preference.php b/airtime_mvc/application/models/Preference.php index f95280063..e695c404c 100644 --- a/airtime_mvc/application/models/Preference.php +++ b/airtime_mvc/application/models/Preference.php @@ -1116,7 +1116,6 @@ class Application_Model_Preference } else { /*For now we just have this hack for debugging. We should not rely on this crappy behaviour in case of failure*/ - Logging::info("Pref: $pref_param"); Logging::warn("Index $x does not exist preferences"); Logging::warn("Defaulting to identity and printing preferences"); Logging::warn($ds); diff --git a/airtime_mvc/application/models/Schedule.php b/airtime_mvc/application/models/Schedule.php index fa2a6c79b..f46bfedde 100644 --- a/airtime_mvc/application/models/Schedule.php +++ b/airtime_mvc/application/models/Schedule.php @@ -321,7 +321,7 @@ SQL; ws.description AS file_album_title, ws.length AS file_length, 't'::BOOL AS file_exists, - NULL as file_mime + ws.mime as file_mime SQL; $streamJoin = <<> /etc/apt/sources.list diff --git a/python_apps/api_clients/api_client.py b/python_apps/api_clients/api_client.py index 6038b6895..f9c85c0a9 100644 --- a/python_apps/api_clients/api_client.py +++ b/python_apps/api_clients/api_client.py @@ -127,24 +127,6 @@ class RequestProvider(object): class AirtimeApiClient(object): - - # This is a little hacky fix so that I don't have to pass the config object - # everywhere where AirtimeApiClient needs to be initialized - default_config = None - # the purpose of this custom constructor is to remember which config file - # it was called with. So that after the initial call: - # AirtimeApiClient.create_right_config('/path/to/config') - # All subsequence calls to create_right_config will be with that config - # file - @staticmethod - def create_right_config(log=None,config_path=None): - if config_path: AirtimeApiClient.default_config = config_path - elif (not AirtimeApiClient.default_config): - raise ValueError("Cannot slip config_path attribute when it has \ - never been passed yet") - return AirtimeApiClient( logger=None, - config_path=AirtimeApiClient.default_config ) - def __init__(self, logger=None,config_path='/etc/airtime/api_client.cfg'): if logger is None: self.logger = logging else: self.logger = logger @@ -399,3 +381,4 @@ class AirtimeApiClient(object): def push_stream_stats(self, data): # TODO : users of this method should do their own error handling response = self.services.push_stream_stats(_post_data={'data': json.dumps(data)}) + return response diff --git a/python_apps/media-monitor2/baby.py b/python_apps/media-monitor2/baby.py new file mode 100644 index 000000000..f9eb74d6f --- /dev/null +++ b/python_apps/media-monitor2/baby.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +import re +from media.saas.launcher import setup_logger, setup_global, MM2 +from media.saas.airtimeinstance import AirtimeInstance +from os.path import isdir, join, abspath, exists +from os import listdir + +def list_dirs(d): return (x for x in listdir(d) if isdir(join(d,x))) + +def filter_instance(d): return bool(re.match('.+\d+$',d)) + +def get_name(p): return re.match('.+/(\d+)$',p).group(1) + +def filter_instances(l): return (x for x in l if filter_instance(x)) + +def autoscan_instances(main_cfg): + root = main_cfg['instance_root'] + instances = [] + for instance_machine in list_dirs(root): + instance_machine = join(root, instance_machine) + for instance_root in filter_instances(list_dirs(instance_machine)): + full_path = abspath(join(instance_machine,instance_root)) + ai = AirtimeInstance.root_make(get_name(full_path), full_path) + instances.append(ai) + return instances + +def verify_exists(p): + if not exists(p): raise Exception("%s must exist" % p) + +def main(main_cfg): + log_config, log_path = main_cfg['log_config'], main_cfg['log_path'] + verify_exists(log_config) + log = setup_logger(log_config, log_path) + setup_global(log) + for instance in autoscan_instances(main_cfg): + print("Launching instance: %s" % str(instance)) + MM2(instance).start() + print("Launched all instances") + +if __name__ == '__main__': + root = '/home/rudi/reps/Airtime/python_apps/media-monitor2' + default = { + 'log_path' : join(root, 'test.log'), # config for log + 'log_config' : join(root, 'configs/logging.cfg'), # where to log + # root dir of all instances + 'instance_root' : join(root, 'saas_stub') + } + main(default) diff --git a/python_apps/media-monitor2/configs/airtime.conf b/python_apps/media-monitor2/configs/airtime.conf new file mode 100755 index 000000000..f2749ab74 --- /dev/null +++ b/python_apps/media-monitor2/configs/airtime.conf @@ -0,0 +1,32 @@ +[database] +host = localhost +dbname = airtime +dbuser = airtime +dbpass = airtime + +[rabbitmq] +host = 127.0.0.1 +port = 5672 +user = guest +password = guest +vhost = / + +[general] +api_key = I6EUOJM0D1EIGSMZ9T70 +web_server_user = www-data +airtime_dir = /usr/share/airtime +base_url = localhost +base_port = 80 +base_dir = '' + +;How many hours ahead of time should Airtime playout engine (PYPO) +;cache scheduled media files. +cache_ahead_hours = 1 + +[monit] +monit_user = guest +monit_password = airtime + +[soundcloud] +connection_retries = 3 +time_between_retries = 60 diff --git a/python_apps/media-monitor2/configs/api_client.cfg b/python_apps/media-monitor2/configs/api_client.cfg new file mode 100755 index 000000000..c7b00417c --- /dev/null +++ b/python_apps/media-monitor2/configs/api_client.cfg @@ -0,0 +1,126 @@ +bin_dir = "/usr/lib/airtime/api_clients" + +############################# +## Common +############################# + +# Value needed to access the API +api_key = 'I6EUOJM0D1EIGSMZ9T70' + +# Path to the base of the API +api_base = 'api' + +# URL to get the version number of the server API +version_url = 'version/api_key/%%api_key%%' + +#URL to register a components IP Address with the central web server +register_component = 'register-component/format/json/api_key/%%api_key%%/component/%%component%%' + +# Hostname +host = 'localhost' +base_port = 80 +base_dir = '' + +############################# +## Config for Media Monitor +############################# + +# URL to setup the media monitor +media_setup_url = 'media-monitor-setup/format/json/api_key/%%api_key%%' + +# Tell Airtime the file id associated with a show instance. +upload_recorded = 'upload-recorded/format/json/api_key/%%api_key%%/fileid/%%fileid%%/showinstanceid/%%showinstanceid%%' + +# URL to tell Airtime to update file's meta data +update_media_url = 'reload-metadata/format/json/api_key/%%api_key%%/mode/%%mode%%' + +# URL to tell Airtime we want a listing of all files it knows about +list_all_db_files = 'list-all-files/format/json/api_key/%%api_key%%/dir_id/%%dir_id%%/all/%%all%%' + +# URL to tell Airtime we want a listing of all dirs its watching (including the stor dir) +list_all_watched_dirs = 'list-all-watched-dirs/format/json/api_key/%%api_key%%' + +# URL to tell Airtime we want to add watched directory +add_watched_dir = 'add-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%' + +# URL to tell Airtime we want to add watched directory +remove_watched_dir = 'remove-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%' + +# URL to tell Airtime we want to add watched directory +set_storage_dir = 'set-storage-dir/format/json/api_key/%%api_key%%/path/%%path%%' + +# URL to tell Airtime about file system mount change +update_fs_mount = 'update-file-system-mount/format/json/api_key/%%api_key%%' + +# URL to commit multiple updates from media monitor at the same time + +reload_metadata_group = 'reload-metadata-group/format/json/api_key/%%api_key%%' + +# URL to tell Airtime about file system mount change +handle_watched_dir_missing = 'handle-watched-dir-missing/format/json/api_key/%%api_key%%/dir/%%dir%%' + +############################# +## Config for Recorder +############################# + +# URL to get the schedule of shows set to record +show_schedule_url = 'recorded-shows/format/json/api_key/%%api_key%%' + +# URL to upload the recorded show's file to Airtime +upload_file_url = 'upload-file/format/json/api_key/%%api_key%%' + +# URL to commit multiple updates from media monitor at the same time + +#number of retries to upload file if connection problem +upload_retries = 3 + +#time to wait between attempts to upload file if connection problem (in seconds) +upload_wait = 60 + +################################################################################ +# Uncomment *one of the sets* of values from the API clients below, and comment +# out all the others. +################################################################################ + +############################# +## Config for Pypo +############################# + +# Schedule export path. +# %%from%% - starting date/time in the form YYYY-MM-DD-hh-mm +# %%to%% - starting date/time in the form YYYY-MM-DD-hh-mm +export_url = 'schedule/api_key/%%api_key%%' + +get_media_url = 'get-media/file/%%file%%/api_key/%%api_key%%' + +# Update whether a schedule group has begun playing. +update_item_url = 'notify-schedule-group-play/api_key/%%api_key%%/schedule_id/%%schedule_id%%' + +# Update whether an audio clip is currently playing. +update_start_playing_url = 'notify-media-item-start-play/api_key/%%api_key%%/media_id/%%media_id%%/' + +# URL to tell Airtime we want to get stream setting +get_stream_setting = 'get-stream-setting/format/json/api_key/%%api_key%%/' + +#URL to update liquidsoap status +update_liquidsoap_status = 'update-liquidsoap-status/format/json/api_key/%%api_key%%/msg/%%msg%%/stream_id/%%stream_id%%/boot_time/%%boot_time%%' + +#URL to check live stream auth +check_live_stream_auth = 'check-live-stream-auth/format/json/api_key/%%api_key%%/username/%%username%%/password/%%password%%/djtype/%%djtype%%' + +#URL to update source status +update_source_status = 'update-source-status/format/json/api_key/%%api_key%%/sourcename/%%sourcename%%/status/%%status%%' + +get_bootstrap_info = 'get-bootstrap-info/format/json/api_key/%%api_key%%' + +get_files_without_replay_gain = 'get-files-without-replay-gain/api_key/%%api_key%%/dir_id/%%dir_id%%' + +update_replay_gain_value = 'update-replay-gain-value/api_key/%%api_key%%' + +notify_webstream_data = 'notify-webstream-data/api_key/%%api_key%%/media_id/%%media_id%%/format/json' + +notify_liquidsoap_started = 'rabbitmq-do-push/api_key/%%api_key%%/format/json' + +get_stream_parameters = 'get-stream-parameters/api_key/%%api_key%%/format/json' + +push_stream_stats = 'push-stream-stats/api_key/%%api_key%%/format/json' diff --git a/python_apps/media-monitor2/configs/logging.cfg b/python_apps/media-monitor2/configs/logging.cfg new file mode 100755 index 000000000..ea24f69e0 --- /dev/null +++ b/python_apps/media-monitor2/configs/logging.cfg @@ -0,0 +1,32 @@ +[loggers] +keys= root,notifier,metadata + +[handlers] +keys=fileOutHandler + +[formatters] +keys=simpleFormatter + +[logger_root] +level=DEBUG +handlers=fileOutHandler + +[logger_notifier] +level=DEBUG +handlers=fileOutHandler +qualname=notifier + +[logger_metadata] +level=DEBUG +handlers=fileOutHandler +qualname=metadata + +[handler_fileOutHandler] +class=logging.handlers.RotatingFileHandler +level=DEBUG +formatter=simpleFormatter +args=("/var/log/airtime/media-monitor/media-monitor.log", 'a', 10000000, 5,) + +[formatter_simpleFormatter] +format=%(asctime)s %(levelname)s - [%(threadName)s] [%(filename)s : %(funcName)s()] : LINE %(lineno)d - %(message)s +datefmt= diff --git a/python_apps/media-monitor2/configs/media-monitor.cfg b/python_apps/media-monitor2/configs/media-monitor.cfg new file mode 100755 index 000000000..b1167f56b --- /dev/null +++ b/python_apps/media-monitor2/configs/media-monitor.cfg @@ -0,0 +1,31 @@ +api_client = "airtime" + +# where the binary files live +bin_dir = '/usr/lib/airtime/media-monitor' + +# where the logging files live +log_dir = '/var/log/airtime/media-monitor' + + +############################################ +# RabbitMQ settings # +############################################ +rabbitmq_host = 'localhost' +rabbitmq_user = 'guest' +rabbitmq_password = 'guest' +rabbitmq_vhost = '/' + +############################################ +# Media-Monitor preferences # +############################################ +check_filesystem_events = 5 #how long to queue up events performed on the files themselves. +check_airtime_events = 30 #how long to queue metadata input from airtime. + +# MM2 only: +touch_interval = 5 +chunking_number = 450 +request_max_wait = 3.0 +rmq_event_wait = 0.1 +logpath = '/var/log/airtime/media-monitor/media-monitor.log' +index_path = '/var/tmp/airtime/media-monitor/last_index' + diff --git a/python_apps/media-monitor2/media/monitor/airtime.py b/python_apps/media-monitor2/media/monitor/airtime.py index b920ecd65..fa23d2fbd 100644 --- a/python_apps/media-monitor2/media/monitor/airtime.py +++ b/python_apps/media-monitor2/media/monitor/airtime.py @@ -13,9 +13,8 @@ from media.monitor.log import Loggable from media.monitor.syncdb import AirtimeDB from media.monitor.exceptions import DirectoryIsNotListed from media.monitor.bootstrap import Bootstrapper -from media.monitor.listeners import FileMediator -from api_clients import api_client as apc +from media.saas.thread import apc, user class AirtimeNotifier(Loggable): """ @@ -98,7 +97,7 @@ class AirtimeMessageReceiver(Loggable): if (not directory_id) and (not directory): raise ValueError("You must provide either directory_id or \ directory") - sdb = AirtimeDB(apc.AirtimeApiClient.create_right_config()) + sdb = AirtimeDB(apc()) if directory : directory = os.path.normpath(directory) if directory_id == None : directory_id = sdb.to_id(directory) if directory == None : directory = sdb.to_directory(directory_id) @@ -192,7 +191,7 @@ class AirtimeMessageReceiver(Loggable): # request that we'd normally get form pyinotify. But right # now event contractor would take care of this sort of # thing anyway so this might not be necessary after all - FileMediator.ignore(msg['filepath']) + user().file_mediator.ignore(msg['filepath']) os.unlink(msg['filepath']) # Verify deletion: if not os.path.exists(msg['filepath']): diff --git a/python_apps/media-monitor2/media/monitor/bootstrap.py b/python_apps/media-monitor2/media/monitor/bootstrap.py index 0a13ea0b4..6e685c964 100644 --- a/python_apps/media-monitor2/media/monitor/bootstrap.py +++ b/python_apps/media-monitor2/media/monitor/bootstrap.py @@ -2,6 +2,7 @@ import os from pydispatch import dispatcher from media.monitor.events import NewFile, DeleteFile, ModifyFile from media.monitor.log import Loggable +from media.saas.thread import getsig import media.monitor.pure as mmp class Bootstrapper(Loggable): @@ -16,7 +17,7 @@ class Bootstrapper(Loggable): watch_signal - the signals should send events for every file on. """ self.db = db - self.watch_signal = watch_signal + self.watch_signal = getsig(watch_signal) def flush_all(self, last_ran): """ diff --git a/python_apps/media-monitor2/media/monitor/config.py b/python_apps/media-monitor2/media/monitor/config.py index 3a7be8eb5..b06e00a84 100644 --- a/python_apps/media-monitor2/media/monitor/config.py +++ b/python_apps/media-monitor2/media/monitor/config.py @@ -12,25 +12,21 @@ class MMConfig(object): self.cfg = ConfigObj(path) def __getitem__(self, key): - """ - We always return a copy of the config item to prevent callers from - doing any modifications through the returned objects methods - """ + """ We always return a copy of the config item to prevent + callers from doing any modifications through the returned + objects methods """ return copy.deepcopy(self.cfg[key]) def __setitem__(self, key, value): - """ - We use this method not to allow anybody to mess around with config file - any settings made should be done through MMConfig's instance methods - """ + """ We use this method not to allow anybody to mess around with + config file any settings made should be done through MMConfig's + instance methods """ raise ConfigAccessViolation(key) def save(self): self.cfg.write() def last_ran(self): - """ - Returns the last time media monitor was ran by looking at the time when - the file at 'index_path' was modified - """ + """ Returns the last time media monitor was ran by looking at + the time when the file at 'index_path' was modified """ return mmp.last_modified(self.cfg['index_path']) diff --git a/python_apps/media-monitor2/media/monitor/events.py b/python_apps/media-monitor2/media/monitor/events.py index 8fb373704..4914827c5 100644 --- a/python_apps/media-monitor2/media/monitor/events.py +++ b/python_apps/media-monitor2/media/monitor/events.py @@ -3,11 +3,11 @@ import os import abc import re import media.monitor.pure as mmp -import media.monitor.owners as owners from media.monitor.pure import LazyProperty from media.monitor.metadata import Metadata from media.monitor.log import Loggable from media.monitor.exceptions import BadSongFile +from media.saas.thread import getsig, user class PathChannel(object): """ @@ -15,34 +15,26 @@ class PathChannel(object): used as a named tuple """ def __init__(self, signal, path): - self.signal = signal + self.signal = getsig(signal) self.path = path # TODO : Move this to it's file. Also possible unsingleton and use it as a # simple module just like m.m.owners class EventRegistry(object): - """ - This class's main use is to keep track all events with a cookie attribute. - This is done mainly because some events must be 'morphed' into other events - because we later detect that they are move events instead of delete events. - """ - registry = {} - @staticmethod - def register(evt): EventRegistry.registry[evt.cookie] = evt - @staticmethod - def unregister(evt): del EventRegistry.registry[evt.cookie] - @staticmethod - def registered(evt): return evt.cookie in EventRegistry.registry - @staticmethod - def matching(evt): - event = EventRegistry.registry[evt.cookie] + """ This class's main use is to keep track all events with a cookie + attribute. This is done mainly because some events must be 'morphed' + into other events because we later detect that they are move events + instead of delete events. """ + def __init__(self): + self.registry = {} + def register(self,evt): self.registry[evt.cookie] = evt + def unregister(self,evt): del self.registry[evt.cookie] + def registered(self,evt): return evt.cookie in self.registry + def matching(self,evt): + event = self.registry[evt.cookie] # Want to disallow accessing the same event twice - EventRegistry.unregister(event) + self.unregister(event) return event - def __init__(self,*args,**kwargs): - raise Exception("You can instantiate this class. Must only use class \ - methods") - class EventProxy(Loggable): """ @@ -101,7 +93,7 @@ class BaseEvent(Loggable): self._raw_event = raw_event self.path = os.path.normpath(raw_event.pathname) else: self.path = raw_event - self.owner = owners.get_owner(self.path) + self.owner = user().owner.get_owner(self.path) owner_re = re.search('stor/imported/(?P\d+)/', self.path) if owner_re: self.logger.info("matched path: %s" % self.path) @@ -152,7 +144,7 @@ class BaseEvent(Loggable): ret = self.pack() # Remove owner of this file only after packing. Otherwise packing # will not serialize the owner correctly into the airtime request - owners.remove_file_owner(self.path) + user().owner.remove_file_owner(self.path) return ret except BadSongFile as e: return [e] except Exception as e: diff --git a/python_apps/media-monitor2/media/monitor/exceptions.py b/python_apps/media-monitor2/media/monitor/exceptions.py index b7b834920..f7d022fb1 100644 --- a/python_apps/media-monitor2/media/monitor/exceptions.py +++ b/python_apps/media-monitor2/media/monitor/exceptions.py @@ -23,7 +23,7 @@ class FailedToObtainLocale(Exception): class CouldNotCreateIndexFile(Exception): """exception whenever index file cannot be created""" - def __init__(self, path, cause): + def __init__(self, path, cause=None): self.path = path self.cause = cause def __str__(self): return "Failed to create touch file '%s'" % self.path diff --git a/python_apps/media-monitor2/media/monitor/handler.py b/python_apps/media-monitor2/media/monitor/handler.py index dd1d3843a..c67a437ef 100644 --- a/python_apps/media-monitor2/media/monitor/handler.py +++ b/python_apps/media-monitor2/media/monitor/handler.py @@ -3,6 +3,7 @@ from pydispatch import dispatcher import abc from media.monitor.log import Loggable +from media.saas.thread import getsig import media.monitor.pure as mmp # Defines the handle interface @@ -21,10 +22,10 @@ class ReportHandler(Handles): """ __metaclass__ = abc.ABCMeta def __init__(self, signal, weak=False): - self.signal = signal - self.report_signal = "badfile" + self.signal = getsig(signal) + self.report_signal = getsig("badfile") def dummy(sender, event): self.handle(sender,event) - dispatcher.connect(dummy, signal=signal, sender=dispatcher.Any, + dispatcher.connect(dummy, signal=self.signal, sender=dispatcher.Any, weak=weak) def report_problem_file(self, event, exception=None): @@ -38,7 +39,7 @@ class ProblemFileHandler(Handles, Loggable): """ def __init__(self, channel, **kwargs): self.channel = channel - self.signal = self.channel.signal + self.signal = getsig(self.channel.signal) self.problem_dir = self.channel.path def dummy(sender, event, exception): self.handle(sender, event, exception) diff --git a/python_apps/media-monitor2/media/monitor/listeners.py b/python_apps/media-monitor2/media/monitor/listeners.py index b33a5c1a9..5757b9203 100644 --- a/python_apps/media-monitor2/media/monitor/listeners.py +++ b/python_apps/media-monitor2/media/monitor/listeners.py @@ -6,38 +6,31 @@ from functools import wraps import media.monitor.pure as mmp from media.monitor.pure import IncludeOnly from media.monitor.events import OrganizeFile, NewFile, MoveFile, DeleteFile, \ - DeleteDir, EventRegistry, MoveDir,\ + DeleteDir, MoveDir,\ DeleteDirWatch -from media.monitor.log import Loggable, get_logger - +from media.monitor.log import Loggable +from media.saas.thread import getsig, user # Note: Because of the way classes that inherit from pyinotify.ProcessEvent # interact with constructors. you should only instantiate objects from them # using keyword arguments. For example: # OrganizeListener('watch_signal') <= wrong # OrganizeListener(signal='watch_signal') <= right -class FileMediator(object): - """ - FileMediator is used an intermediate mechanism that filters out certain - events. - """ - ignored_set = set([]) # for paths only - logger = get_logger() - - @staticmethod - def is_ignored(path): return path in FileMediator.ignored_set - @staticmethod - def ignore(path): FileMediator.ignored_set.add(path) - @staticmethod - def unignore(path): FileMediator.ignored_set.remove(path) +class FileMediator(Loggable): + """ FileMediator is used an intermediate mechanism that filters out + certain events. """ + def __init__(self) : self.ignored_set = set([]) # for paths only + def is_ignored(self,path) : return path in self.ignored_set + def ignore(self, path) : self.ignored_set.add(path) + def unignore(self, path) : self.ignored_set.remove(path) def mediate_ignored(fn): @wraps(fn) def wrapped(self, event, *args,**kwargs): event.pathname = unicode(event.pathname, "utf-8") - if FileMediator.is_ignored(event.pathname): - FileMediator.logger.info("Ignoring: '%s' (once)" % event.pathname) - FileMediator.unignore(event.pathname) + if user().file_mediator.is_ignored(event.pathname): + user().file_mediator.logger.info("Ignoring: '%s' (once)" % event.pathname) + user().file_mediator.unignore(event.pathname) else: return fn(self, event, *args, **kwargs) return wrapped @@ -45,7 +38,7 @@ class BaseListener(object): def __str__(self): return "Listener(%s), Signal(%s)" % \ (self.__class__.__name__, self. signal) - def my_init(self, signal): self.signal = signal + def my_init(self, signal): self.signal = getsig(signal) class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable): def process_IN_CLOSE_WRITE(self, event): @@ -66,25 +59,25 @@ class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable): self.logger.info("Bootstrapping: File in 'organize' directory: \ '%s'" % f) if not mmp.file_locked(f): - dispatcher.send(signal=self.signal, sender=self, + dispatcher.send(signal=getsig(self.signal), sender=self, event=OrganizeFile(f)) flushed += 1 #self.logger.info("Flushed organized directory with %d files" % flushed) @IncludeOnly(mmp.supported_extensions) def process_to_organize(self, event): - dispatcher.send(signal=self.signal, sender=self, + dispatcher.send(signal=getsig(self.signal), sender=self, event=OrganizeFile(event)) class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent): def process_IN_CLOSE_WRITE(self, event): self.process_create(event) def process_IN_MOVED_TO(self, event): - if EventRegistry.registered(event): + if user().event_registry.registered(event): # We need this trick because we don't how to "expand" dir events # into file events until we know for sure if we deleted or moved morph = MoveDir(event) if event.dir else MoveFile(event) - EventRegistry.matching(event).morph_into(morph) + user().event_registry.matching(event).morph_into(morph) else: self.process_create(event) def process_IN_MOVED_FROM(self, event): # Is either delete dir or delete file @@ -92,7 +85,7 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent): # evt can be none whenever event points that a file that would be # ignored by @IncludeOnly if hasattr(event,'cookie') and (evt != None): - EventRegistry.register(evt) + user().event_registry.register(evt) def process_IN_DELETE(self,event): self.process_delete(event) def process_IN_MOVE_SELF(self, event): if '-unknown-path' in event.pathname: @@ -101,14 +94,14 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent): def delete_watch_dir(self, event): e = DeleteDirWatch(event) - dispatcher.send(signal='watch_move', sender=self, event=e) - dispatcher.send(signal=self.signal, sender=self, event=e) + dispatcher.send(signal=getsig('watch_move'), sender=self, event=e) + dispatcher.send(signal=getsig(self.signal), sender=self, event=e) @mediate_ignored @IncludeOnly(mmp.supported_extensions) def process_create(self, event): evt = NewFile(event) - dispatcher.send(signal=self.signal, sender=self, event=evt) + dispatcher.send(signal=getsig(self.signal), sender=self, event=evt) return evt @mediate_ignored @@ -117,13 +110,13 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent): evt = None if event.dir : evt = DeleteDir(event) else : evt = DeleteFile(event) - dispatcher.send(signal=self.signal, sender=self, event=evt) + dispatcher.send(signal=getsig(self.signal), sender=self, event=evt) return evt @mediate_ignored def process_delete_dir(self, event): evt = DeleteDir(event) - dispatcher.send(signal=self.signal, sender=self, event=evt) + dispatcher.send(signal=getsig(self.signal), sender=self, event=evt) return evt def flush_events(self, path): @@ -138,6 +131,6 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent): added = 0 for f in mmp.walk_supported(path, clean_empties=False): added += 1 - dispatcher.send( signal=self.signal, sender=self, event=NewFile(f) ) + dispatcher.send( signal=getsig(self.signal), sender=self, event=NewFile(f) ) self.logger.info( "Flushed watch directory. added = %d" % added ) diff --git a/python_apps/media-monitor2/media/monitor/manager.py b/python_apps/media-monitor2/media/monitor/manager.py index 548590f38..1fc7b695b 100644 --- a/python_apps/media-monitor2/media/monitor/manager.py +++ b/python_apps/media-monitor2/media/monitor/manager.py @@ -1,5 +1,4 @@ import pyinotify -import threading import time from pydispatch import dispatcher @@ -9,19 +8,19 @@ from media.monitor.log import Loggable from media.monitor.listeners import StoreWatchListener, OrganizeListener from media.monitor.handler import ProblemFileHandler from media.monitor.organizer import Organizer +from media.saas.thread import InstanceInheritingThread, getsig import media.monitor.pure as mmp -class ManagerTimeout(threading.Thread,Loggable): - """ - The purpose of this class is to flush the organize directory every 3 - secnods. This used to be just a work around for cc-4235 but recently - became a permanent solution because it's "cheap" and reliable - """ +class ManagerTimeout(InstanceInheritingThread,Loggable): + """ The purpose of this class is to flush the organize directory + every 3 secnods. This used to be just a work around for cc-4235 + but recently became a permanent solution because it's "cheap" and + reliable """ def __init__(self, manager, interval=1.5): # TODO : interval should be read from config and passed here instead # of just using the hard coded value - threading.Thread.__init__(self) + super(ManagerTimeout, self).__init__() self.manager = manager self.interval = interval def run(self): @@ -30,19 +29,17 @@ class ManagerTimeout(threading.Thread,Loggable): self.manager.flush_organize() class Manager(Loggable): - """ - An abstraction over media monitors core pyinotify functions. These - include adding watched,store, organize directories, etc. Basically - composes over WatchManager from pyinotify - """ + # NOTE : this massive class is a source of many problems of mm and + # is in dire need of breaking up and refactoring. + """ An abstraction over media monitors core pyinotify functions. + These include adding watched,store, organize directories, etc. + Basically composes over WatchManager from pyinotify """ def __init__(self): self.wm = pyinotify.WatchManager() # These two instance variables are assumed to be constant - self.watch_channel = 'watch' - self.organize_channel = 'organize' + self.watch_channel = getsig('watch') + self.organize_channel = getsig('organize') self.watch_listener = StoreWatchListener(signal = self.watch_channel) - # TODO : change this to a weak ref - # TODO : get rid of this hack once cc-4235 is fixed self.__timeout_thread = ManagerTimeout(self) self.__timeout_thread.daemon = True self.__timeout_thread.start() @@ -57,11 +54,11 @@ class Manager(Loggable): self.organize_channel), } def dummy(sender, event): self.watch_move( event.path, sender=sender ) - dispatcher.connect(dummy, signal='watch_move', sender=dispatcher.Any, - weak=False) + dispatcher.connect(dummy, signal=getsig('watch_move'), + sender=dispatcher.Any, weak=False) def subwatch_add(sender, directory): self.__add_watch(directory, self.watch_listener) - dispatcher.connect(subwatch_add, signal='add_subwatch', + dispatcher.connect(subwatch_add, signal=getsig('add_subwatch'), sender=dispatcher.Any, weak=False) # A private mapping path => watch_descriptor # we use the same dictionary for organize, watch, store wd events. @@ -76,23 +73,19 @@ class Manager(Loggable): # through dedicated handler objects. Because we must have access to a # manager instance. Hence we must slightly break encapsulation. def watch_move(self, watch_dir, sender=None): - """ - handle 'watch move' events directly sent from listener - """ + """ handle 'watch move' events directly sent from listener """ self.logger.info("Watch dir '%s' has been renamed (hence removed)" % watch_dir) self.remove_watch_directory(normpath(watch_dir)) def watch_signal(self): - """ - Return the signal string our watch_listener is reading events from - """ - return self.watch_listener.signal + """ Return the signal string our watch_listener is reading + events from """ + return getsig(self.watch_listener.signal) def __remove_watch(self,path): - """ - Remove path from being watched (first will check if 'path' is watched) - """ + """ Remove path from being watched (first will check if 'path' + is watched) """ # only delete if dir is actually being watched if path in self.__wd_path: wd = self.__wd_path[path] @@ -100,10 +93,8 @@ class Manager(Loggable): del(self.__wd_path[path]) def __add_watch(self,path,listener): - """ - Start watching 'path' using 'listener'. First will check if directory - is being watched before adding another watch - """ + """ Start watching 'path' using 'listener'. First will check if + directory is being watched before adding another watch """ self.logger.info("Attempting to add listener to path '%s'" % path) self.logger.info( 'Listener: %s' % str(listener) ) @@ -114,9 +105,8 @@ class Manager(Loggable): if wd: self.__wd_path[path] = wd.values()[0] def __create_organizer(self, target_path, recorded_path): - """ - creates an organizer at new destination path or modifies the old one - """ + """ creates an organizer at new destination path or modifies the + old one """ # TODO : find a proper fix for the following hack # We avoid creating new instances of organize because of the way # it interacts with pydispatch. We must be careful to never have @@ -134,23 +124,18 @@ class Manager(Loggable): recorded_path=recorded_path) def get_problem_files_path(self): - """ - returns the path where problem files should go - """ + """ returns the path where problem files should go """ return self.organize['problem_files_path'] def set_problem_files_path(self, new_path): - """ - Set the path where problem files should go - """ + """ Set the path where problem files should go """ self.organize['problem_files_path'] = new_path self.organize['problem_handler'] = \ - ProblemFileHandler( PathChannel(signal='badfile',path=new_path) ) + ProblemFileHandler( PathChannel(signal=getsig('badfile'), + path=new_path) ) def get_recorded_path(self): - """ - returns the path of the recorded directory - """ + """ returns the path of the recorded directory """ return self.organize['recorded_path'] def set_recorded_path(self, new_path): @@ -160,17 +145,14 @@ class Manager(Loggable): self.__add_watch(new_path, self.watch_listener) def get_organize_path(self): - """ - returns the current path that is being watched for organization - """ + """ returns the current path that is being watched for + organization """ return self.organize['organize_path'] def set_organize_path(self, new_path): - """ - sets the organize path to be new_path. Under the current scheme there - is only one organize path but there is no reason why more cannot be - supported - """ + """ sets the organize path to be new_path. Under the current + scheme there is only one organize path but there is no reason + why more cannot be supported """ # if we are already organizing a particular directory we remove the # watch from it first before organizing another directory self.__remove_watch(self.organize['organize_path']) @@ -188,19 +170,15 @@ class Manager(Loggable): return self.organize['imported_path'] def set_imported_path(self,new_path): - """ - set the directory where organized files go to. - """ + """ set the directory where organized files go to. """ self.__remove_watch(self.organize['imported_path']) self.organize['imported_path'] = new_path self.__create_organizer( new_path, self.organize['recorded_path']) self.__add_watch(new_path, self.watch_listener) def change_storage_root(self, store): - """ - hooks up all the directories for you. Problem, recorded, imported, - organize. - """ + """ hooks up all the directories for you. Problem, recorded, + imported, organize. """ store_paths = mmp.expand_storage(store) # First attempt to make sure that all paths exist before adding any # watches @@ -217,18 +195,14 @@ class Manager(Loggable): mmp.create_dir(p) def has_watch(self, path): - """ - returns true if the path is being watched or not. Any kind of watch: - organize, store, watched. - """ + """ returns true if the path is being watched or not. Any kind + of watch: organize, store, watched. """ return path in self.__wd_path def add_watch_directory(self, new_dir): - """ - adds a directory to be "watched". "watched" directories are + """ adds a directory to be "watched". "watched" directories are those that are being monitored by media monitor for airtime in - this context and not directories pyinotify calls watched - """ + this context and not directories pyinotify calls watched """ if self.has_watch(new_dir): self.logger.info("Cannot add '%s' to watched directories. It's \ already being watched" % new_dir) @@ -237,9 +211,8 @@ class Manager(Loggable): self.__add_watch(new_dir, self.watch_listener) def remove_watch_directory(self, watch_dir): - """ - removes a directory from being "watched". Undoes add_watch_directory - """ + """ removes a directory from being "watched". Undoes + add_watch_directory """ if self.has_watch(watch_dir): self.logger.info("Removing watched directory: '%s'", watch_dir) self.__remove_watch(watch_dir) @@ -250,9 +223,7 @@ class Manager(Loggable): self.logger.info( self.__wd_path ) def loop(self): - """ - block until we receive pyinotify events - """ + """ block until we receive pyinotify events """ notifier = pyinotify.Notifier(self.wm) notifier.coalesce_events() notifier.loop() diff --git a/python_apps/media-monitor2/media/monitor/organizer.py b/python_apps/media-monitor2/media/monitor/organizer.py index ce1849b90..17f46054c 100644 --- a/python_apps/media-monitor2/media/monitor/organizer.py +++ b/python_apps/media-monitor2/media/monitor/organizer.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- import media.monitor.pure as mmp -import media.monitor.owners as owners from media.monitor.handler import ReportHandler from media.monitor.log import Loggable from media.monitor.exceptions import BadSongFile from media.monitor.events import OrganizeFile from pydispatch import dispatcher from os.path import dirname +from media.saas.thread import getsig, user import os.path class Organizer(ReportHandler,Loggable): @@ -36,7 +36,7 @@ class Organizer(ReportHandler,Loggable): self.channel = channel self.target_path = target_path self.recorded_path = recorded_path - super(Organizer, self).__init__(signal=self.channel, weak=False) + super(Organizer, self).__init__(signal=getsig(self.channel), weak=False) def handle(self, sender, event): """ Intercept events where a new file has been added to the @@ -63,7 +63,7 @@ class Organizer(ReportHandler,Loggable): def new_dir_watch(d): # TODO : rewrite as return lambda : dispatcher.send(... def cb(): - dispatcher.send(signal="add_subwatch", sender=self, + dispatcher.send(signal=getsig("add_subwatch"), sender=self, directory=d) return cb @@ -74,7 +74,7 @@ class Organizer(ReportHandler,Loggable): # backwards way is bewcause we are unable to encode the owner id # into the file itself so that the StoreWatchListener listener can # detect it from the file - owners.add_file_owner(new_path, owner_id ) + user().owner.add_file_owner(new_path, owner_id ) self.logger.info('Organized: "%s" into "%s"' % (event.path, new_path)) diff --git a/python_apps/media-monitor2/media/monitor/owners.py b/python_apps/media-monitor2/media/monitor/owners.py index 2d6c4be44..48898c9aa 100644 --- a/python_apps/media-monitor2/media/monitor/owners.py +++ b/python_apps/media-monitor2/media/monitor/owners.py @@ -1,44 +1,40 @@ # -*- coding: utf-8 -*- -from media.monitor.log import get_logger -log = get_logger() -# hash: 'filepath' => owner_id -owners = {} +from media.monitor.log import Loggable -def reset_owners(): - """ Wipes out all file => owner associations """ - global owners - owners = {} +class Owner(Loggable): + def __init__(self): + # hash: 'filepath' => owner_id + self.owners = {} + + def get_owner(self,f): + """ Get the owner id of the file 'f' """ + o = self.owners[f] if f in self.owners else -1 + self.logger.info("Received owner for %s. Owner: %s" % (f, o)) + return o -def get_owner(f): - """ Get the owner id of the file 'f' """ - o = owners[f] if f in owners else -1 - log.info("Received owner for %s. Owner: %s" % (f, o)) - return o - - -def add_file_owner(f,owner): - """ Associate file f with owner. If owner is -1 then do we will not record - it because -1 means there is no owner. Returns True if f is being stored - after the function. False otherwise. """ - if owner == -1: return False - if f in owners: - if owner != owners[f]: # check for fishiness - log.info("Warning ownership of file '%s' changed from '%d' to '%d'" - % (f, owners[f], owner)) - else: return True - owners[f] = owner - return True - -def has_owner(f): - """ True if f is owned by somebody. False otherwise. """ - return f in owners - -def remove_file_owner(f): - """ Try and delete any association made with file f. Returns true if - the the association was actually deleted. False otherwise. """ - if f in owners: - del owners[f] + def add_file_owner(self,f,owner): + """ Associate file f with owner. If owner is -1 then do we will not record + it because -1 means there is no owner. Returns True if f is being stored + after the function. False otherwise. """ + if owner == -1: return False + if f in self.owners: + if owner != self.owners[f]: # check for fishiness + self.logger.info("Warning ownership of file '%s' changed from '%d' to '%d'" + % (f, self.owners[f], owner)) + else: return True + self.owners[f] = owner return True - else: return False + + def has_owner(self,f): + """ True if f is owned by somebody. False otherwise. """ + return f in self.owners + + def remove_file_owner(self,f): + """ Try and delete any association made with file f. Returns true if + the the association was actually deleted. False otherwise. """ + if f in self.owners: + del self.owners[f] + return True + else: return False diff --git a/python_apps/media-monitor2/media/monitor/request.py b/python_apps/media-monitor2/media/monitor/request.py index a49c6bb25..e2c5fb5ef 100644 --- a/python_apps/media-monitor2/media/monitor/request.py +++ b/python_apps/media-monitor2/media/monitor/request.py @@ -1,14 +1,12 @@ # -*- coding: utf-8 -*- -import threading - from media.monitor.exceptions import BadSongFile from media.monitor.log import Loggable -import api_clients.api_client as ac +from media.saas.thread import apc, InstanceInheritingThread -class ThreadedRequestSync(threading.Thread, Loggable): +class ThreadedRequestSync(InstanceInheritingThread, Loggable): def __init__(self, rs): - threading.Thread.__init__(self) + super(ThreadedRequestSync, self).__init__() self.rs = rs self.daemon = True self.start() @@ -22,7 +20,7 @@ class RequestSync(Loggable): for some number of times """ @classmethod def create_with_api_client(cls, watcher, requests): - apiclient = ac.AirtimeApiClient.create_right_config() + apiclient = apc() self = cls(watcher, requests, apiclient) return self diff --git a/python_apps/media-monitor2/media/monitor/syncdb.py b/python_apps/media-monitor2/media/monitor/syncdb.py index cc8abc294..786fb1a39 100644 --- a/python_apps/media-monitor2/media/monitor/syncdb.py +++ b/python_apps/media-monitor2/media/monitor/syncdb.py @@ -2,7 +2,8 @@ import os from media.monitor.log import Loggable from media.monitor.exceptions import NoDirectoryInAirtime -from os.path import normpath +from media.saas.thread import user +from os.path import normpath, join import media.monitor.pure as mmp class AirtimeDB(Loggable): @@ -11,17 +12,20 @@ class AirtimeDB(Loggable): if reload_now: self.reload_directories() def reload_directories(self): - """ - this is the 'real' constructor, should be called if you ever want the - class reinitialized. there's not much point to doing it yourself - however, you should just create a new AirtimeDB instance. - """ + """ this is the 'real' constructor, should be called if you ever + want the class reinitialized. there's not much point to doing + it yourself however, you should just create a new AirtimeDB + instance. """ + + saas = user().root_path + # dirs_setup is a dict with keys: # u'watched_dirs' and u'stor' which point to lists of corresponding # dirs dirs_setup = self.apc.setup_media_monitor() - dirs_setup[u'stor'] = normpath( dirs_setup[u'stor'] ) - dirs_setup[u'watched_dirs'] = map(normpath, dirs_setup[u'watched_dirs']) + dirs_setup[u'stor'] = normpath( join(saas, dirs_setup[u'stor'] ) ) + dirs_setup[u'watched_dirs'] = map(lambda p: normpath(join(saas,p)), + dirs_setup[u'watched_dirs']) dirs_with_id = dict([ (k,normpath(v)) for k,v in self.apc.list_all_watched_dirs()['dirs'].iteritems() ]) @@ -42,15 +46,11 @@ class AirtimeDB(Loggable): dirs_setup[u'watched_dirs'] ]) def to_id(self, directory): - """ - directory path -> id - """ + """ directory path -> id """ return self.dir_to_id[ directory ] def to_directory(self, dir_id): - """ - id -> directory path - """ + """ id -> directory path """ return self.id_to_dir[ dir_id ] def storage_path(self) : return self.base_storage @@ -60,37 +60,31 @@ class AirtimeDB(Loggable): def recorded_path(self) : return self.storage_paths['recorded'] def list_watched(self): - """ - returns all watched directories as a list - """ + """ returns all watched directories as a list """ return list(self.watched_directories) def list_storable_paths(self): - """ - returns a list of all the watched directories in the datatabase. - (Includes the imported directory and the recorded directory) - """ + """ returns a list of all the watched directories in the + datatabase. (Includes the imported directory and the recorded + directory) """ l = self.list_watched() l.append(self.import_path()) l.append(self.recorded_path()) return l def dir_id_get_files(self, dir_id, all_files=True): - """ - Get all files in a directory with id dir_id - """ + """ Get all files in a directory with id dir_id """ base_dir = self.id_to_dir[ dir_id ] - return set(( os.path.join(base_dir,p) for p in + return set(( join(base_dir,p) for p in self.apc.list_all_db_files( dir_id, all_files ) )) def directory_get_files(self, directory, all_files=True): - """ - returns all the files(recursively) in a directory. a directory is an - "actual" directory path instead of its id. This is super hacky because - you create one request for the recorded directory and one for the - imported directory even though they're the same dir in the database so - you get files for both dirs in 1 request... - """ + """ returns all the files(recursively) in a directory. a + directory is an "actual" directory path instead of its id. This + is super hacky because you create one request for the recorded + directory and one for the imported directory even though they're + the same dir in the database so you get files for both dirs in 1 + request... """ normal_dir = os.path.normpath(unicode(directory)) if normal_dir not in self.dir_to_id: raise NoDirectoryInAirtime( normal_dir, self.dir_to_id ) diff --git a/python_apps/media-monitor2/media/monitor/watchersyncer.py b/python_apps/media-monitor2/media/monitor/watchersyncer.py index d2df9ed3a..558759c89 100644 --- a/python_apps/media-monitor2/media/monitor/watchersyncer.py +++ b/python_apps/media-monitor2/media/monitor/watchersyncer.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -import threading import time import copy @@ -9,15 +8,16 @@ from media.monitor.exceptions import BadSongFile from media.monitor.eventcontractor import EventContractor from media.monitor.events import EventProxy from media.monitor.request import ThreadedRequestSync, RequestSync +from media.saas.thread import InstanceInheritingThread, getsig -class TimeoutWatcher(threading.Thread,Loggable): +class TimeoutWatcher(InstanceInheritingThread,Loggable): """ The job of this thread is to keep an eye on WatchSyncer and force a request whenever the requests go over time out """ def __init__(self, watcher, timeout=5): self.logger.info("Created timeout thread...") - threading.Thread.__init__(self) + super(TimeoutWatcher, self).__init__() self.watcher = watcher self.timeout = timeout @@ -52,7 +52,7 @@ class WatchSyncer(ReportHandler,Loggable): tc = TimeoutWatcher(self, self.timeout) tc.daemon = True tc.start() - super(WatchSyncer, self).__init__(signal=signal) + super(WatchSyncer, self).__init__(signal=getsig(signal)) def handle(self, sender, event): """ diff --git a/python_apps/media-monitor2/media/update/__init__.py b/python_apps/media-monitor2/media/saas/__init__.py similarity index 100% rename from python_apps/media-monitor2/media/update/__init__.py rename to python_apps/media-monitor2/media/saas/__init__.py diff --git a/python_apps/media-monitor2/media/saas/airtimeinstance.py b/python_apps/media-monitor2/media/saas/airtimeinstance.py new file mode 100644 index 000000000..5d02cd8e4 --- /dev/null +++ b/python_apps/media-monitor2/media/saas/airtimeinstance.py @@ -0,0 +1,68 @@ +import os +from os.path import join + +from media.monitor.exceptions import NoConfigFile +from media.monitor.pure import LazyProperty +from media.monitor.config import MMConfig +from media.monitor.owners import Owner +from media.monitor.events import EventRegistry +from media.monitor.listeners import FileMediator +from api_clients.api_client import AirtimeApiClient + +# poor man's phantom types... +class SignalString(str): pass + +class AirtimeInstance(object): + """ AirtimeInstance is a class that abstracts away every airtime + instance by providing all the necessary objects required to interact + with the instance. ApiClient, configs, root_directory """ + + @classmethod + def root_make(cls, name, root): + cfg = { + 'api_client' : join(root, 'etc/airtime/api_client.cfg'), + 'media_monitor' : join(root, 'etc/airtime/media-monitor.cfg'), + } + return cls(name, root, cfg) + + def __init__(self,name, root_path, config_paths): + """ name is an internal name only """ + for cfg in ['api_client','media_monitor']: + if cfg not in config_paths: raise NoConfigFile(config_paths) + elif not os.path.exists(config_paths[cfg]): + raise NoConfigFile(config_paths[cfg]) + self.name = name + self.config_paths = config_paths + self.root_path = root_path + + def signal(self, sig): + if isinstance(sig, SignalString): return sig + else: return SignalString("%s_%s" % (self.name, sig)) + + def __str__(self): + return "%s,%s(%s)" % (self.name, self.root_path, self.config_paths) + + @LazyProperty + def api_client(self): + return AirtimeApiClient(config_path=self.config_paths['api_client']) + + @LazyProperty + def mm_config(self): + return MMConfig(self.config_paths['media_monitor']) + + # NOTE to future code monkeys: + # I'm well aware that I'm using the shitty service locator pattern + # instead of normal constructor injection as I should be. The reason + # for this is that I found these issues a little too close to the + # end of my tenure. It's highly recommended to rewrite this crap + # using proper constructor injection if you ever have the time + + @LazyProperty + def owner(self): return Owner() + + @LazyProperty + def event_registry(self): return EventRegistry() + + @LazyProperty + def file_mediator(self): return FileMediator() + diff --git a/python_apps/media-monitor2/media/saas/launcher.py b/python_apps/media-monitor2/media/saas/launcher.py new file mode 100644 index 000000000..e80f46034 --- /dev/null +++ b/python_apps/media-monitor2/media/saas/launcher.py @@ -0,0 +1,125 @@ +import os, sys +import logging +import logging.config + +import media.monitor.pure as mmp + +from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale +from media.monitor.log import get_logger, setup_logging +from std_err_override import LogWriter +from media.saas.thread import InstanceThread, user, apc, getsig +from media.monitor.log import Loggable +from media.monitor.exceptions import CouldNotCreateIndexFile +from media.monitor.toucher import ToucherThread +from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver +from media.monitor.watchersyncer import WatchSyncer +from media.monitor.eventdrainer import EventDrainer +from media.monitor.manager import Manager +from media.monitor.syncdb import AirtimeDB +from media.saas.airtimeinstance import AirtimeInstance + +class MM2(InstanceThread, Loggable): + + def index_create(self, index_create_attempt=False): + config = user().mm_config + if not index_create_attempt: + if not os.path.exists(config['index_path']): + self.logger.info("Attempting to create index file:...") + try: + with open(config['index_path'], 'w') as f: f.write(" ") + except Exception as e: + self.logger.info("Failed to create index file with exception: %s" \ + % str(e)) + else: + self.logger.info("Created index file, reloading configuration:") + self.index_create(index_create_attempt=True) + else: + self.logger.info("Already tried to create index. Will not try again ") + + if not os.path.exists(config['index_path']): + raise CouldNotCreateIndexFile(config['index_path']) + + def run(self): + self.index_create() + manager = Manager() + apiclient = apc() + config = user().mm_config + watch_syncer = WatchSyncer(signal=getsig('watch'), + chunking_number=config['chunking_number'], + timeout=config['request_max_wait']) + airtime_receiver = AirtimeMessageReceiver(config,manager) + airtime_notifier = AirtimeNotifier(config, airtime_receiver) + + + adb = AirtimeDB(apiclient) + store = { + u'stor' : adb.storage_path(), + u'watched_dirs' : adb.list_watched(), + } + + self.logger.info("initializing mm with directories: %s" % str(store)) + + self.logger.info( + "Initing with the following airtime response:%s" % str(store)) + + airtime_receiver.change_storage({ 'directory':store[u'stor'] }) + + for watch_dir in store[u'watched_dirs']: + if not os.path.exists(watch_dir): + # Create the watch_directory here + try: os.makedirs(watch_dir) + except Exception: + self.logger.error("Could not create watch directory: '%s' \ + (given from the database)." % watch_dir) + if os.path.exists(watch_dir): + airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True) + else: self.logger.info("Failed to add watch on %s" % str(watch_dir)) + + ed = EventDrainer(airtime_notifier.connection, + interval=float(config['rmq_event_wait'])) + + # Launch the toucher that updates the last time when the script was + # ran every n seconds. + # TODO : verify that this does not interfere with bootstrapping because the + # toucher thread might update the last_ran variable too fast + tt = ToucherThread(path=config['index_path'], + interval=int(config['touch_interval'])) + + apiclient.register_component('media-monitor') + + manager.loop() + +def launch_instance(name, root, global_cfg, apc_cfg): + cfg = { + 'api_client' : apc_cfg, + 'media_monitor' : global_cfg, + } + ai = AirtimeInstance(name, root, cfg) + MM2(ai).start() + +def setup_global(log): + """ setup unicode and other stuff """ + log.info("Attempting to set the locale...") + try: mmp.configure_locale(mmp.get_system_locale()) + except FailedToSetLocale as e: + log.info("Failed to set the locale...") + sys.exit(1) + except FailedToObtainLocale as e: + log.info("Failed to obtain the locale form the default path: \ + '/etc/default/locale'") + sys.exit(1) + except Exception as e: + log.info("Failed to set the locale for unknown reason. \ + Logging exception.") + log.info(str(e)) + +def setup_logger(log_config, logpath): + logging.config.fileConfig(log_config) + #need to wait for Python 2.7 for this.. + #logging.captureWarnings(True) + logger = logging.getLogger() + LogWriter.override_std_err(logger) + logfile = unicode(logpath) + setup_logging(logfile) + log = get_logger() + return log diff --git a/python_apps/media-monitor2/media/saas/thread.py b/python_apps/media-monitor2/media/saas/thread.py new file mode 100644 index 000000000..49a3acd6f --- /dev/null +++ b/python_apps/media-monitor2/media/saas/thread.py @@ -0,0 +1,27 @@ +import threading + +class UserlessThread(Exception): + def __str__(): + return "Current thread: %s is not an instance of InstanceThread \ + of InstanceInheritingThread" % str(threading.current_thread()) + +class HasUser(object): + def user(self): return self._user + +class InstanceThread(threading.Thread, HasUser): + def __init__(self,user, *args, **kwargs): + super(InstanceThread, self).__init__(*args, **kwargs) + self._user = user + +class InstanceInheritingThread(threading.Thread, HasUser): + def __init__(self, *args, **kwargs): + self._user = threading.current_thread().user() + super(InstanceInheritingThread, self).__init__(*args, **kwargs) + +def user(): + try: return threading.current_thread().user() + except AttributeError: raise UserlessThread() + +def apc(): return user().api_client + +def getsig(s): return user().signal(s) diff --git a/python_apps/media-monitor2/mm2.py b/python_apps/media-monitor2/mm2.py index 2abace71c..7c447c060 100644 --- a/python_apps/media-monitor2/mm2.py +++ b/python_apps/media-monitor2/mm2.py @@ -1,141 +1,15 @@ # -*- coding: utf-8 -*- import sys import os -import logging -import logging.config +from media.saas.launcher import setup_global, launch_instance, setup_logger +from media.monitor.config import MMConfig -from media.monitor.manager import Manager -from media.monitor.bootstrap import Bootstrapper -from media.monitor.log import get_logger, setup_logging -from media.monitor.config import MMConfig -from media.monitor.toucher import ToucherThread -from media.monitor.syncdb import AirtimeDB -from media.monitor.exceptions import FailedToObtainLocale, \ - FailedToSetLocale, \ - NoConfigFile -from media.monitor.airtime import AirtimeNotifier, \ - AirtimeMessageReceiver -from media.monitor.watchersyncer import WatchSyncer -from media.monitor.eventdrainer import EventDrainer -from media.update.replaygainupdater import ReplayGainUpdater -from std_err_override import LogWriter - -import media.monitor.pure as mmp -from api_clients import api_client as apc - - - -def main(global_config, api_client_config, log_config, - index_create_attempt=False): - for cfg in [global_config, api_client_config]: - if not os.path.exists(cfg): raise NoConfigFile(cfg) - # MMConfig is a proxy around ConfigObj instances. it does not allow - # itself users of MMConfig instances to modify any config options - # directly through the dictionary. Users of this object muse use the - # correct methods designated for modification - try: config = MMConfig(global_config) - except NoConfigFile as e: - print("Cannot run mediamonitor2 without configuration file.") - print("Current config path: '%s'" % global_config) - sys.exit(1) - except Exception as e: - print("Unknown error reading configuration file: '%s'" % global_config) - print(str(e)) - - - logging.config.fileConfig(log_config) - - #need to wait for Python 2.7 for this.. - #logging.captureWarnings(True) - - logger = logging.getLogger() - LogWriter.override_std_err(logger) - logfile = unicode( config['logpath'] ) - setup_logging(logfile) - log = get_logger() - - if not index_create_attempt: - if not os.path.exists(config['index_path']): - log.info("Attempting to create index file:...") - try: - with open(config['index_path'], 'w') as f: f.write(" ") - except Exception as e: - log.info("Failed to create index file with exception: %s" \ - % str(e)) - else: - log.info("Created index file, reloading configuration:") - main( global_config, api_client_config, log_config, - index_create_attempt=True ) - else: - log.info("Already tried to create index. Will not try again ") - - if not os.path.exists(config['index_path']): - log.info("Index file does not exist. Terminating") - - log.info("Attempting to set the locale...") - - try: - mmp.configure_locale(mmp.get_system_locale()) - except FailedToSetLocale as e: - log.info("Failed to set the locale...") - sys.exit(1) - except FailedToObtainLocale as e: - log.info("Failed to obtain the locale form the default path: \ - '/etc/default/locale'") - sys.exit(1) - except Exception as e: - log.info("Failed to set the locale for unknown reason. \ - Logging exception.") - log.info(str(e)) - - watch_syncer = WatchSyncer(signal='watch', - chunking_number=config['chunking_number'], - timeout=config['request_max_wait']) - - apiclient = apc.AirtimeApiClient.create_right_config(log=log, - config_path=api_client_config) - - ReplayGainUpdater.start_reply_gain(apiclient) - - sdb = AirtimeDB(apiclient) - - manager = Manager() - - airtime_receiver = AirtimeMessageReceiver(config,manager) - airtime_notifier = AirtimeNotifier(config, airtime_receiver) - - store = apiclient.setup_media_monitor() - - log.info("Initing with the following airtime response:%s" % str(store)) - - airtime_receiver.change_storage({ 'directory':store[u'stor'] }) - - for watch_dir in store[u'watched_dirs']: - if not os.path.exists(watch_dir): - # Create the watch_directory here - try: os.makedirs(watch_dir) - except Exception as e: - log.error("Could not create watch directory: '%s' \ - (given from the database)." % watch_dir) - if os.path.exists(watch_dir): - airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True) - else: log.info("Failed to add watch on %s" % str(watch_dir)) - - bs = Bootstrapper( db=sdb, watch_signal='watch' ) - - ed = EventDrainer(airtime_notifier.connection, - interval=float(config['rmq_event_wait'])) - - # Launch the toucher that updates the last time when the script was - # ran every n seconds. - # TODO : verify that this does not interfere with bootstrapping because the - # toucher thread might update the last_ran variable too fast - tt = ToucherThread(path=config['index_path'], - interval=int(config['touch_interval'])) - - apiclient.register_component('media-monitor') - - return manager.loop() +def main(global_config, api_client_config, log_config): + """ function to run hosted install """ + mm_config = MMConfig(global_config) + log = setup_logger( log_config, mm_config['logpath'] ) + setup_global(log) + launch_instance('hosted_install', '/', global_config, api_client_config) __doc__ = """ Usage: @@ -148,9 +22,6 @@ Options: --log= log config at """ -def main_loop(): - while True: pass - if __name__ == '__main__': from docopt import docopt args = docopt(__doc__,version="mm1.99") @@ -160,5 +31,4 @@ if __name__ == '__main__': sys.exit(0) print("Running mm1.99") main(args['--config'],args['--apiclient'],args['--log']) - #gevent.joinall([ gevent.spawn(main_loop) ]) diff --git a/python_apps/media-monitor2/tests/test_instance.py b/python_apps/media-monitor2/tests/test_instance.py new file mode 100644 index 000000000..badaadf07 --- /dev/null +++ b/python_apps/media-monitor2/tests/test_instance.py @@ -0,0 +1,21 @@ +import unittest +from copy import deepcopy +from media.saas.airtimeinstance import AirtimeInstance, NoConfigFile + +class TestAirtimeInstance(unittest.TestCase): + def setUp(self): + self.cfg = { + 'api_client' : 'tests/test_instance.py', + 'media_monitor' : 'tests/test_instance.py', + 'logging' : 'tests/test_instance.py', + } + + def test_init_good(self): + AirtimeInstance("/root", self.cfg) + self.assertTrue(True) + + def test_init_bad(self): + cfg = deepcopy(self.cfg) + cfg['api_client'] = 'bs' + with self.assertRaises(NoConfigFile): + AirtimeInstance("/root", cfg) diff --git a/python_apps/media-monitor2/tests/test_thread.py b/python_apps/media-monitor2/tests/test_thread.py new file mode 100644 index 000000000..1638a60e3 --- /dev/null +++ b/python_apps/media-monitor2/tests/test_thread.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +import unittest +import time +from media.saas.thread import InstanceThread, InstanceInheritingThread + +# ugly but necessary for 2.7 +signal = False +signal2 = False + +class TestInstanceThread(unittest.TestCase): + def test_user_inject(self): + global signal + signal = False + u = "rudi" + class T(InstanceThread): + def run(me): + global signal + super(T, me).run() + signal = True + self.assertEquals(u, me.user()) + t = T(u, name="test_user_inject") + t.daemon = True + t.start() + time.sleep(0.2) + self.assertTrue(signal) + + def test_inheriting_thread(utest): + global signal2 + u = "testing..." + + class TT(InstanceInheritingThread): + def run(self): + global signal2 + utest.assertEquals(self.user(), u) + signal2 = True + + class T(InstanceThread): + def run(self): + super(T, self).run() + child_thread = TT(name="child thread") + child_thread.daemon = True + child_thread.start() + + parent_thread = T(u, name="Parent instance thread") + parent_thread.daemon = True + parent_thread.start() + + time.sleep(0.2) + utest.assertTrue(signal2) + + def test_different_user(utest): + u1, u2 = "ru", "di" + class T(InstanceThread): + def run(self): + super(T, self).run() + + for u in [u1, u2]: + t = T(u) + t.daemon = True + t.start() + utest.assertEquals(t.user(), u) + + +if __name__ == '__main__': unittest.main() diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq index 5c5919e2e..1a18fd00a 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq @@ -254,29 +254,15 @@ def output_to(output_type, type, bitrate, host, port, pass, mount_point, url, de if user == "" then user_ref := "source" end - - description_ref = ref description - if description == "" then - description_ref := "N/A" - end - - genre_ref = ref genre - if genre == "" then - genre_ref := "N/A" - end - - url_ref = ref url - if url == "" then - url_ref := "N/A" - end + output.shoutcast_mono = output.shoutcast(id = "shoutcast_stream_#{stream}", host = host, port = port, password = pass, fallible = true, - url = !url_ref, - genre = !genre_ref, - name = !description_ref, + url = url, + genre = genre, + name = description, user = !user_ref, on_error = on_error, on_connect = on_connect) @@ -286,9 +272,9 @@ def output_to(output_type, type, bitrate, host, port, pass, mount_point, url, de port = port, password = pass, fallible = true, - url = !url_ref, - genre = !genre_ref, - name = !description_ref, + url = url, + genre = genre, + name = description, user = !user_ref, on_error = on_error, on_connect = on_connect) @@ -390,13 +376,6 @@ def add_skip_command(s) "skip",fun(s) -> begin log("source.skip") skip(s) end) end -dyn_out = output.icecast(%wav, - host="localhost", - port=8999, - password=stream_harbor_pass, - mount="test-harbor", - fallible=true) - def set_dynamic_source_id(id) = current_dyn_id := id string_of(!current_dyn_id) @@ -406,123 +385,159 @@ def get_dynamic_source_id() = string_of(!current_dyn_id) end +#cc-4633 -# Function to create a playlist source and output it. -def create_dynamic_source(uri) = - # The playlist source - s = audio_to_stereo(input.http(buffer=2., max=12., uri)) - # The output - active_dyn_out = dyn_out(s) +# NOTE +# A few values are hardcoded and may be dependent: +# - the delay in gracetime is linked with the buffer duration of input.http +# (delay should be a bit less than buffer) +# - crossing duration should be less than buffer length +# (at best, a higher duration will be ineffective) - # We register both source and output - # in the list of sources - dyn_sources := - list.append([(!current_dyn_id, s),(!current_dyn_id, active_dyn_out)], !dyn_sources) +# HTTP input with "restart" command that waits for "stop" to be effected +# before "start" command is issued. Optionally it takes a new URL to play, +# which makes it a convenient replacement for "url". +# In the future, this may become a core feature of the HTTP input. +# TODO If we stop and restart quickly several times in a row, +# the data bursts accumulate and create buffer overflow. +# Flushing the buffer on restart could be a good idea, but +# it would also create an interruptions while the buffer is +# refilling... on the other hand, this would avoid having to +# fade using both cross() and switch(). +def input.http_restart(~id,~initial_url="http://dummy/url") + + source = input.http(buffer=5.,max=15.,id=id,autostart=false,initial_url) + + def stopped() + "stopped" == list.hd(server.execute("#{id}.status")) + end + + server.register(namespace=id, + "restart", + usage="restart [url]", + fun (url) -> begin + if url != "" then + log(string_of(server.execute("#{id}.url #{url}"))) + end + log(string_of(server.execute("#{id}.stop"))) + add_timeout(0.5, + { if stopped() then + log(string_of(server.execute("#{id}.start"))) ; + (-1.) + else 0.5 end}) + "OK" + end) + + # Dummy output should be useless if HTTP stream is meant + # to be listened to immediately. Otherwise, apply it. + # + # output.dummy(fallible=true,source) + + source - notify([("schedule_table_id", !current_dyn_id)]) - "Done!" end +# Transitions between URL changes in HTTP streams. +def cross_http(~debug=true,~http_input_id,source) -# A function to destroy a dynamic source -def destroy_dynamic_source(id) = - # We need to find the source in the list, - # remove it and destroy it. Currently, the language - # lacks some nice operators for that so we do it - # the functional way + id = http_input_id + last_url = ref "" + change = ref false - # This function is executed on every item in the list - # of dynamic sources - def parse_list(ret, current_element) = - # ret is of the form: (matching_sources, remaining_sources) - # We extract those two: - matching_sources = fst(ret) - remaining_sources = snd(ret) - - # current_element is of the form: ("uri", source) so - # we check the first element - current_id = fst(current_element) - if current_id == id then - # In this case, we add the source to the list of - # matched sources - (list.append( [snd(current_element)], - matching_sources), - remaining_sources) - else - # In this case, we put the element in the list of remaining - # sources - (matching_sources, - list.append([current_element], - remaining_sources)) + def on_m(m) + notify_stream(m) + changed = m["source_url"] != !last_url + log("URL now #{m['source_url']} (change: #{changed})") + if changed then + if !last_url != "" then change := true end + last_url := m["source_url"] end end - - # Now we execute the function: - result = list.fold(parse_list, ([], []), !dyn_sources) - matching_sources = fst(result) - remaining_sources = snd(result) - # We store the remaining sources in dyn_sources - dyn_sources := remaining_sources + # We use both metadata and status to know about the current URL. + # Using only metadata may be more precise is crazy corner cases, + # but it's also asking too much: the metadata may not pass through + # before the crosser is instantiated. + # Using only status in crosser misses some info, eg. on first URL. + source = on_metadata(on_m,source) - # If no source matched, we return an error - if list.length(matching_sources) == 0 then - "Error: no matching sources!" - else - # We stop all sources - list.iter(source.shutdown, matching_sources) - # And return - "Done!" + cross_d = 3. + + def crosser(a,b) + url = list.hd(server.execute('#{id}.url')) + status = list.hd(server.execute('#{id}.status')) + on_m([("source_url",url)]) + if debug then + log("New track inside HTTP stream") + log(" status: #{status}") + log(" need to cross: #{!change}") + log(" remaining #{source.remaining(a)} sec before, \ + #{source.remaining(b)} sec after") + end + if !change then + change := false + # In principle one should avoid crossing on a live stream + # it'd be okay to do it here (eg. use add instead of sequence) + # because it's only once per URL, but be cautious. + sequence([fade.out(duration=cross_d,a),fade.in(b)]) + else + # This is done on tracks inside a single stream. + # Do NOT cross here or you'll gradually empty the buffer! + sequence([a,b]) + end end + + # Setting conservative=true would mess with the delayed switch below + cross(duration=cross_d,conservative=false,crosser,source) + end +# Custom fallback between http and default source with fading of +# beginning and end of HTTP stream. +# It does not take potential URL changes into account, as long as +# they do not interrupt streaming (thanks to the HTTP buffer). +def http_fallback(~http_input_id,~http,~default) + id = http_input_id + # We use a custom switching predicate to trigger switching (and thus, + # transitions) before the end of a track (rather, end of HTTP stream). + # It is complexified because we don't want to trigger switching when + # HTTP disconnects for just an instant, when changing URL: for that + # we use gracetime below. -# A function to destroy a dynamic source -def destroy_dynamic_source_all() = - # We need to find the source in the list, - # remove it and destroy it. Currently, the language - # lacks some nice operators for that so we do it - # the functional way - - # This function is executed on every item in the list - # of dynamic sources - def parse_list(ret, current_element) = - # ret is of the form: (matching_sources, remaining_sources) - # We extract those two: - matching_sources = fst(ret) - remaining_sources = snd(ret) - - # current_element is of the form: ("uri", source) so - # we check the first element - current_uri = fst(current_element) - # in this case, we add the source to the list of - # matched sources - (list.append( [snd(current_element)], - matching_sources), - remaining_sources) + def gracetime(~delay=3.,f) + last_true = ref 0. + { if f() then + last_true := gettimeofday() + true + else + gettimeofday() < !last_true+delay + end } end - - # now we execute the function: - result = list.fold(parse_list, ([], []), !dyn_sources) - matching_sources = fst(result) - remaining_sources = snd(result) - # we store the remaining sources in dyn_sources - dyn_sources := remaining_sources - - # if no source matched, we return an error - if list.length(matching_sources) == 0 then - "error: no matching sources!" - else - # we stop all sources - list.iter(source.shutdown, matching_sources) - # And return - "Done!" + def connected() + status = list.hd(server.execute("#{id}.status")) + not(list.mem(status,["polling","stopped"])) end + connected = gracetime(connected) + + def to_live(a,b) = + log("TRANSITION to live") + add(normalize=false, + [fade.initial(b),fade.final(a)]) + end + def to_static(a,b) = + log("TRANSITION to static") + sequence([fade.out(a),fade.initial(b)]) + end + + switch( + track_sensitive=false, + transitions=[to_live,to_static], + [(# make sure it is connected, and not buffering + {connected() and source.is_ready(http) and !webstream_enabled}, http), + ({true},default)]) + end - - - diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 34e56786e..962b20b2c 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -7,11 +7,11 @@ set("server.telnet", true) set("server.telnet.port", 1234) #Dynamic source list -dyn_sources = ref [] +#dyn_sources = ref [] webstream_enabled = ref false time = ref string_of(gettimeofday()) -queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5)) +queue = audio_to_stereo(id="queue_src", mksafe(request.equeue(id="queue", length=0.5))) queue = cue_cut(queue) queue = amplify(1., override="replay_gain", queue) @@ -35,15 +35,19 @@ s2_namespace = ref '' s3_namespace = ref '' just_switched = ref false -stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20')) +#stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20')) %include "ls_lib.liq" -web_stream = input.harbor("test-harbor", port=8999, password=stream_harbor_pass) -web_stream = on_metadata(notify_stream, web_stream) -output.dummy(fallible=true, web_stream) +#web_stream = input.harbor("test-harbor", port=8999, password=stream_harbor_pass) +#web_stream = on_metadata(notify_stream, web_stream) +#output.dummy(fallible=true, web_stream) +http = input.http_restart(id="http") +http = cross_http(http_input_id="http",http) +stream_queue = http_fallback(http_input_id="http",http=http,default=queue) + # the crossfade function controls fade in/out queue = crossfade_airtime(queue) queue = on_metadata(notify, queue) @@ -51,10 +55,10 @@ queue = map_metadata(update=false, append_title, queue) output.dummy(fallible=true, queue) -stream_queue = switch(id="stream_queue_switch", track_sensitive=false, - transitions=[transition, transition], - [({!webstream_enabled},web_stream), - ({true}, queue)]) +#stream_queue = switch(id="stream_queue_switch", track_sensitive=false, +# transitions=[transition, transition], +# [({!webstream_enabled},web_stream), +# ({true}, queue)]) ignore(output.dummy(stream_queue, fallible=true)) @@ -92,32 +96,33 @@ server.register(namespace="dynamic_source", fun (s) -> begin log("dynamic_source.output_stop") webstream_enabled := false "disabled" end) server.register(namespace="dynamic_source", - description="Set the cc_schedule row id", + description="Set the streams cc_schedule row id", usage="id ", "id", fun (s) -> begin log("dynamic_source.id") set_dynamic_source_id(s) end) server.register(namespace="dynamic_source", - description="Get the cc_schedule row id", + description="Get the streams cc_schedule row id", usage="get_id", "get_id", fun (s) -> begin log("dynamic_source.get_id") get_dynamic_source_id() end) -server.register(namespace="dynamic_source", - description="Start a new dynamic source.", - usage="start ", - "read_start", - fun (uri) -> begin log("dynamic_source.read_start") create_dynamic_source(uri) end) -server.register(namespace="dynamic_source", - description="Stop a dynamic source.", - usage="stop ", - "read_stop", - fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source(s) end) -server.register(namespace="dynamic_source", - description="Stop a dynamic source.", - usage="stop ", - "read_stop_all", - fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source_all() end) +#server.register(namespace="dynamic_source", +# description="Start a new dynamic source.", +# usage="start ", +# "read_start", +# fun (uri) -> begin log("dynamic_source.read_start") begin_stream_read(uri) end) +#server.register(namespace="dynamic_source", +# description="Stop a dynamic source.", +# usage="stop ", +# "read_stop", +# fun (s) -> begin log("dynamic_source.read_stop") stop_stream_read(s) end) + +#server.register(namespace="dynamic_source", +# description="Stop a dynamic source.", +# usage="stop ", +# "read_stop_all", +# fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source_all() end) default = amplify(id="silence_src", 0.00001, noise()) default = rewrite_metadata([("artist","Airtime"), ("title", "offline")], default) diff --git a/python_apps/pypo/media/__init__.py b/python_apps/pypo/media/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python_apps/pypo/media/update/__init__.py b/python_apps/pypo/media/update/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python_apps/media-monitor2/media/update/replaygain.py b/python_apps/pypo/media/update/replaygain.py similarity index 100% rename from python_apps/media-monitor2/media/update/replaygain.py rename to python_apps/pypo/media/update/replaygain.py diff --git a/python_apps/media-monitor2/media/update/replaygainupdater.py b/python_apps/pypo/media/update/replaygainupdater.py similarity index 97% rename from python_apps/media-monitor2/media/update/replaygainupdater.py rename to python_apps/pypo/media/update/replaygainupdater.py index df48f09de..2f52c0a23 100644 --- a/python_apps/media-monitor2/media/update/replaygainupdater.py +++ b/python_apps/pypo/media/update/replaygainupdater.py @@ -3,12 +3,11 @@ from threading import Thread import traceback import os import time +import logging from media.update import replaygain -from media.monitor.log import Loggable - -class ReplayGainUpdater(Thread, Loggable): +class ReplayGainUpdater(Thread): """ The purpose of the class is to query the server for a list of files which do not have a ReplayGain value calculated. This class will iterate over the @@ -30,6 +29,7 @@ class ReplayGainUpdater(Thread, Loggable): def __init__(self,apc): Thread.__init__(self) self.api_client = apc + self.logger = logging.getLogger() def main(self): raw_response = self.api_client.list_all_watched_dirs() diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 1b51a13f8..ea8950d41 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -24,6 +24,8 @@ from recorder import Recorder from listenerstat import ListenerStat from pypomessagehandler import PypoMessageHandler +from media.update.replaygainupdater import ReplayGainUpdater + from configobj import ConfigObj # custom imports @@ -174,6 +176,9 @@ if __name__ == '__main__': sys.exit() api_client = api_client.AirtimeApiClient() + + ReplayGainUpdater.start_reply_gain(api_client) + api_client.register_component("pypo") pypoFetch_q = Queue() diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 0382b5e06..c0bb36ff9 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -478,8 +478,8 @@ class PypoPush(Thread): self.logger.debug(msg) tn.write(msg) - #example: dynamic_source.read_start http://87.230.101.24:80/top100station.mp3 - msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1') + #msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1') + msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1') self.logger.debug(msg) tn.write(msg) @@ -520,7 +520,8 @@ class PypoPush(Thread): self.telnet_lock.acquire() tn = telnetlib.Telnet(LS_HOST, LS_PORT) - msg = 'dynamic_source.read_stop_all xxx\n' + #msg = 'dynamic_source.read_stop_all xxx\n' + msg = 'http.stop\n' self.logger.debug(msg) tn.write(msg) @@ -546,7 +547,8 @@ class PypoPush(Thread): tn = telnetlib.Telnet(LS_HOST, LS_PORT) #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - msg = 'dynamic_source.read_stop %s\n' % media_item['row_id'] + #msg = 'dynamic_source.read_stop %s\n' % media_item['row_id'] + msg = 'http.stop\n' self.logger.debug(msg) tn.write(msg) diff --git a/utils/airtime-test-stream.py b/utils/airtime-test-stream.py index b7d887c0d..a2ab06982 100644 --- a/utils/airtime-test-stream.py +++ b/utils/airtime-test-stream.py @@ -27,14 +27,14 @@ def printUsage(): print " -m mount (default: test) " print " -h show help menu" - + def find_liquidsoap_binary(): """ Starting with Airtime 2.0, we don't know the exact location of the Liquidsoap binary because it may have been installed through a debian package. Let's find the location of this binary. """ - + rv = subprocess.call("which airtime-liquidsoap > /dev/null", shell=True) if rv == 0: return "airtime-liquidsoap" @@ -78,7 +78,7 @@ for o, a in optlist: mount = a try: - + print "Protocol: %s " % stream_type print "Host: %s" % host print "Port: %s" % port @@ -86,35 +86,35 @@ try: print "Password: %s" % password if stream_type == "icecast": print "Mount: %s\n" % mount - + url = "http://%s:%s/%s" % (host, port, mount) print "Outputting to %s streaming server. You should be able to hear a monotonous tone on '%s'. Press ctrl-c to quit." % (stream_type, url) - + liquidsoap_exe = find_liquidsoap_binary() - + if liquidsoap_exe is None: raise Exception("Liquidsoap not found!") - + if stream_type == "icecast": command = "%s 'output.icecast(%%vorbis, host = \"%s\", port = %s, user= \"%s\", password = \"%s\", mount=\"%s\", sine())'" % (liquidsoap_exe, host, port, user, password, mount) else: command = "%s /usr/lib/airtime/pypo/bin/liquidsoap_scripts/library/pervasives.liq 'output.shoutcast(%%mp3, host=\"%s\", port = %s, user= \"%s\", password = \"%s\", sine())'" \ % (liquidsoap_exe, host, port, user, password) - + if not verbose: command += " 2>/dev/null | grep \"failed\"" else: print command - + #print command rv = subprocess.call(command, shell=True) - + #if we reach this point, it means that our subprocess exited without the user - #doing a keyboard interrupt. This means there was a problem outputting to the + #doing a keyboard interrupt. This means there was a problem outputting to the #stream server. Print appropriate message. print "There was an error with your stream configuration. Please review your configuration " + \ "and run this program again. Use the -h option for help" - + except KeyboardInterrupt, ki: print "\nExiting" except Exception, e: