# Source: Blitz-Proxy/core/cli_api.py (Python, 1114 lines, 33 KiB)

import os
import subprocess
from enum import Enum
from datetime import datetime
import json
from typing import Any, Optional
from dotenv import dotenv_values
import re
import secrets
import string
from . import traffic
# When True, run_cmd() and run_cmd_and_stream() print each command before running it.
DEBUG = False
# Root directory of the helper scripts referenced by the Command enum.
SCRIPT_DIR = "/etc/hysteria/core/scripts"
# Hysteria2 JSON configuration read/written by the *_hysteria2_config_file helpers.
CONFIG_FILE = "/etc/hysteria/config.json"
# dotenv file this module reads for SNI, IP4/IP6 and the IP limiter settings.
CONFIG_ENV_FILE = "/etc/hysteria/.configs.env"
# dotenv file read for the WebPanel settings (domain, port, root path, decoy path, ...).
WEBPANEL_ENV_FILE = "/etc/hysteria/core/scripts/webpanel/.env"
# dotenv file read for the NormalSub SUBPATH value.
NORMALSUB_ENV_FILE = "/etc/hysteria/core/scripts/normalsub/.env"
# dotenv file read for the Telegram bot BACKUP_INTERVAL_HOUR value.
TELEGRAM_ENV_FILE = "/etc/hysteria/core/scripts/telegrambot/.env"
# NOTE(review): not referenced in the visible part of this module — presumably the node list storage.
NODES_JSON_PATH = "/etc/hysteria/nodes.json"
# Interpreter used to launch the Python helper scripts.
VENV_PYTHON = "/etc/hysteria/hysteria2_venv/bin/python3"
def _script_path(*parts: str) -> str:
    """Return the path of a helper script located below SCRIPT_DIR."""
    return os.path.join(SCRIPT_DIR, *parts)


class Command(Enum):
    """Paths of the helper scripts that the API functions of this module execute."""

    # Hysteria2 lifecycle and settings
    INSTALL_HYSTERIA2 = _script_path("hysteria2", "install.sh")
    UNINSTALL_HYSTERIA2 = _script_path("hysteria2", "uninstall.py")
    UPDATE_HYSTERIA2 = _script_path("hysteria2", "update.py")
    RESTART_HYSTERIA2 = _script_path("hysteria2", "restart.py")
    CHANGE_PORT_HYSTERIA2 = _script_path("hysteria2", "change_port.py")
    CHANGE_SNI_HYSTERIA2 = _script_path("hysteria2", "change_sni.py")
    # User management
    GET_USER = _script_path("hysteria2", "get_user.py")
    ADD_USER = _script_path("hysteria2", "add_user.py")
    BULK_USER = _script_path("hysteria2", "bulk_users.py")
    EDIT_USER = _script_path("hysteria2", "edit_user.py")
    RESET_USER = _script_path("hysteria2", "reset_user.py")
    REMOVE_USER = _script_path("hysteria2", "remove_user.py")
    SHOW_USER_URI = _script_path("hysteria2", "show_user_uri.py")
    WRAPPER_URI = _script_path("hysteria2", "wrapper_uri.py")
    # Server-level helpers
    IP_ADD = _script_path("hysteria2", "ip.py")
    NODE_MANAGER = _script_path("nodes", "node.py")
    MANAGE_OBFS = _script_path("hysteria2", "manage_obfs.py")
    MASQUERADE_SCRIPT = _script_path("hysteria2", "masquerade.py")
    EXTRA_CONFIG_SCRIPT = _script_path("hysteria2", "extra_config.py")
    # Bare file name on purpose: traffic is imported as a Python module, never executed.
    TRAFFIC_STATUS = "traffic.py"
    UPDATE_GEO = _script_path("hysteria2", "update_geo.py")
    LIST_USERS = _script_path("hysteria2", "list_users.py")
    SERVER_INFO = _script_path("hysteria2", "server_info.py")
    BACKUP_HYSTERIA2 = _script_path("hysteria2", "backup.py")
    RESTORE_HYSTERIA2 = _script_path("hysteria2", "restore.py")
    # Optional services
    INSTALL_TELEGRAMBOT = _script_path("telegrambot", "runbot.py")
    SHELL_SINGBOX = _script_path("singbox", "singbox_shell.sh")
    SHELL_WEBPANEL = _script_path("webpanel", "webpanel_shell.sh")
    INSTALL_NORMALSUB = _script_path("normalsub", "normalsub.sh")
    INSTALL_TCP_BRUTAL = _script_path("tcp-brutal", "install.py")
    INSTALL_WARP = _script_path("warp", "install.py")
    UNINSTALL_WARP = _script_path("warp", "uninstall.py")
    CONFIGURE_WARP = _script_path("warp", "configure.py")
    STATUS_WARP = _script_path("warp", "status.py")
    # Status, version and IP limiting
    SERVICES_STATUS = _script_path("services_status.sh")
    VERSION = _script_path("hysteria2", "version.py")
    LIMIT_SCRIPT = _script_path("hysteria2", "limit.sh")
    KICK_USER_SCRIPT = _script_path("hysteria2", "kickuser.py")
# region Custom Exceptions
class HysteriaError(Exception):
    """Root of the exception hierarchy used for Hysteria-related failures."""
class CommandExecutionError(HysteriaError):
    """Signals that executing a command failed."""
class InvalidInputError(HysteriaError):
    """Signals that the input handed to an API function is not valid."""
class PasswordGenerationError(HysteriaError):
    """Signals that a password could not be generated."""
class ScriptNotFoundError(HysteriaError):
    """Signals that a required script could not be found."""
# region Utils
def run_cmd(command: list[str]) -> str:
    """
    Run a command (no shell) and return its stripped stdout.

    :param command: Executable followed by its arguments.
    :return: Stripped stdout, or an empty string when nothing was printed.
    :raises ScriptNotFoundError: If the executable cannot be found.
    :raises CommandExecutionError: If the command exits with a non-zero code or an OS-level error occurs.
    """
    if DEBUG:
        print(f"Executing command: {' '.join(command)}")
    # Only the spawn itself can raise the OS-level errors handled below, so keep the try body minimal.
    try:
        process = subprocess.run(
            command, capture_output=True, text=True, shell=False, check=False
        )
    except FileNotFoundError as e:
        raise ScriptNotFoundError(
            f"Script or command not found: {command[0]}. Original error: {e}"
        ) from e
    except subprocess.TimeoutExpired as e:
        raise CommandExecutionError(
            f"Command '{' '.join(command)}' timed out. Original error: {e}"
        ) from e
    except OSError as e:
        raise CommandExecutionError(
            f"OS error while trying to run command '{' '.join(command)}': {e}"
        ) from e

    if process.returncode != 0:
        # Prefer stderr; fall back to stdout, then to a generic message.
        error_output = process.stderr.strip() or process.stdout.strip()
        if not error_output:
            error_output = f"Command exited with status {process.returncode} without specific error message."
        raise CommandExecutionError(
            f"Command '{' '.join(command)}' failed with exit code {process.returncode}: {error_output}"
        )
    return process.stdout.strip() if process.stdout else ""
def run_cmd_and_stream(command: list[str]):
    """
    Run a command and print its combined stdout/stderr line by line as it is produced.

    :param command: Executable followed by its arguments.
    :raises ScriptNotFoundError: If the executable cannot be found.
    :raises CommandExecutionError: If the process exits with a non-zero code or an OS-level error occurs.
    """
    if DEBUG:
        print(f"Executing command: {' '.join(command)}")
    try:
        # The context manager closes the pipe and waits for the child even if
        # streaming is interrupted by an exception.
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into the streamed output
            text=True,
            bufsize=1,  # line-buffered so output appears promptly
        ) as process:
            if process.stdout:
                for line in process.stdout:
                    print(line, end="")
            return_code = process.wait()
    except FileNotFoundError as e:
        raise ScriptNotFoundError(
            f"Script or command not found: {command[0]}. Original error: {e}"
        ) from e
    except OSError as e:
        raise CommandExecutionError(
            f"OS error while trying to run command '{' '.join(command)}': {e}"
        ) from e
    if return_code != 0:
        raise CommandExecutionError(f"Process failed with exit code {return_code}")
def generate_password() -> str:
    """
    Build a random 32-character password from ASCII letters and digits.

    :raises PasswordGenerationError: If picking the random characters fails.
    """
    charset = string.ascii_letters + string.digits
    try:
        picked = [secrets.choice(charset) for _ in range(32)]
        return "".join(picked)
    except Exception as e:
        raise PasswordGenerationError(
            f"Failed to generate password using secrets module: {e}"
        )
# endregion
# region APIs
# region Hysteria
def install_hysteria2(port: int, sni: str):
    """Run the Hysteria2 install script with the given port and SNI, streaming its output."""
    installer_cmd = ["bash", Command.INSTALL_HYSTERIA2.value, str(port), sni]
    run_cmd_and_stream(installer_cmd)
def uninstall_hysteria2():
    """Run the Hysteria2 uninstall script."""
    uninstall_cmd = [VENV_PYTHON, Command.UNINSTALL_HYSTERIA2.value]
    run_cmd(uninstall_cmd)
def update_hysteria2():
    """Run the Hysteria2 update script."""
    update_cmd = [VENV_PYTHON, Command.UPDATE_HYSTERIA2.value]
    run_cmd(update_cmd)
def restart_hysteria2():
    """Run the Hysteria2 restart script."""
    restart_cmd = [VENV_PYTHON, Command.RESTART_HYSTERIA2.value]
    run_cmd(restart_cmd)
def get_hysteria2_port() -> int | None:
    """
    Return the port found in the 'listen' field of the Hysteria2 config file.

    The port is whatever follows the LAST ':' so that values such as ':443',
    '127.0.0.1:8080' and the bracketed IPv6 form '[::]:443' are all parsed.

    :return: The port number, or None when 'listen' is missing, contains no ':'
             or does not end in an integer.
    """
    listen = get_hysteria2_config_file().get("listen")
    if not isinstance(listen, str):
        return None
    _host, separator, port_text = listen.rpartition(":")
    if not separator:
        # No ':' at all -> there is no port part to report.
        return None
    try:
        return int(port_text)
    except ValueError:
        return None
def change_hysteria2_port(port: int):
    """Run the change-port script with the given port."""
    change_cmd = [VENV_PYTHON, Command.CHANGE_PORT_HYSTERIA2.value, str(port)]
    run_cmd(change_cmd)
def get_hysteria2_sni() -> str | None:
    """Return the SNI value stored in CONFIG_ENV_FILE, or None when it is not set."""
    return dotenv_values(CONFIG_ENV_FILE).get("SNI")
