cc-2419: media monitor import on startup
-new method of querying db on startup.
This commit is contained in:
parent
df64b70280
commit
429d8b234c
|
@ -14,6 +14,7 @@ class ApiController extends Zend_Controller_Action
|
||||||
->addActionContext('media-monitor-setup', 'json')
|
->addActionContext('media-monitor-setup', 'json')
|
||||||
->addActionContext('media-item-status', 'json')
|
->addActionContext('media-item-status', 'json')
|
||||||
->addActionContext('reload-metadata', 'json')
|
->addActionContext('reload-metadata', 'json')
|
||||||
|
->addActionContext('list-all-files', 'json')
|
||||||
->initContext();
|
->initContext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -431,9 +432,27 @@ class ApiController extends Zend_Controller_Action
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($mode == "create") {
|
if ($mode == "create") {
|
||||||
|
$filepath = $md['MDATA_KEY_FILEPATH'];
|
||||||
|
$file = StoredFile::RecallByFilepath($filepath);
|
||||||
|
|
||||||
|
if (is_null($file)) {
|
||||||
|
$file = StoredFile::Insert($md);
|
||||||
|
} else {
|
||||||
|
$this->view->error = "File already exists in Airtime.";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//Martin Konecny July 14th, 2011: The following commented out code is the way
|
||||||
|
//we used to check for duplicates (by md5). Why are we checking by md5 and
|
||||||
|
//not by filepath? I had to change this behaviour to check by filepath for the
|
||||||
|
//following reason:
|
||||||
|
//File A is renamed, which creates a delete and create event. Because the create
|
||||||
|
//event can be executed in parallel before the delete event completes, the create
|
||||||
|
//event will fail because the md5 is still in the database.
|
||||||
|
/*
|
||||||
$md5 = $md['MDATA_KEY_MD5'];
|
$md5 = $md['MDATA_KEY_MD5'];
|
||||||
$file = StoredFile::RecallByMd5($md5);
|
$file = StoredFile::RecallByMd5($md5);
|
||||||
|
|
||||||
if (is_null($file)) {
|
if (is_null($file)) {
|
||||||
$file = StoredFile::Insert($md);
|
$file = StoredFile::Insert($md);
|
||||||
}
|
}
|
||||||
|
@ -441,6 +460,7 @@ class ApiController extends Zend_Controller_Action
|
||||||
$this->view->error = "File already exists in Airtime.";
|
$this->view->error = "File already exists in Airtime.";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
else if ($mode == "modify") {
|
else if ($mode == "modify") {
|
||||||
$filepath = $md['MDATA_KEY_FILEPATH'];
|
$filepath = $md['MDATA_KEY_FILEPATH'];
|
||||||
|
@ -488,5 +508,20 @@ class ApiController extends Zend_Controller_Action
|
||||||
|
|
||||||
$this->view->id = $file->getId();
|
$this->view->id = $file->getId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function listAllFilesAction() {
|
||||||
|
global $CC_CONFIG;
|
||||||
|
|
||||||
|
$request = $this->getRequest();
|
||||||
|
$api_key = $request->getParam('api_key');
|
||||||
|
if (!in_array($api_key, $CC_CONFIG["apiKey"]))
|
||||||
|
{
|
||||||
|
header('HTTP/1.0 401 Unauthorized');
|
||||||
|
print 'You are not allowed to access this resource.';
|
||||||
|
exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->view->files = StoredFile::listAllFiles();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -819,5 +819,23 @@ class StoredFile {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static function listAllFiles(){
|
||||||
|
global $CC_CONFIG, $CC_DBC;
|
||||||
|
|
||||||
|
$sql = "SELECT m.directory || '/' || f.filepath as fp"
|
||||||
|
." FROM CC_MUSIC_DIRS m"
|
||||||
|
." LEFT JOIN CC_FILES f"
|
||||||
|
." ON m.id = f.directory"
|
||||||
|
." WHERE m.id = f.directory";
|
||||||
|
$rows = $CC_DBC->getAll($sql);
|
||||||
|
|
||||||
|
$results = array();
|
||||||
|
foreach ($rows as $row){
|
||||||
|
$results[] = $row["fp"];
|
||||||
|
}
|
||||||
|
|
||||||
|
return $results;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -103,6 +103,9 @@ class ApiClientInterface:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def update_media_metadata(self, md):
|
def update_media_metadata(self, md):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def list_all_db_files(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Put here whatever tests you want to run to make sure your API is working
|
# Put here whatever tests you want to run to make sure your API is working
|
||||||
|
@ -402,6 +405,22 @@ class AirTimeApiClient(ApiClientInterface):
|
||||||
logger.error("Exception: %s", e)
|
logger.error("Exception: %s", e)
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
def list_all_db_files(self):
|
||||||
|
logger = logging.getLogger()
|
||||||
|
try:
|
||||||
|
url = "http://%s:%s/%s/%s" % (self.config["base_url"], str(self.config["base_port"]), self.config["api_base"], self.config["list_all_db_files"])
|
||||||
|
|
||||||
|
url = url.replace("%%api_key%%", self.config["api_key"])
|
||||||
|
|
||||||
|
req = urllib2.Request(url)
|
||||||
|
response = urllib2.urlopen(req).read()
|
||||||
|
response = json.loads(response)
|
||||||
|
except Exception, e:
|
||||||
|
response = None
|
||||||
|
logger.error("Exception: %s", e)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -54,7 +54,7 @@ try:
|
||||||
config.storage_directory = storage_directory
|
config.storage_directory = storage_directory
|
||||||
config.imported_directory = storage_directory + '/imported'
|
config.imported_directory = storage_directory + '/imported'
|
||||||
|
|
||||||
bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe)
|
bootstrap = AirtimeMediaMonitorBootstrap(logger, multi_queue, pe, config)
|
||||||
bootstrap.scan()
|
bootstrap.scan()
|
||||||
|
|
||||||
#create 5 worker processes
|
#create 5 worker processes
|
||||||
|
|
|
@ -2,15 +2,19 @@ import os
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from subprocess import Popen, PIPE
|
from subprocess import Popen, PIPE
|
||||||
|
from api_clients import api_client
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AirtimeMediaMonitorBootstrap():
|
class AirtimeMediaMonitorBootstrap():
|
||||||
|
|
||||||
def __init__(self, logger, multi_queue, pe):
|
def __init__(self, logger, multi_queue, pe, config):
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
self.multi_queue = multi_queue
|
self.multi_queue = multi_queue
|
||||||
self.pe = pe
|
self.pe = pe
|
||||||
self.airtime_tmp = '/var/tmp/airtime'
|
self.airtime_tmp = '/var/tmp/airtime'
|
||||||
|
self.config = config
|
||||||
|
self.api_client = api_client.api_client_factory(self.config.cfg)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
on bootup we want to scan all directories and look for files that
|
on bootup we want to scan all directories and look for files that
|
||||||
|
@ -23,76 +27,78 @@ class AirtimeMediaMonitorBootstrap():
|
||||||
for dir in directories:
|
for dir in directories:
|
||||||
self.check_for_diff(dir)
|
self.check_for_diff(dir)
|
||||||
|
|
||||||
|
def list_db_files(self):
|
||||||
|
return self.api_client.list_all_db_files()
|
||||||
|
|
||||||
def check_for_diff(self, dir):
|
def check_for_diff(self, dir):
|
||||||
#set to hold new and/or modified files. We use a set to make it ok if files are added
|
#set to hold new and/or modified files. We use a set to make it ok if files are added
|
||||||
#twice. This is become some of the tests for new files return result sets that are not
|
#twice. This is become some of the tests for new files return result sets that are not
|
||||||
#mutually exclusive from each other.
|
#mutually exclusive from each other.
|
||||||
added_files = set()
|
new_and_modified_files = set()
|
||||||
removed_files = set()
|
removed_files = set()
|
||||||
|
|
||||||
if os.path.exists(self.airtime_tmp + '/.airtime_media_index'):
|
|
||||||
|
db_known_files_set = set()
|
||||||
|
files = self.list_db_files()
|
||||||
|
for file in files['files']:
|
||||||
|
db_known_files_set.add(file)
|
||||||
|
|
||||||
|
|
||||||
|
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % dir
|
||||||
|
stdout = self.execCommandAndReturnStdOut(command)
|
||||||
|
new_files = stdout.split('\n')
|
||||||
|
all_files_set = set()
|
||||||
|
for file_path in new_files:
|
||||||
|
if len(file_path.strip(" \n")) > 0:
|
||||||
|
all_files_set.add(file_path)
|
||||||
|
|
||||||
|
|
||||||
|
if os.path.exists("/var/tmp/airtime/media_monitor_boot"):
|
||||||
#find files that have been modified since the last time
|
#find files that have been modified since the last time
|
||||||
#media-monitor process was running.
|
#media-monitor process started.
|
||||||
time_diff_sec = time.time() - os.path.getmtime(self.airtime_tmp + '/.airtime_media_index')
|
time_diff_sec = time.time() - os.path.getmtime("/var/tmp/airtime/media_monitor_boot")
|
||||||
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -%d" % (dir, time_diff_sec/60+1)
|
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable -mmin -%d" % (dir, time_diff_sec/60+1)
|
||||||
self.logger.debug(command)
|
|
||||||
stdout = self.execCommandAndReturnStdOut(command)
|
|
||||||
self.logger.info("Files modified since last checkin: \n%s\n", stdout)
|
|
||||||
|
|
||||||
new_files = stdout.split('\n')
|
|
||||||
|
|
||||||
for file_path in new_files:
|
|
||||||
added_files.add(file_path)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#a previous index exists, we can do a diff between this
|
|
||||||
#file and the current state to see whether anything has
|
|
||||||
#changed.
|
|
||||||
self.logger.info("Previous index file found.")
|
|
||||||
|
|
||||||
#find deleted files
|
|
||||||
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable > %s/.airtime_media_index.tmp" % (dir, self.airtime_tmp)
|
|
||||||
self.execCommand(command)
|
|
||||||
|
|
||||||
command = "diff -u %s/.airtime_media_index %s/.airtime_media_index.tmp" % (self.airtime_tmp, self.airtime_tmp)
|
|
||||||
stdout = self.execCommandAndReturnStdOut(command)
|
|
||||||
|
|
||||||
#remove first 3 lines from the diff output.
|
|
||||||
stdoutSplit = (stdout.split('\n'))[3:]
|
|
||||||
|
|
||||||
self.logger.info("Changed files since last checkin:\n%s\n", "\n".join(stdoutSplit))
|
|
||||||
|
|
||||||
for line in stdoutSplit:
|
|
||||||
if len(line.strip(' ')) > 1:
|
|
||||||
if line[0] == '+':
|
|
||||||
added_files.add(line[1:])
|
|
||||||
elif line[0] == '-':
|
|
||||||
removed_files.add(line[1:])
|
|
||||||
|
|
||||||
self.pe.write_index_file()
|
|
||||||
else:
|
else:
|
||||||
#a previous index does not exist. Most likely means that
|
command = "find %s -type f -iname '*.ogg' -o -iname '*.mp3' -readable" % dir
|
||||||
#media monitor has never seen this directory before. Let's
|
|
||||||
#notify airtime server about each of these files
|
|
||||||
self.logger.info("Previous index file does not exist. Creating a new one")
|
|
||||||
|
|
||||||
#create a new index file.
|
stdout = self.execCommandAndReturnStdOut(command)
|
||||||
stdout = self.pe.write_index_file()
|
|
||||||
|
new_files = stdout.split('\n')
|
||||||
|
|
||||||
|
for file_path in new_files:
|
||||||
|
if len(file_path.strip(" \n")) > 0:
|
||||||
|
new_and_modified_files.add(file_path)
|
||||||
|
|
||||||
new_files = stdout.split('\n')
|
#new_and_modified_files gives us a set of files that were either copied or modified
|
||||||
|
#since the last time media-monitor was running. These files were collected based on
|
||||||
for file_path in new_files:
|
#their modified timestamp. But this is not all that has changed in the directory. Files
|
||||||
added_files.add(file_path)
|
#could have been removed, or files could have been moved into this directory (moving does
|
||||||
|
#not affect last modified timestamp). Lets get a list of files that are on the file-system
|
||||||
|
#that the db has no record of, and vice-versa.
|
||||||
|
|
||||||
|
deleted_files_set = db_known_files_set - all_files_set
|
||||||
|
new_files_set = all_files_set - db_known_files_set
|
||||||
|
modified_files_set = new_and_modified_files - new_files_set
|
||||||
|
|
||||||
|
self.logger.info("Deleted files: \n%s\n\n"%deleted_files_set)
|
||||||
|
self.logger.info("New files: \n%s\n\n"%new_files_set)
|
||||||
|
self.logger.info("Modified files: \n%s\n\n"%modified_files_set)
|
||||||
|
|
||||||
for file_path in removed_files:
|
#"touch" file timestamp
|
||||||
|
open("/var/tmp/airtime/media_monitor_boot","w")
|
||||||
|
#return
|
||||||
|
|
||||||
|
|
||||||
|
for file_path in deleted_files_set:
|
||||||
self.pe.handle_removed_file(file_path)
|
self.pe.handle_removed_file(file_path)
|
||||||
|
|
||||||
for file_path in added_files:
|
for file_path in new_files_set:
|
||||||
if os.path.exists(file_path):
|
if os.path.exists(file_path):
|
||||||
self.pe.handle_created_file(False, os.path.basename(file_path), file_path)
|
self.pe.handle_created_file(False, os.path.basename(file_path), file_path)
|
||||||
|
|
||||||
|
for file_path in modified_files_set:
|
||||||
|
if os.path.exists(file_path):
|
||||||
|
self.pe.handle_modified_file(False, os.path.basename(file_path), file_path)
|
||||||
|
|
||||||
def execCommand(self, command):
|
def execCommand(self, command):
|
||||||
p = Popen(command, shell=True)
|
p = Popen(command, shell=True)
|
||||||
|
|
|
@ -242,12 +242,15 @@ class AirtimeProcessEvent(ProcessEvent):
|
||||||
|
|
||||||
|
|
||||||
def process_IN_MODIFY(self, event):
|
def process_IN_MODIFY(self, event):
|
||||||
if not event.dir:
|
self.handle_modified_file(event.dir, event.pathname, event.name)
|
||||||
self.logger.info("%s: %s", event.maskname, event.pathname)
|
|
||||||
if event.pathname in self.renamed_files:
|
def handle_modified_file(self, dir, pathname, name):
|
||||||
|
if not dir:
|
||||||
|
self.logger.info("Modified: %s", pathname)
|
||||||
|
if pathname in self.renamed_files:
|
||||||
pass
|
pass
|
||||||
elif self.is_audio_file(event.name):
|
elif self.is_audio_file(name):
|
||||||
self.file_events.append({'filepath': event.pathname, 'mode': self.config.MODE_MODIFY})
|
self.file_events.append({'filepath': pathname, 'mode': self.config.MODE_MODIFY})
|
||||||
|
|
||||||
def process_IN_MOVED_FROM(self, event):
|
def process_IN_MOVED_FROM(self, event):
|
||||||
self.logger.info("%s: %s", event.maskname, event.pathname)
|
self.logger.info("%s: %s", event.maskname, event.pathname)
|
||||||
|
|
|
@ -28,6 +28,9 @@ upload_recorded = 'upload-recorded/format/json/api_key/%%api_key%%/fileid/%%file
|
||||||
# URL to tell Airtime to update file's meta data
|
# URL to tell Airtime to update file's meta data
|
||||||
update_media_url = 'reload-metadata/format/json/api_key/%%api_key%%/mode/%%mode%%'
|
update_media_url = 'reload-metadata/format/json/api_key/%%api_key%%/mode/%%mode%%'
|
||||||
|
|
||||||
|
# URL to tell Airtime we want a listing of all files it knows about
|
||||||
|
list_all_db_files = 'list-all-files/format/json/api_key/%%api_key%%'
|
||||||
|
|
||||||
############################################
|
############################################
|
||||||
# RabbitMQ settings #
|
# RabbitMQ settings #
|
||||||
############################################
|
############################################
|
||||||
|
|
Loading…
Reference in New Issue