Merge pull request #55 from ReturnFI/dev

Refactored Telegram Bot
This commit is contained in:
Whispering Wind
2024-12-15 12:02:24 +03:30
committed by GitHub
15 changed files with 486 additions and 384 deletions

5
changelog Normal file
View File

@ -0,0 +1,5 @@
1. Refactored Telegram Bot
2. Fixed some bugs
3. Added changelog file
4. Better perform with emojis
5. Update display the main menu

View File

@ -43,12 +43,12 @@ if [ "$online_user_count" == "null" ] || [ "$online_user_count" == "0" ]; then
online_user_count=0
fi
echo "CPU Usage: $cpu_usage"
echo "Total RAM: ${total_ram}MB"
echo "Used RAM: ${used_ram}MB"
echo "Online Users: $online_user_count"
echo "📈 CPU Usage: $cpu_usage"
echo "📋 Total RAM: ${total_ram}MB"
echo "💻 Used RAM: ${used_ram}MB"
echo "👥 Online Users: $online_user_count"
echo
echo "Total Traffic: "
echo "🚦Total Traffic: "
if [ -f "$USERS_FILE" ]; then
total_upload=0
@ -64,7 +64,7 @@ if [ -f "$USERS_FILE" ]; then
total_upload_human=$(convert_bytes $total_upload)
total_download_human=$(convert_bytes $total_download)
echo "${total_upload_human} uploaded"
echo "🔼${total_upload_human} uploaded"
echo "${total_download_human} downloaded"
echo "🔽${total_download_human} downloaded"
fi

View File

@ -1,39 +1,12 @@
import telebot
import subprocess
import qrcode
import io
import json
import os
import shlex
import re
from dotenv import load_dotenv
from telebot import types
load_dotenv()
API_TOKEN = os.getenv('API_TOKEN')
ADMIN_USER_IDS = json.loads(os.getenv('ADMIN_USER_IDS'))
CLI_PATH = '/etc/hysteria/core/cli.py'
BACKUP_DIRECTORY = '/opt/hysbackup'
bot = telebot.TeleBot(API_TOKEN)
def run_cli_command(command):
try:
args = shlex.split(command)
result = subprocess.check_output(args, stderr=subprocess.STDOUT)
return result.decode('utf-8').strip()
except subprocess.CalledProcessError as e:
return f'Error: {e.output.decode("utf-8")}'
def create_main_markup():
markup = types.ReplyKeyboardMarkup(resize_keyboard=True)
markup.row('Add User', 'Show User')
markup.row('Delete User', 'Server Info')
markup.row('Backup Server')
return markup
def is_admin(user_id):
return user_id in ADMIN_USER_IDS
from utils.common import create_main_markup
from utils.adduser import *
from utils.backup import *
from utils.command import *
from utils.deleteuser import *
from utils.edituser import *
from utils.search import *
from utils.serverinfo import *
@bot.message_handler(commands=['start'])
def send_welcome(message):
@ -43,344 +16,5 @@ def send_welcome(message):
else:
bot.reply_to(message, "Unauthorized access. You do not have permission to use this bot.")
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Add User')
def add_user(message):
msg = bot.reply_to(message, "Enter username:")
bot.register_next_step_handler(msg, process_add_user_step1)
def process_add_user_step1(message):
username = message.text.strip()
if username == "":
bot.reply_to(message, "Username cannot be empty. Please enter a valid username.")
return
command = f"python3 {CLI_PATH} list-users"
result = run_cli_command(command)
try:
users = json.loads(result)
existing_users = {user.lower() for user in users.keys()}
if username.lower() in existing_users:
bot.reply_to(message, f"Username '{username}' already exists. Please choose a different username.")
msg = bot.reply_to(message, "Enter a new username:")
bot.register_next_step_handler(msg, process_add_user_step1)
return
except json.JSONDecodeError:
if "No such file or directory" in result or result.strip() == "":
bot.reply_to(message, "User list file does not exist. Adding the first user.")
else:
bot.reply_to(message, "Error retrieving user list. Please try again later.")
return
msg = bot.reply_to(message, "Enter traffic limit (GB):")
bot.register_next_step_handler(msg, process_add_user_step2, username)
def process_add_user_step2(message, username):
try:
traffic_limit = int(message.text.strip())
msg = bot.reply_to(message, "Enter expiration days:")
bot.register_next_step_handler(msg, process_add_user_step3, username, traffic_limit)
except ValueError:
bot.reply_to(message, "Invalid traffic limit. Please enter a number.")
def process_add_user_step3(message, username, traffic_limit):
try:
expiration_days = int(message.text.strip())
lower_username = username.lower()
command = f"python3 {CLI_PATH} add-user -u {username} -t {traffic_limit} -e {expiration_days}"
result = run_cli_command(command)
bot.send_chat_action(message.chat.id, 'typing')
qr_command = f"python3 {CLI_PATH} show-user-uri -u {lower_username} -ip 4"
qr_result = run_cli_command(qr_command).replace("IPv4:\n", "").strip()
if not qr_result:
bot.reply_to(message, "Failed to generate QR code.")
return
qr_v4 = qrcode.make(qr_result)
bio_v4 = io.BytesIO()
qr_v4.save(bio_v4, 'PNG')
bio_v4.seek(0)
caption = f"{result}\n\n`{qr_result}`"
bot.send_photo(message.chat.id, photo=bio_v4, caption=caption, parse_mode="Markdown")
except ValueError:
bot.reply_to(message, "Invalid expiration days. Please enter a number.")
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Show User')
def show_user(message):
msg = bot.reply_to(message, "Enter username:")
bot.register_next_step_handler(msg, process_show_user)
def process_show_user(message):
username = message.text.strip().lower()
bot.send_chat_action(message.chat.id, 'typing')
command = f"python3 {CLI_PATH} list-users"
result = run_cli_command(command)
try:
users = json.loads(result)
existing_users = {user.lower(): user for user in users.keys()}
if username not in existing_users:
bot.reply_to(message, f"Username '{message.text.strip()}' does not exist. Please enter a valid username.")
return
actual_username = existing_users[username]
except json.JSONDecodeError:
bot.reply_to(message, "Error retrieving user list. Please try again later.")
return
command = f"python3 {CLI_PATH} get-user -u {actual_username}"
user_result = run_cli_command(command)
try:
user_details = json.loads(user_result)
upload_bytes = user_details.get('upload_bytes')
download_bytes = user_details.get('download_bytes')
status = user_details.get('status', 'Unknown')
if upload_bytes is None or download_bytes is None:
traffic_message = "**Traffic Data:**\nUser not active or no traffic data available."
else:
upload_gb = upload_bytes / (1024 ** 3) # Convert bytes to GB
download_gb = download_bytes / (1024 ** 3) # Convert bytes to GB
traffic_message = (
f"**Traffic Data:**\n"
f"Upload: {upload_gb:.2f} GB\n"
f"Download: {download_gb:.2f} GB\n"
f"Status: {status}"
)
except json.JSONDecodeError:
bot.reply_to(message, "Failed to parse JSON data. The command output may be malformed.")
return
formatted_details = (
f"**User Details:**\n\n"
f"Name: {actual_username}\n"
f"Traffic Limit: {user_details['max_download_bytes'] / (1024 ** 3):.2f} GB\n"
f"Days: {user_details['expiration_days']}\n"
f"Account Creation: {user_details['account_creation_date']}\n"
f"Blocked: {user_details['blocked']}\n\n"
f"{traffic_message}"
)
combined_command = f"python3 {CLI_PATH} show-user-uri -u {actual_username} -ip 4 -s -n"
combined_result = run_cli_command(combined_command)
if "Error" in combined_result or "Invalid" in combined_result:
bot.reply_to(message, combined_result)
return
result_lines = combined_result.strip().split('\n')
uri_v4 = ""
singbox_sublink = ""
normal_sub_sublink = ""
for line in result_lines:
line = line.strip()
if line.startswith("hy2://"):
uri_v4 = line
elif line.startswith("Singbox Sublink:"):
singbox_sublink = result_lines[result_lines.index(line) + 1].strip()
elif line.startswith("Normal-SUB Sublink:"):
normal_sub_sublink = result_lines[result_lines.index(line) + 1].strip()
if not uri_v4:
bot.reply_to(message, "No valid URI found.")
return
qr_v4 = qrcode.make(uri_v4)
bio_v4 = io.BytesIO()
qr_v4.save(bio_v4, 'PNG')
bio_v4.seek(0)
markup = types.InlineKeyboardMarkup(row_width=3)
markup.add(types.InlineKeyboardButton("Reset User", callback_data=f"reset_user:{actual_username}"),
types.InlineKeyboardButton("IPv6-URI", callback_data=f"ipv6_uri:{actual_username}"))
markup.add(types.InlineKeyboardButton("Edit Username", callback_data=f"edit_username:{actual_username}"),
types.InlineKeyboardButton("Edit Traffic Limit", callback_data=f"edit_traffic:{actual_username}"))
markup.add(types.InlineKeyboardButton("Edit Expiration Days", callback_data=f"edit_expiration:{actual_username}"),
types.InlineKeyboardButton("Renew Password", callback_data=f"renew_password:{actual_username}"))
markup.add(types.InlineKeyboardButton("Renew Creation Date", callback_data=f"renew_creation:{actual_username}"),
types.InlineKeyboardButton("Block User", callback_data=f"block_user:{actual_username}"))
caption = f"{formatted_details}\n\n**IPv4 URI:**\n\n`{uri_v4}`"
if singbox_sublink:
caption += f"\n\n**SingBox SUB:**\n{singbox_sublink}"
if normal_sub_sublink:
caption += f"\n\n**Normal SUB:**\n{normal_sub_sublink}"
bot.send_photo(
message.chat.id,
bio_v4,
caption=caption,
reply_markup=markup,
parse_mode="Markdown"
)
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Server Info')
def server_info(message):
command = f"python3 {CLI_PATH} server-info"
result = run_cli_command(command)
bot.send_chat_action(message.chat.id, 'typing')
bot.reply_to(message, result)
@bot.callback_query_handler(func=lambda call: call.data.startswith('edit_') or call.data.startswith('renew_') or call.data.startswith('block_') or call.data.startswith('reset_') or call.data.startswith('ipv6_'))
def handle_edit_callback(call):
action, username = call.data.split(':')
if action == 'edit_username':
msg = bot.send_message(call.message.chat.id, f"Enter new username for {username}:")
bot.register_next_step_handler(msg, process_edit_username, username)
elif action == 'edit_traffic':
msg = bot.send_message(call.message.chat.id, f"Enter new traffic limit (GB) for {username}:")
bot.register_next_step_handler(msg, process_edit_traffic, username)
elif action == 'edit_expiration':
msg = bot.send_message(call.message.chat.id, f"Enter new expiration days for {username}:")
bot.register_next_step_handler(msg, process_edit_expiration, username)
elif action == 'renew_password':
command = f"python3 {CLI_PATH} edit-user -u {username} -rp"
result = run_cli_command(command)
bot.send_message(call.message.chat.id, result)
elif action == 'renew_creation':
command = f"python3 {CLI_PATH} edit-user -u {username} -rc"
result = run_cli_command(command)
bot.send_message(call.message.chat.id, result)
elif action == 'block_user':
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton("True", callback_data=f"confirm_block:{username}:true"),
types.InlineKeyboardButton("False", callback_data=f"confirm_block:{username}:false"))
bot.send_message(call.message.chat.id, f"Set block status for {username}:", reply_markup=markup)
elif action == 'reset_user':
command = f"python3 {CLI_PATH} reset-user -u {username}"
result = run_cli_command(command)
bot.send_message(call.message.chat.id, result)
elif action == 'ipv6_uri':
command = f"python3 {CLI_PATH} show-user-uri -u {username} -ip 6"
result = run_cli_command(command)
if "Error" in result or "Invalid" in result:
bot.send_message(call.message.chat.id, result)
return
uri_v6 = result.split('\n')[-1].strip()
qr_v6 = qrcode.make(uri_v6)
bio_v6 = io.BytesIO()
qr_v6.save(bio_v6, 'PNG')
bio_v6.seek(0)
bot.send_photo(
call.message.chat.id,
bio_v6,
caption=f"**IPv6 URI for {username}:**\n\n`{uri_v6}`",
parse_mode="Markdown"
)
@bot.callback_query_handler(func=lambda call: call.data.startswith('confirm_block:'))
def handle_block_confirmation(call):
_, username, block_status = call.data.split(':')
command = f"python3 {CLI_PATH} edit-user -u {username} {'-b' if block_status == 'true' else ''}"
result = run_cli_command(command)
bot.send_message(call.message.chat.id, result)
def process_edit_username(message, username):
new_username = message.text.strip()
command = f"python3 {CLI_PATH} edit-user -u {username} -nu {new_username}"
result = run_cli_command(command)
bot.reply_to(message, result)
def process_edit_traffic(message, username):
try:
new_traffic_limit = int(message.text.strip())
command = f"python3 {CLI_PATH} edit-user -u {username} -nt {new_traffic_limit}"
result = run_cli_command(command)
bot.reply_to(message, result)
except ValueError:
bot.reply_to(message, "Invalid traffic limit. Please enter a number.")
def process_edit_expiration(message, username):
try:
new_expiration_days = int(message.text.strip())
command = f"python3 {CLI_PATH} edit-user -u {username} -ne {new_expiration_days}"
result = run_cli_command(command)
bot.reply_to(message, result)
except ValueError:
bot.reply_to(message, "Invalid expiration days. Please enter a number.")
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Delete User')
def delete_user(message):
msg = bot.reply_to(message, "Enter username:")
bot.register_next_step_handler(msg, process_delete_user)
def process_delete_user(message):
username = message.text.strip().lower()
command = f"python3 {CLI_PATH} remove-user -u {username}"
result = run_cli_command(command)
bot.reply_to(message, result)
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Backup Server')
def backup_server(message):
bot.reply_to(message, "Starting backup. This may take a few moments...")
bot.send_chat_action(message.chat.id, 'typing')
backup_command = f"python3 {CLI_PATH} backup-hysteria"
result = run_cli_command(backup_command)
if "Error" in result:
bot.reply_to(message, f"Backup failed: {result}")
else:
bot.reply_to(message, "Backup completed successfully!")
try:
files = [f for f in os.listdir(BACKUP_DIRECTORY) if f.endswith('.zip')]
files.sort(key=lambda x: os.path.getctime(os.path.join(BACKUP_DIRECTORY, x)), reverse=True)
latest_backup_file = files[0] if files else None
except Exception as e:
bot.reply_to(message, f"Failed to locate the backup file: {str(e)}")
return
if latest_backup_file:
backup_file_path = os.path.join(BACKUP_DIRECTORY, latest_backup_file)
with open(backup_file_path, 'rb') as f:
bot.send_document(message.chat.id, f, caption=f"Backup completed: {latest_backup_file}")
else:
bot.reply_to(message, "No backup file found after the backup process.")
@bot.inline_handler(lambda query: is_admin(query.from_user.id))
def handle_inline_query(query):
command = f"python3 {CLI_PATH} list-users"
result = run_cli_command(command)
try:
users = json.loads(result)
except json.JSONDecodeError:
bot.answer_inline_query(query.id, results=[], switch_pm_text="Error retrieving users.", switch_pm_user_id=query.from_user.id)
return
query_text = query.query.lower()
results = []
for username, details in users.items():
if query_text in username.lower():
title = f"{username}"
description = f"Traffic Limit: {details['max_download_bytes'] / (1024 ** 3):.2f} GB, Expiration Days: {details['expiration_days']}"
results.append(types.InlineQueryResultArticle(
id=username,
title=title,
description=description,
input_message_content=types.InputTextMessageContent(
message_text=f"Name: {username}\n"
f"Traffic limit: {details['max_download_bytes'] / (1024 ** 3):.2f} GB\n"
f"Days: {details['expiration_days']}\n"
f"Account Creation: {details['account_creation_date']}\n"
f"Blocked: {details['blocked']}"
)
))
bot.answer_inline_query(query.id, results, cache_time=0)
if __name__ == '__main__':
bot.polling(none_stop=True)

