# Source code for pyloggr.main.shipper2fs
# encoding: utf-8
"""
Ships events from RabbitMQ to the filesystem
"""
from __future__ import absolute_import, division, print_function
__author__ = 'stef'
import logging
from datetime import timedelta
from os.path import dirname, exists, join
from io import open
import os
from tornado.concurrent import Future
from tornado.gen import coroutine, Return, TimeoutError
from tornado.ioloop import IOLoop, PeriodicCallback
from sortedcontainers import SortedSet
import lockfile
from future.utils import lmap, viewvalues
from pyloggr.rabbitmq.consumer import Consumer, RabbitMQConnectionError, RabbitMQMessage
from pyloggr.utils import sleep
from pyloggr.config import Config
from pyloggr.event import Event, ParsingError, InvalidSignature
logger = logging.getLogger(__name__)
security_logger = logging.getLogger('security')
class FilesystemShipper(object):
    """
    The FilesystemShipper takes events from a RabbitMQ queue and writes them on filesystem

    Parameters
    ==========
    rabbitmq_config: pyloggr.rabbitmq.Configuration
        RabbitMQ configuration
    export_fs_config: pyloggr.config.Shipper2FSConfig
        Log export configuration
    """
    def __init__(self, rabbitmq_config, export_fs_config):
        """
        :type rabbitmq_config: pyloggr.rabbitmq.Configuration
        :type export_fs_config: pyloggr.config.Shipper2FSConfig
        """
        self.consumer = Consumer(rabbitmq_config)
        self.export_fs_config = export_fs_config
        # set to True by shutdown(); prevents launch() from reconnecting
        self.shutting_down = None
        # queue of RabbitMQ messages, filled by consumer.start_consuming()
        self.event_queue = None
        # event set when the RabbitMQ connection is lost (returned by consumer.start())
        self.closed_conn_event = None
        # one FSQueue per destination filename
        self.files_queues = {}

    @coroutine
    def launch(self):
        """
        launch()
        Start shipper2fs

        Connects to RabbitMQ, consumes until the connection drops, then
        schedules itself again unless a shutdown was requested.
        """
        # connect to RabbitMQ
        try:
            self.closed_conn_event = yield self.consumer.start()
        except (RabbitMQConnectionError, TimeoutError):
            logger.error("Can't connect to RabbitMQ")
            yield sleep(60)
            if not self.shutting_down:
                IOLoop.instance().add_callback(self.launch)
            return
        # consume events and put them in event_queue
        self.event_queue = self.consumer.start_consuming()
        IOLoop.instance().add_callback(self._consume)
        # wait until we lose rabbitmq connection
        yield self.closed_conn_event.wait()
        # we lost connection to RabbitMQ (by accident, or because stop() was called)
        yield self.stop()
        logger.info("Waiting {} seconds before trying to reconnect".format(Config.SLEEP_TIME))
        yield sleep(Config.SLEEP_TIME)
        if not self.shutting_down:
            # try to reconnect
            IOLoop.instance().add_callback(self.launch)

    @coroutine
    def stop(self):
        """
        stop()
        Stops the shipper
        """
        logger.info("Stopping shipper2fs")
        if self.consumer:
            yield self.consumer.stop()
            self.consumer = None
        # stop the "export to FS" queues
        lmap(lambda queue: queue.stop(), viewvalues(self.files_queues))

    @coroutine
    def shutdown(self):
        """
        shutdown()
        Shutdowns (stops definitely) the shipper.
        """
        logger.info("Shutting down shipper2fs")
        self.shutting_down = True
        yield self.stop()

    @coroutine
    def _consume(self):
        # Pump messages from the RabbitMQ consumer queue to export(), until
        # the connection is lost AND the local queue has been drained.
        event_queue = self.event_queue
        if event_queue is None:
            return
        # loop until we lose rabbitmq connection
        while (not self.closed_conn_event.is_set()) or (event_queue.qsize() != 0):
            try:
                message = yield event_queue.get_wait(deadline=timedelta(seconds=1))
            except TimeoutError:
                # no message within 1s: re-check the loop condition
                pass
            else:
                IOLoop.instance().add_callback(self.export, message)

    @coroutine
    def export(self, message):
        """
        export(message)
        Export event to filesystem

        Unparsable or badly-signed messages are ACKed (dropped) so they do
        not clog the queue; valid events are ACKed only after a successful
        export, NACKed otherwise.

        :param message: RabbitMQ message
        :type message: RabbitMQMessage
        """
        try:
            event = Event.parse_bytes_to_event(message.body, hmac=True)
        except ParsingError:
            logger.info("shipper2fs: dropping one unparsable message")
            message.ack()
        except InvalidSignature:
            logger.info("shipper2fs: dropping one message with invalid signature")
            message.ack()
            security_logger.critical("shipper2fs: dropping one message with invalid signature")
            security_logger.info(message.body)
        else:
            # put event in export queue and wait until it has been exported
            res = yield self._append(event)
            if res:
                message.ack()
            else:
                message.nack()

    @coroutine
    def _append(self, event):
        """
        Compute the destination filename for `event`, then hand the event to
        the matching FSQueue and wait for the export result.

        :type event: pyloggr.event.Event
        """
        # replace fields: $SOURCE, $DATE, $SEVERITY, $FACILITY, $APP_NAME
        filename = self.export_fs_config.filename.replace(
            '$SOURCE', event.source
        ).replace(
            '$SEVERITY', event.severity
        ).replace(
            '$FACILITY', event.facility
        ).replace(
            '$APP_NAME', event.app_name
        ).replace(
            '$DATE', str(event.timereported.date())
        ).lstrip('/')
        filename = join(self.export_fs_config.directory, filename)
        if filename not in self.files_queues:
            self.files_queues[filename] = FSQueue(
                filename, self.export_fs_config.seconds_between_flush, self.export_fs_config.frmt
            )
        queue = self.files_queues[filename]
        res = yield queue.append(event)
        raise Return(res)
