# Source code for pyloggr.utils.lmdb_wrapper

# encoding: utf-8

"""
Small wrapper around lmdb

`LmdbWrapper` stores and retrieves JSON-serializable python objects in an LMDB
environment. `Set`, `Hash` and `Queue` are thin helpers built on top of a wrapper.
"""
__author__ = 'stef'

import logging
import time

# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor, Future
from tornado.ioloop import IOLoop
import ujson
import lmdb
from future.utils import viewitems

from pyloggr.utils import to_bytes, to_unicode


def _json_decode(obj_bytes):
    if obj_bytes is None:
        return None
    try:
        return ujson.loads(obj_bytes)
    except (ValueError, TypeError):
        logging.getLogger(__name__).exception("lmdb_wrapper: JSON decoding error: {}".format(
            to_unicode(obj_bytes)
        ))
        return None


def _dumps(obj):
    """
    Serialize `obj` to bytes.

    Objects exposing a `dumps()` method serialize themselves; anything else goes
    through `ujson.dumps`.

    :param obj: object to serialize
    :return: the serialized object, passed through `to_bytes`
    """
    serialized = obj.dumps() if hasattr(obj, 'dumps') else ujson.dumps(obj)
    return to_bytes(serialized)


class LmdbWrapper(object):
    """
    Wrapper around lmdb that eases storage and retrieval of JSONable python objects

    Opened wrappers are registered by path in the class-level `opened_db` dict, so that
    `get_instance` can hand back an already opened wrapper.
    """
    # registry shared by all instances: path -> opened LmdbWrapper
    opened_db = dict()

    def __init__(self, path, size=52428800):
        """
        :param path: database directory name
        :param size: passed to lmdb as `map_size` (default: 50 MiB)
        """
        self.path = path
        self.env = None
        self.size = size

    def open(self, sync=True, metasync=True, lock=True, max_dbs=10, max_spare_txns=10):
        """
        Open the LMDB environment if it is not opened yet and register the wrapper.

        The keyword arguments are forwarded unchanged to `lmdb.Environment`.

        :return: self
        :rtype: LmdbWrapper
        """
        if not self.env:
            # noinspection PyArgumentList
            self.env = lmdb.Environment(
                path=self.path, map_size=self.size, max_dbs=max_dbs, sync=sync,
                metasync=metasync, lock=lock, max_spare_txns=max_spare_txns
            )
            self.opened_db[self.path] = self
        return self

    def close(self):
        """
        Close the LMDB environment (if opened) and unregister the wrapper.
        """
        if self.env:
            self.env.close()
            self.env = None
        # only drop our own registration: another wrapper opened on the same path
        # must stay reachable through `get_instance`
        if self.opened_db.get(self.path) is self:
            del self.opened_db[self.path]

    def __enter__(self):
        return self.open()

    # noinspection PyUnusedLocal
    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @classmethod
    def get_instance(cls, path):
        """
        Return the registered wrapper for `path`, opening a new one if needed.

        :param path: database directory name
        :type path: str
        :return: LmdbWrapper object
        :rtype: LmdbWrapper
        """
        if path not in cls.opened_db:
            cls.opened_db[path] = cls(path).open()
        return cls.opened_db[path]

    @classmethod
    def close_all(cls):
        """
        Close every registered wrapper.
        """
        # close() removes entries from `opened_db`: iterate over a snapshot, otherwise
        # Python 3 raises "dictionary changed size during iteration"
        for db in list(cls.opened_db.values()):
            db.close()

    def get_obj(self, key):
        """
        :param key: key of the object
        :return: the decoded object, or None if the key does not exist or cannot be decoded
        """
        key = to_bytes(key)
        with self.env.begin() as txn:
            obj_bytes = txn.get(key)
        if obj_bytes is None:
            return None
        return _json_decode(obj_bytes)

    def __getitem__(self, item):
        return self.get_obj(item)

    def __setitem__(self, key, value):
        self.put_obj(key, value)

    def queue(self, queue_name):
        """
        :return: a `Queue` named `queue_name` bound to this wrapper
        """
        return Queue(queue_name, self)

    def hash(self, hash_name):
        """
        :return: a `Hash` named `hash_name` bound to this wrapper
        """
        return Hash(hash_name, self)

    def set(self, set_name):
        """
        :return: a `Set` named `set_name` bound to this wrapper
        """
        return Set(set_name, self)

    def get_many_objs(self, keys):
        """
        Fetch several objects in a single read transaction.

        :param keys: iterable of keys
        :return: dict mapping each key to its decoded object (None when missing)
        """
        with self.env.begin() as txn:
            objs_bytes = {key: txn.get(to_bytes(key)) for key in keys}
        return {
            key: None if obj_bytes is None else _json_decode(obj_bytes)
            for key, obj_bytes in viewitems(objs_bytes)
        }

    def put_obj(self, key, obj):
        """
        Store `obj` under `key`, overwriting any previous value.

        :return: the result of `txn.put`
        """
        key = to_bytes(key)
        with self.env.begin(write=True) as txn:
            return txn.put(key, _dumps(obj), overwrite=True)

    def put_many_objs(self, d):
        """
        Store several objects in a single write transaction.

        :param d: mapping (or anything `dict()` accepts) of key -> object
        :return: dict mapping each key to the result of `txn.put`
        """
        with self.env.begin(write=True) as txn:
            return {
                key: txn.put(to_bytes(key), _dumps(obj), overwrite=True)
                for key, obj in viewitems(dict(d))
            }

    def del_obj(self, key):
        """
        :return: the result of `txn.delete`
        """
        key = to_bytes(key)
        with self.env.begin(write=True) as txn:
            return txn.delete(key)

    def del_many_obj(self, keys):
        """
        Delete several keys in a single write transaction.

        :return: dict mapping each key to the result of `txn.delete`
        """
        with self.env.begin(write=True) as txn:
            return {key: txn.delete(to_bytes(key)) for key in keys}

    @staticmethod
    def _set_key(set_name):
        """Build the LMDB key under which the set `set_name` is stored."""
        # bytes literal: `"__set__" + bytes` raises TypeError on Python 3
        return b"__set__" + to_bytes(set_name)

    @staticmethod
    def _load_set(txn, idx):
        """Return the stored list of members for `idx`, or None if the set does not exist."""
        raw = txn.get(idx)
        return None if raw is None else ujson.loads(raw)

    def add_to_set(self, set_name, item):
        """
        Add `item` (converted with `to_unicode`) to the set; no-op if already present.
        """
        item = to_unicode(item)
        idx = self._set_key(set_name)
        with self.env.begin(write=True) as txn:
            members = self._load_set(txn, idx)
            if members is None:
                members = []
            if item not in members:
                members.append(item)
                # values are written as bytes, like `_dumps` does for regular objects
                txn.put(idx, to_bytes(ujson.dumps(members)))

    def remove_from_set(self, set_name, item):
        """
        Remove `item` from the set; no-op if the set or the item does not exist.
        """
        item = to_unicode(item)
        idx = self._set_key(set_name)
        with self.env.begin(write=True) as txn:
            members = self._load_set(txn, idx)
            if members is not None and item in members:
                members.remove(item)
                txn.put(idx, to_bytes(ujson.dumps(members)))

    def is_in_set(self, set_name, item):
        """
        :return: True if `item` is a member of the set, False otherwise
        """
        item = to_unicode(item)
        idx = self._set_key(set_name)
        with self.env.begin() as txn:
            members = self._load_set(txn, idx)
        return members is not None and item in members

    def card_of_set(self, set_name):
        """
        :return: number of members of the set (0 if the set does not exist)
        """
        idx = self._set_key(set_name)
        with self.env.begin() as txn:
            members = self._load_set(txn, idx)
        return 0 if members is None else len(members)

    def members_of_set(self, set_name):
        """
        :return: python set of the members (empty if the set does not exist)
        """
        idx = self._set_key(set_name)
        with self.env.begin() as txn:
            members = self._load_set(txn, idx)
        return set() if members is None else set(members)
