# encoding: utf-8
"""
Pyloggr Web interface
"""
from __future__ import absolute_import, division, print_function
__author__ = 'stef'
# todo: useful metrics
import logging
from pkg_resources import resource_filename
from tornado.web import RequestHandler, Application, url
from tornado.websocket import WebSocketHandler
from tornado.httpserver import HTTPServer
from tornado.gen import coroutine, TimeoutError
from tornado.ioloop import PeriodicCallback, IOLoop
from jinja2 import Environment, PackageLoader
import momoko
# noinspection PyCompatibility
import psycopg2
from pyloggr.rabbitmq import management
from pyloggr.rabbitmq import Configuration as RMQConfig
from pyloggr.rabbitmq.notifications_consumer import NotificationsConsumer
from pyloggr.rabbitmq import RabbitMQConnectionError
from pyloggr.utils.observable import Observable, Observer
from pyloggr.config import Config
from pyloggr.cache import Cache
from pyloggr.utils import sleep
logger = logging.getLogger(__name__)
PERIODIC_RABBIT_STATUS_TIME = 10 * 1000
[docs]class SyslogServers(Observable, Observer):
"""
Data about the running Pyloggr's syslog servers
Notified by Rabbitmq
Observed by Websocket
"""
def __init__(self):
Observable.__init__(self)
self.servers = dict()
# get initial data from Redis
self.servers = {syslog_server.server_id: {
'id': syslog_server.server_id,
'ports': syslog_server.ports,
'clients': {client['id']: {
'id': client['id'],
'host': client['host'],
'client_port': client['client_port'],
'server_port': client['server_port']
} for client in syslog_server.clients}
} for syslog_server in Cache.syslog_list.values()}
def notified(self, d):
# get updates from rabbitmq
if d['action'] == "add_client":
self.servers[d['server_id']]['clients'][d['id']] = {
'id': d['id'],
'host': d['host'],
'client_port': d['client_port'],
'server_port': d['server_port']
}
elif d['action'] == "remove_client":
del self.servers[d['server_id']]['clients'][d['id']]
elif d['action'] == "add_server":
self.servers[d['server_id']] = {
'id': d['server_id'],
'clients': {},
'ports': d['ports']
}
elif d['action'] == "remove_server":
del self.servers[d['server_id']]
else:
logger.debug("Action: '{}'".format(d['action']))
self.notify()
def notify(self):
status.syslog = bool(self.servers)
d = dict()
d['action'] = 'syslogs'
d['servers'] = self.servers
self.notify_observers(d)
[docs]class Status(Observable):
"""
Encapsulates the status of the the various pyloggr components
"""
labels = ['rabbitmq', 'redis', 'postgresql', 'syslog', 'parser', 'shipper']
def __init__(self):
Observable.__init__(self)
def __setattr__(self, key, value):
object.__setattr__(self, key, value)
if key in self.labels:
self.notify()
def notify(self):
d = dict()
for f in ['rabbitmq', 'redis', 'postgresql', 'syslog', 'parser', 'shipper']:
d[f] = "On" if getattr(self, f, None) else "Off"
d['action'] = 'status'
self.notify_observers(d)
# noinspection PyAbstractClass
[docs]class SyslogClientsFeed(WebSocketHandler, Observer):
"""
Websocket used to talk with the browser
"""
def open(self):
logger.debug("Websocket is opened")
self.set_nodelay(True)
# get status of pyloggr processes
status.register(self)
# get notifications from syslog servers
syslog_servers.register(self)
# get stats from RabbitMQ management API
rabbitmq_stats.register(self)
# get stats from PGSQL
pgsql_stats.register(self)
[docs] def notified(self, d):
"""
`notified` is called by observables, when some event is meant to be communicated to the web frontend
:param d: the event data to transmit to the web frontend
:type d: dict
"""
self.write_message(d)
def on_message(self, message):
# browser sent a message
logger.debug("Browser said: {}".format(message))
if message == "getStatus":
status.notify()
syslog_servers.notify()
rabbitmq_stats.notify()
pgsql_stats.notify()
def on_close(self):
logger.debug("WebSocket closed")
syslog_servers.unregister(self)
status.unregister(self)
rabbitmq_stats.unregister(self)
pgsql_stats.unregister(self)
def check_origin(self, origin):
return True
[docs]class QueryLogs(RequestHandler):
"""
Query the log database
"""
pass
[docs]class StatusPage(RequestHandler):
"""
Displays a status page
"""
def get(self):
html_output = self.application.templates['status'].render()
self.write(html_output)
class Upload(RequestHandler):
pass
# todo: upload log files via form or via POST API
[docs]class PgSQLStats(Observable):
"""
Gather information from the database
"""
def __init__(self):
Observable.__init__(self)
self._updating = False
self.stats = 0
@coroutine
def update(self):
if self._updating:
return
self._updating = True
db_conn = None
stats = 0
status_inc = True
for shipper in Config.SHIPPER2PGSQL:
try:
db_conn = yield momoko.Op(momoko.Connection().connect, Config.SHIPPER2PGSQL[shipper].dsn)
cursor = yield momoko.Op(db_conn.execute, 'SELECT COUNT(*) FROM {};'.format(
Config.SHIPPER2PGSQL[shipper].tablename
))
stats += cursor.fetchone()[0]
except psycopg2.Error:
logger.exception("Database seems down")
status_inc = False
finally:
if db_conn is not None:
db_conn.close()
status.postgresql = status_inc
self.stats = stats
self._updating = False
self.notify()
def notify(self):
d = dict()
d['action'] = 'pgsql.stats'
# noinspection PyUnresolvedReferences
d['status'] = status.postgresql
d['stats'] = self.stats
self.notify_observers(d)
[docs]class RabbitMQStats(Observable):
"""
Gather information from RabbitMQ management API
"""
def __init__(self):
Observable.__init__(self)
self.queue_names = [machine.source.queue for machine in Config.MACHINES.values()]
self.queue_names.extend(shipper.source_queue for shipper in Config.SHIPPER2PGSQL.values())
self.queues = {name: {} for name in self.queue_names}
self._updating = False
self._rabbitmq_api_client = management.Client(
host=Config.RABBITMQ_HTTP,
user=Config.RABBITMQ.user,
passwd=Config.RABBITMQ.password,
timeout=13
)
@coroutine
def update(self):
if self._updating:
return
self._updating = True
results = dict()
try:
for name in self.queue_names:
results[name] = yield self._rabbitmq_api_client.get_queue(Config.RABBITMQ.vhost, name)
except management.NetworkError:
logger.warning("RabbitMQ management API does not seem available")
status.rabbitmq = False
except management.HTTPError as ex:
logger.warning("Management API answered error code: '{}'".format(ex.status))
logger.debug("Reason: {}".format(ex.reason))
status.rabbitmq = False
else:
status.rabbitmq = True
if status.rabbitmq:
for name in self.queue_names:
self.queues[name]['messages'] = results[name]['messages']
else:
self.queues = {name: {} for name in self.queue_names}
self._updating = False
self.notify()
def notify(self):
d = dict()
d['action'] = 'queues.stats'
# noinspection PyUnresolvedReferences
d['status'] = status.rabbitmq
d['queues'] = self.queues
self.notify_observers(d)
class PyloggrApplication(Application):
def __init__(self, url_prefix):
urls = [
('/status/?', StatusPage, 'status'),
('/query/?', QueryLogs, 'query'),
('/websocket/?', SyslogClientsFeed, 'websocket'),
]
handlers = [url(url_prefix + path, method, name=name) for (path, method, name) in urls]
settings = {
'autoreload': False,
'debug': True,
'static_path': resource_filename('pyloggr', '/static'),
'static_url_prefix': url_prefix + '/static/',
'cookie_secret': Config.COOKIE_SECRET
}
template_loader = PackageLoader('pyloggr', 'templates')
template_env = Environment(loader=template_loader)
template_env.globals['reverse'] = self.reverse_url
template_env.globals['prefix'] = url_prefix
self.templates = {
'status': template_env.get_template('status.html')
}
Application.__init__(self, handlers, **settings)
[docs]class WebServer(object):
"""
Pyloggr process for the web frontend part
"""
def __init__(self, sockets):
self.app = PyloggrApplication('/syslog')
self.http_server = HTTPServer(self.app)
self._periodic = None
self.sockets = sockets
@coroutine
def launch(self):
global syslog_servers, notifications_consumer, status, rabbitmq_stats, pgsql_stats, notifications_consumer
status = Status()
rabbitmq_stats = RabbitMQStats()
pgsql_stats = PgSQLStats()
notifications_consumer = NotificationsConsumer(
rabbitmq_config=RMQConfig(
host=Config.RABBITMQ.host,
port=Config.RABBITMQ.port,
user=Config.RABBITMQ.user,
password=Config.RABBITMQ.password,
vhost=Config.RABBITMQ.vhost,
exchange=Config.NOTIFICATIONS.exchange,
binding_key=Config.NOTIFICATIONS.binding_key
)
)
syslog_servers = SyslogServers()
notifications_consumer.register(syslog_servers)
IOLoop.instance().add_callback(self.get_rabbit_notifications)
self.http_server.add_sockets(self.sockets)
self.start_periodic()
@coroutine
def get_rabbit_notifications(self):
try:
lost_rabbit_connection = yield notifications_consumer.start()
except (RabbitMQConnectionError, TimeoutError):
# no rabbitmq connection ==> no notifications
yield sleep(60)
IOLoop.instance().add_callback(self.get_rabbit_notifications)
return
else:
# we use a callback to immediately return (start_consuming never returns)
IOLoop.instance().add_callback(notifications_consumer.start_consuming)
yield lost_rabbit_connection.wait()
yield sleep(60)
IOLoop.instance().add_callback(self.get_rabbit_notifications)
@coroutine
def shutdown(self):
logger.debug("Dropping HTTP clients")
yield self.http_server.close_all_connections()
logger.debug("Stopping stats refresh")
self.stop_periodic()
logger.debug("Asking the notification consumer to stop")
yield notifications_consumer.stop()
logger.debug("Stopping the HTTP server")
self.http_server.stop()
def start_periodic(self):
self.app.rabbitmq_stats = RabbitMQStats()
self.app.pgsql_stats = PgSQLStats()
self.update_stats()
if self._periodic is None:
self._periodic = PeriodicCallback(self.update_stats, PERIODIC_RABBIT_STATUS_TIME)
self._periodic.start()
def stop_periodic(self):
if self._periodic is not None:
self._periodic.stop()
self._periodic = None
@coroutine
def update_stats(self):
yield rabbitmq_stats.update()
yield pgsql_stats.update()
status.redis = True
status = None
rabbitmq_stats = None
pgsql_stats = None
syslog_servers = None
notifications_consumer = None