feat(core): Generate URIs for external nodes
This commit is contained in:
@ -24,6 +24,18 @@ def load_env_file(env_file: str) -> Dict[str, str]:
|
|||||||
env_vars[key] = value
|
env_vars[key] = value
|
||||||
return env_vars
|
return env_vars
|
||||||
|
|
||||||
|
def load_nodes() -> List[Dict[str, str]]:
|
||||||
|
"""Load external node information from the nodes JSON file."""
|
||||||
|
if NODES_JSON_PATH.exists():
|
||||||
|
try:
|
||||||
|
with NODES_JSON_PATH.open("r") as f:
|
||||||
|
content = f.read()
|
||||||
|
if content:
|
||||||
|
return json.loads(content)
|
||||||
|
except (json.JSONDecodeError, IOError):
|
||||||
|
pass
|
||||||
|
return []
|
||||||
|
|
||||||
def load_hysteria2_env() -> Dict[str, str]:
|
def load_hysteria2_env() -> Dict[str, str]:
|
||||||
"""Load Hysteria2 environment variables."""
|
"""Load Hysteria2 environment variables."""
|
||||||
return load_env_file(CONFIG_ENV)
|
return load_env_file(CONFIG_ENV)
|
||||||
@ -63,7 +75,8 @@ def is_service_active(service_name: str) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def generate_uri(username: str, auth_password: str, ip: str, port: str,
|
def generate_uri(username: str, auth_password: str, ip: str, port: str,
|
||||||
obfs_password: str, sha256: str, sni: str, ip_version: int, insecure: bool) -> str:
|
obfs_password: str, sha256: str, sni: str, ip_version: int,
|
||||||
|
insecure: bool, fragment_tag: str) -> str:
|
||||||
"""Generate Hysteria2 URI for the given parameters."""
|
"""Generate Hysteria2 URI for the given parameters."""
|
||||||
uri_base = f"hy2://{username}%3A{auth_password}@{ip}:{port}"
|
uri_base = f"hy2://{username}%3A{auth_password}@{ip}:{port}"
|
||||||
|
|
||||||
@ -82,7 +95,7 @@ def generate_uri(username: str, auth_password: str, ip: str, port: str,
|
|||||||
params.append(f"insecure={insecure_value}&sni={sni}")
|
params.append(f"insecure={insecure_value}&sni={sni}")
|
||||||
|
|
||||||
params_str = "&".join(params)
|
params_str = "&".join(params)
|
||||||
return f"{uri_base}?{params_str}#{username}-IPv{ip_version}"
|
return f"{uri_base}?{params_str}#{fragment_tag}"
|
||||||
|
|
||||||
def generate_qr_code(uri: str) -> List[str]:
|
def generate_qr_code(uri: str) -> List[str]:
|
||||||
"""Generate terminal-friendly ASCII QR code using pure Python."""
|
"""Generate terminal-friendly ASCII QR code using pure Python."""
|
||||||
@ -113,8 +126,21 @@ def get_terminal_width() -> int:
|
|||||||
except (AttributeError, OSError):
|
except (AttributeError, OSError):
|
||||||
return 80
|
return 80
|
||||||
|
|
||||||
|
def display_uri_and_qr(uri: str, label: str, args: argparse.Namespace, terminal_width: int):
|
||||||
|
"""Helper function to print URI and its QR code."""
|
||||||
|
if not uri:
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"\n{label}:\n{uri}\n")
|
||||||
|
|
||||||
|
if args.qrcode:
|
||||||
|
print(f"{label} QR Code:\n")
|
||||||
|
qr_code = generate_qr_code(uri)
|
||||||
|
for line in qr_code:
|
||||||
|
print(center_text(line, terminal_width))
|
||||||
|
|
||||||
def show_uri(args: argparse.Namespace) -> None:
|
def show_uri(args: argparse.Namespace) -> None:
|
||||||
"""Show URI and optional QR codes for the given username."""
|
"""Show URI and optional QR codes for the given username and nodes."""
|
||||||
if not os.path.exists(USERS_FILE):
|
if not os.path.exists(USERS_FILE):
|
||||||
print(f"\033[0;31mError:\033[0m Config file {USERS_FILE} not found.")
|
print(f"\033[0;31mError:\033[0m Config file {USERS_FILE} not found.")
|
||||||
return
|
return
|
||||||
@ -137,53 +163,36 @@ def show_uri(args: argparse.Namespace) -> None:
|
|||||||
port = config["listen"].split(":")[1] if ":" in config["listen"] else config["listen"]
|
port = config["listen"].split(":")[1] if ":" in config["listen"] else config["listen"]
|
||||||
sha256 = config.get("tls", {}).get("pinSHA256", "")
|
sha256 = config.get("tls", {}).get("pinSHA256", "")
|
||||||
obfs_password = config.get("obfs", {}).get("salamander", {}).get("password", "")
|
obfs_password = config.get("obfs", {}).get("salamander", {}).get("password", "")
|
||||||
|
|
||||||
insecure = config.get("tls", {}).get("insecure", True)
|
insecure = config.get("tls", {}).get("insecure", True)
|
||||||
|
|
||||||
ip4, ip6, sni = load_hysteria2_ips()
|
ip4, ip6, sni = load_hysteria2_ips()
|
||||||
available_ip4 = ip4 and ip4 != "None"
|
nodes = load_nodes()
|
||||||
available_ip6 = ip6 and ip6 != "None"
|
|
||||||
|
|
||||||
uri_ipv4 = None
|
|
||||||
uri_ipv6 = None
|
|
||||||
|
|
||||||
if args.all:
|
|
||||||
if available_ip4:
|
|
||||||
uri_ipv4 = generate_uri(args.username, auth_password, ip4, port,
|
|
||||||
obfs_password, sha256, sni, 4, insecure)
|
|
||||||
print(f"\nIPv4:\n{uri_ipv4}\n")
|
|
||||||
|
|
||||||
if available_ip6:
|
|
||||||
uri_ipv6 = generate_uri(args.username, auth_password, ip6, port,
|
|
||||||
obfs_password, sha256, sni, 6, insecure)
|
|
||||||
print(f"\nIPv6:\n{uri_ipv6}\n")
|
|
||||||
else:
|
|
||||||
if args.ip_version == 4 and available_ip4:
|
|
||||||
uri_ipv4 = generate_uri(args.username, auth_password, ip4, port,
|
|
||||||
obfs_password, sha256, sni, 4, insecure)
|
|
||||||
print(f"\nIPv4:\n{uri_ipv4}\n")
|
|
||||||
elif args.ip_version == 6 and available_ip6:
|
|
||||||
uri_ipv6 = generate_uri(args.username, auth_password, ip6, port,
|
|
||||||
obfs_password, sha256, sni, 6, insecure)
|
|
||||||
print(f"\nIPv6:\n{uri_ipv6}\n")
|
|
||||||
else:
|
|
||||||
print("Invalid IP version or no available IP for the requested version.")
|
|
||||||
return
|
|
||||||
|
|
||||||
if args.qrcode:
|
|
||||||
terminal_width = get_terminal_width()
|
terminal_width = get_terminal_width()
|
||||||
|
|
||||||
if uri_ipv4:
|
if args.all or args.ip_version == 4:
|
||||||
qr_code = generate_qr_code(uri_ipv4)
|
if ip4 and ip4 != "None":
|
||||||
print("\nIPv4 QR Code:\n")
|
uri = generate_uri(args.username, auth_password, ip4, port,
|
||||||
for line in qr_code:
|
obfs_password, sha256, sni, 4, insecure, f"{args.username}-IPv4")
|
||||||
print(center_text(line, terminal_width))
|
display_uri_and_qr(uri, "IPv4", args, terminal_width)
|
||||||
|
|
||||||
if uri_ipv6:
|
if args.all or args.ip_version == 6:
|
||||||
qr_code = generate_qr_code(uri_ipv6)
|
if ip6 and ip6 != "None":
|
||||||
print("\nIPv6 QR Code:\n")
|
uri = generate_uri(args.username, auth_password, ip6, port,
|
||||||
for line in qr_code:
|
obfs_password, sha256, sni, 6, insecure, f"{args.username}-IPv6")
|
||||||
print(center_text(line, terminal_width))
|
display_uri_and_qr(uri, "IPv6", args, terminal_width)
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
node_name = node.get("name")
|
||||||
|
node_ip = node.get("ip")
|
||||||
|
if not node_name or not node_ip:
|
||||||
|
continue
|
||||||
|
|
||||||
|
ip_v = 4 if '.' in node_ip else 6
|
||||||
|
|
||||||
|
if args.all or args.ip_version == ip_v:
|
||||||
|
uri = generate_uri(args.username, auth_password, node_ip, port,
|
||||||
|
obfs_password, sha256, sni, ip_v, insecure, f"{args.username}-{node_name}")
|
||||||
|
display_uri_and_qr(uri, f"Node: {node_name} (IPv{ip_v})", args, terminal_width)
|
||||||
|
|
||||||
if args.singbox and is_service_active("hysteria-singbox.service"):
|
if args.singbox and is_service_active("hysteria-singbox.service"):
|
||||||
domain, port = get_singbox_domain_and_port()
|
domain, port = get_singbox_domain_and_port()
|
||||||
|
|||||||
@ -7,6 +7,7 @@ USERS_FILE = BASE_DIR / "users.json"
|
|||||||
TRAFFIC_FILE = BASE_DIR / "traffic_data.json"
|
TRAFFIC_FILE = BASE_DIR / "traffic_data.json"
|
||||||
CONFIG_FILE = BASE_DIR / "config.json"
|
CONFIG_FILE = BASE_DIR / "config.json"
|
||||||
CONFIG_ENV = BASE_DIR / ".configs.env"
|
CONFIG_ENV = BASE_DIR / ".configs.env"
|
||||||
|
NODES_JSON_PATH = BASE_DIR / "nodes.json"
|
||||||
TELEGRAM_ENV = BASE_DIR / "core/scripts/telegrambot/.env"
|
TELEGRAM_ENV = BASE_DIR / "core/scripts/telegrambot/.env"
|
||||||
SINGBOX_ENV = BASE_DIR / "core/scripts/singbox/.env"
|
SINGBOX_ENV = BASE_DIR / "core/scripts/singbox/.env"
|
||||||
NORMALSUB_ENV = BASE_DIR / "core/scripts/normalsub/.env"
|
NORMALSUB_ENV = BASE_DIR / "core/scripts/normalsub/.env"
|
||||||
|
|||||||
Reference in New Issue
Block a user