class Set(object):
    """
    Named set stored through a `LmdbWrapper`; every operation delegates to the wrapper.
    """
    def __init__(self, set_name, wrapper):
        self.set_name = to_bytes(set_name)
        self.wrapper = wrapper

    def add(self, key):
        self.wrapper.add_to_set(self.set_name, key)

    def remove(self, key):
        self.wrapper.remove_from_set(self.set_name, key)

    def card(self):
        return self.wrapper.card_of_set(self.set_name)

    def __len__(self):
        return self.wrapper.card_of_set(self.set_name)

    def members(self):
        return self.wrapper.members_of_set(self.set_name)

    def __contains__(self, key):
        return self.wrapper.is_in_set(self.set_name, key)


class Hash(object):
    """
    Named hash stored through a `LmdbWrapper`: each item lives under its own
    `__hash__<hash_name>__<item>` key.
    """
    def __init__(self, hash_name, wrapper):
        self.hash_name = to_bytes(hash_name)
        self.wrapper = wrapper

    def _key(self, item):
        """Build the LMDB key of `item` inside this hash."""
        # bytes literals: mixing str and bytes raises TypeError on Python 3
        return b"__hash__" + self.hash_name + b"__" + to_bytes(item)

    def __getitem__(self, item):
        return self.wrapper.get_obj(self._key(item))

    def __setitem__(self, item, value):
        self.wrapper.put_obj(self._key(item), value)

    def __delitem__(self, item):
        self.wrapper.del_obj(self._key(item))


