import logging import os import ssl import json import subprocess import re import time import shlex import base64 from typing import Dict, List, Optional, Tuple, Any, Union from dataclasses import dataclass from io import BytesIO from aiohttp import web from aiohttp.web_middlewares import middleware from urllib.parse import unquote, parse_qs, urlparse from dotenv import load_dotenv import qrcode from jinja2 import Environment, FileSystemLoader load_dotenv() # logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") @dataclass class AppConfig: """Application configuration settings""" domain: str cert_file: str key_file: str port: int sni_file: str singbox_template_path: str hysteria_cli_path: str rate_limit: int rate_limit_window: int sni: str template_dir: str class RateLimiter: """Handles rate limiting for requests""" def __init__(self, limit: int, window: int): self.limit = limit self.window = window self.store: Dict[str, Tuple[int, float]] = {} def check_limit(self, client_ip: str) -> bool: """Checks if a client has exceeded their rate limit Returns: bool: True if rate limit not exceeded, False otherwise """ current_time = time.monotonic() requests, last_request_time = self.store.get(client_ip, (0, 0)) if current_time - last_request_time < self.window: if requests >= self.limit: return False else: requests = 0 self.store[client_ip] = (requests + 1, current_time) return True @dataclass class UriComponents: """Components extracted from a Hysteria2 URI""" username: Optional[str] password: Optional[str] ip: Optional[str] port: Optional[int] obfs_password: str @dataclass class UserInfo: """User information and statistics""" username: str upload_bytes: int download_bytes: int max_download_bytes: int account_creation_date: str expiration_days: int @property def total_usage(self) -> int: """Total bandwidth usage""" return self.upload_bytes + self.download_bytes @property def expiration_timestamp(self) -> int: """Unix timestamp when account expires""" if not self.account_creation_date or self.expiration_days <= 0: return 0 creation_timestamp = int(time.mktime(time.strptime(self.account_creation_date, "%Y-%m-%d"))) return creation_timestamp + (self.expiration_days * 24 * 3600) @property def expiration_date(self) -> str: """Formatted expiration date string""" if not self.account_creation_date or self.expiration_days <= 0: return "N/A" creation_timestamp = int(time.mktime(time.strptime(self.account_creation_date, "%Y-%m-%d"))) expiration_timestamp = creation_timestamp + (self.expiration_days * 24 * 3600) return time.strftime("%Y-%m-%d", time.localtime(expiration_timestamp)) @property def usage_human_readable(self) -> str: """Human readable string of usage""" total = Utils.human_readable_bytes(self.max_download_bytes) used = Utils.human_readable_bytes(self.total_usage) return f"{used} / {total}" @property def usage_detailed(self) -> str: """Detailed usage breakdown""" total = Utils.human_readable_bytes(self.max_download_bytes) upload = Utils.human_readable_bytes(self.upload_bytes) download = Utils.human_readable_bytes(self.download_bytes) return f"Upload: {upload}, Download: {download}, Total: {total}" @dataclass class TemplateContext: """Context for HTML template rendering""" username: str usage: str usage_raw: str expiration_date: str sublink_qrcode: str ipv4_qrcode: Optional[str] ipv6_qrcode: Optional[str] sub_link: str ipv4_uri: Optional[str] ipv6_uri: Optional[str] class Utils: """Utility functions""" @staticmethod def sanitize_input(value: str, pattern: str) -> str: """Sanitizes input using a regex pattern and quotes it for shell commands""" if not re.match(pattern, value): raise ValueError(f"Invalid value: {value}") return shlex.quote(value) @staticmethod def generate_qrcode_base64(data: str) -> str: """Generates a base64-encoded PNG QR code image""" if not data: return None qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(data) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffered = BytesIO() img.save(buffered, format="PNG") return "data:image/png;base64," + base64.b64encode(buffered.getvalue()).decode() @staticmethod def human_readable_bytes(bytes_value: int) -> str: """Converts bytes to a human-readable string (KB, MB, GB, etc.)""" units = ["Bytes", "KB", "MB", "GB", "TB"] size = float(bytes_value) for unit in units: if size < 1024: return f"{size:.2f} {unit}" size /= 1024 return f"{size:.2f} PB" class HysteriaCLI: """Interface for Hysteria CLI commands""" def __init__(self, cli_path: str): self.cli_path = cli_path def _run_command(self, args: List[str]) -> str: """Runs the hysteria CLI with the given arguments and returns the output""" try: command = ['python3', self.cli_path] + args return subprocess.check_output(command, stderr=subprocess.DEVNULL, text=True).strip() except subprocess.CalledProcessError as e: print(f"Hysteria CLI error: {e}") # Log the error raise def get_user_info(self, username: str) -> UserInfo: """Retrieves user information""" raw_info = json.loads(self._run_command(['get-user', '-u', username])) return UserInfo( username=username, upload_bytes=raw_info.get('upload_bytes', 0), download_bytes=raw_info.get('download_bytes', 0), max_download_bytes=raw_info.get('max_download_bytes', 0), account_creation_date=raw_info.get('account_creation_date', ''), expiration_days=raw_info.get('expiration_days', 0) ) def get_user_uri(self, username: str, ip_version: Optional[str] = None) -> str: """Gets the URI for a user, optionally specifying IP version""" if ip_version: return self._run_command(['show-user-uri', '-u', username, '-ip', ip_version]) else: output = self._run_command(['show-user-uri', '-u', username, '-a']) return output def get_uris(self, username: str) -> Tuple[Optional[str], Optional[str]]: """Retrieves IPv4 and IPv6 URIs for a user""" output = self._run_command(['show-user-uri', '-u', username, '-a']) ipv4_uri = re.search(r'IPv4:\s*(.*)', output) ipv6_uri = re.search(r'IPv6:\s*(.*)', output) return (ipv4_uri.group(1).strip() if ipv4_uri else None, ipv6_uri.group(1).strip() if ipv6_uri else None) class UriParser: """Parser for Hysteria2 URIs""" @staticmethod def extract_uri_components(uri: Optional[str], prefix: str) -> Optional[UriComponents]: """Extracts components from a Hysteria2 URI""" if not uri or not uri.startswith(prefix): return None uri = uri[len(prefix):].strip() try: decoded_uri = unquote(uri) parsed_url = urlparse(decoded_uri) query_params = parse_qs(parsed_url.query) hostname = parsed_url.hostname if hostname and hostname.startswith('[') and hostname.endswith(']'): hostname = hostname[1:-1] port = None if parsed_url.port is not None: try: port = int(parsed_url.port) except ValueError: print(f"Warning: Invalid port in URI: {parsed_url.port}") return UriComponents( username=parsed_url.username, password=parsed_url.password, ip=hostname, port=port, obfs_password=query_params.get('obfs-password', [''])[0] ) except Exception as e: print(f"Error during URI parsing: {e}, URI: {uri}") return None class SingboxConfigGenerator: """Generator for Sing-box configurations""" def __init__(self, hysteria_cli: HysteriaCLI, default_sni: str): self.hysteria_cli = hysteria_cli self.default_sni = default_sni self._template_cache = None self.template_path = None def set_template_path(self, path: str): """Sets the path to the template file""" self.template_path = path self._template_cache = None def get_template(self) -> Dict[str, Any]: """Loads and caches the singbox template""" if self._template_cache is None: try: with open(self.template_path, 'r') as f: self._template_cache = json.load(f) except (FileNotFoundError, json.JSONDecodeError, IOError) as e: raise RuntimeError(f"Error loading Singbox template: {e}") from e return self._template_cache.copy() def generate_config(self, username: str, ip_version: str, fragment: str) -> Optional[Dict[str, Any]]: """Generates a Sing-box outbound configuration for a given user and IP version""" try: uri = self.hysteria_cli.get_user_uri(username, ip_version) except Exception: print(f"Warning: Failed to get URI for {username} with IP version {ip_version}. Skipping.") return None if not uri: print(f"Warning: No URI found for {username} with IP version {ip_version}. Skipping.") return None components = UriParser.extract_uri_components(uri, f'IPv{ip_version}:') if components is None or components.port is None: print(f"Warning: Invalid URI components for {username} with IP version {ip_version}. Skipping.") return None return { "outbounds": [{ "type": "hysteria2", "tag": f"{username}-Hysteria2", "server": components.ip, "server_port": components.port, "obfs": { "type": "salamander", "password": components.obfs_password }, "password": f"{username}:{components.password}", "tls": { "enabled": True, "server_name": fragment if fragment else self.default_sni, "insecure": True } }] } def combine_configs(self, username: str, config_v4: Optional[Dict[str, Any]], config_v6: Optional[Dict[str, Any]]) -> Dict[str, Any]: """Combines IPv4 and IPv6 configurations into a single config""" combined_config = self.get_template() combined_config['outbounds'] = [ outbound for outbound in combined_config['outbounds'] if outbound.get('type') != 'hysteria2' ] modified_v4_outbounds = [] if config_v4: v4_outbound = config_v4['outbounds'][0] v4_outbound['tag'] = f"{username}-IPv4" modified_v4_outbounds = [v4_outbound] modified_v6_outbounds = [] if config_v6: v6_outbound = config_v6['outbounds'][0] v6_outbound['tag'] = f"{username}-IPv6" modified_v6_outbounds = [v6_outbound] select_outbounds = ["auto"] if config_v4: select_outbounds.append(f"{username}-IPv4") if config_v6: select_outbounds.append(f"{username}-IPv6") auto_outbounds = [] if config_v4: auto_outbounds.append(f"{username}-IPv4") if config_v6: auto_outbounds.append(f"{username}-IPv6") for outbound in combined_config['outbounds']: if outbound.get('tag') == 'select': outbound['outbounds'] = select_outbounds elif outbound.get('tag') == 'auto': outbound['outbounds'] = auto_outbounds combined_config['outbounds'].extend(modified_v4_outbounds + modified_v6_outbounds) return combined_config class SubscriptionManager: """Handles user subscription generation""" def __init__(self, hysteria_cli: HysteriaCLI, config: AppConfig): self.hysteria_cli = hysteria_cli self.config = config def get_normal_subscription(self, username: str, user_agent: str) -> str: """Generates the user URI for normal subscriptions""" user_info = self.hysteria_cli.get_user_info(username) ipv4_uri, ipv6_uri = self.hysteria_cli.get_uris(username) output_lines = [uri for uri in [ipv4_uri, ipv6_uri] if uri] if not output_lines: return "No URI available" processed_uris = [] for uri in output_lines: if "v2ray" in user_agent and "ng" in user_agent: match = re.search(r'pinSHA256=sha256/([^&]+)', uri) if match: decoded = base64.b64decode(match.group(1)) formatted = ":".join("{:02X}".format(byte) for byte in decoded) uri = uri.replace(f'pinSHA256=sha256/{match.group(1)}', f'pinSHA256={formatted}') processed_uris.append(uri) subscription_info = ( f"//subscription-userinfo: upload={user_info.upload_bytes}; " f"download={user_info.download_bytes}; " f"total={user_info.max_download_bytes}; " f"expire={user_info.expiration_timestamp}\n" ) profile_lines = f"//profile-title: {username}-Hysteria2 🚀\n//profile-update-interval: 1\n" return profile_lines + subscription_info + "\n".join(processed_uris) class TemplateRenderer: """Handles HTML template rendering""" def __init__(self, template_dir: str, config: AppConfig): self.env = Environment(loader=FileSystemLoader(template_dir), autoescape=True) self.html_template = self.env.get_template('template.html') self.config = config def render(self, context: TemplateContext) -> str: """Renders the HTML template with the given context""" return self.html_template.render(vars(context)) class HysteriaServer: """Main application server class""" def __init__(self): self.config = self._load_config() self.rate_limiter = RateLimiter(self.config.rate_limit, self.config.rate_limit_window) self.hysteria_cli = HysteriaCLI(self.config.hysteria_cli_path) self.singbox_generator = SingboxConfigGenerator(self.hysteria_cli, self.config.sni) self.singbox_generator.set_template_path(self.config.singbox_template_path) self.subscription_manager = SubscriptionManager(self.hysteria_cli, self.config) self.template_renderer = TemplateRenderer(self.config.template_dir, self.config) self.app = web.Application(middlewares=[self._rate_limit_middleware]) self.app.add_routes([web.get('/sub/normal/{username}', self.handle)]) self.app.router.add_route('*', '/sub/normal/{tail:.*}', self.handle_404) def _load_config(self) -> AppConfig: """Loads application configuration from environment variables""" domain = os.getenv('HYSTERIA_DOMAIN', 'localhost') cert_file = os.getenv('HYSTERIA_CERTFILE') key_file = os.getenv('HYSTERIA_KEYFILE') port = int(os.getenv('HYSTERIA_PORT', '3326')) sni_file = '/etc/hysteria/.configs.env' singbox_template_path = '/etc/hysteria/core/scripts/normalsub/singbox.json' hysteria_cli_path = '/etc/hysteria/core/cli.py' rate_limit = 100 rate_limit_window = 60 template_dir = os.path.dirname(__file__) sni = self._load_sni_from_env(sni_file) return AppConfig( domain=domain, cert_file=cert_file, key_file=key_file, port=port, sni_file=sni_file, singbox_template_path=singbox_template_path, hysteria_cli_path=hysteria_cli_path, rate_limit=rate_limit, rate_limit_window=rate_limit_window, sni=sni, template_dir=template_dir ) def _load_sni_from_env(self, sni_file: str) -> str: """Loads SNI configuration from the environment file""" try: with open(sni_file, 'r') as f: for line in f: if line.startswith('SNI='): return line.strip().split('=')[1] except FileNotFoundError: print("Warning: SNI file not found. Using default SNI.") return "bts.com" @middleware async def _rate_limit_middleware(self, request: web.Request, handler): """Middleware for rate limiting requests""" client_ip = request.headers.get('X-Forwarded-For', request.headers.get('X-Real-IP', request.remote)) if not self.rate_limiter.check_limit(client_ip): return web.Response(status=429, text="Rate limit exceeded.") return await handler(request) async def handle(self, request: web.Request) -> web.Response: """Main request handler""" try: username = Utils.sanitize_input(request.match_info.get('username', ''), r'^[a-zA-Z0-9_-]+$') if not username: return web.Response(status=400, text="Error: Missing 'username' parameter.") user_agent = request.headers.get('User-Agent', '').lower() if any(browser in user_agent for browser in ['chrome', 'firefox', 'safari', 'edge', 'opera']): return await self._handle_html(request, username) else: fragment = request.query.get('fragment', '') if not user_agent.startswith('hiddifynext') and ('singbox' in user_agent or 'sing' in user_agent): return await self._handle_singbox(username, fragment) return await self._handle_normalsub(request, username) except ValueError as e: return web.Response(status=400, text=f"Error: {e}") except Exception as e: print(f"Internal Server Error: {e}") return web.Response(status=500, text="Error: Internal server error") async def _handle_html(self, request: web.Request, username: str) -> web.Response: """Handles requests for HTML output""" context = await self._get_template_context(username) rendered_html = self.template_renderer.render(context) return web.Response(text=rendered_html, content_type='text/html') async def _handle_singbox(self, username: str, fragment: str) -> web.Response: """Handles requests for Sing-box configuration""" config_v4 = self.singbox_generator.generate_config(username, '4', fragment) config_v6 = self.singbox_generator.generate_config(username, '6', fragment) if config_v4 is None and config_v6 is None: return web.Response(status=404, text=f"Error: No valid URIs found for user {username}.") combined_config = self.singbox_generator.combine_configs(username, config_v4, config_v6) return web.Response(text=json.dumps(combined_config, indent=4, sort_keys=True), content_type='application/json') async def _handle_normalsub(self, request: web.Request, username: str) -> web.Response: """Handles requests for normal subscription links""" user_agent = request.headers.get('User-Agent', '').lower() subscription = self.subscription_manager.get_normal_subscription(username, user_agent) return web.Response(text=subscription, content_type='text/plain') async def _get_template_context(self, username: str) -> TemplateContext: """Generates the context for HTML template rendering""" user_info = self.hysteria_cli.get_user_info(username) ipv4_uri, ipv6_uri = self.hysteria_cli.get_uris(username) sub_link = f"https://{self.config.domain}:{self.config.port}/sub/normal/{username}" ipv4_qrcode = Utils.generate_qrcode_base64(ipv4_uri) ipv6_qrcode = Utils.generate_qrcode_base64(ipv6_uri) sublink_qrcode = Utils.generate_qrcode_base64(sub_link) return TemplateContext( username=username, usage=user_info.usage_human_readable, usage_raw=user_info.usage_detailed, expiration_date=user_info.expiration_date, sublink_qrcode=sublink_qrcode, ipv4_qrcode=ipv4_qrcode, ipv6_qrcode=ipv6_qrcode, sub_link=sub_link, ipv4_uri=ipv4_uri, ipv6_uri=ipv6_uri ) async def handle_404(self, request: web.Request) -> web.Response: """Handles 404 Not Found errors""" print(f"404 Not Found: {request.path}") return web.Response(status=404, text="Not Found") def run(self): """Runs the web server""" ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context.load_cert_chain(certfile=self.config.cert_file, keyfile=self.config.key_file) ssl_context.set_ciphers('ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384') web.run_app(self.app, port=self.config.port, ssl_context=ssl_context) if __name__ == '__main__': server = HysteriaServer() server.run()