Files
Blitz-Proxy/core/cli.py
2024-07-20 21:24:17 +03:30

131 lines
4.6 KiB
Python

#!/usr/bin/env python3
import os
import subprocess
import click
from enum import StrEnum
import traffic
SCRIPT_DIR = 'scripts'
DEBUG = True
class Command(StrEnum):
'''Constais path to command's script'''
INSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR,'hysteria2' ,'install.sh')
UNINSTALL_HYSTERIA2 = os.path.join(SCRIPT_DIR,'hysteria2', 'uninstall.sh')
UPDATE_HYSTERIA2 = os.path.join(SCRIPT_DIR,'hysteria2', 'update.sh')
CHANGE_PORT_HYSTERIA2 = os.path.join(SCRIPT_DIR,'hysteria2' ,'change_port.sh')
ADD_USER = os.path.join(SCRIPT_DIR,'hysteria2' ,'add_user.sh')
EDIT_USER = os.path.join(SCRIPT_DIR,'hysteria2' ,'edit_user.sh')
REMOVE_USER = os.path.join(SCRIPT_DIR,'hysteria2' ,'remove_user.sh')
SHOW_USER_URI = os.path.join(SCRIPT_DIR,'hysteria2' ,'show_user_uri.sh')
TRAFFIC_STATUS = 'traffic.py' # won't be call directly (it's a python module)
LIST_USERS = '' # unknown for now
INSTALL_TCP_BRUTAL = os.path.join(SCRIPT_DIR,'tcp-brutal', 'install.sh')
INSTALL_WARP = os.path.join(SCRIPT_DIR,'warp', 'install.sh')
UNINSTALL_WARP = os.path.join(SCRIPT_DIR,'warp', 'uninstall.sh')
CONFIGURE_WARP = os.path.join(SCRIPT_DIR,'warp', 'configure.sh')
# region utils
def run_cmd(command:list[str]):
'''
Runs a command and returns the output.
Could raise subprocess.CalledProcessError
'''
result = subprocess.check_output(command, shell=True)
if DEBUG:
print(result.decode().strip())
def generate_password() -> str:
'''
Generates a random password using pwgen for user.
Could raise subprocess.CalledProcessError
'''
return subprocess.check_output(['pwgen', '-s', '32', '1'], shell=True).decode().strip()
# endregion
@click.group()
def cli():
pass
# region hysteria2 menu options
@cli.command('install-hysteria2')
def install_hysteria2():
run_cmd(['bash', Command.INSTALL_HYSTERIA2])
@cli.command('uninstall-hysteria2')
def uninstall_hysteria2():
run_cmd(['bash', Command.UNINSTALL_HYSTERIA2])
@cli.command('update-hysteria2')
def update_hysteria2():
run_cmd(['bash', Command.UPDATE_HYSTERIA2])
@cli.command('change-hysteria2-port')
@click.option('--port','-p', required=True, help='New port for Hysteria2',type=int)
def change_hysteria2_port(port:int):
run_cmd(['bash', Command.CHANGE_PORT_HYSTERIA2, str(port)])
@cli.command('add-user')
@click.option('--username','-u', required=True, help='Username for the new user',type=str)
@click.option('--traffic-limit','-t', required=True, help='Traffic limit for the new user in GB',type=float)
@click.option('--expiration-days','-e', required=True, help='Expiration days for the new user',type=int)
def add_user(username:str, traffic_limit:float, expiration_days:int):
run_cmd(['bash', Command.ADD_USER, username, str(traffic_limit), str(expiration_days)])
@cli.command('edit-user')
@click.option('--username','-u', required=True, help='Username for the user to edit',type=str)
@click.option('--traffic-limit','-t', required=True, help='Traffic limit for the new user in GB',type=float)
@click.option('--expiration-days','-e', required=True, help='Expiration days for the new user',type=int)
def edit_user(username:str, traffic_limit:float, expiration_days:int):
run_cmd(['bash', Command.EDIT_USER, username, str(traffic_limit), str(expiration_days)])
@cli.command('remove-user')
@click.option('--username','-u', required=True, help='Username for the user to remove',type=str)
def remove_user(username:str):
run_cmd(['bash', Command.REMOVE_USER, username])
@cli.command('show-user-uri')
@click.option('--username','-u', required=True, help='Username for the user to show the URI',type=str)
def show_user_uri(username:str):
run_cmd(['bash', Command.SHOW_USER_URI, username])
@cli.command('traffic-status')
def traffic_status():
traffic.traffic_status()
@cli.command('list-users')
def list_users():
pass
# endregion
# region advanced menu
@cli.command('install-tcp-brutal')
def install_tcp_brutal():
run_cmd(['bash', Command.INSTALL_TCP_BRUTAL])
@cli.command('install-warp')
def install_warp():
run_cmd(['bash', Command.INSTALL_WARP])
@cli.command('uninstall-warp')
def uninstall_warp():
run_cmd(['bash', Command.UNINSTALL_WARP])
@cli.command('configure-warp')
@click.option('--warp-mode','-m', required=True, help='Warp mode',type=click.Choice(['proxy','direct','reject']))
@click.option('--block-porn','-p', required=False, help='Block porn',type=bool)
def configure_warp(warp_mode:str, block_porn:bool):
run_cmd(['bash', Command.CONFIGURE_WARP, warp_mode, str(block_porn)])
# endregion
if __name__ == '__main__':
cli()