"""HTTPS subscription endpoint for Hysteria2 users.

Depending on the requesting client's User-Agent, a user's subscription is
served as an HTML page, a sing-box JSON profile, or a plain-text list of
URIs. All user data is obtained by invoking the Hysteria management CLI.
"""

import asyncio
import base64
import copy
import json
import os
import re
import shlex
import ssl
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import parse_qs, unquote, urljoin, urlparse

import qrcode
from aiohttp import web
from aiohttp.web_middlewares import middleware
from dotenv import load_dotenv
from jinja2 import Environment, FileSystemLoader

load_dotenv()

# Character set accepted for usernames and for the URL subpath. Always applied
# with re.fullmatch so that a trailing newline cannot slip past the `$` anchor.
_IDENTIFIER_PATTERN = r'^[a-zA-Z0-9_-]+$'


@dataclass
class AppConfig:
    """Runtime configuration assembled by ``HysteriaServer._load_config``."""

    domain: str                  # host name used when building the subscription link
    cert_file: Optional[str]     # TLS certificate path (HYSTERIA_CERTFILE); None if unset
    key_file: Optional[str]      # TLS private key path (HYSTERIA_KEYFILE); None if unset
    port: int                    # port the HTTPS server listens on
    sni_file: str                # env-style file scanned for an ``SNI=`` line
    singbox_template_path: str   # JSON template used for sing-box profiles
    hysteria_cli_path: str       # script executed via ``python3`` for user data
    rate_limit: int              # max requests per client per window
    rate_limit_window: int       # window length in seconds
    sni: str                     # default TLS server name for sing-box profiles
    template_dir: str            # directory containing ``template.html``
    subpath: str                 # URL path prefix under which routes are mounted


class RateLimiter:
    """Fixed-window, per-client request counter.

    ``store`` maps a client key to ``(requests_in_window, window_start)``
    where ``window_start`` is a ``time.monotonic()`` reading.
    """

    def __init__(self, limit: int, window: int):
        self.limit = limit
        self.window = window
        self.store: Dict[str, Tuple[int, float]] = {}
        self._last_prune = time.monotonic()

    def _evict_expired(self, now: float) -> None:
        """Drop entries whose window has already elapsed."""
        stale = [
            key for key, (_, started) in self.store.items()
            if now - started >= self.window
        ]
        for key in stale:
            del self.store[key]

    def check_limit(self, client_ip: str) -> bool:
        """Record one request for *client_ip*.

        Returns:
            True if the request is within the limit, False if the client has
            already made ``limit`` requests in the current window.
        """
        now = time.monotonic()

        # Sweep at most once per window so the store cannot grow without
        # bound while keeping the per-request cost amortised.
        if now - self._last_prune >= self.window:
            self._evict_expired(now)
            self._last_prune = now

        count, window_start = self.store.get(client_ip, (0, now))
        if now - window_start >= self.window:
            # The previous window is over: start a fresh one. The start time
            # is NOT refreshed on every request, otherwise a steady trickle
            # of requests would keep the old counter alive indefinitely.
            count, window_start = 0, now

        if count >= self.limit:
            return False

        self.store[client_ip] = (count + 1, window_start)
        return True


@dataclass
class UriComponents:
    """Pieces extracted from a user URI by ``UriParser``."""

    username: Optional[str]
    password: Optional[str]
    ip: Optional[str]
    port: Optional[int]
    obfs_password: str


@dataclass
class UserInfo:
    """Usage and expiry data for one user, as reported by the CLI."""

    username: str
    upload_bytes: int
    download_bytes: int
    max_download_bytes: int
    account_creation_date: str   # parsed with the "%Y-%m-%d" format; may be ''
    expiration_days: int         # values <= 0 are treated as "no expiry"

    @property
    def _has_expiration(self) -> bool:
        """Whether both a creation date and a positive day count are set."""
        return bool(self.account_creation_date) and self.expiration_days > 0

    @property
    def total_usage(self) -> int:
        """Sum of uploaded and downloaded bytes."""
        return self.upload_bytes + self.download_bytes

    @property
    def expiration_timestamp(self) -> int:
        """Expiry as epoch seconds, or 0 when no expiry applies.

        Raises:
            ValueError: if ``account_creation_date`` is not "%Y-%m-%d".
        """
        if not self._has_expiration:
            return 0
        creation_timestamp = int(
            time.mktime(time.strptime(self.account_creation_date, "%Y-%m-%d"))
        )
        return creation_timestamp + (self.expiration_days * 24 * 3600)

    @property
    def expiration_date(self) -> str:
        """Expiry as a ``YYYY-MM-DD`` string, or "N/A" when no expiry applies.

        Raises:
            ValueError: if ``account_creation_date`` is not "%Y-%m-%d".
        """
        if not self._has_expiration:
            return "N/A"
        # Calendar arithmetic rather than "+ days * 86400 seconds": adding
        # seconds to local midnight lands on the previous day when a DST
        # fall-back occurs between creation and expiry.
        created = datetime.strptime(self.account_creation_date, "%Y-%m-%d").date()
        return (created + timedelta(days=self.expiration_days)).isoformat()

    @property
    def usage_human_readable(self) -> str:
        """``"<used> / <quota>"`` with human-readable units."""
        total = Utils.human_readable_bytes(self.max_download_bytes)
        used = Utils.human_readable_bytes(self.total_usage)
        return f"{used} / {total}"

    @property
    def usage_detailed(self) -> str:
        """Upload, download and quota figures in one human-readable line."""
        total = Utils.human_readable_bytes(self.max_download_bytes)
        upload = Utils.human_readable_bytes(self.upload_bytes)
        download = Utils.human_readable_bytes(self.download_bytes)
        return f"Upload: {upload}, Download: {download}, Total: {total}"


@dataclass
class TemplateContext:
    """Variables handed to ``template.html``."""

    username: str
    usage: str
    usage_raw: str
    expiration_date: str
    sublink_qrcode: str
    ipv4_qrcode: Optional[str]
    ipv6_qrcode: Optional[str]
    sub_link: str
    ipv4_uri: Optional[str]
    ipv6_uri: Optional[str]


class Utils:
    """Stateless helpers shared by the server components."""

    @staticmethod
    def sanitize_input(value: str, pattern: str) -> str:
        """Validate *value* against *pattern* and return it shell-quoted.

        Raises:
            ValueError: if *value* does not match *pattern* in full.
        """
        # fullmatch, not match: with re.match a pattern ending in `$` still
        # accepts a value with a trailing newline ("abc\n").
        if not re.fullmatch(pattern, value):
            raise ValueError(f"Invalid value: {value}")
        return shlex.quote(value)

    @staticmethod
    def generate_qrcode_base64(data: Optional[str]) -> Optional[str]:
        """Render *data* as a PNG QR code and return it as a ``data:`` URI.

        Returns None when *data* is empty or None.
        """
        if not data:
            return None
        qr = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data(data)
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        buffered = BytesIO()
        img.save(buffered, format="PNG")
        encoded = base64.b64encode(buffered.getvalue()).decode()
        return "data:image/png;base64," + encoded

    @staticmethod
    def human_readable_bytes(bytes_value: int) -> str:
        """Format a byte count with 1024-based units and two decimals."""
        units = ["Bytes", "KB", "MB", "GB", "TB"]
        size = float(bytes_value)
        for unit in units:
            if size < 1024:
                return f"{size:.2f} {unit}"
            size /= 1024
        # Every listed unit was exhausted, so the remainder is in petabytes.
        return f"{size:.2f} PB"

    @staticmethod
    def build_url(base: str, path: str) -> str:
        """Join *path* onto *base* using ``urllib.parse.urljoin``."""
        return urljoin(base, path)

    @staticmethod
    def is_valid_url(url: str) -> bool:
        """Checks if the given string is a valid URL."""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False


