cc-4105: major refactorings. shit is barely running
This commit is contained in:
parent
add322d515
commit
be00cc6990
|
@ -41,6 +41,19 @@ def convert_dict_value_to_utf8(md):
|
|||
|
||||
class AirtimeApiClient():
|
||||
|
||||
# This is a little hacky fix so that I don't have to pass the config object
|
||||
# everywhere where AirtimeApiClient needs to be initialized
|
||||
default_config = None
|
||||
# the purpose of this custom constructor is to remember which config file
|
||||
# it was called with. So that after the initial call:
|
||||
# AirtimeApiClient.create_right_config('/path/to/config')
|
||||
# All subsequence calls to create_right_config will be with that config
|
||||
# file
|
||||
@staticmethod
|
||||
def create_right_config(log=None,config_path=None):
|
||||
if config_path: default_config = config_path
|
||||
return AirtimeApiClient( logger=None, config_path=default_config )
|
||||
|
||||
def __init__(self, logger=None,config_path='/etc/airtime/api_client.cfg'):
|
||||
if logger is None:
|
||||
self.logger = logging
|
||||
|
|
|
@ -3,11 +3,17 @@ from kombu.messaging import Exchange, Queue, Consumer
|
|||
from kombu.connection import BrokerConnection
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import copy
|
||||
|
||||
from media.monitor.exceptions import BadSongFile
|
||||
from media.monitor.metadata import Metadata
|
||||
from media.monitor.log import Loggable
|
||||
from media.monitor.syncdb import SyncDB
|
||||
from media.monitor.exceptions import DirectoryIsNotListed
|
||||
from media.monitor.bootstrap import Bootstrapper
|
||||
|
||||
from api_clients import api_client as apc
|
||||
|
||||
# Do not confuse with media monitor 1's AirtimeNotifier class that more related
|
||||
# to pyinotify's Notifier class. AirtimeNotifier just notifies when events come
|
||||
|
@ -54,7 +60,7 @@ class AirtimeNotifier(Loggable):
|
|||
|
||||
|
||||
class AirtimeMessageReceiver(Loggable):
|
||||
def __init__(self, cfg):
|
||||
def __init__(self, cfg, manager):
|
||||
self.dispatch_table = {
|
||||
'md_update' : self.md_update,
|
||||
'new_watch' : self.new_watch,
|
||||
|
@ -64,6 +70,7 @@ class AirtimeMessageReceiver(Loggable):
|
|||
'file_delete' : self.file_delete,
|
||||
}
|
||||
self.cfg = cfg
|
||||
self.manager = manager
|
||||
def message(self, msg):
|
||||
"""
|
||||
This method is called by an AirtimeNotifier instance that consumes the Rabbit MQ events
|
||||
|
@ -84,13 +91,22 @@ class AirtimeMessageReceiver(Loggable):
|
|||
def _execute_message(self,evt,message):
|
||||
self.dispatch_table[evt](message)
|
||||
|
||||
|
||||
def __request_now_bootstrap(self, directory_id=None, directory=None):
|
||||
sdb = SyncDB(apc.AirtimeApiClient.create_right_config())
|
||||
if directory_id == None: directory_id = sdb.directories[directory]
|
||||
if directory_id in sdb.id_lookup:
|
||||
d = sdb.id_lookup[directory_id]
|
||||
bs = Bootstrapper(sdb, self.manager.watch_signal())
|
||||
bs.flush_watch( directory=d, last_ran=time.time() )
|
||||
else:
|
||||
raise DirectoryIsNotListed(directory_id)
|
||||
|
||||
def supported_messages(self):
|
||||
return self.dispatch_table.keys()
|
||||
|
||||
# TODO : Handler methods - Should either fire the events directly with
|
||||
# pydispatcher or do the necessary changes on the filesystem that will fire
|
||||
# the events
|
||||
def md_update(self, msg):
|
||||
self.logger.info("Updating metadata for: '%s'" % msg['MDATA_KEY_FILEPATH'])
|
||||
md_path = msg['MDATA_KEY_FILEPATH']
|
||||
try:
|
||||
Metadata.write_unsafe(path=md_path, md=msg)
|
||||
|
@ -101,21 +117,65 @@ class AirtimeMessageReceiver(Loggable):
|
|||
self.logger.info("Unknown error when writing metadata to: '%s'" % md_path)
|
||||
|
||||
def new_watch(self, msg):
|
||||
# TODO: "walk" the newly watched directory
|
||||
self.manager.add_watch_directory(msg['directory'])
|
||||
self.logger.info("Creating watch for directory: '%s'" % msg['directory'])
|
||||
if not os.path.exists(msg['directory']):
|
||||
try: os.makedirs(msg['directory'])
|
||||
except Exception as e:
|
||||
self.logger.info("Failed to create watched dir '%s'" % msg['directory'])
|
||||
self.logger.info(str(e))
|
||||
# Is this clever or stupid?
|
||||
else: self.new_watch(msg)
|
||||
else:
|
||||
# TODO : Refactor this; breaks encapsulation.
|
||||
self.manager.watch_listener.flush_events(msg['directory'])
|
||||
self.manager.add_watch_directory(msg['directory'])
|
||||
|
||||
def remove_watch(self, msg):
|
||||
self.logger.info("Removing watch from directory: '%s'" % msg['directory'])
|
||||
self.manager.remove_watch_directory(msg['directory'])
|
||||
|
||||
def rescan_watch(self, msg):
|
||||
# TODO : basically a bootstrap on the directory
|
||||
pass
|
||||
self.logger.info("Trying to rescan watched directory: '%s'" % msg['directory'])
|
||||
try:
|
||||
self.__request_now_bootstrap(msg['id'])
|
||||
except DirectoryIsNotListed as e:
|
||||
self.logger.info("Bad rescan request")
|
||||
self.logger.info( str(e) )
|
||||
except Exception as e:
|
||||
self.logger.info("Bad rescan request. Unknown error.")
|
||||
self.logger.info( str(e) )
|
||||
else:
|
||||
self.logger.info("Successfully re-scanned: '%s'" % msg['directory'])
|
||||
|
||||
def change_storage(self, msg):
|
||||
new_storage_directory = msg['directory']
|
||||
new_storage_directory_id = str(msg['dir_id'])
|
||||
new_import = os.path.join(new_storage_directory, 'imported')
|
||||
new_organize = os.path.join(new_storage_directory, 'organize')
|
||||
for d in [new_import, new_organize]:
|
||||
if os.path.exists(d):
|
||||
self.logger.info("Changing storage to existing dir: '%s'" % d)
|
||||
else:
|
||||
try: os.makedirs(d)
|
||||
except Exception as e:
|
||||
self.logger.info("Could not create dir for storage '%s'" % d)
|
||||
self.logger.info(str(e))
|
||||
|
||||
if all([ os.path.exists(d) for d in [new_import, new_organize] ]):
|
||||
self.manager.set_store_path(new_import)
|
||||
try:
|
||||
self.__request_now_bootstrap( directory=new_import )
|
||||
except Exception as e:
|
||||
self.logger.info("Did not bootstrap off directory '%s'. Probably not in airtime db" % new_import)
|
||||
self.logger.info(str(e))
|
||||
# set_organize_path should automatically flush new_organize
|
||||
self.manager.set_organize_path(new_organize)
|
||||
else:
|
||||
self.logger.info("Change storage procedure failed, could not create directories")
|
||||
|
||||
def file_delete(self, msg):
|
||||
# deletes should be requested only from imported folder but we don't
|
||||
# verify that.
|
||||
# clippy confirmation: "are you really sure?"
|
||||
self.logger.info("Attempting to delete(maybe) '%s'" % msg['filepath'])
|
||||
if msg['delete']:
|
||||
self.logger.info("Clippy confirmation was received, actually deleting file...")
|
||||
if os.path.exists(msg['filepath']):
|
||||
|
@ -126,7 +186,8 @@ class AirtimeMessageReceiver(Loggable):
|
|||
self.logger.info("Failed to delete '%s'" % msg['filepath'])
|
||||
self.logger.info("Error: " % str(e))
|
||||
else:
|
||||
self.logger.info("Attempting to delete file '%s' that does not exist. Full request coming:" % msg['filepath'])
|
||||
self.logger.info("Attempting to delete file '%s' that does not exist. Full request coming:"
|
||||
% msg['filepath'])
|
||||
self.logger.info(msg)
|
||||
else:
|
||||
self.logger.info("No clippy confirmation, ignoring event. Out of curiousity we will print some details.")
|
||||
|
|
|
@ -9,55 +9,48 @@ class Bootstrapper(Loggable):
|
|||
Bootstrapper reads all the info in the filesystem flushes organize
|
||||
events and watch events
|
||||
"""
|
||||
def __init__(self,db,last_ran,org_channels,watch_channels):
|
||||
def __init__(self,db,watch_signal):
|
||||
"""
|
||||
db - SyncDB object; small layer over api client
|
||||
last_ran - last time the program was ran.
|
||||
watch_signal - the signals should send events for every file on.
|
||||
"""
|
||||
self.db = db
|
||||
self.org_channels = org_channels
|
||||
self.watch_channels = watch_channels
|
||||
self.last_ran = last_ran
|
||||
self.watch_signal = watch_signal
|
||||
|
||||
def flush_watch(self):
|
||||
def flush_all(self, last_ran):
|
||||
"""
|
||||
Syncs the file system into the database. Walks over deleted/new/modified files since
|
||||
the last run in mediamonitor and sends requests to make the database consistent with
|
||||
file system
|
||||
bootstrap every single watched directory. only useful at startup
|
||||
"""
|
||||
# Songs is a dictionary where every key is the watched the directory
|
||||
# and the value is a set with all the files in that directory.
|
||||
songs = {}
|
||||
for d in self.db.list_directories():
|
||||
self.flush_watch(d, last_ran)
|
||||
|
||||
def flush_watch(self, directory, last_ran):
|
||||
"""
|
||||
flush a single watch/imported directory. useful when wanting to to rescan,
|
||||
or add a watched/imported directory
|
||||
"""
|
||||
songs = set([])
|
||||
modded = deleted = 0
|
||||
signal_by_path = dict( (pc.signal, pc.path) for pc in self.watch_channels )
|
||||
for pc in self.watch_channels:
|
||||
songs[ pc.path ] = set()
|
||||
for f in mmp.walk_supported(pc.path, clean_empties=False):
|
||||
songs[ pc.path ].add(f)
|
||||
# We decide whether to update a file's metadata by checking
|
||||
# its system modification date. If it's above the value
|
||||
# self.last_ran which is passed to us that means media monitor
|
||||
# wasn't aware when this changes occured in the filesystem
|
||||
# hence it will send the correct events to sync the database
|
||||
# with the filesystem
|
||||
if os.path.getmtime(f) > self.last_ran:
|
||||
modded += 1
|
||||
dispatcher.send(signal=pc.signal, sender=self, event=DeleteFile(f))
|
||||
dispatcher.send(signal=pc.signal, sender=self, event=NewFile(f))
|
||||
# Want all files in the database that are not in the filesystem
|
||||
for watch_dir in self.db.list_directories():
|
||||
db_songs = self.db.directory_get_files(watch_dir)
|
||||
# Get all the files that are in the database but in the file
|
||||
# system. These are the files marked for deletions
|
||||
for to_delete in db_songs.difference(songs[watch_dir]):
|
||||
# need the correct watch channel signal to call delete
|
||||
if watch_dir in signal_by_path:
|
||||
dispatcher.send(signal=signal_by_path[watch_dir], sender=self, event=DeleteFile(f))
|
||||
# TODO : get rid of this, we should never delete files from
|
||||
# the FS even if they are deleted in airtime. Instead we
|
||||
# should put this file on a global ignore list until it's
|
||||
# re-added or something
|
||||
# os.remove(to_delete)
|
||||
deleted += 1
|
||||
else:
|
||||
self.logger.error("Could not find the signal corresponding to path: '%s'" % watch_dir)
|
||||
for f in mmp.walk_supported(directory, clean_empties=False):
|
||||
songs.add(f)
|
||||
# We decide whether to update a file's metadata by checking
|
||||
# its system modification date. If it's above the value
|
||||
# self.last_ran which is passed to us that means media monitor
|
||||
# wasn't aware when this changes occured in the filesystem
|
||||
# hence it will send the correct events to sync the database
|
||||
# with the filesystem
|
||||
if os.path.getmtime(f) > last_ran:
|
||||
modded += 1
|
||||
dispatcher.send(signal=self.watch_signal, sender=self, event=DeleteFile(f))
|
||||
dispatcher.send(signal=self.watch_signal, sender=self, event=NewFile(f))
|
||||
db_songs = self.db.directory_get_files(directory)
|
||||
# Get all the files that are in the database but in the file
|
||||
# system. These are the files marked for deletions
|
||||
for to_delete in db_songs.difference(songs):
|
||||
dispatcher.send(signal=self.watch_signal, sender=self, event=DeleteFile(f))
|
||||
deleted += 1
|
||||
self.logger.info( "Flushed watch directories. (modified, deleted) = (%d, %d)"
|
||||
% (modded, deleted) )
|
||||
% (modded, deleted) )
|
||||
|
||||
|
||||
|
|
|
@ -28,3 +28,7 @@ class CouldNotCreateIndexFile(Exception):
|
|||
self.cause = cause
|
||||
def __str__(self): return "Failed to create touch file '%s'" % self.path
|
||||
|
||||
class DirectoryIsNotListed(Exception):
|
||||
def __init__(self,dir_id):
|
||||
self.dir_id = dir_id
|
||||
def __str__(self): return "%d was not listed as a directory in the database" % self.dir_id
|
||||
|
|
|
@ -1,149 +0,0 @@
|
|||
import os
|
||||
import time
|
||||
import media.monitor.pure as mmp
|
||||
import media.monitor.log
|
||||
from subprocess import Popen, PIPE
|
||||
import api_clients.api_client as ac
|
||||
from media.monitor.syncdb import SyncDB
|
||||
|
||||
logger = media.monitor.log.get_logger()
|
||||
|
||||
def find_command(directory, extra_arguments=""):
|
||||
""" Builds a find command that respects supported_file_formats list
|
||||
Note: Use single quotes to quote arguments """
|
||||
ext_globs = [ "-iname '*.%s'" % ext for ext in mmp.supported_extensions ]
|
||||
find_glob = ' -o '.join(ext_globs)
|
||||
return "find '%s' %s %s" % (directory, find_glob, extra_arguments)
|
||||
|
||||
def exec_command(command):
|
||||
p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE)
|
||||
stdout, stderr = p.communicate()
|
||||
if p.returncode != 0:
|
||||
logger.warn("command \n%s\n return with a non-zero return value", command)
|
||||
logger.error(stderr)
|
||||
try:
|
||||
#File name charset encoding is UTF-8.
|
||||
stdout = stdout.decode("UTF-8")
|
||||
except Exception:
|
||||
stdout = None
|
||||
logger.error("Could not decode %s using UTF-8" % stdout)
|
||||
return stdout
|
||||
|
||||
def scan_dir_for_new_files(dir):
|
||||
command = find_command(directory=dir, extra_arguments="-type f -readable")
|
||||
logger.debug(command)
|
||||
stdout = exec_command(command)
|
||||
if stdout is None: return []
|
||||
else: return stdout.splitlines()
|
||||
|
||||
def clean_dirty_file_paths(dirty_files):
|
||||
""" clean dirty file paths by removing blanks and removing trailing/leading whitespace"""
|
||||
return filter(lambda e: len(e) > 0, [ f.strip(" \n") for f in dirty_files ])
|
||||
|
||||
def handle_created_file(dir, pathname, name):
|
||||
if not dir:
|
||||
self.logger.debug("PROCESS_IN_CLOSE_WRITE: %s, name: %s, pathname: %s ", dir, name, pathname)
|
||||
|
||||
if self.mmc.is_temp_file(name) :
|
||||
#file created is a tmp file which will be modified and then moved back to the original filename.
|
||||
#Easy Tag creates this when changing metadata of ogg files.
|
||||
self.temp_files[pathname] = None
|
||||
#file is being overwritten/replaced in GUI.
|
||||
elif "goutputstream" in pathname:
|
||||
self.temp_files[pathname] = None
|
||||
elif self.mmc.is_audio_file(name):
|
||||
if self.mmc.is_parent_directory(pathname, self.config.organize_directory):
|
||||
|
||||
#file was created in /srv/airtime/stor/organize. Need to process and move
|
||||
#to /srv/airtime/stor/imported
|
||||
file_md = self.md_manager.get_md_from_file(pathname)
|
||||
playable = self.mmc.test_file_playability(pathname)
|
||||
|
||||
if file_md and playable:
|
||||
self.mmc.organize_new_file(pathname, file_md)
|
||||
else:
|
||||
#move to problem_files
|
||||
self.mmc.move_to_problem_dir(pathname)
|
||||
else:
|
||||
# only append to self.file_events if the file isn't going to be altered by organize_new_file(). If file is going
|
||||
# to be altered by organize_new_file(), then process_IN_MOVED_TO event will handle appending it to self.file_events
|
||||
is_recorded = self.mmc.is_parent_directory(pathname, self.config.recorded_directory)
|
||||
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'is_recorded_show': is_recorded})
|
||||
def handle_removed_file(dir, pathname):
|
||||
logger.info("Deleting %s", pathname)
|
||||
if not dir:
|
||||
if mmc.is_audio_file(pathname):
|
||||
if pathname in self.ignore_event:
|
||||
logger.info("pathname in ignore event")
|
||||
ignore_event.remove(pathname)
|
||||
elif not self.mmc.is_parent_directory(pathname, self.config.organize_directory):
|
||||
logger.info("deleting a file not in organize")
|
||||
#we don't care if a file was deleted from the organize directory.
|
||||
file_events.append({'filepath': pathname, 'mode': self.config.MODE_DELETE})
|
||||
|
||||
"""
|
||||
This function takes in a path name provided by the database (and its corresponding row id)
|
||||
and reads the list of files in the local file system. Its purpose is to discover which files
|
||||
exist on the file system but not in the database and vice versa, as well as which files have
|
||||
been modified since the database was last updated. In each case, this method will call an
|
||||
appropiate method to ensure that the database actually represents the filesystem.
|
||||
dir_id -- row id of the directory in the cc_watched_dirs database table
|
||||
dir -- pathname of the directory
|
||||
"""
|
||||
def sync_database_to_filesystem(dir_id, dir,syncdb, last_ran=0):
|
||||
# TODO: is this line even necessary?
|
||||
dir = os.path.normpath(dir)+"/"
|
||||
"""
|
||||
set to hold new and/or modified files. We use a set to make it ok if files are added
|
||||
twice. This is because some of the tests for new files return result sets that are not
|
||||
mutually exclusive from each other.
|
||||
"""
|
||||
removed_files = set() # Not used in the original code either
|
||||
db_known_files_set = set()
|
||||
files = syncdb.id_get_files(dir_id)
|
||||
for f in files:
|
||||
db_known_files_set.add(f)
|
||||
all_files = clean_dirty_file_paths( scan_dir_for_new_files(dir) )
|
||||
all_files_set = set()
|
||||
for file_path in all_files:
|
||||
all_files_set.add(file_path[len(dir):])
|
||||
if last_ran > 0:
|
||||
"""find files that have been modified since the last time media-monitor process started."""
|
||||
time_diff_sec = time.time() - last_ran
|
||||
command = find_command(directory=dir, extra_arguments=("-type f -readable -mmin -%d" % (time_diff_sec/60+1)))
|
||||
else:
|
||||
command = find_command(directory=dir, extra_arguments="-type f -readable")
|
||||
logger.debug(command)
|
||||
stdout = exec_command(command)
|
||||
if stdout is None: new_files = []
|
||||
else: new_files = stdout.splitlines()
|
||||
new_and_modified_files = set()
|
||||
for file_path in new_files:
|
||||
new_and_modified_files.add(file_path[len(dir):])
|
||||
"""
|
||||
new_and_modified_files gives us a set of files that were either copied or modified
|
||||
since the last time media-monitor was running. These files were collected based on
|
||||
their modified timestamp. But this is not all that has changed in the directory. Files
|
||||
could have been removed, or files could have been moved into this directory (moving does
|
||||
not affect last modified timestamp). Lets get a list of files that are on the file-system
|
||||
that the db has no record of, and vice-versa.
|
||||
"""
|
||||
deleted_files_set = db_known_files_set - all_files_set
|
||||
new_files_set = all_files_set - db_known_files_set
|
||||
modified_files_set = new_and_modified_files - new_files_set
|
||||
logger.info(u"Deleted files: \n%s\n\n", deleted_files_set)
|
||||
logger.info(u"New files: \n%s\n\n", new_files_set)
|
||||
logger.info(u"Modified files: \n%s\n\n", modified_files_set)
|
||||
for file_path in deleted_files_set:
|
||||
logger.debug("deleted file")
|
||||
full_file_path = os.path.join(dir, file_path)
|
||||
logger.debug(full_file_path)
|
||||
self.pe.handle_removed_file(False, full_file_path)
|
||||
for file_set, debug_message, handle_attribute in [(new_files_set, "new file", "handle_created_file"),
|
||||
(modified_files_set, "modified file", "handle_modified_file")]:
|
||||
for file_path in file_set:
|
||||
logger.debug(debug_message)
|
||||
full_file_path = os.path.join(dir, file_path)
|
||||
logger.debug(full_file_path)
|
||||
if os.path.exists(full_file_path):
|
||||
getattr(self.pe,handle_attribute)(False,full_file_path, os.path.basename(full_file_path))
|
|
@ -84,6 +84,6 @@ class StoreWatchListener(BaseListener, Loggable, pyinotify.ProcessEvent):
|
|||
added = 0
|
||||
for f in mmp.walk_supported(path, clean_empties=False):
|
||||
added += 1
|
||||
dispatcher.send(signal=self.signal, sender=self, event=NewFile(f))
|
||||
dispatcher.send( signal=self.signal, sender=self, event=NewFile(f) )
|
||||
self.logger.info( "Flushed watch directory. added = %d" % added )
|
||||
|
||||
|
|
|
@ -36,6 +36,10 @@ class Manager(Loggable):
|
|||
# removed...
|
||||
self.watched_directories = set([])
|
||||
|
||||
|
||||
def watch_signal(self):
|
||||
return self.watch_listener.self.signal
|
||||
|
||||
def __remove_watch(self,path):
|
||||
if path in self.__wd_path: # only delete if dir is actually being watched
|
||||
wd = self.__wd_path[path]
|
||||
|
@ -94,7 +98,7 @@ class Manager(Loggable):
|
|||
# new files are added to the database. Note that we are not responsible
|
||||
# for removing songs in the old store directory from the database
|
||||
# we assume that this is already done for us.
|
||||
self.watch_listener.flush_events(new_path)
|
||||
# self.watch_listener.flush_events(new_path)
|
||||
self.__add_watch(new_path, self.watch_listener)
|
||||
|
||||
store_path = property(get_store_path, set_store_path)
|
||||
|
@ -130,6 +134,10 @@ class Manager(Loggable):
|
|||
self.logger.info("'%s' is not being watched, hence cannot be removed"
|
||||
% watch_dir)
|
||||
|
||||
def pyinotify(self):
|
||||
return pyinotify.Notifier(self.wm)
|
||||
|
||||
|
||||
def loop(self):
|
||||
"""
|
||||
block until we receive pyinotify events
|
||||
|
|
|
@ -287,6 +287,11 @@ def last_modified(path):
|
|||
return os.path.getmtime(path)
|
||||
else: 0
|
||||
|
||||
def import_organize(store):
|
||||
"""returns a tuple of organize and imported directory from an airtime store directory"""
|
||||
store = os.path.normpath(store)
|
||||
return os.path.join(store,'organize'), os.path.join(store,'imported')
|
||||
|
||||
if __name__ == '__main__':
|
||||
import doctest
|
||||
doctest.testmod()
|
||||
|
|
|
@ -19,7 +19,7 @@ class RequestSync(threading.Thread,Loggable):
|
|||
|
||||
@LazyProperty
|
||||
def apiclient(self):
|
||||
return ac.AirtimeApiClient()
|
||||
return ac.AirtimeApiClient.create_right_config()
|
||||
|
||||
def run(self):
|
||||
# TODO : implement proper request sending
|
||||
|
@ -28,7 +28,7 @@ class RequestSync(threading.Thread,Loggable):
|
|||
# Not forget to attach the 'is_record' to any requests that are related
|
||||
# to recorded shows
|
||||
# A simplistic request would like:
|
||||
# self.apiclient.send_media_monitor_requests(requests)
|
||||
self.apiclient.send_media_monitor_requests(self.requests)
|
||||
self.watcher.flag_done()
|
||||
|
||||
class TimeoutWatcher(threading.Thread,Loggable):
|
||||
|
@ -56,8 +56,9 @@ class TimeoutWatcher(threading.Thread,Loggable):
|
|||
self.watcher.flush_events()
|
||||
|
||||
class WatchSyncer(ReportHandler,Loggable):
|
||||
def __init__(self, channel, chunking_number = 50, timeout=15):
|
||||
self.channel = channel
|
||||
def __init__(self, signal, chunking_number = 50, timeout=15):
|
||||
self.path = '' # TODO : get rid of this attribute everywhere
|
||||
#self.signal = signal
|
||||
self.timeout = timeout
|
||||
self.chunking_number = chunking_number
|
||||
self.__queue = []
|
||||
|
@ -70,12 +71,10 @@ class WatchSyncer(ReportHandler,Loggable):
|
|||
tc = TimeoutWatcher(self, timeout)
|
||||
tc.daemon = True
|
||||
tc.start()
|
||||
super(WatchSyncer, self).__init__(signal=channel.signal)
|
||||
super(WatchSyncer, self).__init__(signal=signal)
|
||||
|
||||
@property
|
||||
def target_path(self): return self.channel.path
|
||||
@property
|
||||
def signal(self): return self.channel.signal
|
||||
def target_path(self): return self.path
|
||||
|
||||
def handle(self, sender, event):
|
||||
"""We implement this abstract method from ReportHandler"""
|
||||
|
|
|
@ -1,13 +1,8 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import pyinotify
|
||||
import sys
|
||||
import os
|
||||
|
||||
from media.monitor.listeners import OrganizeListener, StoreWatchListener
|
||||
from media.monitor.organizer import Organizer
|
||||
from media.monitor.events import PathChannel
|
||||
from media.monitor.watchersyncer import WatchSyncer
|
||||
from media.monitor.handler import ProblemFileHandler
|
||||
from media.monitor.manager import Manager
|
||||
from media.monitor.bootstrap import Bootstrapper
|
||||
from media.monitor.log import get_logger
|
||||
from media.monitor.config import MMConfig
|
||||
|
@ -15,6 +10,7 @@ from media.monitor.toucher import ToucherThread
|
|||
from media.monitor.syncdb import SyncDB
|
||||
from media.monitor.exceptions import FailedToObtainLocale, FailedToSetLocale, NoConfigFile
|
||||
from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver
|
||||
from media.monitor.watchersyncer import WatchSyncer
|
||||
import media.monitor.pure as mmp
|
||||
|
||||
from api_clients import api_client as apc
|
||||
|
@ -34,7 +30,7 @@ from api_clients import api_client as apc
|
|||
# Rewrite to use manager.Manager
|
||||
|
||||
log = get_logger()
|
||||
global_config = u'/path/to/global/config'
|
||||
global_config = u'/home/rudi/Airtime/python_apps/media-monitor2/tests/live_client.cfg'
|
||||
# MMConfig is a proxy around ConfigObj instances. it does not allow itself
|
||||
# users of MMConfig instances to modify any config options directly through the
|
||||
# dictionary. Users of this object muse use the correct methods designated for
|
||||
|
@ -62,80 +58,44 @@ except Exception as e:
|
|||
log.info("Failed to set the locale for unknown reason. Logging exception.")
|
||||
log.info(str(e))
|
||||
|
||||
#channels = {
|
||||
#note that org channel still has a 'watch' path because that is the path
|
||||
#it supposed to be moving the organized files to. it doesn't matter where
|
||||
#are all the "to organize" files are coming from
|
||||
#'org' : PathChannel('org', '/home/rudi/throwaway/fucking_around/organize'),
|
||||
#'watch' : [],
|
||||
#'badfile' : PathChannel('badfile', '/home/rudi/throwaway/fucking_around/problem_dir'),
|
||||
#}
|
||||
watch_syncer = WatchSyncer(signal='watch')
|
||||
|
||||
channels = {}
|
||||
org = config['org']
|
||||
channels['org'] = PathChannel(org['signal'], org['path'])
|
||||
channels['watch'] = []
|
||||
problem = config['problem']
|
||||
channels['badfile'] = PathChannel(problem['signal'], problem['path'])
|
||||
apiclient = apc.AirtimeApiClient.create_right_config(log=log,config_path=global_config)
|
||||
|
||||
apiclient = apc.AirtimeApiClient(log)
|
||||
# We initialize sdb before anything because we must know what our watched
|
||||
# directories are.
|
||||
|
||||
# TODO : Need to do setup_media_monitor call somewhere around here to get
|
||||
# import/organize dirs
|
||||
sdb = SyncDB(apiclient)
|
||||
|
||||
manager = Manager()
|
||||
|
||||
store = apiclient.setup_media_monitor()
|
||||
store = store[u'stor']
|
||||
|
||||
organize_dir, import_dir = mmp.import_organize(store)
|
||||
# Order matters here:
|
||||
manager.set_store_path(import_dir)
|
||||
manager.set_organize_path(organize_dir)
|
||||
|
||||
for watch_dir in sdb.list_directories():
|
||||
if not os.path.exists(watch_dir):
|
||||
# Create the watch_directory here
|
||||
try: os.makedirs(watch_dir)
|
||||
except Exception as e:
|
||||
log.error("Could not create watch directory: '%s' (given from the database)." % watch_dir)
|
||||
# We must do another existence check for the watched directory because we
|
||||
# the creation of it could have failed above
|
||||
if os.path.exists(watch_dir):
|
||||
channels['watch'].append(PathChannel('watch', watch_dir))
|
||||
manager.add_watch_directory(watch_dir)
|
||||
|
||||
# The stor directory is the first directory in the watched directories list
|
||||
org = Organizer(channel=channels['org'],target_path=channels['watch'][0].path)
|
||||
# TODO : this is wrong
|
||||
watches = [ WatchSyncer(channel=pc) for pc in channels['watch'] ]
|
||||
problem_files = ProblemFileHandler(channel=channels['badfile'])
|
||||
last_ran=config.last_ran()
|
||||
bs = Bootstrapper( db=sdb, watch_signal='watch' )
|
||||
|
||||
# A slight incosistency here, channels['watch'] is already a list while the
|
||||
# other items are single elements. For consistency we should make all the
|
||||
# values in channels lists later on
|
||||
# we try to not share the config object as much as possible and in this case we
|
||||
# prefer to only pass the necessary last_ran parameter instead of the whole
|
||||
# object. although this might change in the future if Bootstrapper becomes more
|
||||
# complicated
|
||||
bs = Bootstrapper(db=sdb, last_ran=config.last_ran(), org_channels=[channels['org']], watch_channels=channels['watch'])
|
||||
bs.flush_all( config.last_ran() )
|
||||
|
||||
bs.flush_organize()
|
||||
bs.flush_watch()
|
||||
|
||||
wm = pyinotify.WatchManager()
|
||||
|
||||
# Listeners don't care about which directory they're related to. All they care
|
||||
# about is which signal they should respond to
|
||||
o1 = OrganizeListener(signal=channels['org'].signal)
|
||||
# We are assuming that the signals are the same for each watched directory here
|
||||
o2 = StoreWatchListener(signal=channels['watch'][0].signal)
|
||||
|
||||
notifier = pyinotify.Notifier(wm)
|
||||
wdd1 = wm.add_watch(channels['org'].path, pyinotify.ALL_EVENTS, rec=True, auto_add=True, proc_fun=o1)
|
||||
for pc in channels['watch']:
|
||||
wdd2 = wm.add_watch(pc.path, pyinotify.ALL_EVENTS, rec=True, auto_add=True, proc_fun=o2)
|
||||
|
||||
# After finishing the bootstrapping + the listeners we should initialize the
|
||||
# kombu message consumer to respond to messages from airtime. we prefer to
|
||||
# leave this component of the program for last because without the *Listener
|
||||
# objects we cannot properly respond to all events from airtime anyway.
|
||||
|
||||
airtime_receiver = AirtimeMessageReceiver(config)
|
||||
airtime_receiver = AirtimeMessageReceiver(config,manager)
|
||||
airtime_notifier = AirtimeNotifier(config, airtime_receiver)
|
||||
|
||||
# Launch the toucher that updates the last time when the script was ran every
|
||||
# n seconds.
|
||||
tt = ToucherThread(path=config['index_path'], interval=config['touch_interval'])
|
||||
|
||||
notifier.loop()
|
||||
tt = ToucherThread(path=config['index_path'], interval=int(config['touch_interval']))
|
||||
|
||||
pyi = manager.pyinotify()
|
||||
pyi.loop()
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
#!/bin/bash
|
||||
|
||||
for f in /home/rudi/Airtime/python_apps/media-monitor2/tests/*.py
|
||||
do
|
||||
python $f
|
||||
done
|
|
@ -4,9 +4,12 @@ import os
|
|||
import sys
|
||||
from api_clients import api_client as apc
|
||||
|
||||
|
||||
import prepare_tests
|
||||
|
||||
class TestApiClient(unittest.TestCase):
|
||||
def setUp(self):
|
||||
test_path = '/home/rudi/Airtime/python_apps/media-monitor2/tests/api_client.cfg'
|
||||
test_path = prepare_tests.real_config
|
||||
if not os.path.exists(test_path):
|
||||
print("path for config does not exist: '%s' % test_path")
|
||||
# TODO : is there a cleaner way to exit the unit testing?
|
||||
|
|
|
@ -6,13 +6,15 @@ from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver
|
|||
from mock import patch, Mock
|
||||
from media.monitor.config import MMConfig
|
||||
|
||||
from media.monitor.manager import Manager
|
||||
|
||||
def filter_ev(d): return { i : j for i,j in d.iteritems() if i != 'event_type' }
|
||||
|
||||
class TestReceiver(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# TODO : properly mock this later
|
||||
cfg = {}
|
||||
self.amr = AirtimeMessageReceiver(cfg)
|
||||
self.amr = AirtimeMessageReceiver(cfg, Manager())
|
||||
|
||||
def test_supported_messages(self):
|
||||
self.assertTrue( len(self.amr.supported_messages()) > 0 )
|
||||
|
|
|
@ -5,10 +5,12 @@ from media.monitor.syncdb import SyncDB
|
|||
from media.monitor.log import get_logger
|
||||
from media.monitor.pure import partition
|
||||
import api_clients.api_client as ac
|
||||
import prepare_tests
|
||||
|
||||
class TestSyncDB(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ac = ac.AirtimeApiClient(logger=get_logger())
|
||||
self.ac = ac.AirtimeApiClient(logger=get_logger(),
|
||||
config_path=prepare_tests.real_config)
|
||||
|
||||
def test_syncdb_init(self):
|
||||
sdb = SyncDB(self.ac)
|
||||
|
@ -25,7 +27,7 @@ class TestSyncDB(unittest.TestCase):
|
|||
for wdir in sdb.list_directories():
|
||||
files = sdb.directory_get_files(wdir)
|
||||
self.assertTrue( len(files) >= 0 )
|
||||
self.assertTrue( isinstance(files, list) )
|
||||
self.assertTrue( isinstance(files, set) )
|
||||
exist, deleted = partition(os.path.exists, files)
|
||||
print("(exist, deleted) = (%d, %d)" % ( len(exist), len(deleted) ) )
|
||||
|
||||
|
|
Loading…
Reference in New Issue