Make cli.py more modular

This commit is contained in:
Iam54r1n4
2025-01-24 05:24:49 +00:00
parent 5dd2b531fb
commit 562b869980
4 changed files with 557 additions and 267 deletions

View File

@ -1,125 +1,97 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from datetime import datetime from datetime import datetime
import os
import io
import click import click
import subprocess import subprocess
from enum import Enum
import traffic
import validator import validator
import cli_api
SCRIPT_DIR = '/etc/hysteria/core/scripts'
DEBUG = False
class Command(Enum):
'''Constais path to command's script'''
INSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'install.sh')
UNINSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'uninstall.sh')
UPDATE_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'update.sh')
RESTART_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'restart.sh')
CHANGE_PORT_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_port.sh')
CHANGE_SNI_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_sni.sh')
GET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'get_user.sh')
ADD_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'add_user.sh')
EDIT_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'edit_user.sh')
RESET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'reset_user.sh')
REMOVE_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'remove_user.sh')
SHOW_USER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'show_user_uri.sh')
IP_ADD = os.path.join(SCRIPT_DIR, 'hysteria2', 'ip.sh')
MANAGE_OBFS = os.path.join(SCRIPT_DIR, 'hysteria2', 'manage_obfs.sh')
MASQUERADE_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'masquerade.sh')
TRAFFIC_STATUS = 'traffic.py' # won't be call directly (it's a python module)
UPDATE_GEO = os.path.join(SCRIPT_DIR, 'hysteria2', 'update_geo.py')
LIST_USERS = os.path.join(SCRIPT_DIR, 'hysteria2', 'list_users.sh')
SERVER_INFO = os.path.join(SCRIPT_DIR, 'hysteria2', 'server_info.sh')
BACKUP_HYSTERIA = os.path.join(SCRIPT_DIR, 'hysteria2', 'backup.sh')
INSTALL_TELEGRAMBOT = os.path.join(SCRIPT_DIR, 'telegrambot', 'runbot.sh')
INSTALL_SINGBOX = os.path.join(SCRIPT_DIR, 'singbox', 'singbox_shell.sh')
INSTALL_NORMALSUB = os.path.join(SCRIPT_DIR, 'normalsub', 'normalsub.sh')
INSTALL_TCP_BRUTAL = os.path.join(SCRIPT_DIR, 'tcp-brutal', 'install.sh')
INSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'install.sh')
UNINSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'uninstall.sh')
CONFIGURE_WARP = os.path.join(SCRIPT_DIR, 'warp', 'configure.sh')
STATUS_WARP = os.path.join(SCRIPT_DIR, 'warp', 'status.sh')
# region utils
def run_cmd(command: list[str]):
'''
Runs a command and returns the output.
Could raise subprocess.CalledProcessError
'''
# if the command is GET_USER or LIST_USERS we don't print the debug-command and just print the output
if DEBUG and not (Command.GET_USER.value in command or Command.LIST_USERS.value in command):
print(' '.join(command))
result = subprocess.check_output(command, shell=False)
print(result.decode().strip())
def generate_password() -> str:
'''
Generates a random password using pwgen for user.
Could raise subprocess.CalledProcessError
'''
return subprocess.check_output(['pwgen', '-s', '32', '1'], shell=False).decode().strip()
# endregion
@click.group() @click.group()
def cli(): def cli():
pass pass
# region hysteria2 menu options # region Hysteria2
@cli.command('install-hysteria2') @cli.command('install-hysteria2')
@click.option('--port', '-p', required=True, help='Port for Hysteria2', type=int, callback=validator.validate_port) @click.option('--port', '-p', required=True, help='Port for Hysteria2', type=int)
@click.option('--sni', '-s', required=False, default='bts.com', help='SNI for Hysteria2 (default: bts.com)', type=str) @click.option('--sni', '-s', required=False, default='bts.com', help='SNI for Hysteria2 (default: bts.com)', type=str)
def install_hysteria2(port: int, sni: str): def install_hysteria2(port: int, sni: str):
""" try:
Installs Hysteria2 on the given port and uses the provided or default SNI value. cli_api.install_hysteria2(port, sni)
""" except Exception as e:
run_cmd(['bash', Command.INSTALL_HYSTERIA2.value, str(port), sni]) click.echo(f'{e}', err=True)
@cli.command('uninstall-hysteria2') @cli.command('uninstall-hysteria2')
def uninstall_hysteria2(): def uninstall_hysteria2():
run_cmd(['bash', Command.UNINSTALL_HYSTERIA2.value]) try:
cli_api.uninstall_hysteria2()
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('update-hysteria2') @cli.command('update-hysteria2')
def update_hysteria2(): def update_hysteria2():
run_cmd(['bash', Command.UPDATE_HYSTERIA2.value]) try:
cli_api.update_hysteria2()
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('restart-hysteria2') @cli.command('restart-hysteria2')
def restart_hysteria2(): def restart_hysteria2():
run_cmd(['bash', Command.RESTART_HYSTERIA2.value]) try:
cli_api.restart_hysteria2()
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('change-hysteria2-port') @cli.command('change-hysteria2-port')
@click.option('--port', '-p', required=True, help='New port for Hysteria2', type=int, callback=validator.validate_port) @click.option('--port', '-p', required=True, help='New port for Hysteria2', type=int, callback=validator.validate_port)
def change_hysteria2_port(port: int): def change_hysteria2_port(port: int):
run_cmd(['bash', Command.CHANGE_PORT_HYSTERIA2.value, str(port)]) try:
cli_api.change_hysteria2_port(port)
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('change-hysteria2-sni') @cli.command('change-hysteria2-sni')
@click.option('--sni', '-s', required=True, help='New SNI for Hysteria2', type=str) @click.option('--sni', '-s', required=True, help='New SNI for Hysteria2', type=str)
def change_hysteria2_sni(sni: str): def change_hysteria2_sni(sni: str):
run_cmd(['bash', Command.CHANGE_SNI_HYSTERIA2.value, sni]) try:
cli_api.change_hysteria2_sni(sni)
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('backup-hysteria')
def backup_hysteria():
try:
cli_api.backup_hysteria()
except Exception as e:
click.echo(f'{e}', err=True)
# endregion
# region User
@ cli.command('list-users')
def list_users():
cli_api.list_users()
@cli.command('get-user') @cli.command('get-user')
@click.option('--username', '-u', required=True, help='Username for the user to get', type=str) @click.option('--username', '-u', required=True, help='Username for the user to get', type=str)
def get_user(username: str): def get_user(username: str):
cmd = ['bash', Command.GET_USER.value, '-u', str(username)] try:
run_cmd(cmd) cli_api.get_user(username)
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('add-user') @cli.command('add-user')
@click.option('--username', '-u', required=True, help='Username for the new user', type=str) @click.option('--username', '-u', required=True, help='Username for the new user', type=str)
@ -128,19 +100,11 @@ def get_user(username: str):
@click.option('--password', '-p', required=False, help='Password for the user', type=str) @click.option('--password', '-p', required=False, help='Password for the user', type=str)
@click.option('--creation-date', '-c', required=False, help='Creation date for the user', type=str) @click.option('--creation-date', '-c', required=False, help='Creation date for the user', type=str)
def add_user(username: str, traffic_limit: int, expiration_days: int, password: str, creation_date: str): def add_user(username: str, traffic_limit: int, expiration_days: int, password: str, creation_date: str):
if not password:
try:
password = generate_password()
except subprocess.CalledProcessError as e:
print(f'Error: failed to generate password\n{e}')
exit(1)
if not creation_date:
creation_date = datetime.now().strftime('%Y-%m-%d')
try: try:
run_cmd(['bash', Command.ADD_USER.value, username, str(traffic_limit), str(expiration_days), password, creation_date]) cli_api.add_user(username, traffic_limit, expiration_days, password, creation_date)
except subprocess.CalledProcessError as e: except Exception as e:
click.echo(f"{e.output.decode()}", err=True) click.echo(f'{e}', err=True)
exit(1)
@cli.command('edit-user') @cli.command('edit-user')
@click.option('--username', '-u', required=True, help='Username for the user to edit', type=str) @click.option('--username', '-u', required=True, help='Username for the user to edit', type=str)
@ -151,61 +115,30 @@ def add_user(username: str, traffic_limit: int, expiration_days: int, password:
@click.option('--renew-creation-date', '-rc', is_flag=True, help='Renew creation date for the user') @click.option('--renew-creation-date', '-rc', is_flag=True, help='Renew creation date for the user')
@click.option('--blocked', '-b', is_flag=True, help='Block the user') @click.option('--blocked', '-b', is_flag=True, help='Block the user')
def edit_user(username: str, new_username: str, new_traffic_limit: int, new_expiration_days: int, renew_password: bool, renew_creation_date: bool, blocked: bool): def edit_user(username: str, new_username: str, new_traffic_limit: int, new_expiration_days: int, renew_password: bool, renew_creation_date: bool, blocked: bool):
if not username: try:
print('Error: username is required') cli_api.edit_user(username, new_username, new_traffic_limit, new_expiration_days,
exit(1) renew_password, renew_creation_date, blocked)
except Exception as e:
click.echo(f'{e}', err=True)
if not any([new_username, new_traffic_limit, new_expiration_days, renew_password, renew_creation_date, blocked is not None]):
print('Error: at least one option is required')
exit(1)
if new_traffic_limit is not None and new_traffic_limit <= 0:
print('Error: traffic limit must be greater than 0')
exit(1)
if new_expiration_days is not None and new_expiration_days <= 0:
print('Error: expiration days must be greater than 0')
exit(1)
# Handle renewing password and creation date
if renew_password:
try:
password = generate_password()
except subprocess.CalledProcessError as e:
print(f'Error: failed to generate password\n{e}')
exit(1)
else:
password = ""
if renew_creation_date:
creation_date = datetime.now().strftime('%Y-%m-%d')
else:
creation_date = ""
# Prepare arguments for the command
command_args = [
'bash',
Command.EDIT_USER.value,
username,
new_username or '',
str(new_traffic_limit) if new_traffic_limit is not None else '',
str(new_expiration_days) if new_expiration_days is not None else '',
password,
creation_date,
'true' if blocked else 'false'
]
run_cmd(command_args)
@ cli.command('reset-user') @ cli.command('reset-user')
@ click.option('--username', '-u', required=True, help='Username for the user to Reset', type=str) @ click.option('--username', '-u', required=True, help='Username for the user to Reset', type=str)
def reset_user(username: str): def reset_user(username: str):
run_cmd(['bash', Command.RESET_USER.value, username]) try:
cli_api.reset_user(username)
except Exception as e:
click.echo(f'{e}', err=True)
@ cli.command('remove-user') @ cli.command('remove-user')
@ click.option('--username', '-u', required=True, help='Username for the user to remove', type=str) @ click.option('--username', '-u', required=True, help='Username for the user to remove', type=str)
def remove_user(username: str): def remove_user(username: str):
run_cmd(['bash', Command.REMOVE_USER.value, username]) try:
cli_api.remove_user(username)
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('show-user-uri') @cli.command('show-user-uri')
@click.option('--username', '-u', required=True, help='Username for the user to show the URI', type=str) @click.option('--username', '-u', required=True, help='Username for the user to show the URI', type=str)
@ -215,136 +148,105 @@ def remove_user(username: str):
@click.option('--singbox', '-s', is_flag=True, help='Generate Singbox sublink if Singbox service is active') @click.option('--singbox', '-s', is_flag=True, help='Generate Singbox sublink if Singbox service is active')
@click.option('--normalsub', '-n', is_flag=True, help='Generate Normal sublink if normalsub service is active') @click.option('--normalsub', '-n', is_flag=True, help='Generate Normal sublink if normalsub service is active')
def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: bool, normalsub: bool): def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: bool, normalsub: bool):
command_args = ['bash', Command.SHOW_USER_URI.value, '-u', username] try:
if qrcode: cli_api.show_user_uri(username, qrcode, ipv, all, singbox, normalsub)
command_args.append('-qr') except Exception as e:
if all: click.echo(f'{e}', err=True)
command_args.append('-a') # endregion
else:
command_args.extend(['-ip', str(ipv)])
if singbox:
command_args.append('-s')
if normalsub:
command_args.append('-n')
run_cmd(command_args)
# region Server
@ cli.command('traffic-status') @ cli.command('traffic-status')
def traffic_status(): def traffic_status():
traffic.traffic_status() cli_api.traffic_status()
# traffic.traffic_status()
@ cli.command('list-users')
def list_users():
run_cmd(['bash', Command.LIST_USERS.value])
@cli.command('server-info') @cli.command('server-info')
def server_info(): def server_info():
output = run_cmd(['bash', Command.SERVER_INFO.value])
if output:
print(output)
@cli.command('backup-hysteria')
def backup_hysteria():
try: try:
run_cmd(['bash', Command.BACKUP_HYSTERIA.value]) res = cli_api.server_info()
except subprocess.CalledProcessError as e: if res:
click.echo(f"Backup failed: {e.output.decode()}", err=True) print(res)
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('manage_obfs') @cli.command('manage_obfs')
@click.option('--remove', '-r', is_flag=True, help="Remove 'obfs' from config.json.") @click.option('--remove', '-r', is_flag=True, help="Remove 'obfs' from config.json.")
@click.option('--generate', '-g', is_flag=True, help="Generate new 'obfs' in config.json.") @click.option('--generate', '-g', is_flag=True, help="Generate new 'obfs' in config.json.")
def manage_obfs(remove, generate): def manage_obfs(remove: bool, generate: bool):
"""Manage 'obfs' in Hysteria2 configuration.""" try:
if remove and generate: cli_api.manage_obfs(remove, generate)
click.echo("Error: You cannot use both --remove and --generate at the same time.") except Exception as e:
return click.echo(f'{e}', err=True)
elif remove:
click.echo("Removing 'obfs' from config.json...")
run_cmd(['bash', Command.MANAGE_OBFS.value, '--remove'])
elif generate:
click.echo("Generating 'obfs' in config.json...")
run_cmd(['bash', Command.MANAGE_OBFS.value, '--generate'])
else:
click.echo("Error: Please specify either --remove or --generate.")
@cli.command('ip-address') @cli.command('ip-address')
@click.option('--edit', is_flag=True, help="Edit IP addresses manually.") @click.option('--edit', is_flag=True, help="Edit IP addresses manually.")
@click.option('-4', '--ipv4', type=str, help="Specify the new IPv4 address.") @click.option('-4', '--ipv4', type=str, help="Specify the new IPv4 address.")
@click.option('-6', '--ipv6', type=str, help="Specify the new IPv6 address.") @click.option('-6', '--ipv6', type=str, help="Specify the new IPv6 address.")
def ip_address(edit, ipv4, ipv6): def ip_address(edit: bool, ipv4: str, ipv6: str):
""" """
Manage IP addresses in .configs.env. Manage IP addresses in .configs.env.
- Use without options to add auto-detected IPs. - Use without options to add auto-detected IPs.
- Use --edit with -4 or -6 to manually update IPs. - Use --edit with -4 or -6 to manually update IPs.
""" """
if edit: try:
if ipv4: cli_api.ip_address(edit, ipv4, ipv6)
run_cmd(['bash', Command.IP_ADD.value, 'edit', '-4', ipv4]) except Exception as e:
if ipv6: click.echo(f'{e}', err=True)
run_cmd(['bash', Command.IP_ADD.value, 'edit', '-6', ipv6])
if not ipv4 and not ipv6:
click.echo("Error: --edit requires at least one of --ipv4 or --ipv6.")
else:
run_cmd(['bash', Command.IP_ADD.value, 'add'])
@cli.command('update-geo') @cli.command('update-geo')
@click.option('--country', '-c', @click.option('--country', '-c',
type=click.Choice(['iran', 'china', 'russia'], case_sensitive=False), type=click.Choice(['iran', 'china', 'russia'], case_sensitive=False),
default='iran', default='iran',
help='Select country for geo files (default: iran)') help='Select country for geo files (default: iran)')
def cli_update_geo(country): def update_geo(country: str):
script_path = Command.UPDATE_GEO.value
try: try:
subprocess.run(['python3', script_path, country.lower()], check=True) cli_api.update_geo(country)
except subprocess.CalledProcessError as e:
print(f"Failed to update geo files: {e}")
except FileNotFoundError:
print(f"Script not found: {script_path}")
except Exception as e: except Exception as e:
print(f"An unexpected error occurred: {e}") click.echo(f'{e}', err=True)
@cli.command('masquerade') @cli.command('masquerade')
@click.option('--remove', '-r', is_flag=True, help="Remove 'masquerade' from config.json.") @click.option('--remove', '-r', is_flag=True, help="Remove 'masquerade' from config.json.")
@click.option('--enable', '-e', metavar="<domain>", type=str, help="Enable 'masquerade' in config.json with the specified domain.") @click.option('--enable', '-e', metavar="<domain>", type=str, help="Enable 'masquerade' in config.json with the specified domain.")
def masquerade(remove, enable): def masquerade(remove: bool, enable: str):
"""Manage 'masquerade' in Hysteria2 configuration.""" """Manage 'masquerade' in Hysteria2 configuration."""
try:
if remove and enable: cli_api.masquerade(remove, enable)
click.echo("Error: You cannot use both --remove and --enable at the same time.") except Exception as e:
return click.echo(f'{e}', err=True)
elif remove:
click.echo("Removing 'masquerade' from config.json...")
run_cmd(['bash', Command.MASQUERADE_SCRIPT.value, '2'])
elif enable:
if not enable:
click.echo("Error: You must specify a domain with --enable.")
return
click.echo(f"Enabling 'masquerade' with URL: {enable}...")
run_cmd(['bash', Command.MASQUERADE_SCRIPT.value, '1', enable])
else:
click.echo("Error: Please specify either --remove or --enable.")
# endregion # endregion
# region advanced menu # region Advanced Menu
@ cli.command('install-tcp-brutal') @ cli.command('install-tcp-brutal')
def install_tcp_brutal(): def install_tcp_brutal():
run_cmd(['bash', Command.INSTALL_TCP_BRUTAL.value]) try:
cli_api.install_tcp_brutal()
except Exception as e:
click.echo(f'{e}', err=True)
@ cli.command('install-warp') @ cli.command('install-warp')
def install_warp(): def install_warp():
run_cmd(['bash', Command.INSTALL_WARP.value]) try:
cli_api.install_warp()
except Exception as e:
click.echo(f'{e}', err=True)
@ cli.command('uninstall-warp') @ cli.command('uninstall-warp')
def uninstall_warp(): def uninstall_warp():
run_cmd(['bash', Command.UNINSTALL_WARP.value]) try:
cli_api.uninstall_warp()
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('configure-warp') @cli.command('configure-warp')
@ -355,78 +257,59 @@ def uninstall_warp():
@click.option('--warp-option', '-w', type=click.Choice(['warp', 'warp plus'], case_sensitive=False), help='Specify whether to use WARP or WARP Plus') @click.option('--warp-option', '-w', type=click.Choice(['warp', 'warp plus'], case_sensitive=False), help='Specify whether to use WARP or WARP Plus')
@click.option('--warp-key', '-k', help='WARP Plus key (required if warp-option is "warp plus")') @click.option('--warp-key', '-k', help='WARP Plus key (required if warp-option is "warp plus")')
def configure_warp(all: bool, popular_sites: bool, domestic_sites: bool, block_adult_sites: bool, warp_option: str, warp_key: str): def configure_warp(all: bool, popular_sites: bool, domestic_sites: bool, block_adult_sites: bool, warp_option: str, warp_key: str):
if warp_option == 'warp plus' and not warp_key: try:
print("Error: WARP Plus key is required when 'warp plus' is selected.") cli_api.configure_warp(all, popular_sites, domestic_sites, block_adult_sites, warp_option, warp_key)
return except Exception as e:
click.echo(f'{e}', err=True)
options = {
"all": 'true' if all else 'false',
"popular_sites": 'true' if popular_sites else 'false',
"domestic_sites": 'true' if domestic_sites else 'false',
"block_adult_sites": 'true' if block_adult_sites else 'false',
"warp_option": warp_option or '',
"warp_key": warp_key or ''
}
cmd_args = [
'bash', Command.CONFIGURE_WARP.value,
options['all'],
options['popular_sites'],
options['domestic_sites'],
options['block_adult_sites'],
options['warp_option']
]
if options['warp_key']:
cmd_args.append(options['warp_key'])
run_cmd(cmd_args)
@cli.command('warp-status') @cli.command('warp-status')
def warp_status(): def warp_status():
output = run_cmd(['bash', Command.STATUS_WARP.value]) try:
if output: res = cli_api.warp_status()
print(output) if res:
print(res)
except Exception as e:
click.echo(f'{e}', err=True)
@cli.command('telegram') @cli.command('telegram')
@click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False)) @click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False))
@click.option('--token', '-t', required=False, help='Token for running the telegram bot', type=str) @click.option('--token', '-t', required=False, help='Token for running the telegram bot', type=str)
@click.option('--adminid', '-aid', required=False, help='Telegram admins ID for running the telegram bot', type=str) @click.option('--adminid', '-aid', required=False, help='Telegram admins ID for running the telegram bot', type=str)
def telegram(action: str, token: str, adminid: str): def telegram(action: str, token: str, adminid: str):
if action == 'start': try:
if not token or not adminid: res = cli_api.telegram(action, token, adminid)
print("Error: Both --token and --adminid are required for the start action.") if res:
return print(res)
admin_ids = f'{adminid}' except Exception as e:
run_cmd(['bash', Command.INSTALL_TELEGRAMBOT.value, 'start', token, admin_ids]) click.echo(f'{e}', err=True)
elif action == 'stop':
run_cmd(['bash', Command.INSTALL_TELEGRAMBOT.value, 'stop'])
@cli.command('singbox') @cli.command('singbox')
@click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False)) @click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False))
@click.option('--domain', '-d', required=False, help='Domain name for SSL', type=str) @click.option('--domain', '-d', required=False, help='Domain name for SSL', type=str)
@click.option('--port', '-p', required=False, help='Port number for Singbox service', type=int) @click.option('--port', '-p', required=False, help='Port number for Singbox service', type=int)
def singbox(action: str, domain: str, port: int): def singbox(action: str, domain: str, port: int):
if action == 'start': try:
if not domain or not port: res = cli_api.singbox(action, domain, port)
click.echo("Error: Both --domain and --port are required for the start action.") if res:
return print(res)
run_cmd(['bash', Command.INSTALL_SINGBOX.value, 'start', domain, str(port)]) except Exception as e:
elif action == 'stop': click.echo(f'{e}', err=True)
run_cmd(['bash', Command.INSTALL_SINGBOX.value, 'stop'])
@cli.command('normal-sub') @cli.command('normal-sub')
@click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False)) @click.option('--action', '-a', required=True, help='Action to perform: start or stop', type=click.Choice(['start', 'stop'], case_sensitive=False))
@click.option('--domain', '-d', required=False, help='Domain name for SSL', type=str) @click.option('--domain', '-d', required=False, help='Domain name for SSL', type=str)
@click.option('--port', '-p', required=False, help='Port number for NormalSub service', type=int) @click.option('--port', '-p', required=False, help='Port number for NormalSub service', type=int)
def normalsub(action: str, domain: str, port: int): def normalsub(action: str, domain: str, port: int):
if action == 'start': try:
if not domain or not port: res = cli_api.normalsub(action, domain, port)
click.echo("Error: Both --domain and --port are required for the start action.") if res:
return print(res)
run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'start', domain, str(port)]) except Exception as e:
elif action == 'stop': click.echo(f'{e}', err=True)
run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'stop'])
# endregion # endregion

