diff --git a/core/scripts/webpanel/routers/__init__.py b/core/scripts/webpanel/routers/__init__.py index 4dc7607..2f5ca0e 100644 --- a/core/scripts/webpanel/routers/__init__.py +++ b/core/scripts/webpanel/routers/__init__.py @@ -1 +1,2 @@ from . import api +from . import user diff --git a/core/scripts/webpanel/routers/user/__init__.py b/core/scripts/webpanel/routers/user/__init__.py new file mode 100644 index 0000000..3dc7669 --- /dev/null +++ b/core/scripts/webpanel/routers/user/__init__.py @@ -0,0 +1,2 @@ + +from .user import router diff --git a/core/scripts/webpanel/routers/user/user.py b/core/scripts/webpanel/routers/user/user.py new file mode 100644 index 0000000..a71134f --- /dev/null +++ b/core/scripts/webpanel/routers/user/user.py @@ -0,0 +1,83 @@ + +from fastapi import APIRouter, Depends, Form, HTTPException, Request +from fastapi.templating import Jinja2Templates +from fastapi.responses import RedirectResponse +from typing import Annotated + +from pydantic import BaseModel + +from ..api.v1.schema.user import AddUserInputBody, EditUserInputBody +from .viewmodel import User +import cli_api + + +router = APIRouter() + +# TODO: Make this singleton or something +templates = Jinja2Templates(directory='templates') + + +@router.get('/') +async def users(request: Request): + try: + dict_users = cli_api.list_users() + if not dict_users: + raise HTTPException(status_code=404, detail='No users found.') + + users: list[User] = [User.from_dict(key, value) for key, value in dict_users.items()] + + return templates.TemplateResponse('users.html', {'users': users, 'request': request}) + except Exception as e: + raise HTTPException(status_code=400, detail=f'Error: {str(e)}') + + +@router.post('/add') +async def add_user(form_data: AddUserInputBody = Form()): + return + try: + cli_api.add_user( + form_data.username, + form_data.traffic_limit, + form_data.expiration_days, + form_data.password, + form_data.creation_date + ) + return RedirectResponse(url='/users', status_code=302) + except Exception as e: + raise HTTPException(status_code=400, detail=f'Error: {str(e)}') + + +# We defined separate parameter as 'username' in here because the 'EditUserInputBody' doesn't have 'username' field +@ router.post('/edit') +async def edit_user(username: str = Form(), form_data: EditUserInputBody = Form()): + try: + cli_api.edit_user( + username, + form_data.new_username, + form_data.new_traffic_limit, + form_data.new_expiration_days, + form_data.renew_password, + form_data.renew_creation_date, + form_data.blocked + ) + return RedirectResponse(url='/users', status_code=302) + except Exception as e: + raise HTTPException(status_code=400, detail=f'Error: {str(e)}') + + +@ router.post('/remove') +async def delete_user(username: str = Form()): + try: + cli_api.remove_user(username) + return RedirectResponse(url='/users', status_code=302) + except Exception as e: + raise HTTPException(status_code=400, detail=f'Error: {str(e)}') + + +@ router.post('/reset') +async def reset_user(username: str = Form()): + try: + cli_api.reset_user(username) + return RedirectResponse(url='/users', status_code=302) + except Exception as e: + raise HTTPException(status_code=400, detail=f'Error: {str(e)}') diff --git a/core/scripts/webpanel/routers/user/viewmodel.py b/core/scripts/webpanel/routers/user/viewmodel.py new file mode 100644 index 0000000..41ed225 --- /dev/null +++ b/core/scripts/webpanel/routers/user/viewmodel.py @@ -0,0 +1,108 @@ +from pydantic import BaseModel +from datetime import datetime, timedelta + +import cli_api + + +class Config(BaseModel): + type: str + link: str + + @staticmethod + def from_username(username: str) -> list['Config']: + raw_uri = Config.__get_user_configs_uri(username) + if not raw_uri: + return [] + + res = [] + for line in raw_uri.splitlines(): + config = Config.__parse_user_configs_uri_line(line) + if config: + res.append(config) + return res + + @staticmethod + def __get_user_configs_uri(username: str) -> str: + # This command is equivalent to `show-user-uri --username $username --ipv 4 --all --singbox --normalsub` + raw_uri = cli_api.show_user_uri(username, False, 4, True, True, True) + + return raw_uri.strip() if raw_uri else '' + + @staticmethod + def __parse_user_configs_uri_line(line: str) -> "Config | None": + config_type = '' + config_link = '' + + line = line.strip() + if line.startswith("hy2://"): + if "@" in line: + ip_version = "IPv6" if line.split("@")[1].count(":") > 1 else "IPv4" + config_type = ip_version + config_link = line + else: + return None + elif line.startswith("https://"): + if "singbox" in line.lower(): + config_type = "Singbox" + elif "normal" in line.lower(): + config_type = "Normal-SUB" + else: + return None + config_link = line + else: + return None + + return Config(type=config_type, link=config_link) + + +class User(BaseModel): + username: str + status: str + quota: str + traffic_used: str + expiry_date: datetime + expiry_days: int + enable: bool + configs: list[Config] + + @staticmethod + def from_dict(username: str, user_data: dict): + user_data = {'username': username, **user_data} + user_data = User.__parse_user_data(user_data) + return User(**user_data) + + @staticmethod + def __parse_user_data(user_data: dict) -> dict: + expiry_date = 'N/A' + creation_date_str = user_data.get("account_creation_date") + expiration_days = user_data.get('expiration_days', 0) + if creation_date_str and expiration_days: + try: + creation_date = datetime.strptime(creation_date_str, "%Y-%m-%d") + expiry_date = creation_date + timedelta(days=expiration_days) + except ValueError: + pass + + traffic_used = User.__format_traffic(user_data.get("download_bytes", 0) + user_data.get("upload_bytes", 0)) + + return { + 'username': user_data['username'], + 'status': user_data.get('status', 'Not Active'), + 'quota': User.__format_traffic(user_data.get('max_download_bytes', 0)), + 'traffic_used': traffic_used, + 'expiry_date': expiry_date, + 'expiry_days': expiration_days, + 'enable': False if user_data.get('blocked', False) else True, + 'configs': Config.from_username(user_data['username']) + } + + @staticmethod + def __format_traffic(traffic_bytes) -> str: + if traffic_bytes < 1024: + return f'{traffic_bytes} B' + elif traffic_bytes < 1024**2: + return f'{traffic_bytes / 1024:.2f} KB' + elif traffic_bytes < 1024**3: + return f'{traffic_bytes / 1024**2:.2f} MB' + else: + return f'{traffic_bytes / 1024**3:.2f} GB'