class Queue(object):
    """
    Queue of JSONable objects stored in a dedicated LMDB sub-database.

    Used as a context manager, the queue owns a thread pool that
    `wait_not_empty_future` uses for polling.
    """
    def __init__(self, queue_name, wrapper):
        """
        :param queue_name: name of the queue
        :param wrapper: opened wrapper whose environment hosts the queue
        :type wrapper: LmdbWrapper
        """
        self.queue_name = to_bytes(queue_name)
        # bytes literal: `"__queue__" + bytes` raises TypeError on Python 3
        self.subdbname = b"__queue__" + self.queue_name
        self.subdb = wrapper.env.open_db(
            key=self.subdbname, txn=None, reverse_key=False, dupsort=False, create=True
        )
        self.env = wrapper.env
        self.wrapper = wrapper
        self.exe = None

    def __enter__(self):
        self.exe = ThreadPoolExecutor(max_workers=10)
        return self

    # noinspection PyUnusedLocal
    def __exit__(self, exc_type, exc_value, traceback):
        if self.exe is not None:
            self.exe.shutdown()
            # forget the executor: submitting to a shut-down pool would raise
            self.exe = None

    def generator(self, exclude=None):
        """
        Yield `(key, obj)` for the records of the queue, in cursor order.

        Records whose key is in `exclude`, and records that are empty or cannot be
        decoded, are skipped. The read transaction stays open while the generator
        is suspended.

        :param exclude: container of keys to skip
        """
        if exclude is None:
            exclude = set()
        with self.env.begin(db=self.subdb, write=False) as txn:
            with txn.cursor(db=self.subdb) as c:
                if not c.first():
                    return
                for key, value in c.iternext(keys=True, values=True):
                    if key in exclude or not key or value is None:
                        continue
                    obj = _json_decode(value)
                    if obj:
                        yield (key, obj)

    def push(self, obj, idx=None):
        """
        Store `obj` in the queue, overwriting any record with the same index.

        :param obj: object to store; serialized with its `dumps()` method when it has one
        :param idx: key of the record; defaults to `obj.lmdb_idx()`
        :return: the result of `txn.put`
        """
        idx = to_bytes(obj.lmdb_idx()) if idx is None else to_bytes(idx)
        with self.env.begin(db=self.subdb, write=True) as txn:
            return txn.put(idx, _dumps(obj), overwrite=True)

    def delete(self, obj=None, idx=None):
        """
        Delete a record, designated either by `idx` or by `obj.lmdb_idx()`.

        :return: None when neither `obj` nor `idx` is given, else the result of `txn.delete`
        """
        if obj is None and idx is None:
            return None
        idx = to_bytes(obj.lmdb_idx()) if idx is None else to_bytes(idx)
        with self.env.begin(db=self.subdb, write=True) as txn:
            return txn.delete(idx, db=self.subdb)

    def lpop(self):
        """
        Remove and return the first record of the queue.

        :return: `(key, obj)`, or None if the queue is empty or the record cannot be decoded
        """
        with self.env.begin(db=self.subdb, write=True) as txn:
            with txn.cursor(db=self.subdb) as c:
                if not c.first():
                    # queue is empty
                    return None
                idx = c.key()
                obj = _json_decode(c.pop(idx))
                if obj is None:
                    return None
                return idx, obj

    def pop(self, key=None):
        """
        Remove and return a record.

        :param key: key of the record; when None this behaves like `lpop`
        :return: the decoded object for an explicit `key` (None if missing), else
                 whatever `lpop` returns
        """
        if key is None:
            return self.lpop()
        key = to_bytes(key)
        with self.env.begin(db=self.subdb, write=True) as txn:
            obj_bytes = txn.pop(key)
        return _json_decode(obj_bytes)

    def pop_all(self):
        """
        Remove the records of the queue and return the decodable ones.

        :return: list of `(key, obj)`
        """
        results = []
        with self.env.begin(db=self.subdb, write=True) as txn:
            with txn.cursor(db=self.subdb) as c:
                if not c.first():
                    return []
                # NOTE(review): records are deleted through the cursor that is being
                # iterated; confirm against the lmdb docs that no record is skipped
                for key in c.iternext(keys=True, values=False):
                    obj = _json_decode(c.pop(key))
                    if obj:
                        results.append((key, obj))
        return results

    def empty(self):
        """
        :return: True if the queue holds no record
        """
        with self.env.begin(db=self.subdb, write=False) as txn:
            with txn.cursor(db=self.subdb) as c:
                return not c.first()

    def extend(self, values):
        """
        Store several objects in a single write transaction, keyed by `obj.lmdb_idx()`.

        Objects are serialized like in `push` (their `dumps()` method is used when present).

        :return: list of the results of `txn.put`
        """
        with self.env.begin(db=self.subdb, write=True) as txn:
            return [
                txn.put(to_bytes(obj.lmdb_idx()), _dumps(obj), overwrite=True)
                for obj in values
            ]

    def keys(self):
        """
        :return: python set of the keys currently in the queue
        """
        with self.env.begin(db=self.subdb) as txn:
            with txn.cursor(db=self.subdb) as c:
                if not c.first():
                    return set()
                return set(c.iternext(keys=True, values=False))

    def wait_not_empty_future(self, timeout=None, tick=1.0):
        """
        Return a Future that resolves (to None) once the queue is not empty, or once
        `timeout` seconds have elapsed. Polling happens in a worker thread.

        :param timeout: maximum wait in seconds (None: no limit)
        :param tick: polling period in seconds
        :rtype: Future
        """
        f = Future()
        # outside of a `with queue:` block there is no shared pool: use a private one
        private_exe = ThreadPoolExecutor(max_workers=10) if self.exe is None else None
        exe = self.exe if private_exe is None else private_exe

        def _wait_until_not_empty():
            # stop polling as soon as the future is resolved (timeout), so that the
            # worker thread does not spin forever on an empty queue
            while not f.done() and self.empty():
                time.sleep(tick)

        # noinspection PyUnusedLocal
        def _nothing(g=None):
            if not f.done():
                f.set_result(None)
            if private_exe is not None:
                private_exe.shutdown(wait=False)

        if timeout is not None:
            # float, not int: a sub-second timeout must not be truncated to 0
            IOLoop.current().call_later(float(timeout), _nothing)
        wait_until_f = exe.submit(_wait_until_not_empty)
        IOLoop.current().add_future(wait_until_f, _nothing)
        return f

    def _has_candidates(self, exclude):
        """Tell whether the queue holds at least one key that is not in `exclude`."""
        if self.empty():
            return False
        if exclude is None:
            return True
        return len(self.keys().difference(exclude)) > 0

    def wait_not_empty(self, timeout=None, tick=1.0, exclude=None):
        """
        Block until the queue holds a record whose key is not in `exclude`.

        Availability is checked before the timeout, so an already available record
        is reported even with `timeout=0`.

        :param timeout: maximum wait in seconds (None: wait forever)
        :param tick: polling period in seconds
        :param exclude: keys that do not count as available
        :return: True if a record is available, False on timeout
        """
        start = time.time()
        while not self._has_candidates(exclude):
            if timeout is not None and (time.time() - start) >= timeout:
                return False
            time.sleep(tick)
        return True