Redirect after successful login
This commit is contained in:
@ -1,9 +1,10 @@
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from fastapi import Request, Response, HTTPException
|
||||
from fastapi.responses import RedirectResponse
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.types import ASGIApp
|
||||
from typing import Awaitable, Callable
|
||||
from datetime import datetime, timezone
|
||||
from typing import Awaitable, Callable
|
||||
from starlette.types import ASGIApp
|
||||
from urllib.parse import quote
|
||||
|
||||
from session import SessionManager
|
||||
from config import CONFIGS
|
||||
@ -31,18 +32,20 @@ class AuthMiddleware(BaseHTTPMiddleware):
|
||||
if is_api_request:
|
||||
if self.__api_token:
|
||||
# Attempt to authenticate with API token
|
||||
if auth_header := request.headers.get('Authorization'):
|
||||
scheme, _, token = auth_header.partition(' ')
|
||||
if scheme.lower() == 'bearer' and token == self.__api_token:
|
||||
if api_key := request.headers.get('Authorization'):
|
||||
if api_key == self.__api_token:
|
||||
return await call_next(request)
|
||||
else:
|
||||
raise HTTPException(status_code=401, detail="Invalid API token.")
|
||||
|
||||
# Extract session_id from cookies
|
||||
session_id = request.cookies.get("session_id")
|
||||
|
||||
if not session_id:
|
||||
if is_api_request:
|
||||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||||
return RedirectResponse(url=request.url_for('login'), status_code=302)
|
||||
raise HTTPException(status_code=401, detail="Unauthorized.")
|
||||
|
||||
return self.__redirect_to_login(request)
|
||||
|
||||
session_data = self.__session_manager.get_session(session_id)
|
||||
|
||||
@ -50,12 +53,18 @@ class AuthMiddleware(BaseHTTPMiddleware):
|
||||
if is_api_request:
|
||||
raise HTTPException(status_code=401, detail="The session is invalid.")
|
||||
|
||||
return RedirectResponse(url=request.url_for('login'), status_code=302)
|
||||
return self.__redirect_to_login(request)
|
||||
|
||||
if session_data.expires_at < datetime.now(timezone.utc):
|
||||
if is_api_request:
|
||||
raise HTTPException(status_code=401, detail="The session has expired.")
|
||||
|
||||
return RedirectResponse(url=request.url_for('login'), status_code=302)
|
||||
return self.__redirect_to_login(request)
|
||||
|
||||
return await call_next(request)
|
||||
|
||||
def __redirect_to_login(self, request: Request):
|
||||
next_url = quote(str(request.url))
|
||||
redirect_url = str(request.url_for('login')) + f'?next_url={next_url}'
|
||||
|
||||
return RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
Reference in New Issue
Block a user