diff --git a/analyzer/libretime_analyzer/config.py b/analyzer/libretime_analyzer/config.py index d3d833c17..92db4b247 100644 --- a/analyzer/libretime_analyzer/config.py +++ b/analyzer/libretime_analyzer/config.py @@ -1,5 +1,6 @@ -from libretime_shared.config import BaseConfig, RabbitMQConfig +from libretime_shared.config import BaseConfig, GeneralConfig, RabbitMQConfig class Config(BaseConfig): + general: GeneralConfig rabbitmq: RabbitMQConfig = RabbitMQConfig() diff --git a/analyzer/libretime_analyzer/main.py b/analyzer/libretime_analyzer/main.py index e33de9f0c..aeb3b23bb 100644 --- a/analyzer/libretime_analyzer/main.py +++ b/analyzer/libretime_analyzer/main.py @@ -41,6 +41,6 @@ def cli( # Start listening for RabbitMQ messages telling us about newly # uploaded files. This blocks until we receive a shutdown signal. - MessageListener(config.rabbitmq) + MessageListener(config) StatusReporter.stop_thread() diff --git a/analyzer/libretime_analyzer/message_listener.py b/analyzer/libretime_analyzer/message_listener.py index 421c64eab..b95547b84 100644 --- a/analyzer/libretime_analyzer/message_listener.py +++ b/analyzer/libretime_analyzer/message_listener.py @@ -4,9 +4,9 @@ import time from queue import Queue import pika -from libretime_shared.config import RabbitMQConfig from loguru import logger +from .config import Config from .pipeline import Pipeline, PipelineStatus from .status_reporter import StatusReporter @@ -17,7 +17,7 @@ QUEUE = "airtime-uploads" class MessageListener: - def __init__(self, config: RabbitMQConfig): + def __init__(self, config: Config): """ Start listening for file upload event messages from RabbitMQ. """ @@ -54,12 +54,12 @@ class MessageListener: """Connect to the RabbitMQ server and start listening for messages.""" self._connection = pika.BlockingConnection( pika.ConnectionParameters( - host=self.config.host, - port=self.config.port, - virtual_host=self.config.vhost, + host=self.config.rabbitmq.host, + port=self.config.rabbitmq.port, + virtual_host=self.config.rabbitmq.vhost, credentials=pika.credentials.PlainCredentials( - self.config.user, - self.config.password, + self.config.rabbitmq.user, + self.config.rabbitmq.password, ), ) ) @@ -104,9 +104,7 @@ class MessageListener: # final_file_path = "" import_directory = "" original_filename = "" - callback_url = "" - api_key = "" - file_prefix = "" + file_id = "" try: try: @@ -114,26 +112,23 @@ class MessageListener: except (UnicodeDecodeError, AttributeError): pass msg_dict = json.loads(body) - api_key = msg_dict["api_key"] - callback_url = msg_dict["callback_url"] + file_id = msg_dict["file_id"] audio_file_path = msg_dict["tmp_file_path"] - import_directory = msg_dict["import_directory"] original_filename = msg_dict["original_filename"] - file_prefix = msg_dict["file_prefix"] - storage_backend = msg_dict["storage_backend"] + import_directory = msg_dict["import_directory"] - audio_metadata = MessageListener.spawn_analyzer_process( + metadata = MessageListener.spawn_analyzer_process( audio_file_path, import_directory, original_filename, - storage_backend, - file_prefix, - ) - StatusReporter.report_success_to_callback_url( - callback_url, api_key, audio_metadata ) + callback_url = f"{self.config.general.public_url}/rest/media/{file_id}" + callback_api_key = self.config.general.api_key + + StatusReporter.report_success(callback_url, callback_api_key, metadata) + except KeyError: logger.exception("A mandatory field was missing from the message.") channel.basic_nack( @@ -150,10 +145,10 @@ class MessageListener: requeue=False, ) - if callback_url: - StatusReporter.report_failure_to_callback_url( + if file_id: + StatusReporter.report_failure( callback_url, - api_key, + callback_api_key, import_status=2, reason="An error occurred while importing this file", ) @@ -166,8 +161,6 @@ class MessageListener: audio_file_path, import_directory, original_filename, - storage_backend, - file_prefix, ): metadata = {} @@ -178,8 +171,6 @@ class MessageListener: audio_file_path, import_directory, original_filename, - storage_backend, - file_prefix, ) metadata = queue.get() except Exception as exception: diff --git a/analyzer/libretime_analyzer/pipeline/pipeline.py b/analyzer/libretime_analyzer/pipeline/pipeline.py index 438ab99ff..7e6e85d52 100644 --- a/analyzer/libretime_analyzer/pipeline/pipeline.py +++ b/analyzer/libretime_analyzer/pipeline/pipeline.py @@ -38,8 +38,6 @@ class Pipeline: audio_file_path, import_directory, original_filename, - storage_backend, - file_prefix, ): """Analyze and import an audio file, and put all extracted metadata into queue. @@ -53,8 +51,6 @@ class Pipeline: preserve. The file at audio_file_path typically has a temporary randomly generated name, which is why we want to know what the original name was. - storage_backend: String indicating the storage backend (amazon_s3 or file) - file_prefix: """ try: if not isinstance(queue, Queue): @@ -77,25 +73,20 @@ class Pipeline: + type(original_filename).__name__ + " instead." ) - if not isinstance(file_prefix, str): - raise TypeError( - "file_prefix must be unicode. Was of type " - + type(file_prefix).__name__ - + " instead." - ) # Analyze the audio file we were told to analyze: # First, we extract the ID3 tags and other metadata: metadata = {} - metadata["file_prefix"] = file_prefix - metadata = analyze_metadata(audio_file_path, metadata) metadata = analyze_cuepoint(audio_file_path, metadata) metadata = analyze_replaygain(audio_file_path, metadata) metadata = analyze_playability(audio_file_path, metadata) metadata = organise_file( - audio_file_path, import_directory, original_filename, metadata + audio_file_path, + import_directory, + original_filename, + metadata, ) metadata["import_status"] = PipelineStatus.succeed diff --git a/analyzer/libretime_analyzer/status_reporter.py b/analyzer/libretime_analyzer/status_reporter.py index e1fc281e3..f8638671d 100644 --- a/analyzer/libretime_analyzer/status_reporter.py +++ b/analyzer/libretime_analyzer/status_reporter.py @@ -12,11 +12,11 @@ from loguru import logger class PicklableHttpRequest: - def __init__(self, method, url, data, api_key): + def __init__(self, method, url, api_key, data): self.method = method self.url = url - self.data = data self.api_key = api_key + self.data = data def create_request(self): return requests.Request( @@ -194,30 +194,30 @@ class StatusReporter: StatusReporter._ipc_queue.put(request) @classmethod - def report_success_to_callback_url( + def report_success( self, - callback_url, - api_key, - audio_metadata, + callback_url: str, + callback_api_key: str, + metadata: dict, ): """Report the extracted metadata and status of the successfully imported file to the callback URL (which should be the Airtime File Upload API) """ - put_payload = json.dumps(audio_metadata) + put_payload = json.dumps(metadata) StatusReporter._send_http_request( PicklableHttpRequest( method="PUT", url=callback_url, + api_key=callback_api_key, data=put_payload, - api_key=api_key, ) ) @classmethod - def report_failure_to_callback_url( + def report_failure( self, callback_url, - api_key, + callback_api_key, import_status, reason, ): @@ -228,7 +228,7 @@ class StatusReporter: ) logger.debug("Reporting import failure to Airtime REST API...") - audio_metadata = dict() + audio_metadata = {} audio_metadata["import_status"] = import_status audio_metadata["comment"] = reason # hack attack put_payload = json.dumps(audio_metadata) @@ -238,7 +238,7 @@ class StatusReporter: PicklableHttpRequest( method="PUT", url=callback_url, + api_key=callback_api_key, data=put_payload, - api_key=api_key, ) ) diff --git a/analyzer/tests/pipeline/pipeline_test.py b/analyzer/tests/pipeline/pipeline_test.py index ee33d4130..6c5fecbbd 100644 --- a/analyzer/tests/pipeline/pipeline_test.py +++ b/analyzer/tests/pipeline/pipeline_test.py @@ -16,8 +16,6 @@ def test_run_analysis(src_dir: Path, dest_dir: Path): str(src_dir / AUDIO_FILENAME), str(dest_dir), AUDIO_FILENAME, - "file", - "", ) metadata = queue.get() @@ -30,17 +28,3 @@ def test_run_analysis(src_dir: Path, dest_dir: Path): assert metadata["length_seconds"] == pytest.approx(15.0, abs=0.1) assert metadata["length"] == str(timedelta(seconds=metadata["length_seconds"])) assert (dest_dir / AUDIO_IMPORT_DEST).exists() - - -@pytest.mark.parametrize( - "params,exception", - [ - ((Queue(), "", "", ""), TypeError), - ((Queue(), "", "", ""), TypeError), - ((Queue(), "", "", ""), TypeError), - ((Queue(), "", "", ""), TypeError), - ], -) -def test_run_analysis_wrong_params(params, exception): - with pytest.raises(exception): - Pipeline.run_analysis(*params) diff --git a/legacy/application/models/RabbitMq.php b/legacy/application/models/RabbitMq.php index ce17c01a5..881fb2f14 100644 --- a/legacy/application/models/RabbitMq.php +++ b/legacy/application/models/RabbitMq.php @@ -92,10 +92,7 @@ class Application_Model_RabbitMq $tmpFilePath, $importedStorageDirectory, $originalFilename, - $callbackUrl, - $apiKey, - $storageBackend, - $filePrefix + $fileId ) { $config = Config::getConfig(); @@ -111,19 +108,11 @@ class Application_Model_RabbitMq $exchangeType = 'topic'; $queue = 'airtime-uploads'; $autoDeleteExchange = false; + + $data['file_id'] = $fileId; $data['tmp_file_path'] = $tmpFilePath; - $data['storage_backend'] = $storageBackend; $data['import_directory'] = $importedStorageDirectory; $data['original_filename'] = $originalFilename; - $data['callback_url'] = $callbackUrl; - $data['api_key'] = $apiKey; - - // We add a prefix to the resource name so files are not all placed - // under the root folder. We do this in case we need to restore a - // customer's file/s; File restoration is done via the S3 Browser - // client. The client will hang if there are too many files under the - // same folder. - $data['file_prefix'] = $filePrefix; $jsonData = json_encode($data); // self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads'); diff --git a/legacy/application/models/airtime/CcFiles.php b/legacy/application/models/airtime/CcFiles.php index a66ca1622..f59f29fbf 100644 --- a/legacy/application/models/airtime/CcFiles.php +++ b/legacy/application/models/airtime/CcFiles.php @@ -168,10 +168,8 @@ class CcFiles extends BaseCcFiles $file->setDbHidden(true); $file->save(); - $callbackUrl = Config::getPublicUrl() . 'rest/media/' . $file->getPrimaryKey(); - Application_Service_MediaService::importFileToLibrary( - $callbackUrl, + $file->getPrimaryKey(), $filePath, $originalFilename, self::getOwnerId(), diff --git a/legacy/application/services/MediaService.php b/legacy/application/services/MediaService.php index 3dc58b121..52604ab81 100644 --- a/legacy/application/services/MediaService.php +++ b/legacy/application/services/MediaService.php @@ -13,7 +13,7 @@ class Application_Service_MediaService /** Move (or copy) a file to the stor/organize directory and send it off to the * analyzer to be processed. * - * @param $callbackUrl + * @param $fileId * @param $filePath string Path to the local file to import to the library * @param $originalFilename string The original filename, if you want it to be preserved after import * @param $ownerId string The ID of the user that will own the file inside Airtime @@ -23,11 +23,8 @@ class Application_Service_MediaService * * @return Ambigous */ - public static function importFileToLibrary($callbackUrl, $filePath, $originalFilename, $ownerId, $copyFile) + public static function importFileToLibrary($fileId, $filePath, $originalFilename, $ownerId, $copyFile) { - $CC_CONFIG = Config::getConfig(); - $apiKey = $CC_CONFIG['apiKey'][0]; - $importedStorageDirectory = Config::getStoragePath() . '/imported/' . $ownerId; // Copy the temporary file over to the "organize" folder so that it's off our webserver @@ -40,10 +37,7 @@ class Application_Service_MediaService $newTempFilePath, $importedStorageDirectory, basename($originalFilename), - $callbackUrl, - $apiKey, - '', - '', + $fileId, ); return $newTempFilePath;