# encoding: utf-8
"""
This module provides stuff to implement a UDP/TCP/unix socket syslog/RELP server with Tornado
"""
__author__ = 'stef'
import logging
import ssl
import socket
import errno
from itertools import chain
import uuid
import threading
import os
from io import BytesIO
import certifi
import lz4
# noinspection PyCompatibility,PyPackageRequirements
from future.builtins import str as real_unicode
from future.builtins import bytes as real_bytes
from tornado.iostream import SSLIOStream, StreamClosedError
from tornado.tcpserver import TCPServer
from tornado.gen import coroutine, Return
from tornado.netutil import errno_from_exception, bind_sockets
from pyloggr.utils.constants import CIPHERS
from pyloggr.utils import read_next_token_in_stream
from pyloggr.syslog.udpserver import bind_udp_unix_socket, UDPServer, bind_udp_sockets
from pyloggr.config import SyslogServerConfig
def parse_relp_stream(stream):
next_token = read_next_token_in_stream(stream)
messages = []
relp_id = None
try:
while True:
try:
relp_id = next_token.next()
except StopIteration:
break
relp_id = int(relp_id)
_ = next_token.next()
length = int(next_token.next())
data = stream.read(length)
messages.append(data.strip(' \r\n'))
except (ValueError, StopIteration):
raise ValueError("Invalid RELP stream")
return relp_id, messages
def parse_tcp_stream(stream):
next_token = read_next_token_in_stream(stream)
messages = []
try:
while True:
try:
length = next_token.next()
except StopIteration:
break
length = int(length)
data = stream.read(length)
messages.append(data.strip(' \r\n'))
except (ValueError, StopIteration):
raise ValueError("Invalid TCP stream")
return messages
[docs]def wrap_ssl_sock(sock, ssl_options):
"""
Wrap a socket into a SSL socket
:param sock: socket to wrap
:param ssl_options: SSL options
:type ssl_options: pyloggr.config.SSLConfig
"""
if hasattr(ssl, 'SSLContext'):
# python 2.7.9
context = ssl.SSLContext(protocol=ssl_options.ssl_version)
context.load_cert_chain(ssl_options.certfile, ssl_options.keyfile)
context.verify_mode = ssl_options.cert_reqs
if ssl_options.cert_reqs != ssl.CERT_NONE:
if ssl_options.ca_certs:
context.load_verify_locations(ssl_options.ca_certs)
else:
security_logger = logging.getLogger('security')
security_logger.info("Using certifi store to verify clients certs")
context.load_verify_locations(certifi.where())
# context.load_default_certs(ssl.Purpose.CLIENT_AUTH)
context.options |= getattr(ssl, 'OP_NO_SSLv2', 0)
context.options |= getattr(ssl, 'OP_NO_SSLv3', 0)
context.options |= getattr(ssl, 'OP_NO_COMPRESSION', 0)
context.options |= getattr(ssl, 'OP_CIPHER_SERVER_PREFERENCE', 0)
context.options |= getattr(ssl, 'OP_SINGLE_DH_USE', 0)
context.options |= getattr(ssl, 'OP_SINGLE_ECDH_USE', 0)
if ssl.HAS_ECDH:
context.set_ecdh_curve('prime256v1')
context.set_ciphers(CIPHERS)
return context.wrap_socket(
sock=sock,
server_side=True,
do_handshake_on_connect=False,
suppress_ragged_eofs=True
)
else:
# no SSLContext, we simply call wrap_socket
ca_certs = None
if ssl_options.cert_reqs != ssl.CERT_NONE:
if ssl_options.ca_certs:
ca_certs = ssl_options.ca_certs
else:
security_logger = logging.getLogger('security')
security_logger.info("Using certifi store to verify clients certs")
ca_certs = certifi.where()
return ssl.wrap_socket(
sock=sock,
keyfile=ssl_options.keyfile,
certfile=ssl_options.certfile,
server_side=True,
cert_reqs=ssl_options.cert_reqs,
ssl_version=ssl_options.ssl_version,
ca_certs=ca_certs,
do_handshake_on_connect=False,
suppress_ragged_eofs=True,
ciphers=CIPHERS
)
[docs]class BaseSyslogServer(TCPServer, UDPServer):
"""
Basic Syslog/RELP server
"""
def __init__(self, syslog_parameters):
"""
:type syslog_parameters: SyslogParameters
"""
TCPServer.__init__(self)
self.syslog_parameters = syslog_parameters
self.listening = False
self.shutting_down = False
self.list_of_clients = []
@coroutine
[docs] def launch(self):
"""
Starts the server
- First we try to connect to RabbitMQ
- If successfull, we start to listen for syslog clients
Note
====
Tornado coroutine
"""
self.listening = False
yield self._start_syslog()
@coroutine
[docs] def _start_syslog(self):
"""
_start_syslog()
Start to listen for syslog clients
"""
if not self.listening:
self.listening = True
self.add_sockets(self.syslog_parameters.list_of_tcp_sockets)
self.add_udp_sockets(self.syslog_parameters.list_of_udp_sockets)
self.add_udp_sockets(self.syslog_parameters.list_of_unix_sockets)
[docs] def handle_data(self, data, sockname, peername):
"""
Inherit to handle UDP data
"""
raise NotImplementedError
@coroutine
[docs] def handle_stream(self, stream, address):
"""
handle_stream(stream, address)
Called by tornado when we have a new client.
:param stream: IOStream for the new connection
:type stream: `IOStream`
:param address: tuple (client IP, client source port)
:type address: tuple
Note
====
Tornado coroutine
"""
connection = BaseSyslogClientConnection(
stream=stream,
address=address,
syslog_parameters=self.syslog_parameters
)
self.list_of_clients.append(connection)
yield connection.on_connect()
self.list_of_clients.remove(connection)
def _stop_listen_sockets(self):
[self.io_loop.remove_handler(fd) for fd in self._sockets]
@coroutine
[docs] def _stop_syslog(self):
"""
_stop_syslog()
Stop listening for syslog connections
Note
====
Tornado coroutine
"""
if self.listening:
logger = logging.getLogger(__name__)
self.listening = False
logger.info("Closing RELP clients connections")
[client.disconnect() for client in self.list_of_clients]
logger.info("Stopping the RELP server")
# instead of calling self.stop(): we don't want to close the sockets
self._stop_listen_sockets()
@coroutine
[docs] def stop_all(self):
"""
stop_all()
Stops completely the server.
Note
====
Tornado coroutine
"""
yield self._stop_syslog()
@coroutine
[docs] def shutdown(self):
"""
Authoritarian shutdown
"""
self.shutting_down = True
yield self.stop_all()
self.stop()
[docs] def _handle_connection(self, connection, address):
"""
Inherits _handle_connection from parent TCPServer to manage SSL connections. Called by
Tornado when a client connects.
"""
port = connection.getsockname()[1]
if port not in self.syslog_parameters.ssl_ports:
return super(BaseSyslogServer, self)._handle_connection(connection, address)
try:
connection = wrap_ssl_sock(
sock=connection,
ssl_options=self.syslog_parameters.port_to_ssl_config[port]
)
except ssl.SSLError as err:
if err.args[0] == ssl.SSL_ERROR_EOF:
return connection.close()
else:
raise
except socket.error as err:
if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
return connection.close()
else:
raise
except IOError:
logger = logging.getLogger(__name__)
logger.error("IOError happened when client connected with TLS. Check the SSL configuration")
return connection.close()
try:
stream = SSLIOStream(
connection,
io_loop=self.io_loop,
max_buffer_size=self.max_buffer_size,
read_chunk_size=self.read_chunk_size
)
self.handle_stream(stream, address)
except Exception:
logger = logging.getLogger(__name__)
logger.error("Error in connection callback", exc_info=True)
raise
[docs]class SyslogParameters(object):
"""
Encapsulates the syslog configuration
"""
def __init__(self, conf):
# todo: simplify the mess
self.conf = conf
self.list_of_tcp_sockets = None
self.list_of_unix_sockets = None
self.list_of_udp_sockets = None
self.port_to_protocol = {}
self.unix_socket_names = []
self.port_to_ssl_config = {}
self.port_to_compress = {}
for server in conf.values():
assert(isinstance(server, SyslogServerConfig))
if server.stype == 'unix':
self.unix_socket_names.extend(server.socket_names)
for socket_name in server.socket_names:
self.port_to_protocol[socket_name] = 'socket'
else:
for port in server.ports:
# we don't need to track UDP ports
if server.stype in ('relp', 'tcp'):
self.port_to_protocol[port] = server.stype
if server.stype == 'tcp':
self.port_to_compress[port] = server.compress
self.ssl_ports = list(
chain.from_iterable([server.ports for server in conf.values() if server.ssl is not None])
)
for port in self.ssl_ports:
self.port_to_ssl_config[port] = [server.ssl for server in conf.values() if port in server.ports][0]
[docs] def delete_unix_sockets(self):
"""
Try to delete unix sockets files. Ignore any error and log them as warnings.
"""
for path in self.unix_socket_names:
try:
os.remove(path)
except OSError:
logger = logging.getLogger(__name__)
logger.warning("Can't delete unix socket '%s'", path)
[docs] def bind_all_sockets(self):
"""
Bind the sockets to the current server
:return: list of bound sockets
:rtype: list
"""
self.list_of_tcp_sockets = list()
self.list_of_udp_sockets = list()
for server in self.conf.values():
address = '127.0.0.1' if server.localhost_only else ''
[
self.list_of_tcp_sockets.extend(bind_sockets(port, address))
for port in server.ports
if server.stype in ('tcp', 'relp') and server.ports
]
[
self.list_of_udp_sockets.extend(bind_udp_sockets(port, address))
for port in server.ports
if server.stype == 'udp' and server.ports
]
old_umask = os.umask(0o000)
try:
self.list_of_unix_sockets = [
bind_udp_unix_socket(sock, mode=0o777)
for sock in self.unix_socket_names
]
finally:
os.umask(old_umask)
logger = logging.getLogger(__name__)
logger.info("Pyloggr syslog will listen on: {}".format(
','.join(str(x) for x in self.port_to_protocol.keys()))
)
logger.info("SSL ports: {}".format(','.join(str(x) for x in self.ssl_ports)))
[docs]class BaseSyslogClientConnection(object):
"""
Encapsulates a connection with a syslog client
"""
def __init__(self, stream, address, syslog_parameters):
"""
:type syslog_parameters: SyslogParameters
"""
self.syslog_parameters = syslog_parameters
self.stream = stream
self.address = address
if address:
# ipv4 or ipv6
self.stream.socket.setsockopt(socket.IPPROTO_TCP, socket.SO_KEEPALIVE, 1)
self.stream.set_nodelay(True)
self.stream.set_close_callback(self.on_stream_closed)
self.client_id = str(uuid.uuid4())
self.client_host = ''
self.client_port = None
self.flowinfo = None
self.scopeid = None
self.server_port = None
self.nb_messages_received = 0
self.nb_messages_transmitted = 0
self.packer_groups = []
self.dispatch_dict = {
'tcp': self.dispatch_tcp_client,
'relp': self.dispatch_relp_client,
'socket': self.dispatch_tcp_client
}
self.disconnecting = threading.Event()
@coroutine
[docs] def _read_next_tokens(self, nb_tokens=1):
"""
_read_next_token()
Reads the stream until we get a space delimiter
Note
====
Tornado coroutine
"""
# todo: perf optimisation ?
stream = self.stream
tokens = []
while len(tokens) < nb_tokens:
token = ''
while len(token) == 0:
token = yield stream.read_until_regex(r'\s')
token = token.strip(' \r\n')
tokens.append(token)
raise Return(tokens)
[docs] def on_stream_closed(self):
"""
on_stream_closed()
Called when a client has been disconnected
"""
self.disconnecting.set()
logger = logging.getLogger(__name__)
logger.info("Syslog client has been disconnected {}:{}".format(
self.client_host, self.client_port
))
[docs] def disconnect(self):
"""
Disconnects the client
"""
if not self.disconnecting.is_set():
self.disconnecting.set()
if not self.stream.closed():
self.stream.close()
@coroutine
[docs] def dispatch_relp_client(self):
"""
dispatch_relp_client()
Implements RELP protocol
Note
====
Tornado coroutine
From http://www.rsyslog.com/doc/relp.html::
Request:
RELP-FRAME = RELPID SP COMMAND SP DATALEN [SP DATA] TRAILER
DATA = [SP 1*OCTET] ; command-defined data, if DATALEN is 0, no data is present
COMMAND = 1*32ALPHA
TRAILER = LF
Response:
RSP-HEADER = TXNR SP RSP-CODE [SP HUMANMSG] LF [CMDDATA]
RSP-CODE = 200 / 500 ; 200 is ok, all the rest currently erros
HUAMANMSG = *OCTET ; a human-readble message without LF in it
CMDDATA = *OCTET ; semantics depend on original command
"""
logger = logging.getLogger(__name__)
security_logger = logging.getLogger('security')
read_next_tokens = self._read_next_tokens
try:
while not self.disconnecting.is_set():
relp_event_id = (yield read_next_tokens(1))[0]
try:
relp_event_id = int(relp_event_id)
except ValueError:
# bad client, let's disconnect
log_msg = "Relp ID ({}) was not an integer. We disconnect the RELP client {}:{}".format(
relp_event_id, self.client_host, self.client_port
)
logger.warning(log_msg)
security_logger.warning(log_msg)
self.disconnect()
break
command, length = yield read_next_tokens(2)
try:
length = int(length)
except ValueError:
# bad client, let's disconnect
log_msg = "Relp length ({}) was not an integer. We disconnect the RELP client {}:{}".format(
length, self.client_host, self.client_port
)
logger.warning(log_msg)
security_logger.warning(log_msg)
self.disconnect()
break
data = b''
if length > 0:
data = yield self.stream.read_bytes(length)
self._process_relp_command(relp_event_id, command, data)
except StreamClosedError:
logger.info("Stream was closed {}:{}".format(self.client_host, self.client_port))
self.disconnect()
except ssl.SSLError:
logger.warning("Something bad happened in the TLS conversation")
security_logger.exception("Something bad happened in the TLS conversation")
self.disconnect()
@coroutine
[docs] def dispatch_tcp_client(self):
"""
dispatch_tcp_client()
Implements Syslog/TCP protocol
Note
====
Tornado coroutine
From RFC 6587::
It can be assumed that octet-counting framing is used if a syslog
frame starts with a digit.
TCP-DATA = *SYSLOG-FRAME
SYSLOG-FRAME = MSG-LEN SP SYSLOG-MSG
MSG-LEN = NONZERO-DIGIT *DIGIT
NONZERO-DIGIT = %d49-57
SYSLOG-MSG is defined in the syslog protocol [RFC5424] and may also be considered to be the payload in [RFC3164]
MSG-LEN is the octet count of the SYSLOG-MSG in the SYSLOG-FRAME.
A transport receiver can assume that non-transparent-framing is used
if a syslog frame starts with the ASCII character "<" (%d60).
TCP-DATA = *SYSLOG-FRAME
SYSLOG-FRAME = SYSLOG-MSG TRAILER
TRAILER = LF / APP-DEFINED
APP-DEFINED = 1*2OCTET
SYSLOG-MSG is defined in the syslog protocol [RFC5424] and may also be considered to be the payload in [RFC3164]
"""
read_next_tokens = self._read_next_tokens
stream = self.stream
logger = logging.getLogger(__name__)
security_logger = logging.getLogger('security')
try:
while not self.disconnecting.is_set():
first_token = (yield read_next_tokens(1))[0]
if first_token[0] == b'<':
# non-transparent framing
rest_of_line = yield stream.read_until(b'\n')
syslog_msg = first_token + b' ' + rest_of_line
else:
# octet framing
try:
msg_len = int(first_token)
except ValueError:
log_msg = u"Syntax error from TCP client '{}:{}'. We disconnect it.".format(
self.client_host, self.client_port
)
logger.warning(log_msg)
security_logger.warning(log_msg)
self.disconnect()
break
syslog_msg = yield stream.read_bytes(msg_len)
if self.syslog_parameters.port_to_compress[self.server_port]:
try:
buf = BytesIO(lz4.decompress(syslog_msg))
except ValueError:
logger.error("Syslog server: tcp server: can't decompress data")
self.disconnect()
break
finally:
del syslog_msg
try:
messages = parse_tcp_stream(buf)
except ValueError:
logger.error("Syslog server: tcp server: can't parse decompressed data")
self.disconnect()
break
finally:
buf.close()
del buf
[self._process_event(message, 'tcp') for message in messages]
else:
self._process_event(syslog_msg.strip(' \r\n'), 'tcp')
except StreamClosedError:
logger.info(u"TCP stream was closed {}:{}".format(self.client_host, self.client_port))
self.disconnect()
except ssl.SSLError:
logger.warning("Something bad happened in the TLS conversation")
security_logger.exception("Something bad happened in the TLS conversation")
self.disconnect()
[docs] def _process_relp_command(self, relp_event_id, command, data):
"""
_process_relp_command(relp_event_id, command, data)
RELP client has sent a command. Find the type and make the right answer.
:param relp_event_id: RELP ID, sent by client
:param command: the RELP command
:param data: data transmitted after command (can be empty)
"""
logger = logging.getLogger(__name__)
security_logger = logging.getLogger('security')
if command == 'open':
data = data.strip(b' \r\n')
answer = '{} rsp {} 200 OK\n{}\n'.format(relp_event_id, len(data) + 7, data)
self.stream.write(answer)
elif command == 'close':
self.stream.write('{} rsp 0\n0 serverclose 0\n'.format(relp_event_id))
self.disconnect()
elif command == 'syslog':
data = data.strip(b' \r\n')
self._process_event(data, 'relp', relp_event_id)
elif command == 'lz4':
try:
buf = BytesIO(lz4.decompress(data))
except ValueError:
logger.error("Dropping compressed stream: invalid compressed data")
self.disconnect()
return
finally:
del data
try:
parsed_relp_id, messages = parse_relp_stream(buf)
except ValueError:
# malformed stream
logger.error("Dropping compressed stream: invalid RELP data")
self.disconnect()
return
finally:
buf.close()
del buf
self._process_group_events(messages, relp_event_id)
else:
log_msg = "Unknown command '{}' from {}:{}".format(command, self.client_host, self.client_port)
security_logger.warning(log_msg)
logger.warning(log_msg)
self.stream.write('{} rsp 6 200 OK\n'.format(relp_event_id))
def _process_event(self, bytes_event, protocol, relp_event_id=None):
raise NotImplementedError
def _process_group_events(self, bytes_events, relp_event_id):
raise NotImplementedError
def _set_socket(self):
logger = logging.getLogger(__name__)
try:
server_sockname = self.stream.socket.getsockname()
if isinstance(server_sockname, real_bytes) or isinstance(server_sockname, real_unicode):
self.server_port = server_sockname # socket unix
elif len(server_sockname) == 2:
self.server_port = server_sockname[1] # ipv4
(self.client_host, self.client_port) = self.stream.socket.getpeername()
elif len(server_sockname) == 4:
self.server_port = server_sockname[1] # ipv6
(self.client_host, self_client_port, self.flowinfo, self.scopeid) = self.stream.socket.getpeername()
else:
raise ValueError
except StreamClosedError:
logger.info("The client went away before it could be dispatched")
self.disconnect()
raise
except ValueError:
logger.warning("Unknown socket type")
self.disconnect()
raise
@coroutine
[docs] def on_connect(self):
"""
on_connect()
Called when a client connects to SyslogServer.
We find the protocol by looking at the connecting port. Then run the appropriate dispatch method.
Note
====
Tornado coroutine
"""
logger = logging.getLogger(__name__)
try:
self._set_socket()
except (StreamClosedError, ValueError):
return
t = self.syslog_parameters.port_to_protocol.get(self.server_port, None)
if t is None:
logger.warning("Don't know what to do with port '{}'".format(self.server_port))
self.disconnect()
return
dispatch_function = self.dispatch_dict.get(t, None)
if dispatch_function is None:
logger.warning("on_connect: no dispatch function for '%s'", t)
self.disconnect()
return
logger.info('New client is connected {}:{} to {}'.format(
self.client_host, self.client_port, self.server_port
))
# noinspection PyCallingNonCallable
yield dispatch_function()