View File

@ -0,0 +1,103 @@
import qrcode
import io
import json
from telebot import types
from utils.command import *
from utils.common import create_main_markup
def create_cancel_markup(back_step=None):
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
if back_step:
markup.row(types.KeyboardButton("⬅️ Back"))
markup.row(types.KeyboardButton("❌ Cancel"))
return markup
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Add User')
def add_user(message):
msg = bot.reply_to(message, "Enter username:", reply_markup=create_cancel_markup())
bot.register_next_step_handler(msg, process_add_user_step1)
def process_add_user_step1(message):
if message.text == "❌ Cancel":
bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup())
return
username = message.text.strip()
if username == "":
bot.reply_to(message, "Username cannot be empty. Please enter a valid username.", reply_markup=create_cancel_markup())
bot.register_next_step_handler(message, process_add_user_step1)
return
command = f"python3 {CLI_PATH} list-users"
result = run_cli_command(command)
try:
users = json.loads(result)
existing_users = {user.lower() for user in users.keys()}
if username.lower() in existing_users:
bot.reply_to(message, f"Username '{username}' already exists. Please choose a different username:", reply_markup=create_cancel_markup())
bot.register_next_step_handler(message, process_add_user_step1)
return
except json.JSONDecodeError:
if "No such file or directory" in result or result.strip() == "":
bot.reply_to(message, "User list file does not exist. Adding the first user.", reply_markup=create_cancel_markup())
else:
bot.reply_to(message, "Error retrieving user list. Please try again later.")
bot.send_message(message.chat.id, "Returning to main menu...", reply_markup=create_main_markup())
return
msg = bot.reply_to(message, "Enter traffic limit (GB):", reply_markup=create_cancel_markup(back_step=process_add_user_step1))
bot.register_next_step_handler(msg, process_add_user_step2, username)
def process_add_user_step2(message, username):
if message.text == "❌ Cancel":
bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup())
return
if message.text == "⬅️ Back":
msg = bot.reply_to(message, "Enter username:", reply_markup=create_cancel_markup())
bot.register_next_step_handler(msg, process_add_user_step1)
return
try:
traffic_limit = int(message.text.strip())
msg = bot.reply_to(message, "Enter expiration days:", reply_markup=create_cancel_markup(back_step=process_add_user_step2))
bot.register_next_step_handler(msg, process_add_user_step3, username, traffic_limit)
except ValueError:
bot.reply_to(message, "Invalid traffic limit. Please enter a number:", reply_markup=create_cancel_markup(back_step=process_add_user_step1))
bot.register_next_step_handler(message, process_add_user_step2, username)
def process_add_user_step3(message, username, traffic_limit):
if message.text == "❌ Cancel":
bot.reply_to(message, "Process canceled.", reply_markup=create_main_markup())
return
if message.text == "⬅️ Back":
msg = bot.reply_to(message, "Enter traffic limit (GB):", reply_markup=create_cancel_markup(back_step=process_add_user_step1))
bot.register_next_step_handler(msg, process_add_user_step2, username)
return
try:
expiration_days = int(message.text.strip())
lower_username = username.lower()
command = f"python3 {CLI_PATH} add-user -u {username} -t {traffic_limit} -e {expiration_days}"
result = run_cli_command(command)
bot.send_chat_action(message.chat.id, 'typing')
qr_command = f"python3 {CLI_PATH} show-user-uri -u {lower_username} -ip 4"
qr_result = run_cli_command(qr_command).replace("IPv4:\n", "").strip()
if not qr_result:
bot.reply_to(message, "Failed to generate QR code.", reply_markup=create_main_markup())
return
qr_v4 = qrcode.make(qr_result)
bio_v4 = io.BytesIO()
qr_v4.save(bio_v4, 'PNG')
bio_v4.seek(0)
caption = f"{result}\n\n`{qr_result}`"
bot.send_photo(message.chat.id, photo=bio_v4, caption=caption, parse_mode="Markdown", reply_markup=create_main_markup())
except ValueError:
bot.reply_to(message, "Invalid expiration days. Please enter a number:", reply_markup=create_cancel_markup(back_step=process_add_user_step2))
bot.register_next_step_handler(message, process_add_user_step3, username, traffic_limit)

