APIView 的源码
APIView是REST framework提供的所有视图的基类,继承自Django的View类
APIView与View的不同点为:
传入到视图方法中的是REST framework的Request对象,而不是Django的HttpRequeset对象
视图方法可以返回REST framework的Response对象,视图会为响应数据设置(render)符合前端期望要求格式
任何APIException异常都会被捕获到,并且处理成合适格式(json)的响应信息返回给客户端
会重新声明了一个新的as_views方法并在dispatch()进行路由分发前,对请求的客户端进行身份认证、权限检查、流量控制
APIView还新增了如下的类属性:
authentication_classes 列表或元组,身份认证类
permissoin_classes 列表或元组,权限检查类
throttle_classes 列表或元祖,流量控制类
好长好长.....我尽力讲我们经常用到的。
APIView 的类属性
The following policies may be set at either globally, or per-view.
以下内容可以被全局应用也可以局部应用
parser_classes
= api_settings.DEFAULT_PARSER_CLASSES (分页器)常用
authentication_classes
= api_settings.DEFAULT_AUTHENTICATION_CLASSES (认证器)常用
throttle_classes
= api_settings.DEFAULT_THROTTLE_CLASSES (拦截器)常用
permission_classes
= api_settings.DEFAULT_PERMISSION_CLASSES (权限器)常用
content_negotiation_class
= api_settings.DEFAULT_CONTENT_NEGOTIATION_CLASS (不常用)
renderer_classes
= api_settings.DEFAULT_RENDERER_CLASSES(渲染器)
metadata_class
= api_settings.DEFAULT_METADATA_CLASS (不常用)
versioning_class
= api_settings.DEFAULT_VERSIONING_CLASS(不常用)
as_view()
哈哈哈,我们又回到了as_view()! 来,我们先看一下代码!
@classmethod
def as_view(cls, **initkwargs):
"""
Store the original class on the view function.
This allows us to discover information about the view when we do URL
reverse lookups. Used for breadcrumb generation.
"""
if isinstance(getattr(cls, 'queryset', None), models.query.QuerySet):
def force_evaluation():
raise RuntimeError(
'Do not evaluate the `.queryset` attribute directly, '
'as the result will be cached and reused between requests. '
'Use `.all()` or call `.get_queryset()` instead.'
)
cls.queryset._fetch_all = force_evaluation
view = super().as_view(**initkwargs)
view.cls = cls
view.initkwargs = initkwargs
# Note: session based authentication is explicitly CSRF validated,
# all other authentication is CSRF exempt.
return csrf_exempt(view)
This allows us to discover information about the view when we do URL reverse lookups. Used for breadcrumb generation.
这个方法使我们就能在进行 URL 反向查找时发现视图的相关信息。 用于生成面包屑。
isinstance
方法:判断 a
是不是 b
的对象
if isinstance(getattr(cls, 'queryset', None), models.query.QuerySet):
def force_evaluation():
raise RuntimeError(
'Do not evaluate the `.queryset` attribute directly, '
'as the result will be cached and reused between requests. '
'Use `.all()` or call `.get_queryset()` instead.'
)
cls.queryset._fetch_all = force_evaluation
判断条件
if isinstance(getattr(cls, 'queryset', None), models.query.QuerySet):
,
判断cls.queryset是不是 在模型层里面的qs对象。如果是的话,定义一个方法叫 force_evaluation
, 用来捕获异常,报错
view = super().as_view(**initkwargs)
这句话直接跑到父级,从父级返回回来view
给我们使用。
最后返回return csrf_exempt(view)
,一个去除了CSRF
认证的view
函数
认证组件
from rest_framework.authentication import BaseAuthentication, BasicAuthentication
BaseAuthentication
class BaseAuthentication:
"""
All authentication classes should extend BaseAuthentication.
"""
def authenticate(self, request):
"""
Authenticate the request and return a two-tuple of (user, token).
"""
raise NotImplementedError(".authenticate() must be overridden.")
def authenticate_header(self, request):
"""
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response, or `None` if the
authentication scheme should return `403 Permission Denied` responses.
"""
pass
老样子,我们先看源码
class BasicAuthentication(BaseAuthentication):
"""
HTTP Basic authentication against username/password.
"""
www_authenticate_realm = 'api'
def authenticate(self, request):
"""
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
"""
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != b'basic':
return None
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
try:
try:
auth_decoded = base64.b64decode(auth[1]).decode('utf-8')
except UnicodeDecodeError:
auth_decoded = base64.b64decode(auth[1]).decode('latin-1')
auth_parts = auth_decoded.partition(':')
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)
userid, password = auth_parts[0], auth_parts[2]
return self.authenticate_credentials(userid, password, request)
def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
"""
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
user = authenticate(request=request, **credentials)
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
return (user, None)
def authenticate_header(self, request):
return 'Basic realm="%s"' % self.www_authenticate_realm
def authenticate(self, request)
auth = get_authorization_header(request).split()
先通过类属性定义的方法,获取到request请求里面的认证头。
if not auth or auth[0].lower() != b'basic': return None
如果 auth
里面没有东西,或者 auth
取0下标,转小写不等于 basic的编码,都返回None
。
如果len(auth) == 1
或者 len(auth) > 2
, 统统捕获异常。
尝试通过两种解码方式解码
auth_decoded = base64.b64decode(auth[1]).decode('utf-8')
或者
auth_decoded = base64.b64decode(auth[1]).decode('latin-1')
,
一个是 utf-8
,一个是Latin-1
.
如果解码成功,分割auth
,用auth_parts = auth_decoded.partition(':')
。如果解码不成功,捕获异常。
分割成功后,我们可以获得 userid
, 和password
。
最后我们在作死的边缘反复横跳, return self.authenticate_credentials(userid, password, request)
def authenticate_credentials(self, userid, password, request=None)
def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
"""
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
user = authenticate(request=request, **credentials)
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
return (user, None)
分析一下,credentials传进来一个字典
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
把字典打散传回__init__.py
下面的authenticate(request=None, **credentials)
方法。这里就先不看校验credential的方法了,但是通过这里,如果我们传进去的是一个合法的credentials
,我们会得到一个user object
对象
最后, 我们通过两个筛选条件if user is None:
和if not user.is_active:
。我们会得到return (user, None)
权限组件
from rest_framework.permissions import BasePermission
AllowAny
允许所有授权(通常不会这么用,可以直接用空代替,但是这么写可以让扩展性更高)
class AllowAny(BasePermission):
"""
允许所有授权(通常不会这么用,可以直接用空代替,但是这么写可以让扩展性更高)
"""
def has_permission(self, request, view):
return True
IsAuthenticated
只给认证后的用户授权
**class IsAuthenticated(BasePermission):
"""
Allows access only to authenticated users. (只给认证后的用户授权)
"""
def has_permission(self, request, view):
return bool(request.user and request.user.is_authenticated)**
通过调用认证组件里面的判断是否认证来返回布尔值,来进行授权。
判断是否认证的方法我们自己重新写。
def has_permission(self, request, view):