From 11e1a7c33a1254e1a7384b16757354b568ef785d Mon Sep 17 00:00:00 2001 From: Iam54r1n4 Date: Mon, 3 Feb 2025 19:02:31 +0000 Subject: [PATCH] Implement authentication middleware --- core/scripts/webpanel/authentication/auth.py | 59 ++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 core/scripts/webpanel/authentication/auth.py diff --git a/core/scripts/webpanel/authentication/auth.py b/core/scripts/webpanel/authentication/auth.py new file mode 100644 index 0000000..b906e33 --- /dev/null +++ b/core/scripts/webpanel/authentication/auth.py @@ -0,0 +1,59 @@ +from fastapi import Request, Response, HTTPException +from fastapi.responses import RedirectResponse +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.types import ASGIApp +from typing import Awaitable, Callable +from datetime import datetime, timezone + +from .session import SessionManager + + +class AuthMiddleware(BaseHTTPMiddleware): + '''Middleware that handles session authentication.''' + + def __init__(self, app: ASGIApp, session_manager: SessionManager, api_token: str | None): + super().__init__(app) + self.__session_manager = session_manager + self.__api_token = api_token + + async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]): + '''Handles session authentication.''' + public_routes = [ + '/login', + ] + if request.url.path in public_routes: + return await call_next(request) + + is_api_request = '/api/v1/' in request.url.path + + if is_api_request: + if self.__api_token: + # Attempt to authenticate with API token + if auth_header := request.headers.get('Authorization'): + scheme, _, token = auth_header.partition(' ') + if scheme.lower() == 'bearer' and token == self.__api_token: + return await call_next(request) + + # Extract session_id from cookies + session_id = request.cookies.get("session_id") + + if not session_id: + if is_api_request: + raise HTTPException(status_code=401, detail="Unauthorized") + return RedirectResponse(url='/login', status_code=302) + + session_data = self.__session_manager.get_session(session_id) + + if not session_data: + if is_api_request: + raise HTTPException(status_code=401, detail="The session is invalid.") + + return RedirectResponse(url='/login', status_code=302) + + if session_data.expires_at < datetime.now(timezone.utc): + if is_api_request: + raise HTTPException(status_code=401, detail="The session has expired.") + + return RedirectResponse(url='/login', status_code=302) + + return await call_next(request)