# Source code for pyloggr.main.shipper2pgsql

# encoding: utf-8
"""
Ship events to a PostgreSQL database
"""

__author__ = 'stef'

import logging

from tornado.gen import coroutine
from tornado.ioloop import PeriodicCallback, IOLoop
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor
from psycopg2.pool import ThreadedConnectionPool, PoolError
# noinspection PyCompatibility
import psycopg2
from sortedcontainers import SortedSet
from tornado.gen import Return, TimeoutError

from pyloggr.utils.constants import SQL_INSERT_QUERY, SQL_COLUMNS_STR, D_COLUMNS
from pyloggr.rabbitmq.consumer import Consumer, RabbitMQConnectionError
from pyloggr.event import Event, ParsingError, InvalidSignature
from pyloggr.utils import sleep
from pyloggr.config import Config

logger = logging.getLogger(__name__)


class PostgresqlShipper(object):
    """
    PostgresqlShipper retrieves events from RabbitMQ, and inserts them in PostgreSQL
    """
    def __init__(self, rabbitmq_config, pgsql_config):
        """
        :type rabbitmq_config: pyloggr.rabbitmq.Configuration
        :type pgsql_config: pyloggr.config.Shipper2PGSQL
        """
        self.pgsql_config = pgsql_config
        self.syslog_ev_queue = None
        self.periodic_check_queue_size = None
        self.dsn = 'dbname={} user={} password={} host={} port={} connect_timeout={}'.format(
            self.pgsql_config.dbname, self.pgsql_config.user, self.pgsql_config.password,
            self.pgsql_config.host, self.pgsql_config.port, self.pgsql_config.connect_timeout
        )
        self.db_pool = None
        self.shutting_down = None
        # counts elapsed seconds between flushes (incremented by _check_queue_size)
        self._times = 0
        # keep the config so the consumer can be recreated after stop() on reconnect
        self.rabbitmq_config = rabbitmq_config
        self.consumer = Consumer(rabbitmq_config)

    @coroutine
    def launch(self):
        """
        Starts the shipper

        - Opens a connection to RabbitMQ
        - Opens a pool to PostgreSQL
        - Consumes messages from RabbitMQ
        - Parses messages as regular syslog events
        - Periodically ships the events to PostgreSQL
        """
        self.periodic_check_queue_size = None
        # stop() nullifies self.consumer; rebuild it when launch() is re-scheduled
        # after a lost connection, otherwise the reconnect path crashes on None
        if self.consumer is None:
            self.consumer = Consumer(self.rabbitmq_config)
        # connect to RabbitMQ
        try:
            closed_conn_event = yield self.consumer.start()
        except (RabbitMQConnectionError, TimeoutError):
            logger.error("Can't connect to RabbitMQ")
            yield sleep(60)
            if not self.shutting_down:
                IOLoop.instance().add_callback(self.launch)
            return
        # connect to PGSQL; retry until the pool exists
        while self.db_pool is None:
            try:
                yield self._get_db_pool()
            except psycopg2.Error:
                logger.error("shipper: can't connect to PGSQL")
                yield sleep(Config.SLEEP_TIME)
        self.syslog_ev_queue = self.consumer.start_consuming()
        self.periodic_check_queue_size = PeriodicCallback(self._check_queue_size, callback_time=2000)
        self.periodic_check_queue_size.start()
        yield closed_conn_event.wait()
        # we lost connection to RabbitMQ (by accident, or because stop() was called)
        yield self.stop()
        logger.info("Waiting {} seconds before trying to reconnect".format(Config.SLEEP_TIME))
        yield sleep(Config.SLEEP_TIME)
        if not self.shutting_down:
            IOLoop.instance().add_callback(self.launch)

    @coroutine
    def _get_db_pool(self):
        """
        Lazily build the PGSQL connection pool.

        The connection is attempted in a worker thread because connection
        timeouts would otherwise block the IOLoop.

        :returns: the (possibly freshly created) ThreadedConnectionPool
        """
        if not self.db_pool:
            executor = ThreadPoolExecutor(max_workers=1)
            try:
                # NB: the original code passed async=False here; 'async' is a
                # reserved word on Python >= 3.7 and synchronous mode is
                # psycopg2's default, so the argument is simply omitted
                self.db_pool = yield executor.submit(
                    ThreadedConnectionPool, 1, self.pgsql_config.max_pool, self.dsn
                )
            finally:
                executor.shutdown()
        raise Return(self.db_pool)

    @coroutine
    def stop(self):
        """
        Stops the shipper
        """
        logger.info("Stopping the shipper2pgsql")
        if self.periodic_check_queue_size:
            self.periodic_check_queue_size.stop()
            self.periodic_check_queue_size = None
        if self.consumer:
            yield self.consumer.stop()
            self.consumer = None
        if self.db_pool:
            if not self.db_pool.closed:
                self.db_pool.closeall()
            self.db_pool = None

    @coroutine
    def shutdown(self):
        """
        Shutdowns (stops definitely) the shipper.
        """
        logger.info("Shutting down shipper2pgsql")
        self.shutting_down = True
        yield self.stop()

    @coroutine
    def _check_queue_size(self):
        """
        Periodic callback (every 2s): trigger a flush when 60 seconds have
        elapsed or when at least 500 events are waiting.
        """
        self._times += 2
        # todo: configurable 500 and 60
        if self._times >= 60 or self.syslog_ev_queue.qsize() >= 500:
            self._times = 0
            IOLoop.instance().add_callback(self._flush_messages)

    @coroutine
    def _flush_messages(self):
        """
        Drain the in-memory event queue and bulk-insert the events into PGSQL.

        Events are parsed and inserted in a worker thread; messages are acked
        on success and nacked (then the shipper stopped) on any failure.
        """
        size = self.syslog_ev_queue.qsize()
        if size == 0:
            logger.debug("No event to flush")
            return
        logger.info("Flushing events to PGSQL")
        if self.db_pool is None:
            logger.warning("We don't have a pool to PGSQL. Giving up flush. Stopping the consumer.")
            yield self.stop()
            return
        if self.db_pool.closed:
            logger.warning("PGSQL pool is closed. Giving up flush. Stopping the consumer.")
            yield self.stop()
            return
        # get_all_nowait pops all the events in syslog_ev_queue
        msgs = self.syslog_ev_queue.get_all()
        if not msgs:
            return
        logger.info("{} events to forward to PGSQL".format(len(msgs)))

        def _flush_backthread(rabbitmq_messages, tablename):
            # parse the bytes messages into real events
            # we use a SortedSet to get rid of duplicates
            events = SortedSet()
            for rabbit_message in rabbitmq_messages:
                try:
                    ev = Event.parse_bytes_to_event(rabbit_message.body, hmac=True)
                except ParsingError:
                    # should not happen, messages are coming from pyloggr
                    logger.info("shipper: dropping one unparsable message")
                except InvalidSignature:
                    security_logger = logging.getLogger('security')
                    logger.error("Dropping one tampered event, see security logs")
                    security_logger.critical("Dropping one tampered event")
                    security_logger.critical(rabbit_message.body)
                else:
                    events.add(ev)
            try:
                conn = self.db_pool.getconn()
            except PoolError:
                # use the module logger (not the root logger) for consistency
                logger.exception("flush_backthread: can't get a PGSQL connection from the pool")
                raise
            try:
                conn.autocommit = False
                with conn.cursor() as cur:
                    # build the SQL insert query
                    values = ','.join([evt.dump_sql(cur) for evt in events])
                    query = SQL_INSERT_QUERY.format(
                        SQL_COLUMNS_STR, values, tablename, SQL_COLUMNS_STR, D_COLUMNS, tablename
                    )
                    cur.execute(query)
                conn.commit()
            except psycopg2.Error:
                logger.exception("flush_backthread: flushing to PGSQL failed")
                raise
            finally:
                if conn:
                    self.db_pool.putconn(conn)

        executor = ThreadPoolExecutor(max_workers=1)
        try:
            yield executor.submit(
                _flush_backthread, rabbitmq_messages=msgs, tablename=self.pgsql_config.tablename
            )
        except (psycopg2.Error, PoolError):
            logger.exception("Flushing to PGSQL failed (probably PGSQL connection problem)")
            # explicit loop: on Python 3 map() is lazy and the nacks would
            # never actually run
            for message in msgs:
                message.nack()
            yield self.stop()
        except Exception:
            logger.error("Admin should review this")
            logger.exception("shipper: unexpected Exception while flushing events to PGSQL")
            for message in msgs:
                message.nack()
            yield self.stop()
        else:
            for message in msgs:
                message.ack()
        finally:
            executor.shutdown()