refactor(user): adapt show_user_uri script to use mongodb

This commit is contained in:
Whispering Wind
2025-09-07 00:33:01 +03:30
committed by GitHub
parent 9da68fa79c
commit 7ce683285d

View File

@ -9,11 +9,11 @@ import re
import qrcode import qrcode
from io import StringIO from io import StringIO
from typing import Tuple, Optional, Dict, List, Any from typing import Tuple, Optional, Dict, List, Any
from init_paths import * sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from db.database import db
from paths import * from paths import *
def load_env_file(env_file: str) -> Dict[str, str]: def load_env_file(env_file: str) -> Dict[str, str]:
"""Load environment variables from a file into a dictionary."""
env_vars = {} env_vars = {}
if os.path.exists(env_file): if os.path.exists(env_file):
with open(env_file, 'r') as f: with open(env_file, 'r') as f:
@ -25,7 +25,6 @@ def load_env_file(env_file: str) -> Dict[str, str]:
return env_vars return env_vars
def load_nodes() -> List[Dict[str, str]]: def load_nodes() -> List[Dict[str, str]]:
"""Load external node information from the nodes JSON file."""
if NODES_JSON_PATH.exists(): if NODES_JSON_PATH.exists():
try: try:
with NODES_JSON_PATH.open("r") as f: with NODES_JSON_PATH.open("r") as f:
@ -37,11 +36,9 @@ def load_nodes() -> List[Dict[str, str]]:
return [] return []
def load_hysteria2_env() -> Dict[str, str]: def load_hysteria2_env() -> Dict[str, str]:
"""Load Hysteria2 environment variables."""
return load_env_file(CONFIG_ENV) return load_env_file(CONFIG_ENV)
def load_hysteria2_ips() -> Tuple[str, str, str]: def load_hysteria2_ips() -> Tuple[str, str, str]:
"""Load Hysteria2 IPv4 and IPv6 addresses from environment."""
env_vars = load_hysteria2_env() env_vars = load_hysteria2_env()
ip4 = env_vars.get('IP4', 'None') ip4 = env_vars.get('IP4', 'None')
ip6 = env_vars.get('IP6', 'None') ip6 = env_vars.get('IP6', 'None')
@ -49,14 +46,12 @@ def load_hysteria2_ips() -> Tuple[str, str, str]:
return ip4, ip6, sni return ip4, ip6, sni
def get_singbox_domain_and_port() -> Tuple[str, str]: def get_singbox_domain_and_port() -> Tuple[str, str]:
"""Get domain and port from SingBox config."""
env_vars = load_env_file(SINGBOX_ENV) env_vars = load_env_file(SINGBOX_ENV)
domain = env_vars.get('HYSTERIA_DOMAIN', '') domain = env_vars.get('HYSTERIA_DOMAIN', '')
port = env_vars.get('HYSTERIA_PORT', '') port = env_vars.get('HYSTERIA_PORT', '')
return domain, port return domain, port
def get_normalsub_domain_and_port() -> Tuple[str, str, str]: def get_normalsub_domain_and_port() -> Tuple[str, str, str]:
"""Get domain, port, and subpath from Normal-SUB config."""
env_vars = load_env_file(NORMALSUB_ENV) env_vars = load_env_file(NORMALSUB_ENV)
domain = env_vars.get('HYSTERIA_DOMAIN', '') domain = env_vars.get('HYSTERIA_DOMAIN', '')
port = env_vars.get('HYSTERIA_PORT', '') port = env_vars.get('HYSTERIA_PORT', '')
@ -64,7 +59,6 @@ def get_normalsub_domain_and_port() -> Tuple[str, str, str]:
return domain, port, subpath return domain, port, subpath
def is_service_active(service_name: str) -> bool: def is_service_active(service_name: str) -> bool:
"""Check if a systemd service is active."""
try: try:
result = subprocess.run( result = subprocess.run(
['systemctl', 'is-active', '--quiet', service_name], ['systemctl', 'is-active', '--quiet', service_name],
@ -77,28 +71,23 @@ def is_service_active(service_name: str) -> bool:
def generate_uri(username: str, auth_password: str, ip: str, port: str, def generate_uri(username: str, auth_password: str, ip: str, port: str,
obfs_password: str, sha256: str, sni: str, ip_version: int, obfs_password: str, sha256: str, sni: str, ip_version: int,
insecure: bool, fragment_tag: str) -> str: insecure: bool, fragment_tag: str) -> str:
"""Generate Hysteria2 URI for the given parameters.""" ip_part = f"[{ip}]" if ip_version == 6 and ':' in ip else ip
uri_base = f"hy2://{username}%3A{auth_password}@{ip}:{port}" uri_base = f"hy2://{username}:{auth_password}@{ip_part}:{port}"
if ip_version == 6 and re.match(r'^[0-9a-fA-F:]+$', ip):
uri_base = f"hy2://{username}%3A{auth_password}@[{ip}]:{port}"
params = [] params = []
if obfs_password: if obfs_password:
params.append(f"obfs=salamander&obfs-password={obfs_password}") params.append(f"obfs=salamander&obfs-password={obfs_password}")
if sha256: if sha256:
params.append(f"pinSHA256={sha256}") params.append(f"pinSHA256={sha256}")
if sni:
params.append(f"sni={sni}")
insecure_value = "1" if insecure else "0" params.append(f"insecure={'1' if insecure else '0'}")
params.append(f"insecure={insecure_value}&sni={sni}")
params_str = "&".join(params) query_string = "&".join(params)
return f"{uri_base}?{params_str}#{fragment_tag}" return f"{uri_base}?{query_string}#{fragment_tag}"
def generate_qr_code(uri: str) -> List[str]: def generate_qr_code(uri: str) -> List[str]:
"""Generate terminal-friendly ASCII QR code using pure Python."""
try: try:
qr = qrcode.QRCode( qr = qrcode.QRCode(
version=1, version=1,
@ -116,18 +105,15 @@ def generate_qr_code(uri: str) -> List[str]:
return [f"Error generating QR code: {str(e)}"] return [f"Error generating QR code: {str(e)}"]
def center_text(text: str, width: int) -> str: def center_text(text: str, width: int) -> str:
"""Center text in the given width."""
return text.center(width) return text.center(width)
def get_terminal_width() -> int: def get_terminal_width() -> int:
"""Get terminal width."""
try: try:
return os.get_terminal_size().columns return os.get_terminal_size().columns
except (AttributeError, OSError): except (AttributeError, OSError):
return 80 return 80
def display_uri_and_qr(uri: str, label: str, args: argparse.Namespace, terminal_width: int): def display_uri_and_qr(uri: str, label: str, args: argparse.Namespace, terminal_width: int):
"""Helper function to print URI and its QR code."""
if not uri: if not uri:
return return
@ -140,27 +126,28 @@ def display_uri_and_qr(uri: str, label: str, args: argparse.Namespace, terminal_
print(center_text(line, terminal_width)) print(center_text(line, terminal_width))
def show_uri(args: argparse.Namespace) -> None: def show_uri(args: argparse.Namespace) -> None:
"""Show URI and optional QR codes for the given username and nodes.""" if db is None:
if not os.path.exists(USERS_FILE): print("\033[0;31mError:\033[0m Database connection failed.")
print(f"\033[0;31mError:\033[0m Config file {USERS_FILE} not found.")
return return
if not is_service_active("hysteria-server.service"): if not is_service_active("hysteria-server.service"):
print("\033[0;31mError:\033[0m Hysteria2 is not active.") print("\033[0;31mError:\033[0m Hysteria2 is not active.")
return return
with open(CONFIG_FILE, 'r') as f: try:
config = json.load(f) with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
with open(USERS_FILE, 'r') as f: except (FileNotFoundError, json.JSONDecodeError) as e:
users = json.load(f) print(f"\033[0;31mError:\033[0m Could not load config file {CONFIG_FILE}. Details: {e}")
return
if args.username not in users:
print("Invalid username. Please try again.") user_doc = db.get_user(args.username)
if not user_doc:
print(f"\033[0;31mError:\033[0m User '{args.username}' not found in the database.")
return return
auth_password = users[args.username]["password"] auth_password = user_doc["password"]
port = config["listen"].split(":")[1] if ":" in config["listen"] else config["listen"] port = config["listen"].split(":")[-1]
sha256 = config.get("tls", {}).get("pinSHA256", "") sha256 = config.get("tls", {}).get("pinSHA256", "")
obfs_password = config.get("obfs", {}).get("salamander", {}).get("password", "") obfs_password = config.get("obfs", {}).get("salamander", {}).get("password", "")
insecure = config.get("tls", {}).get("insecure", True) insecure = config.get("tls", {}).get("insecure", True)
@ -205,7 +192,6 @@ def show_uri(args: argparse.Namespace) -> None:
print(f"\nNormal-SUB Sublink:\nhttps://{domain}:{port}/{subpath}/sub/normal/{auth_password}#Hysteria2\n") print(f"\nNormal-SUB Sublink:\nhttps://{domain}:{port}/{subpath}/sub/normal/{auth_password}#Hysteria2\n")
def main(): def main():
"""Main function to parse arguments and show URIs."""
parser = argparse.ArgumentParser(description="Hysteria2 URI Generator") parser = argparse.ArgumentParser(description="Hysteria2 URI Generator")
parser.add_argument("-u", "--username", help="Username to generate URI for") parser.add_argument("-u", "--username", help="Username to generate URI for")
parser.add_argument("-qr", "--qrcode", action="store_true", help="Generate QR code") parser.add_argument("-qr", "--qrcode", action="store_true", help="Generate QR code")