class HysteriaCLI:
    """Thin wrapper that runs the management CLI script with ``python3``."""

    def __init__(self, cli_path: str):
        self.cli_path = cli_path

    def _run_command(self, args: List[str]) -> str:
        """Run the CLI with *args* and return its stripped stdout.

        This call blocks until the child process exits; async callers should
        run it in an executor.

        Raises:
            subprocess.CalledProcessError: if the CLI exits non-zero.
        """
        try:
            command = ['python3', self.cli_path] + args
            return subprocess.check_output(
                command, stderr=subprocess.DEVNULL, text=True
            ).strip()
        except subprocess.CalledProcessError as e:
            print(f"Hysteria CLI error: {e}")
            raise

    def get_user_info(self, username: str) -> UserInfo:
        """Fetch usage/expiry data for *username* via ``get-user``.

        Missing JSON keys fall back to 0 / ''.
        """
        raw_info = json.loads(self._run_command(['get-user', '-u', username]))
        return UserInfo(
            username=username,
            upload_bytes=raw_info.get('upload_bytes', 0),
            download_bytes=raw_info.get('download_bytes', 0),
            max_download_bytes=raw_info.get('max_download_bytes', 0),
            account_creation_date=raw_info.get('account_creation_date', ''),
            expiration_days=raw_info.get('expiration_days', 0),
        )

    def get_user_uri(self, username: str, ip_version: Optional[str] = None) -> str:
        """Return raw ``show-user-uri`` output for one IP version, or all."""
        if ip_version:
            return self._run_command(
                ['show-user-uri', '-u', username, '-ip', ip_version]
            )
        return self._run_command(['show-user-uri', '-u', username, '-a'])

    def get_uris(self, username: str) -> Tuple[Optional[str], Optional[str]]:
        """Return ``(ipv4_uri, ipv6_uri)``; either is None when not listed."""
        output = self._run_command(['show-user-uri', '-u', username, '-a'])
        ipv4_uri = re.search(r'IPv4:\s*(.*)', output)
        ipv6_uri = re.search(r'IPv6:\s*(.*)', output)
        return (
            ipv4_uri.group(1).strip() if ipv4_uri else None,
            ipv6_uri.group(1).strip() if ipv6_uri else None,
        )


class UriParser:
    """Parses the prefixed URI text printed by the CLI."""

    @staticmethod
    def extract_uri_components(uri: Optional[str], prefix: str) -> Optional[UriComponents]:
        """Split a ``<prefix><uri>`` string into its components.

        Returns None when *uri* is empty, lacks *prefix*, or cannot be parsed.
        """
        if not uri or not uri.startswith(prefix):
            return None
        uri = uri[len(prefix):].strip()
        try:
            # The whole URI is percent-decoded BEFORE parsing.
            # NOTE(review): presumably the CLI percent-encodes the ':' between
            # username and password, so decoding first is what lets urlparse
            # split them — confirm against the CLI output before reordering.
            decoded_uri = unquote(uri)
            parsed_url = urlparse(decoded_uri)
            query_params = parse_qs(parsed_url.query)

            hostname = parsed_url.hostname
            # Defensive: drop IPv6 brackets should they survive parsing.
            if hostname and hostname.startswith('[') and hostname.endswith(']'):
                hostname = hostname[1:-1]

            return UriComponents(
                username=parsed_url.username,
                password=parsed_url.password,
                ip=hostname,
                port=parsed_url.port,  # may raise ValueError on a bad port
                obfs_password=query_params.get('obfs-password', [''])[0],
            )
        except Exception as e:
            print(f"Error during URI parsing: {e}, URI: {uri}")
            return None


