feat: add show-user-uri-json command
This commit is contained in:
18
core/cli.py
18
core/cli.py
@ -7,7 +7,7 @@ import json
|
||||
|
||||
|
||||
def pretty_print(data: typing.Any):
|
||||
if isinstance(data, dict):
|
||||
if isinstance(data, dict) or isinstance(data, list):
|
||||
print(json.dumps(data, indent=4))
|
||||
return
|
||||
|
||||
@ -205,6 +205,22 @@ def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: boo
|
||||
click.echo(f"URI for user '{username}' could not be generated.")
|
||||
except Exception as e:
|
||||
click.echo(f'{e}', err=True)
|
||||
|
||||
@cli.command('show-user-uri-json')
|
||||
@click.argument('usernames', nargs=-1, required=True)
|
||||
def show_user_uri_json(usernames: list[str]):
|
||||
"""
|
||||
Displays URI information in JSON format for a list of users.
|
||||
"""
|
||||
try:
|
||||
res = cli_api.show_user_uri_json(usernames)
|
||||
if res:
|
||||
pretty_print(res)
|
||||
else:
|
||||
click.echo('No user URIs could be generated.')
|
||||
except Exception as e:
|
||||
click.echo(f'{e}', err=True)
|
||||
|
||||
# endregion
|
||||
|
||||
# region Server
|
||||
|
||||
@ -28,6 +28,7 @@ class Command(Enum):
|
||||
RESET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'reset_user.sh')
|
||||
REMOVE_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'remove_user.sh')
|
||||
SHOW_USER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'show_user_uri.py')
|
||||
WRAPPER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'wrapper_uri.py')
|
||||
IP_ADD = os.path.join(SCRIPT_DIR, 'hysteria2', 'ip.sh')
|
||||
MANAGE_OBFS = os.path.join(SCRIPT_DIR, 'hysteria2', 'manage_obfs.sh')
|
||||
MASQUERADE_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'masquerade.sh')
|
||||
@ -334,6 +335,25 @@ def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: boo
|
||||
command_args.append('-n')
|
||||
return run_cmd(command_args)
|
||||
|
||||
def show_user_uri_json(usernames: list[str]) -> list[dict[str, Any]] | None:
|
||||
'''
|
||||
Displays the URI for a list of users in JSON format.
|
||||
'''
|
||||
script_path = Command.WRAPPER_URI.value
|
||||
if not os.path.exists(script_path):
|
||||
raise ScriptNotFoundError(f"Wrapper URI script not found at: {script_path}")
|
||||
try:
|
||||
process = subprocess.run(['python3', script_path, *usernames], capture_output=True, text=True, check=True)
|
||||
return json.loads(process.stdout)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise CommandExecutionError(f"Failed to execute wrapper URI script: {e}\nError: {e.stderr}")
|
||||
except FileNotFoundError:
|
||||
raise ScriptNotFoundError(f'Script not found: {script_path}')
|
||||
except json.JSONDecodeError:
|
||||
raise CommandExecutionError(f"Failed to decode JSON output from script: {script_path}\nOutput: {process.stdout if 'process' in locals() else 'No output'}") # Add process check
|
||||
except Exception as e:
|
||||
raise HysteriaError(f'An unexpected error occurred: {e}')
|
||||
|
||||
# endregion
|
||||
|
||||
# region Server
|
||||
|
||||
57
core/scripts/hysteria2/wrapper_uri.py
Normal file
57
core/scripts/hysteria2/wrapper_uri.py
Normal file
@ -0,0 +1,57 @@
|
||||
import subprocess
|
||||
import concurrent.futures
|
||||
import re
|
||||
import json
|
||||
import sys
|
||||
|
||||
SHOW_URI_SCRIPT = "/etc/hysteria/core/scripts/hysteria2/show_user_uri.py"
|
||||
DEFAULT_ARGS = ["-a", "-n", "-s"]
|
||||
|
||||
def run_show_uri(username):
|
||||
try:
|
||||
cmd = ["python3", SHOW_URI_SCRIPT, "-u", username] + DEFAULT_ARGS
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||||
output = result.stdout
|
||||
if "Invalid username" in output:
|
||||
return {"username": username, "error": "User not found"}
|
||||
return parse_output(username, output)
|
||||
except subprocess.CalledProcessError as e:
|
||||
return {"username": username, "error": e.stderr.strip()}
|
||||
|
||||
def parse_output(username, output):
|
||||
ipv4 = None
|
||||
ipv6 = None
|
||||
normal_sub = None
|
||||
|
||||
# Match links
|
||||
ipv4_match = re.search(r"IPv4:\s*(hy2://[^\s]+)", output)
|
||||
ipv6_match = re.search(r"IPv6:\s*(hy2://[^\s]+)", output)
|
||||
normal_sub_match = re.search(r"Normal-SUB Sublink:\s*(https?://[^\s]+)", output)
|
||||
|
||||
if ipv4_match:
|
||||
ipv4 = ipv4_match.group(1)
|
||||
if ipv6_match:
|
||||
ipv6 = ipv6_match.group(1)
|
||||
if normal_sub_match:
|
||||
normal_sub = normal_sub_match.group(1)
|
||||
|
||||
return {
|
||||
"username": username,
|
||||
"ipv4": ipv4,
|
||||
"ipv6": ipv6,
|
||||
"normal_sub": normal_sub
|
||||
}
|
||||
|
||||
def batch_show_uri(usernames, max_workers=20):
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
results = list(executor.map(run_show_uri, usernames))
|
||||
return results
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: python3 show_uri_json.py user1 user2 ...")
|
||||
sys.exit(1)
|
||||
|
||||
usernames = sys.argv[1:]
|
||||
output_list = batch_show_uri(usernames)
|
||||
print(json.dumps(output_list, indent=2))
|
||||
Reference in New Issue
Block a user