From 7ce683285d7bafd71775f71477cf4a13561b6a85 Mon Sep 17 00:00:00 2001 From: Whispering Wind <151555003+ReturnFI@users.noreply.github.com> Date: Sun, 7 Sep 2025 00:33:01 +0330 Subject: [PATCH] refactor(user): adapt show_user_uri script to use mongodb --- core/scripts/hysteria2/show_user_uri.py | 62 ++++++++++--------------- 1 file changed, 24 insertions(+), 38 deletions(-) diff --git a/core/scripts/hysteria2/show_user_uri.py b/core/scripts/hysteria2/show_user_uri.py index d841996..5f355a4 100644 --- a/core/scripts/hysteria2/show_user_uri.py +++ b/core/scripts/hysteria2/show_user_uri.py @@ -9,11 +9,11 @@ import re import qrcode from io import StringIO from typing import Tuple, Optional, Dict, List, Any -from init_paths import * +sys.path.append(os.path.join(os.path.dirname(__file__), '..')) +from db.database import db from paths import * def load_env_file(env_file: str) -> Dict[str, str]: - """Load environment variables from a file into a dictionary.""" env_vars = {} if os.path.exists(env_file): with open(env_file, 'r') as f: @@ -25,7 +25,6 @@ def load_env_file(env_file: str) -> Dict[str, str]: return env_vars def load_nodes() -> List[Dict[str, str]]: - """Load external node information from the nodes JSON file.""" if NODES_JSON_PATH.exists(): try: with NODES_JSON_PATH.open("r") as f: @@ -37,11 +36,9 @@ def load_nodes() -> List[Dict[str, str]]: return [] def load_hysteria2_env() -> Dict[str, str]: - """Load Hysteria2 environment variables.""" return load_env_file(CONFIG_ENV) def load_hysteria2_ips() -> Tuple[str, str, str]: - """Load Hysteria2 IPv4 and IPv6 addresses from environment.""" env_vars = load_hysteria2_env() ip4 = env_vars.get('IP4', 'None') ip6 = env_vars.get('IP6', 'None') @@ -49,14 +46,12 @@ def load_hysteria2_ips() -> Tuple[str, str, str]: return ip4, ip6, sni def get_singbox_domain_and_port() -> Tuple[str, str]: - """Get domain and port from SingBox config.""" env_vars = load_env_file(SINGBOX_ENV) domain = env_vars.get('HYSTERIA_DOMAIN', '') port = env_vars.get('HYSTERIA_PORT', '') return domain, port def get_normalsub_domain_and_port() -> Tuple[str, str, str]: - """Get domain, port, and subpath from Normal-SUB config.""" env_vars = load_env_file(NORMALSUB_ENV) domain = env_vars.get('HYSTERIA_DOMAIN', '') port = env_vars.get('HYSTERIA_PORT', '') @@ -64,7 +59,6 @@ def get_normalsub_domain_and_port() -> Tuple[str, str, str]: return domain, port, subpath def is_service_active(service_name: str) -> bool: - """Check if a systemd service is active.""" try: result = subprocess.run( ['systemctl', 'is-active', '--quiet', service_name], @@ -77,28 +71,23 @@ def is_service_active(service_name: str) -> bool: def generate_uri(username: str, auth_password: str, ip: str, port: str, obfs_password: str, sha256: str, sni: str, ip_version: int, insecure: bool, fragment_tag: str) -> str: - """Generate Hysteria2 URI for the given parameters.""" - uri_base = f"hy2://{username}%3A{auth_password}@{ip}:{port}" - - if ip_version == 6 and re.match(r'^[0-9a-fA-F:]+$', ip): - uri_base = f"hy2://{username}%3A{auth_password}@[{ip}]:{port}" + ip_part = f"[{ip}]" if ip_version == 6 and ':' in ip else ip + uri_base = f"hy2://{username}:{auth_password}@{ip_part}:{port}" params = [] - if obfs_password: params.append(f"obfs=salamander&obfs-password={obfs_password}") - if sha256: params.append(f"pinSHA256={sha256}") + if sni: + params.append(f"sni={sni}") - insecure_value = "1" if insecure else "0" - params.append(f"insecure={insecure_value}&sni={sni}") + params.append(f"insecure={'1' if insecure else '0'}") - params_str = "&".join(params) - return f"{uri_base}?{params_str}#{fragment_tag}" + query_string = "&".join(params) + return f"{uri_base}?{query_string}#{fragment_tag}" def generate_qr_code(uri: str) -> List[str]: - """Generate terminal-friendly ASCII QR code using pure Python.""" try: qr = qrcode.QRCode( version=1, @@ -116,18 +105,15 @@ def generate_qr_code(uri: str) -> List[str]: return [f"Error generating QR code: {str(e)}"] def center_text(text: str, width: int) -> str: - """Center text in the given width.""" return text.center(width) def get_terminal_width() -> int: - """Get terminal width.""" try: return os.get_terminal_size().columns except (AttributeError, OSError): return 80 def display_uri_and_qr(uri: str, label: str, args: argparse.Namespace, terminal_width: int): - """Helper function to print URI and its QR code.""" if not uri: return @@ -140,27 +126,28 @@ def display_uri_and_qr(uri: str, label: str, args: argparse.Namespace, terminal_ print(center_text(line, terminal_width)) def show_uri(args: argparse.Namespace) -> None: - """Show URI and optional QR codes for the given username and nodes.""" - if not os.path.exists(USERS_FILE): - print(f"\033[0;31mError:\033[0m Config file {USERS_FILE} not found.") + if db is None: + print("\033[0;31mError:\033[0m Database connection failed.") return - + if not is_service_active("hysteria-server.service"): print("\033[0;31mError:\033[0m Hysteria2 is not active.") return - with open(CONFIG_FILE, 'r') as f: - config = json.load(f) - - with open(USERS_FILE, 'r') as f: - users = json.load(f) - - if args.username not in users: - print("Invalid username. Please try again.") + try: + with open(CONFIG_FILE, 'r') as f: + config = json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"\033[0;31mError:\033[0m Could not load config file {CONFIG_FILE}. Details: {e}") + return + + user_doc = db.get_user(args.username) + if not user_doc: + print(f"\033[0;31mError:\033[0m User '{args.username}' not found in the database.") return - auth_password = users[args.username]["password"] - port = config["listen"].split(":")[1] if ":" in config["listen"] else config["listen"] + auth_password = user_doc["password"] + port = config["listen"].split(":")[-1] sha256 = config.get("tls", {}).get("pinSHA256", "") obfs_password = config.get("obfs", {}).get("salamander", {}).get("password", "") insecure = config.get("tls", {}).get("insecure", True) @@ -205,7 +192,6 @@ def show_uri(args: argparse.Namespace) -> None: print(f"\nNormal-SUB Sublink:\nhttps://{domain}:{port}/{subpath}/sub/normal/{auth_password}#Hysteria2\n") def main(): - """Main function to parse arguments and show URIs.""" parser = argparse.ArgumentParser(description="Hysteria2 URI Generator") parser.add_argument("-u", "--username", help="Username to generate URI for") parser.add_argument("-qr", "--qrcode", action="store_true", help="Generate QR code")