Merge pull request #316 from SeyedHashtag/original

Added note support for user creation and editing within the Telegram bot
This commit is contained in:
Whispering Wind
2025-11-05 12:40:54 +01:00
committed by GitHub
2 changed files with 297 additions and 215 deletions

View File

@ -16,6 +16,13 @@ def create_cancel_markup(back_step=None):
markup.row(types.KeyboardButton("❌ Cancel")) markup.row(types.KeyboardButton("❌ Cancel"))
return markup return markup
def create_cancel_markup_with_skip(back_step=None, username=None, traffic_limit=None):
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
if back_step:
markup.row(types.KeyboardButton("⬅️ Back"))
markup.row(types.KeyboardButton("⏭️ Skip"), types.KeyboardButton("❌ Cancel"))
return markup
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == ' Add User') @bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == ' Add User')
def add_user(message): def add_user(message):
msg = bot.reply_to(message, "Enter username (only letters, numbers, and underscores are allowed):", reply_markup=create_cancel_markup()) msg = bot.reply_to(message, "Enter username (only letters, numbers, and underscores are allowed):", reply_markup=create_cancel_markup())
@ -100,9 +107,39 @@ def process_add_user_step3(message, username, traffic_limit):
bot.register_next_step_handler(message, process_add_user_step3, username, traffic_limit) bot.register_next_step_handler(message, process_add_user_step3, username, traffic_limit)
return return
add_user_command = f"python3 {CLI_PATH} add-user -u \"{username}\" -t {traffic_limit} -e {expiration_days}" msg = bot.reply_to(message, "Enter note (optional, press Skip to continue):", reply_markup=create_cancel_markup_with_skip(back_step=process_add_user_step3, username=username, traffic_limit=traffic_limit))
add_user_feedback = run_cli_command(add_user_command).strip() bot.register_next_step_handler(msg, process_add_user_step4, username, traffic_limit, expiration_days)
except ValueError:
bot.reply_to(message, "Invalid expiration days. Please enter a number:", reply_markup=create_cancel_markup(back_step=process_add_user_step2))
bot.register_next_step_handler(message, process_add_user_step3, username, traffic_limit)
def process_add_user_step4(message, username, traffic_limit, expiration_days):
if message.text == "❌ Cancel":
bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup())
return
if message.text == "⬅️ Back":
msg = bot.reply_to(message, "Enter expiration days:", reply_markup=create_cancel_markup(back_step=process_add_user_step2))
bot.register_next_step_handler(msg, process_add_user_step3, username, traffic_limit)
return
try:
if message.text == "⏭️ Skip":
note = None
else:
note = message.text.strip()
if len(note) > 200:
bot.reply_to(message, "Note is too long (max 200 characters). Please enter a shorter note or press Skip:", reply_markup=create_cancel_markup_with_skip(back_step=process_add_user_step3, username=username, traffic_limit=traffic_limit))
bot.register_next_step_handler(message, process_add_user_step4, username, traffic_limit, expiration_days)
return
# Build command with or without note
if note is not None:
add_user_command = f"python3 {CLI_PATH} add-user -u \"{username}\" -t {traffic_limit} -e {expiration_days} -n \"{note}\""
else:
add_user_command = f"python3 {CLI_PATH} add-user -u \"{username}\" -t {traffic_limit} -e {expiration_days}"
add_user_feedback = run_cli_command(add_user_command).strip()
bot.send_chat_action(message.chat.id, 'typing') bot.send_chat_action(message.chat.id, 'typing')
uri_info_command = f"python3 {CLI_PATH} show-user-uri -u \"{username}\" -ip 4 -n -s" uri_info_command = f"python3 {CLI_PATH} show-user-uri -u \"{username}\" -ip 4 -n -s"

View File

@ -1,213 +1,258 @@
import qrcode import qrcode
import io import io
import json import json
from telebot import types from telebot import types
from utils.command import * from utils.command import *
from utils.common import * from utils.common import *
def escape_markdown(text): def escape_markdown(text):
return str(text).replace('_', '\\_').replace('*', '\\*').replace('`', '\\`') return str(text).replace('_', '\\_').replace('*', '\\*').replace('`', '\\`')
@bot.callback_query_handler(func=lambda call: call.data == "cancel_show_user") @bot.callback_query_handler(func=lambda call: call.data == "cancel_show_user")
def handle_cancel_show_user(call): def handle_cancel_show_user(call):
bot.edit_message_text("Operation canceled.", chat_id=call.message.chat.id, message_id=call.message.message_id) bot.edit_message_text("Operation canceled.", chat_id=call.message.chat.id, message_id=call.message.message_id)
create_main_markup(call.message) create_main_markup(call.message)
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == '🔍 Show User') @bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == '🔍 Show User')
def show_user(message): def show_user(message):
markup = types.InlineKeyboardMarkup() markup = types.InlineKeyboardMarkup()
cancel_button = types.InlineKeyboardButton("❌ Cancel", callback_data="cancel_show_user") cancel_button = types.InlineKeyboardButton("❌ Cancel", callback_data="cancel_show_user")
markup.add(cancel_button) markup.add(cancel_button)
msg = bot.reply_to(message, "Enter username:", reply_markup=markup) msg = bot.reply_to(message, "Enter username:", reply_markup=markup)
bot.register_next_step_handler(msg, process_show_user) bot.register_next_step_handler(msg, process_show_user)
def process_show_user(message): def process_show_user(message):
username_input = message.text.strip().lower() username_input = message.text.strip().lower()
bot.send_chat_action(message.chat.id, 'typing') bot.send_chat_action(message.chat.id, 'typing')
command = f"python3 {CLI_PATH} list-users" command = f"python3 {CLI_PATH} list-users"
result = run_cli_command(command) result = run_cli_command(command)
try: try:
users_list = json.loads(result) users_list = json.loads(result)
existing_users = {user['username'].lower(): user['username'] for user in users_list} existing_users = {user['username'].lower(): user['username'] for user in users_list}
if username_input not in existing_users: if username_input not in existing_users:
bot.reply_to(message, f"Username '{escape_markdown(message.text.strip())}' does not exist. Please enter a valid username.") bot.reply_to(message, f"Username '{escape_markdown(message.text.strip())}' does not exist. Please enter a valid username.")
return return
actual_username = existing_users[username_input] actual_username = existing_users[username_input]
except (json.JSONDecodeError, KeyError): except (json.JSONDecodeError, KeyError):
bot.reply_to(message, "Error retrieving or parsing user list. Please try again later.") bot.reply_to(message, "Error retrieving or parsing user list. Please try again later.")
return return
command = f"python3 {CLI_PATH} get-user -u \"{actual_username}\"" command = f"python3 {CLI_PATH} get-user -u \"{actual_username}\""
user_result = run_cli_command(command) user_result = run_cli_command(command)
try: try:
user_details = json.loads(user_result) user_details = json.loads(user_result)
upload_bytes = user_details.get('upload_bytes') upload_bytes = user_details.get('upload_bytes')
download_bytes = user_details.get('download_bytes') download_bytes = user_details.get('download_bytes')
status = user_details.get('status', 'Unknown') status = user_details.get('status', 'Unknown')
if upload_bytes is None or download_bytes is None: if upload_bytes is None or download_bytes is None:
traffic_message = "*Traffic Data:*\nUser not active or no traffic data available." traffic_message = "*Traffic Data:*\nUser not active or no traffic data available."
else: else:
upload_gb = upload_bytes / (1024 ** 3) upload_gb = upload_bytes / (1024 ** 3)
download_gb = download_bytes / (1024 ** 3) download_gb = download_bytes / (1024 ** 3)
totalusage = upload_gb + download_gb totalusage = upload_gb + download_gb
traffic_message = ( traffic_message = (
f"🔼 Upload: {upload_gb:.2f} GB\n" f"🔼 Upload: {upload_gb:.2f} GB\n"
f"🔽 Download: {download_gb:.2f} GB\n" f"🔽 Download: {download_gb:.2f} GB\n"
f"📊 Total Usage: {totalusage:.2f} GB\n" f"📊 Total Usage: {totalusage:.2f} GB\n"
f"🌐 Status: {status}" f"🌐 Status: {status}"
) )
except json.JSONDecodeError: except json.JSONDecodeError:
bot.reply_to(message, "Failed to parse user details. The command output may be malformed.") bot.reply_to(message, "Failed to parse user details. The command output may be malformed.")
return return
display_username = escape_markdown(actual_username) display_username = escape_markdown(actual_username)
formatted_details = ( note = user_details.get('note', '')
f"\n🆔 Name: {display_username}\n" note_display = f"📝 Note: {escape_markdown(note)}" if note else "📝 Note: None"
f"📊 Traffic Limit: {user_details.get('max_download_bytes', 0) / (1024 ** 3):.2f} GB\n"
f"📅 Days: {user_details.get('expiration_days', 'N/A')}\n" formatted_details = (
f"⏳ Creation: {user_details.get('account_creation_date', 'N/A')}\n" f"\n🆔 Name: {display_username}\n"
f"💡 Blocked: {user_details.get('blocked', 'N/A')}\n\n" f"📊 Traffic Limit: {user_details.get('max_download_bytes', 0) / (1024 ** 3):.2f} GB\n"
f"{traffic_message}" f"📅 Days: {user_details.get('expiration_days', 'N/A')}\n"
) f"⏳ Creation: {user_details.get('account_creation_date', 'N/A')}\n"
f"💡 Blocked: {user_details.get('blocked', 'N/A')}\n"
combined_command = f"python3 {CLI_PATH} show-user-uri -u \"{actual_username}\" -ip 4 -s -n" f"{note_display}\n\n"
combined_result = run_cli_command(combined_command) f"{traffic_message}"
)
uri_v4 = ""
normal_sub_link = "" combined_command = f"python3 {CLI_PATH} show-user-uri -u \"{actual_username}\" -ip 4 -s -n"
combined_result = run_cli_command(combined_command)
lines = combined_result.strip().split('\n')
for i, line in enumerate(lines): uri_v4 = ""
if line.strip() == "IPv4:": normal_sub_link = ""
if i + 1 < len(lines) and lines[i+1].strip().startswith("hy2://"):
uri_v4 = lines[i+1].strip() lines = combined_result.strip().split('\n')
elif line.strip() == "Normal-SUB Sublink:": for i, line in enumerate(lines):
if i + 1 < len(lines) and (lines[i+1].strip().startswith("http://") or lines[i+1].strip().startswith("https://")): if line.strip() == "IPv4:":
normal_sub_link = lines[i+1].strip() if i + 1 < len(lines) and lines[i+1].strip().startswith("hy2://"):
uri_v4 = lines[i+1].strip()
qr_link = normal_sub_link if normal_sub_link else uri_v4 elif line.strip() == "Normal-SUB Sublink:":
if not qr_link: if i + 1 < len(lines) and (lines[i+1].strip().startswith("http://") or lines[i+1].strip().startswith("https://")):
bot.reply_to(message, "No valid URI or Subscription link found for this user.") normal_sub_link = lines[i+1].strip()
return
qr_link = normal_sub_link if normal_sub_link else uri_v4
qr_img = qrcode.make(qr_link) if not qr_link:
bio = io.BytesIO() bot.reply_to(message, "No valid URI or Subscription link found for this user.")
qr_img.save(bio, 'PNG') return
bio.seek(0)
qr_img = qrcode.make(qr_link)
markup = types.InlineKeyboardMarkup(row_width=3) bio = io.BytesIO()
markup.add(types.InlineKeyboardButton("🔄 Reset User", callback_data=f"reset_user:{actual_username}"), qr_img.save(bio, 'PNG')
types.InlineKeyboardButton("🌐 IPv6-URI", callback_data=f"ipv6_uri:{actual_username}")) bio.seek(0)
markup.add(types.InlineKeyboardButton("✏️ Edit Username", callback_data=f"edit_username:{actual_username}"),
types.InlineKeyboardButton("📶 Edit Traffic", callback_data=f"edit_traffic:{actual_username}")) markup = types.InlineKeyboardMarkup(row_width=3)
markup.add(types.InlineKeyboardButton("📅 Edit Expiration", callback_data=f"edit_expiration:{actual_username}"), markup.add(types.InlineKeyboardButton("🔄 Reset User", callback_data=f"reset_user:{actual_username}"),
types.InlineKeyboardButton("🔑 Renew Password", callback_data=f"renew_password:{actual_username}")) types.InlineKeyboardButton("🌐 IPv6-URI", callback_data=f"ipv6_uri:{actual_username}"))
markup.add(types.InlineKeyboardButton("🕒 Renew Creation Date", callback_data=f"renew_creation:{actual_username}"), markup.add(types.InlineKeyboardButton("✏️ Edit Username", callback_data=f"edit_username:{actual_username}"),
types.InlineKeyboardButton("⛔ Block User", callback_data=f"block_user:{actual_username}")) types.InlineKeyboardButton("📶 Edit Traffic", callback_data=f"edit_traffic:{actual_username}"))
markup.add(types.InlineKeyboardButton("📅 Edit Expiration", callback_data=f"edit_expiration:{actual_username}"),
caption = formatted_details types.InlineKeyboardButton("🔑 Renew Password", callback_data=f"renew_password:{actual_username}"))
if uri_v4: markup.add(types.InlineKeyboardButton("🕒 Renew Creation Date", callback_data=f"renew_creation:{actual_username}"),
caption += f"\n\n*IPv4 URI:*\n`{uri_v4}`" types.InlineKeyboardButton("📝 Edit Note", callback_data=f"edit_note:{actual_username}"))
if normal_sub_link: markup.add(types.InlineKeyboardButton("⛔ Block User", callback_data=f"block_user:{actual_username}"))
caption += f"\n\n*Normal SUB:*\n`{normal_sub_link}`"
caption = formatted_details
bot.send_photo( if uri_v4:
message.chat.id, caption += f"\n\n*IPv4 URI:*\n`{uri_v4}`"
bio, if normal_sub_link:
caption=caption, caption += f"\n\n*Normal SUB:*\n`{normal_sub_link}`"
reply_markup=markup,
parse_mode="Markdown" bot.send_photo(
) message.chat.id,
bio,
@bot.callback_query_handler(func=lambda call: call.data.startswith('edit_') or call.data.startswith('renew_') or call.data.startswith('block_') or call.data.startswith('reset_') or call.data.startswith('ipv6_')) caption=caption,
def handle_edit_callback(call): reply_markup=markup,
action, username = call.data.split(':') parse_mode="Markdown"
display_username = escape_markdown(username) )
if action == 'edit_username': @bot.callback_query_handler(func=lambda call: call.data.startswith(('edit_', 'renew_', 'block_', 'reset_', 'ipv6_', 'set_new_note', 'clear_note')))
msg = bot.send_message(call.message.chat.id, f"Enter new username for {display_username}:") def handle_edit_callback(call):
bot.register_next_step_handler(msg, process_edit_username, username) action, username = call.data.split(':', 1)
elif action == 'edit_traffic': display_username = escape_markdown(username)
msg = bot.send_message(call.message.chat.id, f"Enter new traffic limit (GB) for {display_username}:")
bot.register_next_step_handler(msg, process_edit_traffic, username) if action == 'edit_username':
elif action == 'edit_expiration': msg = bot.send_message(call.message.chat.id, f"Enter new username for {display_username}:")
msg = bot.send_message(call.message.chat.id, f"Enter new expiration days for {display_username}:") bot.register_next_step_handler(msg, process_edit_username, username)
bot.register_next_step_handler(msg, process_edit_expiration, username) elif action == 'edit_traffic':
elif action == 'renew_password': msg = bot.send_message(call.message.chat.id, f"Enter new traffic limit (GB) for {display_username}:")
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -rp" bot.register_next_step_handler(msg, process_edit_traffic, username)
result = run_cli_command(command) elif action == 'edit_expiration':
bot.send_message(call.message.chat.id, result) msg = bot.send_message(call.message.chat.id, f"Enter new expiration days for {display_username}:")
elif action == 'renew_creation': bot.register_next_step_handler(msg, process_edit_expiration, username)
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -rc" elif action == 'edit_note':
result = run_cli_command(command) command = f"python3 {CLI_PATH} get-user -u \"{username}\""
bot.send_message(call.message.chat.id, result) user_result = run_cli_command(command)
elif action == 'block_user': current_note = ""
markup = types.InlineKeyboardMarkup() try:
markup.add(types.InlineKeyboardButton("True", callback_data=f"confirm_block:{username}:true"), user_details = json.loads(user_result)
types.InlineKeyboardButton("False", callback_data=f"confirm_block:{username}:false")) current_note = user_details.get('note', '')
bot.send_message(call.message.chat.id, f"Set block status for {display_username}:", reply_markup=markup) except json.JSONDecodeError:
elif action == 'reset_user': pass
command = f"python3 {CLI_PATH} reset-user -u \"{username}\""
result = run_cli_command(command) markup = types.InlineKeyboardMarkup()
bot.send_message(call.message.chat.id, result) markup.add(types.InlineKeyboardButton("✏️ Set New Note", callback_data=f"set_new_note:{username}"))
elif action == 'ipv6_uri': markup.add(types.InlineKeyboardButton("🗑️ Clear Note", callback_data=f"clear_note:{username}"))
command = f"python3 {CLI_PATH} show-user-uri -u \"{username}\" -ip 6"
result = run_cli_command(command) message_text = f"Select an action for the note of {display_username}:"
if "Error" in result or "Invalid" in result: if current_note:
bot.send_message(call.message.chat.id, result) message_text = f"Current note for {display_username}: `{escape_markdown(current_note)}`\n\nSelect an action:"
return
bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id)
uri_v6 = result.split('\n')[-1].strip() bot.send_message(call.message.chat.id, message_text, reply_markup=markup, parse_mode="Markdown")
qr_v6 = qrcode.make(uri_v6) elif action == 'set_new_note':
bio_v6 = io.BytesIO() msg = bot.edit_message_text(f"Enter new note for {display_username}:", call.message.chat.id, call.message.message_id)
qr_v6.save(bio_v6, 'PNG') bot.register_next_step_handler(msg, process_edit_note, username)
bio_v6.seek(0) elif action == 'clear_note':
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" --note \"\""
bot.send_photo( result = run_cli_command(command)
call.message.chat.id, bot.edit_message_text(result, chat_id=call.message.chat.id, message_id=call.message.message_id)
bio_v6, elif action == 'renew_password':
caption=f"**IPv6 URI for {display_username}:**\n\n`{uri_v6}`", command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -rp"
parse_mode="Markdown" result = run_cli_command(command)
) bot.send_message(call.message.chat.id, result)
elif action == 'renew_creation':
@bot.callback_query_handler(func=lambda call: call.data.startswith('confirm_block:')) command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -rc"
def handle_block_confirmation(call): result = run_cli_command(command)
_, username, block_status = call.data.split(':') bot.send_message(call.message.chat.id, result)
flag = '-b' if block_status == 'true' else '--unblocked' elif action == 'block_user':
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" {flag}" markup = types.InlineKeyboardMarkup()
result = run_cli_command(command) markup.add(types.InlineKeyboardButton("True", callback_data=f"confirm_block:{username}:true"),
bot.send_message(call.message.chat.id, result) types.InlineKeyboardButton("False", callback_data=f"confirm_block:{username}:false"))
bot.send_message(call.message.chat.id, f"Set block status for {display_username}:", reply_markup=markup)
def process_edit_username(message, username): elif action == 'reset_user':
new_username = message.text.strip() command = f"python3 {CLI_PATH} reset-user -u \"{username}\""
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -nu \"{new_username}\"" result = run_cli_command(command)
result = run_cli_command(command) bot.send_message(call.message.chat.id, result)
bot.reply_to(message, result) elif action == 'ipv6_uri':
command = f"python3 {CLI_PATH} show-user-uri -u \"{username}\" -ip 6"
def process_edit_traffic(message, username): result = run_cli_command(command)
try: if "Error" in result or "Invalid" in result:
new_traffic_limit = int(message.text.strip()) bot.send_message(call.message.chat.id, result)
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -nt {new_traffic_limit}" return
result = run_cli_command(command)
bot.reply_to(message, result) uri_v6 = result.split('\n')[-1].strip()
except ValueError: qr_v6 = qrcode.make(uri_v6)
bot.reply_to(message, "Invalid traffic limit. Please enter a number.") bio_v6 = io.BytesIO()
qr_v6.save(bio_v6, 'PNG')
def process_edit_expiration(message, username): bio_v6.seek(0)
try:
new_expiration_days = int(message.text.strip()) bot.send_photo(
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -ne {new_expiration_days}" call.message.chat.id,
result = run_cli_command(command) bio_v6,
bot.reply_to(message, result) caption=f"**IPv6 URI for {display_username}:**\n\n`{uri_v6}`",
except ValueError: parse_mode="Markdown"
bot.reply_to(message, "Invalid expiration days. Please enter a number.") )
@bot.callback_query_handler(func=lambda call: call.data.startswith('confirm_block:'))
def handle_block_confirmation(call):
_, username, block_status = call.data.split(':', 2)
flag = '-b' if block_status == 'true' else '--unblocked'
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" {flag}"
result = run_cli_command(command)
bot.edit_message_text(result, call.message.chat.id, call.message.message_id)
def process_edit_username(message, username):
new_username = message.text.strip()
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -nu \"{new_username}\""
result = run_cli_command(command)
bot.reply_to(message, result)
def process_edit_traffic(message, username):
try:
new_traffic_limit = int(message.text.strip())
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -nt {new_traffic_limit}"
result = run_cli_command(command)
bot.reply_to(message, result)
except ValueError:
bot.reply_to(message, "Invalid traffic limit. Please enter a number.")
def process_edit_expiration(message, username):
try:
new_expiration_days = int(message.text.strip())
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" -ne {new_expiration_days}"
result = run_cli_command(command)
bot.reply_to(message, result)
except ValueError:
bot.reply_to(message, "Invalid expiration days. Please enter a number.")
def process_edit_note(message, username):
note_input = message.text.strip()
if len(note_input) > 200:
bot.reply_to(message, "Note is too long (max 200 characters). Please enter a shorter note:")
bot.register_next_step_handler(message, process_edit_note, username)
return
command = f"python3 {CLI_PATH} edit-user -u \"{username}\" --note \"{note_input}\""
result = run_cli_command(command)
bot.reply_to(message, result)