From 2839ed8684d0c7fe09d2356cefbe187639646ad9 Mon Sep 17 00:00:00 2001 From: Whispering Wind <151555003+ReturnFI@users.noreply.github.com> Date: Wed, 26 Feb 2025 23:41:24 +0330 Subject: [PATCH] Combined singbox with normal sub --- core/scripts/normalsub/normalsub.py | 747 +++++++++++++++++++--------- 1 file changed, 510 insertions(+), 237 deletions(-) diff --git a/core/scripts/normalsub/normalsub.py b/core/scripts/normalsub/normalsub.py index 190c992..987968a 100644 --- a/core/scripts/normalsub/normalsub.py +++ b/core/scripts/normalsub/normalsub.py @@ -1,295 +1,568 @@ +import logging import os import ssl -import subprocess -import time -import re -import shlex import json +import subprocess +import re +import time +import shlex import base64 -import hashlib -import qrcode +from typing import Dict, List, Optional, Tuple, Any, Union +from dataclasses import dataclass from io import BytesIO + from aiohttp import web from aiohttp.web_middlewares import middleware +from urllib.parse import unquote, parse_qs, urlparse from dotenv import load_dotenv +import qrcode from jinja2 import Environment, FileSystemLoader load_dotenv() - -CERTFILE = os.getenv('HYSTERIA_CERTFILE') -KEYFILE = os.getenv('HYSTERIA_KEYFILE') -PORT = int(os.getenv('HYSTERIA_PORT', '3325')) -DOMAIN = os.getenv('HYSTERIA_DOMAIN', 'localhost') - -RATE_LIMIT = 100 -RATE_LIMIT_WINDOW = 60 - -rate_limit_store = {} - -TEMPLATE_DIR = os.path.join(os.path.dirname(__file__)) -env = Environment(loader=FileSystemLoader(TEMPLATE_DIR)) -template = env.get_template('template.html') +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") -@middleware -async def rate_limit_middleware(request, handler): - client_ip = request.headers.get('X-Forwarded-For', request.remote) - current_time = time.time() +@dataclass +class AppConfig: + """Application configuration settings""" + domain: str + cert_file: str + key_file: str + port: int + sni_file: str + singbox_template_path: str + hysteria_cli_path: str + rate_limit: int + rate_limit_window: int + sni: str + template_dir: str - if client_ip in rate_limit_store: - requests, last_request_time = rate_limit_store[client_ip] - if current_time - last_request_time < RATE_LIMIT_WINDOW: - if requests >= RATE_LIMIT: - return web.Response(status=429, text="Rate limit exceeded.") - if current_time - last_request_time >= RATE_LIMIT_WINDOW: - rate_limit_store[client_ip] = (1, current_time) + +class RateLimiter: + """Handles rate limiting for requests""" + + def __init__(self, limit: int, window: int): + self.limit = limit + self.window = window + self.store: Dict[str, Tuple[int, float]] = {} + + def check_limit(self, client_ip: str) -> bool: + """Checks if a client has exceeded their rate limit + + Returns: + bool: True if rate limit not exceeded, False otherwise + """ + current_time = time.monotonic() + requests, last_request_time = self.store.get(client_ip, (0, 0)) + + if current_time - last_request_time < self.window: + if requests >= self.limit: + return False else: - rate_limit_store[client_ip] = (requests + 1, last_request_time) - else: - rate_limit_store[client_ip] = (1, current_time) - - return await handler(request) + requests = 0 -def sanitize_input(value, pattern): - if not re.match(pattern, value): - raise ValueError(f"Invalid value: {value}") - return value - -def generate_qrcode_base64(data): - qr = qrcode.QRCode( - version=1, - error_correction=qrcode.constants.ERROR_CORRECT_L, - box_size=10, - border=4, - ) - qr.add_data(data) - qr.make(fit=True) - - img = qr.make_image(fill_color="black", back_color="white") - - buffered = BytesIO() - img.save(buffered, format="PNG") - img_base64 = base64.b64encode(buffered.getvalue()).decode() - return f"data:image/png;base64,{img_base64}" - -def human_readable_bytes(bytes_value): - """ - Converts bytes to a human-readable format (KB, MB, GB, TB). - """ - units = ["Bytes", "KB", "MB", "GB", "TB"] - size = float(bytes_value) - for unit in units: - if size < 1024: - return f"{size:.2f} {unit}" - size /= 1024 - return f"{size:.2f} PB" + self.store[client_ip] = (requests + 1, current_time) + return True -async def handle(request): - try: - username = sanitize_input(request.match_info.get('username', ''), r'^[a-zA-Z0-9_-]+$') +@dataclass +class UriComponents: + """Components extracted from a Hysteria2 URI""" + username: Optional[str] + password: Optional[str] + ip: Optional[str] + port: Optional[int] + obfs_password: str - if not username: - return web.Response(status=400, text="Error: Missing 'username' parameter.") - - user_agent = request.headers.get('User-Agent', '').lower() - if 'text/html' in request.headers.get('Accept', ''): - context = await get_template_context(username, user_agent) - html_output = template.render(context) - return web.Response(text=html_output, content_type='text/html') +@dataclass +class UserInfo: + """User information and statistics""" + username: str + upload_bytes: int + download_bytes: int + max_download_bytes: int + account_creation_date: str + expiration_days: int + + @property + def total_usage(self) -> int: + """Total bandwidth usage""" + return self.upload_bytes + self.download_bytes + + @property + def expiration_timestamp(self) -> int: + """Unix timestamp when account expires""" + if not self.account_creation_date or self.expiration_days <= 0: + return 0 + creation_timestamp = int(time.mktime(time.strptime(self.account_creation_date, "%Y-%m-%d"))) + return creation_timestamp + (self.expiration_days * 24 * 3600) + + @property + def expiration_date(self) -> str: + """Formatted expiration date string""" + if not self.account_creation_date or self.expiration_days <= 0: + return "N/A" + creation_timestamp = int(time.mktime(time.strptime(self.account_creation_date, "%Y-%m-%d"))) + expiration_timestamp = creation_timestamp + (self.expiration_days * 24 * 3600) + return time.strftime("%Y-%m-%d", time.localtime(expiration_timestamp)) + + @property + def usage_human_readable(self) -> str: + """Human readable string of usage""" + total = Utils.human_readable_bytes(self.max_download_bytes) + used = Utils.human_readable_bytes(self.total_usage) + return f"{used} / {total}" + + @property + def usage_detailed(self) -> str: + """Detailed usage breakdown""" + total = Utils.human_readable_bytes(self.max_download_bytes) + upload = Utils.human_readable_bytes(self.upload_bytes) + download = Utils.human_readable_bytes(self.download_bytes) + return f"Upload: {upload}, Download: {download}, Total: {total}" + + +@dataclass +class TemplateContext: + """Context for HTML template rendering""" + username: str + usage: str + usage_raw: str + expiration_date: str + sublink_qrcode: str + ipv4_qrcode: Optional[str] + ipv6_qrcode: Optional[str] + sub_link: str + ipv4_uri: Optional[str] + ipv6_uri: Optional[str] + + +class Utils: + """Utility functions""" + + @staticmethod + def sanitize_input(value: str, pattern: str) -> str: + """Sanitizes input using a regex pattern and quotes it for shell commands""" + if not re.match(pattern, value): + raise ValueError(f"Invalid value: {value}") + return shlex.quote(value) + + @staticmethod + def generate_qrcode_base64(data: str) -> str: + """Generates a base64-encoded PNG QR code image""" + if not data: + return None + + qr = qrcode.QRCode( + version=1, + error_correction=qrcode.constants.ERROR_CORRECT_L, + box_size=10, + border=4, + ) + qr.add_data(data) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + buffered = BytesIO() + img.save(buffered, format="PNG") + return "data:image/png;base64," + base64.b64encode(buffered.getvalue()).decode() + + @staticmethod + def human_readable_bytes(bytes_value: int) -> str: + """Converts bytes to a human-readable string (KB, MB, GB, etc.)""" + units = ["Bytes", "KB", "MB", "GB", "TB"] + size = float(bytes_value) + for unit in units: + if size < 1024: + return f"{size:.2f} {unit}" + size /= 1024 + return f"{size:.2f} PB" + + +class HysteriaCLI: + """Interface for Hysteria CLI commands""" + + def __init__(self, cli_path: str): + self.cli_path = cli_path + + def _run_command(self, args: List[str]) -> str: + """Runs the hysteria CLI with the given arguments and returns the output""" + try: + command = ['python3', self.cli_path] + args + return subprocess.check_output(command, stderr=subprocess.DEVNULL, text=True).strip() + except subprocess.CalledProcessError as e: + print(f"Hysteria CLI error: {e}") # Log the error + raise + + def get_user_info(self, username: str) -> UserInfo: + """Retrieves user information""" + raw_info = json.loads(self._run_command(['get-user', '-u', username])) + return UserInfo( + username=username, + upload_bytes=raw_info.get('upload_bytes', 0), + download_bytes=raw_info.get('download_bytes', 0), + max_download_bytes=raw_info.get('max_download_bytes', 0), + account_creation_date=raw_info.get('account_creation_date', ''), + expiration_days=raw_info.get('expiration_days', 0) + ) + + def get_user_uri(self, username: str, ip_version: Optional[str] = None) -> str: + """Gets the URI for a user, optionally specifying IP version""" + if ip_version: + return self._run_command(['show-user-uri', '-u', username, '-ip', ip_version]) else: - uri_output = get_user_uri(username, user_agent) - return web.Response(text=uri_output, content_type='text/plain') + output = self._run_command(['show-user-uri', '-u', username, '-a']) + return output - except ValueError as e: - return web.Response(status=400, text=f"Error: {str(e)}") - except Exception as e: - print(f"Internal Server Error: {str(e)}") - return web.Response(status=500, text="Error: Internal server error.") + def get_uris(self, username: str) -> Tuple[Optional[str], Optional[str]]: + """Retrieves IPv4 and IPv6 URIs for a user""" + output = self._run_command(['show-user-uri', '-u', username, '-a']) + ipv4_uri = re.search(r'IPv4:\s*(.*)', output) + ipv6_uri = re.search(r'IPv6:\s*(.*)', output) + return (ipv4_uri.group(1).strip() if ipv4_uri else None, + ipv6_uri.group(1).strip() if ipv6_uri else None) -async def get_template_context(username, user_agent): - """ - Gathers all the data needed to render the HTML template. - """ - try: - user_info = get_user_info(username) - upload = user_info.get('upload_bytes', 0) - download = user_info.get('download_bytes', 0) - total_bytes = user_info.get('max_download_bytes', 0) - creation_date_str = user_info.get('account_creation_date', '') - expiration_days = user_info.get('expiration_days', 0) - if creation_date_str and expiration_days > 0: +class UriParser: + """Parser for Hysteria2 URIs""" + + @staticmethod + def extract_uri_components(uri: Optional[str], prefix: str) -> Optional[UriComponents]: + """Extracts components from a Hysteria2 URI""" + if not uri or not uri.startswith(prefix): + return None + uri = uri[len(prefix):].strip() + try: + decoded_uri = unquote(uri) + parsed_url = urlparse(decoded_uri) + query_params = parse_qs(parsed_url.query) + hostname = parsed_url.hostname + if hostname and hostname.startswith('[') and hostname.endswith(']'): + hostname = hostname[1:-1] + + port = None + if parsed_url.port is not None: + try: + port = int(parsed_url.port) + except ValueError: + print(f"Warning: Invalid port in URI: {parsed_url.port}") + + return UriComponents( + username=parsed_url.username, + password=parsed_url.password, + ip=hostname, + port=port, + obfs_password=query_params.get('obfs-password', [''])[0] + ) + except Exception as e: + print(f"Error during URI parsing: {e}, URI: {uri}") + return None + + +class SingboxConfigGenerator: + """Generator for Sing-box configurations""" + + def __init__(self, hysteria_cli: HysteriaCLI, default_sni: str): + self.hysteria_cli = hysteria_cli + self.default_sni = default_sni + self._template_cache = None + self.template_path = None + + def set_template_path(self, path: str): + """Sets the path to the template file""" + self.template_path = path + self._template_cache = None + + def get_template(self) -> Dict[str, Any]: + """Loads and caches the singbox template""" + if self._template_cache is None: try: - creation_date = time.strptime(creation_date_str, "%Y-%m-%d") - expiration_timestamp = int(time.mktime(creation_date)) + (expiration_days * 24 * 60 * 60) - expiration_date = time.strftime("%Y-%m-%d", time.localtime(expiration_timestamp)) - except ValueError: - expiration_date = "Invalid Date" - else: - expiration_date = "N/A" + with open(self.template_path, 'r') as f: + self._template_cache = json.load(f) + except (FileNotFoundError, json.JSONDecodeError, IOError) as e: + raise RuntimeError(f"Error loading Singbox template: {e}") from e + return self._template_cache.copy() + def generate_config(self, username: str, ip_version: str, fragment: str) -> Optional[Dict[str, Any]]: + """Generates a Sing-box outbound configuration for a given user and IP version""" + try: + uri = self.hysteria_cli.get_user_uri(username, ip_version) + except Exception: + print(f"Warning: Failed to get URI for {username} with IP version {ip_version}. Skipping.") + return None + if not uri: + print(f"Warning: No URI found for {username} with IP version {ip_version}. Skipping.") + return None + components = UriParser.extract_uri_components(uri, f'IPv{ip_version}:') + if components is None or components.port is None: + print(f"Warning: Invalid URI components for {username} with IP version {ip_version}. Skipping.") + return None - ipv4_uri, ipv6_uri = get_uris(username) - sub_link = f"https://{DOMAIN}:{PORT}/sub/normal/{username}" - - ipv4_qrcode = generate_qrcode_base64(ipv4_uri) if ipv4_uri else None - ipv6_qrcode = generate_qrcode_base64(ipv6_uri) if ipv6_uri else None - sublink_qrcode = generate_qrcode_base64(sub_link) - - - total_human_readable = human_readable_bytes(total_bytes) - usage = f"{human_readable_bytes(upload + download)} / {total_human_readable}" - usage_raw = f"Upload: {human_readable_bytes(upload)}, Download: {human_readable_bytes(download)}, Total: {total_human_readable}" # For the tooltip - - - context = { - 'username': username, - 'usage': usage, - 'usage_raw': usage_raw, - 'expiration_date': expiration_date, - 'sublink_qrcode': sublink_qrcode, - 'ipv4_qrcode': ipv4_qrcode, - 'ipv6_qrcode': ipv6_qrcode, - 'sub_link': sub_link, - 'ipv4_uri': ipv4_uri, - 'ipv6_uri': ipv6_uri, + return { + "outbounds": [{ + "type": "hysteria2", + "tag": f"{username}-Hysteria2", + "server": components.ip, + "server_port": components.port, + "obfs": { + "type": "salamander", + "password": components.obfs_password + }, + "password": f"{username}:{components.password}", + "tls": { + "enabled": True, + "server_name": fragment if fragment else self.default_sni, + "insecure": True + } + }] } - return context - except Exception as e: - print(f"Error in get_template_context: {e}") - raise -def get_user_info(username): - """ - Retrieves user information from the cli.py script. - """ - try: - user_info_command = [ - 'python3', - '/etc/hysteria/core/cli.py', - 'get-user', - '-u', username + def combine_configs(self, username: str, config_v4: Optional[Dict[str, Any]], + config_v6: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """Combines IPv4 and IPv6 configurations into a single config""" + combined_config = self.get_template() + + combined_config['outbounds'] = [ + outbound for outbound in combined_config['outbounds'] + if outbound.get('type') != 'hysteria2' ] - safe_user_info_command = [shlex.quote(arg) for arg in user_info_command] - user_info_output = subprocess.check_output(safe_user_info_command).decode() - user_info = json.loads(user_info_output) - return user_info - except subprocess.CalledProcessError as e: - print(f"Error executing get-user command: {e}") - raise - except json.JSONDecodeError as e: - print(f"Error decoding JSON: {e}") - raise -def get_user_uri(username, user_agent): - """ - Returns the URI for the user, adapting the output based on the User-Agent. Returns BOTH IPv4 and IPv6. - """ - try: - user_info = get_user_info(username) - upload = user_info.get('upload_bytes', 0) - download = user_info.get('download_bytes', 0) - total = user_info.get('max_download_bytes', 0) - creation_date_str = user_info.get('account_creation_date', '') - expiration_days = user_info.get('expiration_days', 0) - if creation_date_str and expiration_days > 0: - try: - creation_date = time.strptime(creation_date_str, "%Y-%m-%d") - expiration_timestamp = int(time.mktime(creation_date)) + (expiration_days * 24 * 60 * 60) - except ValueError: - expiration_timestamp = 0 - else: - expiration_timestamp = 0 + modified_v4_outbounds = [] + if config_v4: + v4_outbound = config_v4['outbounds'][0] + v4_outbound['tag'] = f"{username}-IPv4" + modified_v4_outbounds = [v4_outbound] - ipv4_uri, ipv6_uri = get_uris(username) + modified_v6_outbounds = [] + if config_v6: + v6_outbound = config_v6['outbounds'][0] + v6_outbound['tag'] = f"{username}-IPv6" + modified_v6_outbounds = [v6_outbound] - output_lines = [] + select_outbounds = ["auto"] + if config_v4: + select_outbounds.append(f"{username}-IPv4") + if config_v6: + select_outbounds.append(f"{username}-IPv6") - if ipv4_uri: - output_lines.append(ipv4_uri) - if ipv6_uri: - output_lines.append(ipv6_uri) + auto_outbounds = [] + if config_v4: + auto_outbounds.append(f"{username}-IPv4") + if config_v6: + auto_outbounds.append(f"{username}-IPv6") + for outbound in combined_config['outbounds']: + if outbound.get('tag') == 'select': + outbound['outbounds'] = select_outbounds + elif outbound.get('tag') == 'auto': + outbound['outbounds'] = auto_outbounds + + combined_config['outbounds'].extend(modified_v4_outbounds + modified_v6_outbounds) + return combined_config + + +class SubscriptionManager: + """Handles user subscription generation""" + + def __init__(self, hysteria_cli: HysteriaCLI, config: AppConfig): + self.hysteria_cli = hysteria_cli + self.config = config + + def get_normal_subscription(self, username: str, user_agent: str) -> str: + """Generates the user URI for normal subscriptions""" + user_info = self.hysteria_cli.get_user_info(username) + ipv4_uri, ipv6_uri = self.hysteria_cli.get_uris(username) + + output_lines = [uri for uri in [ipv4_uri, ipv6_uri] if uri] if not output_lines: - return "No URI available" + return "No URI available" processed_uris = [] for uri in output_lines: if "v2ray" in user_agent and "ng" in user_agent: match = re.search(r'pinSHA256=sha256/([^&]+)', uri) if match: - base64_pin = match.group(1) - try: - decoded_pin = base64.b64decode(base64_pin) - hex_pin = ':'.join(['{:02X}'.format(byte) for byte in decoded_pin]) - uri = uri.replace(f'pinSHA256=sha256/{base64_pin}', f'pinSHA256={hex_pin}') - except Exception as e: - print(f"Error processing pinSHA256: {e}") + decoded = base64.b64decode(match.group(1)) + formatted = ":".join("{:02X}".format(byte) for byte in decoded) + uri = uri.replace(f'pinSHA256=sha256/{match.group(1)}', f'pinSHA256={formatted}') processed_uris.append(uri) - subscription_info = ( - f"//subscription-userinfo: upload={upload}; download={download}; total={total}; expire={expiration_timestamp}\n" + f"//subscription-userinfo: upload={user_info.upload_bytes}; " + f"download={user_info.download_bytes}; " + f"total={user_info.max_download_bytes}; " + f"expire={user_info.expiration_timestamp}\n" + ) + profile_lines = f"//profile-title: {username}-Hysteria2 🚀\n//profile-update-interval: 1\n" + + return profile_lines + subscription_info + "\n".join(processed_uris) + + +class TemplateRenderer: + """Handles HTML template rendering""" + + def __init__(self, template_dir: str, config: AppConfig): + self.env = Environment(loader=FileSystemLoader(template_dir), autoescape=True) + self.html_template = self.env.get_template('template.html') + self.config = config + + def render(self, context: TemplateContext) -> str: + """Renders the HTML template with the given context""" + return self.html_template.render(vars(context)) + + +class HysteriaServer: + """Main application server class""" + + def __init__(self): + self.config = self._load_config() + + self.rate_limiter = RateLimiter(self.config.rate_limit, self.config.rate_limit_window) + self.hysteria_cli = HysteriaCLI(self.config.hysteria_cli_path) + self.singbox_generator = SingboxConfigGenerator(self.hysteria_cli, self.config.sni) + self.singbox_generator.set_template_path(self.config.singbox_template_path) + self.subscription_manager = SubscriptionManager(self.hysteria_cli, self.config) + self.template_renderer = TemplateRenderer(self.config.template_dir, self.config) + + self.app = web.Application(middlewares=[self._rate_limit_middleware]) + self.app.add_routes([web.get('/sub/normal/{username}', self.handle)]) + self.app.router.add_route('*', '/sub/normal/{tail:.*}', self.handle_404) + + def _load_config(self) -> AppConfig: + """Loads application configuration from environment variables""" + domain = os.getenv('HYSTERIA_DOMAIN', 'localhost') + cert_file = os.getenv('HYSTERIA_CERTFILE') + key_file = os.getenv('HYSTERIA_KEYFILE') + port = int(os.getenv('HYSTERIA_PORT', '3326')) + sni_file = '/etc/hysteria/.configs.env' + singbox_template_path = '/etc/hysteria/core/scripts/normalsub/singbox.json' + hysteria_cli_path = '/etc/hysteria/core/cli.py' + rate_limit = 100 + rate_limit_window = 60 + template_dir = os.path.dirname(__file__) + + sni = self._load_sni_from_env(sni_file) + + return AppConfig( + domain=domain, + cert_file=cert_file, + key_file=key_file, + port=port, + sni_file=sni_file, + singbox_template_path=singbox_template_path, + hysteria_cli_path=hysteria_cli_path, + rate_limit=rate_limit, + rate_limit_window=rate_limit_window, + sni=sni, + template_dir=template_dir ) - profile_lines = f"//profile-title: {username}-Hysteria2 🚀\n//profile-update-interval: 1\n" - output = profile_lines + subscription_info + "\n".join(processed_uris) - return output + def _load_sni_from_env(self, sni_file: str) -> str: + """Loads SNI configuration from the environment file""" + try: + with open(sni_file, 'r') as f: + for line in f: + if line.startswith('SNI='): + return line.strip().split('=')[1] + except FileNotFoundError: + print("Warning: SNI file not found. Using default SNI.") + return "bts.com" - except subprocess.CalledProcessError: - raise RuntimeError("Failed to get URI or user info.") - except json.JSONDecodeError: - raise RuntimeError("Failed to parse user info JSON.") - except ValueError: - raise RuntimeError("expiration_timestamp OR account_creation_date in config file is invalid") + @middleware + async def _rate_limit_middleware(self, request: web.Request, handler): + """Middleware for rate limiting requests""" + client_ip = request.headers.get('X-Forwarded-For', + request.headers.get('X-Real-IP', request.remote)) -def get_uris(username): - """ - Gets the IPv4 and IPv6 URIs for a user, handling cases where one or both might be missing. - """ - try: - command = [ - 'python3', - '/etc/hysteria/core/cli.py', - 'show-user-uri', - '-u', username, - '-a' - ] - safe_command = [shlex.quote(arg) for arg in command] - output = subprocess.check_output(safe_command).decode().strip() + if not self.rate_limiter.check_limit(client_ip): + return web.Response(status=429, text="Rate limit exceeded.") - ipv4_match = re.search(r'IPv4:\s*(.*)', output) - ipv6_match = re.search(r'IPv6:\s*(.*)', output) + return await handler(request) - ipv4_uri = ipv4_match.group(1).strip() if ipv4_match else None - ipv6_uri = ipv6_match.group(1).strip() if ipv6_match else None + async def handle(self, request: web.Request) -> web.Response: + """Main request handler""" + try: + username = Utils.sanitize_input(request.match_info.get('username', ''), r'^[a-zA-Z0-9_-]+$') + if not username: + return web.Response(status=400, text="Error: Missing 'username' parameter.") - return ipv4_uri, ipv6_uri + user_agent = request.headers.get('User-Agent', '').lower() - except subprocess.CalledProcessError as e: - print(f"Error executing show-user-uri command: {e}") - raise - except AttributeError as e: - print(f"Error parsing URI output: {e}") - raise + if any(browser in user_agent for browser in ['chrome', 'firefox', 'safari', 'edge', 'opera']): + return await self._handle_html(request, username) + else: + fragment = request.query.get('fragment', '') + if not user_agent.startswith('hiddifynext') and ('singbox' in user_agent or 'sing' in user_agent): + return await self._handle_singbox(username, fragment) + return await self._handle_normalsub(request, username) + except ValueError as e: + return web.Response(status=400, text=f"Error: {e}") + except Exception as e: + print(f"Internal Server Error: {e}") + return web.Response(status=500, text="Error: Internal server error") + async def _handle_html(self, request: web.Request, username: str) -> web.Response: + """Handles requests for HTML output""" + context = await self._get_template_context(username) + rendered_html = self.template_renderer.render(context) + return web.Response(text=rendered_html, content_type='text/html') + + async def _handle_singbox(self, username: str, fragment: str) -> web.Response: + """Handles requests for Sing-box configuration""" + config_v4 = self.singbox_generator.generate_config(username, '4', fragment) + config_v6 = self.singbox_generator.generate_config(username, '6', fragment) + + if config_v4 is None and config_v6 is None: + return web.Response(status=404, text=f"Error: No valid URIs found for user {username}.") + + combined_config = self.singbox_generator.combine_configs(username, config_v4, config_v6) + return web.Response(text=json.dumps(combined_config, indent=4, sort_keys=True), + content_type='application/json') + + async def _handle_normalsub(self, request: web.Request, username: str) -> web.Response: + """Handles requests for normal subscription links""" + user_agent = request.headers.get('User-Agent', '').lower() + subscription = self.subscription_manager.get_normal_subscription(username, user_agent) + return web.Response(text=subscription, content_type='text/plain') + + async def _get_template_context(self, username: str) -> TemplateContext: + """Generates the context for HTML template rendering""" + user_info = self.hysteria_cli.get_user_info(username) + ipv4_uri, ipv6_uri = self.hysteria_cli.get_uris(username) + + sub_link = f"https://{self.config.domain}:{self.config.port}/sub/normal/{username}" + ipv4_qrcode = Utils.generate_qrcode_base64(ipv4_uri) + ipv6_qrcode = Utils.generate_qrcode_base64(ipv6_uri) + sublink_qrcode = Utils.generate_qrcode_base64(sub_link) + + return TemplateContext( + username=username, + usage=user_info.usage_human_readable, + usage_raw=user_info.usage_detailed, + expiration_date=user_info.expiration_date, + sublink_qrcode=sublink_qrcode, + ipv4_qrcode=ipv4_qrcode, + ipv6_qrcode=ipv6_qrcode, + sub_link=sub_link, + ipv4_uri=ipv4_uri, + ipv6_uri=ipv6_uri + ) + + async def handle_404(self, request: web.Request) -> web.Response: + """Handles 404 Not Found errors""" + print(f"404 Not Found: {request.path}") + return web.Response(status=404, text="Not Found") + + def run(self): + """Runs the web server""" + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(certfile=self.config.cert_file, keyfile=self.config.key_file) + ssl_context.set_ciphers('ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384') + + web.run_app(self.app, port=self.config.port, ssl_context=ssl_context) -async def handle_404(request): - print(f"404 Not Found: {request.path}") - return web.Response(status=404, text="Not Found") if __name__ == '__main__': - app = web.Application(middlewares=[rate_limit_middleware]) - - app.add_routes([web.get('/sub/normal/{username}', handle)]) - app.router.add_route('*', '/sub/normal/{tail:.*}', handle_404) - - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) - ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE) - ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2 - ssl_context.set_ciphers('AES256+EECDH:AES256+EDH') - - web.run_app(app, port=PORT, ssl_context=ssl_context) + server = HysteriaServer() + server.run()