perf(api): optimize bulk user URI fetching in webpanel

Refactored the web panel's user link generation to resolve a major performance bottleneck. Previously, fetching links for N users would trigger N separate script executions, causing significant delays from process startup overhead.

- Introduced a new bulk API endpoint (`/api/v1/users/uri/bulk`) that accepts a list of usernames and calls the backend script only once.
- Updated the frontend JavaScript in `users.html` to use this new endpoint, replacing N parallel API calls with a single one.
- Cleaned up the `wrapper_uri.py` script for better readability and maintainability.
This commit is contained in:
Whispering Wind
2025-08-27 17:22:04 +03:30
committed by GitHub
parent c494250f9f
commit 18d3a1029b
4 changed files with 227 additions and 698 deletions

View File

@ -15,7 +15,7 @@ def load_json_file(file_path: str) -> Any:
if not os.path.exists(file_path):
return None
try:
with open(file_path, 'r') as f:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return json.loads(content) if content else None
except (json.JSONDecodeError, IOError):
@ -25,7 +25,7 @@ def load_json_file(file_path: str) -> Any:
def load_env_file(env_file: str) -> Dict[str, str]:
env_vars = {}
if os.path.exists(env_file):
with open(env_file, 'r') as f:
with open(env_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
@ -34,51 +34,44 @@ def load_env_file(env_file: str) -> Dict[str, str]:
return env_vars
def generate_uri(username: str, auth_password: str, ip: str, port: str,
obfs_password: str, sha256: str, sni: str, ip_version: int,
insecure: bool, fragment_tag: str) -> str:
uri_params: Dict[str, str], ip_version: int, fragment_tag: str) -> str:
ip_part = f"[{ip}]" if ip_version == 6 and ':' in ip else ip
uri_base = f"hy2://{username}:{auth_password}@{ip_part}:{port}"
params = {
"insecure": "1" if insecure else "0",
"sni": sni
}
if obfs_password:
params["obfs"] = "salamander"
params["obfs-password"] = obfs_password
if sha256:
params["pinSHA256"] = sha256
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
query_string = "&".join([f"{k}={v}" for k, v in uri_params.items()])
return f"{uri_base}?{query_string}#{fragment_tag}"
def process_users(target_usernames: List[str]) -> List[Dict[str, Any]]:
config = load_json_file(CONFIG_FILE)
all_users = load_json_file(USERS_FILE)
nodes = load_json_file(NODES_JSON_PATH) or []
if not config or not all_users:
print("Error: Could not load hysteria2 configuration or user files.", file=sys.stderr)
print("Error: Could not load Hysteria2 configuration or user files.", file=sys.stderr)
sys.exit(1)
nodes = load_json_file(NODES_JSON_PATH) or []
port = config.get("listen", "").split(":")[-1]
tls_config = config.get("tls", {})
sha256 = tls_config.get("pinSHA256", "")
insecure = tls_config.get("insecure", True)
obfs_password = config.get("obfs", {}).get("salamander", {}).get("password", "")
hy2_env = load_env_file(CONFIG_ENV)
ns_env = load_env_file(NORMALSUB_ENV)
base_uri_params = {
"insecure": "1" if tls_config.get("insecure", True) else "0",
"sni": hy2_env.get('SNI', '')
}
obfs_password = config.get("obfs", {}).get("salamander", {}).get("password")
if obfs_password:
base_uri_params["obfs"] = "salamander"
base_uri_params["obfs-password"] = obfs_password
sha256 = tls_config.get("pinSHA256")
if sha256:
base_uri_params["pinSHA256"] = sha256
ip4 = hy2_env.get('IP4')
ip6 = hy2_env.get('IP6')
sni = hy2_env.get('SNI', '')
ns_env = load_env_file(NORMALSUB_ENV)
ns_domain = ns_env.get('HYSTERIA_DOMAIN')
ns_port = ns_env.get('HYSTERIA_PORT')
ns_subpath = ns_env.get('SUBPATH')
ns_domain, ns_port, ns_subpath = ns_env.get('HYSTERIA_DOMAIN'), ns_env.get('HYSTERIA_PORT'), ns_env.get('SUBPATH')
results = []
for username in target_usernames:
user_data = all_users.get(username)
if not user_data or "password" not in user_data:
@ -86,34 +79,20 @@ def process_users(target_usernames: List[str]) -> List[Dict[str, Any]]:
continue
auth_password = user_data["password"]
user_output = {
"username": username,
"ipv4": None,
"ipv6": None,
"nodes": [],
"normal_sub": None
}
user_output = {"username": username, "ipv4": None, "ipv6": None, "nodes": [], "normal_sub": None}
if ip4 and ip4 != "None":
user_output["ipv4"] = generate_uri(
username, auth_password, ip4, port, obfs_password, sha256, sni, 4, insecure, f"{username}-IPv4"
)
user_output["ipv4"] = generate_uri(username, auth_password, ip4, port, base_uri_params, 4, f"{username}-IPv4")
if ip6 and ip6 != "None":
user_output["ipv6"] = generate_uri(
username, auth_password, ip6, port, obfs_password, sha256, sni, 6, insecure, f"{username}-IPv6"
)
user_output["ipv6"] = generate_uri(username, auth_password, ip6, port, base_uri_params, 6, f"{username}-IPv6")
for node in nodes:
node_name, node_ip = node.get("name"), node.get("ip")
if not (node_name and node_ip):
continue
ip_v = 6 if ':' in node_ip else 4
tag = f"{username}-{node_name}"
uri = generate_uri(
username, auth_password, node_ip, port, obfs_password, sha256, sni, ip_v, insecure, tag
)
user_output["nodes"].append({"name": node_name, "uri": uri})
if node_name := node.get("name"):
if node_ip := node.get("ip"):
ip_v = 6 if ':' in node_ip else 4
tag = f"{username}-{node_name}"
uri = generate_uri(username, auth_password, node_ip, port, base_uri_params, ip_v, tag)
user_output["nodes"].append({"name": node_name, "uri": uri})
if ns_domain and ns_port and ns_subpath:
user_output["normal_sub"] = f"https://{ns_domain}:{ns_port}/{ns_subpath}/sub/normal/{auth_password}#{username}"
@ -123,16 +102,13 @@ def process_users(target_usernames: List[str]) -> List[Dict[str, Any]]:
return results
def main():
parser = argparse.ArgumentParser(
description="Efficiently generate Hysteria2 URIs for multiple users.",
formatter_class=argparse.RawTextHelpFormatter
)
parser = argparse.ArgumentParser(description="Efficiently generate Hysteria2 URIs for multiple users.")
parser.add_argument('usernames', nargs='*', help="A list of usernames to process.")
parser.add_argument('--all', action='store_true', help="Process all users from users.json.")
args = parser.parse_args()
target_usernames = args.usernames
if args.all:
all_users = load_json_file(USERS_FILE)
if all_users: