# Files
# Blitz-Proxy/core/scripts/normalsub/normalsub.py
# 2025-02-22 13:31:02 +03:30
#
# 296 lines
# 10 KiB
# Python

import os
import ssl
import subprocess
import time
import re
import shlex
import json
import base64
import hashlib
import qrcode
from io import BytesIO
from aiohttp import web
from aiohttp.web_middlewares import middleware
from dotenv import load_dotenv
from jinja2 import Environment, FileSystemLoader
# Module-level configuration. Everything below runs at import time, in order.
# load_dotenv() (python-dotenv) is called before the os.getenv() reads below.
load_dotenv()
# TLS certificate / private-key paths handed to ssl.SSLContext.load_cert_chain
# at startup; os.getenv returns None when the variable is unset.
CERTFILE = os.getenv('HYSTERIA_CERTFILE')
KEYFILE = os.getenv('HYSTERIA_KEYFILE')
# Listening port (defaults to 3325); int() raises ValueError on a non-numeric value.
PORT = int(os.getenv('HYSTERIA_PORT', '3325'))
# Host name embedded in the generated subscription link.
DOMAIN = os.getenv('HYSTERIA_DOMAIN', 'localhost')
# Rate limiting: at most RATE_LIMIT requests per client key within a window of
# RATE_LIMIT_WINDOW seconds (compared against time.time() differences).
RATE_LIMIT = 100
RATE_LIMIT_WINDOW = 60
# Maps client key -> (request_count, window_start_time); in-memory only.
rate_limit_store = {}
# Templates are looked up in the directory that contains this module.
TEMPLATE_DIR = os.path.join(os.path.dirname(__file__))
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
# Loaded once at import; a missing template.html fails here rather than per request.
template = env.get_template('template.html')
@middleware
async def rate_limit_middleware(request, handler):
    """
    Fixed-window request limiter applied to every request.

    The client key is the X-Forwarded-For header when present, otherwise
    request.remote. Each key maps to (count, window_start) in
    rate_limit_store. Requests beyond RATE_LIMIT inside a window of
    RATE_LIMIT_WINDOW seconds get a 429 response; all others are passed
    on to the wrapped handler.
    """
    client_key = request.headers.get('X-Forwarded-For', request.remote)
    now = time.time()
    entry = rate_limit_store.get(client_key)

    if entry is None:
        # First request seen from this key: open a new window.
        rate_limit_store[client_key] = (1, now)
    else:
        hits, window_start = entry
        if now - window_start >= RATE_LIMIT_WINDOW:
            # Previous window has elapsed: start counting again from now.
            rate_limit_store[client_key] = (1, now)
        elif hits >= RATE_LIMIT:
            # Still inside the window and over budget; the stored entry is left untouched.
            return web.Response(status=429, text="Rate limit exceeded.")
        else:
            rate_limit_store[client_key] = (hits + 1, window_start)

    return await handler(request)
def sanitize_input(value, pattern):
    """
    Validate that the whole of ``value`` matches ``pattern``.

    Args:
        value: The string to validate.
        pattern: A regular expression the entire string must satisfy.

    Returns:
        ``value`` unchanged when it matches.

    Raises:
        ValueError: If ``value`` does not fully match ``pattern``.
    """
    # re.match with a '$'-anchored pattern still accepts a value that ends in
    # "\n" ('$' matches just before a trailing newline). fullmatch requires
    # every character, including a trailing newline, to be consumed.
    if re.fullmatch(pattern, value) is None:
        raise ValueError(f"Invalid value: {value}")
    return value
def generate_qrcode_base64(data):
    """
    Render ``data`` as a QR code and return it as a PNG data URI.

    The PNG is produced in memory, base64-encoded, and prefixed with
    ``data:image/png;base64,``.
    """
    code = qrcode.QRCode(
        border=4,
        box_size=10,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        version=1,
    )
    code.add_data(data)
    # fit=True is passed so the library may pick a larger version than 1 if needed.
    code.make(fit=True)

    png_buffer = BytesIO()
    picture = code.make_image(fill_color="black", back_color="white")
    picture.save(png_buffer, format="PNG")

    encoded_png = base64.b64encode(png_buffer.getvalue()).decode()
    return f"data:image/png;base64,{encoded_png}"
def human_readable_bytes(bytes_value):
    """
    Format a byte count with two decimals and a unit suffix.

    The value is converted to float and divided by 1024 until it drops
    below 1024 or the largest unit (PB) is reached.
    """
    labels = ("Bytes", "KB", "MB", "GB", "TB", "PB")
    amount = float(bytes_value)
    position = 0
    last = len(labels) - 1
    # "not amount < 1024" (rather than ">=") keeps NaN flowing to the last unit.
    while position < last and not amount < 1024:
        amount /= 1024
        position += 1
    return f"{amount:.2f} {labels[position]}"
async def handle(request):
    """
    Serve the subscription for the ``username`` route parameter.

    When the request's Accept header contains ``text/html`` the HTML page
    is rendered from the template; otherwise the plain-text URI list is
    returned.

    Returns:
        400 if the username is missing or fails validation,
        500 (with a generic body) for any failure while building the
        response, otherwise a 200 response with the HTML or text payload.
    """
    raw_username = request.match_info.get('username', '')
    # Checked before validation so the dedicated message is actually reachable
    # (the validation pattern itself rejects the empty string).
    if not raw_username:
        return web.Response(status=400, text="Error: Missing 'username' parameter.")

    # Only a validation failure is a client error. Keeping this try separate
    # prevents internal ValueErrors (json.JSONDecodeError is one) from being
    # reported as 400 with their message echoed to the client.
    try:
        username = sanitize_input(raw_username, r'^[a-zA-Z0-9_-]+$')
    except ValueError as e:
        return web.Response(status=400, text=f"Error: {str(e)}")

    try:
        user_agent = request.headers.get('User-Agent', '').lower()
        if 'text/html' in request.headers.get('Accept', ''):
            context = await get_template_context(username, user_agent)
            html_output = template.render(context)
            return web.Response(text=html_output, content_type='text/html')
        # NOTE(review): get_user_uri runs subprocesses synchronously, so this
        # call blocks the event loop while they execute.
        uri_output = get_user_uri(username, user_agent)
        return web.Response(text=uri_output, content_type='text/plain')
    except Exception as e:
        # Top-level boundary: log the detail server-side, return a generic body.
        print(f"Internal Server Error: {str(e)}")
        return web.Response(status=500, text="Error: Internal server error.")
async def get_template_context(username, user_agent):
    """
    Build the dictionary handed to the HTML template for ``username``.

    Combines traffic usage, the computed expiration date, the subscription
    link and QR-code data URIs. ``user_agent`` is accepted but not used here.
    Any exception is printed and re-raised.
    """
    try:
        info = get_user_info(username)
        uploaded = info.get('upload_bytes', 0)
        downloaded = info.get('download_bytes', 0)
        quota = info.get('max_download_bytes', 0)
        created_on = info.get('account_creation_date', '')
        valid_days = info.get('expiration_days', 0)

        # "N/A" unless both a creation date and a positive day count are present.
        expiration_date = "N/A"
        if created_on and valid_days > 0:
            try:
                created_struct = time.strptime(created_on, "%Y-%m-%d")
                expires_at = int(time.mktime(created_struct)) + (valid_days * 24 * 60 * 60)
                expiration_date = time.strftime("%Y-%m-%d", time.localtime(expires_at))
            except ValueError:
                expiration_date = "Invalid Date"

        ipv4_uri, ipv6_uri = get_uris(username)
        sub_link = f"https://{DOMAIN}:{PORT}/sub/normal/{username}"

        # A QR code is only produced for the URIs that exist.
        ipv4_qrcode = None
        if ipv4_uri:
            ipv4_qrcode = generate_qrcode_base64(ipv4_uri)
        ipv6_qrcode = None
        if ipv6_uri:
            ipv6_qrcode = generate_qrcode_base64(ipv6_uri)
        sublink_qrcode = generate_qrcode_base64(sub_link)

        quota_text = human_readable_bytes(quota)
        usage = f"{human_readable_bytes(uploaded + downloaded)} / {quota_text}"
        # Per-direction breakdown, shown as a tooltip.
        usage_raw = f"Upload: {human_readable_bytes(uploaded)}, Download: {human_readable_bytes(downloaded)}, Total: {quota_text}"

        return {
            'username': username,
            'usage': usage,
            'usage_raw': usage_raw,
            'expiration_date': expiration_date,
            'sublink_qrcode': sublink_qrcode,
            'ipv4_qrcode': ipv4_qrcode,
            'ipv6_qrcode': ipv6_qrcode,
            'sub_link': sub_link,
            'ipv4_uri': ipv4_uri,
            'ipv6_uri': ipv6_uri,
        }
    except Exception as e:
        print(f"Error in get_template_context: {e}")
        raise
def get_user_info(username):
    """
    Retrieve user information by running the cli.py ``get-user`` command.

    Args:
        username: Name passed to the command's ``-u`` option.

    Returns:
        The command's standard output parsed as JSON.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero status.
        json.JSONDecodeError: If the command output is not valid JSON.
    """
    # The command is an argument list executed without a shell, so each element
    # reaches the child process verbatim. shlex.quote must not be applied here:
    # it is meant for shell command strings and would embed literal quote
    # characters in any argument that contains special characters.
    command = [
        'python3',
        '/etc/hysteria/core/cli.py',
        'get-user',
        '-u', username,
    ]
    try:
        raw_output = subprocess.check_output(command).decode()
        return json.loads(raw_output)
    except subprocess.CalledProcessError as e:
        print(f"Error executing get-user command: {e}")
        raise
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        raise
def get_user_uri(username, user_agent):
    """
    Build the plain-text subscription body for ``username``.

    The body consists of the profile header lines, a subscription-userinfo
    line (upload, download, total, expire) and every available URI (IPv4
    first, then IPv6), one per line. When ``user_agent`` contains both
    "v2ray" and "ng", a ``pinSHA256=sha256/<base64>`` parameter is rewritten
    to colon-separated uppercase hex. Returns "No URI available" when
    neither URI exists.

    Raises:
        RuntimeError: When a subprocess call fails, the user info is not
            valid JSON, or a ValueError escapes the body.
    """
    try:
        info = get_user_info(username)
        uploaded = info.get('upload_bytes', 0)
        downloaded = info.get('download_bytes', 0)
        quota = info.get('max_download_bytes', 0)
        created_on = info.get('account_creation_date', '')
        valid_days = info.get('expiration_days', 0)

        # 0 is emitted when there is no usable creation date / day count.
        expires_at = 0
        if created_on and valid_days > 0:
            try:
                created_struct = time.strptime(created_on, "%Y-%m-%d")
                expires_at = int(time.mktime(created_struct)) + (valid_days * 24 * 60 * 60)
            except ValueError:
                expires_at = 0

        ipv4_uri, ipv6_uri = get_uris(username)
        available = [candidate for candidate in (ipv4_uri, ipv6_uri) if candidate]
        if not available:
            return "No URI available"

        # The user-agent test does not depend on the URI, so evaluate it once.
        wants_hex_pin = "v2ray" in user_agent and "ng" in user_agent
        rewritten = []
        for uri in available:
            pin = re.search(r'pinSHA256=sha256/([^&]+)', uri) if wants_hex_pin else None
            if pin:
                encoded_pin = pin.group(1)
                try:
                    raw_pin = base64.b64decode(encoded_pin)
                    hex_pin = ':'.join(['{:02X}'.format(octet) for octet in raw_pin])
                    uri = uri.replace(f'pinSHA256=sha256/{encoded_pin}', f'pinSHA256={hex_pin}')
                except Exception as e:
                    # Best effort: on failure the URI is passed through unchanged.
                    print(f"Error processing pinSHA256: {e}")
            rewritten.append(uri)

        header = (
            f"//profile-title: {username}-Hysteria2 🚀\n//profile-update-interval: 1\n"
            f"//subscription-userinfo: upload={uploaded}; download={downloaded}; total={quota}; expire={expires_at}\n"
        )
        return header + "\n".join(rewritten)
    except subprocess.CalledProcessError:
        raise RuntimeError("Failed to get URI or user info.")
    except json.JSONDecodeError:
        raise RuntimeError("Failed to parse user info JSON.")
    except ValueError:
        raise RuntimeError("expiration_timestamp OR account_creation_date in config file is invalid")
def get_uris(username):
    """
    Get the IPv4 and IPv6 URIs for a user from the cli.py ``show-user-uri`` command.

    Args:
        username: Name passed to the command's ``-u`` option.

    Returns:
        A ``(ipv4_uri, ipv6_uri)`` tuple. Each element is the text following
        the ``IPv4:`` / ``IPv6:`` label in the command output, or ``None``
        when that label is absent.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero status.
    """
    # Argument list executed without a shell: elements are passed verbatim, so
    # shell quoting (shlex.quote) is neither needed nor correct here.
    command = [
        'python3',
        '/etc/hysteria/core/cli.py',
        'show-user-uri',
        '-u', username,
        '-a',
    ]
    try:
        output = subprocess.check_output(command).decode().strip()
    except subprocess.CalledProcessError as e:
        print(f"Error executing show-user-uri command: {e}")
        raise

    # Both lookups are guarded against a missing label, so no AttributeError
    # can arise from the match objects.
    ipv4_match = re.search(r'IPv4:\s*(.*)', output)
    ipv6_match = re.search(r'IPv6:\s*(.*)', output)
    ipv4_uri = ipv4_match.group(1).strip() if ipv4_match else None
    ipv6_uri = ipv6_match.group(1).strip() if ipv6_match else None
    return ipv4_uri, ipv6_uri
async def handle_404(request):
    """Log the requested path and answer with a plain 404 response."""
    requested_path = request.path
    print(f"404 Not Found: {requested_path}")
    not_found = web.Response(status=404, text="Not Found")
    return not_found
if __name__ == '__main__':
    # Server-side TLS context: TLS 1.2 minimum, AES-256 suites with ephemeral
    # (EC)DH key exchange, certificate and key taken from the environment.
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
    ssl_context.set_ciphers('AES256+EECDH:AES256+EDH')
    ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)

    app = web.Application(middlewares=[rate_limit_middleware])
    # Order matters: the specific subscription route is registered before the
    # catch-all that answers every other path under /sub/normal/ with a 404.
    app.add_routes([
        web.get('/sub/normal/{username}', handle),
        web.route('*', '/sub/normal/{tail:.*}', handle_404),
    ])

    web.run_app(app, port=PORT, ssl_context=ssl_context)