sintonia/python_apps/airtime_analyzer/tests/analyzer_pipeline_tests.py
Lucas Bickel e28ad471f9 Rewrite config from /etc/airtime-saas to plain /etc/airtime
This is the result of running sed -i -e 's|/etc/airtime-saas/|/etc/airtime/|' `grep -irl 'airtime-saas' airtime_mvc/ python_apps/` :P

It might need more testing. The airtime-saas part never really made sense: zf1 has environments for that, i.e. you would create a saas env based on production, for instance.

I believe legacy upstream was using this to share configuration between customers (i.e. the analyser runs only once and writes to a shared S3 bucket). I assume they mount the airtime-saas folder onto individual customers' instances with a global config. Like I said, I don't feel that this makes sense, since all it does is make hacking at the configs in airtime-saas a bit easier. A serious SaaS operation should be using something like puppet or ansible to achieve this.
2017-03-03 15:57:41 +01:00

60 lines
2.2 KiB
Python

from nose.tools import *
from ConfigParser import SafeConfigParser
import os
import shutil
import multiprocessing
import Queue
import datetime
from airtime_analyzer.analyzer_pipeline import AnalyzerPipeline
from airtime_analyzer import config_file
# Sample MP3 that test_basic feeds into AnalyzerPipeline.run_analysis.
DEFAULT_AUDIO_FILE = u'tests/test_data/44100Hz-16bit-mono.mp3'
# Path where test_basic expects the sample to exist after the import, and
# from which teardown() moves it back to DEFAULT_AUDIO_FILE.
DEFAULT_IMPORT_DEST = u'Test Artist/Test Album/44100Hz-16bit-mono.mp3'
def setup():
    """No-op setup hook: these tests need nothing prepared beforehand."""
    return None
def teardown():
#Move the file back
shutil.move(DEFAULT_IMPORT_DEST, DEFAULT_AUDIO_FILE)
assert os.path.exists(DEFAULT_AUDIO_FILE)
def test_basic():
    """Analyze the sample MP3 and check its tags, duration and import location."""
    original_name = os.path.basename(DEFAULT_AUDIO_FILE)
    result_queue = Queue.Queue()

    # A minimal in-memory config selecting the "file" backend is used here
    # instead of reading production/cloud_storage.conf from the config dir.
    config = SafeConfigParser()
    config.add_section("current_backend")
    config.set("current_backend", "storage_backend", "file")

    # This actually imports the file into the "./Test Artist" directory.
    AnalyzerPipeline.run_analysis(
        result_queue,
        DEFAULT_AUDIO_FILE,
        u'.',
        original_name,
        "file",
        u'',
        config
    )
    metadata = result_queue.get()

    expected_tags = (
        ('track_title', u'Test Title'),
        ('artist_name', u'Test Artist'),
        ('album_title', u'Test Album'),
        ('year', u'1999'),
        ('genre', u'Test Genre'),
        ('mime', 'audio/mp3'),  # Not unicode because MIMEs aren't.
    )
    for key, expected in expected_tags:
        assert metadata[key] == expected

    # The sample is roughly 3.9 seconds long; "length" is the same duration
    # rendered through datetime.timedelta.
    duration = metadata['length_seconds']
    assert abs(duration - 3.9) < 0.1
    assert metadata["length"] == str(datetime.timedelta(seconds=duration))

    assert os.path.exists(DEFAULT_IMPORT_DEST)
def _run_analysis_with(queue, audio_file_path, import_directory, original_filename):
    """Call run_analysis with the given leading arguments plus valid storage arguments.

    The storage backend, file prefix and config mirror what test_basic passes.
    Supplying all seven positional arguments means a TypeError raised by the
    call cannot be attributed to a short argument list.
    """
    cloud_storage_config = SafeConfigParser()
    cloud_storage_config.add_section("current_backend")
    cloud_storage_config.set("current_backend", "storage_backend", "file")
    AnalyzerPipeline.run_analysis(queue, audio_file_path, import_directory,
                                  original_filename, "file", u'',
                                  cloud_storage_config)


@raises(TypeError)
def test_wrong_type_queue_param():
    """A first argument that is not a queue should be rejected with TypeError."""
    # NOTE(review): assumes run_analysis type-checks its queue argument and
    # raises TypeError -- confirm against analyzer_pipeline.
    _run_analysis_with(u'not a queue', u'', u'', u'')


@raises(TypeError)
def test_wrong_type_string_param2():
    """A byte-string audio file path (second argument) should raise TypeError."""
    # '' is a byte string under Python 2; every other argument is valid.
    _run_analysis_with(Queue.Queue(), '', u'', u'')


@raises(TypeError)
def test_wrong_type_string_param3():
    """A byte-string import directory (third argument) should raise TypeError."""
    _run_analysis_with(Queue.Queue(), u'', '', u'')


@raises(TypeError)
def test_wrong_type_string_param4():
    """A byte-string original filename (fourth argument) should raise TypeError."""
    _run_analysis_with(Queue.Queue(), u'', u'', '')