Source code for flask_executor.futures
from collections import OrderedDict
from concurrent.futures import Future
from flask_executor.helpers import InstanceProxy
class FutureCollection:
    """A FutureCollection is an object to store and interact with
    :class:`concurrent.futures.Future` objects. It provides access to all
    attributes and methods of a Future by proxying attribute calls to the
    stored Future object.

    To access the methods of a Future from a FutureCollection instance, include
    a valid ``future_key`` value as the first argument of the method call. To
    access attributes, call them as though they were a method with
    ``future_key`` as the sole argument. If ``future_key`` does not exist, the
    call will always return None. If ``future_key`` does exist but the
    referenced Future does not contain the requested attribute an
    :exc:`AttributeError` will be raised.

    Special ("dunder") names such as ``__html__`` or ``__getstate__`` are
    never proxied: looking one up that the collection does not define raises
    :exc:`AttributeError` as it would on any ordinary object.

    To prevent memory exhaustion a FutureCollection instance can be bounded by
    number of items using the ``max_length`` parameter. As a best practice,
    Futures should be popped once they are ready for use, with the proxied
    attribute form used to determine whether a Future is ready to be used or
    discarded.

    :param max_length: Maximum number of Futures to store. Oldest Futures are
                       discarded first. ``None`` disables the bound. A value
                       of zero or less means nothing is retained.
    """

    def __init__(self, max_length=50):
        self.max_length = max_length
        # Insertion-ordered so the oldest entry can be evicted first.
        self._futures = OrderedDict()

    def __contains__(self, future):
        """Return True if ``future`` (a Future, not a key) is stored."""
        return future in self._futures.values()

    def __len__(self):
        """Return the number of Futures currently stored."""
        return len(self._futures)

    def __getattr__(self, attr):
        """Return a callable that proxies ``attr`` to a stored Future.

        Python only calls this after normal attribute lookup has failed.

        :param attr: Name of the Future method or attribute to proxy.
        :raises AttributeError: If ``attr`` is a special ``__dunder__`` name.
        """
        # Protocol probes (hasattr(obj, "__html__"), pickling's lookup of
        # __getstate__, ...) must see a genuine "missing attribute" rather
        # than a closure that demands a future_key argument.
        if attr.startswith("__") and attr.endswith("__"):
            raise AttributeError(
                "{!r} object has no attribute {!r}".format(
                    type(self).__name__, attr))

        # Call any valid Future method or attribute
        def _future_attr(future_key, *args, **kwargs):
            try:
                future = self._futures[future_key]
            except KeyError:
                # Unknown keys are reported as None instead of raising.
                return None
            future_attr = getattr(future, attr)
            if callable(future_attr):
                return future_attr(*args, **kwargs)
            return future_attr

        return _future_attr

    def _check_limits(self):
        """Evict the oldest Futures until the collection fits ``max_length``."""
        if self.max_length is None:
            return
        # The emptiness guard stops a negative max_length from calling
        # popitem() on an empty dict, which would raise KeyError.
        while self._futures and len(self._futures) > self.max_length:
            self._futures.popitem(last=False)

    def add(self, future_key, future):
        """Add a new Future. If ``max_length`` limit was defined for the
        FutureCollection, old Futures may be dropped to respect this limit.

        :param future_key: Key for the Future to be added.
        :param future: Future to be added.
        :raises ValueError: If ``future_key`` is already present.
        """
        if future_key in self._futures:
            raise ValueError("future_key {} already exists".format(future_key))
        self._futures[future_key] = future
        self._check_limits()

    def pop(self, future_key):
        """Return a Future and remove it from the collection. Futures that are
        ready to be used should always be popped so they do not continue to
        consume memory.

        Returns ``None`` if the key doesn't exist.

        :param future_key: Key for the Future to be returned.
        """
        return self._futures.pop(future_key, None)
class FutureProxy(InstanceProxy, Future):
    """Instance proxy wrapped around a single
    :class:`concurrent.futures.Future`.

    An executor cannot be made to hand back a subclassed Future, so this
    proxy is placed around the Future it returns. That allows selected
    instance behaviours to be overridden while the original methods and
    attributes stay reachable.

    :param future: The :class:`~concurrent.futures.Future` instance the
                   proxy gives access to.
    :param executor: The :class:`flask_executor.Executor` instance used to
                     provide access to Flask context features.
    """

    def __init__(self, future, executor):
        # Targets are assigned left to right: wrapped Future first, then the
        # executor, matching the order the attributes have always been set in.
        self._self, self._executor = future, executor

    def add_done_callback(self, fn):
        """Register ``fn`` as a done-callback on the wrapped Future.

        ``fn`` is first passed through the executor's ``_prepare_fn`` with
        ``force_copy=True``; the prepared callable is what gets registered.

        :param fn: Callable to attach to the wrapped Future.
        :returns: Whatever the wrapped Future's ``add_done_callback`` returns.
        """
        prepared = self._executor._prepare_fn(fn, force_copy=True)
        wrapped = self._self
        return wrapped.add_done_callback(prepared)

    def __eq__(self, obj):
        """Compare the wrapped Future, rather than the proxy, with ``obj``."""
        wrapped = self._self
        return wrapped == obj

    def __hash__(self):
        """Return the wrapped Future's hash, in line with ``__eq__``."""
        wrapped = self._self
        return wrapped.__hash__()