class SingboxConfigGenerator:
    """Builds sing-box JSON profiles from a template and user URIs."""

    def __init__(self, hysteria_cli: HysteriaCLI, default_sni: str):
        self.hysteria_cli = hysteria_cli
        self.default_sni = default_sni
        self._template_cache = None
        self.template_path = None

    def set_template_path(self, path: str):
        """Point the generator at a new template and drop the cached copy."""
        self.template_path = path
        self._template_cache = None

    def get_template(self) -> Dict[str, Any]:
        """Return an independent copy of the parsed template.

        Raises:
            RuntimeError: if no path is set or the file cannot be loaded.
        """
        if self._template_cache is None:
            if not self.template_path:
                raise RuntimeError("Error loading Singbox template: template path is not set")
            try:
                with open(self.template_path, 'r') as f:
                    self._template_cache = json.load(f)
            except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
                raise RuntimeError(f"Error loading Singbox template: {e}") from e
        # Deep copy: combine_configs rewrites nested outbound dicts, and a
        # shallow copy would let those edits leak into the cached template.
        return copy.deepcopy(self._template_cache)

    def generate_config(self, username: str, ip_version: str, fragment: str) -> Optional[Dict[str, Any]]:
        """Build a single-outbound config for *username* on one IP version.

        Args:
            username: already-validated user name.
            ip_version: ``'4'`` or ``'6'``; used for the CLI call and prefix.
            fragment: TLS server name override; falls back to the default SNI
                when empty.

        Returns:
            A dict with one ``hysteria2`` outbound, or None when the URI is
            unavailable or unparsable.
        """
        try:
            uri = self.hysteria_cli.get_user_uri(username, ip_version)
        except Exception:
            print(f"Failed to get URI for {username} with IP version {ip_version}. Skipping.")
            return None

        if not uri:
            print(f"No URI found for {username} with IP version {ip_version}. Skipping.")
            return None

        components = UriParser.extract_uri_components(uri, f'IPv{ip_version}:')
        if components is None or components.port is None:
            print(f"Invalid URI components for {username} with IP version {ip_version}. Skipping.")
            return None

        return {
            "outbounds": [{
                "type": "hysteria2",
                "tag": f"{username}-Hysteria2",
                "server": components.ip,
                "server_port": components.port,
                "obfs": {
                    "type": "salamander",
                    "password": components.obfs_password
                },
                "password": f"{username}:{components.password}",
                "tls": {
                    "enabled": True,
                    "server_name": fragment if fragment else self.default_sni,
                    "insecure": True
                }
            }]
        }

    def combine_configs(self, username: str, config_v4: Optional[Dict[str, Any]],
                        config_v6: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge the per-IP-version configs into a copy of the template.

        Template outbounds of type ``hysteria2`` are discarded, the user's
        outbounds are retagged ``<username>-IPv4`` / ``<username>-IPv6`` and
        appended, and the ``select`` / ``auto`` outbounds are pointed at them.
        """
        combined_config = self.get_template()
        base_outbounds = [
            outbound for outbound in combined_config.get('outbounds', [])
            if outbound.get('type') != 'hysteria2'
        ]

        user_outbounds = []
        for label, config in (("IPv4", config_v4), ("IPv6", config_v6)):
            if config:
                outbound = config['outbounds'][0]
                outbound['tag'] = f"{username}-{label}"
                user_outbounds.append(outbound)
        user_tags = [outbound['tag'] for outbound in user_outbounds]

        for outbound in base_outbounds:
            tag = outbound.get('tag')
            if tag == 'select':
                outbound['outbounds'] = ["auto"] + user_tags
            elif tag == 'auto':
                outbound['outbounds'] = list(user_tags)

        combined_config['outbounds'] = base_outbounds + user_outbounds
        return combined_config


class SubscriptionManager:
    """Produces the plain-text subscription body."""

    def __init__(self, hysteria_cli: HysteriaCLI, config: AppConfig):
        self.hysteria_cli = hysteria_cli
        self.config = config

    @staticmethod
    def _rewrite_pin_for_v2rayng(uri: str) -> str:
        """Rewrite ``pinSHA256=sha256/<base64>`` as colon-separated hex.

        The URI is returned unchanged when it has no such parameter or the
        value is not valid base64.
        """
        match = re.search(r'pinSHA256=sha256/([^&]+)', uri)
        if not match:
            return uri
        try:
            decoded = base64.b64decode(match.group(1))
        except ValueError as e:  # binascii.Error is a ValueError subclass
            print(f"Could not decode pinSHA256 value, leaving URI unchanged: {e}")
            return uri
        formatted = ":".join("{:02X}".format(byte) for byte in decoded)
        return uri.replace(
            f'pinSHA256=sha256/{match.group(1)}', f'pinSHA256={formatted}'
        )

    def get_normal_subscription(self, username: str, user_agent: str) -> str:
        """Return the subscription text for *username*.

        Args:
            username: already-validated user name.
            user_agent: lower-cased User-Agent; when it contains both
                "v2ray" and "ng" the certificate pin is rewritten as hex.

        Returns:
            Header comment lines followed by one URI per line, or the literal
            "No URI available" when the CLI reports none.
        """
        user_info = self.hysteria_cli.get_user_info(username)
        ipv4_uri, ipv6_uri = self.hysteria_cli.get_uris(username)
        output_lines = [uri for uri in [ipv4_uri, ipv6_uri] if uri]
        if not output_lines:
            return "No URI available"

        wants_hex_pin = "v2ray" in user_agent and "ng" in user_agent
        processed_uris = [
            self._rewrite_pin_for_v2rayng(uri) if wants_hex_pin else uri
            for uri in output_lines
        ]

        subscription_info = (
            f"//subscription-userinfo: upload={user_info.upload_bytes}; "
            f"download={user_info.download_bytes}; "
            f"total={user_info.max_download_bytes}; "
            f"expire={user_info.expiration_timestamp}\n"
        )
        profile_lines = f"//profile-title: {username}-Hysteria2 🚀\n//profile-update-interval: 1\n"
        return profile_lines + subscription_info + "\n".join(processed_uris)


class TemplateRenderer:
    """Renders ``template.html`` from *template_dir* with autoescaping on."""

    def __init__(self, template_dir: str, config: AppConfig):
        self.env = Environment(loader=FileSystemLoader(template_dir), autoescape=True)
        self.html_template = self.env.get_template('template.html')
        self.config = config

    def render(self, context: TemplateContext) -> str:
        """Render the HTML page using the fields of *context* as variables."""
        return self.html_template.render(vars(context))


class HysteriaServer:
    """aiohttp application wiring together config, CLI access and routes."""

    def __init__(self):
        self.config = self._load_config()
        self.rate_limiter = RateLimiter(self.config.rate_limit, self.config.rate_limit_window)
        self.hysteria_cli = HysteriaCLI(self.config.hysteria_cli_path)
        self.singbox_generator = SingboxConfigGenerator(self.hysteria_cli, self.config.sni)
        self.singbox_generator.set_template_path(self.config.singbox_template_path)
        self.subscription_manager = SubscriptionManager(self.hysteria_cli, self.config)
        self.template_renderer = TemplateRenderer(self.config.template_dir, self.config)
        self.app = web.Application(middlewares=[self._rate_limit_middleware])

        safe_subpath = self.validate_and_escape_subpath(self.config.subpath)
        self.app.add_routes([
            web.get(f'/{safe_subpath}/sub/normal/{{username}}', self.handle)
        ])
        # Catch-alls are registered after the real route so it is matched first.
        self.app.router.add_route('*', f'/{safe_subpath}/{{tail:.*}}', self.handle_404)
        self.app.router.add_route('*', '/{tail:.*}', self.handle_generic_404)

    def _load_config(self) -> AppConfig:
        """Build the configuration from environment variables and defaults.

        Raises:
            ValueError: if SUBPATH is empty or contains disallowed characters,
                or HYSTERIA_PORT is not an integer.
        """
        domain = os.getenv('HYSTERIA_DOMAIN', 'localhost')
        cert_file = os.getenv('HYSTERIA_CERTFILE')
        key_file = os.getenv('HYSTERIA_KEYFILE')
        port = int(os.getenv('HYSTERIA_PORT', '3326'))
        subpath = os.getenv('SUBPATH', '').strip().strip("/")

        if not self.is_valid_subpath(subpath):
            raise ValueError(f"Invalid SUBPATH: '{subpath}'. Subpath must contain only alphanumeric characters, hyphens, and underscores.")

        sni_file = '/etc/hysteria/.configs.env'
        singbox_template_path = '/etc/hysteria/core/scripts/normalsub/singbox.json'
        hysteria_cli_path = '/etc/hysteria/core/cli.py'
        rate_limit = 100
        rate_limit_window = 60
        template_dir = os.path.dirname(__file__)

        sni = self._load_sni_from_env(sni_file)
        return AppConfig(
            domain=domain,
            cert_file=cert_file,
            key_file=key_file,
            port=port,
            sni_file=sni_file,
            singbox_template_path=singbox_template_path,
            hysteria_cli_path=hysteria_cli_path,
            rate_limit=rate_limit,
            rate_limit_window=rate_limit_window,
            sni=sni,
            template_dir=template_dir,
            subpath=subpath,
        )

    def _load_sni_from_env(self, sni_file: str) -> str:
        """Return the value of the first ``SNI=`` line in *sni_file*.

        Falls back to "bts.com" when the file is missing or has no such line.
        """
        try:
            with open(sni_file, 'r') as f:
                for line in f:
                    if line.startswith('SNI='):
                        # maxsplit=1 keeps any '=' inside the value intact.
                        return line.strip().split('=', 1)[1]
        except FileNotFoundError:
            print("Warning: SNI file not found. Using default SNI.")
        return "bts.com"

    def is_valid_subpath(self, subpath: str) -> bool:
        """Validates the subpath using a regex."""
        return bool(re.fullmatch(_IDENTIFIER_PATTERN, subpath))

    def validate_and_escape_subpath(self, subpath: str) -> str:
        """Validates the subpath and returns it ready for use in a route.

        Raises:
            ValueError: if *subpath* contains disallowed characters.
        """
        if not self.is_valid_subpath(subpath):
            raise ValueError(f"Invalid subpath: {subpath}")
        # No re.escape here: the validated character set has nothing that
        # needs escaping in a route, while re.escape would turn '-' into
        # '\-' and register a path that real requests never match.
        return subpath

    @middleware
    async def _rate_limit_middleware(
        self,
        request: web.Request,
        handler: Callable[[web.Request], Awaitable[web.StreamResponse]],
    ):
        """Reject requests with 429 once the client exceeds the rate limit."""
        # NOTE(review): X-Forwarded-For / X-Real-IP are client-controlled
        # unless a trusted reverse proxy overwrites them; a direct client can
        # spoof them to dodge the limiter. Confirm the deployment topology.
        client_ip = request.headers.get(
            'X-Forwarded-For', request.headers.get('X-Real-IP', request.remote)
        )
        if client_ip:
            # A multi-hop header lists the originating client first; key on
            # that so the proxy chain does not fragment the counter.
            client_ip = client_ip.split(',')[0].strip()
        if not self.rate_limiter.check_limit(client_ip):
            return web.Response(status=429, text="Rate limit exceeded.")
        return await handler(request)

    @staticmethod
    async def _run_blocking(func: Callable[..., Any], *args: Any) -> Any:
        """Run a blocking callable in the default executor and await it."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def handle(self, request: web.Request) -> web.Response:
        """Serve the subscription for ``{username}`` in a client-specific format.

        Browsers receive HTML, clients whose User-Agent contains "sing"
        (except those starting with "hiddifynext") receive sing-box JSON,
        and everything else receives the plain-text subscription.
        """
        raw_username = request.match_info.get('username', '')
        if not raw_username:
            return web.Response(status=400, text="Error: Missing 'username' parameter.")

        # Only input validation maps to 400; ValueErrors raised further down
        # (bad CLI JSON, bad dates, ...) are server faults and must not be
        # echoed to the client as a 400.
        try:
            username = Utils.sanitize_input(raw_username, _IDENTIFIER_PATTERN)
        except ValueError as e:
            return web.Response(status=400, text=f"Error: {e}")

        try:
            user_agent = request.headers.get('User-Agent', '').lower()
            if any(browser in user_agent for browser in ['chrome', 'firefox', 'safari', 'edge', 'opera']):
                return await self._handle_html(request, username)

            fragment = request.query.get('fragment', '')
            if not user_agent.startswith('hiddifynext') and 'sing' in user_agent:
                return await self._handle_singbox(username, fragment)

            return await self._handle_normalsub(request, username)
        except Exception as e:
            print(f"Internal Server Error: {e}")
            return web.Response(status=500, text="Error: Internal server error")

    async def _handle_html(self, request: web.Request, username: str) -> web.Response:
        """Render the HTML status page for *username*."""
        context = await self._get_template_context(username)
        return web.Response(text=self.template_renderer.render(context), content_type='text/html')

    async def _handle_singbox(self, username: str, fragment: str) -> web.Response:
        """Return the combined sing-box JSON profile, or 404 if no URI exists."""
        # generate_config shells out to the CLI, so keep it off the event loop.
        config_v4 = await self._run_blocking(
            self.singbox_generator.generate_config, username, '4', fragment
        )
        config_v6 = await self._run_blocking(
            self.singbox_generator.generate_config, username, '6', fragment
        )

        if config_v4 is None and config_v6 is None:
            return web.Response(status=404, text=f"Error: No valid URIs found for user {username}.")

        combined_config = self.singbox_generator.combine_configs(username, config_v4, config_v6)
        return web.Response(
            text=json.dumps(combined_config, indent=4, sort_keys=True),
            content_type='application/json',
        )

    async def _handle_normalsub(self, request: web.Request, username: str) -> web.Response:
        """Return the plain-text subscription for *username*."""
        user_agent = request.headers.get('User-Agent', '').lower()
        subscription = await self._run_blocking(
            self.subscription_manager.get_normal_subscription, username, user_agent
        )
        return web.Response(text=subscription, content_type='text/plain')

    async def _get_template_context(self, username: str) -> TemplateContext:
        """Collect usage data, URIs and QR codes for the HTML page.

        Raises:
            ValueError: if the base URL built from the config is not valid.
        """
        user_info = await self._run_blocking(self.hysteria_cli.get_user_info, username)
        ipv4_uri, ipv6_uri = await self._run_blocking(self.hysteria_cli.get_uris, username)

        base_url = f"https://{self.config.domain}:{self.config.port}"
        if not Utils.is_valid_url(base_url):
            raise ValueError(f"Invalid base URL constructed: {base_url}")

        sub_link = f"{base_url}/{self.config.subpath}/sub/normal/{username}"
        ipv4_qrcode = Utils.generate_qrcode_base64(ipv4_uri)
        ipv6_qrcode = Utils.generate_qrcode_base64(ipv6_uri)
        sublink_qrcode = Utils.generate_qrcode_base64(sub_link)

        return TemplateContext(
            username=username,
            usage=user_info.usage_human_readable,
            usage_raw=user_info.usage_detailed,
            expiration_date=user_info.expiration_date,
            sublink_qrcode=sublink_qrcode,
            ipv4_qrcode=ipv4_qrcode,
            ipv6_qrcode=ipv6_qrcode,
            sub_link=sub_link,
            ipv4_uri=ipv4_uri,
            ipv6_uri=ipv6_uri,
        )

    async def handle_404(self, request: web.Request) -> web.Response:
        """Handles 404 Not Found errors *within* the subpath."""
        print(f"404 Not Found (within subpath): {request.path}")
        return web.Response(status=404, text="Not Found within Subpath")

    async def handle_generic_404(self, request: web.Request) -> web.Response:
        """Handles 404 Not Found errors *outside* the subpath."""
        print(f"404 Not Found (generic): {request.path}")
        return web.Response(status=404, text="Not Found")

    def run(self):
        """Start the HTTPS server; blocks until the application stops.

        Raises:
            ValueError: if the certificate or key path is not configured.
        """
        if not self.config.cert_file or not self.config.key_file:
            # Fail with a readable message instead of a TypeError from ssl.
            raise ValueError("HYSTERIA_CERTFILE and HYSTERIA_KEYFILE must both be set.")

        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_context.load_cert_chain(certfile=self.config.cert_file, keyfile=self.config.key_file)
        ssl_context.set_ciphers('ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384')
        web.run_app(self.app, port=self.config.port, ssl_context=ssl_context)


if __name__ == '__main__':
    server = HysteriaServer()
    server.run()