fix test failures

Kyle Robbertze 2020-01-21 09:13:42 +02:00
parent 5d67172dd0
commit 82042e8c69
10 changed files with 125 additions and 129 deletions

View File

@@ -10,8 +10,6 @@ php:
 - 7.0
 # folks who prefer running on 5.x should be using 5.6 in most cases, 5.4 is no in the matrix since noone should use it
 - 5.6
-# this is in for centos support, it's still the default on CentOS 7.3 and there were some lang changes after 5.4
-- 5.4
 services:
 - postgresql
 - rabbitmq
@@ -50,6 +48,12 @@ addons:
 packages:
 - silan
 - libgirepository1.0-dev
+- gir1.2-gstreamer-1.0
+- gstreamer1.0-plugins-base
+- gstreamer1.0-plugins-good
+- gstreamer1.0-plugins-bad
+- gstreamer1.0-plugins-ugly
+- libcairo2-dev
 - liquidsoap
 - liquidsoap-plugin-mad
 - liquidsoap-plugin-taglib

View File

@@ -1,9 +1,9 @@
 """ Analyzes and imports an audio file into the Airtime library.
 """
 import logging
 import threading
 import multiprocessing
-import queue
+from queue import Queue
 import configparser
 from .metadata_analyzer import MetadataAnalyzer
 from .filemover_analyzer import FileMoverAnalyzer
@@ -12,8 +12,8 @@ from .replaygain_analyzer import ReplayGainAnalyzer
 from .playability_analyzer import *
 class AnalyzerPipeline:
 """ Analyzes and imports an audio file into the Airtime library.
 This currently performs metadata extraction (eg. gets the ID3 tags from an MP3),
 then moves the file to the Airtime music library (stor/imported), and returns
 the results back to the parent process. This class is used in an isolated process
@@ -26,27 +26,27 @@ class AnalyzerPipeline:
 @staticmethod
 def run_analysis(queue, audio_file_path, import_directory, original_filename, storage_backend, file_prefix):
 """Analyze and import an audio file, and put all extracted metadata into queue.
 Keyword arguments:
 queue: A multiprocessing.queues.Queue which will be used to pass the
 extracted metadata back to the parent process.
 audio_file_path: Path on disk to the audio file to analyze.
 import_directory: Path to the final Airtime "import" directory where
 we will move the file.
 original_filename: The original filename of the file, which we'll try to
 preserve. The file at audio_file_path typically has a
 temporary randomly generated name, which is why we want
 to know what the original name was.
 storage_backend: String indicating the storage backend (amazon_s3 or file)
 file_prefix:
 """
 # It is super critical to initialize a separate log file here so that we
 # don't inherit logging/locks from the parent process. Supposedly
 # this can lead to Bad Things (deadlocks): http://bugs.python.org/issue6721
 AnalyzerPipeline.python_logger_deadlock_workaround()
 try:
-if not isinstance(queue, queue.Queue):
+if not isinstance(queue, Queue):
 raise TypeError("queue must be a Queue.Queue()")
 if not isinstance(audio_file_path, str):
 raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__ + " instead.")
@@ -72,7 +72,7 @@ class AnalyzerPipeline:
 metadata["import_status"] = 0 # Successfully imported
 # Note that the queue we're putting the results into is our interprocess communication
 # back to the main process.
 # Pass all the file metadata back to the main analyzer process, which then passes
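A likely motivation for the `from queue import Queue` change above: `run_analysis` takes a parameter named `queue`, which shadows the `queue` module inside the function, so the old check `isinstance(queue, queue.Queue)` looks up `Queue` on the argument itself and fails. Importing the class directly sidesteps the clash. A minimal standalone sketch of the name collision, using only the standard library (not project code):

```python
import queue                 # old-style module import (as before this commit)
from queue import Queue      # new-style class import (as after this commit)

def run_analysis_old_style(queue):
    # The parameter shadows the "queue" module, so "queue.Queue" is looked up
    # on the argument (a Queue instance) and raises AttributeError.
    return isinstance(queue, queue.Queue)

def run_analysis_new_style(queue):
    # With the class imported directly, the parameter name no longer matters.
    return isinstance(queue, Queue)

print(run_analysis_new_style(Queue()))   # True
try:
    run_analysis_old_style(Queue())
except AttributeError as err:
    print("old style fails:", err)
```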

View File

@@ -64,7 +64,7 @@ class CuePointAnalyzer(Analyzer):
 except OSError as e: # silan was not found
 logging.warn("Failed to run: %s - %s. %s" % (command[0], e.strerror, "Do you have silan installed?"))
 except subprocess.CalledProcessError as e: # silan returned an error code
-logging.warn("%s %s %s", e.cmd, e.message, e.returncode)
+logging.warn("%s %s %s", e.cmd, e.output, e.returncode)
 except Exception as e:
 logging.warn(e)
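For context on the `e.message` to `e.output` swap above: `BaseException.message` was a Python 2 leftover that no longer exists in Python 3, while `subprocess.CalledProcessError` does expose `cmd`, `output` and `returncode`. A small self-contained sketch of the surviving pattern; the `false` command here is just a stand-in for a failing `silan` invocation:

```python
import logging
import subprocess

try:
    # "false" always exits non-zero, so check_output raises CalledProcessError.
    subprocess.check_output(["false"], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
    # e.message would raise AttributeError on Python 3; cmd/output/returncode work.
    logging.warning("%s %s %s", e.cmd, e.output, e.returncode)
```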

View File

@@ -4,24 +4,24 @@ import time
 import shutil
 import os, errno
 import time
 import uuid
 from .analyzer import Analyzer
 class FileMoverAnalyzer(Analyzer):
 """This analyzer copies a file over from a temporary directory (stor/organize)
 into the Airtime library (stor/imported).
 """
 @staticmethod
 def analyze(audio_file_path, metadata):
 """Dummy method because we need more info than analyze gets passed to it"""
 raise Exception("Use FileMoverAnalyzer.move() instead.")
 @staticmethod
 def move(audio_file_path, import_directory, original_filename, metadata):
 """Move the file at audio_file_path over into the import_directory/import,
 renaming it to original_filename.
 Keyword arguments:
 audio_file_path: Path to the file to be imported.
 import_directory: Path to the "import" directory inside the Airtime stor directory.
@@ -30,18 +30,20 @@ class FileMoverAnalyzer(Analyzer):
 metadata: A dictionary where the "full_path" of where the file is moved to will be added.
 """
 if not isinstance(audio_file_path, str):
-raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__)
+raise TypeError("audio_file_path must be string. Was of type " + type(audio_file_path).__name__)
 if not isinstance(import_directory, str):
-raise TypeError("import_directory must be unicode. Was of type " + type(import_directory).__name__)
+raise TypeError("import_directory must be string. Was of type " + type(import_directory).__name__)
 if not isinstance(original_filename, str):
-raise TypeError("original_filename must be unicode. Was of type " + type(original_filename).__name__)
+raise TypeError("original_filename must be string. Was of type " + type(original_filename).__name__)
 if not isinstance(metadata, dict):
 raise TypeError("metadata must be a dict. Was of type " + type(metadata).__name__)
+if not os.path.exists(audio_file_path):
+raise FileNotFoundError("audio file not found: {}".format(audio_file_path))
 #Import the file over to it's final location.
 # TODO: Also, handle the case where the move fails and write some code
 # to possibly move the file to problem_files.
 max_dir_len = 48
 max_file_len = 48
 final_file_path = import_directory
@@ -58,11 +60,11 @@ class FileMoverAnalyzer(Analyzer):
 #If a file with the same name already exists in the "import" directory, then
 #we add a unique string to the end of this one. We never overwrite a file on import
 #because if we did that, it would mean Airtime's database would have
 #the wrong information for the file we just overwrote (eg. the song length would be wrong!)
 #If the final file path is the same as the file we've been told to import (which
 #you often do when you're debugging), then don't move the file at all.
 if os.path.exists(final_file_path):
 if os.path.samefile(audio_file_path, final_file_path):
 metadata["full_path"] = final_file_path
@@ -77,14 +79,14 @@ class FileMoverAnalyzer(Analyzer):
 #Ensure the full path to the file exists
 mkdir_p(os.path.dirname(final_file_path))
 #Move the file into its final destination directory
 logging.debug("Moving %s to %s" % (audio_file_path, final_file_path))
 shutil.move(audio_file_path, final_file_path)
 metadata["full_path"] = final_file_path
 return metadata
 def mkdir_p(path):
 """ Make all directories in a tree (like mkdir -p)"""
 if path == "":
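The two lines added to `move()` above make a missing source file fail fast with an explicit `FileNotFoundError` instead of surfacing later from `shutil.move`. A tiny standalone illustration of that guard, with a hypothetical path rather than project code:

```python
import os

def require_file(audio_file_path):
    # Same guard style as the diff: check existence before doing any work.
    if not os.path.exists(audio_file_path):
        raise FileNotFoundError("audio file not found: {}".format(audio_file_path))

try:
    require_file("/tmp/does-not-exist.mp3")
except FileNotFoundError as err:
    print(err)
```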

View File

@@ -13,15 +13,17 @@ class MetadataAnalyzer(Analyzer):
 @staticmethod
 def analyze(filename, metadata):
 ''' Extract audio metadata from tags embedded in the file (eg. ID3 tags)
 Keyword arguments:
 filename: The path to the audio file to extract metadata from.
 metadata: A dictionary that the extracted metadata will be added to.
 '''
 if not isinstance(filename, str):
-raise TypeError("filename must be unicode. Was of type " + type(filename).__name__)
+raise TypeError("filename must be string. Was of type " + type(filename).__name__)
 if not isinstance(metadata, dict):
 raise TypeError("metadata must be a dict. Was of type " + type(metadata).__name__)
+if not os.path.exists(filename):
+raise FileNotFoundError("audio file not found: {}".format(filename))
 #Airtime <= 2.5.x nonsense:
 metadata["ftype"] = "audioclip"
@@ -40,7 +42,7 @@ class MetadataAnalyzer(Analyzer):
 m.update(data)
 metadata["md5"] = m.hexdigest()
 # Mutagen doesn't handle WAVE files so we use a different package
 ms = magic.open(magic.MIME_TYPE)
 ms.load()
 with open(filename, 'rb') as fh:
@@ -57,15 +59,15 @@ class MetadataAnalyzer(Analyzer):
 if audio_file == None: # Don't use "if not" here. It is wrong due to mutagen's design.
 return metadata
 # Note that audio_file can equal {} if the file is valid but there's no metadata tags.
 # We can still try to grab the info variables below.
 #Grab other file information that isn't encoded in a tag, but instead usually
 #in the file header. Mutagen breaks that out into a separate "info" object:
 info = audio_file.info
 if hasattr(info, "sample_rate"): # Mutagen is annoying and inconsistent
 metadata["sample_rate"] = info.sample_rate
 if hasattr(info, "length"):
 metadata["length_seconds"] = info.length
 #Converting the length in seconds (float) to a formatted time string
 track_length = datetime.timedelta(seconds=info.length)
 metadata["length"] = str(track_length) #time.strftime("%H:%M:%S.%f", track_length)
@@ -77,12 +79,12 @@ class MetadataAnalyzer(Analyzer):
 if hasattr(info, "bitrate"):
 metadata["bit_rate"] = info.bitrate
 # Use the mutagen to get the MIME type, if it has one. This is more reliable and
 # consistent for certain types of MP3s or MPEG files than the MIMEs returned by magic.
 if audio_file.mime:
 metadata["mime"] = audio_file.mime[0]
 #Try to get the number of channels if mutagen can...
 try:
 #Special handling for getting the # of channels from MP3s. It's in the "mode" field
@@ -97,13 +99,13 @@ class MetadataAnalyzer(Analyzer):
 except (AttributeError, KeyError):
 #If mutagen can't figure out the number of channels, we'll just leave it out...
 pass
 #Try to extract the number of tracks on the album if we can (the "track total")
 try:
 track_number = audio_file["tracknumber"]
 if isinstance(track_number, list): # Sometimes tracknumber is a list, ugh
 track_number = track_number[0]
 track_number_tokens = track_number
 if '/' in track_number:
 track_number_tokens = track_number.split('/')
 track_number = track_number_tokens[0]
@@ -118,7 +120,7 @@ class MetadataAnalyzer(Analyzer):
 pass
 #We normalize the mutagen tags slightly here, so in case mutagen changes,
 #we find the
 mutagen_to_airtime_mapping = {
 'title': 'track_title',
 'artist': 'artist_name',
@@ -153,13 +155,13 @@ class MetadataAnalyzer(Analyzer):
 # Some tags are returned as lists because there could be multiple values.
 # This is unusual so we're going to always just take the first item in the list.
 if isinstance(metadata[airtime_tag], list):
 if metadata[airtime_tag]:
 metadata[airtime_tag] = metadata[airtime_tag][0]
 else: # Handle empty lists
 metadata[airtime_tag] = ""
 except KeyError:
 continue
 return metadata
@@ -174,7 +176,7 @@ class MetadataAnalyzer(Analyzer):
 track_length = datetime.timedelta(seconds=length_seconds)
 metadata["length"] = str(track_length) #time.strftime("%H:%M:%S.%f", track_length)
 metadata["length_seconds"] = length_seconds
 metadata["cueout"] = metadata["length"]
 except wave.Error as ex:
 logging.error("Invalid WAVE file: {}".format(str(ex)))
 raise

View File

@@ -1,12 +1,13 @@
 import subprocess
 import logging
 from .analyzer import Analyzer
+import re
 class ReplayGainAnalyzer(Analyzer):
 ''' This class extracts the ReplayGain using a tool from the python-rgain package. '''
-REPLAYGAIN_EXECUTABLE = 'replaygain' # From the python-rgain package
+REPLAYGAIN_EXECUTABLE = 'replaygain' # From the rgain3 python package
 @staticmethod
 def analyze(filename, metadata):
@@ -19,17 +20,16 @@ class ReplayGainAnalyzer(Analyzer):
 '''
 command = [ReplayGainAnalyzer.REPLAYGAIN_EXECUTABLE, '-d', filename]
 try:
-results = subprocess.check_output(command, stderr=subprocess.STDOUT, close_fds=True)
-filename_token = "%s: " % filename
-rg_pos = results.find(filename_token, results.find("Calculating Replay Gain information")) + len(filename_token)
-db_pos = results.find(" dB", rg_pos)
-replaygain = results[rg_pos:db_pos]
+results = subprocess.check_output(command, stderr=subprocess.STDOUT,
+close_fds=True, text=True)
+gain_match = r'Calculating Replay Gain information \.\.\.(?:\n|.)*?:([\d.-]*) dB'
+replaygain = re.search(gain_match, results).group(1)
 metadata['replay_gain'] = float(replaygain)
 except OSError as e: # replaygain was not found
 logging.warn("Failed to run: %s - %s. %s" % (command[0], e.strerror, "Do you have python-rgain installed?"))
 except subprocess.CalledProcessError as e: # replaygain returned an error code
-logging.warn("%s %s %s", e.cmd, e.message, e.returncode)
+logging.warn("%s %s %s", e.cmd, e.output, e.returncode)
 except Exception as e:
 logging.warn(e)
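Two things happen in the rewritten `analyze()` above: `text=True` makes `check_output` return `str` instead of `bytes` on Python 3 (a `str` regex applied to `bytes` raises `TypeError`), and the regex replaces the old `find()`/slice arithmetic. A toy sketch of the same parsing, using `echo` and a made-up output string rather than real `replaygain` output:

```python
import re
import subprocess

# Fabricated stand-in for the replaygain tool's report; the real format may differ.
fake_report = "Calculating Replay Gain information ...\nsample.mp3:-3.25 dB"
results = subprocess.check_output(["echo", fake_report], stderr=subprocess.STDOUT,
                                  close_fds=True, text=True)

gain_match = r"Calculating Replay Gain information \.\.\.(?:\n|.)*?:([\d.-]*) dB"
replay_gain = float(re.search(gain_match, results).group(1))
print(replay_gain)  # -3.25
```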

View File

@@ -1,9 +1,8 @@
 from nose.tools import *
-from ConfigParser import SafeConfigParser
 import os
 import shutil
 import multiprocessing
-import Queue
+from queue import Queue
 import datetime
 from airtime_analyzer.analyzer_pipeline import AnalyzerPipeline
 from airtime_analyzer import config_file
@@ -21,7 +20,7 @@ def teardown():
 def test_basic():
 filename = os.path.basename(DEFAULT_AUDIO_FILE)
-q = Queue.Queue()
+q = Queue()
 file_prefix = u''
 storage_backend = "file"
 #This actually imports the file into the "./Test Artist" directory.
@@ -39,17 +38,17 @@ def test_basic():
 @raises(TypeError)
 def test_wrong_type_queue_param():
-AnalyzerPipeline.run_analysis(Queue.Queue(), u'', u'', u'')
+AnalyzerPipeline.run_analysis(Queue(), u'', u'', u'')
 @raises(TypeError)
 def test_wrong_type_string_param2():
-AnalyzerPipeline.run_analysis(Queue.Queue(), '', u'', u'')
+AnalyzerPipeline.run_analysis(Queue(), '', u'', u'')
 @raises(TypeError)
 def test_wrong_type_string_param3():
-AnalyzerPipeline.run_analysis(Queue.Queue(), u'', '', u'')
+AnalyzerPipeline.run_analysis(Queue(), u'', '', u'')
 @raises(TypeError)
 def test_wrong_type_string_param4():
-AnalyzerPipeline.run_analysis(Queue.Queue(), u'', u'', '')
+AnalyzerPipeline.run_analysis(Queue(), u'', u'', '')

View File

@@ -2,7 +2,6 @@ from nose.tools import *
 import os
 import shutil
 import multiprocessing
-import Queue
 import time
 import mock
 from pprint import pprint
@@ -23,30 +22,34 @@ def test_dont_use_analyze():
 @raises(TypeError)
 def test_move_wrong_string_param1():
-FileMoverAnalyzer.move('', u'', u'', dict())
+FileMoverAnalyzer.move(42, '', '', dict())
 @raises(TypeError)
 def test_move_wrong_string_param2():
-FileMoverAnalyzer.move(u'', '', u'', dict())
+FileMoverAnalyzer.move(u'', 23, u'', dict())
 @raises(TypeError)
 def test_move_wrong_string_param3():
-FileMoverAnalyzer.move(u'', u'', '', dict())
+FileMoverAnalyzer.move('', '', 5, dict())
 @raises(TypeError)
 def test_move_wrong_dict_param():
-FileMoverAnalyzer.move(u'', u'', u'', 12345)
+FileMoverAnalyzer.move('', '', '', 12345)
+@raises(FileNotFoundError)
+def test_move_wrong_string_param3():
+FileMoverAnalyzer.move('', '', '', dict())
 def test_basic():
 filename = os.path.basename(DEFAULT_AUDIO_FILE)
 FileMoverAnalyzer.move(DEFAULT_AUDIO_FILE, u'.', filename, dict())
 #Move the file back
 shutil.move("./" + filename, DEFAULT_AUDIO_FILE)
 assert os.path.exists(DEFAULT_AUDIO_FILE)
 def test_basic_samefile():
 filename = os.path.basename(DEFAULT_AUDIO_FILE)
 FileMoverAnalyzer.move(DEFAULT_AUDIO_FILE, u'tests/test_data', filename, dict())
 assert os.path.exists(DEFAULT_AUDIO_FILE)
 def test_duplicate_file():
@@ -55,9 +58,9 @@ def test_duplicate_file():
 FileMoverAnalyzer.move(DEFAULT_AUDIO_FILE, u'.', filename, dict())
 #Copy it back to the original location
 shutil.copy("./" + filename, DEFAULT_AUDIO_FILE)
 #Import it again. It shouldn't overwrite the old file and instead create a new
 metadata = dict()
 metadata = FileMoverAnalyzer.move(DEFAULT_AUDIO_FILE, u'.', filename, metadata)
 #Cleanup: move the file (eg. 44100Hz-16bit-mono.mp3) back
 shutil.move("./" + filename, DEFAULT_AUDIO_FILE)
 #Remove the renamed duplicate, eg. 44100Hz-16bit-mono_03-26-2014-11-58.mp3
@@ -71,7 +74,7 @@ def test_duplicate_file():
 it's imported within 1 second of the second file (ie. if the timestamp is the same).
 '''
 def test_double_duplicate_files():
 # Here we use mock to patch out the time.localtime() function so that it
 # always returns the same value. This allows us to consistently simulate this test cases
 # where the last two of the three files are imported at the same time as the timestamp.
 with mock.patch('airtime_analyzer.filemover_analyzer.time') as mock_time:
@@ -83,17 +86,17 @@ def test_double_duplicate_files():
 FileMoverAnalyzer.move(DEFAULT_AUDIO_FILE, u'.', filename, dict())
 #Copy it back to the original location
 shutil.copy("./" + filename, DEFAULT_AUDIO_FILE)
 #Import it again. It shouldn't overwrite the old file and instead create a new
 first_dup_metadata = dict()
 first_dup_metadata = FileMoverAnalyzer.move(DEFAULT_AUDIO_FILE, u'.', filename,
 first_dup_metadata)
 #Copy it back again!
 shutil.copy("./" + filename, DEFAULT_AUDIO_FILE)
 #Reimport for the third time, which should have the same timestamp as the second one
 #thanks to us mocking out time.localtime()
 second_dup_metadata = dict()
 second_dup_metadata = FileMoverAnalyzer.move(DEFAULT_AUDIO_FILE, u'.', filename,
 second_dup_metadata)
 #Cleanup: move the file (eg. 44100Hz-16bit-mono.mp3) back
 shutil.move("./" + filename, DEFAULT_AUDIO_FILE)
 #Remove the renamed duplicate, eg. 44100Hz-16bit-mono_03-26-2014-11-58.mp3
@@ -105,7 +108,7 @@ def test_double_duplicate_files():
 def test_bad_permissions_destination_dir():
 filename = os.path.basename(DEFAULT_AUDIO_FILE)
 dest_dir = u'/sys/foobar' # /sys is using sysfs on Linux, which is unwritable
 FileMoverAnalyzer.move(DEFAULT_AUDIO_FILE, dest_dir, filename, dict())
 #Move the file back
 shutil.move(os.path.join(dest_dir, filename), DEFAULT_AUDIO_FILE)
 assert os.path.exists(DEFAULT_AUDIO_FILE)

View File

@@ -4,7 +4,7 @@ import datetime
 import mutagen
 import mock
 from nose.tools import *
 from airtime_analyzer.metadata_analyzer import MetadataAnalyzer
 def setup():
 pass
@@ -13,73 +13,73 @@ def teardown():
 pass
 def check_default_metadata(metadata):
-assert metadata['track_title'] == u'Test Title'
-assert metadata['artist_name'] == u'Test Artist'
-assert metadata['album_title'] == u'Test Album'
-assert metadata['year'] == u'1999'
-assert metadata['genre'] == u'Test Genre'
-assert metadata['track_number'] == u'1'
+assert metadata['track_title'] == 'Test Title'
+assert metadata['artist_name'] == 'Test Artist'
+assert metadata['album_title'] == 'Test Album'
+assert metadata['year'] == '1999'
+assert metadata['genre'] == 'Test Genre'
+assert metadata['track_number'] == '1'
 assert metadata["length"] == str(datetime.timedelta(seconds=metadata["length_seconds"]))
 def test_mp3_mono():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-mono.mp3', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-mono.mp3', dict())
 check_default_metadata(metadata)
 assert metadata['channels'] == 1
 assert metadata['bit_rate'] == 63998
 assert abs(metadata['length_seconds'] - 3.9) < 0.1
 assert metadata['mime'] == 'audio/mp3' # Not unicode because MIMEs aren't.
-assert metadata['track_total'] == u'10' # MP3s can have a track_total
+assert metadata['track_total'] == '10' # MP3s can have a track_total
 #Mutagen doesn't extract comments from mp3s it seems
 def test_mp3_jointstereo():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-jointstereo.mp3', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-jointstereo.mp3', dict())
 check_default_metadata(metadata)
 assert metadata['channels'] == 2
 assert metadata['bit_rate'] == 127998
 assert abs(metadata['length_seconds'] - 3.9) < 0.1
 assert metadata['mime'] == 'audio/mp3'
-assert metadata['track_total'] == u'10' # MP3s can have a track_total
+assert metadata['track_total'] == '10' # MP3s can have a track_total
 def test_mp3_simplestereo():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-simplestereo.mp3', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-simplestereo.mp3', dict())
 check_default_metadata(metadata)
 assert metadata['channels'] == 2
 assert metadata['bit_rate'] == 127998
 assert abs(metadata['length_seconds'] - 3.9) < 0.1
 assert metadata['mime'] == 'audio/mp3'
-assert metadata['track_total'] == u'10' # MP3s can have a track_total
+assert metadata['track_total'] == '10' # MP3s can have a track_total
 def test_mp3_dualmono():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-dualmono.mp3', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-dualmono.mp3', dict())
 check_default_metadata(metadata)
 assert metadata['channels'] == 2
 assert metadata['bit_rate'] == 127998
 assert abs(metadata['length_seconds'] - 3.9) < 0.1
 assert metadata['mime'] == 'audio/mp3'
-assert metadata['track_total'] == u'10' # MP3s can have a track_total
+assert metadata['track_total'] == '10' # MP3s can have a track_total
 def test_ogg_mono():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-mono.ogg', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-mono.ogg', dict())
 check_default_metadata(metadata)
 assert metadata['channels'] == 1
 assert metadata['bit_rate'] == 80000
 assert abs(metadata['length_seconds'] - 3.8) < 0.1
 assert metadata['mime'] == 'audio/vorbis'
-assert metadata['comment'] == u'Test Comment'
+assert metadata['comment'] == 'Test Comment'
 def test_ogg_stereo():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-stereo.ogg', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-stereo.ogg', dict())
 check_default_metadata(metadata)
 assert metadata['channels'] == 2
 assert metadata['bit_rate'] == 112000
 assert abs(metadata['length_seconds'] - 3.8) < 0.1
 assert metadata['mime'] == 'audio/vorbis'
-assert metadata['comment'] == u'Test Comment'
+assert metadata['comment'] == 'Test Comment'
 ''' faac and avconv can't seem to create a proper mono AAC file... ugh
 def test_aac_mono():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-mono.m4a')
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-mono.m4a')
 print("Mono AAC metadata:")
 print(metadata)
 check_default_metadata(metadata)
@@ -87,41 +87,41 @@ def test_aac_mono():
 assert metadata['bit_rate'] == 80000
 assert abs(metadata['length_seconds'] - 3.8) < 0.1
 assert metadata['mime'] == 'audio/mp4'
-assert metadata['comment'] == u'Test Comment'
+assert metadata['comment'] == 'Test Comment'
 '''
 def test_aac_stereo():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-stereo.m4a', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-stereo.m4a', dict())
 check_default_metadata(metadata)
 assert metadata['channels'] == 2
 assert metadata['bit_rate'] == 102619
 assert abs(metadata['length_seconds'] - 3.8) < 0.1
 assert metadata['mime'] == 'audio/mp4'
-assert metadata['comment'] == u'Test Comment'
+assert metadata['comment'] == 'Test Comment'
 def test_mp3_utf8():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-stereo-utf8.mp3', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-stereo-utf8.mp3', dict())
 # Using a bunch of different UTF-8 codepages here. Test data is from:
 # http://winrus.com/utf8-jap.htm
-assert metadata['track_title'] == u'アイウエオカキクケコサシスセソタチツテ'
-assert metadata['artist_name'] == u'てすと'
-assert metadata['album_title'] == u'Ä ä Ü ü ß'
-assert metadata['year'] == u'1999'
-assert metadata['genre'] == u'Я Б Г Д Ж Й'
-assert metadata['track_number'] == u'1'
+assert metadata['track_title'] == 'アイウエオカキクケコサシスセソタチツテ'
+assert metadata['artist_name'] == 'てすと'
+assert metadata['album_title'] == 'Ä ä Ü ü ß'
+assert metadata['year'] == '1999'
+assert metadata['genre'] == 'Я Б Г Д Ж Й'
+assert metadata['track_number'] == '1'
 assert metadata['channels'] == 2
 assert metadata['bit_rate'] < 130000
 assert metadata['bit_rate'] > 127000
 assert abs(metadata['length_seconds'] - 3.9) < 0.1
 assert metadata['mime'] == 'audio/mp3'
-assert metadata['track_total'] == u'10' # MP3s can have a track_total
+assert metadata['track_total'] == '10' # MP3s can have a track_total
 def test_invalid_wma():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-stereo-invalid.wma', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-stereo-invalid.wma', dict())
 assert metadata['mime'] == 'audio/x-ms-wma'
 def test_wav_stereo():
-metadata = MetadataAnalyzer.analyze(u'tests/test_data/44100Hz-16bit-stereo.wav', dict())
+metadata = MetadataAnalyzer.analyze('tests/test_data/44100Hz-16bit-stereo.wav', dict())
 assert metadata['mime'] == 'audio/x-wav'
 assert abs(metadata['length_seconds'] - 3.9) < 0.1
 assert metadata['channels'] == 2
@@ -129,7 +129,7 @@ def test_wav_stereo():
 # Make sure the parameter checking works
-@raises(TypeError)
+@raises(FileNotFoundError)
 def test_move_wrong_string_param1():
 not_unicode = 'asdfasdf'
 MetadataAnalyzer.analyze(not_unicode, dict())
@@ -137,12 +137,12 @@ def test_move_wrong_string_param1():
 @raises(TypeError)
 def test_move_wrong_metadata_dict():
 not_a_dict = list()
-MetadataAnalyzer.analyze(u'asdfasdf', not_a_dict)
+MetadataAnalyzer.analyze('asdfasdf', not_a_dict)
 # Test an mp3 file where the number of channels is invalid or missing:
 def test_mp3_bad_channels():
-filename = u'tests/test_data/44100Hz-16bit-mono.mp3'
+filename = 'tests/test_data/44100Hz-16bit-mono.mp3'
 '''
 It'd be a pain in the ass to construct a real MP3 with an invalid number
 of channels by hand because that value is stored in every MP3 frame in the file
 '''
@@ -158,8 +158,8 @@ def test_mp3_bad_channels():
 assert metadata['bit_rate'] == 63998
 assert abs(metadata['length_seconds'] - 3.9) < 0.1
 assert metadata['mime'] == 'audio/mp3' # Not unicode because MIMEs aren't.
-assert metadata['track_total'] == u'10' # MP3s can have a track_total
+assert metadata['track_total'] == '10' # MP3s can have a track_total
 #Mutagen doesn't extract comments from mp3s it seems
 def test_unparsable_file():
-MetadataAnalyzer.analyze(u'README.rst', dict())
+MetadataAnalyzer.analyze('README.rst', dict())

View File

@@ -2,20 +2,6 @@ from __future__ import print_function
 from nose.tools import *
 from airtime_analyzer.replaygain_analyzer import ReplayGainAnalyzer
-'''
-The tests in here were all tagged with the 'rgain' tag so the can be exluded from being run
-with nosetest -a '!rgain'. This was needed due to the fact that it is not readily possible
-to install replaygain on a containerized travis instance.
-We can either give running replaygain test on travis another shot after ubuntu getsan updated
-gi instrospection allowing us to install gi and gobject into the virtualenv, or we can switch
-to a full machine and stop using 'sudo: false' on travis.
-Deactivating these tests is a bad fix for now and I plan on looking into it again after
-most everything else is up and running. For those interesed the tests seem to work locally
-albeit my results not being up to the given tolerance of 0.30 (which I'm assuming is my rig's
-problem and would work on travis if replaygain was available).
-'''
 def check_default_metadata(metadata):
 ''' Check that the values extract by Silan/CuePointAnalyzer on our test audio files match what we expect.