def change_hysteria2_sni(sni: str):
    """Run the change-SNI script with the given SNI."""
    change_cmd = [VENV_PYTHON, Command.CHANGE_SNI_HYSTERIA2.value, sni]
    run_cmd(change_cmd)
def backup_hysteria2():
    """
    Run the Hysteria backup script.

    Failures surface as the exceptions raised by run_cmd (it never raises
    subprocess.CalledProcessError, so no extra handler is needed here).
    """
    run_cmd([VENV_PYTHON, Command.BACKUP_HYSTERIA2.value])
def restore_hysteria2(backup_file_path: str):
    """
    Run the Hysteria restore script on the given backup file.

    Failures surface as the exceptions raised by run_cmd (it never raises
    subprocess.CalledProcessError, so no extra handler is needed here).

    :param backup_file_path: Path of the backup file passed to the restore script.
    """
    run_cmd([VENV_PYTHON, Command.RESTORE_HYSTERIA2.value, backup_file_path])
def enable_hysteria2_obfs():
    """Run the obfs management script with '--generate'."""
    obfs_cmd = [VENV_PYTHON, Command.MANAGE_OBFS.value, "--generate"]
    run_cmd(obfs_cmd)
def disable_hysteria2_obfs():
    """Run the obfs management script with '--remove'."""
    obfs_cmd = [VENV_PYTHON, Command.MANAGE_OBFS.value, "--remove"]
    run_cmd(obfs_cmd)
def check_hysteria2_obfs() -> str:
    """
    Run the obfs management script with '--check' and return its stripped stdout.

    Unlike most functions here this calls subprocess.run directly with
    check=True, so a non-zero exit raises subprocess.CalledProcessError.

    :raises subprocess.CalledProcessError: If the script exits with a non-zero code.
    """
    result = subprocess.run(
        [VENV_PYTHON, Command.MANAGE_OBFS.value, "--check"],
        check=True,
        capture_output=True,
        text=True,
    )
    return result.stdout.strip()
def enable_hysteria2_masquerade():
    """Run the masquerade script with argument '1' (enable) and return its output."""
    output = run_cmd([VENV_PYTHON, Command.MASQUERADE_SCRIPT.value, "1"])
    return output
def disable_hysteria2_masquerade():
    """Run the masquerade script with argument '2' (disable) and return its output."""
    output = run_cmd([VENV_PYTHON, Command.MASQUERADE_SCRIPT.value, "2"])
    return output
def get_hysteria2_masquerade_status():
    """Run the masquerade script with 'status' and return its output."""
    output = run_cmd([VENV_PYTHON, Command.MASQUERADE_SCRIPT.value, "status"])
    return output
def get_hysteria2_config_file() -> dict[str, Any]:
    """
    Load and return the parsed content of CONFIG_FILE.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the process locale.

    :raises OSError: If the file cannot be opened.
    :raises json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        return json.load(f)
def set_hysteria2_config_file(data: dict[str, Any]):
    """Serialize *data* as indented JSON and overwrite CONFIG_FILE with it."""
    # Serialize before opening: a serialization error must not truncate the file.
    serialized = json.dumps(data, indent=4)
    with open(CONFIG_FILE, "w") as config_handle:
        config_handle.write(serialized)
# endregion
# region User
def list_users() -> dict[str, dict[str, Any]] | None:
    """Run the list-users script and return its JSON output, or None when it printed nothing."""
    output = run_cmd([VENV_PYTHON, Command.LIST_USERS.value])
    if not output:
        return None
    return json.loads(output)
def get_user(username: str) -> dict[str, Any] | None:
    """Run the get-user script for *username* and return its JSON output, or None when it printed nothing."""
    output = run_cmd([VENV_PYTHON, Command.GET_USER.value, "-u", str(username)])
    if not output:
        return None
    return json.loads(output)
def add_user(
    username: str,
    traffic_limit: int,
    expiration_days: int,
    password: str | None,
    creation_date: str | None,
    unlimited: bool,
    note: str | None,
):
    """
    Add a user by calling the add-user script with positional arguments.

    The script receives, in order: username, traffic limit, expiration days,
    password, then the optional trailing arguments unlimited ('true'/'false'),
    note and creation date. Because they are positional, every optional
    argument that precedes a supplied one is filled with a placeholder
    ('false' for unlimited, '' for note), exactly once.

    :param password: Password to use; a random one is generated when empty or None.
    :param creation_date: Optional creation date, passed through unchanged.
    :param unlimited: Value sent as the 'unlimited' positional argument.
    :param note: Optional note.
    """
    command = [
        VENV_PYTHON,
        Command.ADD_USER.value,
        username,
        str(traffic_limit),
        str(expiration_days),
        password if password else generate_password(),
    ]
    # 'unlimited' is needed when set, or as a placeholder for a later argument.
    if unlimited or note or creation_date:
        command.append("true" if unlimited else "false")
    # 'note' is needed when set, or as an empty placeholder before creation_date.
    if note or creation_date:
        command.append(note if note else "")
    if creation_date:
        command.append(creation_date)
    run_cmd(command)
def bulk_user_add(
    traffic_gb: float,
    expiration_days: int,
    count: int,
    prefix: str,
    start_number: int,
    unlimited: bool,
):
    """Run the bulk-user script, passing every parameter as a named flag."""
    named_options = (
        ("--traffic-gb", str(traffic_gb)),
        ("--expiration-days", str(expiration_days)),
        ("--count", str(count)),
        ("--prefix", prefix),
        ("--start-number", str(start_number)),
    )
    command = [VENV_PYTHON, Command.BULK_USER.value]
    for flag, value in named_options:
        command += [flag, value]
    if unlimited:
        command.append("--unlimited")
    run_cmd(command)
def edit_user(
    username: str,
    new_username: str | None,
    new_password: str | None,
    new_traffic_limit: int | None,
    new_expiration_days: int | None,
    renew_password: bool,
    renew_creation_date: bool,
    blocked: bool | None,
    unlimited_ip: bool | None,
    note: str | None,
):
    """
    Edit an existing user through the edit-user script, using named flags.

    Only the fields that were supplied are forwarded. An explicit new password
    takes precedence over *renew_password*, which generates a random one.

    :raises InvalidInputError: If *username* is empty or a numeric limit is negative.
    """
    if not username:
        raise InvalidInputError("Error: username is required")

    args = [VENV_PYTHON, Command.EDIT_USER.value, username]
    if new_username:
        args += ["--new-username", new_username]

    chosen_password = new_password or (
        generate_password() if renew_password else None
    )
    if chosen_password:
        args += ["--password", chosen_password]

    if new_traffic_limit is not None:
        if new_traffic_limit < 0:
            raise InvalidInputError(
                "Error: traffic limit must be a non-negative number."
            )
        args += ["--traffic-gb", str(new_traffic_limit)]

    if new_expiration_days is not None:
        if new_expiration_days < 0:
            raise InvalidInputError(
                "Error: expiration days must be a non-negative number."
            )
        args += ["--expiration-days", str(new_expiration_days)]

    if renew_creation_date:
        # Renewing means stamping today's date.
        args += ["--creation-date", datetime.now().strftime("%Y-%m-%d")]

    # Tri-state booleans: None means "leave unchanged".
    for flag, state in (("--blocked", blocked), ("--unlimited", unlimited_ip)):
        if state is not None:
            args += [flag, "true" if state else "false"]

    if note is not None:
        args += ["--note", note]
    run_cmd(args)
def reset_user(username: str):
    """Run the reset-user script for *username*."""
    reset_cmd = [VENV_PYTHON, Command.RESET_USER.value, username]
    run_cmd(reset_cmd)
def remove_users(usernames: list[str]):
    """Run the remove-user script for all given usernames; do nothing for an empty list."""
    if usernames:
        run_cmd([VENV_PYTHON, Command.REMOVE_USER.value, *usernames])
def kick_users_by_name(usernames: list[str]):
    """
    Run the kick-user script for one or more usernames.

    The script's output is not captured; it goes to the inherited stdout/stderr.

    :raises InvalidInputError: If *usernames* is empty.
    :raises ScriptNotFoundError: If the kick-user script does not exist.
    :raises CommandExecutionError: If the script exits with a non-zero code.
    """
    if not usernames:
        raise InvalidInputError("Username(s) must be provided to kick.")
    script_path = Command.KICK_USER_SCRIPT.value
    if not os.path.exists(script_path):
        raise ScriptNotFoundError(f"Kick user script not found at: {script_path}")
    try:
        subprocess.run([VENV_PYTHON, script_path, *usernames], check=True)
    except subprocess.CalledProcessError as e:
        raise CommandExecutionError(
            f"Failed to execute kick user script: {e}"
        ) from e
def show_user_uri(
    username: str, qrcode: bool, ipv: int, all: bool, singbox: bool, normalsub: bool
) -> str | None:
    """
    Run the show-user-URI script for *username* and return its output.

    '-a' is passed when *all* is set; otherwise '-ip <ipv>' selects the IP version.
    *qrcode*, *singbox* and *normalsub* add '-qr', '-s' and '-n' respectively.
    """
    args = [VENV_PYTHON, Command.SHOW_USER_URI.value, "-u", username]
    if qrcode:
        args.append("-qr")
    args += ["-a"] if all else ["-ip", str(ipv)]
    for enabled, flag in ((singbox, "-s"), (normalsub, "-n")):
        if enabled:
            args.append(flag)
    return run_cmd(args)
def show_user_uri_json(usernames: list[str]) -> list[dict[str, Any]] | None:
    """
    Run the wrapper-URI script for the given usernames and return its parsed JSON output.

    :raises ScriptNotFoundError: If the wrapper script (or the interpreter) cannot be found.
    :raises CommandExecutionError: If the script exits with a non-zero code or prints invalid JSON.
    :raises HysteriaError: If launching the script fails for any other reason.
    """
    script_path = Command.WRAPPER_URI.value
    if not os.path.exists(script_path):
        raise ScriptNotFoundError(f"Wrapper URI script not found at: {script_path}")
    try:
        process = subprocess.run(
            [VENV_PYTHON, script_path, *usernames],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise CommandExecutionError(
            f"Failed to execute wrapper URI script: {e}\nError: {e.stderr}"
        ) from e
    except FileNotFoundError as e:
        raise ScriptNotFoundError(f"Script not found: {script_path}") from e
    except Exception as e:
        raise HysteriaError(f"An unexpected error occurred: {e}") from e

    # Parsed separately so the error message can always include the script output.
    try:
        return json.loads(process.stdout)
    except json.JSONDecodeError as e:
        raise CommandExecutionError(
            f"Failed to decode JSON output from script: {script_path}\nOutput: {process.stdout}"
        ) from e
# endregion
# region Server
def traffic_status(no_gui=False, display_output=True):
    """
    Return the data produced by traffic.traffic_status.

    With *no_gui* set, the data is fetched with no_gui=True and
    traffic.kick_expired_users() is called afterwards. Otherwise the call is
    made with no_gui=True only when *display_output* is false.
    """
    if not no_gui:
        gui_flag = no_gui if display_output else True
        return traffic.traffic_status(no_gui=gui_flag)
    report = traffic.traffic_status(no_gui=True)
    traffic.kick_expired_users()
    return report
# Next update:
# TODO: Prefer returning JSON instead of plain text.
# TODO: Once JSON is returned, update the Telegram Bot and WebPanel to consume it.
def server_info() -> str | None:
    """Run the server-info script and return its output."""
    info_cmd = [VENV_PYTHON, Command.SERVER_INFO.value]
    return run_cmd(info_cmd)
def get_ip_address() -> tuple[str | None, str | None]:
    """Return the (IP4, IP6) values stored in CONFIG_ENV_FILE; missing entries are None."""
    settings = dotenv_values(CONFIG_ENV_FILE)
    ipv4 = settings.get("IP4")
    ipv6 = settings.get("IP6")
    return ipv4, ipv6
def add_ip_address():
    """Run the IP script with the 'add' action."""
    add_cmd = [VENV_PYTHON, Command.IP_ADD.value, "add"]
    run_cmd(add_cmd)
def edit_ip_address(ipv4: str, ipv6: str):
    """
    Update the configured IPv4 and/or IPv6 address through the IP script.

    Each address is applied with its own 'edit' invocation. An empty or None
    value is skipped; when both are empty the function does nothing.

    :param ipv4: New IPv4 address, applied with 'edit -4' when non-empty.
    :param ipv6: New IPv6 address, applied with 'edit -6' when non-empty.
    """
    if ipv4:
        run_cmd([VENV_PYTHON, Command.IP_ADD.value, "edit", "-4", ipv4])
    if ipv6:
        run_cmd([VENV_PYTHON, Command.IP_ADD.value, "edit", "-6", ipv6])
def add_node(
    name: str,
    ip: str,
    sni: Optional[str] = None,
    pinSHA256: Optional[str] = None,
    port: Optional[int] = None,
    obfs: Optional[str] = None,
    insecure: Optional[bool] = None,
):
    """
    Run the node manager's 'add' action and return its output.

    *name* and *ip* are always passed; every other option is forwarded only
    when it has a truthy value.
    """
    valued_options = (
        ("--port", str(port) if port else None),
        ("--sni", sni),
        ("--pinSHA256", pinSHA256),
        ("--obfs", obfs),
    )
    command = [VENV_PYTHON, Command.NODE_MANAGER.value, "add", "--name", name, "--ip", ip]
    for flag, value in valued_options:
        if value:
            command += [flag, value]
    if insecure:
        command.append("--insecure")
    return run_cmd(command)
def delete_node(name: str):
    """Run the node manager's 'delete' action for *name* and return its output."""
    delete_cmd = [VENV_PYTHON, Command.NODE_MANAGER.value, "delete", "--name", name]
    return run_cmd(delete_cmd)
def list_nodes():
    """Run the node manager's 'list' action and return its output."""
    list_cmd = [VENV_PYTHON, Command.NODE_MANAGER.value, "list"]
    return run_cmd(list_cmd)
def generate_node_cert():
    """Run the node manager's 'generate-cert' action and return its output."""
    cert_cmd = [VENV_PYTHON, Command.NODE_MANAGER.value, "generate-cert"]
    return run_cmd(cert_cmd)
def update_geo(country: str):
    """
    Run the update-geo script for the given country (passed lower-cased).

    The script's output is not captured; it goes to the inherited stdout/stderr.

    :raises CommandExecutionError: If the script exits with a non-zero code.
    :raises ScriptNotFoundError: If the interpreter/script cannot be launched.
    :raises HysteriaError: On any other unexpected error.
    """
    script_path = Command.UPDATE_GEO.value
    try:
        subprocess.run([VENV_PYTHON, script_path, country.lower()], check=True)
    except subprocess.CalledProcessError as e:
        raise CommandExecutionError(f"Failed to update geo files: {e}") from e
    except FileNotFoundError as e:
        raise ScriptNotFoundError(f"Script not found: {script_path}") from e
    except Exception as e:
        raise HysteriaError(f"An unexpected error occurred: {e}") from e
def add_extra_config(name: str, uri: str) -> str:
    """Run the extra-config script's 'add' action with *name* and *uri*; return its output."""
    add_cmd = [VENV_PYTHON, Command.EXTRA_CONFIG_SCRIPT.value, "add"]
    add_cmd += ["--name", name]
    add_cmd += ["--uri", uri]
    return run_cmd(add_cmd)
def delete_extra_config(name: str) -> str:
    """Run the extra-config script's 'delete' action for *name*; return its output."""
    delete_cmd = [VENV_PYTHON, Command.EXTRA_CONFIG_SCRIPT.value, "delete"]
    delete_cmd += ["--name", name]
    return run_cmd(delete_cmd)
def list_extra_configs() -> str:
    """Run the extra-config script's 'list' action; return its output."""
    list_cmd = [VENV_PYTHON, Command.EXTRA_CONFIG_SCRIPT.value, "list"]
    return run_cmd(list_cmd)
def get_extra_config(name: str) -> dict[str, Any] | None:
    """Run the extra-config script's 'get' action for *name*; return its JSON output, or None when it printed nothing."""
    output = run_cmd(
        [VENV_PYTHON, Command.EXTRA_CONFIG_SCRIPT.value, "get", "--name", name]
    )
    if not output:
        return None
    return json.loads(output)
# endregion
# region Advanced Menu
def install_tcp_brutal():
    """Run the TCP Brutal install script."""
    install_cmd = [VENV_PYTHON, Command.INSTALL_TCP_BRUTAL.value]
    run_cmd(install_cmd)
def install_warp():
    """Run the WARP install script."""
    install_cmd = [VENV_PYTHON, Command.INSTALL_WARP.value]
    run_cmd(install_cmd)
def uninstall_warp():
    """Run the WARP uninstall script."""
    uninstall_cmd = [VENV_PYTHON, Command.UNINSTALL_WARP.value]
    run_cmd(uninstall_cmd)
def configure_warp(
    all_state: str | None = None,
    popular_sites_state: str | None = None,
    domestic_sites_state: str | None = None,
    block_adult_sites_state: str | None = None,
):
    """
    Run the WARP configure script with the supplied states ('on' or 'off').

    Only non-empty states are forwarded. When none is given, a notice is
    printed and the script is not run.
    """
    toggles = (
        ("--set-all", all_state),
        ("--set-popular-sites", popular_sites_state),
        ("--set-domestic-sites", domestic_sites_state),
        ("--set-block-adult", block_adult_sites_state),
    )
    options: list[str] = []
    for flag, state in toggles:
        if state:
            options += [flag, state]
    if not options:
        print("No WARP configuration options provided to cli_api.configure_warp.")
        return
    run_cmd([VENV_PYTHON, Command.CONFIGURE_WARP.value, *options])
def warp_status() -> str | None:
    """Run the WARP status script and return its output."""
    status_cmd = [VENV_PYTHON, Command.STATUS_WARP.value]
    return run_cmd(status_cmd)
def start_telegram_bot(token: str, adminid: str, backup_interval: Optional[int] = None):
    """
    Run the Telegram bot script's 'start' action.

    :param backup_interval: Appended as an extra argument when not None.
    :raises InvalidInputError: If *token* or *adminid* is empty.
    """
    if not (token and adminid):
        raise InvalidInputError(
            "Error: Both --token and --adminid are required for the start action."
        )
    extra = [] if backup_interval is None else [str(backup_interval)]
    run_cmd(
        [VENV_PYTHON, Command.INSTALL_TELEGRAMBOT.value, "start", token, adminid, *extra]
    )
def stop_telegram_bot():
    """Run the Telegram bot script's 'stop' action."""
    stop_cmd = [VENV_PYTHON, Command.INSTALL_TELEGRAMBOT.value, "stop"]
    run_cmd(stop_cmd)
def get_telegram_bot_backup_interval() -> int | None:
    """
    Return BACKUP_INTERVAL_HOUR from the Telegram bot .env file as an int.

    None is returned when the file or the value is missing, when the value is
    not numeric, or when reading fails (the error is printed in that case).
    """
    try:
        if not os.path.exists(TELEGRAM_ENV_FILE):
            return None
        raw_interval = dotenv_values(TELEGRAM_ENV_FILE).get("BACKUP_INTERVAL_HOUR")
        if not raw_interval:
            return None
        try:
            # Going through float() accepts values such as "6.0".
            return int(float(raw_interval))
        except (ValueError, TypeError):
            return None
    except Exception as e:
        print(f"Error reading Telegram Bot .env file: {e}")
        return None
def set_telegram_bot_backup_interval(backup_interval: int):
    """
    Run the Telegram bot script's 'set_backup_interval' action.

    :raises InvalidInputError: If *backup_interval* is None.
    """
    if backup_interval is None:
        raise InvalidInputError("Error: Backup interval is required.")
    interval_cmd = [VENV_PYTHON, Command.INSTALL_TELEGRAMBOT.value]
    interval_cmd += ["set_backup_interval", str(backup_interval)]
    run_cmd(interval_cmd)
def start_singbox(domain: str, port: int):
    """
    Run the Singbox shell script's 'start' action.

    :raises InvalidInputError: If *domain* or *port* is empty/zero.
    """
    if not (domain and port):
        raise InvalidInputError(
            "Error: Both --domain and --port are required for the start action."
        )
    start_cmd = ["bash", Command.SHELL_SINGBOX.value, "start", domain, str(port)]
    run_cmd(start_cmd)
def stop_singbox():
    """Run the Singbox shell script's 'stop' action."""
    stop_cmd = ["bash", Command.SHELL_SINGBOX.value, "stop"]
    run_cmd(stop_cmd)
def start_normalsub(domain: str, port: int):
    """
    Run the NormalSub script's 'start' action.

    :raises InvalidInputError: If *domain* or *port* is empty/zero.
    """
    if not (domain and port):
        raise InvalidInputError(
            "Error: Both --domain and --port are required for the start action."
        )
    start_cmd = ["bash", Command.INSTALL_NORMALSUB.value, "start", domain, str(port)]
    run_cmd(start_cmd)
def edit_normalsub_subpath(new_subpath: str):
    """
    Validate *new_subpath* and pass it to the NormalSub script's 'edit_subpath' action.

    A valid subpath consists of alphanumeric segments separated by single
    slashes, with no leading or trailing slash.

    :raises InvalidInputError: If the subpath is empty or malformed.
    """
    if not new_subpath:
        raise InvalidInputError("Error: New subpath cannot be empty.")
    # fullmatch, not match + '$': '$' would also accept a trailing newline.
    if not re.fullmatch(r"[a-zA-Z0-9]+(?:/[a-zA-Z0-9]+)*", new_subpath):
        raise InvalidInputError(
            "Error: Invalid subpath format. Must be alphanumeric segments separated by single slashes (e.g., 'path' or 'path/to/resource')."
        )
    run_cmd(["bash", Command.INSTALL_NORMALSUB.value, "edit_subpath", new_subpath])
def get_normalsub_subpath() -> str | None:
    """
    Return SUBPATH from the NormalSub .env file.

    None is returned when the file or the value is missing, or when reading
    fails (the error is printed in that case).
    """
    try:
        if os.path.exists(NORMALSUB_ENV_FILE):
            return dotenv_values(NORMALSUB_ENV_FILE).get("SUBPATH")
        return None
    except Exception as e:
        print(f"Error reading NormalSub .env file: {e}")
        return None
def stop_normalsub():
    """Run the NormalSub script's 'stop' action."""
    stop_cmd = ["bash", Command.INSTALL_NORMALSUB.value, "stop"]
    run_cmd(stop_cmd)
def start_webpanel(
    domain: str,
    port: int,
    admin_username: str,
    admin_password: str,
    expiration_minutes: int,
    debug: bool,
    decoy_path: str,
):
    """
    Run the WebPanel shell script's 'start' action.

    :param debug: Passed to the script as 'true' or 'false'.
    :param decoy_path: Passed to the script as str(decoy_path).
    :raises InvalidInputError: If domain, port, admin_username, admin_password
        or expiration_minutes is empty/zero; the message names the missing ones.
    """
    required = {
        "domain": domain,
        "port": port,
        "admin_username": admin_username,
        "admin_password": admin_password,
        "expiration_minutes": expiration_minutes,
    }
    missing = [field for field, value in required.items() if not value]
    if missing:
        raise InvalidInputError(
            f"Error: Missing required value(s) for the start action: {', '.join(missing)}."
        )
    run_cmd(
        [
            "bash",
            Command.SHELL_WEBPANEL.value,
            "start",
            domain,
            str(port),
            admin_username,
            admin_password,
            str(expiration_minutes),
            str(debug).lower(),
            str(decoy_path),
        ]
    )
def stop_webpanel():
    """Run the WebPanel shell script's 'stop' action."""
    stop_cmd = ["bash", Command.SHELL_WEBPANEL.value, "stop"]
    run_cmd(stop_cmd)
def setup_webpanel_decoy(domain: str, decoy_path: str):
    """
    Run the WebPanel shell script's 'decoy' action with *domain* and *decoy_path*.

    :raises InvalidInputError: If either argument is empty.
    """
    if not (domain and decoy_path):
        raise InvalidInputError("Error: Both domain and decoy_path are required.")
    decoy_cmd = ["bash", Command.SHELL_WEBPANEL.value, "decoy", domain, decoy_path]
    run_cmd(decoy_cmd)
def stop_webpanel_decoy():
    """Run the WebPanel shell script's 'stopdecoy' action."""
    stop_cmd = ["bash", Command.SHELL_WEBPANEL.value, "stopdecoy"]
    run_cmd(stop_cmd)
def get_webpanel_decoy_status() -> dict[str, Any]:
    """
    Report whether DECOY_PATH is set in the WebPanel .env file.

    :return: {"active": True, "path": <stripped path>} when a non-blank
        DECOY_PATH exists; otherwise {"active": False, "path": None}, which is
        also returned (after printing the error) when reading fails.
    """
    inactive = {"active": False, "path": None}
    try:
        if not os.path.exists(WEBPANEL_ENV_FILE):
            return inactive
        raw_path = dotenv_values(WEBPANEL_ENV_FILE).get("DECOY_PATH")
        cleaned = raw_path.strip() if raw_path else ""
        if cleaned:
            return {"active": True, "path": cleaned}
        return inactive
    except Exception as e:
        print(f"Error checking decoy status: {e}")
        return inactive
def get_webpanel_url() -> str | None:
    """Run the WebPanel shell script's 'url' action and return its output."""
    url_cmd = ["bash", Command.SHELL_WEBPANEL.value, "url"]
    return run_cmd(url_cmd)
def get_webpanel_api_token() -> str | None:
    """Run the WebPanel shell script's 'api-token' action and return its output."""
    token_cmd = ["bash", Command.SHELL_WEBPANEL.value, "api-token"]
    return run_cmd(token_cmd)
def get_webpanel_env_config() -> dict[str, Any]:
    """
    Return selected settings from the WebPanel .env file.

    DOMAIN and ROOT_PATH are always present (possibly None). PORT and
    EXPIRATION_MINUTES are included as ints only when their values are all
    digits. An empty dict is returned when the file is missing or reading
    fails (the error is printed in that case).
    """
    try:
        if not os.path.exists(WEBPANEL_ENV_FILE):
            return {}
        env = dotenv_values(WEBPANEL_ENV_FILE)
        config: dict[str, Any] = {
            "DOMAIN": env.get("DOMAIN"),
            "ROOT_PATH": env.get("ROOT_PATH"),
        }
        for key in ("PORT", "EXPIRATION_MINUTES"):
            raw_value = env.get(key)
            if raw_value and raw_value.isdigit():
                config[key] = int(raw_value)
        return config
    except Exception as e:
        print(f"Error reading WebPanel .env file: {e}")
        return {}