class FSQueue(object):
    """
    Store events that have to be exported to a given filename

    Events are buffered in a SortedSet and flushed to `filename` by a
    periodic callback every `period` seconds.
    """
    def __init__(self, filename, period, frmt):
        """
        :param filename: destination file path
        :param period: seconds between two flushes
        :param frmt: dump format handed to Event.dump
        """
        self.filename = filename
        self.frmt = frmt
        self._queue = SortedSet()
        # event.uuid -> Future resolved (True/False) when the event is flushed
        self.futures = {}
        # PeriodicCallback takes milliseconds
        self._periodic = PeriodicCallback(self.flush, period * 1000)
        self._periodic.start()
        self._flushing = False

    @coroutine
    def append(self, event):
        """
        append(event)
        Add an event to the queue. The coroutine resolves when the event has been exported.

        :param event: event
        :type event: pyloggr.event.Event
        """
        if event in self._queue:
            # duplicate: already pending, report success without a second wait
            raise Return(True)
        self._queue.add(event)
        self.futures[event.uuid] = Future()
        res = yield self.futures[event.uuid]
        del self.futures[event.uuid]
        raise Return(res)

    def flush(self):
        """
        flush()
        Actually flush the events to the file
        """
        if self._flushing:
            return
        if len(self._queue) == 0:
            # BUGFIX: test emptiness BEFORE raising the _flushing flag.
            # Previously the flag was set first and an empty tick returned
            # early, leaving _flushing stuck at True and silently disabling
            # every subsequent flush.
            return
        self._flushing = True
        s = "\n".join(event.dump(frmt=self.frmt) for event in self._queue) + '\n'
        logger.debug("Flushing '{}'".format(self.filename))
        try:
            # makedirs is inside the try/finally: an OSError here previously
            # propagated out of the periodic callback with _flushing still
            # True, hanging the queue forever
            dname = dirname(self.filename)
            if not exists(dname):
                os.makedirs(dname)
            with lockfile.LockFile(self.filename):
                with open(self.filename, 'ab') as fh:
                    # NOTE(review): file is opened in binary mode but `s` is
                    # built with str.join — confirm Event.dump returns bytes
                    # on this interpreter, otherwise this write needs .encode()
                    fh.write(s)
        except (OSError, lockfile.Error):
            logger.exception("shipper2fs: flushing failed")
            lmap(lambda event: self.futures[event.uuid].set_result(False), self._queue)
        else:
            lmap(lambda event: self.futures[event.uuid].set_result(True), self._queue)
        finally:
            self._queue = SortedSet()
            self._flushing = False

    def stop(self):
        """
        stop()
        Stop the queue
        """
        self._periodic.stop()
        # notify that the events inside this queue were not exported; skip
        # futures that flush() already resolved (their waiter in append() may
        # not have deleted them yet) — a second set_result would raise
        if self.futures:
            lmap(
                lambda future: None if future.done() else future.set_result(False),
                viewvalues(self.futures)
            )