View File

@ -0,0 +1,41 @@
import telebot
import subprocess
import qrcode
import io
import json
import os
import shlex
import re
from dotenv import load_dotenv
from telebot import types
from utils.command import *
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Backup Server')
def backup_server(message):
bot.reply_to(message, "Starting backup. This may take a few moments...")
bot.send_chat_action(message.chat.id, 'typing')
backup_command = f"python3 {CLI_PATH} backup-hysteria"
result = run_cli_command(backup_command)
if "Error" in result:
bot.reply_to(message, f"Backup failed: {result}")
else:
bot.reply_to(message, "Backup completed successfully!")
try:
files = [f for f in os.listdir(BACKUP_DIRECTORY) if f.endswith('.zip')]
files.sort(key=lambda x: os.path.getctime(os.path.join(BACKUP_DIRECTORY, x)), reverse=True)
latest_backup_file = files[0] if files else None
except Exception as e:
bot.reply_to(message, f"Failed to locate the backup file: {str(e)}")
return
if latest_backup_file:
backup_file_path = os.path.join(BACKUP_DIRECTORY, latest_backup_file)
with open(backup_file_path, 'rb') as f:
bot.send_document(message.chat.id, f, caption=f"Backup completed: {latest_backup_file}")
else:
bot.reply_to(message, "No backup file found after the backup process.")

View File

@ -0,0 +1,26 @@
import telebot
import subprocess
import json
import os
import shlex
from dotenv import load_dotenv
from telebot import types
load_dotenv()
API_TOKEN = os.getenv('API_TOKEN')
ADMIN_USER_IDS = json.loads(os.getenv('ADMIN_USER_IDS'))
CLI_PATH = '/etc/hysteria/core/cli.py'
BACKUP_DIRECTORY = '/opt/hysbackup'
bot = telebot.TeleBot(API_TOKEN)
def run_cli_command(command):
try:
args = shlex.split(command)
result = subprocess.check_output(args, stderr=subprocess.STDOUT)
return result.decode('utf-8').strip()
except subprocess.CalledProcessError as e:
return f'Error: {e.output.decode("utf-8")}'
def is_admin(user_id):
return user_id in ADMIN_USER_IDS

View File

@ -0,0 +1,8 @@
from telebot import types
def create_main_markup():
markup = types.ReplyKeyboardMarkup(resize_keyboard=True)
markup.row('Add User', 'Show User')
markup.row('Delete User', 'Server Info')
markup.row('Backup Server')
return markup

View File

@ -0,0 +1,25 @@
from dotenv import load_dotenv
from telebot import types
from utils.command import *
from utils.common import *
@bot.callback_query_handler(func=lambda call: call.data == "cancel_delete")
def handle_cancel_delete(call):
bot.edit_message_text("Operation canceled.", chat_id=call.message.chat.id, message_id=call.message.message_id)
create_main_markup(call.message)
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Delete User')
def delete_user(message):
markup = types.InlineKeyboardMarkup()
cancel_button = types.InlineKeyboardButton("❌ Cancel", callback_data="cancel_delete")
markup.add(cancel_button)
msg = bot.reply_to(message, "Enter username:", reply_markup=markup)
bot.register_next_step_handler(msg, process_delete_user)
def process_delete_user(message):
username = message.text.strip().lower()
command = f"python3 {CLI_PATH} remove-user -u {username}"
result = run_cli_command(command)
bot.reply_to(message, result)

View File

@ -0,0 +1,211 @@
#show and edituser file
import qrcode
import io
import json
from telebot import types
from utils.command import *
from utils.common import *
@bot.callback_query_handler(func=lambda call: call.data == "cancel_show_user")
def handle_cancel_show_user(call):
bot.edit_message_text("Operation canceled.", chat_id=call.message.chat.id, message_id=call.message.message_id)
create_main_markup(call.message)
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Show User')
def show_user(message):
markup = types.InlineKeyboardMarkup()
cancel_button = types.InlineKeyboardButton("❌ Cancel", callback_data="cancel_show_user")
markup.add(cancel_button)
msg = bot.reply_to(message, "Enter username:", reply_markup=markup)
bot.register_next_step_handler(msg, process_show_user)
def process_show_user(message):
username = message.text.strip().lower()
bot.send_chat_action(message.chat.id, 'typing')
command = f"python3 {CLI_PATH} list-users"
result = run_cli_command(command)
try:
users = json.loads(result)
existing_users = {user.lower(): user for user in users.keys()}
if username not in existing_users:
bot.reply_to(message, f"Username '{message.text.strip()}' does not exist. Please enter a valid username.")
return
actual_username = existing_users[username]
except json.JSONDecodeError:
bot.reply_to(message, "Error retrieving user list. Please try again later.")
return
command = f"python3 {CLI_PATH} get-user -u {actual_username}"
user_result = run_cli_command(command)
try:
user_details = json.loads(user_result)
upload_bytes = user_details.get('upload_bytes')
download_bytes = user_details.get('download_bytes')
status = user_details.get('status', 'Unknown')
if upload_bytes is None or download_bytes is None:
traffic_message = "**Traffic Data:**\nUser not active or no traffic data available."
else:
upload_gb = upload_bytes / (1024 ** 3) # Convert bytes to GB
download_gb = download_bytes / (1024 ** 3) # Convert bytes to GB
traffic_message = (
f"🔼 Upload: {upload_gb:.2f} GB\n"
f"🔽 Download: {download_gb:.2f} GB\n"
f"🌐 Status: {status}"
)
except json.JSONDecodeError:
bot.reply_to(message, "Failed to parse JSON data. The command output may be malformed.")
return
formatted_details = (
f"\n🆔 Name: {actual_username}\n"
f"📊 Traffic Limit: {user_details['max_download_bytes'] / (1024 ** 3):.2f} GB\n"
f"📅 Days: {user_details['expiration_days']}\n"
f"⏳ Creation: {user_details['account_creation_date']}\n"
f"💡 Blocked: {user_details['blocked']}\n\n"
f"{traffic_message}"
)
combined_command = f"python3 {CLI_PATH} show-user-uri -u {actual_username} -ip 4 -s -n"
combined_result = run_cli_command(combined_command)
if "Error" in combined_result or "Invalid" in combined_result:
bot.reply_to(message, combined_result)
return
result_lines = combined_result.strip().split('\n')
uri_v4 = ""
singbox_sublink = ""
normal_sub_sublink = ""
for line in result_lines:
line = line.strip()
if line.startswith("hy2://"):
uri_v4 = line
elif line.startswith("Singbox Sublink:"):
singbox_sublink = result_lines[result_lines.index(line) + 1].strip()
elif line.startswith("Normal-SUB Sublink:"):
normal_sub_sublink = result_lines[result_lines.index(line) + 1].strip()
if not uri_v4:
bot.reply_to(message, "No valid URI found.")
return
qr_v4 = qrcode.make(uri_v4)
bio_v4 = io.BytesIO()
qr_v4.save(bio_v4, 'PNG')
bio_v4.seek(0)
markup = types.InlineKeyboardMarkup(row_width=3)
markup.add(types.InlineKeyboardButton("Reset User", callback_data=f"reset_user:{actual_username}"),
types.InlineKeyboardButton("IPv6-URI", callback_data=f"ipv6_uri:{actual_username}"))
markup.add(types.InlineKeyboardButton("Edit Username", callback_data=f"edit_username:{actual_username}"),
types.InlineKeyboardButton("Edit Traffic Limit", callback_data=f"edit_traffic:{actual_username}"))
markup.add(types.InlineKeyboardButton("Edit Expiration Days", callback_data=f"edit_expiration:{actual_username}"),
types.InlineKeyboardButton("Renew Password", callback_data=f"renew_password:{actual_username}"))
markup.add(types.InlineKeyboardButton("Renew Creation Date", callback_data=f"renew_creation:{actual_username}"),
types.InlineKeyboardButton("Block User", callback_data=f"block_user:{actual_username}"))
caption = f"{formatted_details}\n\n**IPv4 URI:**\n\n`{uri_v4}`"
if singbox_sublink:
caption += f"\n\n**SingBox SUB:**\n{singbox_sublink}"
if normal_sub_sublink:
caption += f"\n\n**Normal SUB:**\n{normal_sub_sublink}"
bot.send_photo(
message.chat.id,
bio_v4,
caption=caption,
reply_markup=markup,
parse_mode="Markdown"
)
@bot.callback_query_handler(func=lambda call: call.data.startswith('edit_') or call.data.startswith('renew_') or call.data.startswith('block_') or call.data.startswith('reset_') or call.data.startswith('ipv6_'))
def handle_edit_callback(call):
action, username = call.data.split(':')
if action == 'edit_username':
msg = bot.send_message(call.message.chat.id, f"Enter new username for {username}:")
bot.register_next_step_handler(msg, process_edit_username, username)
elif action == 'edit_traffic':
msg = bot.send_message(call.message.chat.id, f"Enter new traffic limit (GB) for {username}:")
bot.register_next_step_handler(msg, process_edit_traffic, username)
elif action == 'edit_expiration':
msg = bot.send_message(call.message.chat.id, f"Enter new expiration days for {username}:")
bot.register_next_step_handler(msg, process_edit_expiration, username)
elif action == 'renew_password':
command = f"python3 {CLI_PATH} edit-user -u {username} -rp"
result = run_cli_command(command)
bot.send_message(call.message.chat.id, result)
elif action == 'renew_creation':
command = f"python3 {CLI_PATH} edit-user -u {username} -rc"
result = run_cli_command(command)
bot.send_message(call.message.chat.id, result)
elif action == 'block_user':
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton("True", callback_data=f"confirm_block:{username}:true"),
types.InlineKeyboardButton("False", callback_data=f"confirm_block:{username}:false"))
bot.send_message(call.message.chat.id, f"Set block status for {username}:", reply_markup=markup)
elif action == 'reset_user':
command = f"python3 {CLI_PATH} reset-user -u {username}"
result = run_cli_command(command)
bot.send_message(call.message.chat.id, result)
elif action == 'ipv6_uri':
command = f"python3 {CLI_PATH} show-user-uri -u {username} -ip 6"
result = run_cli_command(command)
if "Error" in result or "Invalid" in result:
bot.send_message(call.message.chat.id, result)
return
uri_v6 = result.split('\n')[-1].strip()
qr_v6 = qrcode.make(uri_v6)
bio_v6 = io.BytesIO()
qr_v6.save(bio_v6, 'PNG')
bio_v6.seek(0)
bot.send_photo(
call.message.chat.id,
bio_v6,
caption=f"**IPv6 URI for {username}:**\n\n`{uri_v6}`",
parse_mode="Markdown"
)
@bot.callback_query_handler(func=lambda call: call.data.startswith('confirm_block:'))
def handle_block_confirmation(call):
_, username, block_status = call.data.split(':')
command = f"python3 {CLI_PATH} edit-user -u {username} {'-b' if block_status == 'true' else ''}"
result = run_cli_command(command)
bot.send_message(call.message.chat.id, result)
def process_edit_username(message, username):
new_username = message.text.strip()
command = f"python3 {CLI_PATH} edit-user -u {username} -nu {new_username}"
result = run_cli_command(command)
bot.reply_to(message, result)
def process_edit_traffic(message, username):
try:
new_traffic_limit = int(message.text.strip())
command = f"python3 {CLI_PATH} edit-user -u {username} -nt {new_traffic_limit}"
result = run_cli_command(command)
bot.reply_to(message, result)
except ValueError:
bot.reply_to(message, "Invalid traffic limit. Please enter a number.")
def process_edit_expiration(message, username):
try:
new_expiration_days = int(message.text.strip())
command = f"python3 {CLI_PATH} edit-user -u {username} -ne {new_expiration_days}"
result = run_cli_command(command)
bot.reply_to(message, result)
except ValueError:
bot.reply_to(message, "Invalid expiration days. Please enter a number.")

