From 5e43ed9aa6a751493ce59c48b4c118dfd56f5d2e Mon Sep 17 00:00:00 2001 From: Whispering Wind <151555003+ReturnFI@users.noreply.github.com> Date: Sun, 7 Sep 2025 00:01:13 +0330 Subject: [PATCH] Fixed the validation in the restore API endpoint --- .../routers/api/v1/config/hysteria.py | 48 ++++++++----------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/core/scripts/webpanel/routers/api/v1/config/hysteria.py b/core/scripts/webpanel/routers/api/v1/config/hysteria.py index 7c676df..f428d88 100644 --- a/core/scripts/webpanel/routers/api/v1/config/hysteria.py +++ b/core/scripts/webpanel/routers/api/v1/config/hysteria.py @@ -7,6 +7,7 @@ import zipfile import tempfile # from ..schema.config.hysteria import InstallInputBody import os +from pathlib import Path import cli_api router = APIRouter() @@ -113,12 +114,9 @@ async def set_sni_api(sni: str): @router.get('/backup', response_class=FileResponse, summary='Backup Hysteria2 configuration') async def backup_api(): - """ - Backups the Hysteria2 configuration and sends the backup ZIP file. - """ try: cli_api.backup_hysteria2() - backup_dir = "/opt/hysbackup/" # TODO: get this path from .env + backup_dir = "/opt/hysbackup/" if not os.path.isdir(backup_dir): raise HTTPException(status_code=500, detail="Backup directory does not exist.") @@ -129,13 +127,8 @@ async def backup_api(): if latest_backup_file: backup_file_path = os.path.join(backup_dir, latest_backup_file) - - if not backup_file_path.startswith(backup_dir): - raise HTTPException(status_code=400, detail="Invalid backup file path.") - if not os.path.exists(backup_file_path): raise HTTPException(status_code=404, detail="Backup file not found.") - return FileResponse(path=backup_file_path, filename=os.path.basename(backup_file_path), media_type="application/zip") else: raise HTTPException(status_code=500, detail="No backup file found after backup process.") @@ -146,46 +139,43 @@ async def backup_api(): @router.post('/restore', response_model=DetailResponse, summary='Restore Hysteria2 Configuration') async def restore_api(file: UploadFile = File(...)): + temp_path = None try: - - with tempfile.NamedTemporaryFile(delete=False) as temp_file: + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_file: shutil.copyfileobj(file.file, temp_file) temp_path = temp_file.name if not zipfile.is_zipfile(temp_path): - os.unlink(temp_path) raise HTTPException(status_code=400, detail="Invalid file type. Must be a ZIP file.") - required_files = {"ca.key", "ca.crt", "users.json", "config.json", ".configs.env"} with zipfile.ZipFile(temp_path, 'r') as zip_ref: - zip_contents = set(zip_ref.namelist()) - missing_files = required_files - zip_contents + namelist = zip_ref.namelist() + required_flat_files = {"ca.key", "ca.crt", "config.json", ".configs.env"} + missing_files = required_flat_files - set(namelist) if missing_files: - os.unlink(temp_path) raise HTTPException( status_code=400, - detail=f"Backup file is missing required files: {', '.join(missing_files)}" + detail=f"Backup is missing required configuration files: {', '.join(missing_files)}" ) - dst_dir_path = '/opt/hysbackup/' # TODO: get this path from .env - os.makedirs(dst_dir_path, exist_ok=True) - - dst_path = os.path.join(dst_dir_path, file.filename) # type: ignore - shutil.move(temp_path, dst_path) - cli_api.restore_hysteria2(dst_path) + db_dump_prefix = "hysteria_panel/" + if not any(name.startswith(db_dump_prefix) for name in namelist): + raise HTTPException( + status_code=400, + detail=f"Backup file is not a modern backup and is missing the database dump directory: '{db_dump_prefix}'" + ) + + cli_api.restore_hysteria2(temp_path) return DetailResponse(detail='Hysteria2 restored successfully.') except HTTPException as e: raise e except Exception as e: - if 'temp_path' in locals(): - try: - os.unlink(temp_path) - except: - pass raise HTTPException(status_code=500, detail=f'Error: {str(e)}') - + finally: + if temp_path and os.path.exists(temp_path): + os.unlink(temp_path) @router.get('/enable-obfs', response_model=DetailResponse, summary='Enable Hysteria2 obfs') async def enable_obfs():