Merge branch 'cc-5709-airtime-analyzer-cloud-storage' into cc-5709-airtime-analyzer-cloud-storage-saas
Conflicts:
	airtime_mvc/application/Bootstrap.php
	airtime_mvc/application/models/Schedule.php
	airtime_mvc/application/modules/rest/controllers/MediaController.php
	airtime_mvc/build/sql/schema.sql
	python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
commit cf9eabbc74

42 changed files with 5355 additions and 1681 deletions
python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py

@@ -18,7 +18,7 @@ class AnalyzerPipeline:
     """

     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain):
         """Analyze and import an audio file, and put all extracted metadata into queue.

         Keyword arguments:
@@ -31,6 +31,7 @@ class AnalyzerPipeline:
                 preserve. The file at audio_file_path typically has a
                 temporary randomly generated name, which is why we want
                 to know what the original name was.
+            station_domain: The Airtime Pro account's domain name. i.e. bananas
         """
         # It is super critical to initialize a separate log file here so that we
         # don't inherit logging/locks from the parent process. Supposedly
@@ -52,6 +53,7 @@ class AnalyzerPipeline:
         # First, we extract the ID3 tags and other metadata:
         metadata = dict()
         metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
+        metadata["station_domain"] = station_domain
         #metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
         csu = CloudStorageUploader()
         metadata = csu.upload_obj(audio_file_path, metadata)

python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py

@@ -6,7 +6,7 @@ from libcloud.storage.providers import get_driver
 from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError


-CONFIG_PATH = '/etc/airtime-saas/amazon.conf'
+CONFIG_PATH = '/etc/airtime-saas/cloud_storage.conf'

 class CloudStorageUploader:
     """ A class that uses Apache Libcloud's Storage API to upload objects into
@@ -27,7 +27,8 @@ class CloudStorageUploader:
     def __init__(self):
         config = aa.AirtimeAnalyzerServer.read_config_file(CONFIG_PATH)

-        CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage"
+        CLOUD_STORAGE_CONFIG_SECTION = config.get("current_backend", "storage_backend")
+        self._storage_backend = CLOUD_STORAGE_CONFIG_SECTION
         self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
         self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
         self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
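Note: the new current_backend indirection implies a /etc/airtime-saas/cloud_storage.conf shaped roughly like the sketch below. Only the current_backend/storage_backend, provider, bucket, and api_key names appear in this hunk; the amazon_S3 section name and the api_key_secret option are assumptions for illustration.

# Hypothetical cloud_storage.conf layout, parsed with the same
# ConfigParser calls the constructor uses (Python 2).
import ConfigParser
import StringIO

SAMPLE_CONF = """\
[current_backend]
storage_backend = amazon_S3

[amazon_S3]
provider = S3
bucket = my-airtime-bucket
api_key = AKIAEXAMPLE
api_key_secret = changeme
"""

config = ConfigParser.SafeConfigParser()
config.readfp(StringIO.StringIO(SAMPLE_CONF))
section = config.get("current_backend", "storage_backend")   # "amazon_S3"
print config.get(section, "bucket")                           # "my-airtime-bucket"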
@@ -70,7 +71,8 @@ class CloudStorageUploader:
         except ContainerDoesNotExistError:
             container = driver.create_container(self._bucket)

-        extra = {'meta_data': {'filename': file_base_name}}
+        extra = {'meta_data': {'filename': file_base_name,
+                               'station_domain': metadata["station_domain"]}}

         obj = driver.upload_object(file_path=audio_file_path,
                                    container=container,
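For reference, upload_obj follows the stock Libcloud storage flow. A self-contained sketch of the same calls, with placeholder credentials, bucket, and file names (meta_data is how Libcloud attaches per-object metadata on drivers that support it):

# Standalone sketch of the Libcloud upload pattern used in upload_obj.
# All names and credentials below are placeholders.
from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider, ContainerDoesNotExistError

driver_class = get_driver(Provider.S3)
driver = driver_class('api_key', 'api_key_secret')

try:
    container = driver.get_container('my-airtime-bucket')
except ContainerDoesNotExistError:
    container = driver.create_container('my-airtime-bucket')

extra = {'meta_data': {'filename': 'song.mp3',
                       'station_domain': 'bananas'}}
obj = driver.upload_object(file_path='/tmp/song.mp3',
                           container=container,
                           object_name='song.mp3',
                           extra=extra)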
@@ -90,5 +92,6 @@ class CloudStorageUploader:
         metadata["filename"] = file_base_name

         metadata["resource_id"] = object_name
+        metadata["storage_backend"] = self._storage_backend
         return metadata

python_apps/airtime_analyzer/airtime_analyzer/message_listener.py

@@ -161,12 +161,13 @@ class MessageListener:
             msg_dict = json.loads(body)
             api_key = msg_dict["api_key"]
             callback_url = msg_dict["callback_url"]
+            station_domain = msg_dict["station_domain"]

             audio_file_path = msg_dict["tmp_file_path"]
             import_directory = msg_dict["import_directory"]
             original_filename = msg_dict["original_filename"]

-            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain)
             StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)

         except KeyError as e:
@@ -205,11 +206,11 @@ class MessageListener:
         channel.basic_ack(delivery_tag=method_frame.delivery_tag)

     @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain):
         ''' Spawn a child process to analyze and import a new audio file. '''
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                args=(q, audio_file_path, import_directory, original_filename))
+                args=(q, audio_file_path, import_directory, original_filename, station_domain))
         p.start()
         p.join()
         if p.exitcode == 0:
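The spawn-and-join pattern above runs each analysis in a throwaway child process, so a crash in a tag parser cannot take down the listener; results come back over the queue. A minimal standalone sketch of the same pattern (the worker and its arguments are illustrative, not the real pipeline):

# Minimal sketch of the pattern used by spawn_analyzer_process (Python 2).
import multiprocessing

def run_analysis(queue, audio_file_path):
    # The real worker extracts metadata and uploads the file; this stub
    # just hands a result dict back to the parent.
    queue.put({"path": audio_file_path, "ok": True})

q = multiprocessing.Queue()
p = multiprocessing.Process(target=run_analysis, args=(q, "/tmp/song.mp3"))
p.start()
p.join()
if p.exitcode == 0:
    results = q.get()   # the dict the child put on the queue
    print results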
python_apps/pypo/pypofile.py

@@ -2,17 +2,19 @@

 from threading import Thread
 from Queue import Empty
-from cloud_storage_downloader import CloudStorageDownloader
-
 import logging
 import shutil
 import os
 import sys
+import stat

+import requests
 import ConfigParser

 from std_err_override import LogWriter

+CONFIG_PATH = '/etc/airtime/airtime.conf'
+
 # configure logging
 logging.config.fileConfig("logging.cfg")
 logger = logging.getLogger()
@@ -37,12 +39,8 @@ class PypoFile(Thread):
         """
         src = media_item['uri']
         dst = media_item['dst']

-        try:
-            src_size = os.path.getsize(src)
-        except Exception, e:
-            self.logger.error("Could not get size of source file: %s", src)
-            return
+        src_size = media_item['filesize']

         dst_exists = True
         try:
@@ -68,7 +66,23 @@ class PypoFile(Thread):
         """
         copy will overwrite dst if it already exists
         """
-        shutil.copy(src, dst)
+        #shutil.copy(src, dst)
+        config = self.read_config_file(CONFIG_PATH)
+        CONFIG_SECTION = "general"
+        username = config.get(CONFIG_SECTION, 'api_key')
+        url = media_item['download_url']
+
+        with open(dst, "wb") as handle:
+            response = requests.get(url, auth=requests.auth.HTTPBasicAuth(username, ''), stream=True)
+
+            if not response.ok:
+                raise Exception("%s - Error occurred downloading file" % response.status_code)
+
+            for chunk in response.iter_content(1024):
+                if not chunk:
+                    break
+
+                handle.write(chunk)

         #make file world readable
         os.chmod(dst, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
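The rewritten copy_file streams the track over HTTP, using the station's api_key as the basic-auth username. A trimmed standalone sketch of the same requests usage (URL and paths are placeholders); note that iter_content() simply stops at the end of the stream, so the "if not chunk: break" guard above is purely defensive:

# Hypothetical standalone version of the streaming download in copy_file.
import requests

url = "http://bananas.example.com/rest/media/1/download"  # placeholder
with open("/tmp/song.mp3", "wb") as handle:
    response = requests.get(url,
                            auth=requests.auth.HTTPBasicAuth("api_key", ""),
                            stream=True)
    if not response.ok:
        raise Exception("%s - Error occurred downloading file" % response.status_code)
    for chunk in response.iter_content(1024):
        handle.write(chunk)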
@@ -108,6 +122,19 @@ class PypoFile(Thread):

         return media_item

+    def read_config_file(self, config_path):
+        """Parse the application's config file located at config_path."""
+        config = ConfigParser.SafeConfigParser()
+        try:
+            config.readfp(open(config_path))
+        except IOError as e:
+            logging.debug("Failed to open config file at %s: %s" % (config_path, e.strerror))
+            sys.exit()
+        except Exception as e:
+            logging.debug(str(e))
+            sys.exit()
+
+        return config

     def main(self):
         while True:
@@ -133,15 +160,7 @@ class PypoFile(Thread):

                 media_item = self.get_highest_priority_media_item(self.media)
                 if media_item is not None:
-                    """
-                    If an object_name exists the file is stored on Amazon S3
-                    """
-                    if 'amazonS3_resource_id' in media_item:
-                        csd = CloudStorageDownloader()
-                        csd.download_obj(media_item['dst'], media_item['amazonS3_resource_id'])
-                        media_item['file_ready'] = True
-                    else:
-                        self.copy_file(media_item)
+                    self.copy_file(media_item)
             except Exception, e:
                 import traceback
                 top = traceback.format_exc()