cool.views.fields 源代码

# encoding: utf-8
import copy
import inspect
import json
from collections import OrderedDict

from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.fields import CharField, Field, JSONField
from rest_framework.serializers import BaseSerializer

from . import utils
from .view import ParamSerializer


class SplitCharField(CharField):
    """
    Delimited-string field.

    Incoming string data is split on ``sep`` into a list and every piece is
    validated with ``child``; outgoing values are joined back into a single
    string using the same separator.

    Keyword arguments (in addition to those of ``CharField``):
        sep: separator used for splitting and joining, ``','`` by default.
        child: field instance used to validate each piece; defaults to a
            deep copy of the class-level ``child``.
    """
    child = CharField(allow_blank=True)

    def __init__(self, **kwargs):
        self.sep = kwargs.pop('sep', ',')
        # Copy the class-level child so instances never share one field object.
        fallback_child = copy.deepcopy(self.child)
        self.child = kwargs.pop('child', fallback_child)

        assert not inspect.isclass(self.child), '`child` has not been instantiated.'
        assert self.child.source is None, (
            "The `source` argument is not meaningful when applied to a `child=` field. "
            "Remove `source=` from the field declaration."
        )

        super().__init__(**kwargs)
        self.child.bind(field_name='', parent=self)

    def to_internal_value(self, data):
        """
        Convert ``data`` into a list of child-validated items.

        ``None`` is returned unchanged. Lists and tuples are validated as-is;
        anything else goes through ``CharField.to_internal_value`` first and
        is then split on ``self.sep``.
        """
        if data is None:
            return None
        if isinstance(data, (list, tuple)):
            pieces = data
        else:
            pieces = super().to_internal_value(data).split(self.sep)
        return self.run_child_validation(pieces)

    def run_child_validation(self, data):
        """
        Validate every item of ``data`` with ``self.child``.

        Returns the list of validated items. If any item fails, raises
        ``ValidationError`` carrying a one-element list that holds a mapping
        of item index to that item's error detail.
        """
        validated = []
        failures = OrderedDict()

        for position, piece in enumerate(data):
            try:
                cleaned = self.child.run_validation(piece)
            except ValidationError as exc:
                failures[position] = exc.detail
            else:
                validated.append(cleaned)

        if failures:
            raise ValidationError([failures])
        return validated

    def to_representation(self, value):
        """Join the ``str()`` of each item of ``value`` with ``self.sep``."""
        return self.sep.join(str(item) for item in value)
class JSONCheckFieldBase(JSONField):
    """
    Base class for JSON fields whose decoded content is validated further.

    The decoded value must be a dict, or a list when ``is_list`` is true.
    Each dict is checked with the class returned by ``gen_param_form``.
    Subclasses must implement ``format_json`` and ``gen_param_form``.

    Keyword arguments (in addition to those of ``JSONField``):
        is_list: expect a list of items instead of a single dict
            (default ``False``).
        help_text: when omitted, it is set to the JSON dump of
            ``format_json()``.
    """
    default_error_messages = {
        'not_a_list': _('Expected a list of items but got type "{input_type}".'),
        'not_a_dict': _('Expected a dictionary of items but got type "{input_type}".'),
        'empty': _('This list may not be empty.'),
    }

    def __init__(self, *args, **kwargs):
        self.is_list = kwargs.pop('is_list', False)
        if 'help_text' not in kwargs:
            # Describe the expected JSON layout when no help text was supplied.
            layout = self.format_json()
            kwargs['help_text'] = json.dumps(layout, ensure_ascii=False)
        super().__init__(*args, **kwargs)
        self.param_form = self.gen_param_form(self)

    def to_internal_value(self, data):
        """Run ``JSONField`` conversion, then validate the resulting structure."""
        decoded = super().to_internal_value(data)
        return self.run_children_validation(decoded)

    @classmethod
    def merge_data(cls, bounded_form, data):
        """Update ``data`` in place with ``bounded_form.cleaned_data`` and return it."""
        cleaned = bounded_form.cleaned_data
        data.update(cleaned)
        return data

    def clean_dict_data(self, data):
        """
        Validate one item with ``self.param_form``.

        Raises ``ValidationError`` with the validator's errors when there are
        any; otherwise returns the result of ``merge_data``.
        """
        bound = self.param_form(data=data)
        bound.is_valid()
        problems = bound.errors
        if problems:
            raise ValidationError(problems)
        return self.merge_data(bound, data)

    def format_json(self):
        """Return a JSON-serialisable description of the expected data (abstract)."""
        raise NotImplementedError

    @classmethod
    def gen_param_form(cls, obj):
        """Return the validator class used for each item of ``obj`` (abstract)."""
        raise NotImplementedError

    def run_children_validation(self, data):
        """
        Check the container type of ``data`` and validate its content.

        ``None`` is returned unchanged. In list mode every element is
        validated and a list of results is returned; otherwise the single
        dict is validated and its result returned. Collected errors are
        raised as ``ValidationError`` wrapped in a one-element list.
        """
        if data is None:
            return None

        cleaned = []
        failures = OrderedDict()

        if self.is_list:
            if not isinstance(data, list):
                self.fail('not_a_list', input_type=type(data).__name__)
            for position, entry in enumerate(data):
                try:
                    cleaned.append(self.clean_dict_data(entry))
                except ValidationError as exc:
                    # Keep going so every failing index is reported.
                    failures[position] = exc.detail
        else:
            if not isinstance(data, dict):
                self.fail('not_a_dict', input_type=type(data).__name__)
            try:
                cleaned = self.clean_dict_data(data)
            except ValidationError as exc:
                failures = exc.detail

        if failures:
            raise ValidationError([failures])
        return cleaned
class JSONCheckField(JSONCheckFieldBase):
    """
    JSON field whose items are checked against the ``children`` fields.

    Keyword arguments (in addition to those of ``JSONCheckFieldBase``):
        children: dict mapping a key name to the ``Field`` instance that
            validates it (default: empty dict).
    """

    def __init__(self, *args, **kwargs):
        self.children = kwargs.pop('children', dict())
        assert isinstance(self.children, dict)
        assert all(isinstance(child, Field) for child in self.children.values())
        super().__init__(*args, **kwargs)

    def format_json(self):
        """Return ``{key: child.label}``, wrapped in a list when ``is_list`` is set."""
        labels = {}
        for key, child in self.children.items():
            labels[key] = child.label
        if self.is_list:
            return [labels]
        return labels

    @classmethod
    def gen_param_form(cls, obj):
        """
        Build a ``ParamSerializer`` subclass from a copy of ``obj.children``.

        The generated class is named ``<cls.__name__>ParamSerializer`` and
        gets a ``Meta`` whose ``fields`` lists the child keys.
        """
        attrs = copy.deepcopy(obj.children)
        # Snapshot the keys before ``Meta`` itself is added to the attributes.
        field_names = list(attrs)

        class Meta:
            fields = field_names

        attrs['Meta'] = Meta
        # Create the class through ParamSerializer's own metaclass.
        metaclass = type(ParamSerializer)
        return metaclass('%sParamSerializer' % cls.__name__, (ParamSerializer,), attrs)
class SerializerField(JSONCheckFieldBase):
    """
    JSON field whose items are validated by a serializer class.

    Args:
        serializer: a ``BaseSerializer`` subclass (the class itself, not an
            instance) used to validate each item.
    """

    def __init__(self, serializer, *args, **kwargs):
        assert issubclass(serializer, BaseSerializer)
        # Must be set before the base initialiser, which calls format_json().
        self.serializer = serializer
        super().__init__(*args, **kwargs)

    def format_json(self):
        """Return ``utils.get_serializer_info`` for a fresh serializer instance."""
        instance = self.serializer()
        return utils.get_serializer_info(instance, self.is_list)

    @classmethod
    def merge_data(cls, bounded_form, data):
        """Update ``data`` in place with ``bounded_form.validated_data`` and return it."""
        validated = bounded_form.validated_data
        data.update(validated)
        return data

    @classmethod
    def gen_param_form(cls, obj):
        """Return the serializer class stored on ``obj``; it is used as the validator."""
        return obj.serializer