From 562b8699808c3e5a796371bd950df6efef818820 Mon Sep 17 00:00:00 2001 From: Iam54r1n4 Date: Fri, 24 Jan 2025 05:24:49 +0000 Subject: [PATCH] Make cli.py more modular --- core/cli.py | 417 +++++++++----------------- core/cli_api.py | 403 +++++++++++++++++++++++++ core/scripts/hysteria2/manage_obfs.sh | 2 + core/scripts/hysteria2/masquerade.sh | 2 + 4 files changed, 557 insertions(+), 267 deletions(-) create mode 100644 core/cli_api.py diff --git a/core/cli.py b/core/cli.py index d49b183..9807c71 100644 --- a/core/cli.py +++ b/core/cli.py @@ -1,125 +1,97 @@ #!/usr/bin/env python3 from datetime import datetime -import os -import io import click import subprocess -from enum import Enum -import traffic import validator - - -SCRIPT_DIR = '/etc/hysteria/core/scripts' -DEBUG = False - - -class Command(Enum): - '''Constais path to command's script''' - INSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'install.sh') - UNINSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'uninstall.sh') - UPDATE_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'update.sh') - RESTART_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'restart.sh') - CHANGE_PORT_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_port.sh') - CHANGE_SNI_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_sni.sh') - GET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'get_user.sh') - ADD_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'add_user.sh') - EDIT_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'edit_user.sh') - RESET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'reset_user.sh') - REMOVE_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'remove_user.sh') - SHOW_USER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'show_user_uri.sh') - IP_ADD = os.path.join(SCRIPT_DIR, 'hysteria2', 'ip.sh') - MANAGE_OBFS = os.path.join(SCRIPT_DIR, 'hysteria2', 'manage_obfs.sh') - MASQUERADE_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'masquerade.sh') - TRAFFIC_STATUS = 'traffic.py' # won't be call directly (it's a python module) - UPDATE_GEO = os.path.join(SCRIPT_DIR, 'hysteria2', 'update_geo.py') - LIST_USERS = os.path.join(SCRIPT_DIR, 'hysteria2', 'list_users.sh') - SERVER_INFO = os.path.join(SCRIPT_DIR, 'hysteria2', 'server_info.sh') - BACKUP_HYSTERIA = os.path.join(SCRIPT_DIR, 'hysteria2', 'backup.sh') - INSTALL_TELEGRAMBOT = os.path.join(SCRIPT_DIR, 'telegrambot', 'runbot.sh') - INSTALL_SINGBOX = os.path.join(SCRIPT_DIR, 'singbox', 'singbox_shell.sh') - INSTALL_NORMALSUB = os.path.join(SCRIPT_DIR, 'normalsub', 'normalsub.sh') - INSTALL_TCP_BRUTAL = os.path.join(SCRIPT_DIR, 'tcp-brutal', 'install.sh') - INSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'install.sh') - UNINSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'uninstall.sh') - CONFIGURE_WARP = os.path.join(SCRIPT_DIR, 'warp', 'configure.sh') - STATUS_WARP = os.path.join(SCRIPT_DIR, 'warp', 'status.sh') - - -# region utils -def run_cmd(command: list[str]): - ''' - Runs a command and returns the output. - Could raise subprocess.CalledProcessError - ''' - - # if the command is GET_USER or LIST_USERS we don't print the debug-command and just print the output - if DEBUG and not (Command.GET_USER.value in command or Command.LIST_USERS.value in command): - print(' '.join(command)) - - result = subprocess.check_output(command, shell=False) - - print(result.decode().strip()) - - -def generate_password() -> str: - ''' - Generates a random password using pwgen for user. - Could raise subprocess.CalledProcessError - ''' - return subprocess.check_output(['pwgen', '-s', '32', '1'], shell=False).decode().strip() - -# endregion +import cli_api @click.group() def cli(): pass -# region hysteria2 menu options +# region Hysteria2 @cli.command('install-hysteria2') -@click.option('--port', '-p', required=True, help='Port for Hysteria2', type=int, callback=validator.validate_port) +@click.option('--port', '-p', required=True, help='Port for Hysteria2', type=int) @click.option('--sni', '-s', required=False, default='bts.com', help='SNI for Hysteria2 (default: bts.com)', type=str) def install_hysteria2(port: int, sni: str): - """ - Installs Hysteria2 on the given port and uses the provided or default SNI value. - """ - run_cmd(['bash', Command.INSTALL_HYSTERIA2.value, str(port), sni]) + try: + cli_api.install_hysteria2(port, sni) + except Exception as e: + click.echo(f'{e}', err=True) @cli.command('uninstall-hysteria2') def uninstall_hysteria2(): - run_cmd(['bash', Command.UNINSTALL_HYSTERIA2.value]) + try: + cli_api.uninstall_hysteria2() + except Exception as e: + click.echo(f'{e}', err=True) @cli.command('update-hysteria2') def update_hysteria2(): - run_cmd(['bash', Command.UPDATE_HYSTERIA2.value]) + try: + cli_api.update_hysteria2() + except Exception as e: + click.echo(f'{e}', err=True) @cli.command('restart-hysteria2') def restart_hysteria2(): - run_cmd(['bash', Command.RESTART_HYSTERIA2.value]) + try: + cli_api.restart_hysteria2() + except Exception as e: + click.echo(f'{e}', err=True) @cli.command('change-hysteria2-port') @click.option('--port', '-p', required=True, help='New port for Hysteria2', type=int, callback=validator.validate_port) def change_hysteria2_port(port: int): - run_cmd(['bash', Command.CHANGE_PORT_HYSTERIA2.value, str(port)]) + try: + cli_api.change_hysteria2_port(port) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('change-hysteria2-sni') @click.option('--sni', '-s', required=True, help='New SNI for Hysteria2', type=str) def change_hysteria2_sni(sni: str): - run_cmd(['bash', Command.CHANGE_SNI_HYSTERIA2.value, sni]) + try: + cli_api.change_hysteria2_sni(sni) + except Exception as e: + click.echo(f'{e}', err=True) + + +@cli.command('backup-hysteria') +def backup_hysteria(): + try: + cli_api.backup_hysteria() + except Exception as e: + click.echo(f'{e}', err=True) + +# endregion + +# region User + + +@ cli.command('list-users') +def list_users(): + cli_api.list_users() + @cli.command('get-user') @click.option('--username', '-u', required=True, help='Username for the user to get', type=str) def get_user(username: str): - cmd = ['bash', Command.GET_USER.value, '-u', str(username)] - run_cmd(cmd) + try: + cli_api.get_user(username) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('add-user') @click.option('--username', '-u', required=True, help='Username for the new user', type=str) @@ -128,19 +100,11 @@ def get_user(username: str): @click.option('--password', '-p', required=False, help='Password for the user', type=str) @click.option('--creation-date', '-c', required=False, help='Creation date for the user', type=str) def add_user(username: str, traffic_limit: int, expiration_days: int, password: str, creation_date: str): - if not password: - try: - password = generate_password() - except subprocess.CalledProcessError as e: - print(f'Error: failed to generate password\n{e}') - exit(1) - if not creation_date: - creation_date = datetime.now().strftime('%Y-%m-%d') try: - run_cmd(['bash', Command.ADD_USER.value, username, str(traffic_limit), str(expiration_days), password, creation_date]) - except subprocess.CalledProcessError as e: - click.echo(f"{e.output.decode()}", err=True) - exit(1) + cli_api.add_user(username, traffic_limit, expiration_days, password, creation_date) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('edit-user') @click.option('--username', '-u', required=True, help='Username for the user to edit', type=str) @@ -151,61 +115,30 @@ def add_user(username: str, traffic_limit: int, expiration_days: int, password: @click.option('--renew-creation-date', '-rc', is_flag=True, help='Renew creation date for the user') @click.option('--blocked', '-b', is_flag=True, help='Block the user') def edit_user(username: str, new_username: str, new_traffic_limit: int, new_expiration_days: int, renew_password: bool, renew_creation_date: bool, blocked: bool): - if not username: - print('Error: username is required') - exit(1) + try: + cli_api.edit_user(username, new_username, new_traffic_limit, new_expiration_days, + renew_password, renew_creation_date, blocked) + except Exception as e: + click.echo(f'{e}', err=True) - if not any([new_username, new_traffic_limit, new_expiration_days, renew_password, renew_creation_date, blocked is not None]): - print('Error: at least one option is required') - exit(1) - - if new_traffic_limit is not None and new_traffic_limit <= 0: - print('Error: traffic limit must be greater than 0') - exit(1) - - if new_expiration_days is not None and new_expiration_days <= 0: - print('Error: expiration days must be greater than 0') - exit(1) - - # Handle renewing password and creation date - if renew_password: - try: - password = generate_password() - except subprocess.CalledProcessError as e: - print(f'Error: failed to generate password\n{e}') - exit(1) - else: - password = "" - - if renew_creation_date: - creation_date = datetime.now().strftime('%Y-%m-%d') - else: - creation_date = "" - - # Prepare arguments for the command - command_args = [ - 'bash', - Command.EDIT_USER.value, - username, - new_username or '', - str(new_traffic_limit) if new_traffic_limit is not None else '', - str(new_expiration_days) if new_expiration_days is not None else '', - password, - creation_date, - 'true' if blocked else 'false' - ] - - run_cmd(command_args) @ cli.command('reset-user') @ click.option('--username', '-u', required=True, help='Username for the user to Reset', type=str) def reset_user(username: str): - run_cmd(['bash', Command.RESET_USER.value, username]) + try: + cli_api.reset_user(username) + except Exception as e: + click.echo(f'{e}', err=True) + @ cli.command('remove-user') @ click.option('--username', '-u', required=True, help='Username for the user to remove', type=str) def remove_user(username: str): - run_cmd(['bash', Command.REMOVE_USER.value, username]) + try: + cli_api.remove_user(username) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('show-user-uri') @click.option('--username', '-u', required=True, help='Username for the user to show the URI', type=str) @@ -215,136 +148,105 @@ def remove_user(username: str): @click.option('--singbox', '-s', is_flag=True, help='Generate Singbox sublink if Singbox service is active') @click.option('--normalsub', '-n', is_flag=True, help='Generate Normal sublink if normalsub service is active') def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: bool, normalsub: bool): - command_args = ['bash', Command.SHOW_USER_URI.value, '-u', username] - if qrcode: - command_args.append('-qr') - if all: - command_args.append('-a') - else: - command_args.extend(['-ip', str(ipv)]) - if singbox: - command_args.append('-s') - if normalsub: - command_args.append('-n') + try: + cli_api.show_user_uri(username, qrcode, ipv, all, singbox, normalsub) + except Exception as e: + click.echo(f'{e}', err=True) +# endregion - run_cmd(command_args) +# region Server @ cli.command('traffic-status') def traffic_status(): - traffic.traffic_status() - - -@ cli.command('list-users') -def list_users(): - run_cmd(['bash', Command.LIST_USERS.value]) + cli_api.traffic_status() + # traffic.traffic_status() @cli.command('server-info') def server_info(): - output = run_cmd(['bash', Command.SERVER_INFO.value]) - if output: - print(output) - -@cli.command('backup-hysteria') -def backup_hysteria(): try: - run_cmd(['bash', Command.BACKUP_HYSTERIA.value]) - except subprocess.CalledProcessError as e: - click.echo(f"Backup failed: {e.output.decode()}", err=True) + res = cli_api.server_info() + if res: + print(res) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('manage_obfs') @click.option('--remove', '-r', is_flag=True, help="Remove 'obfs' from config.json.") @click.option('--generate', '-g', is_flag=True, help="Generate new 'obfs' in config.json.") -def manage_obfs(remove, generate): - """Manage 'obfs' in Hysteria2 configuration.""" - if remove and generate: - click.echo("Error: You cannot use both --remove and --generate at the same time.") - return - elif remove: - click.echo("Removing 'obfs' from config.json...") - run_cmd(['bash', Command.MANAGE_OBFS.value, '--remove']) - elif generate: - click.echo("Generating 'obfs' in config.json...") - run_cmd(['bash', Command.MANAGE_OBFS.value, '--generate']) - else: - click.echo("Error: Please specify either --remove or --generate.") +def manage_obfs(remove: bool, generate: bool): + try: + cli_api.manage_obfs(remove, generate) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('ip-address') @click.option('--edit', is_flag=True, help="Edit IP addresses manually.") @click.option('-4', '--ipv4', type=str, help="Specify the new IPv4 address.") @click.option('-6', '--ipv6', type=str, help="Specify the new IPv6 address.") -def ip_address(edit, ipv4, ipv6): +def ip_address(edit: bool, ipv4: str, ipv6: str): """ Manage IP addresses in .configs.env. - Use without options to add auto-detected IPs. - Use --edit with -4 or -6 to manually update IPs. """ - if edit: - if ipv4: - run_cmd(['bash', Command.IP_ADD.value, 'edit', '-4', ipv4]) - if ipv6: - run_cmd(['bash', Command.IP_ADD.value, 'edit', '-6', ipv6]) - if not ipv4 and not ipv6: - click.echo("Error: --edit requires at least one of --ipv4 or --ipv6.") - else: - run_cmd(['bash', Command.IP_ADD.value, 'add']) + try: + cli_api.ip_address(edit, ipv4, ipv6) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('update-geo') -@click.option('--country', '-c', +@click.option('--country', '-c', type=click.Choice(['iran', 'china', 'russia'], case_sensitive=False), default='iran', help='Select country for geo files (default: iran)') -def cli_update_geo(country): - script_path = Command.UPDATE_GEO.value +def update_geo(country: str): try: - subprocess.run(['python3', script_path, country.lower()], check=True) - except subprocess.CalledProcessError as e: - print(f"Failed to update geo files: {e}") - except FileNotFoundError: - print(f"Script not found: {script_path}") + cli_api.update_geo(country) except Exception as e: - print(f"An unexpected error occurred: {e}") + click.echo(f'{e}', err=True) @cli.command('masquerade') @click.option('--remove', '-r', is_flag=True, help="Remove 'masquerade' from config.json.") @click.option('--enable', '-e', metavar="", type=str, help="Enable 'masquerade' in config.json with the specified domain.") -def masquerade(remove, enable): +def masquerade(remove: bool, enable: str): """Manage 'masquerade' in Hysteria2 configuration.""" - - if remove and enable: - click.echo("Error: You cannot use both --remove and --enable at the same time.") - return - elif remove: - click.echo("Removing 'masquerade' from config.json...") - run_cmd(['bash', Command.MASQUERADE_SCRIPT.value, '2']) - elif enable: - if not enable: - click.echo("Error: You must specify a domain with --enable.") - return - click.echo(f"Enabling 'masquerade' with URL: {enable}...") - run_cmd(['bash', Command.MASQUERADE_SCRIPT.value, '1', enable]) - else: - click.echo("Error: Please specify either --remove or --enable.") - + try: + cli_api.masquerade(remove, enable) + except Exception as e: + click.echo(f'{e}', err=True) + # endregion -# region advanced menu +# region Advanced Menu @ cli.command('install-tcp-brutal') def install_tcp_brutal(): - run_cmd(['bash', Command.INSTALL_TCP_BRUTAL.value]) + try: + cli_api.install_tcp_brutal() + except Exception as e: + click.echo(f'{e}', err=True) @ cli.command('install-warp') def install_warp(): - run_cmd(['bash', Command.INSTALL_WARP.value]) + try: + cli_api.install_warp() + except Exception as e: + click.echo(f'{e}', err=True) @ cli.command('uninstall-warp') def uninstall_warp(): - run_cmd(['bash', Command.UNINSTALL_WARP.value]) + try: + cli_api.uninstall_warp() + except Exception as e: + click.echo(f'{e}', err=True) @cli.command('configure-warp') @@ -355,78 +257,59 @@ def uninstall_warp(): @click.option('--warp-option', '-w', type=click.Choice(['warp', 'warp plus'], case_sensitive=False), help='Specify whether to use WARP or WARP Plus') @click.option('--warp-key', '-k', help='WARP Plus key (required if warp-option is "warp plus")') def configure_warp(all: bool, popular_sites: bool, domestic_sites: bool, block_adult_sites: bool, warp_option: str, warp_key: str): - if warp_option == 'warp plus' and not warp_key: - print("Error: WARP Plus key is required when 'warp plus' is selected.") - return + try: + cli_api.configure_warp(all, popular_sites, domestic_sites, block_adult_sites, warp_option, warp_key) + except Exception as e: + click.echo(f'{e}', err=True) - options = { - "all": 'true' if all else 'false', - "popular_sites": 'true' if popular_sites else 'false', - "domestic_sites": 'true' if domestic_sites else 'false', - "block_adult_sites": 'true' if block_adult_sites else 'false', - "warp_option": warp_option or '', - "warp_key": warp_key or '' - } - - cmd_args = [ - 'bash', Command.CONFIGURE_WARP.value, - options['all'], - options['popular_sites'], - options['domestic_sites'], - options['block_adult_sites'], - options['warp_option'] - ] - - if options['warp_key']: - cmd_args.append(options['warp_key']) - - run_cmd(cmd_args) @cli.command('warp-status') def warp_status(): - output = run_cmd(['bash', Command.STATUS_WARP.value]) - if output: - print(output) + try: + res = cli_api.warp_status() + if res: + print(res) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('telegram') @click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False)) @click.option('--token', '-t', required=False, help='Token for running the telegram bot', type=str) @click.option('--adminid', '-aid', required=False, help='Telegram admins ID for running the telegram bot', type=str) def telegram(action: str, token: str, adminid: str): - if action == 'start': - if not token or not adminid: - print("Error: Both --token and --adminid are required for the start action.") - return - admin_ids = f'{adminid}' - run_cmd(['bash', Command.INSTALL_TELEGRAMBOT.value, 'start', token, admin_ids]) - elif action == 'stop': - run_cmd(['bash', Command.INSTALL_TELEGRAMBOT.value, 'stop']) + try: + res = cli_api.telegram(action, token, adminid) + if res: + print(res) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('singbox') @click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False)) @click.option('--domain', '-d', required=False, help='Domain name for SSL', type=str) @click.option('--port', '-p', required=False, help='Port number for Singbox service', type=int) def singbox(action: str, domain: str, port: int): - if action == 'start': - if not domain or not port: - click.echo("Error: Both --domain and --port are required for the start action.") - return - run_cmd(['bash', Command.INSTALL_SINGBOX.value, 'start', domain, str(port)]) - elif action == 'stop': - run_cmd(['bash', Command.INSTALL_SINGBOX.value, 'stop']) + try: + res = cli_api.singbox(action, domain, port) + if res: + print(res) + except Exception as e: + click.echo(f'{e}', err=True) + @cli.command('normal-sub') @click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False)) @click.option('--domain', '-d', required=False, help='Domain name for SSL', type=str) @click.option('--port', '-p', required=False, help='Port number for NormalSub service', type=int) def normalsub(action: str, domain: str, port: int): - if action == 'start': - if not domain or not port: - click.echo("Error: Both --domain and --port are required for the start action.") - return - run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'start', domain, str(port)]) - elif action == 'stop': - run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'stop']) + try: + res = cli_api.normalsub(action, domain, port) + if res: + print(res) + except Exception as e: + click.echo(f'{e}', err=True) # endregion diff --git a/core/cli_api.py b/core/cli_api.py new file mode 100644 index 0000000..6fc1044 --- /dev/null +++ b/core/cli_api.py @@ -0,0 +1,403 @@ +import os +import subprocess +from enum import Enum +from datetime import datetime + +import traffic + +DEBUG = False +SCRIPT_DIR = '/etc/hysteria/core/scripts' + + +class Command(Enum): + '''Contains path to command's script''' + INSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'install.sh') + UNINSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'uninstall.sh') + UPDATE_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'update.sh') + RESTART_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'restart.sh') + CHANGE_PORT_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_port.sh') + CHANGE_SNI_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_sni.sh') + GET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'get_user.sh') + ADD_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'add_user.sh') + EDIT_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'edit_user.sh') + RESET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'reset_user.sh') + REMOVE_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'remove_user.sh') + SHOW_USER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'show_user_uri.sh') + IP_ADD = os.path.join(SCRIPT_DIR, 'hysteria2', 'ip.sh') + MANAGE_OBFS = os.path.join(SCRIPT_DIR, 'hysteria2', 'manage_obfs.sh') + MASQUERADE_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'masquerade.sh') + TRAFFIC_STATUS = 'traffic.py' # won't be called directly (it's a python module) + UPDATE_GEO = os.path.join(SCRIPT_DIR, 'hysteria2', 'update_geo.py') + LIST_USERS = os.path.join(SCRIPT_DIR, 'hysteria2', 'list_users.sh') + SERVER_INFO = os.path.join(SCRIPT_DIR, 'hysteria2', 'server_info.sh') + BACKUP_HYSTERIA = os.path.join(SCRIPT_DIR, 'hysteria2', 'backup.sh') + INSTALL_TELEGRAMBOT = os.path.join(SCRIPT_DIR, 'telegrambot', 'runbot.sh') + INSTALL_SINGBOX = os.path.join(SCRIPT_DIR, 'singbox', 'singbox_shell.sh') + INSTALL_NORMALSUB = os.path.join(SCRIPT_DIR, 'normalsub', 'normalsub.sh') + INSTALL_TCP_BRUTAL = os.path.join(SCRIPT_DIR, 'tcp-brutal', 'install.sh') + INSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'install.sh') + UNINSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'uninstall.sh') + CONFIGURE_WARP = os.path.join(SCRIPT_DIR, 'warp', 'configure.sh') + STATUS_WARP = os.path.join(SCRIPT_DIR, 'warp', 'status.sh') + +# region Custom Exceptions + + +class HysteriaError(Exception): + """Base class for Hysteria-related exceptions.""" + pass + + +class CommandExecutionError(HysteriaError): + """Raised when a command execution fails.""" + pass + + +class InvalidInputError(HysteriaError): + """Raised when the provided input is invalid.""" + pass + + +class PasswordGenerationError(HysteriaError): + """Raised when password generation fails.""" + pass + + +class ScriptNotFoundError(HysteriaError): + """Raised when a required script is not found.""" + pass + +# region Utils + + +def run_cmd(command: list[str]) -> str | None: + ''' + Runs a command and returns the output. + Could raise subprocess.CalledProcessError + ''' + if (DEBUG) and not (Command.GET_USER.value in command or Command.LIST_USERS.value in command): + print(' '.join(command)) + try: + result = subprocess.check_output(command, shell=False) + if result: + result = result.decode().strip() + return result + except subprocess.CalledProcessError as e: + raise CommandExecutionError(f"Command execution failed: {e}") + return None + + +def generate_password() -> str: + ''' + Generates a random password using pwgen for user. + Could raise subprocess.CalledProcessError + ''' + try: + return subprocess.check_output(['pwgen', '-s', '32', '1'], shell=False).decode().strip() + except subprocess.CalledProcessError as e: + raise PasswordGenerationError(f"Failed to generate password: {e}") + +# endregion + +# region APIs + +# region Hysteria + + +def install_hysteria2(port: int, sni: str): + """ + Installs Hysteria2 on the given port and uses the provided or default SNI value. + """ + run_cmd(['bash', Command.INSTALL_HYSTERIA2.value, str(port), sni]) + + +def uninstall_hysteria2(): + """Uninstalls Hysteria2.""" + run_cmd(['bash', Command.UNINSTALL_HYSTERIA2.value]) + + +def update_hysteria2(): + """Updates Hysteria2.""" + run_cmd(['bash', Command.UPDATE_HYSTERIA2.value]) + + +def restart_hysteria2(): + """Restarts Hysteria2.""" + run_cmd(['bash', Command.RESTART_HYSTERIA2.value]) + + +def change_hysteria2_port(port: int): + """ + Changes the port for Hysteria2. + """ + run_cmd(['bash', Command.CHANGE_PORT_HYSTERIA2.value, str(port)]) + + +def change_hysteria2_sni(sni: str): + """ + Changes the SNI for Hysteria2. + """ + run_cmd(['bash', Command.CHANGE_SNI_HYSTERIA2.value, sni]) + + +def backup_hysteria(): + """Backups Hysteria configuration.""" + run_cmd(['bash', Command.BACKUP_HYSTERIA.value]) + +# endregion + +# region User + + +def list_users() -> str | None: + """ + Lists all users. + """ + return run_cmd(['bash', Command.LIST_USERS.value]) + + +def get_user(username: str) -> str | None: + """ + Retrieves information about a specific user. + """ + return run_cmd(['bash', Command.GET_USER.value, '-u', str(username)]) + + +def add_user(username: str, traffic_limit: int, expiration_days: int, password: str, creation_date: str): + """ + Adds a new user with the given parameters. + """ + if not password: + password = generate_password() + if not creation_date: + creation_date = datetime.now().strftime('%Y-%m-%d') + run_cmd(['bash', Command.ADD_USER.value, username, str(traffic_limit), str(expiration_days), password, creation_date]) + + +def edit_user(username: str, new_username: str, new_traffic_limit: int, new_expiration_days: int, renew_password: bool, renew_creation_date: bool, blocked: bool): + """ + Edits an existing user's details. + """ + if not username: + raise InvalidInputError('Error: username is required') + if not any([new_username, new_traffic_limit, new_expiration_days, renew_password, renew_creation_date, blocked is not None]): + raise InvalidInputError('Error: at least one option is required') + if new_traffic_limit is not None and new_traffic_limit <= 0: + raise InvalidInputError('Error: traffic limit must be greater than 0') + if new_expiration_days is not None and new_expiration_days <= 0: + raise InvalidInputError('Error: expiration days must be greater than 0') + if renew_password: + password = generate_password() + else: + password = "" + if renew_creation_date: + creation_date = datetime.now().strftime('%Y-%m-%d') + else: + creation_date = "" + command_args = [ + 'bash', + Command.EDIT_USER.value, + username, + new_username or '', + str(new_traffic_limit) if new_traffic_limit is not None else '', + str(new_expiration_days) if new_expiration_days is not None else '', + password, + creation_date, + 'true' if blocked else 'false' + ] + run_cmd(command_args) + + +def reset_user(username: str): + """ + Resets a user's configuration. + """ + run_cmd(['bash', Command.RESET_USER.value, username]) + + +def remove_user(username: str): + """ + Removes a user by username. + """ + run_cmd(['bash', Command.REMOVE_USER.value, username]) + + +def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: bool, normalsub: bool) -> str | None: + """ + Displays the URI for a user, with options for QR code and other formats. + """ + command_args = ['bash', Command.SHOW_USER_URI.value, '-u', username] + if qrcode: + command_args.append('-qr') + if all: + command_args.append('-a') + else: + command_args.extend(['-ip', str(ipv)]) + if singbox: + command_args.append('-s') + if normalsub: + command_args.append('-n') + return run_cmd(command_args) + +# endregion + +# region Server + + +def traffic_status(): + """Fetches traffic status.""" + traffic.traffic_status() + + +def server_info() -> str | None: + """Retrieves server information.""" + return run_cmd(['bash', Command.SERVER_INFO.value]) + + +def manage_obfs(remove: bool, generate: bool): + """ + Manages 'obfs' in Hysteria2 configuration. + """ + if remove and generate: + raise InvalidInputError('Error: You cannot use both --remove and --generate at the same time') + elif remove: + run_cmd(['bash', Command.MANAGE_OBFS.value, '--remove']) + elif generate: + run_cmd(['bash', Command.MANAGE_OBFS.value, '--generate']) + else: + raise InvalidInputError("Error: Please specify either --remove or --generate.") + + +def ip_address(edit: bool, ipv4: str, ipv6: str): + """ + Manages IP address configuration with edit options. + """ + if edit: + if ipv4: + run_cmd(['bash', Command.IP_ADD.value, 'edit', '-4', ipv4]) + if ipv6: + run_cmd(['bash', Command.IP_ADD.value, 'edit', '-6', ipv6]) + if not ipv4 and not ipv6: + raise InvalidInputError("Error: --edit requires at least one of --ipv4 or --ipv6.") + else: + run_cmd(['bash', Command.IP_ADD.value, 'add']) + + +def update_geo(country: str): + """ + Updates geographic data files based on the specified country. + """ + script_path = Command.UPDATE_GEO.value + try: + subprocess.run(['python3', script_path, country.lower()], check=True) + except subprocess.CalledProcessError as e: + raise CommandExecutionError(f"Failed to update geo files: {e}") + except FileNotFoundError: + raise ScriptNotFoundError(f"Script not found: {script_path}") + except Exception as e: + raise HysteriaError(f"An unexpected error occurred: {e}") + + +def masquerade(remove: bool, enable: str): + """ + Configures masquerade settings. + """ + if remove and enable: + raise InvalidInputError("Error: You cannot use both --remove and --enable at the same time.") + if remove: + run_cmd(['bash', Command.MASQUERADE_SCRIPT.value, '2']) + elif enable: + run_cmd(['bash', Command.MASQUERADE_SCRIPT.value, '1', enable]) + else: + raise InvalidInputError("Error: Please specify either --remove or --enable.") + +# endregion + +# region Advanced Menu + + +def install_tcp_brutal(): + """Installs TCP Brutal.""" + run_cmd(['bash', Command.INSTALL_TCP_BRUTAL.value]) + + +def install_warp(): + """Installs WARP.""" + run_cmd(['bash', Command.INSTALL_WARP.value]) + + +def uninstall_warp(): + """Uninstalls WARP.""" + run_cmd(['bash', Command.UNINSTALL_WARP.value]) + + +def configure_warp(all: bool, popular_sites: bool, domestic_sites: bool, block_adult_sites: bool, warp_option: str, warp_key: str): + """ + Configures WARP with various options. + """ + if warp_option == 'warp plus' and not warp_key: + raise InvalidInputError("Error: WARP Plus key is required when 'warp plus' is selected.") + options = { + "all": 'true' if all else 'false', + "popular_sites": 'true' if popular_sites else 'false', + "domestic_sites": 'true' if domestic_sites else 'false', + "block_adult_sites": 'true' if block_adult_sites else 'false', + "warp_option": warp_option or '', + "warp_key": warp_key or '' + } + cmd_args = [ + 'bash', Command.CONFIGURE_WARP.value, + options['all'], + options['popular_sites'], + options['domestic_sites'], + options['block_adult_sites'], + options['warp_option'] + ] + if options['warp_key']: + cmd_args.append(options['warp_key']) + run_cmd(cmd_args) + + +def warp_status() -> str | None: + """Checks the status of WARP.""" + return run_cmd(['bash', Command.STATUS_WARP.value]) + + +def telegram(action: str, token: str, adminid: str): + """ + Manages the Telegram bot with start/stop actions. + """ + if action == 'start': + if not token or not adminid: + raise InvalidInputError("Error: Both --token and --adminid are required for the start action.") + admin_ids = f'{adminid}' + run_cmd(['bash', Command.INSTALL_TELEGRAMBOT.value, 'start', token, admin_ids]) + elif action == 'stop': + run_cmd(['bash', Command.INSTALL_TELEGRAMBOT.value, 'stop']) + + +def singbox(action: str, domain: str, port: int): + """ + Manages Singbox with start/stop actions. + """ + if action == 'start': + if not domain or not port: + raise InvalidInputError("Error: Both --domain and --port are required for the start action.") + run_cmd(['bash', Command.INSTALL_SINGBOX.value, 'start', domain, str(port)]) + elif action == 'stop': + run_cmd(['bash', Command.INSTALL_SINGBOX.value, 'stop']) + + +def normalsub(action: str, domain: str, port: int): + """ + Manages Normalsub with start/stop actions. + """ + if action == 'start': + if not domain or not port: + raise InvalidInputError("Error: Both --domain and --port are required for the start action.") + run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'start', domain, str(port)]) + elif action == 'stop': + run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'stop']) + +# endregion +# endregion diff --git a/core/scripts/hysteria2/manage_obfs.sh b/core/scripts/hysteria2/manage_obfs.sh index 0d7ef95..ff334be 100644 --- a/core/scripts/hysteria2/manage_obfs.sh +++ b/core/scripts/hysteria2/manage_obfs.sh @@ -33,8 +33,10 @@ generate_obfs() { } if [[ $1 == "--remove" || $1 == "-r" ]]; then + echo "Removing 'obfs' from config.json..." remove_obfs elif [[ $1 == "--generate" || $1 == "-g" ]]; then + echo "Generating 'obfs' in config.json..." generate_obfs else echo "Usage: $0 --remove|-r | --generate|-g" diff --git a/core/scripts/hysteria2/masquerade.sh b/core/scripts/hysteria2/masquerade.sh index 68b0c35..d6ed54c 100644 --- a/core/scripts/hysteria2/masquerade.sh +++ b/core/scripts/hysteria2/masquerade.sh @@ -27,8 +27,10 @@ function remove_masquerade() { } if [[ "$1" == "1" ]]; then + echo "Enabling 'masquerade' with URL: $2..." enable_masquerade "$2" elif [[ "$1" == "2" ]]; then + echo "Removing 'masquerade' from config.json..." remove_masquerade else echo "Usage: $0 {1|2} [domain]"