cc-2419 media monitor import on start

This commit is contained in:
martin 2011-06-30 17:23:31 -04:00
parent 851d5c12b2
commit 0f67db9ed7
4 changed files with 103 additions and 55 deletions

View File

@ -40,7 +40,7 @@ try:
logger.info("Initializing event processor")
pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config)
notifier = AirtimeNotifier(pe.wm, pe, read_freq=0.1, timeout=0.1, airtime_config=config)
notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=0, airtime_config=config)
notifier.coalesce_events()
logger.info("Setting up monitor")
@ -52,6 +52,7 @@ try:
storage_directory = response["stor"].encode('utf-8')
logger.info("Storage Directory is: %s", storage_directory)
config.storage_directory = storage_directory
config.imported_directory = storage_directory + '/imported'
bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe)
bootstrap.scan()

View File

@ -1,4 +1,5 @@
import os
import time
from subprocess import Popen, PIPE
@ -9,6 +10,7 @@ class AirtimeMediaMonitorBootstrap():
self.logger = logger
self.multi_queue = multi_queue
self.pe = pe
self.airtime_tmp = '/var/tmp/airtime'
"""
on bootup we want to scan all directories and look for files that
@ -21,41 +23,56 @@ class AirtimeMediaMonitorBootstrap():
for dir in directories:
self.check_for_diff(dir)
def check_for_diff(self, dir):
airtime_tmp = '/var/tmp/airtime'
def check_for_diff(self, dir):
#set to hold new and/or modified files. We use a set to make it ok if files are added
#twice. This is become some of the tests for new files return result sets that are not
#mutually exclusive from each other.
modified_files = set()
#find files that have been modified since the last time
#media-monitor process was running.
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -30" % dir
stdout = self.execCommandAndReturnStdOut(command)
self.logger.info("Files modified since last checkin: \n%s\n", stdout)
new_files = stdout.split('\n')
for file_path in new_files:
modified_files.add(file_path)
added_files = set()
removed_files = set()
if os.path.exists(airtime_tmp + '/.airtime_media_index') and False:
if os.path.exists(self.airtime_tmp + '/.airtime_media_index'):
#find files that have been modified since the last time
#media-monitor process was running.
time_diff_sec = time.time() - os.path.getmtime(self.airtime_tmp + '/.airtime_media_index')
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -%d" % (dir, time_diff_sec/60+1)
self.logger.debug(command)
stdout = self.execCommandAndReturnStdOut(command)
self.logger.info("Files modified since last checkin: \n%s\n", stdout)
new_files = stdout.split('\n')
for file_path in new_files:
added_files.add(file_path)
#a previous index exists, we can do a diff between this
#file and the current state to see whether anything has
#changed.
self.logger.info("Previous index file found.")
#find deleted files
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, airtime_tmp)
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, self.airtime_tmp)
self.execCommand(command)
command = "diff %s/.airtime_media_index.tmp %s/.airtime_media_index" % (airtime_tmp, airtime_tmp)
command = "diff -u %s/.airtime_media_index %s/.airtime_media_index.tmp" % (self.airtime_tmp, self.airtime_tmp)
stdout = self.execCommandAndReturnStdOut(command)
self.logger.info("Deleted files since last checkin:\n%s\n", stdout)
#TODO: notify about deleted files and files moved here
#remove first 3 lines from the diff output.
stdoutSplit = (stdout.split('\n'))[3:]
self.logger.info("Changed files since last checkin:\n%s\n", "\n".join(stdoutSplit))
for line in stdoutSplit:
if len(line.strip(' ')) > 0:
if line[0] == '+':
added_files.add(line[1:])
elif line[0] == '-':
removed_files.add(line[1:])
self.pe.write_index_file()
else:
#a previous index does not exist. Most likely means that
#media monitor has never seen this directory before. Let's
@ -63,27 +80,20 @@ class AirtimeMediaMonitorBootstrap():
self.logger.info("Previous index file does not exist. Creating a new one")
#create a new index file.
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % dir
stdout = self.execCommandAndReturnStdOut(command)
self.logger.info("New files found: \n%s\n", stdout)
self.write_file(airtime_tmp + '/.airtime_media_index', stdout)
stdout = self.pe.write_index_file()
new_files = stdout.split('\n')
for file_path in new_files:
modified_files.add(file_path)
added_files.add(file_path)
self.logger.debug("set size: %d", len(modified_files))
for file_path in modified_files:
for file_path in added_files:
if os.path.exists(file_path):
self.pe.handle_created_file(False, os.path.basename(file_path), file_path)
def write_file(self, file, string):
f = open(file, 'w')
f.write(string)
f.close()
for file_path in removed_files:
self.pe.handle_removed_file(file_path)
def execCommand(self, command):
p = Popen(command, shell=True)
sts = os.waitpid(p.pid, 0)[1]
@ -96,9 +106,4 @@ class AirtimeMediaMonitorBootstrap():
if p.returncode != 0:
self.logger.warn("command \n%s\n return with a non-zero return value", command)
return stdout
if __name__ == '__main__':
mmb = AirtimeMediaMonitorBootstrap()
mmb.scan()

View File

@ -3,8 +3,6 @@ import time
import os
import logging
from multiprocessing import Process, Lock, Queue as mpQueue
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
@ -26,6 +24,7 @@ class AirtimeNotifier(Notifier):
self.md_manager = AirtimeMetadata()
self.import_processes = {}
self.watched_folders = []
while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
@ -112,7 +111,6 @@ class AirtimeNotifier(Notifier):
# -data
# -is_recorded_show
def update_airtime(self, d):
filepath = d['filepath']
mode = d['mode']

View File

@ -4,6 +4,8 @@ import grp
import pwd
import logging
from subprocess import Popen, PIPE
import pyinotify
from pyinotify import WatchManager, Notifier, ProcessEvent
@ -36,6 +38,11 @@ class AirtimeProcessEvent(ProcessEvent):
self.mask = pyinotify.ALL_EVENTS
self.wm = WatchManager()
self.md_manager = AirtimeMetadata()
#Set to "True" everytime we get a file event so
#that we can track of when we need to rewrite the
#index file
self.dirty = False
#define which directories the pyinotify WatchManager should watch.
def watch_directory(self, directory):
@ -177,7 +184,6 @@ class AirtimeProcessEvent(ProcessEvent):
#yyyy-mm-dd-hh-MM-ss
y = orig_md['MDATA_KEY_YEAR'].split("-")
filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "recorded".encode('utf-8'), y[0], y[1], orig_md['MDATA_KEY_YEAR'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
is_recorded_show = True
elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')):
filepath = '%s/%s/%s/%s/%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
else:
@ -191,7 +197,7 @@ class AirtimeProcessEvent(ProcessEvent):
except Exception, e:
self.logger.error('Exception: %s', e)
return filepath, is_recorded_show
return filepath
#event.dir: True if the event was raised against a directory.
#event.name
@ -229,10 +235,15 @@ class AirtimeProcessEvent(ProcessEvent):
file_md = self.md_manager.get_md_from_file(pathname)
if file_md is not None:
filepath, is_recorded_show = self.create_file_path(pathname, file_md)
self.move_file(pathname, filepath)
self.renamed_files[pathname] = filepath
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show})
is_recorded_show = 'MDATA_KEY_CREATOR' in file_md and \
file_md['MDATA_KEY_CREATOR'] == "AIRTIMERECORDERSOURCEFABRIC".encode('utf-8')
if not self.is_parent_directory(pathname, self.config.imported_directory):
filepath = self.create_file_path(pathname, file_md)
self.move_file(pathname, filepath)
self.renamed_files[pathname] = filepath
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show})
else:
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'data': file_md, 'is_recorded_show': is_recorded_show})
def process_IN_MODIFY(self, event):
@ -290,18 +301,51 @@ class AirtimeProcessEvent(ProcessEvent):
def process_IN_DELETE(self, event):
self.logger.info("%s: %s", event.maskname, event.pathname)
if not event.dir:
self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_DELETE})
self.handle_removed_file(event.pathname)
def handle_removed_file(self, pathname):
self.logger.info("Deleting %s", pathname)
self.file_events.append({'filepath': pathname, 'mode': self.config.MODE_DELETE})
def process_default(self, event):
#self.logger.info("%s: %s", event.maskname, event.pathname)
pass
def execCommandAndReturnStdOut(self, command):
p = Popen(command, shell=True, stdout=PIPE)
stdout = p.communicate()[0]
if p.returncode != 0:
self.logger.warn("command \n%s\n return with a non-zero return value", command)
return stdout
def write_index_file(self):
#create a new index file.
self.logger.debug("writing new index file")
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % '/srv/airtime/stor'
stdout = self.execCommandAndReturnStdOut(command)
self.write_file('/var/tmp/airtime' + '/.airtime_media_index', stdout)
return stdout
def write_file(self, file, string):
f = open(file, 'w')
f.write(string)
f.close()
def notifier_loop_callback(self, notifier):
if len(self.file_events) > 0:
for event in self.file_events:
self.multi_queue.put(event)
for event in self.file_events:
self.multi_queue.put(event)
self.file_events = []
self.dirty = True
self.file_events = []
elif self.multi_queue.empty():
#no file_events and queue is empty. This is a good time
#to write an index file.
if self.dirty:
self.write_index_file()
self.dirty = False
#check for any events recieved from Airtime.
try: