authenticated是tornado自带的登录验证装饰器,它的实现比较简单,验证比较简易,无法做到真正意义的前后端分离并且是同步的方式,所以这里我对它进行了重写,以适应异步JWT方式的登录验证。 Tornado自带的authenticated源码:
def authenticated(method): @functools.wraps(method) def wrapper(self, *args, **kwargs): if not self.current_user: if self.request.method in ("GET", "HEAD"): url = self.get_login_url() if "?" not in url: if urlparse.urlsplit(url).scheme: # if login url is absolute, make next absolute too next_url = self.request.full_url() else: next_url = self.request.uri url += "?" + urlencode(dict(next=next_url)) self.redirect(url) return raise HTTPError(403) return method(self, *args, **kwargs) return wrapper def get_login_url(self): """Override to customize the login URL based on the request. By default, we use the ``login_url`` application setting. """ self.require_setting("login_url", "@tornado.web.authenticated") return self.application.settings["login_url"] def redirect(self, url, permanent=False, status=None): if self._headers_written: raise Exception("Cannot redirect after headers have been written") if status is None: status = 301 if permanent else 302 else: assert isinstance(status, int) and 300 <= status <= 399 self.set_status(status) self.set_header("Location", utf8(url)) self.finish()
从源码可以看出,authenticated的作用:当current_user不存在时,它会调用get_login_url方法从settings里面去取login_url,从而获取user返回,当user未登录时,它会调用redirect重定向,返回301。
改写步骤: 1.将同步的方法使用协程改写 2.以JWT的方式校验用户token,用户不存在或token已过期直接返回状态码,限制继续访问接口 实现代码:
1 def authenticated(func): 2 """ 3 重写tornado authenticated 4 :param func: 5 :return: 6 """ 7 8 async def wrapper(self, *args, **kwargs): 9 res_data = {}10 token = self.request.headers.get("token")11 if token:12 try:13 send_data = jwt.decode(14 token, self.settings["secret_key"],15 leeway=self.settings["jwt_expire"],16 options={ "verify_exp": True}17 )18 user_id = send_data["id"]19 20 # 从数据库中获取到user并设置给_current_user21 try:22 user = await self.application.objects.get(23 User, id=user_id24 )25 self._current_user = user26 27 result = await func(self, *args, **kwargs)28 return result29 except User.DoesNotExist:30 res_data["content"] = "用户不存在"31 self.set_status(401)32 except Exception as e:33 print(e)34 self.set_status(401)35 res_data["content"] = "token不合法或已过期"36 else:37 self.set_status(401)38 res_data["content"] = "缺少token"39 40 self.write(res_data)41 42 return wrapper
请求接口的时候在headers里面带上token即可登录验证,测试如下: