CC-5709: Airtime Analyzer

* Better SIGTERM handling for airtime_analyzer
* Nuke the .identifier files saved by Airtime
This commit is contained in:
Albert Santoni 2014-04-08 17:28:50 -04:00
parent b90356b74c
commit e4af3a5a0e
3 changed files with 32 additions and 10 deletions

View File

@ -982,6 +982,7 @@ SQL;
} else { } else {
$uid = $user->getId(); $uid = $user->getId();
} }
/*
$id_file = "$audio_stor.identifier"; $id_file = "$audio_stor.identifier";
if (file_put_contents($id_file, $uid) === false) { if (file_put_contents($id_file, $uid) === false) {
Logging::info("Could not write file to identify user: '$uid'"); Logging::info("Could not write file to identify user: '$uid'");
@ -991,7 +992,8 @@ SQL;
} else { } else {
Logging::info("Successfully written identification file for Logging::info("Successfully written identification file for
uploaded '$audio_stor'"); uploaded '$audio_stor'");
} }*/
//if the uploaded file is not UTF-8 encoded, let's encode it. Assuming source //if the uploaded file is not UTF-8 encoded, let's encode it. Assuming source
//encoding is ISO-8859-1 //encoding is ISO-8859-1
$audio_stor = mb_detect_encoding($audio_stor, "UTF-8") == "UTF-8" ? $audio_stor : utf8_encode($audio_stor); $audio_stor = mb_detect_encoding($audio_stor, "UTF-8") == "UTF-8" ? $audio_stor : utf8_encode($audio_stor);
@ -1004,7 +1006,7 @@ SQL;
//the file wasn't uploaded and they should check if there . //the file wasn't uploaded and they should check if there .
//is enough disk space . //is enough disk space .
unlink($audio_file); //remove the file after failed rename unlink($audio_file); //remove the file after failed rename
unlink($id_file); // Also remove the identifier file //unlink($id_file); // Also remove the identifier file
throw new Exception("The file was not uploaded, this error can occur if the computer " throw new Exception("The file was not uploaded, this error can occur if the computer "
."hard drive does not have enough disk space or the stor " ."hard drive does not have enough disk space or the stor "

View File

@ -4,6 +4,7 @@ import ConfigParser
import logging import logging
import logging.handlers import logging.handlers
import sys import sys
from functools import partial
from metadata_analyzer import MetadataAnalyzer from metadata_analyzer import MetadataAnalyzer
from replaygain_analyzer import ReplayGainAnalyzer from replaygain_analyzer import ReplayGainAnalyzer
from message_listener import MessageListener from message_listener import MessageListener
@ -26,7 +27,7 @@ class AirtimeAnalyzerServer:
# Read our config file # Read our config file
rabbitmq_config = self.read_config_file(config_path) rabbitmq_config = self.read_config_file(config_path)
# Start listening for RabbitMQ messages telling us about newly # Start listening for RabbitMQ messages telling us about newly
# uploaded files. # uploaded files.
self._msg_listener = MessageListener(rabbitmq_config) self._msg_listener = MessageListener(rabbitmq_config)
@ -74,4 +75,4 @@ class AirtimeAnalyzerServer:
exit(-1) exit(-1)
return config return config

View File

@ -2,6 +2,8 @@ import sys
import pika import pika
import json import json
import time import time
import select
import signal
import logging import logging
import multiprocessing import multiprocessing
from analyzer_pipeline import AnalyzerPipeline from analyzer_pipeline import AnalyzerPipeline
@ -59,6 +61,8 @@ class MessageListener:
Keyword arguments: Keyword arguments:
config: A ConfigParser object containing the [rabbitmq] configuration. config: A ConfigParser object containing the [rabbitmq] configuration.
''' '''
self._shutdown = False
# Read the RabbitMQ connection settings from the config file # Read the RabbitMQ connection settings from the config file
# The exceptions throw here by default give good error messages. # The exceptions throw here by default give good error messages.
@ -68,21 +72,31 @@ class MessageListener:
self._username = config.get(RMQ_CONFIG_SECTION, 'user') self._username = config.get(RMQ_CONFIG_SECTION, 'user')
self._password = config.get(RMQ_CONFIG_SECTION, 'password') self._password = config.get(RMQ_CONFIG_SECTION, 'password')
self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost') self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')
# Set up a signal handler so we can shutdown gracefully
# For some reason, this signal handler must be set up here. I'd rather
# put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
# with pika's SIGTERM handler interfering with it, I think...)
signal.signal(signal.SIGTERM, self.graceful_shutdown)
while True: while not self._shutdown:
try: try:
self.connect_to_messaging_server() self.connect_to_messaging_server()
self.wait_for_messages() self.wait_for_messages()
except KeyboardInterrupt: except (KeyboardInterrupt, SystemExit):
self.disconnect_from_messaging_server() break # Break out of the while loop and exit the application
break except select.error:
pass
except pika.exceptions.AMQPError as e: except pika.exceptions.AMQPError as e:
if self._shutdown:
break
logging.error("Connection to message queue failed. ") logging.error("Connection to message queue failed. ")
logging.error(e) logging.error(e)
logging.info("Retrying in 5 seconds...") logging.info("Retrying in 5 seconds...")
time.sleep(5) time.sleep(5)
self._connection.close() self.disconnect_from_messaging_server()
logging.info("Exiting cleanly.")
def connect_to_messaging_server(self): def connect_to_messaging_server(self):
@ -107,7 +121,12 @@ class MessageListener:
def disconnect_from_messaging_server(self): def disconnect_from_messaging_server(self):
'''Stop consuming RabbitMQ messages and disconnect''' '''Stop consuming RabbitMQ messages and disconnect'''
self._channel.stop_consuming() self._channel.stop_consuming()
self._connection.close()
def graceful_shutdown(self, signum, frame):
'''Disconnect and break out of the message listening loop'''
self._shutdown = True
self.disconnect_from_message_listener()
@staticmethod @staticmethod
def msg_received_callback(channel, method_frame, header_frame, body): def msg_received_callback(channel, method_frame, header_frame, body):