Source code for pyloggr.main.filter_machine

# encoding: utf-8

"""
The Filter Machine process can be used to apply a series of filters to events
"""
__author__ = 'stef'

import logging
from datetime import timedelta

from tornado.gen import coroutine, TimeoutError
from tornado.ioloop import IOLoop
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor

from ..rabbitmq import RabbitMQConnectionError
from ..rabbitmq.publisher import Publisher
from ..rabbitmq.consumer import Consumer
from pyloggr.filters import DropException, Filters
from pyloggr.event import Event, ParsingError, InvalidSignature
from pyloggr.config import Config
from pyloggr.utils import sleep

logger = logging.getLogger(__name__)


class FilterMachine(object):
    """
    Implements an Event parser that retrieves events from RabbitMQ, applies filters, and pushes the
    events back to RabbitMQ.
    """
    def __init__(self, consumer_config, publisher_config, filters_filename):
        """
        :type consumer_config: pyloggr.rabbitmq.Configuration
        :type publisher_config: pyloggr.rabbitmq.Configuration
        """
        self.consumer_config = consumer_config
        self.publisher_config = publisher_config
        self.consumer = None
        self.publisher = None
        self.shutting_down = None
        # one worker per message allowed in flight by the consumer QoS, plus a small margin
        self.executor = ThreadPoolExecutor(max_workers=self.consumer_config.qos + 5)
        self.filters = None
        self.filters_filename = filters_filename
    @coroutine
    def launch(self):
        """
        Starts the parser

        Note
        ====
        Coroutine
        """
        self.filters = Filters(Config.CONFIG_DIR, self.filters_filename)
        self.filters.open()
        self.publisher = Publisher(self.publisher_config)
        try:
            closed_publisher_event = yield self.publisher.start()
        except RabbitMQConnectionError:
            logger.warning("Filter machine: can't connect to publisher")
            logger.info("We will try to reconnect to RabbitMQ in {} seconds".format(Config.SLEEP_TIME))
            yield self.stop()
            yield sleep(Config.SLEEP_TIME)
            if not self.shutting_down:
                IOLoop.instance().add_callback(self.launch)
            return
        # here we use a callback, so that we can directly wait for the next closed_publisher_event
        IOLoop.instance().add_callback(self._start_consumer)
        yield closed_publisher_event.wait()
        yield self.stop()
        yield sleep(Config.SLEEP_TIME)
        if not self.shutting_down:
            IOLoop.instance().add_callback(self.launch)
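    # Lifecycle: launch() connects the publisher, then schedules _start_consumer()
    # as an IOLoop callback so that this coroutine can keep waiting on
    # closed_publisher_event. When either RabbitMQ connection closes, stop() tears
    # both connections down and launch() is rescheduled after a delay, unless
    # shutdown() has set self.shutting_down.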
    @coroutine
    def _start_consumer(self):
        self.consumer = Consumer(self.consumer_config)
        try:
            closed_consumer_event = yield self.consumer.start()
        except (RabbitMQConnectionError, TimeoutError):
            logger.warning("Can't connect to consumer")
            logger.info("We will try to reconnect to RabbitMQ in {} seconds".format(Config.SLEEP_TIME))
            # self.stop() stops the publisher too, so closed_publisher_event.wait() inside launch will return
            yield self.stop()
            return
        else:
            IOLoop.instance().add_callback(self._consume)
            yield closed_consumer_event.wait()
            yield self.stop()

    @coroutine
    def _consume(self):
        # this coroutine doesn't return, as long as the RabbitMQ connection lives
        message_queue = self.consumer.start_consuming()
        while self.consumer.consuming and not self.shutting_down:
            try:
                message = yield message_queue.get_wait(deadline=timedelta(seconds=1))
            except TimeoutError:
                pass
            else:
                # apply the (potentially slow) filters in a worker thread, publish from the IOLoop
                future = self.executor.submit(self._apply_filters, message)
                IOLoop.instance().add_future(future, self._publish)

    @coroutine
    def _publish(self, future):
        message, ev = future.result()
        if ev is None:
            # dropped event
            message.ack()
            return
        if self.publisher:
            # here we take into account the optional overrides of the router engine
            event_type = ev.override_event_type if ev.override_event_type else ''
            if ev.override_exchanges:
                # publish to many exchanges
                futures = [
                    self.publisher.publish_event(
                        ev, routing_key='pyloggr.machine', event_type=event_type, exchange=exchange
                    )
                    for exchange in ev.override_exchanges
                ]
                results = yield futures
                results = [res for res, _ in results]
                if any(results):
                    # if at least one publish succeeds, we ack the message
                    message.ack()
                    if not all(results):
                        logger.warning("Publication of event '{}' failed for at least one exchange".format(ev.uuid))
                else:
                    logger.warning("Publication of event '{}' failed for all exchanges".format(ev.uuid))
                    message.nack()
            else:
                # publish only to the default exchange, from machine configuration
                res, _ = yield self.publisher.publish_event(ev, routing_key='pyloggr.machine', event_type=event_type)
                if res:
                    message.ack()
                else:
                    logger.warning("Publication of event '{}' failed".format(ev.uuid))
                    message.nack()
        else:
            message.nack()
    @coroutine
    def stop(self):
        """
        Stops the parser
        """
        futures = []
        if self.consumer:
            futures.append(self.consumer.stop())
        if self.publisher:
            futures.append(self.publisher.stop())
        yield futures
        self.consumer = None
        self.publisher = None
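    # Note: stop() also serves as the reconnect teardown. It clears self.consumer
    # and self.publisher, so events whose filtering completes after a disconnect
    # are nack'ed by _publish instead of being published on a dead connection.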
    @coroutine
    def shutdown(self):
        """
        Shuts down the parser for good
        """
        self.shutting_down = True
        yield self.stop()
        self.filters.close()
    def _apply_filters(self, message):
        """
        Apply the filters to the event contained in the RabbitMQ message.

        Note
        ====
        This method is executed in a separate thread.

        :param message: event to apply filters to, as a RabbitMQ message
        :type message: pyloggr.consumer.RabbitMQMessage
        :return: tuple(message, parsed event); the parsed event is None when the event was dropped or couldn't be parsed
        :rtype: tuple(pyloggr.consumer.RabbitMQMessage, pyloggr.event.Event)
        """
        try:
            ev = Event.parse_bytes_to_event(message.body, hmac=True)
        except ParsingError:
            # should not happen, as pyloggr's syslog server just sent the event
            logger.error("Dropping one unparsable event")
            logger.error(message)
            return message, None
        except InvalidSignature:
            # should not happen: the event is not supposed to have an HMAC yet
            logger.critical("Dropping one tampered event")
            logger.critical(message)
            return message, None
        try:
            self.filters.apply(ev)
        except DropException:
            logger.debug("DROP filter")
            return message, None
        return message, ev
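
Usage sketch (not part of the module): the snippet below shows how a FilterMachine
might be wired into a Tornado IOLoop. consumer_conf and publisher_conf stand for
pyloggr.rabbitmq.Configuration objects assumed to be built elsewhere from the
application configuration, and 'filters.conf' is an assumed filter definition file
located under Config.CONFIG_DIR.

    from tornado.ioloop import IOLoop
    from pyloggr.main.filter_machine import FilterMachine

    # consumer_conf / publisher_conf: assumed pyloggr.rabbitmq.Configuration objects
    machine = FilterMachine(consumer_conf, publisher_conf, 'filters.conf')
    # launch() reschedules itself on the IOLoop when a RabbitMQ connection drops
    IOLoop.instance().add_callback(machine.launch)
    try:
        IOLoop.instance().start()
    except KeyboardInterrupt:
        # shutdown() sets shutting_down, stops both connections and closes the filters
        IOLoop.instance().run_sync(machine.shutdown)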