View File

@ -0,0 +1,34 @@
from telebot import types
from utils.command import *
@bot.inline_handler(lambda query: is_admin(query.from_user.id))
def handle_inline_query(query):
command = f"python3 {CLI_PATH} list-users"
result = run_cli_command(command)
try:
users = json.loads(result)
except json.JSONDecodeError:
bot.answer_inline_query(query.id, results=[], switch_pm_text="Error retrieving users.", switch_pm_user_id=query.from_user.id)
return
query_text = query.query.lower()
results = []
for username, details in users.items():
if query_text in username.lower():
title = f"{username}"
description = f"Traffic Limit: {details['max_download_bytes'] / (1024 ** 3):.2f} GB, Expiration Days: {details['expiration_days']}"
results.append(types.InlineQueryResultArticle(
id=username,
title=title,
description=description,
input_message_content=types.InputTextMessageContent(
message_text=f"Name: {username}\n"
f"Traffic limit: {details['max_download_bytes'] / (1024 ** 3):.2f} GB\n"
f"Days: {details['expiration_days']}\n"
f"Account Creation: {details['account_creation_date']}\n"
f"Blocked: {details['blocked']}"
)
))
bot.answer_inline_query(query.id, results, cache_time=0)

View File

@ -0,0 +1,10 @@
from dotenv import load_dotenv
from telebot import types
from utils.command import *
@bot.message_handler(func=lambda message: is_admin(message.from_user.id) and message.text == 'Server Info')
def server_info(message):
command = f"python3 {CLI_PATH} server-info"
result = run_cli_command(command)
bot.send_chat_action(message.chat.id, 'typing')
bot.reply_to(message, result)