403
core/cli_api.py Normal file
View File

@ -0,0 +1,403 @@
import os
import subprocess
from enum import Enum
from datetime import datetime
import traffic
DEBUG = False
SCRIPT_DIR = '/etc/hysteria/core/scripts'
class Command(Enum):
'''Contains path to command's script'''
INSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'install.sh')
UNINSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'uninstall.sh')
UPDATE_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'update.sh')
RESTART_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'restart.sh')
CHANGE_PORT_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_port.sh')
CHANGE_SNI_HYSTERIA2 = os.path.join(SCRIPT_DIR, 'hysteria2', 'change_sni.sh')
GET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'get_user.sh')
ADD_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'add_user.sh')
EDIT_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'edit_user.sh')
RESET_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'reset_user.sh')
REMOVE_USER = os.path.join(SCRIPT_DIR, 'hysteria2', 'remove_user.sh')
SHOW_USER_URI = os.path.join(SCRIPT_DIR, 'hysteria2', 'show_user_uri.sh')
IP_ADD = os.path.join(SCRIPT_DIR, 'hysteria2', 'ip.sh')
MANAGE_OBFS = os.path.join(SCRIPT_DIR, 'hysteria2', 'manage_obfs.sh')
MASQUERADE_SCRIPT = os.path.join(SCRIPT_DIR, 'hysteria2', 'masquerade.sh')
TRAFFIC_STATUS = 'traffic.py' # won't be called directly (it's a python module)
UPDATE_GEO = os.path.join(SCRIPT_DIR, 'hysteria2', 'update_geo.py')
LIST_USERS = os.path.join(SCRIPT_DIR, 'hysteria2', 'list_users.sh')
SERVER_INFO = os.path.join(SCRIPT_DIR, 'hysteria2', 'server_info.sh')
BACKUP_HYSTERIA = os.path.join(SCRIPT_DIR, 'hysteria2', 'backup.sh')
INSTALL_TELEGRAMBOT = os.path.join(SCRIPT_DIR, 'telegrambot', 'runbot.sh')
INSTALL_SINGBOX = os.path.join(SCRIPT_DIR, 'singbox', 'singbox_shell.sh')
INSTALL_NORMALSUB = os.path.join(SCRIPT_DIR, 'normalsub', 'normalsub.sh')
INSTALL_TCP_BRUTAL = os.path.join(SCRIPT_DIR, 'tcp-brutal', 'install.sh')
INSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'install.sh')
UNINSTALL_WARP = os.path.join(SCRIPT_DIR, 'warp', 'uninstall.sh')
CONFIGURE_WARP = os.path.join(SCRIPT_DIR, 'warp', 'configure.sh')
STATUS_WARP = os.path.join(SCRIPT_DIR, 'warp', 'status.sh')
# region Custom Exceptions
class HysteriaError(Exception):
"""Base class for Hysteria-related exceptions."""
pass
class CommandExecutionError(HysteriaError):
"""Raised when a command execution fails."""
pass
class InvalidInputError(HysteriaError):
"""Raised when the provided input is invalid."""
pass
class PasswordGenerationError(HysteriaError):
"""Raised when password generation fails."""
pass
class ScriptNotFoundError(HysteriaError):
"""Raised when a required script is not found."""
pass
# region Utils
def run_cmd(command: list[str]) -> str | None:
'''
Runs a command and returns the output.
Could raise subprocess.CalledProcessError
'''
if (DEBUG) and not (Command.GET_USER.value in command or Command.LIST_USERS.value in command):
print(' '.join(command))
try:
result = subprocess.check_output(command, shell=False)
if result:
result = result.decode().strip()
return result
except subprocess.CalledProcessError as e:
raise CommandExecutionError(f"Command execution failed: {e}")
return None
def generate_password() -> str:
'''
Generates a random password using pwgen for user.
Could raise subprocess.CalledProcessError
'''
try:
return subprocess.check_output(['pwgen', '-s', '32', '1'], shell=False).decode().strip()
except subprocess.CalledProcessError as e:
raise PasswordGenerationError(f"Failed to generate password: {e}")
# endregion
# region APIs
# region Hysteria
def install_hysteria2(port: int, sni: str):
"""
Installs Hysteria2 on the given port and uses the provided or default SNI value.
"""
run_cmd(['bash', Command.INSTALL_HYSTERIA2.value, str(port), sni])
def uninstall_hysteria2():
"""Uninstalls Hysteria2."""
run_cmd(['bash', Command.UNINSTALL_HYSTERIA2.value])
def update_hysteria2():
"""Updates Hysteria2."""
run_cmd(['bash', Command.UPDATE_HYSTERIA2.value])
def restart_hysteria2():
"""Restarts Hysteria2."""
run_cmd(['bash', Command.RESTART_HYSTERIA2.value])
def change_hysteria2_port(port: int):
"""
Changes the port for Hysteria2.
"""
run_cmd(['bash', Command.CHANGE_PORT_HYSTERIA2.value, str(port)])
def change_hysteria2_sni(sni: str):
"""
Changes the SNI for Hysteria2.
"""
run_cmd(['bash', Command.CHANGE_SNI_HYSTERIA2.value, sni])
def backup_hysteria():
"""Backups Hysteria configuration."""
run_cmd(['bash', Command.BACKUP_HYSTERIA.value])
# endregion
# region User
def list_users() -> str | None:
"""
Lists all users.
"""
return run_cmd(['bash', Command.LIST_USERS.value])
def get_user(username: str) -> str | None:
"""
Retrieves information about a specific user.
"""
return run_cmd(['bash', Command.GET_USER.value, '-u', str(username)])
def add_user(username: str, traffic_limit: int, expiration_days: int, password: str, creation_date: str):
"""
Adds a new user with the given parameters.
"""
if not password:
password = generate_password()
if not creation_date:
creation_date = datetime.now().strftime('%Y-%m-%d')
run_cmd(['bash', Command.ADD_USER.value, username, str(traffic_limit), str(expiration_days), password, creation_date])
def edit_user(username: str, new_username: str, new_traffic_limit: int, new_expiration_days: int, renew_password: bool, renew_creation_date: bool, blocked: bool):
"""
Edits an existing user's details.
"""
if not username:
raise InvalidInputError('Error: username is required')
if not any([new_username, new_traffic_limit, new_expiration_days, renew_password, renew_creation_date, blocked is not None]):
raise InvalidInputError('Error: at least one option is required')
if new_traffic_limit is not None and new_traffic_limit <= 0:
raise InvalidInputError('Error: traffic limit must be greater than 0')
if new_expiration_days is not None and new_expiration_days <= 0:
raise InvalidInputError('Error: expiration days must be greater than 0')
if renew_password:
password = generate_password()
else:
password = ""
if renew_creation_date:
creation_date = datetime.now().strftime('%Y-%m-%d')
else:
creation_date = ""
command_args = [
'bash',
Command.EDIT_USER.value,
username,
new_username or '',
str(new_traffic_limit) if new_traffic_limit is not None else '',
str(new_expiration_days) if new_expiration_days is not None else '',
password,
creation_date,
'true' if blocked else 'false'
]
run_cmd(command_args)
def reset_user(username: str):
"""
Resets a user's configuration.
"""
run_cmd(['bash', Command.RESET_USER.value, username])
def remove_user(username: str):
"""
Removes a user by username.
"""
run_cmd(['bash', Command.REMOVE_USER.value, username])
def show_user_uri(username: str, qrcode: bool, ipv: int, all: bool, singbox: bool, normalsub: bool) -> str | None:
"""
Displays the URI for a user, with options for QR code and other formats.
"""
command_args = ['bash', Command.SHOW_USER_URI.value, '-u', username]
if qrcode:
command_args.append('-qr')
if all:
command_args.append('-a')
else:
command_args.extend(['-ip', str(ipv)])
if singbox:
command_args.append('-s')
if normalsub:
command_args.append('-n')
return run_cmd(command_args)
# endregion
# region Server
def traffic_status():
"""Fetches traffic status."""
traffic.traffic_status()
def server_info() -> str | None:
"""Retrieves server information."""
return run_cmd(['bash', Command.SERVER_INFO.value])
def manage_obfs(remove: bool, generate: bool):
"""
Manages 'obfs' in Hysteria2 configuration.
"""
if remove and generate:
raise InvalidInputError('Error: You cannot use both --remove and --generate at the same time')
elif remove:
run_cmd(['bash', Command.MANAGE_OBFS.value, '--remove'])
elif generate:
run_cmd(['bash', Command.MANAGE_OBFS.value, '--generate'])
else:
raise InvalidInputError("Error: Please specify either --remove or --generate.")
def ip_address(edit: bool, ipv4: str, ipv6: str):
"""
Manages IP address configuration with edit options.
"""
if edit:
if ipv4:
run_cmd(['bash', Command.IP_ADD.value, 'edit', '-4', ipv4])
if ipv6:
run_cmd(['bash', Command.IP_ADD.value, 'edit', '-6', ipv6])
if not ipv4 and not ipv6:
raise InvalidInputError("Error: --edit requires at least one of --ipv4 or --ipv6.")
else:
run_cmd(['bash', Command.IP_ADD.value, 'add'])
def update_geo(country: str):
"""
Updates geographic data files based on the specified country.
"""
script_path = Command.UPDATE_GEO.value
try:
subprocess.run(['python3', script_path, country.lower()], check=True)
except subprocess.CalledProcessError as e:
raise CommandExecutionError(f"Failed to update geo files: {e}")
except FileNotFoundError:
raise ScriptNotFoundError(f"Script not found: {script_path}")
except Exception as e:
raise HysteriaError(f"An unexpected error occurred: {e}")
def masquerade(remove: bool, enable: str):
"""
Configures masquerade settings.
"""
if remove and enable:
raise InvalidInputError("Error: You cannot use both --remove and --enable at the same time.")
if remove:
run_cmd(['bash', Command.MASQUERADE_SCRIPT.value, '2'])
elif enable:
run_cmd(['bash', Command.MASQUERADE_SCRIPT.value, '1', enable])
else:
raise InvalidInputError("Error: Please specify either --remove or --enable.")
# endregion
# region Advanced Menu
def install_tcp_brutal():
"""Installs TCP Brutal."""
run_cmd(['bash', Command.INSTALL_TCP_BRUTAL.value])
def install_warp():
"""Installs WARP."""
run_cmd(['bash', Command.INSTALL_WARP.value])
def uninstall_warp():
"""Uninstalls WARP."""
run_cmd(['bash', Command.UNINSTALL_WARP.value])
def configure_warp(all: bool, popular_sites: bool, domestic_sites: bool, block_adult_sites: bool, warp_option: str, warp_key: str):
"""
Configures WARP with various options.
"""
if warp_option == 'warp plus' and not warp_key:
raise InvalidInputError("Error: WARP Plus key is required when 'warp plus' is selected.")
options = {
"all": 'true' if all else 'false',
"popular_sites": 'true' if popular_sites else 'false',
"domestic_sites": 'true' if domestic_sites else 'false',
"block_adult_sites": 'true' if block_adult_sites else 'false',
"warp_option": warp_option or '',
"warp_key": warp_key or ''
}
cmd_args = [
'bash', Command.CONFIGURE_WARP.value,
options['all'],
options['popular_sites'],
options['domestic_sites'],
options['block_adult_sites'],
options['warp_option']
]
if options['warp_key']:
cmd_args.append(options['warp_key'])
run_cmd(cmd_args)
def warp_status() -> str | None:
"""Checks the status of WARP."""
return run_cmd(['bash', Command.STATUS_WARP.value])
def telegram(action: str, token: str, adminid: str):
"""
Manages the Telegram bot with start/stop actions.
"""
if action == 'start':
if not token or not adminid:
raise InvalidInputError("Error: Both --token and --adminid are required for the start action.")
admin_ids = f'{adminid}'
run_cmd(['bash', Command.INSTALL_TELEGRAMBOT.value, 'start', token, admin_ids])
elif action == 'stop':
run_cmd(['bash', Command.INSTALL_TELEGRAMBOT.value, 'stop'])
def singbox(action: str, domain: str, port: int):
"""
Manages Singbox with start/stop actions.
"""
if action == 'start':
if not domain or not port:
raise InvalidInputError("Error: Both --domain and --port are required for the start action.")
run_cmd(['bash', Command.INSTALL_SINGBOX.value, 'start', domain, str(port)])
elif action == 'stop':
run_cmd(['bash', Command.INSTALL_SINGBOX.value, 'stop'])
def normalsub(action: str, domain: str, port: int):
"""
Manages Normalsub with start/stop actions.
"""
if action == 'start':
if not domain or not port:
raise InvalidInputError("Error: Both --domain and --port are required for the start action.")
run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'start', domain, str(port)])
elif action == 'stop':
run_cmd(['bash', Command.INSTALL_NORMALSUB.value, 'stop'])
# endregion
# endregion

View File

@ -33,8 +33,10 @@ generate_obfs() {
} }
if [[ $1 == "--remove" || $1 == "-r" ]]; then if [[ $1 == "--remove" || $1 == "-r" ]]; then
echo "Removing 'obfs' from config.json..."
remove_obfs remove_obfs
elif [[ $1 == "--generate" || $1 == "-g" ]]; then elif [[ $1 == "--generate" || $1 == "-g" ]]; then
echo "Generating 'obfs' in config.json..."
generate_obfs generate_obfs
else else
echo "Usage: $0 --remove|-r | --generate|-g" echo "Usage: $0 --remove|-r | --generate|-g"

View File

@ -27,8 +27,10 @@ function remove_masquerade() {
} }
if [[ "$1" == "1" ]]; then if [[ "$1" == "1" ]]; then
echo "Enabling 'masquerade' with URL: $2..."
enable_masquerade "$2" enable_masquerade "$2"
elif [[ "$1" == "2" ]]; then elif [[ "$1" == "2" ]]; then
echo "Removing 'masquerade' from config.json..."
remove_masquerade remove_masquerade
else else
echo "Usage: $0 {1|2} [domain]" echo "Usage: $0 {1|2} [domain]"