CC-3321: Multiple messages are sent to Pypo via RabbitMq at one time, causing multiple telnet connections to Liquidsoap.

-Fixed
This commit is contained in:
Martin Konecny 2012-02-11 23:53:43 -05:00
parent eadf68cb61
commit 885572b36d
1 changed files with 24 additions and 44 deletions

View File

@ -10,8 +10,7 @@ import string
import json import json
import telnetlib import telnetlib
import math import math
import socket from threading import Thread
from threading import Thread, Lock
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
@ -21,6 +20,7 @@ import filecmp
from kombu.connection import BrokerConnection from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer from kombu.messaging import Exchange, Queue, Consumer, Producer
from kombu.exceptions import MessageStateError from kombu.exceptions import MessageStateError
from kombu.simple import SimpleQueue
from api_clients import api_client from api_clients import api_client
@ -44,7 +44,6 @@ except Exception, e:
class PypoFetch(Thread): class PypoFetch(Thread):
def __init__(self, q): def __init__(self, q):
Thread.__init__(self) Thread.__init__(self)
self.lock = Lock()
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
self.set_export_source('scheduler') self.set_export_source('scheduler')
self.queue = q self.queue = q
@ -58,11 +57,9 @@ class PypoFetch(Thread):
try: try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
self.connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"]) connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"])
channel = self.connection.channel() channel = connection.channel()
consumer = Consumer(channel, schedule_queue) self.simple_queue = SimpleQueue(channel, schedule_queue)
consumer.register_callback(self.handle_message)
consumer.consume()
except Exception, e: except Exception, e:
logger.error(e) logger.error(e)
return False return False
@ -73,18 +70,12 @@ class PypoFetch(Thread):
Handle a message from RabbitMQ, put it into our yucky global var. Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this. Hopefully there is a better way to do this.
""" """
def handle_message(self, body, message): def handle_message(self, message):
try: try:
#Acquire Lock because multiple rabbitmq messages can be sent simultaneously
#and therefore we can have multiple threads inside this function. This causes
#multiple telnet connections to Liquidsoap which causes problems (refused connections).
self.lock.acquire()
logger = logging.getLogger('fetch') logger = logging.getLogger('fetch')
logger.info("Received event from RabbitMQ: " + message.body) logger.info("Received event from RabbitMQ: %s" % message)
m = json.loads(message.body) m = json.loads(message)
command = m['event_type'] command = m['event_type']
logger.info("Handling command: " + command) logger.info("Handling command: " + command)
@ -105,13 +96,6 @@ class PypoFetch(Thread):
self.stop_current_show() self.stop_current_show()
except Exception, e: except Exception, e:
logger.error("Exception in handling RabbitMQ message: %s", e) logger.error("Exception in handling RabbitMQ message: %s", e)
finally:
self.lock.release()
try:
# ACK the message to take it off the queue
message.ack()
except MessageStateError, m:
logger.error("Message ACK error: %s", m)
def stop_current_show(self): def stop_current_show(self):
logger = logging.getLogger('fetch') logger = logging.getLogger('fetch')
@ -515,29 +499,25 @@ class PypoFetch(Thread):
loops = 1 loops = 1
while True: while True:
logger.info("Loop #%s", loops) logger.info("Loop #%s", loops)
try: try:
# Wait for messages from RabbitMQ. Timeout if we try:
# dont get any after POLL_INTERVAL. message = self.simple_queue.get(block=True)
self.connection.drain_events(timeout=POLL_INTERVAL) self.handle_message(message.payload)
except socket.timeout, se: # ACK the message to take it off the queue
# We didnt get a message for a while, so poll the server message.ack()
# to get an updated schedule. except MessageStateError, m:
logger.error("Message ACK error: %s", m)
except Exception, e:
"""
There is a problem with the RabbitMq messenger service. Let's
log the error and get the schedule via HTTP polling
"""
logger.error("Exception, %s", e)
status, self.schedule_data = self.api_client.get_schedule() status, self.schedule_data = self.api_client.get_schedule()
if status == 1: if status == 1:
self.process_schedule(self.schedule_data, "scheduler", False) self.process_schedule(self.schedule_data, "scheduler", False)
except Exception, e:
"""
This Generic exception is thrown whenever the RabbitMQ
Service is stopped. In this case let's check every few
seconds to see if it has come back up
"""
logger.info("Exception, %s", e)
return
#return based on the exception
#if status == 1:
# self.process_schedule(self.schedule_data, "scheduler", False)
loops += 1 loops += 1
""" """