def reset_webpanel_credentials(
    new_username: str | None = None, new_password: str | None = None
):
    """
    Run the WebPanel shell script's 'resetcreds' action.

    The username is passed with '-u' and the password with '-p'; each only when provided.

    :raises InvalidInputError: If neither a username nor a password is given.
    """
    if not (new_username or new_password):
        raise InvalidInputError(
            "Error: At least new username or new password must be provided."
        )
    reset_cmd = ["bash", Command.SHELL_WEBPANEL.value, "resetcreds"]
    for flag, value in (("-u", new_username), ("-p", new_password)):
        if value:
            reset_cmd += [flag, value]
    run_cmd(reset_cmd)
def change_webpanel_expiration(expiration_minutes: int):
    """
    Run the WebPanel shell script's 'changeexp' action.

    :raises InvalidInputError: If *expiration_minutes* is empty/zero.
    """
    if not expiration_minutes:
        raise InvalidInputError("Error: Expiration minutes must be provided.")
    change_cmd = ["bash", Command.SHELL_WEBPANEL.value, "changeexp"]
    change_cmd.append(str(expiration_minutes))
    run_cmd(change_cmd)
def change_webpanel_root_path(root_path: str | None = None):
    """
    Run the WebPanel shell script's 'changeroot' action.

    *root_path* is passed only when non-empty; otherwise the script is called
    without it (presumably generating a random path itself — verify in the script).
    """
    optional_path = [root_path] if root_path else []
    run_cmd(["bash", Command.SHELL_WEBPANEL.value, "changeroot", *optional_path])
def change_webpanel_domain_port(domain: str | None = None, port: int | None = None):
    """
    Run the WebPanel shell script's 'changedomain' action.

    The domain is passed with '-d' and the port with '-p'; each only when provided.

    :raises InvalidInputError: If neither a domain nor a port is given.
    """
    if not (domain or port):
        raise InvalidInputError(
            "Error: At least a new domain or new port must be provided."
        )
    change_cmd = ["bash", Command.SHELL_WEBPANEL.value, "changedomain"]
    if domain:
        change_cmd += ["-d", domain]
    if port:
        change_cmd += ["-p", str(port)]
    run_cmd(change_cmd)
def get_services_status() -> dict[str, bool] | None:
    """Run the services-status script and return its JSON output, or None when it printed nothing."""
    output = run_cmd(["bash", Command.SERVICES_STATUS.value])
    if not output:
        return None
    return json.loads(output)
def show_version() -> str | None:
    """Run the version script's 'show-version' action and return its output."""
    version_cmd = [VENV_PYTHON, Command.VERSION.value, "show-version"]
    return run_cmd(version_cmd)
def check_version() -> str | None:
    """Run the version script's 'check-version' action and return its output."""
    version_cmd = [VENV_PYTHON, Command.VERSION.value, "check-version"]
    return run_cmd(version_cmd)
def start_ip_limiter():
    """Run the IP limiter script's 'start' action."""
    limiter_cmd = ["bash", Command.LIMIT_SCRIPT.value, "start"]
    run_cmd(limiter_cmd)
def stop_ip_limiter():
    """Run the IP limiter script's 'stop' action."""
    limiter_cmd = ["bash", Command.LIMIT_SCRIPT.value, "stop"]
    run_cmd(limiter_cmd)
def clean_ip_limiter():
    """Run the IP limiter script's 'clean' action."""
    limiter_cmd = ["bash", Command.LIMIT_SCRIPT.value, "clean"]
    run_cmd(limiter_cmd)
def config_ip_limiter(
    block_duration: Optional[int] = None, max_ips: Optional[int] = None
):
    """
    Run the IP limiter script's 'config' action.

    Both settings are positional for the script, so an omitted one is sent as
    an empty string to keep the other in its position.

    :raises InvalidInputError: If a provided value is not greater than 0.
    """
    if block_duration is not None and block_duration <= 0:
        raise InvalidInputError("Block duration must be greater than 0.")
    if max_ips is not None and max_ips <= 0:
        raise InvalidInputError("Max IPs must be greater than 0.")
    limiter_cmd = ["bash", Command.LIMIT_SCRIPT.value, "config"]
    for setting in (block_duration, max_ips):
        limiter_cmd.append("" if setting is None else str(setting))
    run_cmd(limiter_cmd)
def get_ip_limiter_config() -> dict[str, int | None]:
    """
    Return BLOCK_DURATION and MAX_IPS from CONFIG_ENV_FILE.

    :return: {"block_duration": ..., "max_ips": ...}; a value is None when it
        is missing or not all digits. Both are None when the file is missing
        or reading fails (the error is printed in that case).
    """

    def to_int(raw):
        # Only plain digit strings are accepted; anything else counts as unset.
        return int(raw) if raw and raw.isdigit() else None

    try:
        if not os.path.exists(CONFIG_ENV_FILE):
            return {"block_duration": None, "max_ips": None}
        env = dotenv_values(CONFIG_ENV_FILE)
        return {
            "block_duration": to_int(env.get("BLOCK_DURATION")),
            "max_ips": to_int(env.get("MAX_IPS")),
        }
    except Exception as e:
        print(f"Error reading IP Limiter config from .configs.env: {e}")
        return {"block_duration": None, "max_ips": None}
# endregion