# encoding: utf-8
"""
Local syslog agent
"""
from __future__ import absolute_import, division, print_function
__author__ = 'stef'
from threading import Thread
import threading
from os.path import exists
import socket
import arrow
import logging
import time
from itertools import cycle
# noinspection PyCompatibility
from queue import Queue
from copy import copy
from datetime import timedelta
# noinspection PyCompatibility
from queue import Empty as QueueEmpty
from tornado.gen import coroutine, TimeoutError
from tornado.ioloop import IOLoop
from pyloggr.event import Event, ParsingError, InvalidSignature
from pyloggr.utils.lmdb_wrapper import LmdbWrapper
from pyloggr.utils import sleep
from pyloggr.utils.simple_queue import ThreadSafeQueue
from pyloggr.syslog.relp_client import ServerClose
from pyloggr.syslog import client_factory
from pyloggr.syslog.server import BaseSyslogServer, SyslogParameters, BaseSyslogClientConnection
from pyloggr.config import SyslogServerConfig, SyslogAgentConfig, SyslogAgentDestination
class SyslogAgent(BaseSyslogServer):
    """
    Syslog agent

    `SyslogAgent` listens for syslog messages (RELP, RELP/TLS, TCP, TCP/TLS, UDP, Unix socket) and
    sends messages to a remote TCP/Syslog or RELP server.

    Received messages flow through several threads:

    - the local syslog servers push raw messages to `received_messages_queue`;
    - `StoreMessagesInLMDB` parses them and stores them in LMDB;
    - `RetrieveMessagesFromLMDB` reads them back from LMDB and feeds `publication_queue`;
    - `Publications` sends them to the remote server and reports each one on
      `published_messages_queue` or `failed_messages_queue`.
    """
    # todo: retrieve kernel log messages on linux

    def __init__(self, syslog_agent_config):
        """
        Build one `SyslogServerConfig` per kind of listener enabled in the agent configuration.

        :param syslog_agent_config: agent configuration
        :type syslog_agent_config: SyslogAgentConfig
        """
        syslog_conf = {}
        if syslog_agent_config.tcp_ports:
            syslog_conf['tcp_agent'] = SyslogServerConfig(
                name="tcp_agent",
                ports=syslog_agent_config.tcp_ports,
                stype="tcp",
                localhost_only=syslog_agent_config.localhost_only
            )
        if syslog_agent_config.udp_ports:
            syslog_conf['udp_agent'] = SyslogServerConfig(
                name="udp_agent",
                ports=syslog_agent_config.udp_ports,
                stype="udp",
                localhost_only=syslog_agent_config.localhost_only
            )
        if syslog_agent_config.relp_ports:
            syslog_conf['relp_agent'] = SyslogServerConfig(
                name="relp_agent",
                ports=syslog_agent_config.relp_ports,
                stype="relp",
                localhost_only=syslog_agent_config.localhost_only
            )
        if syslog_agent_config.socket_names:
            syslog_conf['sockets'] = SyslogServerConfig(
                name="sockets",
                socket_names=syslog_agent_config.socket_names,
                stype="unix"
            )
        syslog_parameters = SyslogParameters(syslog_conf)
        self.syslog_agent_config = syslog_agent_config
        super(SyslogAgent, self).__init__(syslog_parameters)
        # worker threads, created by launch()
        self._put_messages_in_lmdb_thread = None
        self._retrieve_messages_from_lmdb_thread = None
        self._publication_thread = None
        # inter-thread queues and events, created by launch()
        self.publication_queue = None
        self.published_messages_queue = None
        self.failed_messages_queue = None
        self.received_messages_queue = None
        self.syslog_server_is_available = None

    @coroutine
    def launch(self):
        """
        launch()
        Starts the agent: opens the LMDB database, creates the inter-thread queues, starts the
        worker threads, then starts listening for syslog clients.

        Note
        ====
        Tornado coroutine
        """
        # NOTE(review): 52428800 == 50 * 1024 * 1024, presumably a size in bytes; confirm in LmdbWrapper
        LmdbWrapper(self.syslog_agent_config.lmdb_db_name, size=52428800).open(
            sync=False, metasync=False, max_dbs=3
        )
        self.received_messages_queue = Queue()
        self.publication_queue = ThreadSafeQueue()
        self.published_messages_queue = Queue()
        self.failed_messages_queue = Queue()
        self.syslog_server_is_available = threading.Event()
        self._start_publication_thread()
        self._start_thread_retrieve_messages_from_lmdb()
        self._start_thread_store_messages_in_lmdb()
        yield self._start_syslog()

    def _start_publication_thread(self):
        if self._publication_thread is None:
            self._publication_thread = Publications(
                syslog_agent_config=self.syslog_agent_config,
                publication_queue=self.publication_queue,
                published_messages_queue=self.published_messages_queue,
                failed_messages_queue=self.failed_messages_queue,
                syslog_server_is_available=self.syslog_server_is_available
            )
            logger = logging.getLogger(__name__)
            logger.debug("Starting 'Publication' thread")
            self._publication_thread.start()

    def _start_thread_retrieve_messages_from_lmdb(self):
        if self._retrieve_messages_from_lmdb_thread is None:
            self._retrieve_messages_from_lmdb_thread = RetrieveMessagesFromLMDB(
                lmdb_db_name=self.syslog_agent_config.lmdb_db_name,
                publication_queue=self.publication_queue,
                published_messages_queue=self.published_messages_queue,
                failed_messages_queue=self.failed_messages_queue,
                syslog_server_is_available=self.syslog_server_is_available,
                pause=self.syslog_agent_config.pause
            )
            logger = logging.getLogger(__name__)
            logger.debug("Starting 'Retrieve messages from LMDB' thread")
            self._retrieve_messages_from_lmdb_thread.start()

    def _start_thread_store_messages_in_lmdb(self):
        if self._put_messages_in_lmdb_thread is None:
            self._put_messages_in_lmdb_thread = StoreMessagesInLMDB(
                received_messages_queue=self.received_messages_queue,
                lmdb_db_name=self.syslog_agent_config.lmdb_db_name
            )
            logger = logging.getLogger(__name__)
            logger.debug("Starting 'Store messages in LMDB' thread")
            self._put_messages_in_lmdb_thread.start()

    @coroutine
    def _start_syslog(self):
        """
        _start_syslog()
        Start to listen for syslog clients

        Note
        ====
        Tornado coroutine
        """
        if not self.listening:
            yield super(SyslogAgent, self)._start_syslog()

    @coroutine
    def _stop_syslog(self):
        """
        _stop_syslog()
        Stop listening for syslog connections

        Note
        ====
        Tornado coroutine
        """
        if self.listening:
            yield super(SyslogAgent, self)._stop_syslog()
            # close the sockets
            self.stop()
            self.syslog_parameters.delete_unix_sockets()

    def _stop_publication_thread(self):
        logger = logging.getLogger(__name__)
        if self._publication_thread is not None:
            logger.debug("Asking thread 'Publication' to stop")
            self._publication_thread.stopping.set()
            publication_ioloop = self._publication_thread.publication_ioloop
            if publication_ioloop is not None:
                # the IOLoop belongs to the publication thread: add_callback is the only IOLoop
                # method that is safe to call from another thread
                publication_ioloop.add_callback(publication_ioloop.stop)
            self._publication_thread.join()
            self._publication_thread = None

    def _stop_thread_put_messages_in_lmdb(self):
        logger = logging.getLogger(__name__)
        if self._put_messages_in_lmdb_thread is not None:
            logger.debug("Asking thread 'Store messages in LMDB' to stop")
            self._put_messages_in_lmdb_thread.stopping.set()
            # wait until it is actually stopped
            self._put_messages_in_lmdb_thread.join()
            self._put_messages_in_lmdb_thread = None

    def _stop_thread_retrieve_messages_from_lmdb(self):
        logger = logging.getLogger(__name__)
        if self._retrieve_messages_from_lmdb_thread is not None:
            logger.debug("Asking thread 'Retrieve messages from LMDB' to stop")
            self._retrieve_messages_from_lmdb_thread.stopping.set()
            # wait until it is actually stopped
            self._retrieve_messages_from_lmdb_thread.join()
            self._retrieve_messages_from_lmdb_thread = None

    @coroutine
    def stop_all(self):
        """
        stop_all()
        Stops completely the server. Stop listening for syslog clients. Close connection to remote server.

        Note
        ====
        Tornado coroutine
        """
        # stop the syslog server
        yield self._stop_syslog()
        # stop the "put things in LMDB thread"
        self._stop_thread_put_messages_in_lmdb()
        # stop the "retrieve from LMDB thread"
        self._stop_thread_retrieve_messages_from_lmdb()
        # stop the publication thread
        self._stop_publication_thread()

    @coroutine
    def shutdown(self):
        """
        Authoritarian shutdown
        """
        # call stop_all
        yield super(SyslogAgent, self).shutdown()
        # cleanly close LMDB
        if exists(self.syslog_agent_config.lmdb_db_name):
            LmdbWrapper.get_instance(self.syslog_agent_config.lmdb_db_name).close()

    # noinspection PyDocstring
    def handle_data(self, data, sockname, peername):
        """
        Handle UDP connections

        Strip the datagram of surrounding CR, LF and spaces, and push it to the received messages
        queue if anything is left.
        """
        # datagrams arrive as bytes on Python 3: bytes.strip() needs a bytes argument
        strip_chars = b'\r\n ' if isinstance(data, bytes) else u'\r\n '
        data = data.strip(strip_chars)
        if data:
            self.received_messages_queue.put_nowait(data)

    # noinspection PyDocstring
    @coroutine
    def handle_stream(self, stream, address):
        """
        Handle TCP and RELP clients
        """
        connection = SyslogAgentClient(
            stream=stream,
            address=address,
            syslog_parameters=self.syslog_parameters,
            received_messages=self.received_messages_queue
        )
        yield connection.on_connect()
