From 2d293c5b370530f9530f6b7d4caa1c85d68ff39a Mon Sep 17 00:00:00 2001 From: Whispering Wind <151555003+ReturnFI@users.noreply.github.com> Date: Sun, 15 Dec 2024 00:28:55 +0330 Subject: [PATCH] Refactor Telegram BOT add utils folder --- core/scripts/telegrambot/utils/__init__.py | 0 core/scripts/telegrambot/utils/adduser.py | 103 +++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 core/scripts/telegrambot/utils/__init__.py create mode 100644 core/scripts/telegrambot/utils/adduser.py diff --git a/core/scripts/telegrambot/utils/__init__.py b/core/scripts/telegrambot/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/scripts/telegrambot/utils/adduser.py b/core/scripts/telegrambot/utils/adduser.py new file mode 100644 index 0000000..317c87f --- /dev/null +++ b/core/scripts/telegrambot/utils/adduser.py @@ -0,0 +1,103 @@ +import qrcode +import io +import json +from telebot import types +from utils.command import * +from utils.common import create_main_markup + + +def create_cancel_markup(back_step=None): + markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) + if back_step: + markup.row(types.KeyboardButton("⬅️ Back")) + markup.row(types.KeyboardButton("❌ Cancel")) + return markup + +@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Add User') +def add_user(message): + msg = bot.reply_to(message, "Enter username:", reply_markup=create_cancel_markup()) + bot.register_next_step_handler(msg, process_add_user_step1) + +def process_add_user_step1(message): + if message.text == "❌ Cancel": + bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup()) + return + + username = message.text.strip() + if username == "": + bot.reply_to(message, "Username cannot be empty. Please enter a valid username.", reply_markup=create_cancel_markup()) + bot.register_next_step_handler(message, process_add_user_step1) + return + + command = f"python3 {CLI_PATH} list-users" + result = run_cli_command(command) + + try: + users = json.loads(result) + existing_users = {user.lower() for user in users.keys()} + + if username.lower() in existing_users: + bot.reply_to(message, f"Username '{username}' already exists. Please choose a different username:", reply_markup=create_cancel_markup()) + bot.register_next_step_handler(message, process_add_user_step1) + return + except json.JSONDecodeError: + if "No such file or directory" in result or result.strip() == "": + bot.reply_to(message, "User list file does not exist. Adding the first user.", reply_markup=create_cancel_markup()) + else: + bot.reply_to(message, "Error retrieving user list. Please try again later.") + bot.send_message(message.chat.id, "Returning to main menu...", reply_markup=create_main_markup()) + return + + msg = bot.reply_to(message, "Enter traffic limit (GB):", reply_markup=create_cancel_markup(back_step=process_add_user_step1)) + bot.register_next_step_handler(msg, process_add_user_step2, username) + +def process_add_user_step2(message, username): + if message.text == "❌ Cancel": + bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup()) + return + if message.text == "⬅️ Back": + msg = bot.reply_to(message, "Enter username:", reply_markup=create_cancel_markup()) + bot.register_next_step_handler(msg, process_add_user_step1) + return + + try: + traffic_limit = int(message.text.strip()) + msg = bot.reply_to(message, "Enter expiration days:", reply_markup=create_cancel_markup(back_step=process_add_user_step2)) + bot.register_next_step_handler(msg, process_add_user_step3, username, traffic_limit) + except ValueError: + bot.reply_to(message, "Invalid traffic limit. Please enter a number:", reply_markup=create_cancel_markup(back_step=process_add_user_step1)) + bot.register_next_step_handler(message, process_add_user_step2, username) + +def process_add_user_step3(message, username, traffic_limit): + if message.text == "❌ Cancel": + bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup()) + return + if message.text == "⬅️ Back": + msg = bot.reply_to(message, "Enter traffic limit (GB):", reply_markup=create_cancel_markup(back_step=process_add_user_step1)) + bot.register_next_step_handler(msg, process_add_user_step2, username) + return + + try: + expiration_days = int(message.text.strip()) + lower_username = username.lower() + command = f"python3 {CLI_PATH} add-user -u {username} -t {traffic_limit} -e {expiration_days}" + result = run_cli_command(command) + + bot.send_chat_action(message.chat.id, 'typing') + qr_command = f"python3 {CLI_PATH} show-user-uri -u {lower_username} -ip 4" + qr_result = run_cli_command(qr_command).replace("IPv4:\n", "").strip() + + if not qr_result: + bot.reply_to(message, "Failed to generate QR code.", reply_markup=create_main_markup()) + return + + qr_v4 = qrcode.make(qr_result) + bio_v4 = io.BytesIO() + qr_v4.save(bio_v4, 'PNG') + bio_v4.seek(0) + caption = f"{result}\n\n`{qr_result}`" + bot.send_photo(message.chat.id, photo=bio_v4, caption=caption, parse_mode="Markdown", reply_markup=create_main_markup()) + + except ValueError: + bot.reply_to(message, "Invalid expiration days. Please enter a number:", reply_markup=create_cancel_markup(back_step=process_add_user_step2)) + bot.register_next_step_handler(message, process_add_user_step3, username, traffic_limit)