➖ (Amazon S3) drop broken S3 support
This commit is contained in:
parent 203ceb0cb1
commit f7ceeedb04
@@ -1,131 +0,0 @@
-<?php
-
-use Aws\S3\S3Client;
-
-class Amazon_S3StorageBackend extends StorageBackend
-{
-
-    private $s3Client;
-    private $proxyHost;
-
-    public function __construct($securityCredentials)
-    {
-        $this->setBucket($securityCredentials['bucket']);
-        $this->setAccessKey($securityCredentials['api_key']);
-        $this->setSecretKey($securityCredentials['api_key_secret']);
-
-        $s3Options = array(
-            'key' => $securityCredentials['api_key'],
-            'secret' => $securityCredentials['api_key_secret'],
-            'region' => $securityCredentials['region']
-        );
-        if (array_key_exists("proxy_host", $securityCredentials)) {
-            $s3Options = array_merge($s3Options, array(
-                //'base_url' => "http://" . $securityCredentials['proxy_host'],
-                'base_url' => "http://s3.amazonaws.com",
-                'scheme' => "http",
-                //'force_path_style' => true,
-                'signature' => 'v4'
-            ));
-            $this->proxyHost = $securityCredentials['proxy_host'];
-        }
-
-        $this->s3Client = S3Client::factory($s3Options);
-    }
-
-    public function getAbsoluteFilePath($resourceId)
-    {
-        return $this->s3Client->getObjectUrl($this->getBucket(), $resourceId);
-    }
-
-    /** Returns a signed download URL from Amazon S3, expiring in 60 minutes */
-    public function getDownloadURLs($resourceId, $contentDispositionFilename)
-    {
-        $urls = array();
-
-        $s3args = array('ResponseContentDisposition' => 'attachment; filename="' . urlencode($contentDispositionFilename) . '"');
-        $signedS3Url = $this->s3Client->getObjectUrl($this->getBucket(), $resourceId, '+60 minutes', $s3args);
-
-        //If we're using the proxy cache, we need to modify the request URL after it has
-        //been generated by the above. (The request signature must be for the amazonaws.com,
-        //not our proxy, since the proxy translates the host back to amazonaws.com)
-        if ($this->proxyHost) {
-            $p = parse_url($signedS3Url);
-            $p["host"] = $this->getBucket() . "." . $this->proxyHost;
-            $p["scheme"] = "http";
-            //If the path contains the bucket name (which is the case with HTTPS requests to Amazon),
-            //we need to strip that part out, since we're forcing everything to HTTP. The Amazon S3
-            //URL convention for HTTP is to prepend the bucket name to the hostname instead of having
-            //it in the path.
-            //eg. http://bucket.s3.amazonaws.com/ instead of https://s3.amazonaws.com/bucket/
-            if (strpos($p["path"], $this->getBucket()) == 1) {
-                $p["path"] = substr($p["path"], 1 + strlen($this->getBucket()));
-            }
-            $proxyUrl = $p["scheme"] . "://" . $p["host"] . $p["path"] . "?" . $p["query"];
-            //Add this proxy cache URL to the list of download URLs.s
-            array_push($urls, $proxyUrl);
-        }
-
-        //Add the direct S3 URL to the list (as a fallback)
-        array_push($urls, $signedS3Url);
-
-        //http_build_url() would be nice to use but it requires pecl_http :-(
-
-        //Logging::info($url);
-
-        return $urls;
-    }
-
-    public function deletePhysicalFile($resourceId)
-    {
-        $bucket = $this->getBucket();
-
-        if ($this->s3Client->doesObjectExist($bucket, $resourceId)) {
-
-            $result = $this->s3Client->deleteObject(array(
-                'Bucket' => $bucket,
-                'Key' => $resourceId,
-            ));
-        } else {
-            throw new Exception("ERROR: Could not locate file to delete.");
-        }
-    }
-
-    // This should only be called for station termination.
-    // We are only deleting the file objects from Amazon S3.
-    // Records in the database will remain in case we have to restore the files.
-    public function deleteAllCloudFileObjects()
-    {
-        $bucket = $this->getBucket();
-        $prefix = $this->getFilePrefix();
-
-        //Add a trailing slash in for safety
-        //(so that deleting /13/413 doesn't delete /13/41313 !)
-        $prefix = $prefix . "/";
-
-        //Do a bunch of safety checks to ensure we don't delete more than we intended.
-        //An valid prefix is like "12/4312" for instance 4312.
-        $slashPos = strpos($prefix, "/");
-        if (($slashPos === FALSE) || //Slash must exist
-            ($slashPos != 2) || //Slash must be the third character
-            (strlen($prefix) <= $slashPos) || //String must have something after the first slash
-            (substr_count($prefix, "/") != 2)) //String must have two slashes
-        {
-            throw new Exception("Invalid file prefix in " . __FUNCTION__);
-        }
-        $this->s3Client->deleteMatchingObjects($bucket, $prefix);
-    }
-
-    public function getFilePrefix()
-    {
-        $filePrefix = '';
-        // only prefix files on S3 when billing is active since saas customers share a s3 bucket
-        // I'm not sure why the choice was made to put everything into one bucket
-        // We might refactor this to use a bucket per customer if we revisit S3
-        if (LIBRETIME_ENABLE_BILLING === true) {
-            $hostingId = Billing::getClientInstanceId();
-            $filePrefix = substr($hostingId, -2)."/".$hostingId;
-        }
-        return $filePrefix;
-    }
-}
@@ -18,12 +18,10 @@ class ProxyStorageBackend extends StorageBackend
     {
         $CC_CONFIG = Config::getConfig();

-        //The storage backend in the airtime.conf directly corresponds to
-        //the name of the class that implements it (eg. Amazon_S3), so we
-        //can easily create the right backend object dynamically:
-        if ($storageBackend == "amazon_S3") {
-            $this->storageBackend = new Amazon_S3StorageBackend($CC_CONFIG["amazon_S3"]);
-        } else if ($storageBackend == "file") {
+        // The storage backend in the airtime.conf directly corresponds to
+        // the name of the class that implements it, so we can create the
+        // right backend object dynamically:
+        if ($storageBackend == "file") {
             $this->storageBackend = new FileStorageBackend();
         } else {
             $this->storageBackend = new $storageBackend($CC_CONFIG[$storageBackend]);
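The comment in this hunk describes the dispatch convention ProxyStorageBackend relies on: the configured backend name doubles as the name of the class that implements it (and of its config section), so the right backend object can be built dynamically. A rough illustration of the same idea in Python, the language of the analyzer code below; every name here is a hypothetical stand-in, not part of the commit:

```python
# Illustration only (hypothetical names): the backend name selects both the
# implementing class and the config section it is constructed from.
class FileStorageBackend(object):
    def __init__(self, config):
        self.config = config

BACKENDS = {'file': FileStorageBackend}

def make_storage_backend(name, cc_config):
    try:
        backend_cls = BACKENDS[name]
    except KeyError:
        raise ValueError("Unknown storage backend: %s" % name)
    # Each backend receives its own section of the parsed config.
    return backend_cls(cc_config.get(name, {}))

backend = make_storage_backend('file', {'file': {'some_option': 'value'}})
```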
@@ -49,13 +49,6 @@ class Config {
             $CC_CONFIG['staticBaseDir'] = '/';
         }

-        $CC_CONFIG['amazon_S3'] = array(
-            'provider' => $values['amazon_S3']['provider'],
-            'bucket' => $values['amazon_S3']['bucket'],
-            'api_key' => $values['amazon_S3']['api_key'],
-            'api_key_secret' => $values['amazon_S3']['api_key_secret']
-        );
-
         // Tells us where file uploads will be uploaded to.
         // It will either be set to a cloud storage backend or local file storage.
         $CC_CONFIG["current_backend"] = $values["current_backend"]["storage_backend"];
@@ -127,11 +127,6 @@ vhost = /airtime
 [current_backend]
 storage_backend=file

-[amazon_S3]
-provider=amazon_S3
-bucket=0
-api_key=0
-api_key_secret=0
-
 # ----------------------------------------------------------------------
 # M O N I T
@@ -26,12 +26,6 @@ station_id = teststation
 [current_backend]
 storage_backend=file

-[amazon_S3]
-provider=amazon_S3
-bucket=0
-api_key=0
-api_key_secret=0
-
 [monit]
 monit_user = guest
 monit_password = airtime
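After this commit, the only storage setting left in airtime.conf is `storage_backend` under `[current_backend]`, shown above set to `file`. A minimal sketch of reading it with the ConfigParser API this codebase already uses (Python 2); the config path and the `file` fallback are assumptions for illustration:

```python
# Minimal sketch (assumed path and fallback): read the remaining storage setting.
import ConfigParser

config = ConfigParser.SafeConfigParser()
config.read('/etc/airtime/airtime.conf')

try:
    storage_backend = config.get('current_backend', 'storage_backend')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
    storage_backend = 'file'  # the only backend left once S3 support is dropped

print storage_backend
```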
@@ -23,7 +23,7 @@ class AirtimeAnalyzerServer:
     # Variables
     _log_level = logging.INFO

-    def __init__(self, rmq_config_path, cloud_storage_config_path, http_retry_queue_path, debug=False):
+    def __init__(self, rmq_config_path, http_retry_queue_path, debug=False):

         # Dump a stacktrace with 'kill -SIGUSR2 <PID>'
         signal.signal(signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace())
@@ -34,15 +34,12 @@ class AirtimeAnalyzerServer:
         # Read our rmq config file
         rmq_config = config_file.read_config_file(rmq_config_path)

-        # Read the cloud storage config file
-        cloud_storage_config = config_file.read_config_file(cloud_storage_config_path)
-
         # Start up the StatusReporter process
         StatusReporter.start_thread(http_retry_queue_path)

         # Start listening for RabbitMQ messages telling us about newly
         # uploaded files. This blocks until we recieve a shutdown signal.
-        self._msg_listener = MessageListener(rmq_config, cloud_storage_config)
+        self._msg_listener = MessageListener(rmq_config)

         StatusReporter.stop_thread()

@@ -61,9 +58,6 @@ class AirtimeAnalyzerServer:
         pika_logger = logging.getLogger('pika')
         pika_logger.setLevel(logging.CRITICAL)

-        boto_logger = logging.getLogger('auth')
-        boto_logger.setLevel(logging.CRITICAL)
-
         # Set up logging
         logFormatter = logging.Formatter("%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s")
         rootLogger = logging.getLogger()
@@ -89,5 +83,3 @@ class AirtimeAnalyzerServer:
             if line:
                 code.append(" %s" % (line.strip()))
         logging.info('\n'.join(code))
-
-
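With `cloud_storage_config_path` removed from `__init__`, the server is built from just the RabbitMQ config path and the HTTP retry queue path. A hedged sketch of the new constructor call, using the default paths that appear in the CLI hunks later in this diff:

```python
# Sketch of starting the analyzer server after this change; the paths are the
# defaults shown in the CLI script further down, not new values.
import airtime_analyzer.airtime_analyzer as aa

aa.AirtimeAnalyzerServer(rmq_config_path='/etc/airtime/airtime.conf',
                         http_retry_queue_path='/tmp/airtime_analyzer_http_retries',
                         debug=False)
```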
@@ -7,7 +7,6 @@ import Queue
 import ConfigParser
 from metadata_analyzer import MetadataAnalyzer
 from filemover_analyzer import FileMoverAnalyzer
-from cloud_storage_uploader import CloudStorageUploader
 from cuepoint_analyzer import CuePointAnalyzer
 from replaygain_analyzer import ReplayGainAnalyzer
 from playability_analyzer import *
@@ -25,7 +24,7 @@ class AnalyzerPipeline:
     IMPORT_STATUS_FAILED = 2

     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename, storage_backend, file_prefix):
         """Analyze and import an audio file, and put all extracted metadata into queue.

         Keyword arguments:
@@ -40,7 +39,6 @@ class AnalyzerPipeline:
                                to know what the original name was.
             storage_backend: String indicating the storage backend (amazon_s3 or file)
             file_prefix:
-            cloud_storage_config: ConfigParser object containing the cloud storage configuration settings
         """
         # It is super critical to initialize a separate log file here so that we
         # don't inherit logging/locks from the parent process. Supposedly
@@ -58,8 +56,6 @@ class AnalyzerPipeline:
             raise TypeError("original_filename must be unicode. Was of type " + type(original_filename).__name__ + " instead.")
         if not isinstance(file_prefix, unicode):
             raise TypeError("file_prefix must be unicode. Was of type " + type(file_prefix).__name__ + " instead.")
-        if not isinstance(cloud_storage_config, ConfigParser.SafeConfigParser):
-            raise TypeError("cloud_storage_config must be a SafeConfigParser. Was of type " + type(cloud_storage_config).__name__ + " instead.")


         # Analyze the audio file we were told to analyze:
@@ -72,11 +68,7 @@ class AnalyzerPipeline:
         metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
         metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)

-        if storage_backend.lower() == u"amazon_s3":
-            csu = CloudStorageUploader(cloud_storage_config)
-            metadata = csu.upload_obj(audio_file_path, metadata)
-        else:
-            metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
+        metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)

         metadata["import_status"] = 0 # Successfully imported

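After these hunks, `AnalyzerPipeline.run_analysis` drops the `cloud_storage_config` parameter and always imports through `FileMoverAnalyzer.move`. A short sketch of the simplified call, mirroring the updated unit test at the end of this diff; the audio path and filename are placeholders:

```python
# Sketch of driving the trimmed-down pipeline directly, as the updated test does.
# The audio path and filename below are placeholders.
import Queue
from analyzer_pipeline import AnalyzerPipeline

q = Queue.Queue()
AnalyzerPipeline.run_analysis(q, u'/tmp/example.mp3', u'.', u'example.mp3',
                              "file", u'')
metadata = q.get()  # extracted tags plus import_status == 0 on success
```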
@@ -1,122 +0,0 @@
-import os
-import logging
-import uuid
-import socket
-from boto.s3.connection import S3Connection
-from boto.s3.key import Key
-
-# Fix for getaddrinfo deadlock. See these issues for details:
-# https://github.com/gevent/gevent/issues/349
-# https://github.com/docker/docker-registry/issues/400
-u'fix getaddrinfo deadlock'.encode('idna')
-
-CLOUD_CONFIG_PATH = os.path.join(os.getenv('LIBRETIME_CONF_DIR', '/etc/airtime'), 'airtime.conf')
-STORAGE_BACKEND_FILE = "file"
-SOCKET_TIMEOUT = 240
-
-class CloudStorageUploader:
-    """ A class that uses Python-Boto SDK to upload objects into Amazon S3.
-
-    It is important to note that every file, coming from different Airtime Pro
-    stations, will get uploaded into the same bucket on the same Amazon S3
-    account.
-
-    Attributes:
-        _host: Host name for the specific region assigned to the bucket.
-        _bucket: Name of container on Amazon S3 where files will get uploaded into.
-        _api_key: Access key to objects on Amazon S3.
-        _api_key_secret: Secret access key to objects on Amazon S3.
-    """
-
-    def __init__(self, config):
-
-        try:
-            cloud_storage_config_section = config.get("current_backend", "storage_backend")
-            self._storage_backend = cloud_storage_config_section
-        except Exception as e:
-            print e
-            print "Defaulting to file storage"
-            self._storage_backend = STORAGE_BACKEND_FILE
-
-        if self._storage_backend == STORAGE_BACKEND_FILE:
-            self._host = ""
-            self._bucket = ""
-            self._api_key = ""
-            self._api_key_secret = ""
-        else:
-            self._host = config.get(cloud_storage_config_section, 'host')
-            self._bucket = config.get(cloud_storage_config_section, 'bucket')
-            self._api_key = config.get(cloud_storage_config_section, 'api_key')
-            self._api_key_secret = config.get(cloud_storage_config_section, 'api_key_secret')
-
-
-    def enabled(self):
-        if self._storage_backend == "file":
-            return False
-        else:
-            return True
-
-
-    def upload_obj(self, audio_file_path, metadata):
-        """Uploads a file into Amazon S3 object storage.
-
-        Before a file is uploaded onto Amazon S3 we generate a unique object
-        name consisting of the filename and a unqiue string using the uuid4
-        module.
-
-        Keyword arguments:
-            audio_file_path: Path on disk to the audio file that is about to be
-                             uploaded to Amazon S3 object storage.
-            metadata: ID3 tags and other metadata extracted from the audio file.
-
-        Returns:
-            The metadata dictionary it received with two new keys:
-                filename: The file's filename.
-                resource_id: The unique object name used to identify the objects
-                             on Amazon S3
-        """
-
-        file_base_name = os.path.basename(audio_file_path)
-        file_name, extension = os.path.splitext(file_base_name)
-
-        # With Amazon S3 you cannot create a signed url if there are spaces
-        # in the object name. URL encoding the object name doesn't solve the
-        # problem. As a solution we will replace spaces with dashes.
-        file_name = file_name.replace(" ", "-")
-
-        unique_id = str(uuid.uuid4())
-
-        # We add another prefix to the resource name with the last two characters
-        # of the unique id so files are not all placed under the root folder. We
-        # do this in case we need to restore a customer's file/s; File restoration
-        # is done via the S3 Browser client. The client will hang if there are too
-        # many files under the same folder.
-        unique_id_prefix = unique_id[-2:]
-
-        resource_id = "%s/%s/%s_%s%s" % (metadata['file_prefix'], unique_id_prefix, file_name, unique_id, extension)
-
-        # Boto uses the "global default timeout" by default, which is infinite! To prevent network problems from
-        # turning into deadlocks, we explicitly set the global default timeout period here:
-        socket.setdefaulttimeout(SOCKET_TIMEOUT)
-
-        conn = S3Connection(self._api_key, self._api_key_secret, host=self._host)
-        bucket = conn.get_bucket(self._bucket)
-
-        key = Key(bucket)
-        key.key = resource_id
-        key.set_metadata('filename', file_base_name)
-        key.set_contents_from_filename(audio_file_path)
-
-        # Remove file from organize directory
-        try:
-            os.remove(audio_file_path)
-        except OSError:
-            logging.info("Could not remove %s from organize directory" % audio_file_path)
-
-        # Pass original filename to Airtime so we can store it in the db
-        metadata["filename"] = file_base_name
-
-        metadata["resource_id"] = resource_id
-        metadata["storage_backend"] = self._storage_backend
-        return metadata
-
@@ -1,121 +0,0 @@
-import os
-import logging
-import uuid
-import ConfigParser
-from libcloud.storage.providers import get_driver
-from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError
-
-
-CLOUD_CONFIG_PATH = os.path.join(os.getenv('LIBRETIME_CONF_DIR', '/etc/airtime'), 'airtime.conf')
-STORAGE_BACKEND_FILE = "file"
-
-class CloudStorageUploader:
-    """ A class that uses Apache Libcloud's Storage API to upload objects into
-    a cloud storage backend. For this implementation all files will be uploaded
-    into a bucket on Amazon S3.
-
-    It is important to note that every file, coming from different Airtime Pro
-    stations, will get uploaded into the same bucket on the same Amazon S3
-    account.
-
-    Attributes:
-        _provider: Storage backend. For exmaple, Amazon S3, Google Storage.
-        _bucket: Name of container on provider where files will get uploaded into.
-        _api_key: Access key to objects on the provider's storage backend.
-        _api_key_secret: Secret access key to objects on the provider's storage backend.
-    """
-
-    def __init__(self):
-
-        config = ConfigParser.SafeConfigParser()
-        try:
-            config.readfp(open(CLOUD_CONFIG_PATH))
-            cloud_storage_config_section = config.get("current_backend", "storage_backend")
-            self._storage_backend = cloud_storage_config_section
-        except IOError as e:
-            print "Failed to open config file at " + CLOUD_CONFIG_PATH + ": " + e.strerror
-            print "Defaulting to file storage"
-            self._storage_backend = STORAGE_BACKEND_FILE
-        except Exception as e:
-            print e
-            print "Defaulting to file storage"
-            self._storage_backend = STORAGE_BACKEND_FILE
-
-        if self._storage_backend == STORAGE_BACKEND_FILE:
-            self._provider = ""
-            self._bucket = ""
-            self._api_key = ""
-            self._api_key_secret = ""
-        else:
-            self._provider = config.get(cloud_storage_config_section, 'provider')
-            self._bucket = config.get(cloud_storage_config_section, 'bucket')
-            self._api_key = config.get(cloud_storage_config_section, 'api_key')
-            self._api_key_secret = config.get(cloud_storage_config_section, 'api_key_secret')
-
-    def enabled(self):
-        if self._storage_backend == "file":
-            return False
-        else:
-            return True
-
-
-    def upload_obj(self, audio_file_path, metadata):
-        """Uploads a file into Amazon S3 object storage.
-
-        Before a file is uploaded onto Amazon S3 we generate a unique object
-        name consisting of the filename and a unqiue string using the uuid4
-        module.
-
-        Keyword arguments:
-            audio_file_path: Path on disk to the audio file that is about to be
-                             uploaded to Amazon S3 object storage.
-            metadata: ID3 tags and other metadata extracted from the audio file.
-
-        Returns:
-            The metadata dictionary it received with three new keys:
-                filesize: The file's filesize in bytes.
-                filename: The file's filename.
-                resource_id: The unique object name used to identify the objects
-                             on Amazon S3
-        """
-
-        file_base_name = os.path.basename(audio_file_path)
-        file_name, extension = os.path.splitext(file_base_name)
-
-        # With Amazon S3 you cannot create a signed url if there are spaces
-        # in the object name. URL encoding the object name doesn't solve the
-        # problem. As a solution we will replace spaces with dashes.
-        file_name = file_name.replace(" ", "-")
-        object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension)
-
-        provider_driver_class = get_driver(getattr(Provider, self._provider))
-        driver = provider_driver_class(self._api_key, self._api_key_secret)
-
-        try:
-            container = driver.get_container(self._bucket)
-        except ContainerDoesNotExistError:
-            container = driver.create_container(self._bucket)
-
-        extra = {'meta_data': {'filename': file_base_name}}
-
-        obj = driver.upload_object(file_path=audio_file_path,
-                                   container=container,
-                                   object_name=object_name,
-                                   verify_hash=False,
-                                   extra=extra)
-
-        metadata["filesize"] = os.path.getsize(audio_file_path)
-
-        # Remove file from organize directory
-        try:
-            os.remove(audio_file_path)
-        except OSError:
-            logging.info("Could not remove %s from organize directory" % audio_file_path)
-
-        # Pass original filename to Airtime so we can store it in the db
-        metadata["filename"] = file_base_name
-
-        metadata["resource_id"] = object_name
-        metadata["storage_backend"] = self._storage_backend
-        return metadata
-
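For reference, the removed class above reduces to this Apache Libcloud flow: look up the provider driver, fetch or create the container, then upload the object. A condensed sketch with placeholder credentials, bucket and path (only the Libcloud calls that appear in the removed code are used):

```python
# Condensed sketch of the Libcloud upload flow the removed uploader wrapped.
# Credentials, bucket name and file path are placeholders.
from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider, ContainerDoesNotExistError

driver_cls = get_driver(Provider.S3)
driver = driver_cls('API_KEY', 'API_KEY_SECRET')

try:
    container = driver.get_container('example-bucket')
except ContainerDoesNotExistError:
    container = driver.create_container('example-bucket')

obj = driver.upload_object(file_path='/tmp/example.mp3',
                           container=container,
                           object_name='example.mp3',
                           verify_hash=False,
                           extra={'meta_data': {'filename': 'example.mp3'}})
```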
@@ -9,7 +9,6 @@ import multiprocessing
 import Queue
 from analyzer_pipeline import AnalyzerPipeline
 from status_reporter import StatusReporter
-from cloud_storage_uploader import CloudStorageUploader

 EXCHANGE = "airtime-uploads"
 EXCHANGE_TYPE = "topic"
@@ -56,13 +55,12 @@ QUEUE = "airtime-uploads"
 """
 class MessageListener:

-    def __init__(self, rmq_config, cloud_storage_config):
+    def __init__(self, rmq_config):
         ''' Start listening for file upload notification messages
             from RabbitMQ

             Keyword arguments:
                 rmq_config: A ConfigParser object containing the [rabbitmq] configuration.
-                cloud_storage_config: A ConfigParser object containing the cloud storage configuration.
         '''

         self._shutdown = False
@@ -76,8 +74,6 @@ class MessageListener:
         self._password = rmq_config.get(RMQ_CONFIG_SECTION, 'password')
         self._vhost = rmq_config.get(RMQ_CONFIG_SECTION, 'vhost')

-        self.cloud_storage_config = cloud_storage_config
-
         # Set up a signal handler so we can shutdown gracefully
         # For some reason, this signal handler must be set up here. I'd rather
         # put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
@@ -172,7 +168,7 @@ class MessageListener:
             file_prefix = msg_dict["file_prefix"]
             storage_backend = msg_dict["storage_backend"]

-            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend, file_prefix, self.cloud_storage_config)
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend, file_prefix)
             StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)

         except KeyError as e:
@@ -211,12 +207,12 @@ class MessageListener:
         channel.basic_ack(delivery_tag=method_frame.delivery_tag)

     @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config):
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend, file_prefix):
         ''' Spawn a child process to analyze and import a new audio file. '''
         '''
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                args=(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config))
+                args=(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix))
         p.start()
         p.join()
         if p.exitcode == 0:
@@ -230,7 +226,7 @@ class MessageListener:

         q = Queue.Queue()
         try:
-            AnalyzerPipeline.run_analysis(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config)
+            AnalyzerPipeline.run_analysis(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix)
             metadata = q.get()
         except Exception as e:
             logging.error("Analyzer pipeline exception: %s" % str(e))
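The `spawn_analyzer_process` hunks keep the same child-process pattern, just without the cloud config argument: run the pipeline in a `multiprocessing.Process` and read the result back through a queue. A minimal sketch of that pattern with the new argument list; paths and names are placeholders:

```python
# Sketch of the child-process pattern used by spawn_analyzer_process above.
# Paths and filenames are placeholders.
import multiprocessing
from analyzer_pipeline import AnalyzerPipeline

q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
                            args=(q, u'/tmp/example.mp3', u'/srv/airtime/imported',
                                  u'example.mp3', "file", u''))
p.start()
p.join()
if p.exitcode == 0:
    metadata = q.get()  # results travel back through the queue
```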
@@ -10,7 +10,6 @@ import airtime_analyzer.airtime_analyzer as aa
 VERSION = "1.0"
 LIBRETIME_CONF_DIR = os.getenv('LIBRETIME_CONF_DIR', '/etc/airtime')
 DEFAULT_RMQ_CONFIG_PATH = os.path.join(LIBRETIME_CONF_DIR, 'airtime.conf')
-DEFAULT_CLOUD_STORAGE_CONFIG_PATH = os.path.join(LIBRETIME_CONF_DIR, 'airtime.conf')
 DEFAULT_HTTP_RETRY_PATH = '/tmp/airtime_analyzer_http_retries'

 def run():
@@ -20,7 +19,6 @@ def run():
     parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
     parser.add_argument("--debug", help="log full debugging output", action="store_true")
     parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_RMQ_CONFIG_PATH)
-    parser.add_argument("--cloud-storage-config-file", help="specify a configuration file with cloud storage settings (default is %s)" % DEFAULT_CLOUD_STORAGE_CONFIG_PATH)
     parser.add_argument("--http-retry-queue-file", help="specify where incompleted HTTP requests will be serialized (default is %s)" % DEFAULT_HTTP_RETRY_PATH)
     args = parser.parse_args()

@@ -28,25 +26,20 @@ def run():

     #Default config file path
     rmq_config_path = DEFAULT_RMQ_CONFIG_PATH
-    cloud_storage_config_path = DEFAULT_CLOUD_STORAGE_CONFIG_PATH
     http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH
     if args.rmq_config_file:
         rmq_config_path = args.rmq_config_file
-    if args.cloud_storage_config_file:
-        cloud_storage_config_path = args.cloud_storage_config_file
     if args.http_retry_queue_file:
         http_retry_queue_path = args.http_retry_queue_file

     if args.daemon:
         with daemon.DaemonContext():
             aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path,
-                                     cloud_storage_config_path = cloud_storage_config_path,
                                      http_retry_queue_path=http_retry_queue_path,
                                      debug=args.debug)
     else:
         # Run without daemonizing
         aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path,
-                                 cloud_storage_config_path = cloud_storage_config_path,
                                  http_retry_queue_path=http_retry_queue_path,
                                  debug=args.debug)

@@ -71,5 +64,3 @@ def check_if_media_monitor_is_running():
             continue

 run()
-
-
@@ -37,9 +37,7 @@ setup(name='airtime_analyzer',
         'mock',
         'python-daemon==1.6',
         'requests>=2.7.0',
-        'apache-libcloud',
         'rgain',
-        'boto',
         # These next 3 are required for requests to support SSL with SNI. Learned this the hard way...
         # What sucks is that GCC is required to pip install these.
         #'ndg-httpsclient',
@@ -22,15 +22,10 @@ def teardown():
 def test_basic():
     filename = os.path.basename(DEFAULT_AUDIO_FILE)
     q = Queue.Queue()
-    #cloud_storage_config_path = os.path.join(os.getenv('LIBRETIME_CONF_DIR', '/etc/airtime'), '/production/cloud_storage.conf')
-    #cloud_storage_config = config_file.read_config_file(cloud_storage_config_path)
-    cloud_storage_config = SafeConfigParser()
-    cloud_storage_config.add_section("current_backend")
-    cloud_storage_config.set("current_backend", "storage_backend", "file")
     file_prefix = u''
     storage_backend = "file"
     #This actually imports the file into the "./Test Artist" directory.
-    AnalyzerPipeline.run_analysis(q, DEFAULT_AUDIO_FILE, u'.', filename, storage_backend, file_prefix, cloud_storage_config)
+    AnalyzerPipeline.run_analysis(q, DEFAULT_AUDIO_FILE, u'.', filename, storage_backend, file_prefix)
     metadata = q.get()
     assert metadata['track_title'] == u'Test Title'
     assert metadata['artist_name'] == u'Test Artist'
@@ -1,18 +0,0 @@
-from nose.tools import *
-from ConfigParser import SafeConfigParser
-from airtime_analyzer.cloud_storage_uploader import CloudStorageUploader
-from airtime_analyzer.airtime_analyzer import AirtimeAnalyzerServer
-from airtime_analyzer import config_file
-
-def setup():
-    pass
-
-def teardown():
-    pass
-
-def test_analyze():
-
-    cloud_storage_config = SafeConfigParser()
-    cloud_storage_config.add_section("current_backend")
-    cloud_storage_config.set("current_backend", "storage_backend", "file")
-    cl = CloudStorageUploader(cloud_storage_config)