cc-2419: media monitor import on start
-files importing working...
This commit is contained in:
parent
778b645a69
commit
851d5c12b2
3 changed files with 72 additions and 39 deletions
|
@ -37,33 +37,31 @@ try:
|
||||||
config = AirtimeMediaConfig(logger)
|
config = AirtimeMediaConfig(logger)
|
||||||
|
|
||||||
multi_queue = mpQueue()
|
multi_queue = mpQueue()
|
||||||
|
|
||||||
|
|
||||||
bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue)
|
|
||||||
bootstrap.scan()
|
|
||||||
|
|
||||||
logger.info("Initializing event processor")
|
logger.info("Initializing event processor")
|
||||||
pe = AirtimeProcessEvent(multi_queue, airtime_config=config)
|
pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config)
|
||||||
|
|
||||||
notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config)
|
notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config)
|
||||||
notifier.coalesce_events()
|
notifier.coalesce_events()
|
||||||
|
|
||||||
|
logger.info("Setting up monitor")
|
||||||
|
response = None
|
||||||
|
while response is None:
|
||||||
|
response = notifier.api_client.setup_media_monitor()
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
storage_directory = response["stor"].encode('utf-8')
|
||||||
|
logger.info("Storage Directory is: %s", storage_directory)
|
||||||
|
config.storage_directory = storage_directory
|
||||||
|
|
||||||
|
bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe)
|
||||||
|
bootstrap.scan()
|
||||||
|
|
||||||
#create 5 worker processes
|
#create 5 worker processes
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
p = Process(target=notifier.process_file_events, args=(multi_queue,))
|
p = Process(target=notifier.process_file_events, args=(multi_queue,))
|
||||||
processes.append(p)
|
processes.append(p)
|
||||||
p.start()
|
p.start()
|
||||||
|
|
||||||
logger.info("Setting up monitor")
|
|
||||||
response = None
|
|
||||||
while response is None:
|
|
||||||
response = notifier.api_client.setup_media_monitor()
|
|
||||||
time.sleep(5)
|
|
||||||
|
|
||||||
storage_directory = response["stor"].encode('utf-8')
|
|
||||||
logger.info("Storage Directory is: %s", storage_directory)
|
|
||||||
config.storage_directory = storage_directory
|
|
||||||
|
|
||||||
wdd = pe.watch_directory(storage_directory)
|
wdd = pe.watch_directory(storage_directory)
|
||||||
logger.info("Added watch to %s", storage_directory)
|
logger.info("Added watch to %s", storage_directory)
|
||||||
logger.info("wdd result %s", wdd[storage_directory])
|
logger.info("wdd result %s", wdd[storage_directory])
|
||||||
|
|
|
@ -5,9 +5,10 @@ from subprocess import Popen, PIPE
|
||||||
|
|
||||||
class AirtimeMediaMonitorBootstrap():
|
class AirtimeMediaMonitorBootstrap():
|
||||||
|
|
||||||
def __init__(self, logger, multi_queue):
|
def __init__(self, logger, multi_queue, pe):
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
self.multi_queue = multi_queue
|
self.multi_queue = multi_queue
|
||||||
|
self.pe = pe
|
||||||
|
|
||||||
"""
|
"""
|
||||||
on bootup we want to scan all directories and look for files that
|
on bootup we want to scan all directories and look for files that
|
||||||
|
@ -22,16 +23,24 @@ class AirtimeMediaMonitorBootstrap():
|
||||||
|
|
||||||
def check_for_diff(self, dir):
|
def check_for_diff(self, dir):
|
||||||
airtime_tmp = '/var/tmp/airtime'
|
airtime_tmp = '/var/tmp/airtime'
|
||||||
|
|
||||||
|
#set to hold new and/or modified files. We use a set to make it ok if files are added
|
||||||
|
#twice. This is become some of the tests for new files return result sets that are not
|
||||||
|
#mutually exclusive from each other.
|
||||||
|
modified_files = set()
|
||||||
|
|
||||||
#find files that have been modified since the last time
|
#find files that have been modified since the last time
|
||||||
#media-monitor process was running.
|
#media-monitor process was running.
|
||||||
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir
|
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir
|
||||||
stdout = self.execCommandAndReturnStdOut(command)
|
stdout = self.execCommandAndReturnStdOut(command)
|
||||||
self.logger.info("Files modified since last checkin: \n%s\n", stdout)
|
self.logger.info("Files modified since last checkin: \n%s\n", stdout)
|
||||||
|
|
||||||
#TODO: notify about modified and newly created files (not including copied files)
|
new_files = stdout.split('\n')
|
||||||
|
|
||||||
|
for file_path in new_files:
|
||||||
|
modified_files.add(file_path)
|
||||||
|
|
||||||
if os.path.exists(airtime_tmp + '/.airtime_media_index'):
|
if os.path.exists(airtime_tmp + '/.airtime_media_index') and False:
|
||||||
#a previous index exists, we can do a diff between this
|
#a previous index exists, we can do a diff between this
|
||||||
#file and the current state to see whether anything has
|
#file and the current state to see whether anything has
|
||||||
#changed.
|
#changed.
|
||||||
|
@ -54,11 +63,26 @@ class AirtimeMediaMonitorBootstrap():
|
||||||
self.logger.info("Previous index file does not exist. Creating a new one")
|
self.logger.info("Previous index file does not exist. Creating a new one")
|
||||||
|
|
||||||
#create a new index file.
|
#create a new index file.
|
||||||
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index" % (dir, airtime_tmp)
|
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % dir
|
||||||
self.execCommand(command)
|
stdout = self.execCommandAndReturnStdOut(command)
|
||||||
|
self.logger.info("New files found: \n%s\n", stdout)
|
||||||
|
self.write_file(airtime_tmp + '/.airtime_media_index', stdout)
|
||||||
|
|
||||||
|
new_files = stdout.split('\n')
|
||||||
|
|
||||||
|
for file_path in new_files:
|
||||||
|
modified_files.add(file_path)
|
||||||
|
|
||||||
|
self.logger.debug("set size: %d", len(modified_files))
|
||||||
|
|
||||||
#TODO: notify about all files in this directory.
|
for file_path in modified_files:
|
||||||
self.multi_queue.put(event)
|
if os.path.exists(file_path):
|
||||||
|
self.pe.handle_created_file(False, os.path.basename(file_path), file_path)
|
||||||
|
|
||||||
|
def write_file(self, file, string):
|
||||||
|
f = open(file, 'w')
|
||||||
|
f.write(string)
|
||||||
|
f.close()
|
||||||
|
|
||||||
def execCommand(self, command):
|
def execCommand(self, command):
|
||||||
p = Popen(command, shell=True)
|
p = Popen(command, shell=True)
|
||||||
|
|
|
@ -138,6 +138,7 @@ class AirtimeProcessEvent(ProcessEvent):
|
||||||
|
|
||||||
return filepath
|
return filepath
|
||||||
|
|
||||||
|
#create path in /srv/airtime/stor/imported/[song-metadata]
|
||||||
def create_file_path(self, imported_filepath, orig_md):
|
def create_file_path(self, imported_filepath, orig_md):
|
||||||
|
|
||||||
storage_directory = self.config.storage_directory
|
storage_directory = self.config.storage_directory
|
||||||
|
@ -192,29 +193,39 @@ class AirtimeProcessEvent(ProcessEvent):
|
||||||
|
|
||||||
return filepath, is_recorded_show
|
return filepath, is_recorded_show
|
||||||
|
|
||||||
|
#event.dir: True if the event was raised against a directory.
|
||||||
|
#event.name
|
||||||
|
#event.pathname: pathname (str): Concatenation of 'path' and 'name'.
|
||||||
def process_IN_CREATE(self, event):
|
def process_IN_CREATE(self, event):
|
||||||
|
|
||||||
#self.logger.info("%s: %s", event.maskname, event.pathname)
|
self.logger.debug("PROCESS_IN_CREATE")
|
||||||
storage_directory = self.config.storage_directory
|
self.handle_created_file(event.dir, event.name, event.pathname)
|
||||||
|
|
||||||
if not event.dir:
|
|
||||||
|
def handle_created_file(self, dir, name, pathname):
|
||||||
|
|
||||||
|
self.logger.debug("dir: %s, name: %s, pathname: %s ", dir, name, pathname)
|
||||||
|
storage_directory = self.config.storage_directory
|
||||||
|
if not dir:
|
||||||
#file created is a tmp file which will be modified and then moved back to the original filename.
|
#file created is a tmp file which will be modified and then moved back to the original filename.
|
||||||
if self.is_temp_file(event.name) :
|
if self.is_temp_file(name) :
|
||||||
self.temp_files[event.pathname] = None
|
self.temp_files[pathname] = None
|
||||||
#This is a newly imported file.
|
#This is a newly imported file.
|
||||||
elif self.is_audio_file(event.pathname):
|
elif self.is_audio_file(pathname):
|
||||||
if self.is_parent_directory(event.pathname, storage_directory):
|
if self.is_parent_directory(pathname, storage_directory):
|
||||||
self.set_needed_file_permissions(event.pathname, event.dir)
|
self.set_needed_file_permissions(pathname, dir)
|
||||||
|
|
||||||
self.process_new_file(event.pathname)
|
self.process_new_file(pathname)
|
||||||
else:
|
else:
|
||||||
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False})
|
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'is_recorded_show': False})
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if self.is_parent_directory(event.pathname, storage_directory):
|
if self.is_parent_directory(pathname, storage_directory):
|
||||||
self.set_needed_file_permissions(event.pathname, event.dir)
|
self.set_needed_file_permissions(pathname, dir)
|
||||||
|
|
||||||
def process_new_file(pathname):
|
|
||||||
|
def process_new_file(self, pathname):
|
||||||
|
self.logger.info("Processing new file: %s", pathname)
|
||||||
file_md = self.md_manager.get_md_from_file(pathname)
|
file_md = self.md_manager.get_md_from_file(pathname)
|
||||||
|
|
||||||
if file_md is not None:
|
if file_md is not None:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue