Files
Blitz-Proxy/core/traffic.py
2025-05-10 14:06:55 +03:30

278 lines
9.0 KiB
Python

#!/usr/bin/env python3
import json
import os
import sys
import time
import fcntl
import shutil
import datetime
from concurrent.futures import ThreadPoolExecutor
from hysteria2_api import Hysteria2Client
CONFIG_FILE = '/etc/hysteria/config.json'
USERS_FILE = '/etc/hysteria/users.json'
API_BASE_URL = 'http://127.0.0.1:25413'
LOCKFILE = "/tmp/kick.lock"
BACKUP_FILE = f"{USERS_FILE}.bak"
MAX_WORKERS = 8
# import logging
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s: [%(levelname)s] %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S'
# )
# logger = logging.getLogger()
# null_handler = logging.NullHandler()
# logger.handlers = [null_handler]
def acquire_lock():
"""Acquires a lock file to prevent concurrent execution"""
try:
lock_file = open(LOCKFILE, 'w')
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
return lock_file
except IOError:
sys.exit(1)
def traffic_status(no_gui=False):
"""Updates and retrieves traffic statistics for all users.
Args:
no_gui (bool): If True, suppresses output to console
Returns:
dict: User data including upload/download bytes and status
"""
green = '\033[0;32m'
cyan = '\033[0;36m'
NC = '\033[0m'
try:
with open(CONFIG_FILE, 'r') as config_file:
config = json.load(config_file)
secret = config.get('trafficStats', {}).get('secret')
except (json.JSONDecodeError, FileNotFoundError) as e:
if not no_gui:
print(f"Error: Failed to read secret from {CONFIG_FILE}. Details: {e}")
return None
if not secret:
if not no_gui:
print("Error: Secret not found in config.json")
return None
client = Hysteria2Client(base_url=API_BASE_URL, secret=secret)
try:
traffic_stats = client.get_traffic_stats(clear=True)
online_status = client.get_online_clients()
except Exception as e:
if not no_gui:
print(f"Error communicating with Hysteria2 API: {e}")
return None
users_data = {}
if os.path.exists(USERS_FILE):
try:
with open(USERS_FILE, 'r') as users_file:
users_data = json.load(users_file)
except json.JSONDecodeError:
if not no_gui:
print("Error: Failed to parse existing users data JSON file.")
return None
for user in users_data:
users_data[user]["status"] = "Offline"
for user_id, status in online_status.items():
if user_id in users_data:
users_data[user_id]["status"] = "Online" if status.is_online else "Offline"
else:
users_data[user_id] = {
"upload_bytes": 0,
"download_bytes": 0,
"status": "Online" if status.is_online else "Offline"
}
for user_id, stats in traffic_stats.items():
if user_id in users_data:
users_data[user_id]["upload_bytes"] = users_data[user_id].get("upload_bytes", 0) + stats.upload_bytes
users_data[user_id]["download_bytes"] = users_data[user_id].get("download_bytes", 0) + stats.download_bytes
else:
online = user_id in online_status and online_status[user_id].is_online
users_data[user_id] = {
"upload_bytes": stats.upload_bytes,
"download_bytes": stats.download_bytes,
"status": "Online" if online else "Offline"
}
with open(USERS_FILE, 'w') as users_file:
json.dump(users_data, users_file, indent=4)
if not no_gui:
display_traffic_data(users_data, green, cyan, NC)
return users_data
def display_traffic_data(data, green, cyan, NC):
"""Displays traffic data in a formatted table"""
if not data:
print("No traffic data to display.")
return
print("Traffic Data:")
print("-------------------------------------------------")
print(f"{'User':<15} {'Upload (TX)':<15} {'Download (RX)':<15} {'Status':<10}")
print("-------------------------------------------------")
for user, entry in data.items():
upload_bytes = entry.get("upload_bytes", 0)
download_bytes = entry.get("download_bytes", 0)
status = entry.get("status", "Offline")
formatted_tx = format_bytes(upload_bytes)
formatted_rx = format_bytes(download_bytes)
print(f"{user:<15} {green}{formatted_tx:<15}{NC} {cyan}{formatted_rx:<15}{NC} {status:<10}")
print("-------------------------------------------------")
def format_bytes(bytes):
"""Format bytes as human-readable string"""
if bytes < 1024:
return f"{bytes}B"
elif bytes < 1048576:
return f"{bytes / 1024:.2f}KB"
elif bytes < 1073741824:
return f"{bytes / 1048576:.2f}MB"
elif bytes < 1099511627776:
return f"{bytes / 1073741824:.2f}GB"
else:
return f"{bytes / 1099511627776:.2f}TB"
def kick_users(usernames, secret):
"""Kicks specified users from the server"""
try:
client = Hysteria2Client(
base_url=API_BASE_URL,
secret=secret
)
client.kick_clients(usernames)
return True
except Exception:
return False
def process_user(username, user_data, config_secret, users_data):
"""Process a single user to check if they should be kicked"""
blocked = user_data.get('blocked', False)
if blocked:
return None
max_download_bytes = user_data.get('max_download_bytes', 0)
expiration_days = user_data.get('expiration_days', 0)
account_creation_date = user_data.get('account_creation_date')
current_download_bytes = user_data.get('download_bytes', 0)
current_upload_bytes = user_data.get('upload_bytes', 0)
total_bytes = current_download_bytes + current_upload_bytes
if not account_creation_date:
return None
try:
current_date = datetime.datetime.now().timestamp()
creation_date = datetime.datetime.fromisoformat(account_creation_date.replace('Z', '+00:00'))
expiration_date = (creation_date + datetime.timedelta(days=expiration_days)).timestamp()
should_block = False
if max_download_bytes > 0 and total_bytes >= 0 and expiration_days > 0:
if total_bytes >= max_download_bytes or current_date >= expiration_date:
should_block = True
if should_block:
users_data[username]['blocked'] = True
return username
except Exception:
return None
return None
def kick_expired_users():
"""Kicks users who have exceeded their data limits or whose accounts have expired"""
lock_file = acquire_lock()
try:
shutil.copy2(USERS_FILE, BACKUP_FILE)
try:
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
secret = config.get('trafficStats', {}).get('secret', '')
if not secret:
sys.exit(1)
except Exception:
shutil.copy2(BACKUP_FILE, USERS_FILE)
sys.exit(1)
try:
with open(USERS_FILE, 'r') as f:
users_data = json.load(f)
except json.JSONDecodeError:
shutil.copy2(BACKUP_FILE, USERS_FILE)
sys.exit(1)
except Exception:
shutil.copy2(BACKUP_FILE, USERS_FILE)
sys.exit(1)
users_to_kick = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_user = {
executor.submit(process_user, username, user_data, secret, users_data): username
for username, user_data in users_data.items()
}
for future in future_to_user:
username = future.result()
if username:
users_to_kick.append(username)
if users_to_kick:
for retry in range(3):
try:
with open(USERS_FILE, 'w') as f:
json.dump(users_data, f, indent=2)
break
except Exception:
time.sleep(1)
if retry == 2:
raise
if users_to_kick:
batch_size = 50
for i in range(0, len(users_to_kick), batch_size):
batch = users_to_kick[i:i+batch_size]
kick_users(batch, secret)
except Exception:
shutil.copy2(BACKUP_FILE, USERS_FILE)
sys.exit(1)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
lock_file.close()
if __name__ == "__main__":
if len(sys.argv) > 1:
if sys.argv[1] == "kick":
kick_expired_users()
elif sys.argv[1] == "--no-gui":
traffic_status(no_gui=True)
kick_expired_users()
else:
print(f"Unknown argument: {sys.argv[1]}")
print("Usage: python traffic.py [kick|--no-gui]")
else:
traffic_status(no_gui=False)