Merge pull request #130 from ReturnFI/beta

This commit is contained in:
Whispering Wind
2025-04-19 19:47:31 +03:30
committed by GitHub
11 changed files with 486 additions and 380 deletions

View File

@ -1,11 +1,9 @@
## [1.6.0] - 2025-04-14
## [1.7.0] - 2025-04-19
### Changed
- 🚀 **Optimization:** Improved user edit/remove functionality and eliminated unnecessary service restart delay.
- 👥 **User Management:** Enhanced kick functionality with the ability to target specific users and integrated advanced handling.
- 🔍 **API Enhancement:** Expanded `UserInfoResponse` class with additional details for a more comprehensive output.
- ❗ **Bug Fix:** Improved user removal/reset API to return proper 404 errors when applicable.
- ⚙️ **CLI Update:** Added a new `--no-gui` flag to the `traffic-status` command for enhanced flexibility.
### Notes
- These updates aim to streamline user operations and improve overall system responsiveness.
- 🧪 feat: Add show-user-uri-json CLI command
- 🌐 feat: Introduce user URI API endpoint
- 🖥️ feat: Integrate show_user_uri_api into the Users page
- 🧹 refactor: Move URI generation logic from ViewModel to backend logic
- 📦 refactor: Rewrite show-user-uri to Python for consistency
- ⚙️ optimize: Improve server_info.sh for better performance and lower resource usage

View File

@ -7,7 +7,7 @@ import json
def pretty_print(data: typing.Any):
if isinstance(data, dict):
if isinstance(data, dict) or isinstance(data, list):
print(json.dumps(data, indent=4))
return
@ -205,6 +205,22 @@ def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: boo
click.echo(f"URI for user '{username}' could not be generated.")
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('show-user-uri-json')
@click.argument('usernames', nargs=-1, required=True)
def show_user_uri_json(usernames: list[str]):
"""
Displays URI information in JSON format for a list of users.
"""
try:
res = cli_api.show_user_uri_json(usernames)
if res:
pretty_print(res)
else:
click.echo('No user URIs could be generated.')
except Exception as e:
click.echo(f'{e}', err=True)
# endregion
# region Server

View File

@ -27,7 +27,8 @@ class Command(Enum):
EDIT_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'edit_user.sh')
RESET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'reset_user.sh')
REMOVE_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'remove_user.sh')
SHOW_USER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'show_user_uri.sh')
SHOW_USER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'show_user_uri.py')
WRAPPER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'wrapper_uri.py')
IP_ADD = os.path.join(SCRIPT_DIR, 'hysteria2', 'ip.sh')
MANAGE_OBFS = os.path.join(SCRIPT_DIR, 'hysteria2', 'manage_obfs.sh')
MASQUERADE_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'masquerade.sh')
@ -321,7 +322,7 @@ def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: boo
'''
Displays the URI for a user, with options for QR code and other formats.
'''
command_args = ['bash', Command.SHOW_USER_URI.value, '-u', username]
command_args = ['python3', Command.SHOW_USER_URI.value, '-u', username]
if qrcode:
command_args.append('-qr')
if all:
@ -334,6 +335,25 @@ def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: boo
command_args.append('-n')
return run_cmd(command_args)
def show_user_uri_json(usernames: list[str]) -> list[dict[str, Any]] | None:
'''
Displays the URI for a list of users in JSON format.
'''
script_path = Command.WRAPPER_URI.value
if not os.path.exists(script_path):
raise ScriptNotFoundError(f"Wrapper URI script not found at: {script_path}")
try:
process = subprocess.run(['python3', script_path, *usernames], capture_output=True, text=True, check=True)
return json.loads(process.stdout)
except subprocess.CalledProcessError as e:
raise CommandExecutionError(f"Failed to execute wrapper URI script: {e}\nError: {e.stderr}")
except FileNotFoundError:
raise ScriptNotFoundError(f'Script not found: {script_path}')
except json.JSONDecodeError:
raise CommandExecutionError(f"Failed to decode JSON output from script: {script_path}\nOutput: {process.stdout if 'process' in locals() else 'No output'}") # Add process check
except Exception as e:
raise HysteriaError(f'An unexpected error occurred: {e}')
# endregion
# region Server

View File

@ -3,73 +3,61 @@
source /etc/hysteria/core/scripts/path.sh
get_secret() {
if [ ! -f "$CONFIG_FILE" ]; then
echo "Error: config.json file not found!"
[ ! -f "$CONFIG_FILE" ] && { echo "Error: config.json file not found!" >&2; exit 1; }
local secret=$(jq -r '.trafficStats.secret' "$CONFIG_FILE")
[ "$secret" = "null" ] || [ -z "$secret" ] && {
echo "Error: secret not found in config.json!" >&2
exit 1
fi
}
secret=$(jq -r '.trafficStats.secret' $CONFIG_FILE)
if [ "$secret" == "null" ] || [ -z "$secret" ]; then
echo "Error: secret not found in config.json!"
exit 1
fi
echo $secret
echo "$secret"
}
convert_bytes() {
local bytes=$1
if (( bytes < 1048576 )); then
echo "$(echo "scale=2; $bytes / 1024" | bc) KB"
printf "%.2f KB" "$(echo "scale=2; $bytes / 1024" | bc)"
elif (( bytes < 1073741824 )); then
echo "$(echo "scale=2; $bytes / 1048576" | bc) MB"
printf "%.2f MB" "$(echo "scale=2; $bytes / 1048576" | bc)"
elif (( bytes < 1099511627776 )); then
echo "$(echo "scale=2; $bytes / 1073741824" | bc) GB"
printf "%.2f GB" "$(echo "scale=2; $bytes / 1073741824" | bc)"
else
echo "$(echo "scale=2; $bytes / 1099511627776" | bc) TB"
printf "%.2f TB" "$(echo "scale=2; $bytes / 1099511627776" | bc)"
fi
}
# iT'S BETTER TO PRINT BYTES ITSELF AND NOT HUMAN READABLE FORMAT BECAUSE THE CALLER SHOULD DECIDE WHAT TO PRINT
cpu_usage=$(top -bn1 | grep "Cpu(s)" | sed "s/.*, *\([0-9.]*\)%* id.*/\1/" | awk '{print 100 - $1"%"}')
total_ram=$(free -m | awk '/Mem:/ {print $2}')
used_ram=$(free -m | awk '/Mem:/ {print $3}')
mem_stats=$(free -m)
mem_total=$(echo "$mem_stats" | awk '/Mem:/ {print $2}')
mem_used=$(echo "$mem_stats" | awk '/Mem:/ {print $3}')
secret=$(get_secret)
online_users=$(curl -s -H "Authorization: $secret" $ONLINE_API_URL)
online_user_count=$(echo $online_users | jq 'add')
if [ "$online_user_count" == "null" ] || [ "$online_user_count" == "0" ]; then
online_user_count=0
fi
online_users=$(curl -s -H "Authorization: $secret" "$ONLINE_API_URL")
online_user_count=$(echo "$online_users" | jq 'add // 0')
echo "📈 CPU Usage: $cpu_usage"
echo "📋 Total RAM: ${total_ram}MB"
echo "💻 Used RAM: ${used_ram}MB"
echo "📋 Total RAM: ${mem_total}MB"
echo "💻 Used RAM: ${mem_used}MB"
echo "👥 Online Users: $online_user_count"
echo
#echo "🚦Total Traffic: "
if [ -f "$USERS_FILE" ]; then
total_upload=0
total_download=0
read total_upload total_download <<< $(jq -r '
reduce .[] as $user (
{"up": 0, "down": 0};
.up += (($user.upload_bytes | numbers) // 0) |
.down += (($user.download_bytes | numbers) // 0)
) | "\(.up) \(.down)"' "$USERS_FILE" 2>/dev/null || echo "0 0")
while IFS= read -r line; do
upload=$(echo $line | jq -r '.upload_bytes')
download=$(echo $line | jq -r '.download_bytes')
total_upload=$(echo "$total_upload + $upload" | bc)
total_download=$(echo "$total_download + $download" | bc)
done <<< "$(jq -c '.[]' $USERS_FILE)"
total_upload=${total_upload:-0}
total_download=${total_download:-0}
total_upload_human=$(convert_bytes $total_upload)
total_download_human=$(convert_bytes $total_download)
echo "🔼 Uploaded Traffic: ${total_upload_human}"
echo "🔽 Downloaded Traffic: ${total_download_human}"
echo "🔼 Uploaded Traffic: $(convert_bytes "$total_upload")"
echo "🔽 Downloaded Traffic: $(convert_bytes "$total_download")"
total_traffic=$((total_upload + total_download))
total_traffic_human=$(convert_bytes $total_traffic)
echo "📊 Total Traffic: ${total_traffic_human}"
echo "📊 Total Traffic: $(convert_bytes "$total_traffic")"
fi

View File

@ -0,0 +1,216 @@
#!/usr/bin/env python3
import os
import sys
import json
import subprocess
import argparse
import re
from typing import Tuple, Optional, Dict, List, Any
CORE_DIR = "/etc/hysteria"
CONFIG_FILE = f"{CORE_DIR}/config.json"
USERS_FILE = f"{CORE_DIR}/users.json"
HYSTERIA2_ENV = f"{CORE_DIR}/.configs.env"
SINGBOX_ENV = f"{CORE_DIR}/core/scripts/singbox/.env"
NORMALSUB_ENV = f"{CORE_DIR}/core/scripts/normalsub/.env"
def load_env_file(env_file: str) -> Dict[str, str]:
"""Load environment variables from a file into a dictionary."""
env_vars = {}
if os.path.exists(env_file):
with open(env_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
env_vars[key] = value
return env_vars
def load_hysteria2_env() -> Dict[str, str]:
"""Load Hysteria2 environment variables."""
return load_env_file(HYSTERIA2_ENV)
def load_hysteria2_ips() -> Tuple[str, str, str]:
"""Load Hysteria2 IPv4 and IPv6 addresses from environment."""
env_vars = load_hysteria2_env()
ip4 = env_vars.get('IP4', 'None')
ip6 = env_vars.get('IP6', 'None')
sni = env_vars.get('SNI', '')
return ip4, ip6, sni
def get_singbox_domain_and_port() -> Tuple[str, str]:
"""Get domain and port from SingBox config."""
env_vars = load_env_file(SINGBOX_ENV)
domain = env_vars.get('HYSTERIA_DOMAIN', '')
port = env_vars.get('HYSTERIA_PORT', '')
return domain, port
def get_normalsub_domain_and_port() -> Tuple[str, str, str]:
"""Get domain, port, and subpath from Normal-SUB config."""
env_vars = load_env_file(NORMALSUB_ENV)
domain = env_vars.get('HYSTERIA_DOMAIN', '')
port = env_vars.get('HYSTERIA_PORT', '')
subpath = env_vars.get('SUBPATH', '')
return domain, port, subpath
def is_service_active(service_name: str) -> bool:
"""Check if a systemd service is active."""
try:
result = subprocess.run(
['systemctl', 'is-active', '--quiet', service_name],
check=False
)
return result.returncode == 0
except Exception:
return False
def generate_uri(username: str, auth_password: str, ip: str, port: str,
obfs_password: str, sha256: str, sni: str, ip_version: int) -> str:
"""Generate Hysteria2 URI for the given parameters."""
uri_base = f"hy2://{username}%3A{auth_password}@{ip}:{port}"
# Handle IPv6 address formatting
if ip_version == 6 and re.match(r'^[0-9a-fA-F:]+$', ip):
uri_base = f"hy2://{username}%3A{auth_password}@[{ip}]:{port}"
params = []
if obfs_password:
params.append(f"obfs=salamander&obfs-password={obfs_password}")
if sha256:
params.append(f"pinSHA256={sha256}")
params.append(f"insecure=1&sni={sni}")
params_str = "&".join(params)
return f"{uri_base}?{params_str}#{username}-IPv{ip_version}"
def generate_qr_code(uri: str) -> List[str]:
"""Generate QR code for the URI using qrencode."""
try:
result = subprocess.run(
['qrencode', '-t', 'UTF8', '-s', '3', '-m', '2'],
input=uri.encode(),
capture_output=True,
check=True
)
return result.stdout.decode().splitlines()
except subprocess.CalledProcessError:
return ["QR Code generation failed. Is qrencode installed?"]
except Exception as e:
return [f"Error generating QR code: {str(e)}"]
def center_text(text: str, width: int) -> str:
"""Center text in the given width."""
return text.center(width)
def get_terminal_width() -> int:
"""Get terminal width."""
try:
return os.get_terminal_size().columns
except (AttributeError, OSError):
return 80
def show_uri(args: argparse.Namespace) -> None:
"""Show URI and optional QR codes for the given username."""
if not os.path.exists(USERS_FILE):
print(f"\033[0;31mError:\033[0m Config file {USERS_FILE} not found.")
return
if not is_service_active("hysteria-server.service"):
print("\033[0;31mError:\033[0m Hysteria2 is not active.")
return
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
with open(USERS_FILE, 'r') as f:
users = json.load(f)
if args.username not in users:
print("Invalid username. Please try again.")
return
auth_password = users[args.username]["password"]
port = config["listen"].split(":")[1] if ":" in config["listen"] else config["listen"]
sha256 = config.get("tls", {}).get("pinSHA256", "")
obfs_password = config.get("obfs", {}).get("salamander", {}).get("password", "")
ip4, ip6, sni = load_hysteria2_ips()
available_ip4 = ip4 and ip4 != "None"
available_ip6 = ip6 and ip6 != "None"
uri_ipv4 = None
uri_ipv6 = None
if args.all:
if available_ip4:
uri_ipv4 = generate_uri(args.username, auth_password, ip4, port,
obfs_password, sha256, sni, 4)
print(f"\nIPv4:\n{uri_ipv4}\n")
if available_ip6:
uri_ipv6 = generate_uri(args.username, auth_password, ip6, port,
obfs_password, sha256, sni, 6)
print(f"\nIPv6:\n{uri_ipv6}\n")
else:
if args.ip_version == 4 and available_ip4:
uri_ipv4 = generate_uri(args.username, auth_password, ip4, port,
obfs_password, sha256, sni, 4)
print(f"\nIPv4:\n{uri_ipv4}\n")
elif args.ip_version == 6 and available_ip6:
uri_ipv6 = generate_uri(args.username, auth_password, ip6, port,
obfs_password, sha256, sni, 6)
print(f"\nIPv6:\n{uri_ipv6}\n")
else:
print("Invalid IP version or no available IP for the requested version.")
return
if args.qrcode:
terminal_width = get_terminal_width()
if uri_ipv4:
qr_code = generate_qr_code(uri_ipv4)
print("\nIPv4 QR Code:\n")
for line in qr_code:
print(center_text(line, terminal_width))
if uri_ipv6:
qr_code = generate_qr_code(uri_ipv6)
print("\nIPv6 QR Code:\n")
for line in qr_code:
print(center_text(line, terminal_width))
if args.singbox and is_service_active("hysteria-singbox.service"):
domain, port = get_singbox_domain_and_port()
if domain and port:
print(f"\nSingbox Sublink:\nhttps://{domain}:{port}/sub/singbox/{args.username}/{args.ip_version}#{args.username}\n")
if args.normalsub and is_service_active("hysteria-normal-sub.service"):
domain, port, subpath = get_normalsub_domain_and_port()
if domain and port:
print(f"\nNormal-SUB Sublink:\nhttps://{domain}:{port}/{subpath}/sub/normal/{args.username}#Hysteria2\n")
def main():
"""Main function to parse arguments and show URIs."""
parser = argparse.ArgumentParser(description="Hysteria2 URI Generator")
parser.add_argument("-u", "--username", help="Username to generate URI for")
parser.add_argument("-qr", "--qrcode", action="store_true", help="Generate QR code")
parser.add_argument("-ip", "--ip-version", type=int, default=4, choices=[4, 6],
help="IP version (4 or 6)")
parser.add_argument("-a", "--all", action="store_true", help="Show all available IPs")
parser.add_argument("-s", "--singbox", action="store_true", help="Generate SingBox sublink")
parser.add_argument("-n", "--normalsub", action="store_true", help="Generate Normal-SUB sublink")
args = parser.parse_args()
if not args.username:
parser.print_help()
sys.exit(1)
show_uri(args)
if __name__ == "__main__":
main()

View File

@ -1,169 +0,0 @@
#!/bin/bash
source /etc/hysteria/core/scripts/path.sh
source /etc/hysteria/core/scripts/utils.sh
get_singbox_domain_and_port() {
if [ -f "$SINGBOX_ENV" ]; then
local domain port
domain=$(grep -E '^HYSTERIA_DOMAIN=' "$SINGBOX_ENV" | cut -d'=' -f2)
port=$(grep -E '^HYSTERIA_PORT=' "$SINGBOX_ENV" | cut -d'=' -f2)
echo "$domain" "$port"
else
echo ""
fi
}
get_normalsub_domain_and_port() {
if [ -f "$NORMALSUB_ENV" ]; then
local domain port subpath
domain=$(grep -E '^HYSTERIA_DOMAIN=' "$NORMALSUB_ENV" | cut -d'=' -f2)
port=$(grep -E '^HYSTERIA_PORT=' "$NORMALSUB_ENV" | cut -d'=' -f2)
subpath=$(grep -E '^SUBPATH=' "$NORMALSUB_ENV" | cut -d'=' -f2)
echo "$domain" "$port" "$subpath"
else
echo ""
fi
}
show_uri() {
if [ -f "$USERS_FILE" ]; then
if systemctl is-active --quiet hysteria-server.service; then
local username
local generate_qrcode=false
local ip_version=4
local show_all=false
local generate_singbox=false
local generate_normalsub=false
load_hysteria2_env
load_hysteria2_ips
available_ip4=true
available_ip6=true
if [[ -z "$IP4" || "$IP4" == "None" ]]; then
available_ip4=false
fi
if [[ -z "$IP6" || "$IP6" == "None" ]]; then
available_ip6=false
fi
while [[ "$#" -gt 0 ]]; do
case $1 in
-u|--username) username="$2"; shift ;;
-qr|--qrcode) generate_qrcode=true ;;
-ip) ip_version="$2"; shift ;;
-a|--all) show_all=true ;;
-s|--singbox) generate_singbox=true ;;
-n|--normalsub) generate_normalsub=true ;;
*) echo "Unknown parameter passed: $1"; exit 1 ;;
esac
shift
done
if [ -z "$username" ]; then
echo "Usage: $0 -u <username> [-qr] [-ip <4|6>] [-a] [-s]"
exit 1
fi
if jq -e "has(\"$username\")" "$USERS_FILE" > /dev/null; then
authpassword=$(jq -r ".\"$username\".password" "$USERS_FILE")
port=$(jq -r '.listen' "$CONFIG_FILE" | cut -d':' -f2)
sha256=$(jq -r '.tls.pinSHA256 // empty' "$CONFIG_FILE")
obfspassword=$(jq -r '.obfs.salamander.password // empty' "$CONFIG_FILE")
generate_uri() {
local ip_version=$1
local ip=$2
local uri_base="hy2://$username%3A$authpassword@$ip:$port"
if [ "$ip_version" -eq 6 ]; then
if [[ "$ip" =~ ^[0-9a-fA-F:]+$ ]]; then
uri_base="hy2://$username%3A$authpassword@[$ip]:$port"
else
uri_base="hy2://$username%3A$authpassword@$ip:$port"
fi
fi
local params=""
if [ -n "$obfspassword" ]; then
params+="obfs=salamander&obfs-password=$obfspassword&"
fi
if [ -n "$sha256" ]; then
params+="pinSHA256=$sha256&"
fi
params+="insecure=1&sni=$SNI"
echo "$uri_base?$params#$username-IPv$ip_version"
}
if [ "$show_all" = true ]; then
if [ "$available_ip4" = true ]; then
URI=$(generate_uri 4 "$IP4")
echo -e "\nIPv4:\n$URI\n"
fi
if [ "$available_ip6" = true ]; then
URI6=$(generate_uri 6 "$IP6")
echo -e "\nIPv6:\n$URI6\n"
fi
else
if [ "$ip_version" -eq 4 ] && [ "$available_ip4" = true ]; then
URI=$(generate_uri 4 "$IP4")
echo -e "\nIPv4:\n$URI\n"
elif [ "$ip_version" -eq 6 ] && [ "$available_ip6" = true ]; then
URI6=$(generate_uri 6 "$IP6")
echo -e "\nIPv6:\n$URI6\n"
else
echo "Invalid IP version or no available IP for the requested version."
exit 1
fi
fi
if [ "$generate_qrcode" = true ]; then
cols=$(tput cols)
if [ "$available_ip4" = true ] && [ -n "$URI" ]; then
qr1=$(echo -n "$URI" | qrencode -t UTF8 -s 3 -m 2)
echo -e "\nIPv4 QR Code:\n"
echo "$qr1" | while IFS= read -r line; do
printf "%*s\n" $(( (${#line} + cols) / 2)) "$line"
done
fi
if [ "$available_ip6" = true ] && [ -n "$URI6" ]; then
qr2=$(echo -n "$URI6" | qrencode -t UTF8 -s 3 -m 2)
echo -e "\nIPv6 QR Code:\n"
echo "$qr2" | while IFS= read -r line; do
printf "%*s\n" $(( (${#line} + cols) / 2)) "$line"
done
fi
fi
if [ "$generate_singbox" = true ] && systemctl is-active --quiet hysteria-singbox.service; then
read -r domain port < <(get_singbox_domain_and_port)
if [ -n "$domain" ] && [ -n "$port" ]; then
echo -e "\nSingbox Sublink:\nhttps://$domain:$port/sub/singbox/$username/$ip_version#$username\n"
fi
fi
if [ "$generate_normalsub" = true ] && systemctl is-active --quiet hysteria-normal-sub.service; then
read -r domain port subpath < <(get_normalsub_domain_and_port)
if [ -n "$domain" ] && [ -n "$port" ]; then
echo -e "\nNormal-SUB Sublink:\nhttps://$domain:$port/$subpath/sub/normal/$username#Hysteria2\n"
fi
fi
else
echo "Invalid username. Please try again."
fi
else
echo -e "\033[0;31mError:\033[0m Hysteria2 is not active."
fi
else
echo -e "\033[0;31mError:\033[0m Config file $USERS_FILE not found."
fi
}
show_uri "$@"

View File

@ -0,0 +1,57 @@
import subprocess
import concurrent.futures
import re
import json
import sys
SHOW_URI_SCRIPT = "/etc/hysteria/core/scripts/hysteria2/show_user_uri.py"
DEFAULT_ARGS = ["-a", "-n", "-s"]
def run_show_uri(username):
try:
cmd = ["python3", SHOW_URI_SCRIPT, "-u", username] + DEFAULT_ARGS
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
output = result.stdout
if "Invalid username" in output:
return {"username": username, "error": "User not found"}
return parse_output(username, output)
except subprocess.CalledProcessError as e:
return {"username": username, "error": e.stderr.strip()}
def parse_output(username, output):
ipv4 = None
ipv6 = None
normal_sub = None
# Match links
ipv4_match = re.search(r"IPv4:\s*(hy2://[^\s]+)", output)
ipv6_match = re.search(r"IPv6:\s*(hy2://[^\s]+)", output)
normal_sub_match = re.search(r"Normal-SUB Sublink:\s*(https?://[^\s]+)", output)
if ipv4_match:
ipv4 = ipv4_match.group(1)
if ipv6_match:
ipv6 = ipv6_match.group(1)
if normal_sub_match:
normal_sub = normal_sub_match.group(1)
return {
"username": username,
"ipv4": ipv4,
"ipv6": ipv6,
"normal_sub": normal_sub
}
def batch_show_uri(usernames, max_workers=20):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(run_show_uri, usernames))
return results
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python3 show_uri_json.py user1 user2 ...")
sys.exit(1)
usernames = sys.argv[1:]
output_list = batch_show_uri(usernames)
print(json.dumps(output_list, indent=2))

View File

@ -1,4 +1,4 @@
from pydantic import BaseModel
from typing import Optional
from pydantic import BaseModel, RootModel
@ -37,3 +37,9 @@ class EditUserInputBody(BaseModel):
renew_password: bool = False
renew_creation_date: bool = False
blocked: bool = False
class UserUriResponse(BaseModel):
username: str
ipv4: str | None = None
ipv6: str | None = None
normal_sub: str | None = None

View File

@ -1,6 +1,6 @@
from fastapi import APIRouter, HTTPException
from .schema.user import UserListResponse, UserInfoResponse, AddUserInputBody, EditUserInputBody
from .schema.user import UserListResponse, UserInfoResponse, AddUserInputBody, EditUserInputBody, UserUriResponse
from .schema.response import DetailResponse
import cli_api
@ -150,11 +150,33 @@ async def reset_user_api(username: str):
except Exception as e:
raise HTTPException(status_code=400, detail=f'Error: {str(e)}')
# TODO implement show user uri endpoint
# @router.get('/{username}/uri', response_model=TODO)
# async def show_user_uri(username: str):
# try:
# res = cli_api.show_user_uri(username)
# return res
# except Exception as e:
# raise HTTPException(status_code=400, detail=f'Error: {str(e)}')
@router.get('/{username}/uri', response_model=UserUriResponse)
async def show_user_uri_api(username: str):
"""
Get the URI information for a user in JSON format.
Args:
username: The username of the user.
Returns:
UserUriResponse: An object containing URI information for the user.
Raises:
HTTPException: 404 if the user is not found, 400 if another error occurs.
"""
try:
uri_data_list = cli_api.show_user_uri_json([username])
if not uri_data_list:
raise HTTPException(status_code=404, detail=f'URI for user {username} not found.')
uri_data = uri_data_list[0]
if uri_data.get('error'):
raise HTTPException(status_code=404, detail=f"{uri_data['error']}")
return uri_data
except cli_api.ScriptNotFoundError as e:
raise HTTPException(status_code=500, detail=f'Server script error: {str(e)}')
except cli_api.CommandExecutionError as e:
raise HTTPException(status_code=400, detail=f'Error executing script: {str(e)}')
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=f'Unexpected error: {str(e)}')

View File

@ -4,57 +4,6 @@ from datetime import datetime, timedelta
import cli_api
class Config(BaseModel):
type: str
link: str
@staticmethod
def from_username(username: str) -> list['Config']:
raw_uri = Config.__get_user_configs_uri(username)
if not raw_uri:
return []
res = []
for line in raw_uri.splitlines():
config = Config.__parse_user_configs_uri_line(line)
if config:
res.append(config)
return res
@staticmethod
def __get_user_configs_uri(username: str) -> str:
# This command is equivalent to `show-user-uri --username $username --ipv 4 --all --singbox --normalsub`
raw_uri = cli_api.show_user_uri(username, False, 4, True, True, True)
return raw_uri.strip() if raw_uri else ''
@staticmethod
def __parse_user_configs_uri_line(line: str) -> "Config | None":
config_type = ''
config_link = ''
line = line.strip()
if line.startswith("hy2://"):
if "@" in line:
ip_version = "IPv6" if line.split("@")[1].count(":") > 1 else "IPv4"
config_type = ip_version
config_link = line
else:
return None
elif line.startswith("https://"):
if "singbox" in line.lower():
config_type = "Singbox"
elif "normal" in line.lower():
config_type = "Normal-SUB"
else:
return None
config_link = line
else:
return None
return Config(type=config_type, link=config_link)
class User(BaseModel):
username: str
status: str
@ -63,7 +12,6 @@ class User(BaseModel):
expiry_date: datetime
expiry_days: int
enable: bool
configs: list[Config]
@staticmethod
def from_dict(username: str, user_data: dict):
@ -107,7 +55,6 @@ class User(BaseModel):
'expiry_date': expiry_date,
'expiry_days': expiration_days,
'enable': False if user_data.get('blocked', False) else True,
'configs': Config.from_username(user_data['username'])
}
@staticmethod

View File

@ -114,18 +114,6 @@
data-username="{{ user.username }}">
<i class="fas fa-qrcode"></i>
</a>
<div id="userConfigs-{{ user.username }}" style="display: none;">
{% for config in user.configs %}
<div class="config-container" data-link="{{ config.link }}">
<span class="config-type">{{ config.type }}:</span>
{% if config.type == "Singbox" or config.type == "Normal-SUB" %}
<span class="config-link-text">{{ config.link }}</span>
{% else %}
<span class="config-link-text">{{ config.link }}</span>
{% endif %}
</div>
{% endfor %}
</div>
</td>
<td class="text-nowrap">
<button type="button" class="btn btn-sm btn-info edit-user"
@ -653,28 +641,31 @@
// QR Code Modal
$("#qrcodeModal").on("show.bs.modal", function (event) {
const button = $(event.relatedTarget);
const configContainer = $(`#userConfigs-${button.data("username")}`);
const username = button.data("username");
const qrcodesContainer = $("#qrcodesContainer");
qrcodesContainer.empty();
configContainer.find(".config-container").each(function () {
const configLink = $(this).data("link");
const configType = $(this).find(".config-type").text().replace(":", "");
const userUriApiUrl = "{{ url_for('show_user_uri_api', username='USERNAME_PLACEHOLDER') }}";
const url = userUriApiUrl.replace("USERNAME_PLACEHOLDER", encodeURIComponent(username));
let displayType = configType;
$.ajax({
url: url,
method: "GET",
dataType: 'json',
success: function (response) {
// console.log("API Response:", response);
const hashMatch = configLink.match(/#(.+)$/);
if (hashMatch && hashMatch[1]) {
const hashValue = hashMatch[1];
if (hashValue.includes("IPv4") || hashValue.includes("IPv6")) {
displayType = hashValue;
}
} else if (configLink.includes("ipv4") || configLink.includes("IPv4")) {
displayType = "IPv4";
} else if (configLink.includes("ipv6") || configLink.includes("IPv6")) {
displayType = "IPv6";
}
const configs = [
{ type: "IPv4", link: response.ipv4 },
{ type: "IPv6", link: response.ipv6 },
{ type: "Normal-SUB", link: response.normal_sub }
];
configs.forEach(config => {
if (config.link) {
const displayType = config.type;
const configLink = config.link;
const qrCodeId = `qrcode-${displayType}-${Math.random().toString(36).substring(2, 10)}`;
const card = $(`
@ -686,7 +677,6 @@
</div>
</div>
`);
qrcodesContainer.append(card);
const qrCodeStyling = new QRCodeStyling({
@ -709,7 +699,6 @@
hideBackgroundDots: true,
}
});
qrCodeStyling.append(document.getElementById(qrCodeId));
card.on("click", function () {
@ -731,7 +720,23 @@
});
});
});
}
});
},
error: function (error) {
console.error("Error fetching user URI:", error);
Swal.fire({
title: "Error!",
text: "Failed to fetch user configuration URIs.",
icon: "error",
confirmButtonText: "OK",
});
}
});
});
$("#qrcodeModal .modal-content").on("click", function (e) {