Source code for pyloggr.main.syslog_server

# encoding: utf-8

"""
This module implements the main syslog/RELP server with Tornado
"""
from __future__ import absolute_import, division, print_function
__author__ = 'stef'

import logging
import threading
from collections import namedtuple
from datetime import timedelta
from itertools import ifilter

from tornado.gen import coroutine, Return, TimeoutError
from tornado.ioloop import IOLoop
from tornado.concurrent import Future
# noinspection PyCompatibility
from concurrent.futures import Future as RealFuture
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor
import ujson

from pyloggr.syslog.server import BaseSyslogServer, SyslogParameters, BaseSyslogClientConnection
from pyloggr.event import Event, ParsingError, InvalidSignature
from pyloggr.rabbitmq.publisher import Publisher, RabbitMQConnectionError
from pyloggr.config import Config
from pyloggr.utils import sleep
from pyloggr.utils.parsing_classes import always_true_singleton
from pyloggr.utils.lmdb_wrapper import LmdbWrapper
from pyloggr.utils.simple_queue import ThreadSafeQueue
from pyloggr.cache import Cache


class ListOfClients(object):
    """
    Stores the current Syslog clients, sends notifications to observers, publishes the list of clients in Redis
    """
    server_id = None

    @classmethod
    def set_server_id(cls, server_id):
        """
        :param server_id: process number
        :type server_id: int
        """
        cls.server_id = server_id

    def __init__(self):
        super(ListOfClients, self).__init__()
        self._clients = dict()

    def add(self, client_id, client):
        """
        :type client_id: str
        :type client: SyslogClientConnection
        """
        self._clients[client_id] = client
        Cache.syslog_list[self.server_id].clients = self
        if publications:
            d = {'action': 'add_client'}
            d.update(client.props)
            publications.notify_observers(d, 'pyloggr.syslog.clients')

    def remove(self, client_id):
        """
        :type client_id: str
        """
        if client_id in self._clients:
            d = {'action': 'remove_client'}
            d.update(self._clients[client_id].props)
            del self._clients[client_id]
            Cache.syslog_list[self.server_id].clients = self
            if publications:
                publications.notify_observers(d, 'pyloggr.syslog.clients')

    def __getitem__(self, client_id):
        """
        :type client_id: str
        """
        return self._clients[client_id]

    def __iter__(self):
        return iter(self._clients)

    # noinspection PyDocstring
    def values(self):
        return self._clients.values()

    # noinspection PyDocstring
    def keys(self):
        return self._clients.keys()


list_of_clients = ListOfClients()

Notification = namedtuple('Notification', ['dictionnary', 'routing_key'])
SyslogMessage = namedtuple(
    'SyslogMessage',
    ['protocol', 'server_port', 'client_host', 'bytes_event', 'client_id', 'relp_id', 'total_messages']
)

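# Illustration only (not part of pyloggr, all values made up): what travels
# through the publication queue. Each received syslog frame becomes one
# SyslogMessage; notifications carry a plain dict plus a RabbitMQ routing key.
def _example_queue_items():
    msg = SyslogMessage(
        protocol='relp', server_port=514, client_host='10.0.0.1',
        bytes_event=b'<134>Jan  1 00:00:00 host app: hello',
        client_id='10.0.0.1:45210', relp_id=7, total_messages=1
    )
    notif = Notification({'action': 'add_client'}, 'pyloggr.syslog.clients')
    return msg, notif
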
class Publicator(object):
    """
    `Publicator` manages the RabbitMQ connection and actually makes the publish calls.

    Publicator runs in its own thread, and has its own IOLoop.

    Parameters
    ==========
    syslog_servers_conf:
        Syslog configuration (used to initialize packers)
    rabbitmq_config:
        RabbitMQ connection parameters
    """
    def __init__(self, syslog_servers_conf, rabbitmq_config):
        self.rabbitmq_config = rabbitmq_config
        self.publisher = None
        self.syslog_servers_conf = syslog_servers_conf
        self._publications_queue = ThreadSafeQueue()
        self._publication_thread = threading.Thread(target=self.init_thread)
        self._shutting_down = threading.Event()
        self._rabbitmq_status = threading.Event()
        self.rabbit_is_lost_future = None
        self.packer_groups = {}
        self.logger = None

    def start(self):
        """
        Start `Publicator`'s own thread
        """
        self._publication_thread.start()

    def rabbitmq_status(self):
        """
        Return True if we have an established connection to RabbitMQ
        """
        return self._rabbitmq_status.is_set()

    def init_thread(self):
        """
        `Publicator` thread: start a new IOLoop, make it current, and call `_do_start` as a callback
        """
        # make a dedicated IOLoop in this thread for RabbitMQ publishing
        # noinspection PyAttributeOutsideInit
        self.publication_ioloop = IOLoop()
        self.publication_ioloop.make_current()
        self.publication_ioloop.add_callback(self._do_start)
        self.publication_ioloop.start()
        # when publication_ioloop is stopped by 'shutdown', the start() call above
        # returns and the thread terminates

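    # The threading pattern above, in isolation (a sketch, independent of
    # pyloggr): a thread that needs its own event loop builds a private IOLoop,
    # makes it current, schedules an entry point and blocks in start() until
    # some callback eventually calls stop():
    #
    #   def worker():
    #       loop = IOLoop()
    #       loop.make_current()
    #       loop.add_callback(entry_point)  # entry_point must call loop.stop() at some point
    #       loop.start()                    # returns only after loop.stop()
    #
    #   threading.Thread(target=worker).start()
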
    @coroutine
    def _do_start(self):
        self.logger = logging.getLogger(__name__)
        lost_rabbit_connection = yield self._connect_to_rabbit()
        if lost_rabbit_connection is None:
            self.publication_ioloop.stop()
            return
        IOLoop.current().add_callback(self._wait_for_messages)
        yield lost_rabbit_connection.wait()
        # if we get here, it means we lost rabbitmq
        logging.getLogger(__name__).debug("syslog_server.Publication: lost rabbit!")
        self._rabbitmq_status.clear()
        self.rabbit_is_lost_future.set_result(True)

    def shutdown(self):
        """
        Ask `Publicator` to shut down. Can be called by any thread.
        """
        if not self._shutting_down.is_set():
            self._shutting_down.set()

    def notify_observers(self, d, routing_key=''):
        """
        Send a notification via RabbitMQ

        :param d: dictionary to send as a notification
        :param routing_key: RabbitMQ routing key
        :type d: dict
        :type routing_key: str
        """
        if self._rabbitmq_status.is_set() and not self._shutting_down.is_set():
            self._publications_queue.put(Notification(d, routing_key))
            return True
        else:
            return False

    def publish_syslog_messages(self, protocol, server_port, client_host, bytes_events, client_id, relp_id):
        if self._rabbitmq_status.is_set() and not self._shutting_down.is_set():
            total = len(bytes_events)
            for bytes_event in bytes_events:
                self._publications_queue.put(
                    SyslogMessage(protocol, server_port, client_host, bytes_event, client_id, relp_id, total)
                )
            return True
        else:
            return False

    # noinspection PyDocstring
    def publish_syslog_message(self, protocol, server_port, client_host, bytes_event, client_id, relp_id=None):
        """
        Ask `publications` to publish a syslog event to RabbitMQ. Can be called by any thread.

        Parameters
        ==========
        protocol: str
            'tcp' or 'relp'
        server_port: int
            the syslog server port that received the event
        client_host: str
            hostname of the client that sent the event
        bytes_event: bytes
            the event as bytes
        client_id: str
            SyslogClientConnection client_id
        relp_id: int
            event RELP id
        """
        if self._rabbitmq_status.is_set() and not self._shutting_down.is_set():
            self._publications_queue.put(
                SyslogMessage(protocol, server_port, client_host, bytes_event, client_id, relp_id, 1)
            )
            return True
        else:
            return False

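    # Usage sketch (illustrative, values made up): any thread may hand events
    # over; a False return value means RabbitMQ is unavailable or we are
    # shutting down, so the caller must fall back (LMDB rescue queue for TCP,
    # a NACK response for RELP):
    #
    #   accepted = publications.publish_syslog_message(
    #       'tcp', 514, '10.0.0.1', b'<134>myapp: hello', 'client-1'
    #   )
    #   if not accepted:
    #       ...  # e.g. schedule after_published_tcp(False, bytes_event=...)
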
    @coroutine
    def _connect_to_rabbit(self):
        rabbit_close_ev = None
        self.publisher = Publisher(self.rabbitmq_config)
        while True:
            try:
                rabbit_close_ev = yield self.publisher.start()
            except (RabbitMQConnectionError, TimeoutError):
                logging.getLogger(__name__).error("local thread: can't connect to RabbitMQ")
                # _shutting_down can be set while we are sleeping; if we don't
                # listen to it, the thread never stops... and pyloggr stays alive
                yield sleep(Config.SLEEP_TIME, threading_event=self._shutting_down)
                if self._shutting_down.is_set():
                    return
            else:
                break
        self.rabbit_is_lost_future = RealFuture()
        self._rabbitmq_status.set()
        raise Return(rabbit_close_ev)

    def _initialize_packers(self):
        # chain the packers: packer A publishes the event to packer A+1, which publishes it to A+2...
        for server in self.syslog_servers_conf:
            new_packer_groups = []
            for packer_group in server.packer_groups:
                current_publisher = self.publisher
                for packer_partial in reversed(packer_group.packers):
                    current_publisher = packer_partial(current_publisher)
                new_packer_groups.append((packer_group.condition, current_publisher))
            for port in server.ports:
                # str() because a port can be a unix socket name
                self.packer_groups[str(port)] = new_packer_groups

    def _do_publish_syslog(self, message):
        """
        :type message: SyslogMessage
        """
        try:
            event = Event.parse_bytes_to_event(message.bytes_event, hmac=True)
        except ParsingError as ex:
            if ex.json:
                self.logger.warning("JSON decoding failed. We log the event, drop it and continue")
                self.logger.warning(message.bytes_event)
                return
            else:
                # kick out the misbehaving client
                if message.client_id in list_of_clients:
                    IOLoop.instance().add_callback(list_of_clients[message.client_id].disconnect)
                return

        event.relp_id = message.relp_id
        event['syslog_server_port'] = message.server_port
        event['syslog_client_host'] = message.client_host
        event['syslog_protocol'] = message.protocol

        # pick the right publisher/packer
        early_fails = False
        if self.publisher is None:
            early_fails = True
        else:
            # pick the first packer whose condition evaluates to True;
            # if no packer is suitable, just use the plain publisher
            port = str(message.server_port)
            if port in self.packer_groups:
                packers = self.packer_groups[port] + [(always_true_singleton, self.publisher)]
                chosen_publisher = ifilter(
                    lambda (condition, publisher): condition.eval(event),
                    packers
                ).next()[1]
            else:
                chosen_publisher = self.publisher

            if self._rabbitmq_status.is_set():
                future = chosen_publisher.publish_event(
                    event, routing_key='pyloggr.syslog.{}'.format(message.server_port)
                )
                future.protocol = message.protocol
                future.client_id = message.client_id
                future.total_messages = message.total_messages
                # after the event has been published (or not), we need to inform the sender
                IOLoop.current().add_future(future, Publicator._after_published)
            else:
                early_fails = True

        if early_fails:
            # don't even try to publish to RabbitMQ, but notify the syslog client
            future = Future()
            future.protocol = message.protocol
            future.client_id = message.client_id
            future.total_messages = message.total_messages
            future.set_result((False, event))
            Publicator._after_published(future)

    def _do_publish_notification(self, message):
        """
        :type message: Notification
        """
        json_message = ujson.dumps(message.dictionnary)
        if (self.publisher is not None) and self._rabbitmq_status.is_set():
            IOLoop.current().add_callback(
                self.publisher.publish,
                exchange=Config.NOTIFICATIONS.exchange,
                body=json_message,
                routing_key=message.routing_key,
                persistent=False,
                event_type=Config.NOTIFICATIONS.event_type,
                application_id=Config.NOTIFICATIONS.application_id
            )
        else:
            self.logger.debug(
                "A notification was not sent because the RabbitMQ connection was not available"
            )

    @coroutine
    def _wait_for_messages(self):
        self.count_lz4_messages = {}
        self._initialize_packers()
        while not self._shutting_down.is_set():
            try:
                message = yield self._publications_queue.get_wait(deadline=timedelta(seconds=1))
            except TimeoutError:
                pass
            else:
                if isinstance(message, Notification):
                    self._do_publish_notification(message)
                elif isinstance(message, SyslogMessage):
                    self._do_publish_syslog(message)
                else:
                    self.logger.warning("Ignoring strange message type '{}'".format(type(message)))
        yield self._do_shutdown()

    @coroutine
    def _do_shutdown(self):
        # notify the packers that they should not accept new stuff
        for packer_groups in self.packer_groups.values():
            for _, packer in packer_groups:
                packer.shutdown()
        # shutdown the RabbitMQ publisher
        if self.publisher:
            # yield returns when the RabbitMQ connection is actually closed
            yield self.publisher.stop()
            self.publisher = None
        # wait a bit so that 'publications' can finish its stuff
        # noinspection PyUnresolvedReferences,PyProtectedMember
        if self.publication_ioloop._callbacks or self.publication_ioloop._timeouts:
            yield sleep(1)
        self.publication_ioloop.stop()

    @classmethod
    def _after_published(cls, f):
        """
        :type f: tornado.concurrent.Future
        """
        status, event = f.result()
        if event is None or status is None:
            return
        # give control back to the main thread
        if hasattr(f, 'protocol'):
            protocol = f.protocol.lower()
            if protocol in ("tcp", "udp"):
                # with TCP or UDP there is no need to answer the client, but we
                # must save the event when the RabbitMQ publication has failed
                IOLoop.instance().add_callback(after_published_tcp, status, event=event)
            elif protocol == "relp":
                if hasattr(f, 'client_id'):
                    if f.client_id in list_of_clients:
                        client = list_of_clients[f.client_id]
                        # with RELP we have to send a response to the client
                        IOLoop.instance().add_callback(client.after_published_relp, status, event=event)
                    else:
                        # client is already gone...
                        logging.getLogger(__name__).info(
                            "Couldn't send RELP response to already gone client '%s'", f.client_id
                        )
            elif protocol == "lz4":
                if hasattr(f, 'client_id'):
                    if f.client_id in list_of_clients:
                        if hasattr(f, 'total_messages'):
                            client = list_of_clients[f.client_id]
                            IOLoop.instance().add_callback(
                                client.after_published_lz4, status, event, f.total_messages
                            )
                    else:
                        logging.getLogger(__name__).info(
                            "Couldn't send LZ4 response to already gone client '%s'", f.client_id
                        )

@coroutine
def after_published_tcp(status, event=None, bytes_event=None):
    """
    Called after an attempt to publish to RabbitMQ an event that was received via TCP

    :param status: True if the event was successfully sent
    :param event: the Event object
    :param bytes_event: the event as bytes
    """
    if event is None and bytes_event is None:
        return
    if not status:
        logger = logging.getLogger(__name__)
        logger.warning("RabbitMQ could not store an event :( We shall try to put it in the rescue queue")
        if event is None:
            try:
                event = Event.parse_bytes_to_event(bytes_event, hmac=True)
            except (ParsingError, InvalidSignature):
                logger.warning("... but the event was unparsable: we drop it")
                return
        # LMDB can block, use a thread
        with ThreadPoolExecutor(1) as exe:
            # submit() takes the callable and its argument separately
            yield exe.submit(_store_lmdb, event)

def _store_lmdb(event):
    logger = logging.getLogger(__name__)
    lmdb = LmdbWrapper.get_instance(Config.RESCUE_QUEUE_DIRNAME)
    if lmdb.queue('pyloggr.rescue').push(event):
        logger.info("Publishing to RabbitMQ failed, but we pushed the event to the rescue queue")
    else:
        logger.error("Failed to save the event in LMDB. The event has been lost")


publications = None

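# Side note (a sketch, not pyloggr code): concurrent.futures executors take the
# callable and its arguments separately, and a Tornado coroutine can yield the
# returned future directly, which keeps the blocking call off the IOLoop thread:
#
#   with ThreadPoolExecutor(1) as exe:
#       result = yield exe.submit(blocking_function, arg)  # not blocking_function(arg)
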
class SyslogClientConnection(BaseSyslogClientConnection):
    """
    Encapsulates a connection with a syslog client
    """
    def __init__(self, stream, address, syslog_parameters):
        super(SyslogClientConnection, self).__init__(stream, address, syslog_parameters)
        self.count_lz4_messages = {}

    @property
    def props(self):
        """
        Return a few properties for this client

        :rtype: dict
        """
        return {
            'host': self.client_host,
            'client_port': self.client_port,
            'server_port': self.server_port,
            'id': self.client_id,
            'server_id': ListOfClients.server_id
        }

    @coroutine
    def after_published_relp(self, status, event=None, relp_id=None):
        """
        Called after an event received via RELP has been published to RabbitMQ

        :param status: True if the event was successfully sent
        :param event: the Event object
        :param relp_id: event RELP id
        """
        if event is None and relp_id is None:
            return
        relp_event_id = event.relp_id if relp_id is None else relp_id
        if relp_event_id is None:
            return
        if status:
            yield self.stream.write('{} rsp 6 200 OK\n'.format(relp_event_id))
        else:
            logging.getLogger(__name__).info(
                "RabbitMQ publisher said NACK, sending 500 to RELP client. Event ID: {}".format(relp_event_id)
            )
            yield self.stream.write('{} rsp 6 500 KO\n'.format(relp_event_id))

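    # RELP response framing used above (for reference): "<txnr> rsp <datalen> <data>\n",
    # where <datalen> is the byte length of <data>. For relp_event_id 42 the
    # client would receive:
    #
    #   "42 rsp 6 200 OK\n"   acknowledgement ("200 OK" is 6 bytes)
    #   "42 rsp 6 500 KO\n"   rejection
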
    @coroutine
    def after_published_lz4(self, status, event, total_messages):
        relp_id = event.relp_id
        already_received_count = self.count_lz4_messages.get(relp_id)
        # noinspection PySimplifyBooleanCheck
        if already_received_count is False:
            # we have already answered for this group of messages
            return
        elif not status:
            # an event of the group was rejected: we have to answer the client and refuse the whole group
            self.count_lz4_messages[relp_id] = False
            yield self.stream.write('{} rsp 6 500 KO\n'.format(relp_id))
        elif already_received_count is None:
            self.count_lz4_messages[relp_id] = 1
        else:
            self.count_lz4_messages[relp_id] += 1
        if self.count_lz4_messages[relp_id] == total_messages:
            # all events from this group have been published, confirm it to the client
            yield self.stream.write('{} rsp 6 200 OK\n'.format(relp_id))
            del self.count_lz4_messages[relp_id]

    def _set_socket(self):
        super(SyslogClientConnection, self)._set_socket()
        list_of_clients.add(self.client_id, self)

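    # Worked example for the counter in after_published_lz4 above (illustrative):
    # a group of 3 events shares relp_id 42.
    #   publish ack #1 -> count_lz4_messages[42] = 1
    #   publish ack #2 -> count_lz4_messages[42] = 2
    #   publish ack #3 -> count_lz4_messages[42] = 3 == total_messages,
    #                     so "42 rsp 6 200 OK\n" is sent and the group is forgotten
    # If any publication fails, the entry is set to False, "500 KO" is sent once,
    # and subsequent acknowledgments for the same group are ignored.
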
    def disconnect(self):
        """
        Disconnects the client
        """
        if not self.disconnecting.is_set():
            super(SyslogClientConnection, self).disconnect()
            list_of_clients.remove(self.client_id)

    def on_stream_closed(self):
        """
        on_stream_closed()
        Called when a client has been disconnected
        """
        list_of_clients.remove(self.client_id)
        super(SyslogClientConnection, self).on_stream_closed()

    def _process_event(self, bytes_event, protocol, relp_event_id=None):
        """
        _process_event(bytes_event, protocol, relp_event_id=None)
        Process a TCP syslog or RELP event.

        :param bytes_event: the event as `bytes`
        :param protocol: 'relp' or 'tcp'
        :param relp_event_id: event RELP ID, given by the RELP client
        """
        if bytes_event:
            self.nb_messages_received += 1
            accepted = publications.publish_syslog_message(
                protocol, self.server_port, self.client_host, bytes_event, self.client_id, relp_event_id
            )
            if not accepted:
                # the message was refused by 'publications' because RabbitMQ is not available
                protocol = protocol.lower()
                if protocol == 'tcp':
                    IOLoop.instance().add_callback(after_published_tcp, False, bytes_event=bytes_event)
                elif protocol == 'relp':
                    IOLoop.instance().add_callback(self.after_published_relp, False, relp_id=relp_event_id)

    def _process_group_events(self, bytes_events, relp_event_id):
        nb_events = len(bytes_events)
        self.nb_messages_received += nb_events
        accepted = publications.publish_syslog_messages(
            'lz4', self.server_port, self.client_host, bytes_events, self.client_id, relp_event_id
        )
        if not accepted:
            # only send one response to the client for all the LZ4 transmitted messages
            IOLoop.instance().add_callback(self.after_published_relp, False, relp_id=relp_event_id)


class MainSyslogServer(BaseSyslogServer):
    """
    Tornado syslog server

    `MainSyslogServer` listens for syslog messages (RELP, RELP/TLS, TCP, TCP/TLS, unix socket)
    and sends messages to RabbitMQ.
    """
    def __init__(self, rabbitmq_config, syslog_parameters, server_id):
        """
        :type syslog_parameters: SyslogParameters
        :type rabbitmq_config: pyloggr.config.RabbitMQBaseConfig
        :type server_id: int
        """
        super(MainSyslogServer, self).__init__(syslog_parameters)
        self.rabbitmq_config = rabbitmq_config
        self.server_id = server_id
        ListOfClients.set_server_id(server_id)

    @coroutine
    def launch(self):
        """
        launch()
        Starts the server

        - First we try to connect to RabbitMQ
        - If successful, we start to listen for syslog clients

        Note
        ====
        Tornado coroutine
        """
        LmdbWrapper(Config.RESCUE_QUEUE_DIRNAME, size=52428800).open(
            sync=True, metasync=True, max_dbs=2
        )
        global publications
        publications = Publicator(self.syslog_parameters.conf.values(), self.rabbitmq_config)
        publications.start()
        # wait until the publications object has successfully connected to RabbitMQ
        while not publications.rabbitmq_status():
            yield sleep(1)
        yield super(MainSyslogServer, self).launch()
        yield publications.rabbit_is_lost_future
        logging.getLogger(__name__).debug("Syslog server: lost Rabbit!")
        yield self.stop_all()
        yield sleep(Config.SLEEP_TIME)
        if not self.shutting_down:
            IOLoop.instance().add_callback(self.launch)

    @coroutine
    def _start_syslog(self):
        """
        _start_syslog()
        Start to listen for syslog clients

        Note
        ====
        Tornado coroutine
        """
        if not self.listening:
            yield super(MainSyslogServer, self)._start_syslog()
            Cache.syslog_list[self.server_id].status = True
            Cache.syslog_list[self.server_id].ports = self.syslog_parameters.port_to_protocol.keys()
            if publications:
                publications.notify_observers(
                    {
                        'action': 'add_server',
                        'ports': self.syslog_parameters.port_to_protocol.keys(),
                        'server_id': self.server_id
                    },
                    'pyloggr.syslog.servers'
                )

    @coroutine
    def _stop_syslog(self):
        """
        _stop_syslog()
        Stop listening for syslog connections

        Note
        ====
        Tornado coroutine
        """
        if self.listening:
            yield super(MainSyslogServer, self)._stop_syslog()
            Cache.syslog_list[self.server_id].status = False
            if publications:
                publications.notify_observers(
                    {'action': 'remove_server', 'server_id': self.server_id},
                    'pyloggr.syslog.servers'
                )
            # buy a bit of time so that notifications actually reach rabbit
            yield sleep(2)

    @coroutine
    def stop_all(self):
        """
        stop_all()
        Stops the server completely: stop listening for syslog clients and close the connection to RabbitMQ.

        Note
        ====
        Tornado coroutine
        """
        yield super(MainSyslogServer, self).stop_all()
        del Cache.syslog_list[self.server_id]
        if publications:
            publications.shutdown()

    def handle_data(self, data, sockname, peername):
        """
        Handle UDP syslog

        :param data: data sent
        :param sockname: the server socket info
        :param peername: the client socket info
        """
        data = data.strip('\r\n ')
        if not data:
            return
        if not peername:
            client = "unix socket"
            server_port = sockname
        else:
            client = peername[0]
            server_port = sockname[0]
        accepted = publications.publish_syslog_message(
            'udp', server_port, client, data, None, None
        )
        if not accepted:
            IOLoop.instance().add_callback(after_published_tcp, False, bytes_event=data)

    @coroutine
    def handle_stream(self, stream, address):
        """
        handle_stream(stream, address)
        Called by Tornado when we have a new client.

        :param stream: IOStream for the new connection
        :type stream: IOStream
        :param address: tuple (client IP, client source port)
        :type address: tuple

        Note
        ====
        Tornado coroutine
        """
        connection = SyslogClientConnection(
            stream=stream,
            address=address,
            syslog_parameters=self.syslog_parameters
        )
        self.list_of_clients.append(connection)
        yield connection.on_connect()

    @coroutine
    def shutdown(self):
        """
        Authoritarian shutdown
        """
        yield super(MainSyslogServer, self).shutdown()
        LmdbWrapper.get_instance(Config.RESCUE_QUEUE_DIRNAME).close()
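
# A sketch of how this server might be wired up (assumption: pyloggr's real
# entry point lives elsewhere; 'rabbit_config' and 'syslog_params' stand for
# objects built from pyloggr.config). This function is illustrative and never
# called from this module.
def _example_main(rabbit_config, syslog_params):
    server = MainSyslogServer(rabbit_config, syslog_params, server_id=0)
    ioloop = IOLoop.instance()
    # launch() keeps reconnecting to RabbitMQ until shutdown() is called
    ioloop.add_callback(server.launch)
    ioloop.start()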