# encoding: utf-8
"""
RELP syslog client
"""
from __future__ import absolute_import, division, print_function
__author__ = 'stef'
import logging
from tornado.gen import coroutine, Return
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from toro import Event as ToroEvent
from future.utils import raise_from
from pyloggr.utils.constants import RELP_OPEN_COMMAND, RELP_CLOSE_COMMAND
from pyloggr.utils import sleep
from .base import GenericClient
security_logger = logging.getLogger("security")
class RELPException(Exception):
pass
class ServerClose(RELPException):
pass
class ServerBoo(RELPException):
pass
[docs]class RELPClient(GenericClient):
"""
Utility class to send messages or whole files to a RELP server, using an asynchrone TCP client
Parameters
==========
server: str
RELP server hostname or IP
port: int
RELP server port
use_ssl: bool
Should the client connect with SSL
"""
def __init__(self, server, port, use_ssl=False, verify_cert=True, hostname=None, ca_certs=None,
client_key=None, client_cert=None, server_deadline=120):
super(RELPClient, self).__init__(server, port, use_ssl, verify_cert, hostname, ca_certs, client_key,
client_cert)
self.current_relp_id = 1
self.server_deadline = server_deadline
self.acks = None
self.unexpected_close = None
@coroutine
[docs] def start(self):
"""
start()
Connect to the RELP server and send 'open' command
:raises `socket.error`: if TCP connection fails
Note
====
Tornado coroutine
"""
yield super(RELPClient, self).start()
self.current_relp_id = 1
self._say_hello()
self.current_relp_id += 1
self.acks = {}
self.unexpected_close = ToroEvent()
# receive the responses in background
IOLoop.current().add_callback(self._read_streaming_responses)
raise Return(self.closed_connection_event)
def _say_hello(self):
logger = logging.getLogger(__name__)
try:
yield self.stream.write(str(self.current_relp_id) + " " + RELP_OPEN_COMMAND)
response_id, code, data = yield self._read_one_response()
if code != 200:
logger.error("RELP server sent a BOO after the 'open' command")
raise ServerBoo(data)
except (ServerClose, StreamClosedError, ServerBoo) as ex:
logger.error("RELP opening connection failed")
raise_from(ServerClose("RELP opening connection failed"), ex)
@coroutine
def _read_streaming_responses(self):
logger = logging.getLogger(__name__)
while True:
try:
response_id, code, data = yield self._read_one_response()
except ServerClose:
logger.error("_read_streaming_responses: server announced unexpected close")
self.stream.close()
self.unexpected_close.set()
return
except StreamClosedError:
logger.error("_read_streaming_responses: stream closed error")
self.unexpected_close.set()
return
except ServerBoo:
logger.error("_read_streaming_responses: did not understand response")
self.stream.close()
self.unexpected_close.set()
return
if code == 200:
logger.debug("RELP client: remote RELP server ACKed one message")
self.acks[response_id] = True if code == 200 else False
@coroutine
def _read_one_response(self):
response_id = yield self._read_next_token()
if response_id == "0":
raise ServerClose("RELP server announced a serverclose")
try:
response_id = int(response_id)
yield self._read_next_token() # rsp
length = yield self._read_next_token()
length = int(length)
if length > 0:
data = yield self.stream.read_bytes(length)
data = data.strip('\r\n ').split(None, 1)
code = int(data[0])
cmddata = ''
if len(data) > 0:
cmddata = data[1].strip('\r\n ')
else:
code = None
cmddata = ''
except (ValueError, TypeError) as ex:
raise_from(ServerBoo("did not understand relp server response"), ex)
return
raise Return((response_id, code, cmddata))
@coroutine
[docs] def stop(self):
"""
stop()
Disconnect from the RELP server
"""
if not self.stream.closed():
try:
yield self.stream.write(str(self.current_relp_id) + " " + RELP_CLOSE_COMMAND)
except StreamClosedError:
pass
else:
# yield self._read_one_response()
self.stream.close()
yield super(RELPClient, self).stop()
@coroutine
[docs] def send_events(self, events, frmt="RFC5424", compress=False):
"""
send_events(events, frmt="RFC5424")
Send multiple events to the RELP server
:param events: events to send (iterable of :py:class:`Event`)
:param frmt: event dumping format
:param compress: if True, send the events as one LZ4-compressed line
:type events: iterable of Event
:type frmt: str
:type compress: bool
"""
if self.closed_connection_event.is_set():
raise Return((False, len(events) * [False]))
n_start = self.current_relp_id
logger = logging.getLogger(__name__)
relp_ids = set()
nb_total_events = 0
if compress:
# same relp ID for every line
current_relp_id_str = str(self.current_relp_id)
relp_ids.add(self.current_relp_id)
# increment current_relp_if *before* any yield !
self.current_relp_id += 1
nb_total_events = 1
bytes_events = (event.dump(frmt=frmt) + "\n" for event in events)
lines = (str(len(bytes_event)) + ' ' + bytes_event for bytes_event in bytes_events)
relp_lines = (current_relp_id_str + " syslog " + line + "\n" for line in lines)
uncompressed = "".join(relp_lines)
# compress the buf with LZ4
compressed = yield self.compress_thread.submit(self._compress, uncompressed)
ratio = int(100 - (100 * len(compressed) // len(uncompressed)))
logger.debug("RELP client: LZ4 compression ratio: %s", ratio)
relp_line = current_relp_id_str + " lz4 " + str(len(compressed)) + ' ' + compressed + "\n"
try:
yield self.stream.write(relp_line)
except StreamClosedError:
logger.info("Relp client sending events: Stream closed error ?!")
self.unexpected_close.set()
else:
# send the events separately, without compression
relp_lines = []
# there is no "yield" in the for loop, so the self.current_relp_id can be incremented without
# any race condition (at the cost of consumed RAM)
for event in events:
bytes_event = event.dump(frmt=frmt)
line = str(len(bytes_event)) + ' ' + bytes_event
relp_lines.append(str(self.current_relp_id) + " " + "syslog " + line + "\n")
relp_ids.add(self.current_relp_id)
self.current_relp_id += 1
nb_total_events += 1
try:
yield self.stream.write("".join(relp_lines))
except StreamClosedError:
logger.info("Relp client sending events: Stream closed error ?!")
self.unexpected_close.set()
# wait that until we have all answers
elapsed = 0
while (not relp_ids.issubset(set(self.acks.keys()))) and (not self.unexpected_close.is_set()):
yield sleep(1, wake_event=self.unexpected_close)
elapsed += 1
if elapsed >= self.server_deadline:
logger.warning("RELP client: the server did not sent all answers before deadline. Giving up.")
self.stream.close()
self.unexpected_close.set()
if compress:
if n_start in self.acks:
status = [self.acks[n_start] and not self.unexpected_close.is_set()]
acks = nb_total_events * status
raise Return((status, acks))
else:
raise Return((False, nb_total_events * [False]))
else:
# return all the ACKs that we've got
acks = [self.acks.get(relp_id, False) for relp_id in sorted(relp_ids)]
raise Return((not self.unexpected_close.is_set(), acks))