Commit fbf1d1bc authored by Jean-Baptiste's avatar Jean-Baptiste

update: add permissions to containers

parent 4aa927ef
Pipeline #2837 passed with stage
in 1 minute and 31 seconds
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.db import models
from django.urls import get_resolver
from django.utils.decorators import classonlymethod
from guardian.shortcuts import get_perms
class Model(models.Model):
......@@ -27,12 +30,12 @@ class Model(models.Model):
def get_container_id(self):
return Model.container_id(self)
@classmethod
@classonlymethod
def resource_id(cls, instance):
r_id = "{}{}".format(cls.container_id(instance), getattr(instance, cls.slug_field(instance)))
return cls.__clean_path(r_id)
@classmethod
@classonlymethod
def slug_field(cls, instance):
view_name = '{}-detail'.format(instance._meta.object_name.lower())
slug_field = '/{}'.format(get_resolver().reverse_dict[view_name][0][0][1][0])
......@@ -40,7 +43,7 @@ class Model(models.Model):
slug_field = slug_field[1:]
return slug_field
@classmethod
@classonlymethod
def container_id(cls, instance):
if isinstance(instance, cls):
path = instance.get_container_path()
......@@ -58,19 +61,19 @@ class Model(models.Model):
depth = 1
many_depth = 0
@classmethod
@classonlymethod
def resolve_id(cls, id):
id = cls.__clean_path(id)
view, args, kwargs = get_resolver().resolve(id)
return view.initkwargs['model'].objects.get(**kwargs)
@classmethod
@classonlymethod
def resolve_container(cls, path):
path = cls.__clean_path(path)
view, args, kwargs = get_resolver().resolve(path)
return view.initkwargs['model']
@classmethod
@classonlymethod
def resolve(cls, path):
container = cls.resolve_container(path)
try:
......@@ -79,7 +82,7 @@ class Model(models.Model):
resolve_id = None
return container, resolve_id
@classmethod
@classonlymethod
def __clean_path(cls, path):
if not path.startswith("/"):
path = "/{}".format(path)
......@@ -87,13 +90,28 @@ class Model(models.Model):
path = "{}/".format(path)
return path
@classmethod
@classonlymethod
def get_permission_classes(cls, related_model, default_permissions_classes):
try:
return getattr(related_model._meta, 'permission_classes',
getattr(related_model.Meta, 'permission_classes', default_permissions_classes))
except AttributeError:
return default_permissions_classes
return cls.get_meta(related_model, 'permission_classes', default_permissions_classes)
@classonlymethod
def get_meta(cls, model_class, meta_name, default=None):
if hasattr(model_class, 'Meta'):
meta = getattr(model_class.Meta, meta_name, default)
else:
meta = default
return getattr(model_class._meta, meta_name, meta)
@staticmethod
def get_permissions(obj_or_model, user_or_group, filter):
permissions = filter
for permission_class in Model.get_permission_classes(obj_or_model, []):
permissions = permission_class().filter_user_perms(user_or_group, obj_or_model, permissions)
if not isinstance(user_or_group, AnonymousUser):
permissions += get_perms(user_or_group, obj_or_model)
return [{'mode': {'@type': name.split('_')[0]}} for name in permissions]
class LDPSource(models.Model):
......
......@@ -2,6 +2,8 @@ from guardian.shortcuts import get_objects_for_user
from rest_framework import filters
from rest_framework import permissions
from djangoldp.models import Model
"""
Liste des actions passées dans views selon le protocole REST :
list
......@@ -35,11 +37,11 @@ class WACPermissions(permissions.DjangoObjectPermissions):
return super().has_permission(request, view)
# This method should be overriden by other permission classes
def user_permissions(self, request, obj):
def user_permissions(self, user, obj):
return []
def filter_user_perms(self, request, obj, permissions):
return [perm for perm in permissions if perm in self.user_permissions(request, obj)]
def filter_user_perms(self, user_or_group, obj, permissions):
return [perm for perm in permissions if perm in self.user_permissions(user_or_group, obj)]
class ObjectFilter(filters.BaseFilterBackend):
......@@ -77,11 +79,11 @@ class InboxPermissions(WACPermissions):
else:
return super().has_object_permission(request, view)
def user_permissions(self, request, obj):
if request.user.is_anonymous:
def user_permissions(self, user, obj):
if user.is_anonymous:
return self.anonymous_perms
else:
if hasattr(obj._meta, 'auto_author') and getattr(obj, obj._meta.auto_author) == request.user:
if Model.get_meta(obj, 'auto_author') == user:
return self.author_perms
else:
return self.authenticated_perms
......@@ -119,11 +121,11 @@ class AnonymousReadOnly(WACPermissions):
else:
return super().has_object_permission(request, view, obj)
def user_permissions(self, request, obj):
if request.user.is_anonymous:
def user_permissions(self, user, obj):
if user.is_anonymous:
return self.anonymous_perms
else:
if hasattr(obj._meta, 'auto_author') and getattr(obj, obj._meta.auto_author) == request.user:
if Model.get_meta(obj, 'auto_author') == user:
return self.author_perms
else:
return self.authenticated_perms
from collections import OrderedDict, Mapping
from urllib import parse
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ImproperlyConfigured
from django.core.exceptions import ValidationError as DjangoValidationError
from django.core.urlresolvers import get_resolver, resolve, get_script_prefix, Resolver404
from django.utils.datastructures import MultiValueDictKeyError
from django.utils.encoding import uri_to_iri
from guardian.shortcuts import get_perms
from rest_framework.exceptions import ValidationError
from rest_framework.fields import SkipField, empty
from rest_framework.fields import get_error_detail, set_value
......@@ -35,7 +33,11 @@ class LDListMixin:
return [self.child.to_internal_value(item) for item in data]
def to_representation(self, value):
return {'@id': self.id, '@type': 'ldp:Container', 'ldp:contains': super().to_representation(value)}
return {'@id': self.id,
'@type': 'ldp:Container',
'ldp:contains': super().to_representation(value),
'permissions': Model.get_permissions(value.model, self.context['request'].user, ['view', 'add'])
}
def get_attribute(self, instance):
parent_id_field = self.parent.fields[self.parent.url_field_name]
......@@ -205,26 +207,13 @@ class LDPSerializer(HyperlinkedModelSerializer):
pass
return fields + list(getattr(self.Meta, 'extra_fields', []))
def get_permissions(self, obj):
permissions = ['view', 'add', 'change', 'control', 'delete']
if hasattr(obj._meta, 'permission_classes'):
for permission_class in obj._meta.permission_classes:
permissions = permission_class().filter_user_perms(self.context['request'], obj, permissions)
if not isinstance(self.context['request'].user, AnonymousUser):
permissions += get_perms(self.context['request'].user, obj)
return [{'mode': {'@type': name.split('_')[0]}} for name in permissions]
def to_representation(self, obj):
data = super().to_representation(obj)
if hasattr(obj._meta, 'rdf_type'):
data['@type'] = obj._meta.rdf_type
if hasattr(obj._meta, 'rdf_context'):
data['@context'] = obj._meta.rdf_context
data['permissions'] = self.get_permissions(obj)
data['@type'] = Model.get_meta(obj, 'rdf_type', None)
data['@context'] = Model.get_meta(obj, 'rdf_context', None)
data['permissions'] = Model.get_permissions(obj, self.context['request'].user,
['view', 'change', 'control', 'delete'])
return data
......
......@@ -2,6 +2,7 @@ from django.conf import settings
from django.db import models
from djangoldp.models import Model
from djangoldp.permissions import AnonymousReadOnly
class Skill(Model):
......@@ -50,6 +51,7 @@ class Invoice(Model):
class Meta:
depth = 2
permission_classes = [AnonymousReadOnly]
nested_fields = ["batches"]
......
......@@ -3,7 +3,7 @@ import json
from django.contrib.auth.models import User
from rest_framework.test import APIRequestFactory, APIClient, APITestCase
from djangoldp.tests.models import Post
from djangoldp.tests.models import Post, Task, Invoice
class TestGET(APITestCase):
......@@ -11,14 +11,27 @@ class TestGET(APITestCase):
def setUp(self):
self.factory = APIRequestFactory()
self.client = APIClient()
self.user = User.objects.create_user(username='john', email='jlennon@beatles.com', password='glass onion')
def tearDown(self):
self.user.delete()
pass
def test_get(self):
def test_get_resource(self):
post = Post.objects.create(content="content")
response = self.client.get('/posts/{}/'.format(post.pk), content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEquals(response.data['content'], "content")
self.assertIn('author', response.data)
def test_get_container(self):
Post.objects.create(content="content")
response = self.client.get('/posts/', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertIn('permissions', response.data)
self.assertEquals(2, len(response.data['permissions'])) # read and add
Invoice.objects.create(title="content")
response = self.client.get('/invoices/', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertIn('permissions', response.data)
self.assertEquals(1, len(response.data['permissions'])) # read only
......@@ -30,11 +30,7 @@ for class_name in model_classes:
urls_fct = model_class.get_view_set().urls
urlpatterns.append(url(r'^' + path, include(
urls_fct(model=model_class,
lookup_field=getattr(model_class._meta, 'lookup_field',
getattr(model_class.Meta, 'lookup_field', 'pk')),
permission_classes=getattr(model_class._meta, 'permission_classes',
getattr(model_class.Meta, 'permission_classes', [])),
fields=getattr(model_class._meta, 'serializer_fields',
getattr(model_class.Meta, 'serializer_fields', [])),
nested_fields=getattr(model_class._meta, 'nested_fields',
getattr(model_class.Meta, 'nested_fields', []))))))
lookup_field=Model.get_meta(model_class, 'lookup_field', 'pk'),
permission_classes=Model.get_meta(model_class, 'permission_classes', []),
fields=Model.get_meta(model_class, 'serializer_fields', []),
nested_fields=Model.get_meta(model_class, 'nested_fields', [])))))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment