Source code for pyloggr.rabbitmq.notifications_consumer
# encoding: utf-8
__author__ = 'stef'
import logging
import ujson
from datetime import timedelta
from tornado.gen import coroutine, TimeoutError
from .consumer import Consumer
from ..utils.observable import Observable
logger = logging.getLogger(__name__)
[docs]class NotificationsConsumer(Consumer, Observable):
"""
Consumes notification that were posted in RabbitMQ
Parameters
==========
rabbitmq_config: pyloggr.config.RabbitMQBaseConfig
RabbitMQ connection parameters (to consume notifications from RabbitMQ)
binding_key: str
Binding key to filter notifications
"""
def __init__(self, rabbitmq_config):
"""
:type rabbitmq_config: pyloggr.config.RabbitMQBaseConfig
"""
Consumer.__init__(self, rabbitmq_config)
Observable.__init__(self)
@coroutine
[docs] def start_consuming(self):
"""
Start consuming notifications from RabbitMQ and notify observers.
"""
message_queue = Consumer.start_consuming(self)
while self.consuming:
try:
message = yield message_queue.get_wait(deadline=timedelta(seconds=1))
except TimeoutError:
pass
else:
try:
self.notify_observers(ujson.loads(message.body))
except Exception:
logger.exception("Swallowed exception")