From bd0124446d62e575c37a05c157e327c926a75a33 Mon Sep 17 00:00:00 2001 From: Whispering Wind <151555003+ReturnFI@users.noreply.github.com> Date: Wed, 27 Aug 2025 16:36:46 +0330 Subject: [PATCH] fix(bot): handle escaped underscores in usernames --- core/scripts/telegrambot/utils/adduser.py | 26 ++++++++++++------ core/scripts/telegrambot/utils/edituser.py | 32 +++++++++++----------- core/scripts/telegrambot/utils/search.py | 2 +- 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/core/scripts/telegrambot/utils/adduser.py b/core/scripts/telegrambot/utils/adduser.py index 7298f2e..b81d3c4 100644 --- a/core/scripts/telegrambot/utils/adduser.py +++ b/core/scripts/telegrambot/utils/adduser.py @@ -1,10 +1,13 @@ import qrcode import io import json +import re from telebot import types from utils.command import * from utils.common import create_main_markup +def escape_markdown(text): + return str(text).replace('_', '\\_').replace('*', '\\*').replace('`', '\\`') def create_cancel_markup(back_step=None): markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) @@ -15,7 +18,7 @@ def create_cancel_markup(back_step=None): @bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Add User') def add_user(message): - msg = bot.reply_to(message, "Enter username:", reply_markup=create_cancel_markup()) + msg = bot.reply_to(message, "Enter username (only letters, numbers, and underscores are allowed):", reply_markup=create_cancel_markup()) bot.register_next_step_handler(msg, process_add_user_step1) def process_add_user_step1(message): @@ -24,6 +27,12 @@ def process_add_user_step1(message): return username = message.text.strip() + + if not re.match("^[a-zA-Z0-9_]*$", username): + bot.reply_to(message, "Invalid username. Only letters, numbers, and underscores are allowed. Please try again:", reply_markup=create_cancel_markup()) + bot.register_next_step_handler(message, process_add_user_step1) + return + if not username: bot.reply_to(message, "Username cannot be empty. Please enter a valid username:", reply_markup=create_cancel_markup()) bot.register_next_step_handler(message, process_add_user_step1) @@ -41,7 +50,7 @@ def process_add_user_step1(message): users_data = json.loads(result) existing_users = {user_key.lower() for user_key in users_data.keys()} if username.lower() in existing_users: - bot.reply_to(message, f"Username '{username}' already exists. Please choose a different username:", reply_markup=create_cancel_markup()) + bot.reply_to(message, f"Username '{escape_markdown(username)}' already exists. Please choose a different username:", reply_markup=create_cancel_markup()) bot.register_next_step_handler(message, process_add_user_step1) return except json.JSONDecodeError: @@ -59,7 +68,7 @@ def process_add_user_step2(message, username): bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup()) return if message.text == "ā¬…ļø Back": - msg = bot.reply_to(message, "Enter username:", reply_markup=create_cancel_markup()) + msg = bot.reply_to(message, "Enter username (only letters, numbers, and underscores are allowed):", reply_markup=create_cancel_markup()) bot.register_next_step_handler(msg, process_add_user_step1) return @@ -96,8 +105,7 @@ def process_add_user_step3(message, username, traffic_limit): bot.send_chat_action(message.chat.id, 'typing') - lower_username = username.lower() - uri_info_command = f"python3 {CLI_PATH} show-user-uri -u \"{lower_username}\" -ip 4 -n" + uri_info_command = f"python3 {CLI_PATH} show-user-uri -u \"{username}\" -ip 4 -n" uri_info_output = run_cli_command(uri_info_command) direct_uri = None @@ -121,18 +129,20 @@ def process_add_user_step3(message, username, traffic_limit): except (IndexError, AttributeError): pass - caption_text = f"{add_user_feedback}\n" + display_username = escape_markdown(username) + escaped_feedback = escape_markdown(add_user_feedback) + caption_text = f"{escaped_feedback}\n" link_to_generate_qr_for = None link_type_for_caption = "" if normal_sub_link: link_to_generate_qr_for = normal_sub_link link_type_for_caption = "Normal Subscription Link" - caption_text += f"\n{link_type_for_caption} for `{username}`:\n`{normal_sub_link}`" + caption_text += f"\n{link_type_for_caption} for `{display_username}`:\n`{normal_sub_link}`" elif direct_uri: link_to_generate_qr_for = direct_uri link_type_for_caption = "Hysteria2 IPv4 URI" - caption_text += f"\n{link_type_for_caption} for `{username}`:\n`{direct_uri}`" + caption_text += f"\n{link_type_for_caption} for `{display_username}`:\n`{direct_uri}`" if link_to_generate_qr_for: qr_img = qrcode.make(link_to_generate_qr_for) diff --git a/core/scripts/telegrambot/utils/edituser.py b/core/scripts/telegrambot/utils/edituser.py index 537084c..bc9eed4 100644 --- a/core/scripts/telegrambot/utils/edituser.py +++ b/core/scripts/telegrambot/utils/edituser.py @@ -1,5 +1,3 @@ -#show and edituser file - import qrcode import io import json @@ -8,6 +6,9 @@ from utils.command import * from utils.common import * +def escape_markdown(text): + return str(text).replace('_', '\\_').replace('*', '\\*').replace('`', '\\`') + @bot.callback_query_handler(func=lambda call: call.data == "cancel_show_user") def handle_cancel_show_user(call): bot.edit_message_text("Operation canceled.", chat_id=call.message.chat.id, message_id=call.message.message_id) @@ -33,7 +34,7 @@ def process_show_user(message): existing_users = {user.lower(): user for user in users.keys()} if username not in existing_users: - bot.reply_to(message, f"Username '{message.text.strip()}' does not exist. Please enter a valid username.") + bot.reply_to(message, f"Username '{escape_markdown(message.text.strip())}' does not exist. Please enter a valid username.") return actual_username = existing_users[username] @@ -54,8 +55,8 @@ def process_show_user(message): if upload_bytes is None or download_bytes is None: traffic_message = "**Traffic Data:**\nUser not active or no traffic data available." else: - upload_gb = upload_bytes / (1024 ** 3) # Convert bytes to GB - download_gb = download_bytes / (1024 ** 3) # Convert bytes to GB + upload_gb = upload_bytes / (1024 ** 3) + download_gb = download_bytes / (1024 ** 3) totalusage = upload_gb + download_gb traffic_message = ( @@ -68,8 +69,10 @@ def process_show_user(message): bot.reply_to(message, "Failed to parse JSON data. The command output may be malformed.") return + display_username = escape_markdown(actual_username) + formatted_details = ( - f"\nšŸ†” Name: {actual_username}\n" + f"\nšŸ†” Name: {display_username}\n" f"šŸ“Š Traffic Limit: {user_details['max_download_bytes'] / (1024 ** 3):.2f} GB\n" f"šŸ“… Days: {user_details['expiration_days']}\n" f"ā³ Creation: {user_details['account_creation_date']}\n" @@ -87,15 +90,12 @@ def process_show_user(message): result_lines = combined_result.strip().split('\n') uri_v4 = "" - # singbox_sublink = "" normal_sub_sublink = "" for line in result_lines: line = line.strip() if line.startswith("hy2://"): uri_v4 = line - # elif line.startswith("Singbox Sublink:"): - # singbox_sublink = result_lines[result_lines.index(line) + 1].strip() elif line.startswith("Normal-SUB Sublink:"): normal_sub_sublink = result_lines[result_lines.index(line) + 1].strip() @@ -119,8 +119,6 @@ def process_show_user(message): types.InlineKeyboardButton("Block User", callback_data=f"block_user:{actual_username}")) caption = f"{formatted_details}\n\n**IPv4 URI:**\n\n`{uri_v4}`" - # if singbox_sublink: - # caption += f"\n\n**SingBox SUB:**\n{singbox_sublink}" if normal_sub_sublink: caption += f"\n\n**Normal SUB:**\n{normal_sub_sublink}" @@ -135,14 +133,16 @@ def process_show_user(message): @bot.callback_query_handler(func=lambda call: call.data.startswith('edit_') or call.data.startswith('renew_') or call.data.startswith('block_') or call.data.startswith('reset_') or call.data.startswith('ipv6_')) def handle_edit_callback(call): action, username = call.data.split(':') + display_username = escape_markdown(username) + if action == 'edit_username': - msg = bot.send_message(call.message.chat.id, f"Enter new username for {username}:") + msg = bot.send_message(call.message.chat.id, f"Enter new username for {display_username}:") bot.register_next_step_handler(msg, process_edit_username, username) elif action == 'edit_traffic': - msg = bot.send_message(call.message.chat.id, f"Enter new traffic limit (GB) for {username}:") + msg = bot.send_message(call.message.chat.id, f"Enter new traffic limit (GB) for {display_username}:") bot.register_next_step_handler(msg, process_edit_traffic, username) elif action == 'edit_expiration': - msg = bot.send_message(call.message.chat.id, f"Enter new expiration days for {username}:") + msg = bot.send_message(call.message.chat.id, f"Enter new expiration days for {display_username}:") bot.register_next_step_handler(msg, process_edit_expiration, username) elif action == 'renew_password': command = f"python3 {CLI_PATH} edit-user -u {username} -rp" @@ -156,7 +156,7 @@ def handle_edit_callback(call): markup = types.InlineKeyboardMarkup() markup.add(types.InlineKeyboardButton("True", callback_data=f"confirm_block:{username}:true"), types.InlineKeyboardButton("False", callback_data=f"confirm_block:{username}:false")) - bot.send_message(call.message.chat.id, f"Set block status for {username}:", reply_markup=markup) + bot.send_message(call.message.chat.id, f"Set block status for {display_username}:", reply_markup=markup) elif action == 'reset_user': command = f"python3 {CLI_PATH} reset-user -u {username}" result = run_cli_command(command) @@ -177,7 +177,7 @@ def handle_edit_callback(call): bot.send_photo( call.message.chat.id, bio_v6, - caption=f"**IPv6 URI for {username}:**\n\n`{uri_v6}`", + caption=f"**IPv6 URI for {display_username}:**\n\n`{uri_v6}`", parse_mode="Markdown" ) diff --git a/core/scripts/telegrambot/utils/search.py b/core/scripts/telegrambot/utils/search.py index 31821e0..684c87d 100644 --- a/core/scripts/telegrambot/utils/search.py +++ b/core/scripts/telegrambot/utils/search.py @@ -11,7 +11,7 @@ def handle_inline_query(query): bot.answer_inline_query(query.id, results=[], switch_pm_text="Error retrieving users.", switch_pm_user_id=query.from_user.id) return - query_text = query.query.lower() + query_text = query.query.lower().replace('\\_', '_') results = [] if query_text == "block":