From 3812e6c3ad650f840d050f8378a79c94430dbea1 Mon Sep 17 00:00:00 2001 From: bivashy Date: Fri, 23 Jan 2026 04:32:48 +0500 Subject: [PATCH] Basic Rest API implementation --- .dockerignore | 38 ++ Dockerfile | 55 +++ README-DOCKER.md | 271 ++++++++++++++ compose.yml | 52 +++ core/api.py | 686 +++++++++++++++++++++++++++++++++++ core/cli_api.py | 914 +++++++++++++++++++++++++++++------------------ requirements.txt | 18 + 7 files changed, 1683 insertions(+), 351 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 README-DOCKER.md create mode 100644 compose.yml create mode 100644 core/api.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..63c3979 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,38 @@ +# Git +.git +.gitignore +.gitattributes + +# Docker +Dockerfile +docker-compose.yml +.dockerignore + +# Python +__pycache__ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +ENV/ +*.egg-info/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Logs +*.log + +# Temporary files +*.tmp +*.temp diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d6fc370 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,55 @@ +FROM ubuntu:22.04 + +# Prevent interactive prompts +ENV DEBIAN_FRONTEND=noninteractive +ENV PYTHONUNBUFFERED=1 + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + python3 \ + python3-pip \ + python3-venv \ + curl \ + wget \ + jq \ + pwgen \ + bc \ + zip \ + unzip \ + lsof \ + gnupg \ + lsb-release \ + systemctl \ + iptables \ + iproute2 \ + && rm -rf /var/lib/apt/lists/* + +# Create working directory +WORKDIR /etc/hysteria + +# Copy requirements first (for better caching) +COPY requirements.txt . + +# Create and activate virtual environment, install dependencies +RUN python3 -m venv /etc/hysteria/hysteria2_venv && \ + /etc/hysteria/hysteria2_venv/bin/pip install --no-cache-dir --upgrade pip && \ + /etc/hysteria/hysteria2_venv/bin/pip install --no-cache-dir -r requirements.txt && \ + /etc/hysteria/hysteria2_venv/bin/pip install --no-cache-dir fastapi uvicorn python-multipart + +# Copy application files +COPY . /etc/hysteria/ + +# Make scripts executable +RUN find /etc/hysteria/core/scripts -type f -name "*.sh" -exec chmod +x {} \; && \ + find /etc/hysteria/core/scripts -type f -name "*.py" -exec chmod +x {} \; && \ + chmod +x /etc/hysteria/menu.sh + +# Expose ports +EXPOSE 8000 443 80 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# Start FastAPI server +CMD ["/etc/hysteria/hysteria2_venv/bin/uvicorn", "core.api:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] diff --git a/README-DOCKER.md b/README-DOCKER.md new file mode 100644 index 0000000..e21ffba --- /dev/null +++ b/README-DOCKER.md @@ -0,0 +1,271 @@ +# Blitz Panel - Docker Setup + +## 📋 Prerequisites + +- Docker Engine 20.10+ +- Docker Compose 2.0+ +- At least 2GB RAM +- Ports available: 8000 (API), 443 (Hysteria2), 80 (HTTP) + +## 🚀 Quick Start + +### 1. Prepare Your Files + +Make sure your directory structure looks like this: + +``` +Blitz/ +├── core/ +│ └── scripts/ +│ ├── auth/ +│ ├── db/ +│ ├── hysteria2/ +│ └── ... (other script folders) +├── api.py +├── cli_api.py +├── requirements.txt +├── Dockerfile +├── docker-compose.yml +└── .dockerignore +``` + +### 2. Build and Run + +```bash +# Build and start containers +docker-compose up -d --build + +# Check logs +docker-compose logs -f blitz-api + +# Check if services are running +docker-compose ps +``` + +### 3. Test the API + +```bash +# Health check +curl http://localhost:8000/health + +# Get API docs (interactive) +open http://localhost:8000/docs +``` + +## 📚 API Endpoints + +### User Management + +```bash +# List all users +curl http://localhost:8000/api/v1/users + +# Get specific user +curl http://localhost:8000/api/v1/users/john + +# Create user +curl -X POST http://localhost:8000/api/v1/users \ + -H "Content-Type: application/json" \ + -d '{ + "username": "john", + "traffic_limit": 100, + "expiration_days": 30, + "unlimited": false + }' + +# Edit user +curl -X PUT http://localhost:8000/api/v1/users/john \ + -H "Content-Type: application/json" \ + -d '{ + "new_traffic_limit": 200, + "renew_creation_date": true + }' + +# Delete user +curl -X DELETE http://localhost:8000/api/v1/users/john + +# Get user URI +curl http://localhost:8000/api/v1/users/john/uri?qrcode=false +``` + +### Hysteria2 Management + +```bash +# Install Hysteria2 +curl -X POST http://localhost:8000/api/v1/hysteria2/install \ + -H "Content-Type: application/json" \ + -d '{ + "port": 443, + "sni": "example.com" + }' + +# Restart Hysteria2 +curl -X POST http://localhost:8000/api/v1/hysteria2/restart + +# Get current port +curl http://localhost:8000/api/v1/hysteria2/config/port + +# Change port +curl -X PUT http://localhost:8000/api/v1/hysteria2/config/port \ + -H "Content-Type: application/json" \ + -d '{"port": 8443}' + +# Enable obfuscation +curl -X POST http://localhost:8000/api/v1/hysteria2/obfs/enable +``` + +### Traffic & Stats + +```bash +# Get traffic status +curl http://localhost:8000/api/v1/traffic/status + +# Get server info +curl http://localhost:8000/api/v1/info/server + +# Get services status +curl http://localhost:8000/api/v1/info/services +``` + +## 🛠️ Development Mode + +To run in development with auto-reload: + +```bash +# Run API locally (not in Docker) +source /etc/hysteria/hysteria2_venv/bin/activate +cd /path/to/Blitz +uvicorn api:app --reload --host 0.0.0.0 --port 8000 +``` + +## 🐛 Troubleshooting + +### Container won't start + +```bash +# Check logs +docker-compose logs blitz-api + +# Check MongoDB +docker-compose logs mongodb + +# Restart services +docker-compose restart +``` + +### Permission issues + +```bash +# Fix permissions for scripts +docker-compose exec blitz-api bash +find /etc/hysteria/core/scripts -type f -name "*.sh" -exec chmod +x {} \; +find /etc/hysteria/core/scripts -type f -name "*.py" -exec chmod +x {} \; +``` + +### Network issues + +```bash +# Check if ports are available +sudo netstat -tlnp | grep -E ':(8000|443|80)' + +# If ports are in use, change them in docker-compose.yml +``` + +### Reset everything + +```bash +# Stop and remove everything +docker-compose down -v + +# Remove volumes (WARNING: deletes all data) +docker volume rm blitz_mongodb_data blitz_config blitz_certs + +# Rebuild +docker-compose up -d --build +``` + +## 📊 Monitoring + +```bash +# Watch logs in real-time +docker-compose logs -f + +# Check resource usage +docker stats + +# Execute commands in container +docker-compose exec blitz-api bash +``` + +## 🔒 Production Deployment + +For production, modify `docker-compose.yml`: + +1. **Add authentication** to the API (JWT, API keys) +2. **Use HTTPS** with proper SSL certificates +3. **Set up reverse proxy** (nginx/traefik) +4. **Configure firewall** rules +5. **Enable MongoDB authentication** +6. **Set resource limits** + +Example production additions: + +```yaml +# In docker-compose.yml +services: + blitz-api: + deploy: + resources: + limits: + cpus: '2' + memory: 2G + reservations: + cpus: '1' + memory: 1G + environment: + - API_KEY=your-secret-key +``` + +## 📝 Notes + +- Default API runs on port 8000 +- MongoDB stores data in named volume `mongodb_data` +- Configs persist in `blitz_config` volume +- All scripts from `core/` are copied into container +- Health check runs every 30 seconds + +## 🤝 Common Tasks + +### Backup + +```bash +# Backup MongoDB data +docker-compose exec mongodb mongodump --out /data/backup + +# Copy backup out +docker cp blitz-mongodb:/data/backup ./backup +``` + +### Update + +```bash +# Pull latest code +git pull + +# Rebuild +docker-compose up -d --build +``` + +### Scale + +To run multiple instances (load balancing): + +```bash +docker-compose up -d --scale blitz-api=3 +``` + +## 🆘 Support + +- Check logs: `docker-compose logs -f` +- Interactive docs: http://localhost:8000/docs +- ReDoc: http://localhost:8000/redoc diff --git a/compose.yml b/compose.yml new file mode 100644 index 0000000..2e7b53b --- /dev/null +++ b/compose.yml @@ -0,0 +1,52 @@ +services: + mongodb: + image: mongo:8.0 + container_name: blitz-mongodb + restart: unless-stopped + environment: + MONGO_INITDB_DATABASE: hysteria + volumes: + - mongodb_data:/data/db + networks: + - blitz-network + healthcheck: + test: echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet + interval: 10s + timeout: 5s + retries: 5 + + blitz-api: + build: + context: . + dockerfile: Dockerfile + container_name: blitz-api + restart: unless-stopped + ports: + - "8000:8000" + - "443:443" + - "80:80" + environment: + - MONGODB_URI=mongodb://mongodb:27017/hysteria + - PYTHONUNBUFFERED=1 + volumes: + - ./core:/etc/hysteria/core + - blitz_config:/etc/hysteria + - blitz_certs:/root/.acme.sh + networks: + - blitz-network + depends_on: + mongodb: + condition: service_healthy + privileged: true + cap_add: + - NET_ADMIN + - SYS_MODULE + +networks: + blitz-network: + driver: bridge + +volumes: + mongodb_data: + blitz_config: + blitz_certs: diff --git a/core/api.py b/core/api.py new file mode 100644 index 0000000..08f7446 --- /dev/null +++ b/core/api.py @@ -0,0 +1,686 @@ +import sys + +sys.path.insert(0, "/etc/hysteria/core/scripts") + +from fastapi import FastAPI, HTTPException, status, Query +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Any +from datetime import datetime +from . import cli_api +import asyncio + +app = FastAPI( + title="Blitz Panel API", + description="REST API for Hysteria2 Panel Management", + version="1.0.0", +) + +# ============================================================================ +# Pydantic Models +# ============================================================================ + + +class UserCreate(BaseModel): + username: str = Field(..., min_length=1, max_length=50) + traffic_limit: int = Field(..., ge=0, description="Traffic limit in GB") + expiration_days: int = Field(..., ge=0) + password: Optional[str] = None + creation_date: Optional[str] = None + unlimited: bool = False + note: Optional[str] = None + + +class BulkUserCreate(BaseModel): + traffic_gb: float = Field(..., ge=0) + expiration_days: int = Field(..., ge=0) + count: int = Field(..., ge=1, le=1000) + prefix: str = Field(..., min_length=1) + start_number: int = Field(1, ge=1) + unlimited: bool = False + + +class UserEdit(BaseModel): + new_username: Optional[str] = None + new_password: Optional[str] = None + new_traffic_limit: Optional[int] = Field(None, ge=0) + new_expiration_days: Optional[int] = Field(None, ge=0) + renew_password: bool = False + renew_creation_date: bool = False + blocked: Optional[bool] = None + unlimited_ip: Optional[bool] = None + note: Optional[str] = None + + +class PortChange(BaseModel): + port: int = Field(..., ge=1, le=65535) + + +class SNIChange(BaseModel): + sni: str = Field(..., min_length=1) + + +class IPConfig(BaseModel): + ipv4: Optional[str] = None + ipv6: Optional[str] = None + + +class Hysteria2Install(BaseModel): + port: int = Field(..., ge=1, le=65535) + sni: str = Field(..., min_length=1) + + +class NodeCreate(BaseModel): + name: str + ip: str + sni: Optional[str] = None + pinSHA256: Optional[str] = None + port: Optional[int] = Field(None, ge=1, le=65535) + obfs: Optional[str] = None + insecure: Optional[bool] = False + + +class TelegramBotConfig(BaseModel): + token: str + admin_ids: str + backup_interval: Optional[int] = Field(None, ge=1) + + +class WebPanelConfig(BaseModel): + domain: str + port: int = Field(..., ge=1, le=65535) + admin_username: str + admin_password: str + expiration_minutes: int = Field(30, ge=1) + debug: bool = False + decoy_path: str = "" + + +class NormalSubConfig(BaseModel): + domain: str + port: int = Field(..., ge=1, le=65535) + + +class WARPConfig(BaseModel): + all_state: Optional[str] = Field(None, pattern="^(on|off)$") + popular_sites_state: Optional[str] = Field(None, pattern="^(on|off)$") + domestic_sites_state: Optional[str] = Field(None, pattern="^(on|off)$") + block_adult_sites_state: Optional[str] = Field(None, pattern="^(on|off)$") + + +class IPLimiterConfig(BaseModel): + block_duration: Optional[int] = Field(None, ge=1) + max_ips: Optional[int] = Field(None, ge=1) + + +class ExtraProxyConfig(BaseModel): + name: str + uri: str + + +# ============================================================================ +# Error Handler +# ============================================================================ + + +def handle_api_error(func): + """Decorator to handle CLI API errors""" + if asyncio.iscoroutinefunction(func): + + async def async_wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except cli_api.InvalidInputError as e: + raise HTTPException(status_code=400, detail=str(e)) + except cli_api.ScriptNotFoundError as e: + raise HTTPException(status_code=500, detail=f"Script error: {str(e)}") + except cli_api.CommandExecutionError as e: + raise HTTPException( + status_code=500, detail=f"Execution error: {str(e)}" + ) + except cli_api.HysteriaError as e: + raise HTTPException(status_code=500, detail=str(e)) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Unexpected error: {str(e)}" + ) + + return async_wrapper + else: + + def sync_wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except cli_api.InvalidInputError as e: + raise HTTPException(status_code=400, detail=str(e)) + except cli_api.ScriptNotFoundError as e: + raise HTTPException(status_code=500, detail=f"Script error: {str(e)}") + except cli_api.CommandExecutionError as e: + raise HTTPException( + status_code=500, detail=f"Execution error: {str(e)}" + ) + except cli_api.HysteriaError as e: + raise HTTPException(status_code=500, detail=str(e)) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Unexpected error: {str(e)}" + ) + + return sync_wrapper + + +# ============================================================================ +# Health & Info +# ============================================================================ + + +@app.get("/health") +async def health_check(): + """Health check endpoint""" + return {"status": "healthy", "timestamp": datetime.now().isoformat()} + + +@app.get("/api/v1/info/server") +async def get_server_info(): + """Get server information""" + info = cli_api.server_info() + return {"data": info} + + +@app.get("/api/v1/info/version") +async def get_version(): + """Get panel version""" + version = cli_api.show_version() + return {"version": version} + + +@app.get("/api/v1/info/services") +async def get_services_status(): + """Get status of all services""" + status_data = cli_api.get_services_status() + return {"services": status_data} + + +# ============================================================================ +# User Management +# ============================================================================ + + +@app.get("/api/v1/users") +async def list_users(): + """List all users""" + users = cli_api.list_users() + return {"users": users or {}} + + +@app.get("/api/v1/users/{username}") +async def get_user(username: str): + """Get user details""" + user = cli_api.get_user(username) + if not user: + raise HTTPException(status_code=404, detail=f"User '{username}' not found") + return {"user": user} + + +@app.post("/api/v1/users", status_code=status.HTTP_201_CREATED) +async def create_user(user: UserCreate): + """Create a new user""" + cli_api.add_user( + username=user.username, + traffic_limit=user.traffic_limit, + expiration_days=user.expiration_days, + password=user.password, + creation_date=user.creation_date, + unlimited=user.unlimited, + note=user.note, + ) + return {"message": f"User '{user.username}' created successfully"} + + +@app.post("/api/v1/users/bulk", status_code=status.HTTP_201_CREATED) +async def create_bulk_users(bulk: BulkUserCreate): + """Create multiple users at once""" + cli_api.bulk_user_add( + traffic_gb=bulk.traffic_gb, + expiration_days=bulk.expiration_days, + count=bulk.count, + prefix=bulk.prefix, + start_number=bulk.start_number, + unlimited=bulk.unlimited, + ) + return {"message": f"Created {bulk.count} users with prefix '{bulk.prefix}'"} + + +@app.put("/api/v1/users/{username}") +async def edit_user(username: str, user: UserEdit): + """Edit user details""" + cli_api.edit_user( + username=username, + new_username=user.new_username, + new_password=user.new_password, + new_traffic_limit=user.new_traffic_limit, + new_expiration_days=user.new_expiration_days, + renew_password=user.renew_password, + renew_creation_date=user.renew_creation_date, + blocked=user.blocked, + unlimited_ip=user.unlimited_ip, + note=user.note, + ) + return {"message": f"User '{username}' updated successfully"} + + +@app.post("/api/v1/users/{username}/reset") +async def reset_user(username: str): + """Reset user traffic and dates""" + cli_api.reset_user(username) + return {"message": f"User '{username}' reset successfully"} + + +@app.delete("/api/v1/users/{username}") +async def delete_user(username: str): + """Delete a user""" + cli_api.remove_users([username]) + return {"message": f"User '{username}' deleted successfully"} + + +@app.delete("/api/v1/users") +async def delete_multiple_users(usernames: List[str] = Query(...)): + """Delete multiple users""" + cli_api.remove_users(usernames) + return {"message": f"Deleted {len(usernames)} users"} + + +@app.post("/api/v1/users/{username}/kick") +async def kick_user(username: str): + """Kick user (disconnect)""" + cli_api.kick_users_by_name([username]) + return {"message": f"User '{username}' kicked successfully"} + + +@app.get("/api/v1/users/{username}/uri") +async def get_user_uri( + username: str, + qrcode: bool = False, + ipv: int = 4, + all: bool = False, + singbox: bool = False, + normalsub: bool = False, +): + """Get user connection URI""" + uri = cli_api.show_user_uri(username, qrcode, ipv, all, singbox, normalsub) + return {"uri": uri} + + +@app.get("/api/v1/users/uri/batch") +async def get_batch_user_uri(usernames: List[str] = Query(...)): + """Get URIs for multiple users""" + uris = cli_api.show_user_uri_json(usernames) + return {"uris": uris} + + +# ============================================================================ +# Hysteria2 Core Management +# ============================================================================ + + +@app.post("/api/v1/hysteria2/install") +async def install_hysteria2(config: Hysteria2Install): + """Install Hysteria2""" + cli_api.install_hysteria2(port=config.port, sni=config.sni) + return {"message": "Hysteria2 installed successfully"} + + +@app.post("/api/v1/hysteria2/uninstall") +async def uninstall_hysteria2(): + """Uninstall Hysteria2""" + cli_api.uninstall_hysteria2() + return {"message": "Hysteria2 uninstalled successfully"} + + +@app.post("/api/v1/hysteria2/update") +async def update_hysteria2(): + """Update Hysteria2 core""" + cli_api.update_hysteria2() + return {"message": "Hysteria2 updated successfully"} + + +@app.post("/api/v1/hysteria2/restart") +async def restart_hysteria2(): + """Restart Hysteria2 service""" + cli_api.restart_hysteria2() + return {"message": "Hysteria2 restarted successfully"} + + +@app.get("/api/v1/hysteria2/config/port") +async def get_port(): + """Get current Hysteria2 port""" + port = cli_api.get_hysteria2_port() + return {"port": port} + + +@app.put("/api/v1/hysteria2/config/port") +async def change_port(config: PortChange): + """Change Hysteria2 port""" + cli_api.change_hysteria2_port(config.port) + return {"message": f"Port changed to {config.port}"} + + +@app.get("/api/v1/hysteria2/config/sni") +async def get_sni(): + """Get current SNI""" + sni = cli_api.get_hysteria2_sni() + return {"sni": sni} + + +@app.put("/api/v1/hysteria2/config/sni") +async def change_sni(config: SNIChange): + """Change SNI""" + cli_api.change_hysteria2_sni(config.sni) + return {"message": f"SNI changed to {config.sni}"} + + +@app.post("/api/v1/hysteria2/obfs/enable") +async def enable_obfs(): + """Enable obfuscation""" + cli_api.enable_hysteria2_obfs() + return {"message": "Obfuscation enabled"} + + +@app.post("/api/v1/hysteria2/obfs/disable") +async def disable_obfs(): + """Disable obfuscation""" + cli_api.disable_hysteria2_obfs() + return {"message": "Obfuscation disabled"} + + +@app.get("/api/v1/hysteria2/obfs/status") +async def check_obfs(): + """Check obfuscation status""" + status_msg = cli_api.check_hysteria2_obfs() + return {"status": status_msg} + + +@app.post("/api/v1/hysteria2/masquerade/enable") +async def enable_masquerade(): + """Enable masquerade""" + result = cli_api.enable_hysteria2_masquerade() + return {"message": result} + + +@app.post("/api/v1/hysteria2/masquerade/disable") +async def disable_masquerade(): + """Disable masquerade""" + result = cli_api.disable_hysteria2_masquerade() + return {"message": result} + + +@app.get("/api/v1/hysteria2/masquerade/status") +async def get_masquerade_status(): + """Get masquerade status""" + status_msg = cli_api.get_hysteria2_masquerade_status() + return {"status": status_msg} + + +# ============================================================================ +# Traffic & Statistics +# ============================================================================ + + +@app.get("/api/v1/traffic/status") +async def get_traffic_status(): + """Get traffic status for all users""" + data = cli_api.traffic_status(no_gui=True, display_output=False) + return {"traffic": data} + + +# ============================================================================ +# IP Configuration +# ============================================================================ + + +@app.get("/api/v1/config/ip") +async def get_ip_addresses(): + """Get configured IP addresses""" + ipv4, ipv6 = cli_api.get_ip_address() + return {"ipv4": ipv4, "ipv6": ipv6} + + +@app.post("/api/v1/config/ip") +async def add_ip_addresses(): + """Auto-detect and add IP addresses""" + cli_api.add_ip_address() + return {"message": "IP addresses added"} + + +@app.put("/api/v1/config/ip") +async def edit_ip_addresses(config: IPConfig): + """Edit IP addresses""" + cli_api.edit_ip_address(ipv4=config.ipv4, ipv6=config.ipv6) + return {"message": "IP addresses updated"} + + +# ============================================================================ +# Advanced Features +# ============================================================================ + + +@app.post("/api/v1/advanced/tcp-brutal/install") +async def install_tcp_brutal(): + """Install TCP Brutal""" + cli_api.install_tcp_brutal() + return {"message": "TCP Brutal installed"} + + +@app.post("/api/v1/advanced/warp/install") +async def install_warp(): + """Install WARP""" + cli_api.install_warp() + return {"message": "WARP installed"} + + +@app.post("/api/v1/advanced/warp/uninstall") +async def uninstall_warp(): + """Uninstall WARP""" + cli_api.uninstall_warp() + return {"message": "WARP uninstalled"} + + +@app.put("/api/v1/advanced/warp/configure") +async def configure_warp(config: WARPConfig): + """Configure WARP settings""" + cli_api.configure_warp( + all_state=config.all_state, + popular_sites_state=config.popular_sites_state, + domestic_sites_state=config.domestic_sites_state, + block_adult_sites_state=config.block_adult_sites_state, + ) + return {"message": "WARP configured"} + + +@app.get("/api/v1/advanced/warp/status") +async def get_warp_status(): + """Get WARP status""" + status_data = cli_api.warp_status() + return {"status": status_data} + + +# ============================================================================ +# Telegram Bot +# ============================================================================ + + +@app.post("/api/v1/services/telegram/start") +async def start_telegram(config: TelegramBotConfig): + """Start Telegram bot""" + cli_api.start_telegram_bot( + token=config.token, + adminid=config.admin_ids, + backup_interval=config.backup_interval, + ) + return {"message": "Telegram bot started"} + + +@app.post("/api/v1/services/telegram/stop") +async def stop_telegram(): + """Stop Telegram bot""" + cli_api.stop_telegram_bot() + return {"message": "Telegram bot stopped"} + + +# ============================================================================ +# WebPanel +# ============================================================================ + + +@app.post("/api/v1/services/webpanel/start") +async def start_webpanel(config: WebPanelConfig): + """Start WebPanel""" + cli_api.start_webpanel( + domain=config.domain, + port=config.port, + admin_username=config.admin_username, + admin_password=config.admin_password, + expiration_minutes=config.expiration_minutes, + debug=config.debug, + decoy_path=config.decoy_path, + ) + return {"message": "WebPanel started"} + + +@app.post("/api/v1/services/webpanel/stop") +async def stop_webpanel(): + """Stop WebPanel""" + cli_api.stop_webpanel() + return {"message": "WebPanel stopped"} + + +@app.get("/api/v1/services/webpanel/url") +async def get_webpanel_url(): + """Get WebPanel URL""" + url = cli_api.get_webpanel_url() + return {"url": url} + + +# ============================================================================ +# Normal-Sub +# ============================================================================ + + +@app.post("/api/v1/services/normalsub/start") +async def start_normalsub(config: NormalSubConfig): + """Start Normal-Sub service""" + cli_api.start_normalsub(domain=config.domain, port=config.port) + return {"message": "Normal-Sub started"} + + +@app.post("/api/v1/services/normalsub/stop") +async def stop_normalsub(): + """Stop Normal-Sub service""" + cli_api.stop_normalsub() + return {"message": "Normal-Sub stopped"} + + +# ============================================================================ +# IP Limiter +# ============================================================================ + + +@app.post("/api/v1/services/ip-limiter/start") +async def start_ip_limiter(): + """Start IP limiter service""" + cli_api.start_ip_limiter() + return {"message": "IP limiter started"} + + +@app.post("/api/v1/services/ip-limiter/stop") +async def stop_ip_limiter(): + """Stop IP limiter service""" + cli_api.stop_ip_limiter() + return {"message": "IP limiter stopped"} + + +@app.put("/api/v1/services/ip-limiter/config") +async def config_ip_limiter(config: IPLimiterConfig): + """Configure IP limiter""" + cli_api.config_ip_limiter( + block_duration=config.block_duration, max_ips=config.max_ips + ) + return {"message": "IP limiter configured"} + + +@app.get("/api/v1/services/ip-limiter/config") +async def get_ip_limiter_config(): + """Get IP limiter configuration""" + config = cli_api.get_ip_limiter_config() + return config + + +# ============================================================================ +# Nodes Management +# ============================================================================ + + +@app.post("/api/v1/nodes") +async def add_node(node: NodeCreate): + """Add external node""" + result = cli_api.add_node( + name=node.name, + ip=node.ip, + sni=node.sni, + pinSHA256=node.pinSHA256, + port=node.port, + obfs=node.obfs, + insecure=node.insecure, + ) + return {"message": result} + + +@app.delete("/api/v1/nodes/{name}") +async def delete_node(name: str): + """Delete node""" + result = cli_api.delete_node(name) + return {"message": result} + + +@app.get("/api/v1/nodes") +async def list_nodes(): + """List all nodes""" + result = cli_api.list_nodes() + return {"nodes": result} + + +# ============================================================================ +# Geo Updates +# ============================================================================ + + +@app.post("/api/v1/geo/update/{country}") +async def update_geo(country: str): + """Update geo files for country (iran, china, russia)""" + cli_api.update_geo(country) + return {"message": f"Geo files updated for {country}"} + + +# ============================================================================ +# Backup & Restore +# ============================================================================ + + +@app.post("/api/v1/backup") +async def create_backup(): + """Create backup""" + cli_api.backup_hysteria2() + return {"message": "Backup created successfully"} + + +@app.post("/api/v1/restore") +async def restore_backup(backup_file_path: str): + """Restore from backup""" + cli_api.restore_hysteria2(backup_file_path) + return {"message": "Restored successfully"} + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/core/cli_api.py b/core/cli_api.py index 8f69b13..c76dac1 100644 --- a/core/cli_api.py +++ b/core/cli_api.py @@ -9,122 +9,142 @@ import re import secrets import string -import traffic +from . import traffic DEBUG = False -SCRIPT_DIR = '/etc/hysteria/core/scripts' -CONFIG_FILE = '/etc/hysteria/config.json' -CONFIG_ENV_FILE = '/etc/hysteria/.configs.env' -WEBPANEL_ENV_FILE = '/etc/hysteria/core/scripts/webpanel/.env' -NORMALSUB_ENV_FILE = '/etc/hysteria/core/scripts/normalsub/.env' -TELEGRAM_ENV_FILE = '/etc/hysteria/core/scripts/telegrambot/.env' +SCRIPT_DIR = "/etc/hysteria/core/scripts" +CONFIG_FILE = "/etc/hysteria/config.json" +CONFIG_ENV_FILE = "/etc/hysteria/.configs.env" +WEBPANEL_ENV_FILE = "/etc/hysteria/core/scripts/webpanel/.env" +NORMALSUB_ENV_FILE = "/etc/hysteria/core/scripts/normalsub/.env" +TELEGRAM_ENV_FILE = "/etc/hysteria/core/scripts/telegrambot/.env" NODES_JSON_PATH = "/etc/hysteria/nodes.json" +VENV_PYTHON = "/etc/hysteria/hysteria2_venv/bin/python3" class Command(Enum): - '''Contains path to command's script''' - INSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'install.sh') - UNINSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'uninstall.py') - UPDATE_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'update.py') - RESTART_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'restart.py') - CHANGE_PORT_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_port.py') - CHANGE_SNI_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_sni.py') - GET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'get_user.py') - ADD_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'add_user.py') - BULK_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'bulk_users.py') - EDIT_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'edit_user.py') - RESET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'reset_user.py') - REMOVE_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'remove_user.py') - SHOW_USER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'show_user_uri.py') - WRAPPER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'wrapper_uri.py') - IP_ADD = os.path.join(SCRIPT_DIR, 'hysteria2', 'ip.py') - NODE_MANAGER = os.path.join(SCRIPT_DIR, 'nodes', 'node.py') - MANAGE_OBFS = os.path.join(SCRIPT_DIR, 'hysteria2', 'manage_obfs.py') - MASQUERADE_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'masquerade.py') - EXTRA_CONFIG_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'extra_config.py') - TRAFFIC_STATUS = 'traffic.py' # won't be called directly (it's a python module) - UPDATE_GEO = os.path.join(SCRIPT_DIR, 'hysteria2', 'update_geo.py') - LIST_USERS = os.path.join(SCRIPT_DIR, 'hysteria2', 'list_users.py') - SERVER_INFO = os.path.join(SCRIPT_DIR, 'hysteria2', 'server_info.py') - BACKUP_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'backup.py') - RESTORE_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'restore.py') - INSTALL_TELEGRAMBOT = os.path.join(SCRIPT_DIR, 'telegrambot', 'runbot.py') - SHELL_SINGBOX = os.path.join(SCRIPT_DIR, 'singbox', 'singbox_shell.sh') - SHELL_WEBPANEL = os.path.join(SCRIPT_DIR, 'webpanel', 'webpanel_shell.sh') - INSTALL_NORMALSUB = os.path.join(SCRIPT_DIR, 'normalsub', 'normalsub.sh') - INSTALL_TCP_BRUTAL = os.path.join(SCRIPT_DIR, 'tcp-brutal', 'install.py') - INSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'install.py') - UNINSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'uninstall.py') - CONFIGURE_WARP = os.path.join(SCRIPT_DIR, 'warp', 'configure.py') - STATUS_WARP = os.path.join(SCRIPT_DIR, 'warp', 'status.py') - SERVICES_STATUS = os.path.join(SCRIPT_DIR, 'services_status.sh') - VERSION = os.path.join(SCRIPT_DIR, 'hysteria2', 'version.py') - LIMIT_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'limit.sh') - KICK_USER_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'kickuser.py') + """Contains path to command's script""" + + INSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, "hysteria2", "install.sh") + UNINSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, "hysteria2", "uninstall.py") + UPDATE_HYSTERIA2 = os.path.join(SCRIPT_DIR, "hysteria2", "update.py") + RESTART_HYSTERIA2 = os.path.join(SCRIPT_DIR, "hysteria2", "restart.py") + CHANGE_PORT_HYSTERIA2 = os.path.join(SCRIPT_DIR, "hysteria2", "change_port.py") + CHANGE_SNI_HYSTERIA2 = os.path.join(SCRIPT_DIR, "hysteria2", "change_sni.py") + GET_USER = os.path.join(SCRIPT_DIR, "hysteria2", "get_user.py") + ADD_USER = os.path.join(SCRIPT_DIR, "hysteria2", "add_user.py") + BULK_USER = os.path.join(SCRIPT_DIR, "hysteria2", "bulk_users.py") + EDIT_USER = os.path.join(SCRIPT_DIR, "hysteria2", "edit_user.py") + RESET_USER = os.path.join(SCRIPT_DIR, "hysteria2", "reset_user.py") + REMOVE_USER = os.path.join(SCRIPT_DIR, "hysteria2", "remove_user.py") + SHOW_USER_URI = os.path.join(SCRIPT_DIR, "hysteria2", "show_user_uri.py") + WRAPPER_URI = os.path.join(SCRIPT_DIR, "hysteria2", "wrapper_uri.py") + IP_ADD = os.path.join(SCRIPT_DIR, "hysteria2", "ip.py") + NODE_MANAGER = os.path.join(SCRIPT_DIR, "nodes", "node.py") + MANAGE_OBFS = os.path.join(SCRIPT_DIR, "hysteria2", "manage_obfs.py") + MASQUERADE_SCRIPT = os.path.join(SCRIPT_DIR, "hysteria2", "masquerade.py") + EXTRA_CONFIG_SCRIPT = os.path.join(SCRIPT_DIR, "hysteria2", "extra_config.py") + TRAFFIC_STATUS = "traffic.py" # won't be called directly (it's a python module) + UPDATE_GEO = os.path.join(SCRIPT_DIR, "hysteria2", "update_geo.py") + LIST_USERS = os.path.join(SCRIPT_DIR, "hysteria2", "list_users.py") + SERVER_INFO = os.path.join(SCRIPT_DIR, "hysteria2", "server_info.py") + BACKUP_HYSTERIA2 = os.path.join(SCRIPT_DIR, "hysteria2", "backup.py") + RESTORE_HYSTERIA2 = os.path.join(SCRIPT_DIR, "hysteria2", "restore.py") + INSTALL_TELEGRAMBOT = os.path.join(SCRIPT_DIR, "telegrambot", "runbot.py") + SHELL_SINGBOX = os.path.join(SCRIPT_DIR, "singbox", "singbox_shell.sh") + SHELL_WEBPANEL = os.path.join(SCRIPT_DIR, "webpanel", "webpanel_shell.sh") + INSTALL_NORMALSUB = os.path.join(SCRIPT_DIR, "normalsub", "normalsub.sh") + INSTALL_TCP_BRUTAL = os.path.join(SCRIPT_DIR, "tcp-brutal", "install.py") + INSTALL_WARP = os.path.join(SCRIPT_DIR, "warp", "install.py") + UNINSTALL_WARP = os.path.join(SCRIPT_DIR, "warp", "uninstall.py") + CONFIGURE_WARP = os.path.join(SCRIPT_DIR, "warp", "configure.py") + STATUS_WARP = os.path.join(SCRIPT_DIR, "warp", "status.py") + SERVICES_STATUS = os.path.join(SCRIPT_DIR, "services_status.sh") + VERSION = os.path.join(SCRIPT_DIR, "hysteria2", "version.py") + LIMIT_SCRIPT = os.path.join(SCRIPT_DIR, "hysteria2", "limit.sh") + KICK_USER_SCRIPT = os.path.join(SCRIPT_DIR, "hysteria2", "kickuser.py") # region Custom Exceptions class HysteriaError(Exception): - '''Base class for Hysteria-related exceptions.''' + """Base class for Hysteria-related exceptions.""" + pass class CommandExecutionError(HysteriaError): - '''Raised when a command execution fails.''' + """Raised when a command execution fails.""" + pass class InvalidInputError(HysteriaError): - '''Raised when the provided input is invalid.''' + """Raised when the provided input is invalid.""" + pass class PasswordGenerationError(HysteriaError): - '''Raised when password generation fails.''' + """Raised when password generation fails.""" + pass class ScriptNotFoundError(HysteriaError): - '''Raised when a required script is not found.''' + """Raised when a required script is not found.""" + pass + # region Utils def run_cmd(command: list[str]) -> str: - ''' + """ Runs a command and returns its stdout if successful. Raises CommandExecutionError if the command fails (non-zero exit code) or cannot be found. - ''' + """ if DEBUG: print(f"Executing command: {' '.join(command)}") try: - process = subprocess.run(command, capture_output=True, text=True, shell=False, check=False) + process = subprocess.run( + command, capture_output=True, text=True, shell=False, check=False + ) if process.returncode != 0: - error_output = process.stderr.strip() if process.stderr.strip() else process.stdout.strip() + error_output = ( + process.stderr.strip() + if process.stderr.strip() + else process.stdout.strip() + ) if not error_output: error_output = f"Command exited with status {process.returncode} without specific error message." - + detailed_error_message = f"Command '{' '.join(command)}' failed with exit code {process.returncode}: {error_output}" raise CommandExecutionError(detailed_error_message) return process.stdout.strip() if process.stdout else "" except FileNotFoundError as e: - raise ScriptNotFoundError(f"Script or command not found: {command[0]}. Original error: {e}") - except subprocess.TimeoutExpired as e: - raise CommandExecutionError(f"Command '{' '.join(command)}' timed out. Original error: {e}") - except OSError as e: - raise CommandExecutionError(f"OS error while trying to run command '{' '.join(command)}': {e}") + raise ScriptNotFoundError( + f"Script or command not found: {command[0]}. Original error: {e}" + ) + except subprocess.TimeoutExpired as e: + raise CommandExecutionError( + f"Command '{' '.join(command)}' timed out. Original error: {e}" + ) + except OSError as e: + raise CommandExecutionError( + f"OS error while trying to run command '{' '.join(command)}': {e}" + ) def run_cmd_and_stream(command: list[str]): - ''' + """ Runs a command, streams its combined stdout/stderr, and raises an exception on failure. - ''' + """ if DEBUG: print(f"Executing command: {' '.join(command)}") try: @@ -134,34 +154,41 @@ def run_cmd_and_stream(command: list[str]): stderr=subprocess.STDOUT, text=True, bufsize=1, - universal_newlines=True + universal_newlines=True, ) - + if process.stdout: - for line in iter(process.stdout.readline, ''): - print(line, end='') + for line in iter(process.stdout.readline, ""): + print(line, end="") process.stdout.close() - + return_code = process.wait() if return_code != 0: raise CommandExecutionError(f"Process failed with exit code {return_code}") except FileNotFoundError as e: - raise ScriptNotFoundError(f"Script or command not found: {command[0]}. Original error: {e}") - except OSError as e: - raise CommandExecutionError(f"OS error while trying to run command '{' '.join(command)}': {e}") + raise ScriptNotFoundError( + f"Script or command not found: {command[0]}. Original error: {e}" + ) + except OSError as e: + raise CommandExecutionError( + f"OS error while trying to run command '{' '.join(command)}': {e}" + ) def generate_password() -> str: - ''' + """ Generates a secure, random alphanumeric password. - ''' + """ try: alphabet = string.ascii_letters + string.digits - return ''.join(secrets.choice(alphabet) for _ in range(32)) + return "".join(secrets.choice(alphabet) for _ in range(32)) except Exception as e: - raise PasswordGenerationError(f"Failed to generate password using secrets module: {e}") + raise PasswordGenerationError( + f"Failed to generate password using secrets module: {e}" + ) + # endregion @@ -171,65 +198,65 @@ def generate_password() -> str: def install_hysteria2(port: int, sni: str): - ''' + """ Installs Hysteria2 and streams the output of the installation script. - ''' - run_cmd_and_stream(['bash', Command.INSTALL_HYSTERIA2.value, str(port), sni]) + """ + run_cmd_and_stream(["bash", Command.INSTALL_HYSTERIA2.value, str(port), sni]) def uninstall_hysteria2(): - '''Uninstalls Hysteria2.''' - run_cmd(['python3', Command.UNINSTALL_HYSTERIA2.value]) + """Uninstalls Hysteria2.""" + run_cmd([VENV_PYTHON, Command.UNINSTALL_HYSTERIA2.value]) def update_hysteria2(): - '''Updates Hysteria2.''' - run_cmd(['python3', Command.UPDATE_HYSTERIA2.value]) + """Updates Hysteria2.""" + run_cmd([VENV_PYTHON, Command.UPDATE_HYSTERIA2.value]) def restart_hysteria2(): - '''Restarts Hysteria2.''' - run_cmd(['python3', Command.RESTART_HYSTERIA2.value]) + """Restarts Hysteria2.""" + run_cmd([VENV_PYTHON, Command.RESTART_HYSTERIA2.value]) def get_hysteria2_port() -> int | None: - ''' + """ Retrieves the port for Hysteria2. - ''' + """ # read json config file and return port, example valaue of 'listen' field: '127.0.0.1:8080' config = get_hysteria2_config_file() - port = config['listen'].split(':') + port = config["listen"].split(":") if len(port) > 1: return int(port[1]) return None def change_hysteria2_port(port: int): - ''' + """ Changes the port for Hysteria2. - ''' - run_cmd(['python3', Command.CHANGE_PORT_HYSTERIA2.value, str(port)]) + """ + run_cmd([VENV_PYTHON, Command.CHANGE_PORT_HYSTERIA2.value, str(port)]) def get_hysteria2_sni() -> str | None: - ''' + """ Retrieves the SNI for Hysteria2. - ''' + """ env_vars = dotenv_values(CONFIG_ENV_FILE) - return env_vars.get('SNI') + return env_vars.get("SNI") def change_hysteria2_sni(sni: str): - ''' + """ Changes the SNI for Hysteria2. - ''' - run_cmd(['python3', Command.CHANGE_SNI_HYSTERIA2.value, sni]) + """ + run_cmd([VENV_PYTHON, Command.CHANGE_SNI_HYSTERIA2.value, sni]) def backup_hysteria2(): - '''Backups Hysteria configuration. Raises an exception on failure.''' + """Backups Hysteria configuration. Raises an exception on failure.""" try: - run_cmd(['python3', Command.BACKUP_HYSTERIA2.value]) + run_cmd([VENV_PYTHON, Command.BACKUP_HYSTERIA2.value]) except subprocess.CalledProcessError as e: raise Exception(f"Backup failed: {e}") except Exception as ex: @@ -237,9 +264,9 @@ def backup_hysteria2(): def restore_hysteria2(backup_file_path: str): - '''Restores Hysteria configuration from the given backup file.''' + """Restores Hysteria configuration from the given backup file.""" try: - run_cmd(['python3', Command.RESTORE_HYSTERIA2.value, backup_file_path]) + run_cmd([VENV_PYTHON, Command.RESTORE_HYSTERIA2.value, backup_file_path]) except subprocess.CalledProcessError as e: raise Exception(f"Restore failed: {e}") except Exception as ex: @@ -247,116 +274,169 @@ def restore_hysteria2(backup_file_path: str): def enable_hysteria2_obfs(): - '''Generates 'obfs' in Hysteria2 configuration.''' - run_cmd(['python3', Command.MANAGE_OBFS.value, '--generate']) + """Generates 'obfs' in Hysteria2 configuration.""" + run_cmd([VENV_PYTHON, Command.MANAGE_OBFS.value, "--generate"]) def disable_hysteria2_obfs(): - '''Removes 'obfs' from Hysteria2 configuration.''' - run_cmd(['python3', Command.MANAGE_OBFS.value, '--remove']) + """Removes 'obfs' from Hysteria2 configuration.""" + run_cmd([VENV_PYTHON, Command.MANAGE_OBFS.value, "--remove"]) + def check_hysteria2_obfs(): - '''Removes 'obfs' from Hysteria2 configuration.''' - result = subprocess.run(["python3", Command.MANAGE_OBFS.value, "--check"], check=True, capture_output=True, text=True) + """Removes 'obfs' from Hysteria2 configuration.""" + result = subprocess.run( + [VENV_PYTHON, Command.MANAGE_OBFS.value, "--check"], + check=True, + capture_output=True, + text=True, + ) return result.stdout.strip() + def enable_hysteria2_masquerade(): - '''Enables masquerade for Hysteria2.''' - return run_cmd(['python3', Command.MASQUERADE_SCRIPT.value, '1']) + """Enables masquerade for Hysteria2.""" + return run_cmd([VENV_PYTHON, Command.MASQUERADE_SCRIPT.value, "1"]) + def disable_hysteria2_masquerade(): - '''Disables masquerade for Hysteria2.''' - return run_cmd(['python3', Command.MASQUERADE_SCRIPT.value, '2']) + """Disables masquerade for Hysteria2.""" + return run_cmd([VENV_PYTHON, Command.MASQUERADE_SCRIPT.value, "2"]) + def get_hysteria2_masquerade_status(): - '''Gets the current masquerade status for Hysteria2.''' - return run_cmd(['python3', Command.MASQUERADE_SCRIPT.value, 'status']) + """Gets the current masquerade status for Hysteria2.""" + return run_cmd([VENV_PYTHON, Command.MASQUERADE_SCRIPT.value, "status"]) def get_hysteria2_config_file() -> dict[str, Any]: - with open(CONFIG_FILE, 'r') as f: + with open(CONFIG_FILE, "r") as f: return json.loads(f.read()) def set_hysteria2_config_file(data: dict[str, Any]): content = json.dumps(data, indent=4) - with open(CONFIG_FILE, 'w') as f: + with open(CONFIG_FILE, "w") as f: f.write(content) + + # endregion # region User def list_users() -> dict[str, dict[str, Any]] | None: - ''' + """ Lists all users. - ''' - if res := run_cmd(['python3', Command.LIST_USERS.value]): + """ + if res := run_cmd([VENV_PYTHON, Command.LIST_USERS.value]): return json.loads(res) def get_user(username: str) -> dict[str, Any] | None: - ''' + """ Retrieves information about a specific user. - ''' - if res := run_cmd(['python3', Command.GET_USER.value, '-u', str(username)]): + """ + if res := run_cmd([VENV_PYTHON, Command.GET_USER.value, "-u", str(username)]): return json.loads(res) -def add_user(username: str, traffic_limit: int, expiration_days: int, password: str | None, creation_date: str | None, unlimited: bool, note: str | None): - ''' +def add_user( + username: str, + traffic_limit: int, + expiration_days: int, + password: str | None, + creation_date: str | None, + unlimited: bool, + note: str | None, +): + """ Adds a new user with the given parameters, respecting positional argument requirements. - ''' - command = ['python3', Command.ADD_USER.value, username, str(traffic_limit), str(expiration_days)] + """ + command = [ + VENV_PYTHON, + Command.ADD_USER.value, + username, + str(traffic_limit), + str(expiration_days), + ] final_password = password if password else generate_password() command.append(final_password) - + if unlimited: - command.append('true') - + command.append("true") + if note: - if not unlimited: command.append('false') + if not unlimited: + command.append("false") command.append(note) - + if creation_date: - if not unlimited: command.append('false') - if not note: command.append('') + if not unlimited: + command.append("false") + if not note: + command.append("") command.append(creation_date) - + run_cmd(command) -def bulk_user_add(traffic_gb: float, expiration_days: int, count: int, prefix: str, start_number: int, unlimited: bool): + +def bulk_user_add( + traffic_gb: float, + expiration_days: int, + count: int, + prefix: str, + start_number: int, + unlimited: bool, +): """ Executes the bulk user creation script with specified parameters. """ command = [ - 'python3', + VENV_PYTHON, Command.BULK_USER.value, - '--traffic-gb', str(traffic_gb), - '--expiration-days', str(expiration_days), - '--count', str(count), - '--prefix', prefix, - '--start-number', str(start_number) + "--traffic-gb", + str(traffic_gb), + "--expiration-days", + str(expiration_days), + "--count", + str(count), + "--prefix", + prefix, + "--start-number", + str(start_number), ] if unlimited: - command.append('--unlimited') - + command.append("--unlimited") + run_cmd(command) -def edit_user(username: str, new_username: str | None, new_password: str | None, new_traffic_limit: int | None, new_expiration_days: int | None, renew_password: bool, renew_creation_date: bool, blocked: bool | None, unlimited_ip: bool | None, note: str | None): - ''' - Edits an existing user's details by calling the new edit_user.py script with named flags. - ''' - if not username: - raise InvalidInputError('Error: username is required') - command_args = ['python3', Command.EDIT_USER.value, username] +def edit_user( + username: str, + new_username: str | None, + new_password: str | None, + new_traffic_limit: int | None, + new_expiration_days: int | None, + renew_password: bool, + renew_creation_date: bool, + blocked: bool | None, + unlimited_ip: bool | None, + note: str | None, +): + """ + Edits an existing user's details by calling the new edit_user.py script with named flags. + """ + if not username: + raise InvalidInputError("Error: username is required") + + command_args = [VENV_PYTHON, Command.EDIT_USER.value, username] if new_username: - command_args.extend(['--new-username', new_username]) + command_args.extend(["--new-username", new_username]) password_to_set = None if new_password: @@ -365,97 +445,116 @@ def edit_user(username: str, new_username: str | None, new_password: str | None, password_to_set = generate_password() if password_to_set: - command_args.extend(['--password', password_to_set]) + command_args.extend(["--password", password_to_set]) if new_traffic_limit is not None: if new_traffic_limit < 0: - raise InvalidInputError('Error: traffic limit must be a non-negative number.') - command_args.extend(['--traffic-gb', str(new_traffic_limit)]) + raise InvalidInputError( + "Error: traffic limit must be a non-negative number." + ) + command_args.extend(["--traffic-gb", str(new_traffic_limit)]) if new_expiration_days is not None: if new_expiration_days < 0: - raise InvalidInputError('Error: expiration days must be a non-negative number.') - command_args.extend(['--expiration-days', str(new_expiration_days)]) - + raise InvalidInputError( + "Error: expiration days must be a non-negative number." + ) + command_args.extend(["--expiration-days", str(new_expiration_days)]) + if renew_creation_date: - creation_date = datetime.now().strftime('%Y-%m-%d') - command_args.extend(['--creation-date', creation_date]) - + creation_date = datetime.now().strftime("%Y-%m-%d") + command_args.extend(["--creation-date", creation_date]) + if blocked is not None: - command_args.extend(['--blocked', 'true' if blocked else 'false']) - + command_args.extend(["--blocked", "true" if blocked else "false"]) + if unlimited_ip is not None: - command_args.extend(['--unlimited', 'true' if unlimited_ip else 'false']) + command_args.extend(["--unlimited", "true" if unlimited_ip else "false"]) if note is not None: - command_args.extend(['--note', note]) + command_args.extend(["--note", note]) run_cmd(command_args) def reset_user(username: str): - ''' + """ Resets a user's configuration. - ''' - run_cmd(['python3', Command.RESET_USER.value, username]) + """ + run_cmd([VENV_PYTHON, Command.RESET_USER.value, username]) def remove_users(usernames: list[str]): - ''' + """ Removes one or more users by username. - ''' + """ if not usernames: return - run_cmd(['python3', Command.REMOVE_USER.value, *usernames]) + run_cmd([VENV_PYTHON, Command.REMOVE_USER.value, *usernames]) + def kick_users_by_name(usernames: list[str]): - '''Kicks one or more users by username.''' + """Kicks one or more users by username.""" if not usernames: - raise InvalidInputError('Username(s) must be provided to kick.') + raise InvalidInputError("Username(s) must be provided to kick.") script_path = Command.KICK_USER_SCRIPT.value if not os.path.exists(script_path): raise ScriptNotFoundError(f"Kick user script not found at: {script_path}") try: - subprocess.run(['python3', script_path, *usernames], check=True) + subprocess.run([VENV_PYTHON, script_path, *usernames], check=True) except subprocess.CalledProcessError as e: raise CommandExecutionError(f"Failed to execute kick user script: {e}") - -def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: bool, normalsub: bool) -> str | None: - ''' + + +def show_user_uri( + username: str, qrcode: bool, ipv: int, all: bool, singbox: bool, normalsub: bool +) -> str | None: + """ Displays the URI for a user, with options for QR code and other formats. - ''' - command_args = ['python3', Command.SHOW_USER_URI.value, '-u', username] + """ + command_args = [VENV_PYTHON, Command.SHOW_USER_URI.value, "-u", username] if qrcode: - command_args.append('-qr') + command_args.append("-qr") if all: - command_args.append('-a') + command_args.append("-a") else: - command_args.extend(['-ip', str(ipv)]) + command_args.extend(["-ip", str(ipv)]) if singbox: - command_args.append('-s') + command_args.append("-s") if normalsub: - command_args.append('-n') + command_args.append("-n") return run_cmd(command_args) + def show_user_uri_json(usernames: list[str]) -> list[dict[str, Any]] | None: - ''' + """ Displays the URI for a list of users in JSON format. - ''' + """ script_path = Command.WRAPPER_URI.value if not os.path.exists(script_path): raise ScriptNotFoundError(f"Wrapper URI script not found at: {script_path}") try: - process = subprocess.run(['python3', script_path, *usernames], capture_output=True, text=True, check=True) + process = subprocess.run( + [VENV_PYTHON, script_path, *usernames], + capture_output=True, + text=True, + check=True, + ) return json.loads(process.stdout) except subprocess.CalledProcessError as e: - raise CommandExecutionError(f"Failed to execute wrapper URI script: {e}\nError: {e.stderr}") + raise CommandExecutionError( + f"Failed to execute wrapper URI script: {e}\nError: {e.stderr}" + ) except FileNotFoundError: - raise ScriptNotFoundError(f'Script not found: {script_path}') + raise ScriptNotFoundError(f"Script not found: {script_path}") except json.JSONDecodeError: - raise CommandExecutionError(f"Failed to decode JSON output from script: {script_path}\nOutput: {process.stdout if 'process' in locals() else 'No output'}") # Add process check + raise CommandExecutionError( + f"Failed to decode JSON output from script: {script_path}\nOutput: {process.stdout if 'process' in locals() else 'No output'}" + ) # Add process check except Exception as e: - raise HysteriaError(f'An unexpected error occurred: {e}') - + raise HysteriaError(f"An unexpected error occurred: {e}") + + # endregion # region Server @@ -467,7 +566,7 @@ def traffic_status(no_gui=False, display_output=True): traffic.kick_expired_users() else: data = traffic.traffic_status(no_gui=True if not display_output else no_gui) - + return data @@ -475,271 +574,358 @@ def traffic_status(no_gui=False, display_output=True): # TODO: it's better to return json # TODO: After json todo need fix Telegram Bot and WebPanel def server_info() -> str | None: - '''Retrieves server information.''' - return run_cmd(['python3', Command.SERVER_INFO.value]) + """Retrieves server information.""" + return run_cmd([VENV_PYTHON, Command.SERVER_INFO.value]) def get_ip_address() -> tuple[str | None, str | None]: - ''' + """ Retrieves the IP address from the .configs.env file. - ''' + """ env_vars = dotenv_values(CONFIG_ENV_FILE) - return env_vars.get('IP4'), env_vars.get('IP6') + return env_vars.get("IP4"), env_vars.get("IP6") def add_ip_address(): - ''' + """ Adds IP addresses from the environment to the .configs.env file. - ''' - run_cmd(['python3', Command.IP_ADD.value, 'add']) + """ + run_cmd([VENV_PYTHON, Command.IP_ADD.value, "add"]) def edit_ip_address(ipv4: str, ipv6: str): - ''' + """ Edits the IP address configuration based on provided IPv4 and/or IPv6 addresses. :param ipv4: The new IPv4 address to be configured. If provided, the IPv4 address will be updated. :param ipv6: The new IPv6 address to be configured. If provided, the IPv6 address will be updated. :raises InvalidInputError: If neither ipv4 nor ipv6 is provided. - ''' + """ # if not ipv4 and not ipv6: # raise InvalidInputError('Error: --edit requires at least one of --ipv4 or --ipv6.') if ipv4: - run_cmd(['python3', Command.IP_ADD.value, 'edit', '-4', ipv4]) + run_cmd([VENV_PYTHON, Command.IP_ADD.value, "edit", "-4", ipv4]) if ipv6: - run_cmd(['python3', Command.IP_ADD.value, 'edit', '-6', ipv6]) + run_cmd([VENV_PYTHON, Command.IP_ADD.value, "edit", "-6", ipv6]) -def add_node(name: str, ip: str, sni: Optional[str] = None, pinSHA256: Optional[str] = None, port: Optional[int] = None, obfs: Optional[str] = None, insecure: Optional[bool] = None): + +def add_node( + name: str, + ip: str, + sni: Optional[str] = None, + pinSHA256: Optional[str] = None, + port: Optional[int] = None, + obfs: Optional[str] = None, + insecure: Optional[bool] = None, +): """ Adds a new external node. """ - command = ['python3', Command.NODE_MANAGER.value, 'add', '--name', name, '--ip', ip] + command = [ + VENV_PYTHON, + Command.NODE_MANAGER.value, + "add", + "--name", + name, + "--ip", + ip, + ] if port: - command.extend(['--port', str(port)]) + command.extend(["--port", str(port)]) if sni: - command.extend(['--sni', sni]) + command.extend(["--sni", sni]) if pinSHA256: - command.extend(['--pinSHA256', pinSHA256]) + command.extend(["--pinSHA256", pinSHA256]) if obfs: - command.extend(['--obfs', obfs]) + command.extend(["--obfs", obfs]) if insecure: - command.append('--insecure') + command.append("--insecure") return run_cmd(command) + def delete_node(name: str): """ Deletes an external node by name. """ - return run_cmd(['python3', Command.NODE_MANAGER.value, 'delete', '--name', name]) + return run_cmd([VENV_PYTHON, Command.NODE_MANAGER.value, "delete", "--name", name]) + def list_nodes(): """ Lists all configured external nodes. """ - return run_cmd(['python3', Command.NODE_MANAGER.value, 'list']) + return run_cmd([VENV_PYTHON, Command.NODE_MANAGER.value, "list"]) + def generate_node_cert(): """ Generates a self-signed certificate for nodes. """ - return run_cmd(['python3', Command.NODE_MANAGER.value, 'generate-cert']) + return run_cmd([VENV_PYTHON, Command.NODE_MANAGER.value, "generate-cert"]) + def update_geo(country: str): - ''' + """ Updates geographic data files based on the specified country. - ''' + """ script_path = Command.UPDATE_GEO.value try: - subprocess.run(['python3', script_path, country.lower()], check=True) + subprocess.run([VENV_PYTHON, script_path, country.lower()], check=True) except subprocess.CalledProcessError as e: - raise CommandExecutionError(f'Failed to update geo files: {e}') + raise CommandExecutionError(f"Failed to update geo files: {e}") except FileNotFoundError: - raise ScriptNotFoundError(f'Script not found: {script_path}') + raise ScriptNotFoundError(f"Script not found: {script_path}") except Exception as e: - raise HysteriaError(f'An unexpected error occurred: {e}') + raise HysteriaError(f"An unexpected error occurred: {e}") + def add_extra_config(name: str, uri: str) -> str: """Adds an extra proxy configuration.""" - return run_cmd(['python3', Command.EXTRA_CONFIG_SCRIPT.value, 'add', '--name', name, '--uri', uri]) + return run_cmd( + [ + VENV_PYTHON, + Command.EXTRA_CONFIG_SCRIPT.value, + "add", + "--name", + name, + "--uri", + uri, + ] + ) def delete_extra_config(name: str) -> str: """Deletes an extra proxy configuration.""" - return run_cmd(['python3', Command.EXTRA_CONFIG_SCRIPT.value, 'delete', '--name', name]) + return run_cmd( + [VENV_PYTHON, Command.EXTRA_CONFIG_SCRIPT.value, "delete", "--name", name] + ) def list_extra_configs() -> str: """Lists all extra proxy configurations.""" - return run_cmd(['python3', Command.EXTRA_CONFIG_SCRIPT.value, 'list']) + return run_cmd([VENV_PYTHON, Command.EXTRA_CONFIG_SCRIPT.value, "list"]) def get_extra_config(name: str) -> dict[str, Any] | None: """Gets a specific extra proxy configuration.""" - if res := run_cmd(['python3', Command.EXTRA_CONFIG_SCRIPT.value, 'get', '--name', name]): + if res := run_cmd( + [VENV_PYTHON, Command.EXTRA_CONFIG_SCRIPT.value, "get", "--name", name] + ): return json.loads(res) + # endregion # region Advanced Menu def install_tcp_brutal(): - '''Installs TCP Brutal.''' - run_cmd(['python3', Command.INSTALL_TCP_BRUTAL.value]) + """Installs TCP Brutal.""" + run_cmd([VENV_PYTHON, Command.INSTALL_TCP_BRUTAL.value]) def install_warp(): - '''Installs WARP.''' - run_cmd(['python3', Command.INSTALL_WARP.value]) + """Installs WARP.""" + run_cmd([VENV_PYTHON, Command.INSTALL_WARP.value]) def uninstall_warp(): - '''Uninstalls WARP.''' - run_cmd(['python3', Command.UNINSTALL_WARP.value]) + """Uninstalls WARP.""" + run_cmd([VENV_PYTHON, Command.UNINSTALL_WARP.value]) -def configure_warp(all_state: str | None = None, - popular_sites_state: str | None = None, - domestic_sites_state: str | None = None, - block_adult_sites_state: str | None = None): - ''' +def configure_warp( + all_state: str | None = None, + popular_sites_state: str | None = None, + domestic_sites_state: str | None = None, + block_adult_sites_state: str | None = None, +): + """ Configures WARP with various options. States are 'on' or 'off'. - ''' - cmd_args = [ - 'python3', Command.CONFIGURE_WARP.value - ] + """ + cmd_args = [VENV_PYTHON, Command.CONFIGURE_WARP.value] if all_state: - cmd_args.extend(['--set-all', all_state]) + cmd_args.extend(["--set-all", all_state]) if popular_sites_state: - cmd_args.extend(['--set-popular-sites', popular_sites_state]) + cmd_args.extend(["--set-popular-sites", popular_sites_state]) if domestic_sites_state: - cmd_args.extend(['--set-domestic-sites', domestic_sites_state]) + cmd_args.extend(["--set-domestic-sites", domestic_sites_state]) if block_adult_sites_state: - cmd_args.extend(['--set-block-adult', block_adult_sites_state]) + cmd_args.extend(["--set-block-adult", block_adult_sites_state]) - if len(cmd_args) == 2: + if len(cmd_args) == 2: print("No WARP configuration options provided to cli_api.configure_warp.") - return + return run_cmd(cmd_args) def warp_status() -> str | None: - '''Checks the status of WARP.''' - return run_cmd(['python3', Command.STATUS_WARP.value]) + """Checks the status of WARP.""" + return run_cmd([VENV_PYTHON, Command.STATUS_WARP.value]) def start_telegram_bot(token: str, adminid: str, backup_interval: Optional[int] = None): - '''Starts the Telegram bot.''' + """Starts the Telegram bot.""" if not token or not adminid: - raise InvalidInputError('Error: Both --token and --adminid are required for the start action.') - - command = ['python3', Command.INSTALL_TELEGRAMBOT.value, 'start', token, adminid] + raise InvalidInputError( + "Error: Both --token and --adminid are required for the start action." + ) + + command = [VENV_PYTHON, Command.INSTALL_TELEGRAMBOT.value, "start", token, adminid] if backup_interval is not None: command.append(str(backup_interval)) - + run_cmd(command) + def stop_telegram_bot(): - '''Stops the Telegram bot.''' - run_cmd(['python3', Command.INSTALL_TELEGRAMBOT.value, 'stop']) + """Stops the Telegram bot.""" + run_cmd([VENV_PYTHON, Command.INSTALL_TELEGRAMBOT.value, "stop"]) + def get_telegram_bot_backup_interval() -> int | None: - '''Retrievels the current BACKUP_INTERVAL_HOUR for the Telegram Bot service from its .env file.''' + """Retrievels the current BACKUP_INTERVAL_HOUR for the Telegram Bot service from its .env file.""" try: if not os.path.exists(TELEGRAM_ENV_FILE): - return None - + return None + env_vars = dotenv_values(TELEGRAM_ENV_FILE) - interval_str = env_vars.get('BACKUP_INTERVAL_HOUR') - + interval_str = env_vars.get("BACKUP_INTERVAL_HOUR") + if interval_str: try: return int(float(interval_str)) except (ValueError, TypeError): return None - + return None except Exception as e: print(f"Error reading Telegram Bot .env file: {e}") return None + def set_telegram_bot_backup_interval(backup_interval: int): - '''Sets the backup interval for the Telegram bot.''' + """Sets the backup interval for the Telegram bot.""" if backup_interval is None: - raise InvalidInputError('Error: Backup interval is required.') - run_cmd(['python3', Command.INSTALL_TELEGRAMBOT.value, 'set_backup_interval', str(backup_interval)]) + raise InvalidInputError("Error: Backup interval is required.") + run_cmd( + [ + VENV_PYTHON, + Command.INSTALL_TELEGRAMBOT.value, + "set_backup_interval", + str(backup_interval), + ] + ) def start_singbox(domain: str, port: int): - '''Starts Singbox.''' + """Starts Singbox.""" if not domain or not port: - raise InvalidInputError('Error: Both --domain and --port are required for the start action.') - run_cmd(['bash', Command.SHELL_SINGBOX.value, 'start', domain, str(port)]) + raise InvalidInputError( + "Error: Both --domain and --port are required for the start action." + ) + run_cmd(["bash", Command.SHELL_SINGBOX.value, "start", domain, str(port)]) def stop_singbox(): - '''Stops Singbox.''' - run_cmd(['bash', Command.SHELL_SINGBOX.value, 'stop']) + """Stops Singbox.""" + run_cmd(["bash", Command.SHELL_SINGBOX.value, "stop"]) def start_normalsub(domain: str, port: int): - '''Starts NormalSub.''' + """Starts NormalSub.""" if not domain or not port: - raise InvalidInputError('Error: Both --domain and --port are required for the start action.') - run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'start', domain, str(port)]) + raise InvalidInputError( + "Error: Both --domain and --port are required for the start action." + ) + run_cmd(["bash", Command.INSTALL_NORMALSUB.value, "start", domain, str(port)]) + def edit_normalsub_subpath(new_subpath: str): - '''Edits the subpath for NormalSub service.''' + """Edits the subpath for NormalSub service.""" if not new_subpath: - raise InvalidInputError('Error: New subpath cannot be empty.') + raise InvalidInputError("Error: New subpath cannot be empty.") if not re.match(r"^[a-zA-Z0-9]+(?:/[a-zA-Z0-9]+)*$", new_subpath): - raise InvalidInputError("Error: Invalid subpath format. Must be alphanumeric segments separated by single slashes (e.g., 'path' or 'path/to/resource').") - - run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'edit_subpath', new_subpath]) + raise InvalidInputError( + "Error: Invalid subpath format. Must be alphanumeric segments separated by single slashes (e.g., 'path' or 'path/to/resource')." + ) + + run_cmd(["bash", Command.INSTALL_NORMALSUB.value, "edit_subpath", new_subpath]) + def get_normalsub_subpath() -> str | None: - '''Retrieves the current SUBPATH for the NormalSub service from its .env file.''' + """Retrieves the current SUBPATH for the NormalSub service from its .env file.""" try: if not os.path.exists(NORMALSUB_ENV_FILE): - return None - + return None + env_vars = dotenv_values(NORMALSUB_ENV_FILE) - return env_vars.get('SUBPATH') + return env_vars.get("SUBPATH") except Exception as e: print(f"Error reading NormalSub .env file: {e}") return None + def stop_normalsub(): - '''Stops NormalSub.''' - run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'stop']) + """Stops NormalSub.""" + run_cmd(["bash", Command.INSTALL_NORMALSUB.value, "stop"]) -def start_webpanel(domain: str, port: int, admin_username: str, admin_password: str, expiration_minutes: int, debug: bool, decoy_path: str): - '''Starts WebPanel.''' - if not domain or not port or not admin_username or not admin_password or not expiration_minutes: - raise InvalidInputError('Error: Both --domain and --port are required for the start action.') +def start_webpanel( + domain: str, + port: int, + admin_username: str, + admin_password: str, + expiration_minutes: int, + debug: bool, + decoy_path: str, +): + """Starts WebPanel.""" + if ( + not domain + or not port + or not admin_username + or not admin_password + or not expiration_minutes + ): + raise InvalidInputError( + "Error: Both --domain and --port are required for the start action." + ) run_cmd( - ['bash', Command.SHELL_WEBPANEL.value, 'start', - domain, str(port), admin_username, admin_password, str(expiration_minutes), str(debug).lower(), str(decoy_path)] + [ + "bash", + Command.SHELL_WEBPANEL.value, + "start", + domain, + str(port), + admin_username, + admin_password, + str(expiration_minutes), + str(debug).lower(), + str(decoy_path), + ] ) def stop_webpanel(): - '''Stops WebPanel.''' - run_cmd(['bash', Command.SHELL_WEBPANEL.value, 'stop']) + """Stops WebPanel.""" + run_cmd(["bash", Command.SHELL_WEBPANEL.value, "stop"]) + def setup_webpanel_decoy(domain: str, decoy_path: str): - '''Sets up or updates the decoy site for the web panel.''' + """Sets up or updates the decoy site for the web panel.""" if not domain or not decoy_path: - raise InvalidInputError('Error: Both domain and decoy_path are required.') - run_cmd(['bash', Command.SHELL_WEBPANEL.value, 'decoy', domain, decoy_path]) + raise InvalidInputError("Error: Both domain and decoy_path are required.") + run_cmd(["bash", Command.SHELL_WEBPANEL.value, "decoy", domain, decoy_path]) + def stop_webpanel_decoy(): - '''Stops and removes the decoy site configuration for the web panel.''' - run_cmd(['bash', Command.SHELL_WEBPANEL.value, 'stopdecoy']) + """Stops and removes the decoy site configuration for the web panel.""" + run_cmd(["bash", Command.SHELL_WEBPANEL.value, "stopdecoy"]) + def get_webpanel_decoy_status() -> dict[str, Any]: """Checks the status of the webpanel decoy site configuration.""" @@ -748,7 +934,7 @@ def get_webpanel_decoy_status() -> dict[str, Any]: return {"active": False, "path": None} env_vars = dotenv_values(WEBPANEL_ENV_FILE) - decoy_path = env_vars.get('DECOY_PATH') + decoy_path = env_vars.get("DECOY_PATH") if decoy_path and decoy_path.strip(): return {"active": True, "path": decoy_path.strip()} @@ -758,144 +944,170 @@ def get_webpanel_decoy_status() -> dict[str, Any]: print(f"Error checking decoy status: {e}") return {"active": False, "path": None} + def get_webpanel_url() -> str | None: - '''Gets the URL of WebPanel.''' - return run_cmd(['bash', Command.SHELL_WEBPANEL.value, 'url']) + """Gets the URL of WebPanel.""" + return run_cmd(["bash", Command.SHELL_WEBPANEL.value, "url"]) def get_webpanel_api_token() -> str | None: - '''Gets the API token of WebPanel.''' - return run_cmd(['bash', Command.SHELL_WEBPANEL.value, 'api-token']) + """Gets the API token of WebPanel.""" + return run_cmd(["bash", Command.SHELL_WEBPANEL.value, "api-token"]) + def get_webpanel_env_config() -> dict[str, Any]: - '''Retrieves the current configuration for the WebPanel service from its .env file.''' + """Retrieves the current configuration for the WebPanel service from its .env file.""" try: if not os.path.exists(WEBPANEL_ENV_FILE): return {} - + env_vars = dotenv_values(WEBPANEL_ENV_FILE) config = {} - config['DOMAIN'] = env_vars.get('DOMAIN') - config['ROOT_PATH'] = env_vars.get('ROOT_PATH') - - port_val = env_vars.get('PORT') + config["DOMAIN"] = env_vars.get("DOMAIN") + config["ROOT_PATH"] = env_vars.get("ROOT_PATH") + + port_val = env_vars.get("PORT") if port_val and port_val.isdigit(): - config['PORT'] = int(port_val) - - exp_val = env_vars.get('EXPIRATION_MINUTES') + config["PORT"] = int(port_val) + + exp_val = env_vars.get("EXPIRATION_MINUTES") if exp_val and exp_val.isdigit(): - config['EXPIRATION_MINUTES'] = int(exp_val) - + config["EXPIRATION_MINUTES"] = int(exp_val) + return config except Exception as e: print(f"Error reading WebPanel .env file: {e}") return {} -def reset_webpanel_credentials(new_username: str | None = None, new_password: str | None = None): - '''Resets the WebPanel admin username and/or password.''' - if not new_username and not new_password: - raise InvalidInputError('Error: At least new username or new password must be provided.') - cmd_args = ['bash', Command.SHELL_WEBPANEL.value, 'resetcreds'] +def reset_webpanel_credentials( + new_username: str | None = None, new_password: str | None = None +): + """Resets the WebPanel admin username and/or password.""" + if not new_username and not new_password: + raise InvalidInputError( + "Error: At least new username or new password must be provided." + ) + + cmd_args = ["bash", Command.SHELL_WEBPANEL.value, "resetcreds"] if new_username: - cmd_args.extend(['-u', new_username]) + cmd_args.extend(["-u", new_username]) if new_password: - cmd_args.extend(['-p', new_password]) - + cmd_args.extend(["-p", new_password]) + run_cmd(cmd_args) + def change_webpanel_expiration(expiration_minutes: int): - '''Changes the session expiration time for the WebPanel.''' + """Changes the session expiration time for the WebPanel.""" if not expiration_minutes: - raise InvalidInputError('Error: Expiration minutes must be provided.') + raise InvalidInputError("Error: Expiration minutes must be provided.") run_cmd( - ['bash', Command.SHELL_WEBPANEL.value, 'changeexp', str(expiration_minutes)] + ["bash", Command.SHELL_WEBPANEL.value, "changeexp", str(expiration_minutes)] ) def change_webpanel_root_path(root_path: str | None = None): - '''Changes the root path for the WebPanel. A new random path is generated if not provided.''' - cmd_args = ['bash', Command.SHELL_WEBPANEL.value, 'changeroot'] + """Changes the root path for the WebPanel. A new random path is generated if not provided.""" + cmd_args = ["bash", Command.SHELL_WEBPANEL.value, "changeroot"] if root_path: cmd_args.append(root_path) run_cmd(cmd_args) def change_webpanel_domain_port(domain: str | None = None, port: int | None = None): - '''Changes the domain and/or port for the WebPanel.''' + """Changes the domain and/or port for the WebPanel.""" if not domain and not port: - raise InvalidInputError('Error: At least a new domain or new port must be provided.') - - cmd_args = ['bash', Command.SHELL_WEBPANEL.value, 'changedomain'] + raise InvalidInputError( + "Error: At least a new domain or new port must be provided." + ) + + cmd_args = ["bash", Command.SHELL_WEBPANEL.value, "changedomain"] if domain: - cmd_args.extend(['-d', domain]) + cmd_args.extend(["-d", domain]) if port: - cmd_args.extend(['-p', str(port)]) - + cmd_args.extend(["-p", str(port)]) + run_cmd(cmd_args) + def get_services_status() -> dict[str, bool] | None: - '''Gets the status of all project services.''' - if res := run_cmd(['bash', Command.SERVICES_STATUS.value]): + """Gets the status of all project services.""" + if res := run_cmd(["bash", Command.SERVICES_STATUS.value]): return json.loads(res) + def show_version() -> str | None: """Displays the currently installed version of the panel.""" - return run_cmd(['python3', Command.VERSION.value, 'show-version']) + return run_cmd([VENV_PYTHON, Command.VERSION.value, "show-version"]) def check_version() -> str | None: """Checks if the current version is up-to-date and displays changelog if not.""" - return run_cmd(['python3', Command.VERSION.value, 'check-version']) + return run_cmd([VENV_PYTHON, Command.VERSION.value, "check-version"]) + def start_ip_limiter(): - '''Starts the IP limiter service.''' - run_cmd(['bash', Command.LIMIT_SCRIPT.value, 'start']) + """Starts the IP limiter service.""" + run_cmd(["bash", Command.LIMIT_SCRIPT.value, "start"]) + def stop_ip_limiter(): - '''Stops the IP limiter service.''' - run_cmd(['bash', Command.LIMIT_SCRIPT.value, 'stop']) + """Stops the IP limiter service.""" + run_cmd(["bash", Command.LIMIT_SCRIPT.value, "stop"]) + def clean_ip_limiter(): """Cleans the IP limiter database and unblocks all IPs.""" - run_cmd(['bash', Command.LIMIT_SCRIPT.value, 'clean']) + run_cmd(["bash", Command.LIMIT_SCRIPT.value, "clean"]) -def config_ip_limiter(block_duration: Optional[int] = None, max_ips: Optional[int] = None): - '''Configures the IP limiter service.''' + +def config_ip_limiter( + block_duration: Optional[int] = None, max_ips: Optional[int] = None +): + """Configures the IP limiter service.""" if block_duration is not None and block_duration <= 0: raise InvalidInputError("Block duration must be greater than 0.") if max_ips is not None and max_ips <= 0: raise InvalidInputError("Max IPs must be greater than 0.") - cmd_args = ['bash', Command.LIMIT_SCRIPT.value, 'config'] + cmd_args = ["bash", Command.LIMIT_SCRIPT.value, "config"] if block_duration is not None: cmd_args.append(str(block_duration)) else: - cmd_args.append('') + cmd_args.append("") if max_ips is not None: cmd_args.append(str(max_ips)) else: - cmd_args.append('') + cmd_args.append("") run_cmd(cmd_args) + def get_ip_limiter_config() -> dict[str, int | None]: - '''Retrieves the current IP Limiter configuration from .configs.env.''' + """Retrieves the current IP Limiter configuration from .configs.env.""" try: if not os.path.exists(CONFIG_ENV_FILE): return {"block_duration": None, "max_ips": None} - + env_vars = dotenv_values(CONFIG_ENV_FILE) - block_duration_str = env_vars.get('BLOCK_DURATION') - max_ips_str = env_vars.get('MAX_IPS') - - block_duration = int(block_duration_str) if block_duration_str and block_duration_str.isdigit() else None + block_duration_str = env_vars.get("BLOCK_DURATION") + max_ips_str = env_vars.get("MAX_IPS") + + block_duration = ( + int(block_duration_str) + if block_duration_str and block_duration_str.isdigit() + else None + ) max_ips = int(max_ips_str) if max_ips_str and max_ips_str.isdigit() else None - + return {"block_duration": block_duration, "max_ips": max_ips} except Exception as e: print(f"Error reading IP Limiter config from .configs.env: {e}") return {"block_duration": None, "max_ips": None} -# endregion \ No newline at end of file + + +# endregion + diff --git a/requirements.txt b/requirements.txt index 99b3bdd..0200da5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,3 +38,21 @@ annotated-types==0.7.0 # SSL / ACME management certbot==5.2.2 + +# Core dependencies (from your existing project) +pymongo +python-dotenv +qrcode[pil] +Pillow + +# FastAPI and server +fastapi +uvicorn[standard] +python-multipart +pydantic + +# Additional utilities +requests +cryptography +asyncio +requests