class Publications(Thread):
    """
    The Publications thread handles the connection to the remote syslog server to publish the messages

    It runs its own IOLoop. Events are read from `publication_queue` as ``(lmdb_idx, obj)`` tuples.
    Each event's LMDB index is then reported on one of two queues:

    - `published_messages_queue` when the event was delivered, or dropped because it could not be
      loaded, so that it gets deleted from LMDB;
    - `failed_messages_queue` when delivery failed.
    """

    def __init__(self, syslog_agent_config, publication_queue, published_messages_queue, failed_messages_queue,
                 syslog_server_is_available):
        """
        :param syslog_agent_config: agent configuration (destinations, server_deadline)
        :type syslog_agent_config: SyslogAgentConfig
        :param publication_queue: events to publish, as ``(lmdb_idx, obj)`` tuples
        :param published_messages_queue: receives the LMDB index of events that can be deleted from LMDB
        :param failed_messages_queue: receives the LMDB index of events that could not be delivered
        :param syslog_server_is_available: set while connected to a remote syslog server
        :type syslog_server_is_available: threading.Event
        """
        super(Publications, self).__init__(name="Send messages to remote syslog")
        self.syslog_agent_config = syslog_agent_config
        self.publication_queue = publication_queue
        self.published_messages_queue = published_messages_queue
        self.failed_messages_queue = failed_messages_queue
        self.stopping = threading.Event()
        self.syslog_server_is_available = syslog_server_is_available
        self.publication_ioloop = None
        # current destination, client and "connection closed" event; set by _do_start
        self.destination = None
        self.syslog_or_relp_client = None
        self.closed_connection_event = None
        # round-robin over the configured destinations
        self.next_destination_idx = cycle(range(len(syslog_agent_config.destinations)))

    # noinspection PyDocstring
    def run(self):
        """
        Thread body: start a second IOLoop for publications to remote syslog.

        Does not return until that IOLoop is stopped.
        """
        self.publication_ioloop = IOLoop()
        self.publication_ioloop.make_current()
        self.publication_ioloop.add_callback(self._do_start)
        self.publication_ioloop.start()
        # will not return until the ioloop is stopped
        logging.getLogger(__name__).debug("End of 'Publications' thread")

    @coroutine
    def _do_start(self):
        """
        Connect to the next destination, publish until the connection is closed, then reconnect
        (or stop the IOLoop if the thread was asked to stop).
        """
        logger = logging.getLogger(__name__)
        if self.stopping.is_set():
            IOLoop.current().stop()
            return
        destinations = self.syslog_agent_config.destinations
        idx = next(self.next_destination_idx)
        self.destination = destinations[idx]
        assert isinstance(self.destination, SyslogAgentDestination)
        # after a failure on the last destination of the list, we wait before starting over
        last_destination = idx == (len(destinations) - 1)
        # NOTE(review): keyword 'servr' is kept as-is; confirm it matches client_factory's signature
        self.syslog_or_relp_client = client_factory(
            protocol=self.destination.protocol,
            servr=self.destination.host,
            port=self.destination.port,
            use_ssl=self.destination.tls,
            verify_cert=self.destination.verify_server_cert,
            hostname=self.destination.tls_hostname,
            server_deadline=self.syslog_agent_config.server_deadline
        )
        try:
            logger.info("Connecting to syslog destination %s", idx)
            self.closed_connection_event = yield self.syslog_or_relp_client.start()
        except (socket.error, TimeoutError):
            logger.error("Syslog agent: can't connect to remote syslog server")
            yield self._reconnect_later(last_destination)
            return
        except ServerClose:
            logger.critical("Syslog agent: remote syslog unexpectedly closed the connection")
            yield self._reconnect_later(last_destination)
            return

        self.syslog_server_is_available.set()
        logger.info("Syslog agent: connected to destination '%s'", idx)
        IOLoop.current().add_callback(self._wait_for_messages)
        # the next wait will return if self.stopping is set thanks to the end of _wait_for_messages
        yield self.closed_connection_event.wait()
        self.syslog_server_is_available.clear()
        # either shutdown (the IOLoop is stopped and run() returns), or we lost the connection to
        # the remote syslog server (reconnect)
        yield self._reconnect_later(last_destination)

    @coroutine
    def _reconnect_later(self, last_destination):
        """
        Schedule the next connection attempt, or stop the IOLoop if the thread was asked to stop.

        :param last_destination: True when the destination that just failed is the last one of the
            list; we then wait 60 seconds (the wait is given `stopping` as threading_event) before
            trying again
        """
        if last_destination and not self.stopping.is_set():
            yield sleep(60, threading_event=self.stopping)
        if self.stopping.is_set():
            # shutdown: run() will then return, terminating the publication thread
            IOLoop.current().stop()
        else:
            IOLoop.current().add_callback(self._do_start)

    def _consume_relp_client_response(self, f, lmdb_idx=None):
        """
        Report the outcome of one publication to the LMDB thread.

        :param f: future returned by `publish_event`; its result is a ``(status, event)`` tuple
        :param lmdb_idx: LMDB index of the published event, used to report it as failed if the
            future raised instead of returning a result
        """
        logger = logging.getLogger(__name__)
        try:
            status, event = f.result()
        except Exception:
            # without a notification the event would stay 'pending' in the LMDB thread forever
            logger.exception("Syslog agent: publication of an event raised an exception")
            if lmdb_idx is not None:
                self.failed_messages_queue.put_nowait(lmdb_idx)
            return
        if status:
            logger.debug("Syslog agent: Pushing event '%s' to published queue", event.uuid)
            self.published_messages_queue.put_nowait(event.lmdb_idx())
        else:
            logger.debug("Syslog agent: Pushing event '%s' to failed queue", event.uuid)
            self.failed_messages_queue.put_nowait(event.lmdb_idx())

    @coroutine
    def _wait_for_messages(self):
        """
        Publish events from `publication_queue` until we are asked to stop or the connection to the
        remote syslog server is closed. On shutdown, close the connection to the remote server.
        """
        logger = logging.getLogger(__name__)
        # the destination does not change while this loop runs
        compress = self.destination.compress
        frmt = self.destination.frmt
        while not (self.stopping.is_set() or self.closed_connection_event.is_set()):
            try:
                idx, obj = yield self.publication_queue.get_wait(deadline=timedelta(seconds=1))
            except TimeoutError:
                continue
            try:
                ev = Event.load(obj)
            except ParsingError:
                logger.info("agent _wait_for_messages: parsing error, an event has been dropped")
                # acknowledge it so that the LMDB thread deletes it instead of keeping it pending forever
                self.published_messages_queue.put_nowait(idx)
                continue
            try:
                ev.verify_hmac()
            except InvalidSignature:
                logger.info("agent _wait_for_messages: invalid HMAC, an event has been dropped")
                # acknowledge it so that the LMDB thread deletes it instead of keeping it pending forever
                self.published_messages_queue.put_nowait(idx)
                continue
            logger.debug("Sending one event to remote syslog: %s", ev.uuid)
            future = self.syslog_or_relp_client.publish_event(
                event=ev,
                frmt=frmt,
                compress=compress
            )
            # bind idx now (default argument) so the callback can report a failed future
            IOLoop.current().add_future(
                future,
                lambda fut, lmdb_idx=idx: self._consume_relp_client_response(fut, lmdb_idx=lmdb_idx)
            )
        if self.stopping.is_set():
            # we were asked to stop (shutdown) ==> close connection to remote server
            yield self.syslog_or_relp_client.stop()
class StoreMessagesInLMDB(Thread):
    """
    The `StoreMessagesInLMDB` thread gets messages from the TCP, UDP and unix sockets, via a queue, and pushes
    them to LMDB
    """

    def __init__(self, received_messages_queue, lmdb_db_name):
        """
        :param received_messages_queue: raw messages received by the local syslog servers
        :type received_messages_queue: Queue
        :param lmdb_db_name: name of the LMDB database, as given to `LmdbWrapper`
        """
        super(StoreMessagesInLMDB, self).__init__(name="Store messages in LMDB")
        self.received_messages_queue = received_messages_queue
        self.lmdb_db_name = lmdb_db_name
        self.stopping = threading.Event()

    # noinspection PyDocstring
    def run(self):
        """
        Thread body: parse each received message (with HMAC) and push the resulting event to the
        "received_messages" LMDB queue. Unparsable or badly signed messages are logged and skipped.
        """
        logger = logging.getLogger(__name__)
        lmdb_queue = LmdbWrapper.get_instance(self.lmdb_db_name).queue("received_messages")
        # loop until we have been told to stop AND there are no more messages to handle, so that
        # messages received before the stop request are still stored
        while not (self.stopping.is_set() and self.received_messages_queue.empty()):
            try:
                data = self.received_messages_queue.get(block=True, timeout=1)
            except QueueEmpty:
                continue
            try:
                event = Event.parse_bytes_to_event(data, hmac=True)
            except ParsingError:
                logger.exception("Syslog agent: can't parse received message")
                continue
            except InvalidSignature:
                logger.exception("Syslog agent: message had an invalid signature")
                continue
            # actually store message in LMDB ('uuid' is the event identifier used throughout this module)
            logger.debug("Pushing event '%s' to LMDB", event.uuid)
            lmdb_queue.push(event)
        logger.debug("End of 'Store messages in LMDB' thread")
class RetrieveMessagesFromLMDB(Thread):
    """
    The `RetrieveMessagesFromLMDB` thread gets messages from LMDB and pushes them to the Publications thread, via a
    queue
    """

    def __init__(self, lmdb_db_name, publication_queue, published_messages_queue, failed_messages_queue,
                 syslog_server_is_available, pause):
        """
        :param lmdb_db_name: name of the LMDB database, as given to `LmdbWrapper`
        :param publication_queue: receives ``(lmdb_idx, obj)`` tuples to publish
        :param published_messages_queue: LMDB indexes of events that can be deleted from LMDB
        :param failed_messages_queue: LMDB indexes of events whose publication failed
        :param syslog_server_is_available: set while the remote syslog server is reachable
        :type syslog_server_is_available: threading.Event
        :param pause: delay, in minutes, before a failed event is retried
        """
        super(RetrieveMessagesFromLMDB, self).__init__(name="Retrieve messages from LMDB")
        self.lmdb_db_name = lmdb_db_name
        self.stopping = threading.Event()
        # LMDB indexes handed to the publication thread and not yet confirmed or failed
        self.pending_idx = set()
        self.published_messages_queue = published_messages_queue
        self.failed_messages_queue = failed_messages_queue
        self.publication_queue = publication_queue
        self.syslog_server_is_available = syslog_server_is_available
        # pause is given in minutes, stored in seconds
        self.pause = pause * 60

    # noinspection PyDocstring
    def run(self):
        """
        Thread body: until asked to stop, repeatedly
        requeue old failures, feed the publication queue, then process publication notifications.
        """
        lmdb = LmdbWrapper.get_instance(self.lmdb_db_name)
        logger = logging.getLogger(__name__)
        with lmdb.queue("received_messages") as lmdb_queue:
            with lmdb.queue("failed_messages") as lmdb_failed_queue:
                while not self.stopping.is_set():
                    self._requeue_failed_messages(lmdb_queue, lmdb_failed_queue)
                    # only add messages to the publication queue if we actually have a working connection
                    # to the remote syslog server; this way we prevent publication_queue to grow indefinitely
                    if self.syslog_server_is_available.is_set():
                        self._feed_publication_queue(lmdb_queue)
                    else:
                        logger.debug("Syslog agent: syslog server not available, not pushing events")
                        # returns early if we are asked to stop
                        self.stopping.wait(2)
                    self._handle_published_notifications(lmdb_queue)
                    self._handle_failed_notifications(lmdb_queue, lmdb_failed_queue)
        logger.debug("End of 'Retrieve messages from LMDB' thread")

    def _requeue_failed_messages(self, lmdb_queue, lmdb_failed_queue):
        """Move back to the main LMDB queue the failed messages older than `pause` seconds."""
        now = arrow.utcnow()
        for idx, obj in lmdb_failed_queue.generator():
            failure_time = arrow.get(obj['time'])
            if (now - failure_time).total_seconds() > self.pause:
                lmdb_queue.push(obj['obj'], idx=idx)
                lmdb_failed_queue.delete(idx)

    def _feed_publication_queue(self, lmdb_queue):
        """Hand the LMDB messages that are not already pending to the publication thread."""
        logger = logging.getLogger(__name__)
        # wait for new messages to publish (timeout=4, presumably seconds - see LmdbWrapper)
        if lmdb_queue.wait_not_empty(tick=1, timeout=4, exclude=self.pending_idx):
            # iterate with a snapshot of pending_idx, as we add to it during the iteration
            for idx, obj in lmdb_queue.generator(exclude=copy(self.pending_idx)):
                self.pending_idx.add(idx)
                # try to publish the message to the remote syslog server
                logger.debug("One message in LMDB: %s", idx)
                self.publication_queue.put((idx, obj))
        else:
            logger.debug("Syslog agent: no more events in LMDB")

    def _handle_published_notifications(self, lmdb_queue):
        """Delete from LMDB the events that the publication thread reported as published."""
        logger = logging.getLogger(__name__)
        for idx in self._drain(self.published_messages_queue):
            logger.debug("Got confirmation for '%s'", idx)
            # now we can safely delete the published event from LMDB
            lmdb_queue.delete(idx=idx)
            # discard, not remove: a duplicate notification must not kill the thread
            self.pending_idx.discard(idx)

    def _handle_failed_notifications(self, lmdb_queue, lmdb_failed_queue):
        """Move the events reported as failed to the failed LMDB queue, with the failure time."""
        for idx in self._drain(self.failed_messages_queue):
            # we push the failed message in the waiting list
            obj = lmdb_queue.pop(idx)
            if obj:
                lmdb_failed_queue.push(
                    {
                        'time': str(arrow.utcnow()),
                        'obj': obj
                    },
                    idx=idx
                )
            self.pending_idx.discard(idx)

    @staticmethod
    def _drain(q):
        """Yield the items currently available in queue *q*, without blocking."""
        while True:
            try:
                item = q.get(block=False)
            except QueueEmpty:
                return
            yield item
class SyslogAgentClient(BaseSyslogClientConnection):
    """
    Handles TCP and RELP client connections for the syslog agent: each received event is pushed to
    the `received_messages` queue
    """

    def __init__(self, stream, address, syslog_parameters, received_messages):
        """
        :param received_messages: queue that receives the raw events
        """
        super(SyslogAgentClient, self).__init__(stream, address, syslog_parameters)
        self.received_messages = received_messages

    def _process_event(self, bytes_event, protocol, relp_event_id=None):
        """
        Handle TCP and RELP connections

        Strip the event of surrounding CR, LF and spaces, and push it to `received_messages` if
        anything is left.

        :param bytes_event: one raw syslog event
        :param protocol: protocol the event was received with (only used for logging here)
        :param relp_event_id: presumably the RELP transaction id; unused here
        """
        logger = logging.getLogger(__name__)
        # the event is bytes on Python 3: bytes.strip() needs a bytes argument
        strip_chars = b'\r\n ' if isinstance(bytes_event, bytes) else u'\r\n '
        data = bytes_event.strip(strip_chars)
        if data:
            logger.debug("Syslog agent: got one event via %s", protocol)
            self.received_messages.put_nowait(data)

    def _process_group_events(self, bytes_events, relp_event_id):
        # NOTE(review): grouped events are ignored; confirm the base class never calls this for the
        # agent, otherwise those events are silently lost
        pass