# Source code for pyloggr.utils.simple_queue

# encoding: utf-8

"""
Simplified queues
"""

__author__ = 'stef'

import threading
from collections import deque

from cytoolz import remove
# noinspection PyCompatibility
from concurrent.futures import Future as RealFuture
from tornado.ioloop import IOLoop
from tornado.gen import Task, TimeoutError
from tornado.concurrent import Future


class ThreadSafeQueue(object):
    """
    Simplified thread-safe/coroutine queue, without size limit

    Items are handed out in FIFO order. A consumer that finds the queue empty
    can park a ``concurrent.futures.Future`` with :meth:`get_wait`; the next
    :meth:`put` resolves the oldest parked future that is still pending.

    Parked futures are resolved (or failed) while ``self.lock`` is held.
    ``concurrent.futures.Future`` runs its done-callbacks from inside
    ``set_result``/``set_exception``, so such callbacks must not call back
    into the same queue (the lock is not re-entrant).
    """

    def __init__(self):
        # Guards both deques and every state change of a parked future
        self.lock = threading.Lock()
        # Items waiting to be consumed, oldest on the left
        self.queue = deque()
        # Futures of consumers waiting for an item, oldest on the left
        self._waiting = deque()

    def put(self, item):
        """
        Put an item on the queue

        If a consumer is waiting, the oldest queued item is handed to the
        oldest pending waiter right away.

        :param item: item
        """
        with self.lock:
            # remove waiters that have expired (or were otherwise completed)
            self._waiting = deque(w for w in self._waiting if not w.done())
            self.queue.append(item)
            if self._waiting:
                waiter = self._waiting.popleft()
                # append + popleft keeps FIFO order even if items were queued
                waiter.set_result(self.queue.popleft())

    def _expire_waiter(self, waiter):
        """
        Fail a parked future with ``TimeoutError`` unless it is already done

        :param waiter: future previously returned by :meth:`get_wait`
        """
        # The lock makes "check done, then set exception" atomic with respect
        # to put(); otherwise put() could pop an item for a waiter that
        # expires a moment later, and the item would be lost.
        with self.lock:
            if not waiter.done():
                waiter.set_exception(TimeoutError())

    def get_wait(self, deadline=None):
        """
        Wait for an available item and pop it from the queue

        :param deadline: optional deadline, passed unchanged to
            ``IOLoop.current().add_timeout``; a falsy value means no deadline
        :return: a ``concurrent.futures.Future`` holding the item; if the
            deadline passes first, the future fails with ``TimeoutError``
        """
        f = RealFuture()
        with self.lock:
            if self.queue:
                # An item is already there: no waiting, hence no timeout needed
                f.set_result(self.queue.popleft())
                return f
            if deadline:
                # Registered while holding the lock: the expiry callback needs
                # the same lock, so it cannot run before ``f`` is parked.
                IOLoop.current().add_timeout(
                    deadline, lambda: self._expire_waiter(f)
                )
            self._waiting.append(f)
        return f

    def get_all(self):
        """
        Pop all items from the queue, without waiting

        :return: a deque with the items (empty if nothing was queued)
        """
        with self.lock:
            # Hand the current deque to the caller and start over with a new one
            items, self.queue = self.queue, deque()
        return items

    def get(self):
        """
        Pop one item from the queue, without waiting

        :return: the oldest item, or ``None`` if the queue is empty
        """
        with self.lock:
            return self.queue.popleft() if self.queue else None


class SimpleToroQueue(object):
    """
    Simplified Toro queue without size limit

    Items are handed out in FIFO order. No locking is done here, so every
    method is expected to be called from the thread running ``io_loop``.

    :param io_loop: IOLoop used to schedule timeouts; defaults to
        ``IOLoop.current()`` at construction time
    """

    def __init__(self, io_loop=None):
        self.io_loop = io_loop or IOLoop.current()
        # Futures of consumers waiting for an item, oldest on the left
        self.getters = deque()
        # Items waiting to be consumed, oldest on the left
        self.queue = deque()

    def qsize(self):
        """
        Return number of items in the queue
        """
        return len(self.queue)

    def empty(self):
        """
        Return ``True`` if the queue is empty, ``False`` otherwise
        """
        return not self.queue

    def put(self, item):
        """
        Put an item into the queue (without waiting)

        If a getter is waiting, the oldest queued item is handed to the oldest
        pending getter right away.

        :param item: item to add
        """
        # Forget getters that are already done (e.g. timed out)
        self.getters = deque(g for g in self.getters if not g.done())
        self.queue.append(item)
        if self.getters:
            getter = self.getters.popleft()
            # append + popleft keeps FIFO order even if items were queued
            getter.set_result(self.queue.popleft())

    def get_wait(self, deadline=None):
        """
        Remove and return an item from the queue.

        Returns a Future. The Future resolves as soon as an item is available,
        or fails with ``TimeoutError`` once the deadline has passed.

        :param deadline: Optional timeout, passed unchanged to
            ``io_loop.add_timeout``: either an absolute timestamp (as returned
            by ``io_loop.time()``) or a ``datetime.timedelta`` for a deadline
            relative to the current time. A falsy value means no deadline.
        """
        f = Future()
        if self.queue:
            # An item is already there: no waiting, hence no timeout needed
            f.set_result(self.queue.popleft())
            return f

        def _expired():
            # put() may have served the getter in the meantime
            if not f.done():
                f.set_exception(TimeoutError())

        if deadline:
            # Use the loop this queue was built for, not whichever is current
            self.io_loop.add_timeout(deadline, _expired)
        self.getters.append(f)
        return f

    def get_nowait(self):
        """
        Remove and return an item from the queue without blocking.

        Return an item if one is immediately available, else return ``None``.
        """
        return self.queue.popleft() if self.queue else None

    def get_all(self):
        """
        Remove and return all items from the queue, without blocking

        :return: a deque with the items (empty if nothing was queued)
        """
        # Hand the current deque to the caller and start over with a new one
        items, self.queue = self.queue, deque()
        return items


def TimeoutTask(func, deadline=None, *args, **kwargs):
    """
    Encapsulate a Tornado Task with a deadline

    ``func`` is started through ``Task(func, *args, **kwargs)``. The returned
    Future takes the task's result, or its exception if the task failed. If
    ``deadline`` passes first, the Future fails with ``TimeoutError``; the
    task itself is not cancelled, its late outcome is simply not delivered.

    :param func: callable handed to ``Task``
    :param deadline: optional deadline, passed unchanged to
        ``IOLoop.current().add_timeout``; a falsy value means no deadline.
        As it is the second positional parameter, positional arguments meant
        for ``func`` must come after it.
    :param args: positional arguments forwarded to ``Task``
    :param kwargs: keyword arguments forwarded to ``Task``
    :return: Future resolved with the outcome of the task or with a timeout
    """
    f = Future()

    def _expired():
        # The task may have completed before the deadline
        if not f.done():
            f.set_exception(TimeoutError())

    def _done(task):
        try:
            res = task.result()
        except Exception as ex:
            if f.done():
                # Already timed out, nobody is waiting: let the error surface
                # from the callback, as it did before.
                raise
            # Forward the failure instead of leaving the caller waiting forever
            f.set_exception(ex)
        else:
            if not f.done():
                f.set_result(res)

    future_task = Task(func, *args, **kwargs)
    future_task.add_done_callback(_done)
    if deadline:
        IOLoop.current().add_timeout(deadline, _expired)
    return f