From 3aaf06fc108bb348c4e1bafc591c613c86591047 Mon Sep 17 00:00:00 2001 From: Whispering Wind <151555003+ReturnFI@users.noreply.github.com> Date: Sun, 15 Dec 2024 00:39:50 +0330 Subject: [PATCH] Refactor Telegram BOT Update main file --- core/scripts/telegrambot/tbot.py | 382 +------------------------------ 1 file changed, 8 insertions(+), 374 deletions(-) diff --git a/core/scripts/telegrambot/tbot.py b/core/scripts/telegrambot/tbot.py index 4c0899a..7c6f472 100644 --- a/core/scripts/telegrambot/tbot.py +++ b/core/scripts/telegrambot/tbot.py @@ -1,39 +1,12 @@ -import telebot -import subprocess -import qrcode -import io -import json -import os -import shlex -import re -from dotenv import load_dotenv from telebot import types - -load_dotenv() - -API_TOKEN = os.getenv('API_TOKEN') -ADMIN_USER_IDS = json.loads(os.getenv('ADMIN_USER_IDS')) -CLI_PATH = '/etc/hysteria/core/cli.py' -BACKUP_DIRECTORY = '/opt/hysbackup' -bot = telebot.TeleBot(API_TOKEN) - -def run_cli_command(command): - try: - args = shlex.split(command) - result = subprocess.check_output(args, stderr=subprocess.STDOUT) - return result.decode('utf-8').strip() - except subprocess.CalledProcessError as e: - return f'Error: {e.output.decode("utf-8")}' - -def create_main_markup(): - markup = types.ReplyKeyboardMarkup(resize_keyboard=True) - markup.row('Add User', 'Show User') - markup.row('Delete User', 'Server Info') - markup.row('Backup Server') - return markup - -def is_admin(user_id): - return user_id in ADMIN_USER_IDS +from utils.common import create_main_markup +from utils.adduser import * +from utils.backup import * +from utils.command import * +from utils.deleteuser import * +from utils.edituser import * +from utils.search import * +from utils.serverinfo import * @bot.message_handler(commands=['start']) def send_welcome(message): @@ -43,344 +16,5 @@ def send_welcome(message): else: bot.reply_to(message, "Unauthorized access. You do not have permission to use this bot.") -@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Add User') -def add_user(message): - msg = bot.reply_to(message, "Enter username:") - bot.register_next_step_handler(msg, process_add_user_step1) - -def process_add_user_step1(message): - username = message.text.strip() - if username == "": - bot.reply_to(message, "Username cannot be empty. Please enter a valid username.") - return - - command = f"python3 {CLI_PATH} list-users" - result = run_cli_command(command) - - try: - users = json.loads(result) - existing_users = {user.lower() for user in users.keys()} - - if username.lower() in existing_users: - bot.reply_to(message, f"Username '{username}' already exists. Please choose a different username.") - msg = bot.reply_to(message, "Enter a new username:") - bot.register_next_step_handler(msg, process_add_user_step1) - return - except json.JSONDecodeError: - if "No such file or directory" in result or result.strip() == "": - bot.reply_to(message, "User list file does not exist. Adding the first user.") - else: - bot.reply_to(message, "Error retrieving user list. Please try again later.") - return - - msg = bot.reply_to(message, "Enter traffic limit (GB):") - bot.register_next_step_handler(msg, process_add_user_step2, username) - -def process_add_user_step2(message, username): - try: - traffic_limit = int(message.text.strip()) - msg = bot.reply_to(message, "Enter expiration days:") - bot.register_next_step_handler(msg, process_add_user_step3, username, traffic_limit) - except ValueError: - bot.reply_to(message, "Invalid traffic limit. Please enter a number.") - -def process_add_user_step3(message, username, traffic_limit): - try: - expiration_days = int(message.text.strip()) - lower_username = username.lower() - command = f"python3 {CLI_PATH} add-user -u {username} -t {traffic_limit} -e {expiration_days}" - result = run_cli_command(command) - bot.send_chat_action(message.chat.id, 'typing') - qr_command = f"python3 {CLI_PATH} show-user-uri -u {lower_username} -ip 4" - qr_result = run_cli_command(qr_command).replace("IPv4:\n", "").strip() - - if not qr_result: - bot.reply_to(message, "Failed to generate QR code.") - return - - qr_v4 = qrcode.make(qr_result) - bio_v4 = io.BytesIO() - qr_v4.save(bio_v4, 'PNG') - bio_v4.seek(0) - caption = f"{result}\n\n`{qr_result}`" - bot.send_photo(message.chat.id, photo=bio_v4, caption=caption, parse_mode="Markdown") - - except ValueError: - bot.reply_to(message, "Invalid expiration days. Please enter a number.") - -@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Show User') -def show_user(message): - msg = bot.reply_to(message, "Enter username:") - bot.register_next_step_handler(msg, process_show_user) - -def process_show_user(message): - username = message.text.strip().lower() - bot.send_chat_action(message.chat.id, 'typing') - command = f"python3 {CLI_PATH} list-users" - result = run_cli_command(command) - - try: - users = json.loads(result) - existing_users = {user.lower(): user for user in users.keys()} - - if username not in existing_users: - bot.reply_to(message, f"Username '{message.text.strip()}' does not exist. Please enter a valid username.") - return - - actual_username = existing_users[username] - except json.JSONDecodeError: - bot.reply_to(message, "Error retrieving user list. Please try again later.") - return - - command = f"python3 {CLI_PATH} get-user -u {actual_username}" - user_result = run_cli_command(command) - - try: - user_details = json.loads(user_result) - - upload_bytes = user_details.get('upload_bytes') - download_bytes = user_details.get('download_bytes') - status = user_details.get('status', 'Unknown') - - if upload_bytes is None or download_bytes is None: - traffic_message = "**Traffic Data:**\nUser not active or no traffic data available." - else: - upload_gb = upload_bytes / (1024 ** 3) # Convert bytes to GB - download_gb = download_bytes / (1024 ** 3) # Convert bytes to GB - - traffic_message = ( - f"**Traffic Data:**\n" - f"Upload: {upload_gb:.2f} GB\n" - f"Download: {download_gb:.2f} GB\n" - f"Status: {status}" - ) - except json.JSONDecodeError: - bot.reply_to(message, "Failed to parse JSON data. The command output may be malformed.") - return - - formatted_details = ( - f"**User Details:**\n\n" - f"Name: {actual_username}\n" - f"Traffic Limit: {user_details['max_download_bytes'] / (1024 ** 3):.2f} GB\n" - f"Days: {user_details['expiration_days']}\n" - f"Account Creation: {user_details['account_creation_date']}\n" - f"Blocked: {user_details['blocked']}\n\n" - f"{traffic_message}" - ) - - combined_command = f"python3 {CLI_PATH} show-user-uri -u {actual_username} -ip 4 -s -n" - combined_result = run_cli_command(combined_command) - - if "Error" in combined_result or "Invalid" in combined_result: - bot.reply_to(message, combined_result) - return - - result_lines = combined_result.strip().split('\n') - - uri_v4 = "" - singbox_sublink = "" - normal_sub_sublink = "" - - for line in result_lines: - line = line.strip() - if line.startswith("hy2://"): - uri_v4 = line - elif line.startswith("Singbox Sublink:"): - singbox_sublink = result_lines[result_lines.index(line) + 1].strip() - elif line.startswith("Normal-SUB Sublink:"): - normal_sub_sublink = result_lines[result_lines.index(line) + 1].strip() - - if not uri_v4: - bot.reply_to(message, "No valid URI found.") - return - - qr_v4 = qrcode.make(uri_v4) - bio_v4 = io.BytesIO() - qr_v4.save(bio_v4, 'PNG') - bio_v4.seek(0) - - markup = types.InlineKeyboardMarkup(row_width=3) - markup.add(types.InlineKeyboardButton("Reset User", callback_data=f"reset_user:{actual_username}"), - types.InlineKeyboardButton("IPv6-URI", callback_data=f"ipv6_uri:{actual_username}")) - markup.add(types.InlineKeyboardButton("Edit Username", callback_data=f"edit_username:{actual_username}"), - types.InlineKeyboardButton("Edit Traffic Limit", callback_data=f"edit_traffic:{actual_username}")) - markup.add(types.InlineKeyboardButton("Edit Expiration Days", callback_data=f"edit_expiration:{actual_username}"), - types.InlineKeyboardButton("Renew Password", callback_data=f"renew_password:{actual_username}")) - markup.add(types.InlineKeyboardButton("Renew Creation Date", callback_data=f"renew_creation:{actual_username}"), - types.InlineKeyboardButton("Block User", callback_data=f"block_user:{actual_username}")) - - caption = f"{formatted_details}\n\n**IPv4 URI:**\n\n`{uri_v4}`" - if singbox_sublink: - caption += f"\n\n**SingBox SUB:**\n{singbox_sublink}" - if normal_sub_sublink: - caption += f"\n\n**Normal SUB:**\n{normal_sub_sublink}" - - bot.send_photo( - message.chat.id, - bio_v4, - caption=caption, - reply_markup=markup, - parse_mode="Markdown" - ) - - -@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Server Info') -def server_info(message): - command = f"python3 {CLI_PATH} server-info" - result = run_cli_command(command) - bot.send_chat_action(message.chat.id, 'typing') - bot.reply_to(message, result) - -@bot.callback_query_handler(func=lambda call: call.data.startswith('edit_') or call.data.startswith('renew_') or call.data.startswith('block_') or call.data.startswith('reset_') or call.data.startswith('ipv6_')) -def handle_edit_callback(call): - action, username = call.data.split(':') - if action == 'edit_username': - msg = bot.send_message(call.message.chat.id, f"Enter new username for {username}:") - bot.register_next_step_handler(msg, process_edit_username, username) - elif action == 'edit_traffic': - msg = bot.send_message(call.message.chat.id, f"Enter new traffic limit (GB) for {username}:") - bot.register_next_step_handler(msg, process_edit_traffic, username) - elif action == 'edit_expiration': - msg = bot.send_message(call.message.chat.id, f"Enter new expiration days for {username}:") - bot.register_next_step_handler(msg, process_edit_expiration, username) - elif action == 'renew_password': - command = f"python3 {CLI_PATH} edit-user -u {username} -rp" - result = run_cli_command(command) - bot.send_message(call.message.chat.id, result) - elif action == 'renew_creation': - command = f"python3 {CLI_PATH} edit-user -u {username} -rc" - result = run_cli_command(command) - bot.send_message(call.message.chat.id, result) - elif action == 'block_user': - markup = types.InlineKeyboardMarkup() - markup.add(types.InlineKeyboardButton("True", callback_data=f"confirm_block:{username}:true"), - types.InlineKeyboardButton("False", callback_data=f"confirm_block:{username}:false")) - bot.send_message(call.message.chat.id, f"Set block status for {username}:", reply_markup=markup) - elif action == 'reset_user': - command = f"python3 {CLI_PATH} reset-user -u {username}" - result = run_cli_command(command) - bot.send_message(call.message.chat.id, result) - elif action == 'ipv6_uri': - command = f"python3 {CLI_PATH} show-user-uri -u {username} -ip 6" - result = run_cli_command(command) - if "Error" in result or "Invalid" in result: - bot.send_message(call.message.chat.id, result) - return - - uri_v6 = result.split('\n')[-1].strip() - qr_v6 = qrcode.make(uri_v6) - bio_v6 = io.BytesIO() - qr_v6.save(bio_v6, 'PNG') - bio_v6.seek(0) - - bot.send_photo( - call.message.chat.id, - bio_v6, - caption=f"**IPv6 URI for {username}:**\n\n`{uri_v6}`", - parse_mode="Markdown" - ) - -@bot.callback_query_handler(func=lambda call: call.data.startswith('confirm_block:')) -def handle_block_confirmation(call): - _, username, block_status = call.data.split(':') - command = f"python3 {CLI_PATH} edit-user -u {username} {'-b' if block_status == 'true' else ''}" - result = run_cli_command(command) - bot.send_message(call.message.chat.id, result) - -def process_edit_username(message, username): - new_username = message.text.strip() - command = f"python3 {CLI_PATH} edit-user -u {username} -nu {new_username}" - result = run_cli_command(command) - bot.reply_to(message, result) - -def process_edit_traffic(message, username): - try: - new_traffic_limit = int(message.text.strip()) - command = f"python3 {CLI_PATH} edit-user -u {username} -nt {new_traffic_limit}" - result = run_cli_command(command) - bot.reply_to(message, result) - except ValueError: - bot.reply_to(message, "Invalid traffic limit. Please enter a number.") - -def process_edit_expiration(message, username): - try: - new_expiration_days = int(message.text.strip()) - command = f"python3 {CLI_PATH} edit-user -u {username} -ne {new_expiration_days}" - result = run_cli_command(command) - bot.reply_to(message, result) - except ValueError: - bot.reply_to(message, "Invalid expiration days. Please enter a number.") - -@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Delete User') -def delete_user(message): - msg = bot.reply_to(message, "Enter username:") - bot.register_next_step_handler(msg, process_delete_user) - -def process_delete_user(message): - username = message.text.strip().lower() - command = f"python3 {CLI_PATH} remove-user -u {username}" - result = run_cli_command(command) - bot.reply_to(message, result) - -@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Backup Server') -def backup_server(message): - bot.reply_to(message, "Starting backup. This may take a few moments...") - bot.send_chat_action(message.chat.id, 'typing') - - backup_command = f"python3 {CLI_PATH} backup-hysteria" - result = run_cli_command(backup_command) - - if "Error" in result: - bot.reply_to(message, f"Backup failed: {result}") - else: - bot.reply_to(message, "Backup completed successfully!") - - - try: - files = [f for f in os.listdir(BACKUP_DIRECTORY) if f.endswith('.zip')] - files.sort(key=lambda x: os.path.getctime(os.path.join(BACKUP_DIRECTORY, x)), reverse=True) - latest_backup_file = files[0] if files else None - except Exception as e: - bot.reply_to(message, f"Failed to locate the backup file: {str(e)}") - return - - if latest_backup_file: - backup_file_path = os.path.join(BACKUP_DIRECTORY, latest_backup_file) - with open(backup_file_path, 'rb') as f: - bot.send_document(message.chat.id, f, caption=f"Backup completed: {latest_backup_file}") - else: - bot.reply_to(message, "No backup file found after the backup process.") - -@bot.inline_handler(lambda query: is_admin(query.from_user.id)) -def handle_inline_query(query): - command = f"python3 {CLI_PATH} list-users" - result = run_cli_command(command) - try: - users = json.loads(result) - except json.JSONDecodeError: - bot.answer_inline_query(query.id, results=[], switch_pm_text="Error retrieving users.", switch_pm_user_id=query.from_user.id) - return - - query_text = query.query.lower() - results = [] - for username, details in users.items(): - if query_text in username.lower(): - title = f"{username}" - description = f"Traffic Limit: {details['max_download_bytes'] / (1024 ** 3):.2f} GB, Expiration Days: {details['expiration_days']}" - results.append(types.InlineQueryResultArticle( - id=username, - title=title, - description=description, - input_message_content=types.InputTextMessageContent( - message_text=f"Name: {username}\n" - f"Traffic limit: {details['max_download_bytes'] / (1024 ** 3):.2f} GB\n" - f"Days: {details['expiration_days']}\n" - f"Account Creation: {details['account_creation_date']}\n" - f"Blocked: {details['blocked']}" - ) - )) - - bot.answer_inline_query(query.id, results, cache_time=0) - if __name__ == '__main__': bot.polling(none_stop=True)