Commit d026bb0a authored by Eliot Berriot's avatar Eliot Berriot

Added most methods for a basic/django-similar manager/queryset

parent 7f0d3886
from .registries import Registry, meta_registry
from .registries import Registry, meta_registry, QuerySet, Manager, MultipleObjectsReturned, DoesNotExist
__version__ = "0.2.1"
\ No newline at end of file
__version__ = "0.2.1"
from collections import OrderedDict
import inspect
import operator
try:
# use Python3 reload
......@@ -9,12 +10,122 @@ except:
# we are on Python2
pass
class DoesNotExist(ValueError):
pass
class MultipleObjectsReturned(ValueError):
pass
class QuerySet(object):
def __init__(self, values):
self.values = list(values)
def __iter__(self):
for value in self.values:
yield value
raise StopIteration
def __len__(self):
return len(self.values)
def __repr__(self):
return '<{0}: {1}>'.format(self.__class__.__name__, str(list(self.values)))
def __getitem__(self, i):
return self.values[i]
def __eq__(self, other):
return self.values == list(other)
def _clone(self, new_values):
return self.__class__(new_values)
def _build_filter(self, **kwargs):
"""build a single filter function used to match arbitrary object"""
def object_filter(obj):
for key, value in kwargs.items():
if not getattr(obj, key) == value:
return False
return True
return object_filter
def order_by(self, key):
reverse = False
if key.startswith('-'):
reverse = True
key = key[1:]
return self._clone(sorted(self.values, key=operator.attrgetter(key), reverse=reverse))
def all(self):
return self._clone(self.values)
def count(self):
return len(self)
def first(self):
try:
return self.values[0]
except IndexError:
return None
def last(self):
try:
return self.values[-1]
except IndexError:
return None
def filter(self, **kwargs):
_filter = self._build_filter(**kwargs)
return self._clone(filter(_filter, self.values))
def exclude(self, **kwargs):
_filter = self._build_filter(**kwargs)
return self._clone(filter(lambda v: not _filter(v), self.values))
def get(self, **kwargs):
matches = self.filter(**kwargs)
if len(matches) == 0:
raise DoesNotExist()
if len(matches) > 1:
raise MultipleObjectsReturned()
return matches[0]
class Manager(object):
"""Used to retrieve / order / filter preferences pretty much as django's ORM managers"""
def __init__(self, registry, queryset_class):
self.registry = registry
self.queryset_class = queryset_class
def get_queryset(self):
return self.queryset_class(self.registry.values())
def all(self):
return self.get_queryset().all()
def __getattr__(self, attr):
try:
return super(Manager, self).__getattr__(attr)
except AttributeError:
# Try to proxy on queryset if possible
return getattr(self.get_queryset(), attr)
class Registry(OrderedDict):
manager_class = Manager
queryset_class = QuerySet
def __init__(self, *args, **kwargs):
super(Registry, self).__init__(*args, **kwargs)
self.objects = self.manager_class(self, self.queryset_class)
def register_decorator_factory(self, **kwargs):
"""
"""
Return an actual decorator for registering objects into registry
"""
"""
name = kwargs.get('name')
def decorator(decorated):
self.register_func(data=decorated, name=name)
......@@ -30,7 +141,7 @@ class Registry(OrderedDict):
pass
:param:data: Something to register in the registry
:param:name: The unique name that will identify registered data.
:param:name: The unique name that will identify registered data.
If None, by default, registry will try to deduce name from class name (if object is a class or an object).
You can change this behaviour by overriding :py::method:`prepare_name`
......@@ -72,8 +183,8 @@ class Registry(OrderedDict):
"""
if self.validate(data):
o = self.prepare_data(data)
n = self.prepare_name(data, name)
self[n] = o
n = self.prepare_name(data, name)
self[n] = o
self.post_register(data=0, name=n)
else:
raise ValueError("{0} (type: {0.__class__}) is not a valid value for {1} registry".format(data, self.__class__))
......@@ -140,5 +251,5 @@ class MetaRegistry(Registry):
def autodiscover_registries(self, apps):
for key, registry in self.items():
registry.autodiscover(apps)
meta_registry = MetaRegistry()
\ No newline at end of file
meta_registry = MetaRegistry()
......@@ -10,4 +10,4 @@ class VegetableRegistry(Registry):
look_into = "vegetables"
vegetable_registry = VegetableRegistry()
meta_registry.register(vegetable_registry, name="vegetable_registry")
\ No newline at end of file
meta_registry.register(vegetable_registry, name="vegetable_registry")
import unittest
import unittest
from . import test_registries
import persisting_theory
......@@ -64,7 +64,7 @@ class RegistryTest(unittest.TestCase):
pass
self.assertEqual(registry.get('something'), something)
def test_can_register_via_decorator_using_custom_name(self):
registry = persisting_theory.Registry()
......@@ -152,11 +152,79 @@ class RegistryTest(unittest.TestCase):
def post_register(self, data, name):
raise PostRegisterException('Post register triggered')
r = PostRegister()
r = PostRegister()
with self.assertRaises(PostRegisterException):
r.register("hello", name="world")
class TestObject(object):
def __init__(self, name, **kwargs):
self.name = name
for key, value in kwargs.items():
setattr(self, key, value)
def __repr__(self):
return self.name
class TestManagers(unittest.TestCase):
OBJECTS = [
TestObject(name='test_1', order=2, a=1),
TestObject(name='test_2', order=3, a=1),
TestObject(name='test_3', order=1, a=2),
TestObject(name='test_4', order=4, a=2),
]
def setUp(self):
class R(persisting_theory.Registry):
def prepare_name(self, data, key=None):
return data.name
self.registry = R()
for o in self.OBJECTS:
self.registry.register(o)
def test_default_order(self):
self.assertEqual(list(self.registry.objects.all()), self.OBJECTS)
def test_can_get_using_attribute(self):
self.assertEqual(self.registry.objects.get(name='test_1'), self.OBJECTS[0])
def test_can_filter(self):
self.assertEqual(self.registry.objects.filter(a=1), self.OBJECTS[:2])
def test_can_combine_filters(self):
self.assertEqual(self.registry.objects.filter(a=1, name='test_2'), self.OBJECTS[1:2])
self.assertEqual(self.registry.objects.filter(a=1).filter(name='test_2'), self.OBJECTS[1:2])
def test_can_exclude(self):
self.assertEqual(self.registry.objects.exclude(a=1), self.OBJECTS[2:])
def test_can_combine_exclude(self):
self.assertEqual(self.registry.objects.exclude(a=1).exclude(name='test_4'), self.OBJECTS[2:3])
self.assertEqual(self.registry.objects.exclude(a=2, name='test_4'), self.OBJECTS[:3])
def test_can_count(self):
self.assertEqual(self.registry.objects.filter(a=1).count(), 2)
def test_first(self):
self.assertIsNone(self.registry.objects.filter(a=123).first())
self.assertIsNotNone(self.registry.objects.filter(a=1).first())
def test_ordering(self):
self.assertEqual(self.registry.objects.order_by('order')[:2], [self.OBJECTS[2], self.OBJECTS[0]])
self.assertEqual(self.registry.objects.order_by('-order')[:2], [self.OBJECTS[3], self.OBJECTS[1]])
def test_last(self):
self.assertIsNone(self.registry.objects.filter(a=123).last())
self.assertIsNotNone(self.registry.objects.filter(a=1).last())
def test_get_raise_exception_on_multiple_objects_returned(self):
with self.assertRaises(persisting_theory.MultipleObjectsReturned):
self.registry.objects.get(a=1)
def test_get_raise_exception_on_does_not_exist(self):
with self.assertRaises(persisting_theory.DoesNotExist):
self.registry.objects.get(a=123)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment