# Source code for cool.admin.admin

# encoding: utf-8
import warnings
from functools import reduce, wraps

import django
from django import forms
from django.contrib import admin
from django.contrib.admin.utils import flatten_fieldsets
from django.contrib.admin.views import main
from django.contrib.admin.widgets import AdminFileWidget
from django.contrib.auth import get_permission_codename
from django.core.exceptions import (
    FieldDoesNotExist, PermissionDenied, ValidationError,
)
from django.core.validators import EMPTY_VALUES
from django.db import models
from django.db.models import ManyToOneRel
from django.db.models.constants import LOOKUP_SEP
from django.db.models.fields.related import RelatedField
from django.forms.models import (
    BaseInlineFormSet, BaseModelFormSet, modelform_factory,
)
from django.urls import NoReverseMatch, reverse
from django.utils.encoding import force_str
from django.utils.http import urlencode
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _

from cool.admin import widgets
from cool.core.deprecation import RemovedInDjangoCool20Warning
from cool.settings import cool_settings


def extend_admincls(*admin_classes):
    """
    Create a new class named ``_Admin`` that inherits from every class passed
    in, in the order given. At least one base class is required.
    """
    assert admin_classes
    bases = tuple(admin_classes)
    namespace = {}
    return type('_Admin', bases, namespace)


def get_widget(field):
    """Return the widget class for *field*: ``ImageWidget`` for image fields, ``TagWidget`` otherwise."""
    is_image = isinstance(field, models.ImageField)
    return widgets.ImageWidget if is_image else widgets.TagWidget


def get_field_widget(field):
    """Return ``ImageFieldWidget`` for image fields and ``FieldWidget`` for every other field."""
    is_image = isinstance(field, models.ImageField)
    return widgets.ImageFieldWidget if is_image else widgets.FieldWidget


def _lookup_field(name, obj):
    """
    Resolve a ``LOOKUP_SEP``-separated lookup path starting at *obj*.

    Every segment except the last is followed with ``getattr``; the last
    segment is looked up as a model field on the object that was reached.

    Returns a ``(field, label, value)`` tuple: the field object, its verbose
    name, and the value stored under ``field.attname`` on that object.
    """
    *relation_path, field_name = name.split(LOOKUP_SEP)
    for attr in relation_path:
        obj = getattr(obj, attr)

    field = obj._meta.get_field(field_name)
    # The type check has to look at the field object: the field *name* is a
    # plain string and can never be a RelatedField, which made this branch
    # unreachable before.
    if isinstance(field, RelatedField):
        label = field.remote_field.field.verbose_name
    else:
        label = field.verbose_name

    return field, label, getattr(obj, field.attname)


def format_field(verbose, field, options=None, **kwargs):
    """
    Build a ``list_display`` callable that renders one field through a widget.

    ``options`` holds the widget attributes; an optional ``'widget_class'``
    entry selects the widget, otherwise ``get_widget`` picks one from the
    field type. ``verbose`` becomes the column's ``short_description`` and
    every extra keyword argument is set as an attribute on the callable.

    Example:
        class MyAdminClass(models.ModelAdmin):
            list_display = [format_field(
                verbose='Name', field='username', options={
                    'style': 'color:red'
                }), ...]
    """
    lookup_name = field
    base_attrs = options or {}

    def _format_field(obj):
        model_field, label, value = _lookup_field(lookup_name, obj)
        # Copy per call so popping 'widget_class' never touches the shared options.
        attrs = base_attrs.copy()
        widget_class = attrs.pop('widget_class', None)
        if widget_class is None:
            widget_class = get_widget(model_field)
        rendered = widget_class(attrs).render(label, value, {})
        return mark_safe(rendered)

    extra_attributes = {'short_description': verbose, **kwargs}
    for attribute, attribute_value in extra_attributes.items():
        setattr(_format_field, attribute, attribute_value)
    return _format_field


def collapse_fields(verbose, fields, options=None, **kwargs):
    """
    Collapse several fields into one ``list_display`` column.

    ``options`` maps a field name to that field's widget attributes; an
    optional ``'widget_class'`` entry selects the widget, otherwise
    ``get_field_widget`` picks one from the field type. ``verbose`` becomes
    the column's ``short_description`` and every extra keyword argument is
    set as an attribute on the returned callable.

    The ``options`` mapping passed in is never modified.

    Example:
        class MyAdminClass(models.ModelAdmin):
            list_display = [collapse_fields(
                verbose='Name', fields=('username', 'name'), options={
                    'username':{'style': 'color:red'}
                }), ...]
    """
    widget_map = {}
    for name in fields:
        if options:
            # Work on a copy: popping 'widget_class' from the caller's dict
            # would lose the widget class the next time the same options are
            # used (or when a name is listed twice in ``fields``).
            widget_kwargs = dict(options.get(name, {}))
            widget_class = widget_kwargs.pop('widget_class', None)
            widget_map[name] = {
                'widget_class': widget_class,
                'attrs': widget_kwargs,
            }
        else:
            widget_map[name] = {}

    def format_fields(obj):
        html = []
        for name in fields:
            field, label, value = _lookup_field(name, obj)
            widget_class = widget_map[name].get('widget_class', None) or get_field_widget(field)
            widget = widget_class(widget_map[name].get('attrs', {}))
            html.append(widget.render(name, (label, value), None))
        return mark_safe(''.join(html))

    format_fields.short_description = verbose
    for att, val in kwargs.items():
        setattr(format_fields, att, val)
    return format_fields


def get_related_model_fields(model, rel, is_foreign_key):
    """
    Return a pair built from ``rel.field`` and ``rel.get_related_field()``.

    For a foreign key whose field is declared on *model* (compared through
    the concrete model, so proxy classes are treated the same way) the pair
    is ``(rel.field, related_field)``; in every other case it is
    ``(related_field, rel.field)``.
    """
    # The rel object of a many-to-many relation does not itself distinguish
    # the two sides of the relation.
    if is_foreign_key:
        declaring_model = rel.field.model._meta.concrete_model
        if declaring_model == model._meta.concrete_model:
            return rel.field, rel.get_related_field()
    return rel.get_related_field(), rel.field


class FormSetMixin:
    """
    Formset mixin that adds validation to make sure the submitted data is
    still up to date.
    """
    def _existing_object(self, pk):
        """
        Return the object with primary key *pk*, using a per-formset cache.

        The cache (``_object_dict``) is filled on first use. A cache miss
        triggers one extra query whose result (possibly ``None``) is cached.
        """
        if not hasattr(self, '_object_dict'):
            queryset = self.get_queryset()
            if self.data:  # when data has been submitted
                # Only load the objects whose primary keys were actually posted.
                pk_field = self.model._meta.pk
                pks = []
                for i in range(self.total_form_count()):
                    prefix = self.add_prefix(i)
                    pk_val = self.data.get('%s-%s' % (prefix, pk_field.name))
                    if pk_val is not None:
                        pks.append(pk_val)
                queryset = queryset.filter(pk__in=pks)
            self._object_dict = {o.pk: o for o in queryset}
        obj = self._object_dict.get(pk)
        if obj is None:
            obj = self.get_queryset().filter(pk=pk).first()
            self._object_dict[pk] = obj
        return obj

    def clean(self):
        """Run the parent ``clean`` and then ``validate_queryset``."""
        super().clean()
        self.validate_queryset()

    def validate_queryset(self):
        """
        Check that every form is still bound to the object whose primary key
        was submitted for it.

        On the first mismatch the forms are replaced by those of a freshly
        built, unbound formset of the same class and a ``ValidationError`` is
        raised.
        """
        pk = self.model._meta.pk
        to_python = pk.to_python
        rel = pk.remote_field
        # Follow the chain of related fields so the submitted value is
        # converted with the ``to_python`` of the field finally referenced.
        while rel:
            related_field = rel.get_related_field()
            to_python = related_field.to_python
            rel = related_field.remote_field

        updated = False
        for form in self.forms:
            pk_data = form[pk.name].data
            if (pk_data not in EMPTY_VALUES
                    and form.instance.pk != to_python(pk_data)):
                updated = True
                break
        if updated:
            # NOTE(review): ``save_as_new`` is read from ``self`` and passed to
            # the constructor, and no ``instance`` is passed - confirm both
            # hold for every formset class this mixin is combined with.
            new_formset = self.__class__(
                save_as_new=self.save_as_new,
                prefix=self.prefix,
                queryset=self.queryset,
            )
            self.forms = new_formset.forms
            raise ValidationError(_('Page data has been updated, please modify and save again'))


class StrictModelFormSet(FormSetMixin, BaseModelFormSet):
    """``BaseModelFormSet`` combined with the ``FormSetMixin`` validation."""

class StrictInlineFormSet(FormSetMixin, BaseInlineFormSet):
    """``BaseInlineFormSet`` combined with the ``FormSetMixin`` validation."""

class AdminImageWidget(AdminFileWidget):
    """Admin file widget that shows a clickable preview of the current image."""

    def render(self, name, value, attrs=None, renderer=None):
        """
        Render the standard file widget, preceded by a linked ``<img>``
        preview when *value* has a non-empty ``url`` attribute.
        """
        # Imported locally so the block does not depend on a new module-level import.
        from django.utils.html import format_html

        output = []
        if value and getattr(value, "url", None):
            image_url = value.url
            file_name = str(value)
            # format_html escapes the URL and file name before they are
            # placed inside HTML attributes.
            output.append(format_html(
                ' <a href="{}" target="_blank">'
                '<img src="{}" alt="{}" style="max-width: 500px; max-height: 1000px" /></a>',
                image_url, image_url, file_name,
            ))
        output.append(super().render(name, value, attrs, renderer))
        return mark_safe(''.join(output))


class AutoCompleteMixin:
    """
    ModelAdmin mixin that switches relation fields to the autocomplete
    widgets from ``cool.admin.widgets`` when the related model defines
    ``get_search_fields``.
    """

    def formfield_for_foreignkey(self, db_field, request, **kwargs):
        """
        Use ``CoolAutocompleteSelect`` for the foreign key unless a widget was
        passed in, the setting is off, the related model has no
        ``get_search_fields``, or the field is a raw-id / radio field.
        """
        db = kwargs.get('using')
        if ('widget' not in kwargs
                and cool_settings.ADMIN_FOREIGNKEY_FIELD_USE_AUTOCOMPLETE
                and hasattr(db_field.remote_field.model, 'get_search_fields')
                and db_field.name not in [*self.raw_id_fields, *self.radio_fields]):
            kwargs['widget'] = widgets.CoolAutocompleteSelect(db_field, self.admin_site, using=db)
        return super().formfield_for_foreignkey(db_field, request, **kwargs)

    def formfield_for_manytomany(self, db_field, request, **kwargs):
        """
        Use ``CoolAutocompleteSelectMultiple`` for the many-to-many field
        under the same kind of conditions as for foreign keys.

        On Django < 3.1, when a widget has been chosen, the form field is
        built here instead of delegating to the parent implementation.
        """
        db = kwargs.get('using')
        if ('widget' not in kwargs
                and cool_settings.ADMIN_MANYTOMANY_FIELD_USE_AUTOCOMPLETE
                and hasattr(db_field.remote_field.model, 'get_search_fields')
                and db_field.name not in [*self.raw_id_fields, *self.filter_vertical, *self.filter_horizontal]):
            kwargs['widget'] = widgets.CoolAutocompleteSelectMultiple(db_field, self.admin_site, using=db)
        if django.VERSION >= (3, 1) or 'widget' not in kwargs:
            return super().formfield_for_manytomany(db_field, request, **kwargs)
        else:
            # No form field for relations that use a custom intermediary model.
            if not db_field.remote_field.through._meta.auto_created:
                return None
            if 'queryset' not in kwargs:
                queryset = self.get_field_queryset(db, db_field, request)
                if queryset is not None:
                    kwargs['queryset'] = queryset
            form_field = db_field.formfield(**kwargs)
            return form_field

    def get_search_fields(self, request):
        """
        Return the admin's search fields merged with those returned by the
        model's ``get_search_fields()``, when the model defines it.

        Duplicates are removed and order is kept (admin fields first). When
        the model has no callable ``get_search_fields`` the admin's own search
        fields are returned unchanged instead of being discarded.
        """
        search_fields = super().get_search_fields(request)
        get_search_fields_func = getattr(self.opts.model, 'get_search_fields', None)
        if get_search_fields_func and callable(get_search_fields_func):
            merged = dict.fromkeys([*search_fields, *get_search_fields_func()])
            return tuple(merged)
        return search_fields


class AutoSetRelatedFieldChangeList(main.ChangeList):
    """
    ChangeList that applies ``select_related`` to the relation fields named
    in ``list_display`` when ``list_select_related`` is ``False``.
    """

    def apply_select_related(self, qs):
        """Select the relations found in ``list_display``; otherwise defer to the parent."""
        if self.list_select_related is False:
            related_fields = self.get_related_field_in_list_display()
            if related_fields:
                return qs.select_related(*related_fields)
        return super().apply_select_related(qs)

    def get_related_field_in_list_display(self):
        """
        Return the ``list_display`` entries that name a field whose
        ``remote_field`` is a ``ManyToOneRel``, skipping entries given by
        their attname (e.g. ``author_id``).
        """
        ret = []
        for field_name in self.list_display:
            try:
                field = self.lookup_opts.get_field(field_name)
            except FieldDoesNotExist:
                # Not a model field (callable, admin method, ...): nothing to select.
                pass
            else:
                if isinstance(field.remote_field, ManyToOneRel):
                    if field_name != field.get_attname():
                        ret.append(field_name)
        return ret

class BaseModelAdmin(AutoCompleteMixin, admin.ModelAdmin):
    """
    Custom ModelAdmin base class.

    The change list shows the model's ordinary fields by default, and the
    class-level switches below customise the list, add and change views.
    """
    # remove "__str__"
    list_display = []
    list_display_links = ['id', ]

    # Extend options to manage site
    # extend field exclude RelatedField and PrimaryKey fields into list_display
    empty_value_display = _('[None]')
    with_related_items = True
    extend_normal_fields = True
    extend_related_fields = False
    auto_set_list_select_related = True
    exclude_list_display = []
    heads = ['id', ]
    tails = []

    # manage Add/Change view
    addable = True
    changeable = None
    editable = True
    deletable = True

    # manage Change view
    change_view_readonly_fields = []
    changeable_fields = forms.ALL_FIELDS
    formset = StrictModelFormSet

    def __getattr__(self, attr):
        """
        Turn ``a__b`` style names into callables that follow the lookup on an
        instance, so they can be used directly in ``list_display``.

        Names starting with ``_`` or ending in ``_boolean`` /
        ``_short_description`` fall through to the default attribute lookup.
        """
        if ('__' in attr
                and not attr.startswith('_')
                and not attr.endswith('_boolean')
                and not attr.endswith('_short_description')):

            def dyn_lookup(instance):
                # traverse all __ lookups
                return reduce(lambda parent, child: getattr(parent, child), attr.split('__'), instance)

            # get admin_order_field, boolean and short_description
            dyn_lookup.admin_order_field = attr
            dyn_lookup.boolean = getattr(self, '{}_boolean'.format(attr), False)
            dyn_lookup.short_description = getattr(
                self, '{}_short_description'.format(attr),
                attr.replace('_', ' ').capitalize()
            )
            return dyn_lookup

        # not dynamic lookup, default behaviour
        return self.__getattribute__(attr)

    def get_editable(self):
        """
        Return whether objects may be edited.

        A non-``None`` ``changeable`` (deprecated) overrides ``editable`` and
        emits a deprecation warning.
        """
        if self.changeable is not None:
            warnings.warn(
                "The changeable argument is deprecated in favor of editable",
                RemovedInDjangoCool20Warning, stacklevel=2
            )
            self.editable = self.changeable
        return self.editable

    def formfield_for_dbfield(self, db_field, **kwargs):
        """Use ``AdminImageWidget`` for image fields when the setting enables it."""
        if cool_settings.ADMIN_SHOW_IMAGE_IN_FORM_PAGE and isinstance(db_field, models.ImageField):
            kwargs.pop("request", None)
            kwargs['widget'] = AdminImageWidget
            return db_field.formfield(**kwargs)
        return super().formfield_for_dbfield(db_field, **kwargs)

    def get_changeable_fields(self, request, obj=None):
        """
        Return the fields that may be changed: ``None`` means "all fields",
        an empty tuple means "none".
        """
        if not self.get_editable():
            return ()
        if self.changeable_fields == forms.ALL_FIELDS:
            return None
        elif self.changeable_fields is None:
            return ()
        return self.changeable_fields

    def has_delete_permission(self, request, obj=None):
        """Deleting requires ``deletable`` in addition to the regular permission."""
        return self.deletable and super().has_delete_permission(request, obj)

    def has_change_permission(self, request, obj=None):
        """Changing requires ``get_editable()`` in addition to the regular permission."""
        return self.get_editable() and super().has_change_permission(request, obj)

    def has_add_permission(self, request):
        """Adding requires ``addable`` in addition to the regular permission."""
        return self.addable and super().has_add_permission(request)

    def get_user_queryset(self, queryset):
        """Hook used by ``get_form`` to restrict a choice queryset; returns it unchanged here."""
        return queryset

    def get_form(self, request, obj=None, **kwargs):
        """
        Build the model form, excluding read-only fields and fields whose
        current value is not among the choices returned by
        ``get_user_queryset`` / the field's ``choices``.
        """
        if 'fields' in kwargs:
            fields = kwargs.get('fields')
        else:
            # 'get_fieldsets' would call 'get_form' again with kwargs: field=None
            fields = flatten_fieldsets(self.get_fieldsets(request, obj))
        # get a blank form to fetch all fields
        form = modelform_factory(self.model, kwargs.get('form', self.form), exclude=())
        if fields is None or fields is forms.ALL_FIELDS:
            fields = form.base_fields.keys()
        readonly_fields = self.get_readonly_fields(request, obj, fields=fields)
        readonly_fields_set = set(readonly_fields)
        # NOTE(review): nesting rebuilt from flattened source; everything up to
        # ``kwargs['exclude'] = exclude`` is taken to belong to this branch
        # because ``exclude`` is only defined inside it.
        if 'exclude' not in kwargs:
            if self.exclude is None:
                exclude = []
            else:
                exclude = list(self.exclude)
            exclude.extend(readonly_fields)

            to_exclude_fields = []
            for name in fields:
                field = form.base_fields.get(name, None)
                if field is None:
                    continue
                try:
                    val = field.prepare_value(getattr(obj, name, None))
                except Exception:
                    # Best effort: a value that cannot be prepared is treated as unset.
                    val = None
                if val is None:
                    continue
                elif name in readonly_fields_set:
                    continue
                elif isinstance(field, forms.ModelMultipleChoiceField):
                    relates = list(val.all().values_list('pk', flat=True))
                    if not relates:
                        continue
                    queryset = self.get_user_queryset(field.queryset)
                    key = field.to_field_name or 'pk'
                    # Some currently related objects are outside the user's queryset.
                    if len(relates) != queryset.filter(**{'%s__in' % key: relates}).count():
                        to_exclude_fields.append(name)
                        readonly_fields_set.add(name)
                elif isinstance(field, forms.ModelChoiceField):
                    queryset = self.get_user_queryset(field.queryset)
                    key = field.to_field_name or 'pk'
                    if not queryset.filter(**{key: val}).exists():
                        to_exclude_fields.append(name)
                        readonly_fields_set.add(name)
                elif isinstance(field, forms.ChoiceField):
                    if field.choices and val not in dict(field.choices):
                        to_exclude_fields.append(name)
                        readonly_fields_set.add(name)
            if to_exclude_fields:
                exclude.extend(to_exclude_fields)
                readonly_fields.extend(to_exclude_fields)
            kwargs['exclude'] = exclude
        kwargs['fields'] = fields
        form = super().get_form(request, obj, **kwargs)
        # remove the custom fields from base_fields which is in 'exclude' or not
        # in 'fields' to prevent from checking these fields
        exclude = form._meta.exclude or ()
        for name in form.declared_fields:
            if name in exclude or name not in fields:
                form.base_fields.pop(name, None)
        return form

    def get_fieldsets(self, request, obj=None):
        """
        Return the regular fieldsets when the user may add/change the object;
        otherwise a single fieldset with the ``list_display`` entries that are
        valid form field names.
        """
        fieldsets = list(super().get_fieldsets(request, obj))
        is_add = obj is None
        if (is_add and self.has_add_permission(request)) or (not is_add and self.has_change_permission(request, obj)):
            return fieldsets
        else:
            list_display = self.get_list_display(request, False)
            valid_f_names = flatten_fieldsets(fieldsets)
            fields = [f for f in list_display if f in valid_f_names]
            return [(None, {'fields': fields})]

    def get_readonly_fields(self, request, obj=None, fields=None):
        """
        Return the read-only field names.

        Without add/change permission every field is read-only (``fields`` if
        given, else the list display). In the change view
        ``change_view_readonly_fields`` and every declared field that is not
        in ``get_changeable_fields`` are added.
        """
        is_add = obj is None
        readonly_fields = list(super().get_readonly_fields(request, obj))
        if (is_add and not self.has_add_permission(request)) or \
                (not is_add and not self.has_change_permission(request, obj)):
            return fields if fields is not None else self.get_list_display(request, False)
        readonly_fields_set = set(readonly_fields)
        # NOTE(review): nesting rebuilt from flattened source; the
        # changeable-fields handling is assumed to apply to the change view
        # only, like ``change_view_readonly_fields`` - confirm against upstream.
        if not is_add:
            for field in self.change_view_readonly_fields:
                if field not in readonly_fields_set:
                    readonly_fields_set.add(field)
                    readonly_fields.append(field)
            changeable_fields = self.get_changeable_fields(request, obj)
            if changeable_fields is not None:
                changeable_fields_set = set(changeable_fields)
                declared_fields = self.fieldsets
                if declared_fields:
                    declared_fields = flatten_fieldsets(declared_fields)
                else:
                    declared_fields = [f.name for f in self.opts._get_fields(reverse=False)]
                for field in declared_fields:
                    if field not in changeable_fields_set and field not in readonly_fields_set:
                        readonly_fields_set.add(field)
                        readonly_fields.append(field)
        return readonly_fields

    def gen_access_rels(self, request):
        """
        Collect the related change lists the current user may open.

        The result is computed once per request (cached on the request as
        ``_access_rels``) and also stored on the admin instance. Each entry is
        ``(changelist_uri, target_field, remote_field, is_foreign_key)``.
        """
        if not hasattr(request, '_access_rels'):
            _access_rels = []

            def _add_access_rels(_rel, is_foreign_key):
                target_field, remote_field = get_related_model_fields(self.model, _rel, is_foreign_key)
                rel_opts = remote_field.model._meta
                # The view permission must use the 'view' codename; it used to
                # duplicate the 'change' codename, hiding links from view-only users.
                view_perm = '%s.%s' % (rel_opts.app_label, get_permission_codename('view', rel_opts))
                change_perm = '%s.%s' % (rel_opts.app_label, get_permission_codename('change', rel_opts))
                if request.user.has_perm(view_perm) or request.user.has_perm(change_perm):
                    try:
                        uri = reverse('admin:%s_%s_changelist' % (rel_opts.app_label, rel_opts.model_name))
                    except NoReverseMatch:
                        # The related model has no change list on this admin site.
                        pass
                    else:
                        _access_rels.append((uri, target_field, remote_field, is_foreign_key))

            for rel in self.opts.related_objects:  # + tuple(r.remote_field for r in self.opts.many_to_many):
                _add_access_rels(rel, False)
            for key, field in self.opts._forward_fields_map.items():
                # The map also holds attname aliases; handle each field once.
                if key != field.name:
                    continue
                if field.is_relation and (field.many_to_one or field.one_to_one) \
                        and hasattr(field.remote_field, 'model') and field.remote_field.model:
                    _add_access_rels(field.remote_field, True)
            setattr(request, '_access_rels', _access_rels)
        setattr(self, '_access_rels', request._access_rels)

    def get_list_display(self, request, in_change_view=True):
        """
        get all fields except Relations if extend_normal_fields is True

        Columns are ordered as ``heads``, everything else, then ``tails``; a
        ``get_all_relations`` column is appended when the request carries
        ``_access_rels``.
        """
        list_display = list(super().get_list_display(request))
        if self.with_related_items:
            self.gen_access_rels(request)
        if not self.extend_normal_fields:
            if hasattr(request, '_access_rels'):
                list_display.append('get_all_relations')
            return list_display
        field_names = []
        heads = []
        middles = []
        tails = []

        def _get_field(field_name):
            # Replace image field names by a widget-rendering callable when enabled.
            if not field_name or not isinstance(field_name, str) or not cool_settings.ADMIN_SHOW_IMAGE_IN_CHANGE_LIST:
                return field_name
            try:
                field = self.model._meta.get_field(field_name)
                if field and isinstance(field, models.ImageField):
                    return format_field(field.verbose_name, field.name)
                else:
                    return field_name
            except FieldDoesNotExist:
                return field_name

        for name in list_display:
            if name in self.tails:
                tails.append(_get_field(name))
            elif name in self.heads:
                heads.append(_get_field(name))
            else:
                middles.append(_get_field(name))
        for field in self.get_normal_fields():
            if field.name in heads or field.name in middles or field.name in tails:
                continue
            if in_change_view and field.name in self.exclude_list_display:
                pass
            elif field.name in self.tails:
                tails.append(_get_field(field.name))
            elif field.name in self.heads:
                heads.append(_get_field(field.name))
            else:
                middles.append(_get_field(field.name))
        field_names.extend(heads)
        field_names.extend(middles)
        field_names.extend(tails)
        if hasattr(request, '_access_rels'):
            field_names.append('get_all_relations')
        return field_names

    def get_list_display_links(self, request, list_display):
        """Fall back to linking the first column when none of the configured links is displayed."""
        list_display_links = super().get_list_display_links(request, list_display)
        if list_display and not set(list_display_links) & set(list_display):
            return list(list_display)[:1]
        return list_display_links

    def get_changelist(self, request, **kwargs):
        """Use ``AutoSetRelatedFieldChangeList`` when ``auto_set_list_select_related`` is on."""
        if self.auto_set_list_select_related:
            return AutoSetRelatedFieldChangeList
        return super().get_changelist(request, **kwargs)

    def get_normal_fields(self):
        """
        Return the model's concrete fields suitable for the change list.

        Skipped: names starting with ``_``, excluded fields, relation fields
        (unless ``extend_related_fields`` is on and the relation is
        many-to-one / one-to-one) and file fields that are not image fields.
        """
        fields = []
        exclude = self.exclude or ()
        for field in self.model._meta.concrete_fields:
            if field.name.startswith('_') or field.attname in exclude:
                continue
            if isinstance(field, RelatedField):
                if not (self.extend_related_fields and (field.many_to_one or field.one_to_one)):
                    continue
            if isinstance(field, models.FileField) and not isinstance(field, models.ImageField):
                continue
            fields.append(field)
        return fields

    def get_all_relations(self, obj):
        """
        Render a ``<select>`` whose options open the related change lists
        filtered by *obj*; returns ``''`` when there are no accessible relations.
        """
        # A default is required: without it a missing attribute raises
        # AttributeError (via __getattr__) instead of yielding an empty column.
        if not getattr(self, '_access_rels', None):
            return ''
        html_list = [
            '<select onchange="window.open(this.value, \'_self\');">',
            '<option value="" selected="selected">------</option>'
        ]
        for uri, target_field, remote_field, is_foreign_key in self._access_rels:
            rel_opts = remote_field.model._meta
            value = getattr(obj, target_field.attname)
            value = force_str(value)
            params = {remote_field.name: value}
            url = '%s?%s' % (uri, urlencode(params))
            if is_foreign_key:
                html_list.append('<option value="%s">%s</option>' % (url, target_field.verbose_name))
            else:
                html_list.append(
                    '<option value="%s">%s-%s</option>' % (url, rel_opts.verbose_name, remote_field.verbose_name)
                )
        html_list.append('</select>')
        return mark_safe(''.join(html_list))

    get_all_relations.short_description = _("Related items")

def check_perms(*perms):
    """
    Decorator factory for admin methods taking ``(admin, request, ...)``.

    Before the wrapped method runs, each name in *perms* is turned into a
    permission codename for the admin's model; ``PermissionDenied`` is raised
    as soon as ``request.user`` lacks one of them.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(_admin, request, *args, **kwargs):
            opts = _admin.opts
            required = (
                "%s.%s" % (opts.app_label, get_permission_codename(perm, opts))
                for perm in perms
            )
            # Lazy evaluation keeps the original order and stops at the first missing permission.
            if not all(request.user.has_perm(name) for name in required):
                raise PermissionDenied
            return func(_admin, request, *args, **kwargs)
        return wrapper
    return decorator

def site_register(model_or_iterable, admin_class=None, site=None, **options):
    """
    Register one model or an iterable of models with an admin site.

    ``admin_class`` defaults to ``BaseModelAdmin`` and ``site`` to
    ``admin.site``. When ``cool_settings.ADMIN_SITE_REGISTER_FILTER_FUNCTION``
    is set, it is called once per model with
    ``(model, admin_class=..., site=..., **options)`` and the mapping it
    returns supplies that model's options; its ``'admin_class'`` and
    ``'site'`` entries, if present, override the arguments.
    """
    if not isinstance(model_or_iterable, (list, tuple, set)):
        model_or_iterable = [model_or_iterable]
    for model in model_or_iterable:
        if cool_settings.ADMIN_SITE_REGISTER_FILTER_FUNCTION is not None:
            new_options = cool_settings.ADMIN_SITE_REGISTER_FILTER_FUNCTION(
                model, admin_class=admin_class, site=site, **options
            )
        else:
            # Copy so the pops below never affect the options used for the next model.
            new_options = dict(options)
        new_admin_class = new_options.pop('admin_class', admin_class)
        new_site = new_options.pop('site', site)
        if new_site is None:
            new_site = admin.site
        if new_admin_class is None:
            new_admin_class = BaseModelAdmin
        # Register only the current model: passing the whole iterable on every
        # iteration re-registers earlier models and raises AlreadyRegistered.
        new_site.register(model, new_admin_class, **new_options)

def admin_register(func=None, *, admin_class=None, site=None, **options):
    """
    Model class decorator that registers the decorated model with the admin
    through ``site_register``.

    Usable bare (``@admin_register``) or with keyword arguments
    (``@admin_register(admin_class=..., site=...)``). Raises ``ValueError``
    when ``site``, ``admin_class`` or the decorated class has the wrong type.
    """
    def _model_admin_wrapper(model_class):
        if not (site is None or isinstance(site, admin.AdminSite)):
            raise ValueError('site must subclass AdminSite')

        if not (admin_class is None or issubclass(admin_class, admin.ModelAdmin)):
            raise ValueError('admin_class must subclass ModelAdmin.')

        if model_class is None or not issubclass(model_class, models.Model):
            raise ValueError('Wrapped class must subclass Model.')

        site_register(model_class, admin_class=admin_class, site=site, **options)
        return model_class

    # Called with keyword arguments only: hand back the decorator itself.
    return _model_admin_wrapper if func is None else _model_admin_wrapper(func)