Files
Blitz-Proxy/core/scripts/telegrambot/utils/adduser.py
Whispering Wind 2d293c5b37 Refactor Telegram BOT
add utils folder
2024-12-15 00:28:55 +03:30

104 lines
4.9 KiB
Python

import qrcode
import io
import json
from telebot import types
from utils.command import *
from utils.common import create_main_markup
def create_cancel_markup(back_step=None):
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
if back_step:
markup.row(types.KeyboardButton("⬅️ Back"))
markup.row(types.KeyboardButton("❌ Cancel"))
return markup
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Add User')
def add_user(message):
msg = bot.reply_to(message, "Enter username:", reply_markup=create_cancel_markup())
bot.register_next_step_handler(msg, process_add_user_step1)
def process_add_user_step1(message):
if message.text == "❌ Cancel":
bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup())
return
username = message.text.strip()
if username == "":
bot.reply_to(message, "Username cannot be empty. Please enter a valid username.", reply_markup=create_cancel_markup())
bot.register_next_step_handler(message, process_add_user_step1)
return
command = f"python3 {CLI_PATH} list-users"
result = run_cli_command(command)
try:
users = json.loads(result)
existing_users = {user.lower() for user in users.keys()}
if username.lower() in existing_users:
bot.reply_to(message, f"Username '{username}' already exists. Please choose a different username:", reply_markup=create_cancel_markup())
bot.register_next_step_handler(message, process_add_user_step1)
return
except json.JSONDecodeError:
if "No such file or directory" in result or result.strip() == "":
bot.reply_to(message, "User list file does not exist. Adding the first user.", reply_markup=create_cancel_markup())
else:
bot.reply_to(message, "Error retrieving user list. Please try again later.")
bot.send_message(message.chat.id, "Returning to main menu...", reply_markup=create_main_markup())
return
msg = bot.reply_to(message, "Enter traffic limit (GB):", reply_markup=create_cancel_markup(back_step=process_add_user_step1))
bot.register_next_step_handler(msg, process_add_user_step2, username)
def process_add_user_step2(message, username):
if message.text == "❌ Cancel":
bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup())
return
if message.text == "⬅️ Back":
msg = bot.reply_to(message, "Enter username:", reply_markup=create_cancel_markup())
bot.register_next_step_handler(msg, process_add_user_step1)
return
try:
traffic_limit = int(message.text.strip())
msg = bot.reply_to(message, "Enter expiration days:", reply_markup=create_cancel_markup(back_step=process_add_user_step2))
bot.register_next_step_handler(msg, process_add_user_step3, username, traffic_limit)
except ValueError:
bot.reply_to(message, "Invalid traffic limit. Please enter a number:", reply_markup=create_cancel_markup(back_step=process_add_user_step1))
bot.register_next_step_handler(message, process_add_user_step2, username)
def process_add_user_step3(message, username, traffic_limit):
if message.text == "❌ Cancel":
bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup())
return
if message.text == "⬅️ Back":
msg = bot.reply_to(message, "Enter traffic limit (GB):", reply_markup=create_cancel_markup(back_step=process_add_user_step1))
bot.register_next_step_handler(msg, process_add_user_step2, username)
return
try:
expiration_days = int(message.text.strip())
lower_username = username.lower()
command = f"python3 {CLI_PATH} add-user -u {username} -t {traffic_limit} -e {expiration_days}"
result = run_cli_command(command)
bot.send_chat_action(message.chat.id, 'typing')
qr_command = f"python3 {CLI_PATH} show-user-uri -u {lower_username} -ip 4"
qr_result = run_cli_command(qr_command).replace("IPv4:\n", "").strip()
if not qr_result:
bot.reply_to(message, "Failed to generate QR code.", reply_markup=create_main_markup())
return
qr_v4 = qrcode.make(qr_result)
bio_v4 = io.BytesIO()
qr_v4.save(bio_v4, 'PNG')
bio_v4.seek(0)
caption = f"{result}\n\n`{qr_result}`"
bot.send_photo(message.chat.id, photo=bio_v4, caption=caption, parse_mode="Markdown", reply_markup=create_main_markup())
except ValueError:
bot.reply_to(message, "Invalid expiration days. Please enter a number:", reply_markup=create_cancel_markup(back_step=process_add_user_step2))
bot.register_next_step_handler(message, process_add_user_step3, username, traffic_limit)