CC-1799 : Live Studio Playout from media library (pytagsfs)

-code cleanup, refactoring, and files moved from a watched-dir
 to a non-watched dir are now properly handled
This commit is contained in:
martin 2011-07-05 15:48:50 -04:00
parent 8593b340de
commit 2be05a8004
5 changed files with 128 additions and 53 deletions

View File

@ -6,11 +6,16 @@ import sys
import os
import signal
from api_clients import api_client
from multiprocessing import Process, Queue as mpQueue
from pyinotify import WatchManager
from airtimefilemonitor.airtimenotifier import AirtimeNotifier
from airtimefilemonitor.airtimeprocessevent import AirtimeProcessEvent
from airtimefilemonitor.mediaconfig import AirtimeMediaConfig
from airtimefilemonitor.workerprocess import MediaMonitorWorkerProcess
from airtimefilemonitor.airtimemediamonitorbootstrap import AirtimeMediaMonitorBootstrap
def handleSigTERM(signum, frame):
@ -35,18 +40,12 @@ processes = []
try:
config = AirtimeMediaConfig(logger)
multi_queue = mpQueue()
logger.info("Initializing event processor")
pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config)
notifier = AirtimeNotifier(pe.wm, pe, read_freq=1, timeout=0, airtime_config=config)
notifier.coalesce_events()
api_client = api_client.api_client_factory(config.cfg)
logger.info("Setting up monitor")
response = None
while response is None:
response = notifier.api_client.setup_media_monitor()
response = api_client.setup_media_monitor()
time.sleep(5)
storage_directory = response["stor"].encode('utf-8')
@ -54,12 +53,25 @@ try:
config.storage_directory = storage_directory
config.imported_directory = storage_directory + '/imported'
multi_queue = mpQueue()
logger.info("Initializing event processor")
except Exception, e:
logger.error('Exception: %s', e)
try:
wm = WatchManager()
pe = AirtimeProcessEvent(queue=multi_queue, airtime_config=config, wm=wm)
notifier = AirtimeNotifier(wm, pe, read_freq=1, timeout=0, airtime_config=config, api_client=api_client)
notifier.coalesce_events()
bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe, config)
bootstrap.scan()
#create 5 worker processes
wp = MediaMonitorWorkerProcess()
for i in range(5):
p = Process(target=notifier.process_file_events, args=(multi_queue,))
p = Process(target=wp.process_file_events, args=(multi_queue, notifier))
processes.append(p)
p.start()

View File

@ -19,7 +19,7 @@ class AirtimeMediaMonitorBootstrap():
"""
on bootup we want to scan all directories and look for files that
weren't there or files that changed before media-monitor process
went offline. We can do this by doing a hash of the directory metadata.
went offline.
"""
def scan(self):
directories = self.get_list_of_watched_dirs();
@ -39,7 +39,7 @@ class AirtimeMediaMonitorBootstrap():
def check_for_diff(self, dir_id, dir):
#set to hold new and/or modified files. We use a set to make it ok if files are added
#twice. This is become some of the tests for new files return result sets that are not
#twice. This is because some of the tests for new files return result sets that are not
#mutually exclusive from each other.
new_and_modified_files = set()
removed_files = set()

View File

@ -8,19 +8,19 @@ from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
import pyinotify
from pyinotify import WatchManager, Notifier, ProcessEvent
from pyinotify import Notifier
from api_clients import api_client
#from api_clients import api_client
from airtimemetadata import AirtimeMetadata
class AirtimeNotifier(Notifier):
def __init__(self, watch_manager, default_proc_fun=None, read_freq=0, threshold=0, timeout=None, airtime_config=None):
def __init__(self, watch_manager, default_proc_fun=None, read_freq=0, threshold=0, timeout=None, airtime_config=None, api_client=None):
Notifier.__init__(self, watch_manager, default_proc_fun, read_freq, threshold, timeout)
self.logger = logging.getLogger()
self.config = airtime_config
self.api_client = api_client.api_client_factory(self.config.cfg)
self.api_client = api_client
self.md_manager = AirtimeMetadata()
self.import_processes = {}
self.watched_folders = []
@ -155,15 +155,6 @@ class AirtimeNotifier(Notifier):
self.api_client.update_media_metadata(md, mode)
#this function is run in its own process, and continuously
#checks the queue for any new file events.
def process_file_events(self, queue):
while True:
event = queue.get()
self.logger.info("received event %s", event)
self.update_airtime(event)
def walk_newly_watched_directory(self, directory):
mm = self.proc_fun()

View File

@ -3,11 +3,12 @@ import socket
import grp
import pwd
import logging
import time
from subprocess import Popen, PIPE
import pyinotify
from pyinotify import WatchManager, Notifier, ProcessEvent
from pyinotify import ProcessEvent
# For RabbitMQ
from kombu.connection import BrokerConnection
@ -18,7 +19,7 @@ from airtimefilemonitor.mediaconfig import AirtimeMediaConfig
class AirtimeProcessEvent(ProcessEvent):
def my_init(self, queue, airtime_config=None):
def my_init(self, queue, airtime_config=None, wm=None):
"""
Method automatically called from ProcessEvent.__init__(). Additional
keyworded arguments passed to ProcessEvent.__init__() are then
@ -30,13 +31,16 @@ class AirtimeProcessEvent(ProcessEvent):
self.supported_file_formats = ['mp3', 'ogg']
self.temp_files = {}
self.renamed_files = {}
"""
self.moved_files = {}
self.gui_replaced = {}
self.renamed_files = {}
"""
self.cookies_IN_MOVED_FROM = {}
self.file_events = []
self.multi_queue = queue
self.mask = pyinotify.ALL_EVENTS
self.wm = WatchManager()
self.wm = wm
self.md_manager = AirtimeMetadata()
#define which directories the pyinotify WatchManager should watch.
@ -95,9 +99,12 @@ class AirtimeProcessEvent(ProcessEvent):
try:
omask = os.umask(0)
if ((not os.path.exists(directory)) or ((os.path.exists(directory) and not os.path.isdir(directory)))):
if not os.path.exists(directory):
os.makedirs(directory, 02777)
self.watch_directory(directory)
#self.watch_directory(directory)
elif not os.path.isdir(directory):
#path exists but it is a file not a directory!
self.logger.error("path %s exists, but it is not a directory!!!")
finally:
os.umask(omask)
@ -111,7 +118,8 @@ class AirtimeProcessEvent(ProcessEvent):
finally:
os.umask(omask)
#checks if path exists already in stor. If the path exists and the md5s are the same just moves file to same path anyway to avoid duplicates in the system.
#checks if path exists already in stor. If the path exists and the md5s are the
#same just overwrite.
def create_unique_filename(self, filepath, old_filepath):
try:
@ -148,7 +156,7 @@ class AirtimeProcessEvent(ProcessEvent):
return filepath
#create path in /srv/airtime/stor/imported/[song-metadata]
def create_file_path(self, imported_filepath, orig_md):
def create_file_path(self, original_path, orig_md):
storage_directory = self.config.storage_directory
@ -156,7 +164,7 @@ class AirtimeProcessEvent(ProcessEvent):
try:
#will be in the format .ext
file_ext = os.path.splitext(imported_filepath)[1]
file_ext = os.path.splitext(original_path)[1]
file_ext = file_ext.encode('utf-8')
path_md = ['MDATA_KEY_TITLE', 'MDATA_KEY_CREATOR', 'MDATA_KEY_SOURCE', 'MDATA_KEY_TRACKNUMBER', 'MDATA_KEY_BITRATE']
@ -189,10 +197,10 @@ class AirtimeProcessEvent(ProcessEvent):
elif(md['MDATA_KEY_TRACKNUMBER'] == u'unknown'.encode('utf-8')):
filepath = '%s/%s/%s/%s/%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
else:
filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TRACKNUMBER'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
filepath = '%s/%s/%s/%s/%s-%s-%s%s' % (storage_directory, "imported".encode('utf-8'), md['MDATA_KEY_CREATOR'], md['MDATA_KEY_SOURCE'], md['MDATA_KEY_TRACKNUMBER'], md['MDATA_KEY_TITLE'], md['MDATA_KEY_BITRATE'], file_ext)
self.logger.info('Created filepath: %s', filepath)
filepath = self.create_unique_filename(filepath, imported_filepath)
filepath = self.create_unique_filename(filepath, original_path)
self.logger.info('Unique filepath: %s', filepath)
self.ensure_dir(filepath)
@ -202,30 +210,32 @@ class AirtimeProcessEvent(ProcessEvent):
return filepath
#event.dir: True if the event was raised against a directory.
#event.name
#event.name: filename
#event.pathname: pathname (str): Concatenation of 'path' and 'name'.
def process_IN_CREATE(self, event):
self.logger.debug("PROCESS_IN_CREATE: %s", event)
self.handle_created_file(event.dir, event.name, event.pathname)
def handle_created_file(self, dir, name, pathname):
self.logger.debug("PROCESS_IN_CREATE")
self.logger.debug("dir: %s, name: %s, pathname: %s ", dir, name, pathname)
storage_directory = self.config.storage_directory
if not dir:
#file created is a tmp file which will be modified and then moved back to the original filename.
#event is because of a created file
if self.is_temp_file(name) :
#file created is a tmp file which will be modified and then moved back to the original filename.
self.temp_files[pathname] = None
#This is a newly imported file.
elif self.is_audio_file(pathname):
if self.is_parent_directory(pathname, storage_directory):
#file was created in /srv/airtime/stor. Need to process and copy
#to /srv/airtime/stor/imported
self.set_needed_file_permissions(pathname, dir)
self.process_new_file(pathname)
else:
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'is_recorded_show': False})
else:
#event is because of a created directory
if self.is_parent_directory(pathname, storage_directory):
self.set_needed_file_permissions(pathname, dir)
@ -237,16 +247,20 @@ class AirtimeProcessEvent(ProcessEvent):
if file_md is not None:
is_recorded_show = 'MDATA_KEY_CREATOR' in file_md and \
file_md['MDATA_KEY_CREATOR'] == "AIRTIMERECORDERSOURCEFABRIC".encode('utf-8')
if not self.is_parent_directory(pathname, self.config.imported_directory):
filepath = self.create_file_path(pathname, file_md)
self.move_file(pathname, filepath)
self.renamed_files[pathname] = filepath
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show})
else:
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'data': file_md, 'is_recorded_show': is_recorded_show})
#if not self.is_parent_directory(pathname, self.config.imported_directory):
#file has not been "imported" yet. Need to move this file to /srv/airtime/stor/imported
filepath = self.create_file_path(pathname, file_md)
self.move_file(pathname, filepath)
self.renamed_files[pathname] = filepath
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': is_recorded_show})
#else:
# self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': pathname, 'data': file_md, 'is_recorded_show': is_recorded_show})
else:
self.logger.warn("File %s, has invalid metadata", pathname)
def process_IN_MODIFY(self, event):
self.logger.info("process_IN_MODIFY: %s", event)
self.handle_modified_file(event.dir, event.pathname, event.name)
def handle_modified_file(self, dir, pathname, name):
@ -257,9 +271,16 @@ class AirtimeProcessEvent(ProcessEvent):
elif self.is_audio_file(name):
self.file_events.append({'filepath': pathname, 'mode': self.config.MODE_MODIFY})
#if a file is moved somewhere, this callback is run. With details about
#where the file is being moved from. The corresponding process_IN_MOVED_TO
#callback is only called if the destination of the file is also in a watched
#directory.
def process_IN_MOVED_FROM(self, event):
self.logger.info("%s: %s", event.maskname, event.pathname)
self.logger.info("process_IN_MOVED_FROM: %s", event)
if not event.dir:
self.cookies_IN_MOVED_FROM[event.cookie] = (event, time.time())
"""
if "goutputstream" in event.pathname:
self.gui_replaced[event.cookie] = None
elif event.pathname in self.temp_files:
@ -269,12 +290,34 @@ class AirtimeProcessEvent(ProcessEvent):
pass
else:
self.moved_files[event.cookie] = event.pathname
"""
def process_IN_MOVED_TO(self, event):
self.logger.info("%s: %s", event.maskname, event.pathname)
self.logger.info("process_IN_MOVED_TO: %s", event)
#if stuff dropped in stor via a UI move must change file permissions.
self.set_needed_file_permissions(event.pathname, event.dir)
if not event.dir:
if event.cookie in self.cookies_IN_MOVED_FROM:
#files original location was also in a watched directory, in this case
#we won't try to create a new file name, and move the file to the appropriate
#location. We'll just assume the user knows what he is doing.
del self.cookies_IN_MOVED_FROM[event.cookie]
self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MOVED})
else:
storage_directory = self.config.storage_directory
if self.is_parent_directory(event.pathname, storage_directory):
#show dragged from unwatched directory into the storage directory
file_md = self.md_manager.get_md_from_file(event.pathname)
if file_md is not None:
filepath = self.create_file_path(event.pathname, file_md)
self.move_file(event.pathname, filepath)
#self.renamed_files[event.pathname] = filepath
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False})
else:
#show dragged from unwatched folder into a watched folder.
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False})
"""
if event.cookie in self.temp_files:
del self.temp_files[event.cookie]
self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY})
@ -299,9 +342,10 @@ class AirtimeProcessEvent(ProcessEvent):
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': filepath, 'data': file_md, 'is_recorded_show': False})
else:
self.file_events.append({'mode': self.config.MODE_CREATE, 'filepath': event.pathname, 'is_recorded_show': False})
"""
def process_IN_DELETE(self, event):
self.logger.info("%s: %s", event.maskname, event.pathname)
self.logger.info("process_IN_DELETE: %s", event)
if not event.dir:
self.handle_removed_file(event.pathname)
@ -311,7 +355,7 @@ class AirtimeProcessEvent(ProcessEvent):
def process_default(self, event):
#self.logger.info("%s: %s", event.maskname, event.pathname)
#self.logger.info("PROCESS_DEFAULT: %s", event)
pass
def execCommandAndReturnStdOut(self, command):
@ -332,6 +376,22 @@ class AirtimeProcessEvent(ProcessEvent):
self.multi_queue.put(event)
self.file_events = []
for k, pair in self.cookies_IN_MOVED_FROM.items():
event = pair[0]
timestamp = pair[1]
timestamp_now = time.time()
if timestamp_now - timestamp > 5:
#in_moved_from event didn't have a corresponding
#in_moved_to event in the last 5 seconds.
#This means the file was moved to outside of the
#watched directories. Let's handle this by deleting
#it from the Airtime directory.
del self.cookies_IN_MOVED_FROM[k]
self.handle_removed_file(event.pathname)
#check for any events recieved from Airtime.
try:

View File

@ -0,0 +1,12 @@
class MediaMonitorWorkerProcess:
#this function is run in its own process, and continuously
#checks the queue for any new file events.
def process_file_events(self, queue, notifier):
while True:
event = queue.get()
notifier.logger.info("received event %s", event)
notifier.update_airtime(event)