Merge branch 'devel' of dev.sourcefabric.org:airtime into devel
This commit is contained in:
commit
e0b19cc2d1
|
@ -25,6 +25,7 @@ class PlaylistController extends Zend_Controller_Action
|
|||
->addActionContext('smart-block-generate', 'json')
|
||||
->addActionContext('smart-block-shuffle', 'json')
|
||||
->addActionContext('get-block-info', 'json')
|
||||
->addActionContext('shuffle', 'json')
|
||||
->initContext();
|
||||
|
||||
}
|
||||
|
@ -72,7 +73,6 @@ class PlaylistController extends Zend_Controller_Action
|
|||
$isBlock = true;
|
||||
$viewPath = 'playlist/smart-block.phtml';
|
||||
}
|
||||
|
||||
if (isset($obj)) {
|
||||
$formatter = new LengthFormatter($obj->getLength());
|
||||
$this->view->length = $formatter->format();
|
||||
|
@ -94,7 +94,11 @@ class PlaylistController extends Zend_Controller_Action
|
|||
} else {
|
||||
$this->view->obj = $obj;
|
||||
$this->view->id = $obj->getId();
|
||||
$this->view->html = $this->view->render($viewPath);
|
||||
if ($isJson) {
|
||||
return $this->view->html = $this->view->render($viewPath);
|
||||
} else {
|
||||
$this->view->html = $this->view->render($viewPath);
|
||||
}
|
||||
unset($this->view->obj);
|
||||
}
|
||||
} else {
|
||||
|
@ -537,6 +541,26 @@ class PlaylistController extends Zend_Controller_Action
|
|||
$this->playlistUnknownError($e);
|
||||
}
|
||||
}
|
||||
|
||||
public function shuffleAction()
|
||||
{
|
||||
$request = $this->getRequest();
|
||||
$params = $request->getPost();
|
||||
try {
|
||||
$pl = new Application_Model_Playlist($params['obj_id']);
|
||||
$result = $pl->shuffle();
|
||||
|
||||
if ($result['result'] == 0) {
|
||||
die(json_encode(array("result"=>0, "html"=>$this->createFullResponse($pl, true))));
|
||||
} else {
|
||||
die(json_encode($result));
|
||||
}
|
||||
} catch (PlaylistNotFoundException $e) {
|
||||
$this->playlistNotFound('block', true);
|
||||
} catch (Exception $e) {
|
||||
$this->playlistUnknownError($e);
|
||||
}
|
||||
}
|
||||
|
||||
public function getBlockInfoAction()
|
||||
{
|
||||
|
|
|
@ -387,8 +387,8 @@ SQL;
|
|||
}
|
||||
|
||||
if (isset($obj)) {
|
||||
if (($obj instanceof CcFiles && $obj->visible())
|
||||
|| $obj instanceof CcWebstream ||
|
||||
if (($obj instanceof CcFiles && $obj->visible())
|
||||
|| $obj instanceof CcWebstream ||
|
||||
$obj instanceof CcBlock) {
|
||||
|
||||
$entry = $this->plItem;
|
||||
|
@ -933,6 +933,29 @@ SQL;
|
|||
{
|
||||
CcPlaylistcontentsQuery::create()->findByDbPlaylistId($this->id)->delete();
|
||||
}
|
||||
|
||||
public function shuffle()
|
||||
{
|
||||
$sql = <<<SQL
|
||||
SELECT max(position) from cc_playlistcontents WHERE playlist_id=:p1
|
||||
SQL;
|
||||
$out = Application_Common_Database::prepareAndExecute($sql, array("p1"=>$this->id));
|
||||
$maxPosition = $out[0]['max'];
|
||||
|
||||
$map = range(0, $maxPosition);
|
||||
shuffle($map);
|
||||
|
||||
$currentPos = implode(',', array_keys($map));
|
||||
$sql = "UPDATE cc_playlistcontents SET position = CASE position ";
|
||||
foreach ($map as $current => $after) {
|
||||
$sql .= sprintf("WHEN %d THEN %d ", $current, $after);
|
||||
}
|
||||
$sql .= "END WHERE position IN ($currentPos) and playlist_id=:p1";
|
||||
|
||||
Application_Common_Database::prepareAndExecute($sql, array("p1"=>$this->id));
|
||||
$result['result'] = 0;
|
||||
return $result;
|
||||
}
|
||||
|
||||
} // class Playlist
|
||||
|
||||
|
|
|
@ -16,6 +16,9 @@ if (isset($this->obj)) {
|
|||
</ul>
|
||||
</div>
|
||||
<?php if (isset($this->obj)) : ?>
|
||||
<div class='btn-group pull-right'>
|
||||
<button class="btn btn-inverse" title='Shuffle playlist' type="button" id="playlist_shuffle_button">Shuffle</button>
|
||||
</div>
|
||||
<div class='btn-group pull-right'>
|
||||
<button class="btn btn-inverse" title='Save playlist' type="button" id="save_button">Save</button>
|
||||
</div>
|
||||
|
|
|
@ -637,6 +637,26 @@ var AIRTIME = (function(AIRTIME){
|
|||
$fs.addClass("closed");
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
$pl.on("click", 'button[id="playlist_shuffle_button"]', function(){
|
||||
obj_id = $('input[id="obj_id"]').val();
|
||||
url = "/Playlist/shuffle";
|
||||
enableLoadingIcon();
|
||||
$.post(url, {format: "json", obj_id: obj_id}, function(data){
|
||||
var json = $.parseJSON(data)
|
||||
|
||||
if (json.error !== undefined) {
|
||||
alert(json.error);
|
||||
}
|
||||
AIRTIME.playlist.fnOpenPlaylist(json);
|
||||
if (json.result == "0") {
|
||||
$pl.find('.success').text('Playlist shuffled');
|
||||
$pl.find('.success').show();
|
||||
}
|
||||
disableLoadingIcon();
|
||||
});
|
||||
})
|
||||
|
||||
$pl.on("click", "#webstream_save", function(){
|
||||
//get all fields and POST to server
|
||||
|
|
|
@ -351,7 +351,7 @@ function setupUI() {
|
|||
* It is only active if playlist is not empty
|
||||
*/
|
||||
var plContents = $('#spl_sortable').children();
|
||||
var shuffleButton = $('button[id="shuffle_button"]');
|
||||
var shuffleButton = $('button[id="shuffle_button"], button[id="playlist_shuffle_button"]');
|
||||
|
||||
if (!plContents.hasClass('spl_empty')) {
|
||||
if (shuffleButton.hasClass('ui-state-disabled')) {
|
||||
|
|
|
@ -127,24 +127,6 @@ class RequestProvider(object):
|
|||
|
||||
|
||||
class AirtimeApiClient(object):
|
||||
|
||||
# This is a little hacky fix so that I don't have to pass the config object
|
||||
# everywhere where AirtimeApiClient needs to be initialized
|
||||
default_config = None
|
||||
# the purpose of this custom constructor is to remember which config file
|
||||
# it was called with. So that after the initial call:
|
||||
# AirtimeApiClient.create_right_config('/path/to/config')
|
||||
# All subsequence calls to create_right_config will be with that config
|
||||
# file
|
||||
@staticmethod
|
||||
def create_right_config(log=None,config_path=None):
|
||||
if config_path: AirtimeApiClient.default_config = config_path
|
||||
elif (not AirtimeApiClient.default_config):
|
||||
raise ValueError("Cannot slip config_path attribute when it has \
|
||||
never been passed yet")
|
||||
return AirtimeApiClient( logger=None,
|
||||
config_path=AirtimeApiClient.default_config )
|
||||
|
||||
def __init__(self, logger=None,config_path='/etc/airtime/api_client.cfg'):
|
||||
if logger is None: self.logger = logging
|
||||
else: self.logger = logger
|
||||
|
@ -399,3 +381,4 @@ class AirtimeApiClient(object):
|
|||
def push_stream_stats(self, data):
|
||||
# TODO : users of this method should do their own error handling
|
||||
response = self.services.push_stream_stats(_post_data={'data': json.dumps(data)})
|
||||
return response
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import re
|
||||
from media.saas.launcher import setup_logger, setup_global, MM2
|
||||
from media.saas.airtimeinstance import AirtimeInstance
|
||||
from os.path import isdir, join, abspath, exists
|
||||
from os import listdir
|
||||
|
||||
def list_dirs(d): return (x for x in listdir(d) if isdir(join(d,x)))
|
||||
|
||||
def filter_instance(d): return bool(re.match('.+\d+$',d))
|
||||
|
||||
def get_name(p): return re.match('.+/(\d+)$',p).group(1)
|
||||
|
||||
def filter_instances(l): return (x for x in l if filter_instance(x))
|
||||
|
||||
def autoscan_instances(main_cfg):
|
||||
root = main_cfg['instance_root']
|
||||
instances = []
|
||||
for instance_machine in list_dirs(root):
|
||||
instance_machine = join(root, instance_machine)
|
||||
for instance_root in filter_instances(list_dirs(instance_machine)):
|
||||
full_path = abspath(join(instance_machine,instance_root))
|
||||
ai = AirtimeInstance.root_make(get_name(full_path), full_path)
|
||||
instances.append(ai)
|
||||
return instances
|
||||
|
||||
def verify_exists(p):
|
||||
if not exists(p): raise Exception("%s must exist" % p)
|
||||
|
||||
def main(main_cfg):
|
||||
log_config, log_path = main_cfg['log_config'], main_cfg['log_path']
|
||||
verify_exists(log_config)
|
||||
log = setup_logger(log_config, log_path)
|
||||
setup_global(log)
|
||||
for instance in autoscan_instances(main_cfg):
|
||||
print("Launching instance: %s" % str(instance))
|
||||
MM2(instance).start()
|
||||
print("Launched all instances")
|
||||
|
||||
if __name__ == '__main__':
|
||||
root = '/home/rudi/reps/Airtime/python_apps/media-monitor2'
|
||||
default = {
|
||||
'log_path' : join(root, 'test.log'), # config for log
|
||||
'log_config' : join(root, 'configs/logging.cfg'), # where to log
|
||||
# root dir of all instances
|
||||
'instance_root' : join(root, 'saas_stub')
|
||||
}
|
||||
main(default)
|
|
@ -0,0 +1,32 @@
|
|||
[database]
|
||||
host = localhost
|
||||
dbname = airtime
|
||||
dbuser = airtime
|
||||
dbpass = airtime
|
||||
|
||||
[rabbitmq]
|
||||
host = 127.0.0.1
|
||||
port = 5672
|
||||
user = guest
|
||||
password = guest
|
||||
vhost = /
|
||||
|
||||
[general]
|
||||
api_key = I6EUOJM0D1EIGSMZ9T70
|
||||
web_server_user = www-data
|
||||
airtime_dir = /usr/share/airtime
|
||||
base_url = localhost
|
||||
base_port = 80
|
||||
base_dir = ''
|
||||
|
||||
;How many hours ahead of time should Airtime playout engine (PYPO)
|
||||
;cache scheduled media files.
|
||||
cache_ahead_hours = 1
|
||||
|
||||
[monit]
|
||||
monit_user = guest
|
||||
monit_password = airtime
|
||||
|
||||
[soundcloud]
|
||||
connection_retries = 3
|
||||
time_between_retries = 60
|
|
@ -0,0 +1,126 @@
|
|||
bin_dir = "/usr/lib/airtime/api_clients"
|
||||
|
||||
#############################
|
||||
## Common
|
||||
#############################
|
||||
|
||||
# Value needed to access the API
|
||||
api_key = 'I6EUOJM0D1EIGSMZ9T70'
|
||||
|
||||
# Path to the base of the API
|
||||
api_base = 'api'
|
||||
|
||||
# URL to get the version number of the server API
|
||||
version_url = 'version/api_key/%%api_key%%'
|
||||
|
||||
#URL to register a components IP Address with the central web server
|
||||
register_component = 'register-component/format/json/api_key/%%api_key%%/component/%%component%%'
|
||||
|
||||
# Hostname
|
||||
host = 'localhost'
|
||||
base_port = 80
|
||||
base_dir = ''
|
||||
|
||||
#############################
|
||||
## Config for Media Monitor
|
||||
#############################
|
||||
|
||||
# URL to setup the media monitor
|
||||
media_setup_url = 'media-monitor-setup/format/json/api_key/%%api_key%%'
|
||||
|
||||
# Tell Airtime the file id associated with a show instance.
|
||||
upload_recorded = 'upload-recorded/format/json/api_key/%%api_key%%/fileid/%%fileid%%/showinstanceid/%%showinstanceid%%'
|
||||
|
||||
# URL to tell Airtime to update file's meta data
|
||||
update_media_url = 'reload-metadata/format/json/api_key/%%api_key%%/mode/%%mode%%'
|
||||
|
||||
# URL to tell Airtime we want a listing of all files it knows about
|
||||
list_all_db_files = 'list-all-files/format/json/api_key/%%api_key%%/dir_id/%%dir_id%%/all/%%all%%'
|
||||
|
||||
# URL to tell Airtime we want a listing of all dirs its watching (including the stor dir)
|
||||
list_all_watched_dirs = 'list-all-watched-dirs/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
add_watched_dir = 'add-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
remove_watched_dir = 'remove-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
set_storage_dir = 'set-storage-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime about file system mount change
|
||||
update_fs_mount = 'update-file-system-mount/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to commit multiple updates from media monitor at the same time
|
||||
|
||||
reload_metadata_group = 'reload-metadata-group/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to tell Airtime about file system mount change
|
||||
handle_watched_dir_missing = 'handle-watched-dir-missing/format/json/api_key/%%api_key%%/dir/%%dir%%'
|
||||
|
||||
#############################
|
||||
## Config for Recorder
|
||||
#############################
|
||||
|
||||
# URL to get the schedule of shows set to record
|
||||
show_schedule_url = 'recorded-shows/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to upload the recorded show's file to Airtime
|
||||
upload_file_url = 'upload-file/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to commit multiple updates from media monitor at the same time
|
||||
|
||||
#number of retries to upload file if connection problem
|
||||
upload_retries = 3
|
||||
|
||||
#time to wait between attempts to upload file if connection problem (in seconds)
|
||||
upload_wait = 60
|
||||
|
||||
################################################################################
|
||||
# Uncomment *one of the sets* of values from the API clients below, and comment
|
||||
# out all the others.
|
||||
################################################################################
|
||||
|
||||
#############################
|
||||
## Config for Pypo
|
||||
#############################
|
||||
|
||||
# Schedule export path.
|
||||
# %%from%% - starting date/time in the form YYYY-MM-DD-hh-mm
|
||||
# %%to%% - starting date/time in the form YYYY-MM-DD-hh-mm
|
||||
export_url = 'schedule/api_key/%%api_key%%'
|
||||
|
||||
get_media_url = 'get-media/file/%%file%%/api_key/%%api_key%%'
|
||||
|
||||
# Update whether a schedule group has begun playing.
|
||||
update_item_url = 'notify-schedule-group-play/api_key/%%api_key%%/schedule_id/%%schedule_id%%'
|
||||
|
||||
# Update whether an audio clip is currently playing.
|
||||
update_start_playing_url = 'notify-media-item-start-play/api_key/%%api_key%%/media_id/%%media_id%%/'
|
||||
|
||||
# URL to tell Airtime we want to get stream setting
|
||||
get_stream_setting = 'get-stream-setting/format/json/api_key/%%api_key%%/'
|
||||
|
||||
#URL to update liquidsoap status
|
||||
update_liquidsoap_status = 'update-liquidsoap-status/format/json/api_key/%%api_key%%/msg/%%msg%%/stream_id/%%stream_id%%/boot_time/%%boot_time%%'
|
||||
|
||||
#URL to check live stream auth
|
||||
check_live_stream_auth = 'check-live-stream-auth/format/json/api_key/%%api_key%%/username/%%username%%/password/%%password%%/djtype/%%djtype%%'
|
||||
|
||||
#URL to update source status
|
||||
update_source_status = 'update-source-status/format/json/api_key/%%api_key%%/sourcename/%%sourcename%%/status/%%status%%'
|
||||
|
||||
get_bootstrap_info = 'get-bootstrap-info/format/json/api_key/%%api_key%%'
|
||||
|
||||
get_files_without_replay_gain = 'get-files-without-replay-gain/api_key/%%api_key%%/dir_id/%%dir_id%%'
|
||||
|
||||
update_replay_gain_value = 'update-replay-gain-value/api_key/%%api_key%%'
|
||||
|
||||
notify_webstream_data = 'notify-webstream-data/api_key/%%api_key%%/media_id/%%media_id%%/format/json'
|
||||
|
||||
notify_liquidsoap_started = 'rabbitmq-do-push/api_key/%%api_key%%/format/json'
|
||||
|
||||
get_stream_parameters = 'get-stream-parameters/api_key/%%api_key%%/format/json'
|
||||
|
||||
push_stream_stats = 'push-stream-stats/api_key/%%api_key%%/format/json'
|
|
@ -0,0 +1,32 @@
|
|||
[loggers]
|
||||
keys= root,notifier,metadata
|
||||
|
||||
[handlers]
|
||||
keys=fileOutHandler
|
||||
|
||||
[formatters]
|
||||
keys=simpleFormatter
|
||||
|
||||
[logger_root]
|
||||
level=DEBUG
|
||||
handlers=fileOutHandler
|
||||
|
||||
[logger_notifier]
|
||||
level=DEBUG
|
||||
handlers=fileOutHandler
|
||||
qualname=notifier
|
||||
|
||||
[logger_metadata]
|
||||
level=DEBUG
|
||||
handlers=fileOutHandler
|
||||
qualname=metadata
|
||||
|
||||
[handler_fileOutHandler]
|
||||
class=logging.handlers.RotatingFileHandler
|
||||
level=DEBUG
|
||||
formatter=simpleFormatter
|
||||
args=("/var/log/airtime/media-monitor/media-monitor.log", 'a', 10000000, 5,)
|
||||
|
||||
[formatter_simpleFormatter]
|
||||
format=%(asctime)s %(levelname)s - [%(threadName)s] [%(filename)s : %(funcName)s()] : LINE %(lineno)d - %(message)s
|
||||
datefmt=
|
|
@ -0,0 +1,31 @@
|
|||
api_client = "airtime"
|
||||
|
||||
# where the binary files live
|
||||
bin_dir = '/usr/lib/airtime/media-monitor'
|
||||
|
||||
# where the logging files live
|
||||
log_dir = '/var/log/airtime/media-monitor'
|
||||
|
||||
|
||||
############################################
|
||||
# RabbitMQ settings #
|
||||
############################################
|
||||
rabbitmq_host = 'localhost'
|
||||
rabbitmq_user = 'guest'
|
||||
rabbitmq_password = 'guest'
|
||||
rabbitmq_vhost = '/'
|
||||
|
||||
############################################
|
||||
# Media-Monitor preferences #
|
||||
############################################
|
||||
check_filesystem_events = 5 #how long to queue up events performed on the files themselves.
|
||||
check_airtime_events = 30 #how long to queue metadata input from airtime.
|
||||
|
||||
# MM2 only:
|
||||
touch_interval = 5
|
||||
chunking_number = 450
|
||||
request_max_wait = 3.0
|
||||
rmq_event_wait = 0.1
|
||||
logpath = '/var/log/airtime/media-monitor/media-monitor.log'
|
||||
index_path = '/var/tmp/airtime/media-monitor/last_index'
|
||||
|
|
@ -15,7 +15,7 @@ from media.monitor.exceptions import DirectoryIsNotListed
|
|||
from media.monitor.bootstrap import Bootstrapper
|
||||
from media.monitor.listeners import FileMediator
|
||||
|
||||
from api_clients import api_client as apc
|
||||
from media.saas.thread import apc
|
||||
|
||||
class AirtimeNotifier(Loggable):
|
||||
"""
|
||||
|
@ -98,7 +98,7 @@ class AirtimeMessageReceiver(Loggable):
|
|||
if (not directory_id) and (not directory):
|
||||
raise ValueError("You must provide either directory_id or \
|
||||
directory")
|
||||
sdb = AirtimeDB(apc.AirtimeApiClient.create_right_config())
|
||||
sdb = AirtimeDB(apc())
|
||||
if directory : directory = os.path.normpath(directory)
|
||||
if directory_id == None : directory_id = sdb.to_id(directory)
|
||||
if directory == None : directory = sdb.to_directory(directory_id)
|
||||
|
|
|
@ -2,6 +2,7 @@ import os
|
|||
from pydispatch import dispatcher
|
||||
from media.monitor.events import NewFile, DeleteFile, ModifyFile
|
||||
from media.monitor.log import Loggable
|
||||
from media.saas.thread import getsig
|
||||
import media.monitor.pure as mmp
|
||||
|
||||
class Bootstrapper(Loggable):
|
||||
|
@ -16,7 +17,7 @@ class Bootstrapper(Loggable):
|
|||
watch_signal - the signals should send events for every file on.
|
||||
"""
|
||||
self.db = db
|
||||
self.watch_signal = watch_signal
|
||||
self.watch_signal = getsig(watch_signal)
|
||||
|
||||
def flush_all(self, last_ran):
|
||||
"""
|
||||
|
|
|
@ -12,25 +12,21 @@ class MMConfig(object):
|
|||
self.cfg = ConfigObj(path)
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""
|
||||
We always return a copy of the config item to prevent callers from
|
||||
doing any modifications through the returned objects methods
|
||||
"""
|
||||
""" We always return a copy of the config item to prevent
|
||||
callers from doing any modifications through the returned
|
||||
objects methods """
|
||||
return copy.deepcopy(self.cfg[key])
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
"""
|
||||
We use this method not to allow anybody to mess around with config file
|
||||
any settings made should be done through MMConfig's instance methods
|
||||
"""
|
||||
""" We use this method not to allow anybody to mess around with
|
||||
config file any settings made should be done through MMConfig's
|
||||
instance methods """
|
||||
raise ConfigAccessViolation(key)
|
||||
|
||||
def save(self): self.cfg.write()
|
||||
|
||||
def last_ran(self):
|
||||
"""
|
||||
Returns the last time media monitor was ran by looking at the time when
|
||||
the file at 'index_path' was modified
|
||||
"""
|
||||
""" Returns the last time media monitor was ran by looking at
|
||||
the time when the file at 'index_path' was modified """
|
||||
return mmp.last_modified(self.cfg['index_path'])
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ from media.monitor.pure import LazyProperty
|
|||
from media.monitor.metadata import Metadata
|
||||
from media.monitor.log import Loggable
|
||||
from media.monitor.exceptions import BadSongFile
|
||||
from media.saas.thread import getsig
|
||||
|
||||
class PathChannel(object):
|
||||
"""
|
||||
|
@ -15,7 +16,7 @@ class PathChannel(object):
|
|||
used as a named tuple
|
||||
"""
|
||||
def __init__(self, signal, path):
|
||||
self.signal = signal
|
||||
self.signal = getsig(signal)
|
||||
self.path = path
|
||||
|
||||
# TODO : Move this to it's file. Also possible unsingleton and use it as a
|
||||
|
|
|
@ -23,7 +23,7 @@ class FailedToObtainLocale(Exception):
|
|||
|
||||
class CouldNotCreateIndexFile(Exception):
|
||||
"""exception whenever index file cannot be created"""
|
||||
def __init__(self, path, cause):
|
||||
def __init__(self, path, cause=None):
|
||||
self.path = path
|
||||
self.cause = cause
|
||||
def __str__(self): return "Failed to create touch file '%s'" % self.path
|
||||
|
|
|
@ -3,6 +3,7 @@ from pydispatch import dispatcher
|
|||
import abc
|
||||
|
||||
from media.monitor.log import Loggable
|
||||
from media.saas.thread import getsig
|
||||
import media.monitor.pure as mmp
|
||||
|
||||
# Defines the handle interface
|
||||
|
@ -21,10 +22,10 @@ class ReportHandler(Handles):
|
|||
"""
|
||||
__metaclass__ = abc.ABCMeta
|
||||
def __init__(self, signal, weak=False):
|
||||
self.signal = signal
|
||||
self.report_signal = "badfile"
|
||||
self.signal = getsig(signal)
|
||||
self.report_signal = getsig("badfile")
|
||||
def dummy(sender, event): self.handle(sender,event)
|
||||
dispatcher.connect(dummy, signal=signal, sender=dispatcher.Any,
|
||||
dispatcher.connect(dummy, signal=self.signal, sender=dispatcher.Any,
|
||||
weak=weak)
|
||||
|
||||
def report_problem_file(self, event, exception=None):
|
||||
|
@ -38,7 +39,7 @@ class ProblemFileHandler(Handles, Loggable):
|
|||
"""
|
||||
def __init__(self, channel, **kwargs):
|
||||
self.channel = channel
|
||||
self.signal = self.channel.signal
|
||||
self.signal = getsig(self.channel.signal)
|
||||
self.problem_dir = self.channel.path
|
||||
def dummy(sender, event, exception):
|
||||
self.handle(sender, event, exception)
|
||||
|
|
|
@ -9,7 +9,7 @@ from media.monitor.events import OrganizeFile, NewFile, MoveFile, DeleteFile, \
|
|||
DeleteDir, EventRegistry, MoveDir,\
|
||||
DeleteDirWatch
|
||||
from media.monitor.log import Loggable, get_logger
|
||||
|
||||
from media.saas.thread import getsig
|
||||
# Note: Because of the way classes that inherit from pyinotify.ProcessEvent
|
||||
# interact with constructors. you should only instantiate objects from them
|
||||
# using keyword arguments. For example:
|
||||
|
@ -45,7 +45,7 @@ class BaseListener(object):
|
|||
def __str__(self):
|
||||
return "Listener(%s), Signal(%s)" % \
|
||||
(self.__class__.__name__, self. signal)
|
||||
def my_init(self, signal): self.signal = signal
|
||||
def my_init(self, signal): self.signal = getsig(signal)
|
||||
|
||||
class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable):
|
||||
def process_IN_CLOSE_WRITE(self, event):
|
||||
|
@ -66,14 +66,14 @@ class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable):
|
|||
self.logger.info("Bootstrapping: File in 'organize' directory: \
|
||||
'%s'" % f)
|
||||
if not mmp.file_locked(f):
|
||||
dispatcher.send(signal=self.signal, sender=self,
|
||||
dispatcher.send(signal=getsig(self.signal), sender=self,
|
||||
event=OrganizeFile(f))
|
||||
flushed += 1
|
||||
#self.logger.info("Flushed organized directory with %d files" % flushed)
|
||||
|
||||
@IncludeOnly(mmp.supported_extensions)
|
||||
def process_to_organize(self, event):
|
||||
dispatcher.send(signal=self.signal, sender=self,
|
||||
dispatcher.send(signal=getsig(self.signal), sender=self,
|
||||
event=OrganizeFile(event))
|
||||
|
||||
class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
|
||||
|
@ -101,14 +101,14 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
|
|||
|
||||
def delete_watch_dir(self, event):
|
||||
e = DeleteDirWatch(event)
|
||||
dispatcher.send(signal='watch_move', sender=self, event=e)
|
||||
dispatcher.send(signal=self.signal, sender=self, event=e)
|
||||
dispatcher.send(signal=getsig('watch_move'), sender=self, event=e)
|
||||
dispatcher.send(signal=getsig(self.signal), sender=self, event=e)
|
||||
|
||||
@mediate_ignored
|
||||
@IncludeOnly(mmp.supported_extensions)
|
||||
def process_create(self, event):
|
||||
evt = NewFile(event)
|
||||
dispatcher.send(signal=self.signal, sender=self, event=evt)
|
||||
dispatcher.send(signal=getsig(self.signal), sender=self, event=evt)
|
||||
return evt
|
||||
|
||||
@mediate_ignored
|
||||
|
@ -117,13 +117,13 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
|
|||
evt = None
|
||||
if event.dir : evt = DeleteDir(event)
|
||||
else : evt = DeleteFile(event)
|
||||
dispatcher.send(signal=self.signal, sender=self, event=evt)
|
||||
dispatcher.send(signal=getsig(self.signal), sender=self, event=evt)
|
||||
return evt
|
||||
|
||||
@mediate_ignored
|
||||
def process_delete_dir(self, event):
|
||||
evt = DeleteDir(event)
|
||||
dispatcher.send(signal=self.signal, sender=self, event=evt)
|
||||
dispatcher.send(signal=getsig(self.signal), sender=self, event=evt)
|
||||
return evt
|
||||
|
||||
def flush_events(self, path):
|
||||
|
@ -138,6 +138,6 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
|
|||
added = 0
|
||||
for f in mmp.walk_supported(path, clean_empties=False):
|
||||
added += 1
|
||||
dispatcher.send( signal=self.signal, sender=self, event=NewFile(f) )
|
||||
dispatcher.send( signal=getsig(self.signal), sender=self, event=NewFile(f) )
|
||||
self.logger.info( "Flushed watch directory. added = %d" % added )
|
||||
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import pyinotify
|
||||
import threading
|
||||
import time
|
||||
from pydispatch import dispatcher
|
||||
|
||||
|
@ -9,19 +8,19 @@ from media.monitor.log import Loggable
|
|||
from media.monitor.listeners import StoreWatchListener, OrganizeListener
|
||||
from media.monitor.handler import ProblemFileHandler
|
||||
from media.monitor.organizer import Organizer
|
||||
from media.saas.thread import InstanceInheritingThread, getsig
|
||||
import media.monitor.pure as mmp
|
||||
|
||||
|
||||
class ManagerTimeout(threading.Thread,Loggable):
|
||||
"""
|
||||
The purpose of this class is to flush the organize directory every 3
|
||||
secnods. This used to be just a work around for cc-4235 but recently
|
||||
became a permanent solution because it's "cheap" and reliable
|
||||
"""
|
||||
class ManagerTimeout(InstanceInheritingThread,Loggable):
|
||||
""" The purpose of this class is to flush the organize directory
|
||||
every 3 secnods. This used to be just a work around for cc-4235
|
||||
but recently became a permanent solution because it's "cheap" and
|
||||
reliable """
|
||||
def __init__(self, manager, interval=1.5):
|
||||
# TODO : interval should be read from config and passed here instead
|
||||
# of just using the hard coded value
|
||||
threading.Thread.__init__(self)
|
||||
super(ManagerTimeout, self).__init__()
|
||||
self.manager = manager
|
||||
self.interval = interval
|
||||
def run(self):
|
||||
|
@ -30,19 +29,17 @@ class ManagerTimeout(threading.Thread,Loggable):
|
|||
self.manager.flush_organize()
|
||||
|
||||
class Manager(Loggable):
|
||||
"""
|
||||
An abstraction over media monitors core pyinotify functions. These
|
||||
include adding watched,store, organize directories, etc. Basically
|
||||
composes over WatchManager from pyinotify
|
||||
"""
|
||||
# NOTE : this massive class is a source of many problems of mm and
|
||||
# is in dire need of breaking up and refactoring.
|
||||
""" An abstraction over media monitors core pyinotify functions.
|
||||
These include adding watched,store, organize directories, etc.
|
||||
Basically composes over WatchManager from pyinotify """
|
||||
def __init__(self):
|
||||
self.wm = pyinotify.WatchManager()
|
||||
# These two instance variables are assumed to be constant
|
||||
self.watch_channel = 'watch'
|
||||
self.organize_channel = 'organize'
|
||||
self.watch_channel = getsig('watch')
|
||||
self.organize_channel = getsig('organize')
|
||||
self.watch_listener = StoreWatchListener(signal = self.watch_channel)
|
||||
# TODO : change this to a weak ref
|
||||
# TODO : get rid of this hack once cc-4235 is fixed
|
||||
self.__timeout_thread = ManagerTimeout(self)
|
||||
self.__timeout_thread.daemon = True
|
||||
self.__timeout_thread.start()
|
||||
|
@ -57,11 +54,11 @@ class Manager(Loggable):
|
|||
self.organize_channel),
|
||||
}
|
||||
def dummy(sender, event): self.watch_move( event.path, sender=sender )
|
||||
dispatcher.connect(dummy, signal='watch_move', sender=dispatcher.Any,
|
||||
weak=False)
|
||||
dispatcher.connect(dummy, signal=getsig('watch_move'),
|
||||
sender=dispatcher.Any, weak=False)
|
||||
def subwatch_add(sender, directory):
|
||||
self.__add_watch(directory, self.watch_listener)
|
||||
dispatcher.connect(subwatch_add, signal='add_subwatch',
|
||||
dispatcher.connect(subwatch_add, signal=getsig('add_subwatch'),
|
||||
sender=dispatcher.Any, weak=False)
|
||||
# A private mapping path => watch_descriptor
|
||||
# we use the same dictionary for organize, watch, store wd events.
|
||||
|
@ -76,23 +73,19 @@ class Manager(Loggable):
|
|||
# through dedicated handler objects. Because we must have access to a
|
||||
# manager instance. Hence we must slightly break encapsulation.
|
||||
def watch_move(self, watch_dir, sender=None):
|
||||
"""
|
||||
handle 'watch move' events directly sent from listener
|
||||
"""
|
||||
""" handle 'watch move' events directly sent from listener """
|
||||
self.logger.info("Watch dir '%s' has been renamed (hence removed)" %
|
||||
watch_dir)
|
||||
self.remove_watch_directory(normpath(watch_dir))
|
||||
|
||||
def watch_signal(self):
|
||||
"""
|
||||
Return the signal string our watch_listener is reading events from
|
||||
"""
|
||||
return self.watch_listener.signal
|
||||
""" Return the signal string our watch_listener is reading
|
||||
events from """
|
||||
return getsig(self.watch_listener.signal)
|
||||
|
||||
def __remove_watch(self,path):
|
||||
"""
|
||||
Remove path from being watched (first will check if 'path' is watched)
|
||||
"""
|
||||
""" Remove path from being watched (first will check if 'path'
|
||||
is watched) """
|
||||
# only delete if dir is actually being watched
|
||||
if path in self.__wd_path:
|
||||
wd = self.__wd_path[path]
|
||||
|
@ -100,10 +93,8 @@ class Manager(Loggable):
|
|||
del(self.__wd_path[path])
|
||||
|
||||
def __add_watch(self,path,listener):
|
||||
"""
|
||||
Start watching 'path' using 'listener'. First will check if directory
|
||||
is being watched before adding another watch
|
||||
"""
|
||||
""" Start watching 'path' using 'listener'. First will check if
|
||||
directory is being watched before adding another watch """
|
||||
|
||||
self.logger.info("Attempting to add listener to path '%s'" % path)
|
||||
self.logger.info( 'Listener: %s' % str(listener) )
|
||||
|
@ -114,9 +105,8 @@ class Manager(Loggable):
|
|||
if wd: self.__wd_path[path] = wd.values()[0]
|
||||
|
||||
def __create_organizer(self, target_path, recorded_path):
|
||||
"""
|
||||
creates an organizer at new destination path or modifies the old one
|
||||
"""
|
||||
""" creates an organizer at new destination path or modifies the
|
||||
old one """
|
||||
# TODO : find a proper fix for the following hack
|
||||
# We avoid creating new instances of organize because of the way
|
||||
# it interacts with pydispatch. We must be careful to never have
|
||||
|
@ -134,23 +124,18 @@ class Manager(Loggable):
|
|||
recorded_path=recorded_path)
|
||||
|
||||
def get_problem_files_path(self):
|
||||
"""
|
||||
returns the path where problem files should go
|
||||
"""
|
||||
""" returns the path where problem files should go """
|
||||
return self.organize['problem_files_path']
|
||||
|
||||
def set_problem_files_path(self, new_path):
|
||||
"""
|
||||
Set the path where problem files should go
|
||||
"""
|
||||
""" Set the path where problem files should go """
|
||||
self.organize['problem_files_path'] = new_path
|
||||
self.organize['problem_handler'] = \
|
||||
ProblemFileHandler( PathChannel(signal='badfile',path=new_path) )
|
||||
ProblemFileHandler( PathChannel(signal=getsig('badfile'),
|
||||
path=new_path) )
|
||||
|
||||
def get_recorded_path(self):
|
||||
"""
|
||||
returns the path of the recorded directory
|
||||
"""
|
||||
""" returns the path of the recorded directory """
|
||||
return self.organize['recorded_path']
|
||||
|
||||
def set_recorded_path(self, new_path):
|
||||
|
@ -160,17 +145,14 @@ class Manager(Loggable):
|
|||
self.__add_watch(new_path, self.watch_listener)
|
||||
|
||||
def get_organize_path(self):
|
||||
"""
|
||||
returns the current path that is being watched for organization
|
||||
"""
|
||||
""" returns the current path that is being watched for
|
||||
organization """
|
||||
return self.organize['organize_path']
|
||||
|
||||
def set_organize_path(self, new_path):
|
||||
"""
|
||||
sets the organize path to be new_path. Under the current scheme there
|
||||
is only one organize path but there is no reason why more cannot be
|
||||
supported
|
||||
"""
|
||||
""" sets the organize path to be new_path. Under the current
|
||||
scheme there is only one organize path but there is no reason
|
||||
why more cannot be supported """
|
||||
# if we are already organizing a particular directory we remove the
|
||||
# watch from it first before organizing another directory
|
||||
self.__remove_watch(self.organize['organize_path'])
|
||||
|
@ -188,19 +170,15 @@ class Manager(Loggable):
|
|||
return self.organize['imported_path']
|
||||
|
||||
def set_imported_path(self,new_path):
|
||||
"""
|
||||
set the directory where organized files go to.
|
||||
"""
|
||||
""" set the directory where organized files go to. """
|
||||
self.__remove_watch(self.organize['imported_path'])
|
||||
self.organize['imported_path'] = new_path
|
||||
self.__create_organizer( new_path, self.organize['recorded_path'])
|
||||
self.__add_watch(new_path, self.watch_listener)
|
||||
|
||||
def change_storage_root(self, store):
|
||||
"""
|
||||
hooks up all the directories for you. Problem, recorded, imported,
|
||||
organize.
|
||||
"""
|
||||
""" hooks up all the directories for you. Problem, recorded,
|
||||
imported, organize. """
|
||||
store_paths = mmp.expand_storage(store)
|
||||
# First attempt to make sure that all paths exist before adding any
|
||||
# watches
|
||||
|
@ -217,18 +195,14 @@ class Manager(Loggable):
|
|||
mmp.create_dir(p)
|
||||
|
||||
def has_watch(self, path):
|
||||
"""
|
||||
returns true if the path is being watched or not. Any kind of watch:
|
||||
organize, store, watched.
|
||||
"""
|
||||
""" returns true if the path is being watched or not. Any kind
|
||||
of watch: organize, store, watched. """
|
||||
return path in self.__wd_path
|
||||
|
||||
def add_watch_directory(self, new_dir):
|
||||
"""
|
||||
adds a directory to be "watched". "watched" directories are
|
||||
""" adds a directory to be "watched". "watched" directories are
|
||||
those that are being monitored by media monitor for airtime in
|
||||
this context and not directories pyinotify calls watched
|
||||
"""
|
||||
this context and not directories pyinotify calls watched """
|
||||
if self.has_watch(new_dir):
|
||||
self.logger.info("Cannot add '%s' to watched directories. It's \
|
||||
already being watched" % new_dir)
|
||||
|
@ -237,9 +211,8 @@ class Manager(Loggable):
|
|||
self.__add_watch(new_dir, self.watch_listener)
|
||||
|
||||
def remove_watch_directory(self, watch_dir):
|
||||
"""
|
||||
removes a directory from being "watched". Undoes add_watch_directory
|
||||
"""
|
||||
""" removes a directory from being "watched". Undoes
|
||||
add_watch_directory """
|
||||
if self.has_watch(watch_dir):
|
||||
self.logger.info("Removing watched directory: '%s'", watch_dir)
|
||||
self.__remove_watch(watch_dir)
|
||||
|
@ -250,9 +223,7 @@ class Manager(Loggable):
|
|||
self.logger.info( self.__wd_path )
|
||||
|
||||
def loop(self):
|
||||
"""
|
||||
block until we receive pyinotify events
|
||||
"""
|
||||
""" block until we receive pyinotify events """
|
||||
notifier = pyinotify.Notifier(self.wm)
|
||||
notifier.coalesce_events()
|
||||
notifier.loop()
|
||||
|
|
|
@ -7,6 +7,7 @@ from media.monitor.exceptions import BadSongFile
|
|||
from media.monitor.events import OrganizeFile
|
||||
from pydispatch import dispatcher
|
||||
from os.path import dirname
|
||||
from media.saas.thread import getsig
|
||||
import os.path
|
||||
|
||||
class Organizer(ReportHandler,Loggable):
|
||||
|
@ -36,7 +37,7 @@ class Organizer(ReportHandler,Loggable):
|
|||
self.channel = channel
|
||||
self.target_path = target_path
|
||||
self.recorded_path = recorded_path
|
||||
super(Organizer, self).__init__(signal=self.channel, weak=False)
|
||||
super(Organizer, self).__init__(signal=getsig(self.channel), weak=False)
|
||||
|
||||
def handle(self, sender, event):
|
||||
""" Intercept events where a new file has been added to the
|
||||
|
@ -63,7 +64,7 @@ class Organizer(ReportHandler,Loggable):
|
|||
def new_dir_watch(d):
|
||||
# TODO : rewrite as return lambda : dispatcher.send(...
|
||||
def cb():
|
||||
dispatcher.send(signal="add_subwatch", sender=self,
|
||||
dispatcher.send(signal=getsig("add_subwatch"), sender=self,
|
||||
directory=d)
|
||||
return cb
|
||||
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import threading
|
||||
|
||||
from media.monitor.exceptions import BadSongFile
|
||||
from media.monitor.log import Loggable
|
||||
import api_clients.api_client as ac
|
||||
from media.saas.thread import apc, InstanceInheritingThread
|
||||
|
||||
class ThreadedRequestSync(threading.Thread, Loggable):
|
||||
class ThreadedRequestSync(InstanceInheritingThread, Loggable):
|
||||
def __init__(self, rs):
|
||||
threading.Thread.__init__(self)
|
||||
super(ThreadedRequestSync, self).__init__()
|
||||
self.rs = rs
|
||||
self.daemon = True
|
||||
self.start()
|
||||
|
@ -22,7 +20,7 @@ class RequestSync(Loggable):
|
|||
for some number of times """
|
||||
@classmethod
|
||||
def create_with_api_client(cls, watcher, requests):
|
||||
apiclient = ac.AirtimeApiClient.create_right_config()
|
||||
apiclient = apc()
|
||||
self = cls(watcher, requests, apiclient)
|
||||
return self
|
||||
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
import os
|
||||
from media.monitor.log import Loggable
|
||||
from media.monitor.exceptions import NoDirectoryInAirtime
|
||||
from os.path import normpath
|
||||
from media.saas.thread import user
|
||||
from os.path import normpath, join
|
||||
import media.monitor.pure as mmp
|
||||
|
||||
class AirtimeDB(Loggable):
|
||||
|
@ -11,17 +12,20 @@ class AirtimeDB(Loggable):
|
|||
if reload_now: self.reload_directories()
|
||||
|
||||
def reload_directories(self):
|
||||
"""
|
||||
this is the 'real' constructor, should be called if you ever want the
|
||||
class reinitialized. there's not much point to doing it yourself
|
||||
however, you should just create a new AirtimeDB instance.
|
||||
"""
|
||||
""" this is the 'real' constructor, should be called if you ever
|
||||
want the class reinitialized. there's not much point to doing
|
||||
it yourself however, you should just create a new AirtimeDB
|
||||
instance. """
|
||||
|
||||
saas = user().root_path
|
||||
|
||||
# dirs_setup is a dict with keys:
|
||||
# u'watched_dirs' and u'stor' which point to lists of corresponding
|
||||
# dirs
|
||||
dirs_setup = self.apc.setup_media_monitor()
|
||||
dirs_setup[u'stor'] = normpath( dirs_setup[u'stor'] )
|
||||
dirs_setup[u'watched_dirs'] = map(normpath, dirs_setup[u'watched_dirs'])
|
||||
dirs_setup[u'stor'] = normpath( join(saas, dirs_setup[u'stor'] ) )
|
||||
dirs_setup[u'watched_dirs'] = map(lambda p: normpath(join(saas,p)),
|
||||
dirs_setup[u'watched_dirs'])
|
||||
dirs_with_id = dict([ (k,normpath(v)) for k,v in
|
||||
self.apc.list_all_watched_dirs()['dirs'].iteritems() ])
|
||||
|
||||
|
@ -42,15 +46,11 @@ class AirtimeDB(Loggable):
|
|||
dirs_setup[u'watched_dirs'] ])
|
||||
|
||||
def to_id(self, directory):
|
||||
"""
|
||||
directory path -> id
|
||||
"""
|
||||
""" directory path -> id """
|
||||
return self.dir_to_id[ directory ]
|
||||
|
||||
def to_directory(self, dir_id):
|
||||
"""
|
||||
id -> directory path
|
||||
"""
|
||||
""" id -> directory path """
|
||||
return self.id_to_dir[ dir_id ]
|
||||
|
||||
def storage_path(self) : return self.base_storage
|
||||
|
@ -60,37 +60,31 @@ class AirtimeDB(Loggable):
|
|||
def recorded_path(self) : return self.storage_paths['recorded']
|
||||
|
||||
def list_watched(self):
|
||||
"""
|
||||
returns all watched directories as a list
|
||||
"""
|
||||
""" returns all watched directories as a list """
|
||||
return list(self.watched_directories)
|
||||
|
||||
def list_storable_paths(self):
|
||||
"""
|
||||
returns a list of all the watched directories in the datatabase.
|
||||
(Includes the imported directory and the recorded directory)
|
||||
"""
|
||||
""" returns a list of all the watched directories in the
|
||||
datatabase. (Includes the imported directory and the recorded
|
||||
directory) """
|
||||
l = self.list_watched()
|
||||
l.append(self.import_path())
|
||||
l.append(self.recorded_path())
|
||||
return l
|
||||
|
||||
def dir_id_get_files(self, dir_id, all_files=True):
|
||||
"""
|
||||
Get all files in a directory with id dir_id
|
||||
"""
|
||||
""" Get all files in a directory with id dir_id """
|
||||
base_dir = self.id_to_dir[ dir_id ]
|
||||
return set(( os.path.join(base_dir,p) for p in
|
||||
return set(( join(base_dir,p) for p in
|
||||
self.apc.list_all_db_files( dir_id, all_files ) ))
|
||||
|
||||
def directory_get_files(self, directory, all_files=True):
|
||||
"""
|
||||
returns all the files(recursively) in a directory. a directory is an
|
||||
"actual" directory path instead of its id. This is super hacky because
|
||||
you create one request for the recorded directory and one for the
|
||||
imported directory even though they're the same dir in the database so
|
||||
you get files for both dirs in 1 request...
|
||||
"""
|
||||
""" returns all the files(recursively) in a directory. a
|
||||
directory is an "actual" directory path instead of its id. This
|
||||
is super hacky because you create one request for the recorded
|
||||
directory and one for the imported directory even though they're
|
||||
the same dir in the database so you get files for both dirs in 1
|
||||
request... """
|
||||
normal_dir = os.path.normpath(unicode(directory))
|
||||
if normal_dir not in self.dir_to_id:
|
||||
raise NoDirectoryInAirtime( normal_dir, self.dir_to_id )
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import threading
|
||||
import time
|
||||
import copy
|
||||
|
||||
|
@ -9,15 +8,16 @@ from media.monitor.exceptions import BadSongFile
|
|||
from media.monitor.eventcontractor import EventContractor
|
||||
from media.monitor.events import EventProxy
|
||||
from media.monitor.request import ThreadedRequestSync, RequestSync
|
||||
from media.saas.thread import InstanceInheritingThread, getsig
|
||||
|
||||
class TimeoutWatcher(threading.Thread,Loggable):
|
||||
class TimeoutWatcher(InstanceInheritingThread,Loggable):
|
||||
"""
|
||||
The job of this thread is to keep an eye on WatchSyncer and force a
|
||||
request whenever the requests go over time out
|
||||
"""
|
||||
def __init__(self, watcher, timeout=5):
|
||||
self.logger.info("Created timeout thread...")
|
||||
threading.Thread.__init__(self)
|
||||
super(TimeoutWatcher, self).__init__()
|
||||
self.watcher = watcher
|
||||
self.timeout = timeout
|
||||
|
||||
|
@ -52,7 +52,7 @@ class WatchSyncer(ReportHandler,Loggable):
|
|||
tc = TimeoutWatcher(self, self.timeout)
|
||||
tc.daemon = True
|
||||
tc.start()
|
||||
super(WatchSyncer, self).__init__(signal=signal)
|
||||
super(WatchSyncer, self).__init__(signal=getsig(signal))
|
||||
|
||||
def handle(self, sender, event):
|
||||
"""
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
import os
|
||||
from os.path import join
|
||||
|
||||
from media.monitor.exceptions import NoConfigFile
|
||||
from media.monitor.pure import LazyProperty
|
||||
from media.monitor.config import MMConfig
|
||||
from api_clients.api_client import AirtimeApiClient
|
||||
|
||||
# poor man's phantom types...
|
||||
class SignalString(str): pass
|
||||
|
||||
class AirtimeInstance(object):
|
||||
""" AirtimeInstance is a class that abstracts away every airtime
|
||||
instance by providing all the necessary objects required to interact
|
||||
with the instance. ApiClient, configs, root_directory """
|
||||
|
||||
@classmethod
|
||||
def root_make(cls, name, root):
|
||||
cfg = {
|
||||
'api_client' : join(root, 'etc/airtime/api_client.cfg'),
|
||||
'media_monitor' : join(root, 'etc/airtime/media-monitor.cfg'),
|
||||
}
|
||||
return cls(name, root, cfg)
|
||||
|
||||
def __init__(self,name, root_path, config_paths):
|
||||
""" name is an internal name only """
|
||||
for cfg in ['api_client','media_monitor']:
|
||||
if cfg not in config_paths: raise NoConfigFile(config_paths)
|
||||
elif not os.path.exists(config_paths[cfg]):
|
||||
raise NoConfigFile(config_paths[cfg])
|
||||
self.name = name
|
||||
self.config_paths = config_paths
|
||||
self.root_path = root_path
|
||||
|
||||
def signal(self, sig):
|
||||
if isinstance(sig, SignalString): return sig
|
||||
else: return SignalString("%s_%s" % (self.name, sig))
|
||||
|
||||
def __str__(self):
|
||||
return "%s,%s(%s)" % (self.name, self.root_path, self.config_paths)
|
||||
|
||||
@LazyProperty
|
||||
def api_client(self):
|
||||
return AirtimeApiClient(config_path=self.config_paths['api_client'])
|
||||
|
||||
@LazyProperty
|
||||
def mm_config(self):
|
||||
return MMConfig(self.config_paths['media_monitor'])
|
|
@ -0,0 +1,125 @@
|
|||
import os, sys
|
||||
import logging
|
||||
import logging.config
|
||||
|
||||
import media.monitor.pure as mmp
|
||||
|
||||
from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale
|
||||
from media.monitor.log import get_logger, setup_logging
|
||||
from std_err_override import LogWriter
|
||||
from media.saas.thread import InstanceThread, user, apc, getsig
|
||||
from media.monitor.log import Loggable
|
||||
from media.monitor.exceptions import CouldNotCreateIndexFile
|
||||
from media.monitor.toucher import ToucherThread
|
||||
from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver
|
||||
from media.monitor.watchersyncer import WatchSyncer
|
||||
from media.monitor.eventdrainer import EventDrainer
|
||||
from media.monitor.manager import Manager
|
||||
from media.monitor.syncdb import AirtimeDB
|
||||
from media.saas.airtimeinstance import AirtimeInstance
|
||||
|
||||
class MM2(InstanceThread, Loggable):
|
||||
|
||||
def index_create(self, index_create_attempt=False):
|
||||
config = user().mm_config
|
||||
if not index_create_attempt:
|
||||
if not os.path.exists(config['index_path']):
|
||||
self.logger.info("Attempting to create index file:...")
|
||||
try:
|
||||
with open(config['index_path'], 'w') as f: f.write(" ")
|
||||
except Exception as e:
|
||||
self.logger.info("Failed to create index file with exception: %s" \
|
||||
% str(e))
|
||||
else:
|
||||
self.logger.info("Created index file, reloading configuration:")
|
||||
self.index_create(index_create_attempt=True)
|
||||
else:
|
||||
self.logger.info("Already tried to create index. Will not try again ")
|
||||
|
||||
if not os.path.exists(config['index_path']):
|
||||
raise CouldNotCreateIndexFile(config['index_path'])
|
||||
|
||||
def run(self):
|
||||
self.index_create()
|
||||
manager = Manager()
|
||||
apiclient = apc()
|
||||
config = user().mm_config
|
||||
watch_syncer = WatchSyncer(signal=getsig('watch'),
|
||||
chunking_number=config['chunking_number'],
|
||||
timeout=config['request_max_wait'])
|
||||
airtime_receiver = AirtimeMessageReceiver(config,manager)
|
||||
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
|
||||
|
||||
|
||||
adb = AirtimeDB(apiclient)
|
||||
store = {
|
||||
u'stor' : adb.storage_path(),
|
||||
u'watched_dirs' : adb.list_watched(),
|
||||
}
|
||||
|
||||
self.logger.info("initializing mm with directories: %s" % str(store))
|
||||
|
||||
self.logger.info(
|
||||
"Initing with the following airtime response:%s" % str(store))
|
||||
|
||||
airtime_receiver.change_storage({ 'directory':store[u'stor'] })
|
||||
|
||||
for watch_dir in store[u'watched_dirs']:
|
||||
if not os.path.exists(watch_dir):
|
||||
# Create the watch_directory here
|
||||
try: os.makedirs(watch_dir)
|
||||
except Exception:
|
||||
self.logger.error("Could not create watch directory: '%s' \
|
||||
(given from the database)." % watch_dir)
|
||||
if os.path.exists(watch_dir):
|
||||
airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True)
|
||||
else: self.logger.info("Failed to add watch on %s" % str(watch_dir))
|
||||
|
||||
ed = EventDrainer(airtime_notifier.connection,
|
||||
interval=float(config['rmq_event_wait']))
|
||||
|
||||
# Launch the toucher that updates the last time when the script was
|
||||
# ran every n seconds.
|
||||
# TODO : verify that this does not interfere with bootstrapping because the
|
||||
# toucher thread might update the last_ran variable too fast
|
||||
tt = ToucherThread(path=config['index_path'],
|
||||
interval=int(config['touch_interval']))
|
||||
|
||||
apiclient.register_component('media-monitor')
|
||||
|
||||
manager.loop()
|
||||
|
||||
def launch_instance(name, root, global_cfg, apc_cfg):
|
||||
cfg = {
|
||||
'api_client' : apc_cfg,
|
||||
'media_monitor' : global_cfg,
|
||||
}
|
||||
ai = AirtimeInstance(name, root, cfg)
|
||||
MM2(ai).start()
|
||||
|
||||
def setup_global(log):
|
||||
""" setup unicode and other stuff """
|
||||
log.info("Attempting to set the locale...")
|
||||
try: mmp.configure_locale(mmp.get_system_locale())
|
||||
except FailedToSetLocale as e:
|
||||
log.info("Failed to set the locale...")
|
||||
sys.exit(1)
|
||||
except FailedToObtainLocale as e:
|
||||
log.info("Failed to obtain the locale form the default path: \
|
||||
'/etc/default/locale'")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
log.info("Failed to set the locale for unknown reason. \
|
||||
Logging exception.")
|
||||
log.info(str(e))
|
||||
|
||||
def setup_logger(log_config, logpath):
|
||||
logging.config.fileConfig(log_config)
|
||||
#need to wait for Python 2.7 for this..
|
||||
#logging.captureWarnings(True)
|
||||
logger = logging.getLogger()
|
||||
LogWriter.override_std_err(logger)
|
||||
logfile = unicode(logpath)
|
||||
setup_logging(logfile)
|
||||
log = get_logger()
|
||||
return log
|
|
@ -0,0 +1,27 @@
|
|||
import threading
|
||||
|
||||
class UserlessThread(Exception):
|
||||
def __str__():
|
||||
return "Current thread: %s is not an instance of InstanceThread \
|
||||
of InstanceInheritingThread" % str(threading.current_thread())
|
||||
|
||||
class HasUser(object):
|
||||
def user(self): return self._user
|
||||
|
||||
class InstanceThread(threading.Thread, HasUser):
|
||||
def __init__(self,user, *args, **kwargs):
|
||||
super(InstanceThread, self).__init__(*args, **kwargs)
|
||||
self._user = user
|
||||
|
||||
class InstanceInheritingThread(threading.Thread, HasUser):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._user = threading.current_thread().user()
|
||||
super(InstanceInheritingThread, self).__init__(*args, **kwargs)
|
||||
|
||||
def user():
|
||||
try: return threading.current_thread().user()
|
||||
except AttributeError: raise UserlessThread()
|
||||
|
||||
def apc(): return user().api_client
|
||||
|
||||
def getsig(s): return user().signal(s)
|
|
@ -1,141 +1,15 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
import logging.config
|
||||
from media.saas.launcher import setup_global, launch_instance, setup_logger
|
||||
from media.monitor.config import MMConfig
|
||||
|
||||
from media.monitor.manager import Manager
|
||||
from media.monitor.bootstrap import Bootstrapper
|
||||
from media.monitor.log import get_logger, setup_logging
|
||||
from media.monitor.config import MMConfig
|
||||
from media.monitor.toucher import ToucherThread
|
||||
from media.monitor.syncdb import AirtimeDB
|
||||
from media.monitor.exceptions import FailedToObtainLocale, \
|
||||
FailedToSetLocale, \
|
||||
NoConfigFile
|
||||
from media.monitor.airtime import AirtimeNotifier, \
|
||||
AirtimeMessageReceiver
|
||||
from media.monitor.watchersyncer import WatchSyncer
|
||||
from media.monitor.eventdrainer import EventDrainer
|
||||
from media.update.replaygainupdater import ReplayGainUpdater
|
||||
from std_err_override import LogWriter
|
||||
|
||||
import media.monitor.pure as mmp
|
||||
from api_clients import api_client as apc
|
||||
|
||||
|
||||
|
||||
def main(global_config, api_client_config, log_config,
|
||||
index_create_attempt=False):
|
||||
for cfg in [global_config, api_client_config]:
|
||||
if not os.path.exists(cfg): raise NoConfigFile(cfg)
|
||||
# MMConfig is a proxy around ConfigObj instances. it does not allow
|
||||
# itself users of MMConfig instances to modify any config options
|
||||
# directly through the dictionary. Users of this object muse use the
|
||||
# correct methods designated for modification
|
||||
try: config = MMConfig(global_config)
|
||||
except NoConfigFile as e:
|
||||
print("Cannot run mediamonitor2 without configuration file.")
|
||||
print("Current config path: '%s'" % global_config)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print("Unknown error reading configuration file: '%s'" % global_config)
|
||||
print(str(e))
|
||||
|
||||
|
||||
logging.config.fileConfig(log_config)
|
||||
|
||||
#need to wait for Python 2.7 for this..
|
||||
#logging.captureWarnings(True)
|
||||
|
||||
logger = logging.getLogger()
|
||||
LogWriter.override_std_err(logger)
|
||||
logfile = unicode( config['logpath'] )
|
||||
setup_logging(logfile)
|
||||
log = get_logger()
|
||||
|
||||
if not index_create_attempt:
|
||||
if not os.path.exists(config['index_path']):
|
||||
log.info("Attempting to create index file:...")
|
||||
try:
|
||||
with open(config['index_path'], 'w') as f: f.write(" ")
|
||||
except Exception as e:
|
||||
log.info("Failed to create index file with exception: %s" \
|
||||
% str(e))
|
||||
else:
|
||||
log.info("Created index file, reloading configuration:")
|
||||
main( global_config, api_client_config, log_config,
|
||||
index_create_attempt=True )
|
||||
else:
|
||||
log.info("Already tried to create index. Will not try again ")
|
||||
|
||||
if not os.path.exists(config['index_path']):
|
||||
log.info("Index file does not exist. Terminating")
|
||||
|
||||
log.info("Attempting to set the locale...")
|
||||
|
||||
try:
|
||||
mmp.configure_locale(mmp.get_system_locale())
|
||||
except FailedToSetLocale as e:
|
||||
log.info("Failed to set the locale...")
|
||||
sys.exit(1)
|
||||
except FailedToObtainLocale as e:
|
||||
log.info("Failed to obtain the locale form the default path: \
|
||||
'/etc/default/locale'")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
log.info("Failed to set the locale for unknown reason. \
|
||||
Logging exception.")
|
||||
log.info(str(e))
|
||||
|
||||
watch_syncer = WatchSyncer(signal='watch',
|
||||
chunking_number=config['chunking_number'],
|
||||
timeout=config['request_max_wait'])
|
||||
|
||||
apiclient = apc.AirtimeApiClient.create_right_config(log=log,
|
||||
config_path=api_client_config)
|
||||
|
||||
ReplayGainUpdater.start_reply_gain(apiclient)
|
||||
|
||||
sdb = AirtimeDB(apiclient)
|
||||
|
||||
manager = Manager()
|
||||
|
||||
airtime_receiver = AirtimeMessageReceiver(config,manager)
|
||||
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
|
||||
|
||||
store = apiclient.setup_media_monitor()
|
||||
|
||||
log.info("Initing with the following airtime response:%s" % str(store))
|
||||
|
||||
airtime_receiver.change_storage({ 'directory':store[u'stor'] })
|
||||
|
||||
for watch_dir in store[u'watched_dirs']:
|
||||
if not os.path.exists(watch_dir):
|
||||
# Create the watch_directory here
|
||||
try: os.makedirs(watch_dir)
|
||||
except Exception as e:
|
||||
log.error("Could not create watch directory: '%s' \
|
||||
(given from the database)." % watch_dir)
|
||||
if os.path.exists(watch_dir):
|
||||
airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True)
|
||||
else: log.info("Failed to add watch on %s" % str(watch_dir))
|
||||
|
||||
bs = Bootstrapper( db=sdb, watch_signal='watch' )
|
||||
|
||||
ed = EventDrainer(airtime_notifier.connection,
|
||||
interval=float(config['rmq_event_wait']))
|
||||
|
||||
# Launch the toucher that updates the last time when the script was
|
||||
# ran every n seconds.
|
||||
# TODO : verify that this does not interfere with bootstrapping because the
|
||||
# toucher thread might update the last_ran variable too fast
|
||||
tt = ToucherThread(path=config['index_path'],
|
||||
interval=int(config['touch_interval']))
|
||||
|
||||
apiclient.register_component('media-monitor')
|
||||
|
||||
return manager.loop()
|
||||
def main(global_config, api_client_config, log_config):
|
||||
""" function to run hosted install """
|
||||
mm_config = MMConfig(global_config)
|
||||
log = setup_logger( log_config, mm_config['logpath'] )
|
||||
setup_global(log)
|
||||
launch_instance('hosted_install', '/', global_config, api_client_config)
|
||||
|
||||
__doc__ = """
|
||||
Usage:
|
||||
|
@ -148,9 +22,6 @@ Options:
|
|||
--log=<path> log config at <path>
|
||||
"""
|
||||
|
||||
def main_loop():
|
||||
while True: pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
from docopt import docopt
|
||||
args = docopt(__doc__,version="mm1.99")
|
||||
|
@ -160,5 +31,4 @@ if __name__ == '__main__':
|
|||
sys.exit(0)
|
||||
print("Running mm1.99")
|
||||
main(args['--config'],args['--apiclient'],args['--log'])
|
||||
#gevent.joinall([ gevent.spawn(main_loop) ])
|
||||
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
import unittest
|
||||
from copy import deepcopy
|
||||
from media.saas.airtimeinstance import AirtimeInstance, NoConfigFile
|
||||
|
||||
class TestAirtimeInstance(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.cfg = {
|
||||
'api_client' : 'tests/test_instance.py',
|
||||
'media_monitor' : 'tests/test_instance.py',
|
||||
'logging' : 'tests/test_instance.py',
|
||||
}
|
||||
|
||||
def test_init_good(self):
|
||||
AirtimeInstance("/root", self.cfg)
|
||||
self.assertTrue(True)
|
||||
|
||||
def test_init_bad(self):
|
||||
cfg = deepcopy(self.cfg)
|
||||
cfg['api_client'] = 'bs'
|
||||
with self.assertRaises(NoConfigFile):
|
||||
AirtimeInstance("/root", cfg)
|
|
@ -0,0 +1,64 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
import time
|
||||
from media.saas.thread import InstanceThread, InstanceInheritingThread
|
||||
|
||||
# ugly but necessary for 2.7
|
||||
signal = False
|
||||
signal2 = False
|
||||
|
||||
class TestInstanceThread(unittest.TestCase):
|
||||
def test_user_inject(self):
|
||||
global signal
|
||||
signal = False
|
||||
u = "rudi"
|
||||
class T(InstanceThread):
|
||||
def run(me):
|
||||
global signal
|
||||
super(T, me).run()
|
||||
signal = True
|
||||
self.assertEquals(u, me.user())
|
||||
t = T(u, name="test_user_inject")
|
||||
t.daemon = True
|
||||
t.start()
|
||||
time.sleep(0.2)
|
||||
self.assertTrue(signal)
|
||||
|
||||
def test_inheriting_thread(utest):
|
||||
global signal2
|
||||
u = "testing..."
|
||||
|
||||
class TT(InstanceInheritingThread):
|
||||
def run(self):
|
||||
global signal2
|
||||
utest.assertEquals(self.user(), u)
|
||||
signal2 = True
|
||||
|
||||
class T(InstanceThread):
|
||||
def run(self):
|
||||
super(T, self).run()
|
||||
child_thread = TT(name="child thread")
|
||||
child_thread.daemon = True
|
||||
child_thread.start()
|
||||
|
||||
parent_thread = T(u, name="Parent instance thread")
|
||||
parent_thread.daemon = True
|
||||
parent_thread.start()
|
||||
|
||||
time.sleep(0.2)
|
||||
utest.assertTrue(signal2)
|
||||
|
||||
def test_different_user(utest):
|
||||
u1, u2 = "ru", "di"
|
||||
class T(InstanceThread):
|
||||
def run(self):
|
||||
super(T, self).run()
|
||||
|
||||
for u in [u1, u2]:
|
||||
t = T(u)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
utest.assertEquals(t.user(), u)
|
||||
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
|
@ -3,12 +3,11 @@ from threading import Thread
|
|||
import traceback
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
|
||||
from media.update import replaygain
|
||||
from media.monitor.log import Loggable
|
||||
|
||||
|
||||
class ReplayGainUpdater(Thread, Loggable):
|
||||
class ReplayGainUpdater(Thread):
|
||||
"""
|
||||
The purpose of the class is to query the server for a list of files which
|
||||
do not have a ReplayGain value calculated. This class will iterate over the
|
||||
|
@ -30,6 +29,7 @@ class ReplayGainUpdater(Thread, Loggable):
|
|||
def __init__(self,apc):
|
||||
Thread.__init__(self)
|
||||
self.api_client = apc
|
||||
self.logger = logging.getLogger()
|
||||
|
||||
def main(self):
|
||||
raw_response = self.api_client.list_all_watched_dirs()
|
|
@ -24,6 +24,8 @@ from recorder import Recorder
|
|||
from listenerstat import ListenerStat
|
||||
from pypomessagehandler import PypoMessageHandler
|
||||
|
||||
from media.update.replaygainupdater import ReplayGainUpdater
|
||||
|
||||
from configobj import ConfigObj
|
||||
|
||||
# custom imports
|
||||
|
@ -174,6 +176,9 @@ if __name__ == '__main__':
|
|||
sys.exit()
|
||||
|
||||
api_client = api_client.AirtimeApiClient()
|
||||
|
||||
ReplayGainUpdater.start_reply_gain(api_client)
|
||||
|
||||
api_client.register_component("pypo")
|
||||
|
||||
pypoFetch_q = Queue()
|
||||
|
|
Loading…
Reference in New Issue