View File

@ -42,14 +42,15 @@ version_greater_equal() {
check_version() {
local_version=$(cat /etc/hysteria/VERSION)
latest_version=$(curl -s https://raw.githubusercontent.com/ReturnFI/Hysteria2/main/VERSION)
latest_changelog=$(curl -s https://raw.githubusercontent.com/ReturnFI/Hysteria2/main/changelog)
if version_greater_equal "$local_version" "$latest_version"; then
echo -e "Panel Version: ${cyan}$local_version${NC}"
else
echo -e "Panel Version: ${cyan}$local_version${NC}"
echo -e "Latest Version: ${cyan}$latest_version${NC}"
echo -e "${red}Please update your panel.${NC}"
echo -e "${cyan}Bug squashing party!${yellow} Update for the best invitation.${NC}"
echo -e "${yellow}$latest_version Version Change Log:${NC}"
echo -e "${cyan}$latest_changelog ${NC}"
fi
}

View File

@ -53,7 +53,7 @@ else
echo "All required packages are already installed."
fi
git clone https://github.com/ReturnFI/Hysteria2 /etc/hysteria
git clone -b dev https://github.com/ReturnFI/Hysteria2 /etc/hysteria
cd /etc/hysteria
python3 -m venv hysteria2_venv

View File

@ -552,6 +552,10 @@ display_main_menu() {
echo -e "${LPurple}◇──────────────────────────────────────────────────────────────────────◇${NC}"
check_version
echo -e "${LPurple}◇──────────────────────────────────────────────────────────────────────◇${NC}"
echo -e "${yellow} ☼ Services Status ☼ ${NC}"
echo -e "${LPurple}◇──────────────────────────────────────────────────────────────────────◇${NC}"
check_services
echo -e "${LPurple}◇──────────────────────────────────────────────────────────────────────◇${NC}"