diff --git a/core/scripts/singbox/singbox.py b/core/scripts/singbox/singbox.py index 6d173bb..3b22879 100644 --- a/core/scripts/singbox/singbox.py +++ b/core/scripts/singbox/singbox.py @@ -3,8 +3,11 @@ import ssl import json import subprocess from aiohttp import web +from aiohttp.web_middlewares import middleware from urllib.parse import unquote, parse_qs import re +import time + from dotenv import load_dotenv load_dotenv() @@ -15,35 +18,66 @@ CERTFILE = os.getenv('CERTFILE') KEYFILE = os.getenv('KEYFILE') PORT = int(os.getenv('PORT', '3324')) +RATE_LIMIT = 100 +RATE_LIMIT_WINDOW = 60 + +rate_limit_store = {} + +@middleware +async def rate_limit_middleware(request, handler): + client_ip = request.headers.get('X-Forwarded-For', request.remote) + current_time = time.time() + + if client_ip in rate_limit_store: + requests, last_request_time = rate_limit_store[client_ip] + if current_time - last_request_time < RATE_LIMIT_WINDOW: + if requests >= RATE_LIMIT: + return web.Response(status=429, text="Rate limit exceeded.") + if current_time - last_request_time >= RATE_LIMIT_WINDOW: + rate_limit_store[client_ip] = (1, current_time) + else: + rate_limit_store[client_ip] = (requests + 1, last_request_time) + else: + rate_limit_store[client_ip] = (1, current_time) + + return await handler(request) + +def sanitize_input(value, pattern): + if not re.match(pattern, value): + raise ValueError(f"Invalid value: {value}") + return value + async def handle(request): - username = request.query.get('username') - ip_version = request.query.get('ip') - fragment = request.match_info.get('fragment', '') - - if not username: - return web.Response(status=400, text="Error: Missing 'username' parameter.") - - if not ip_version: - return web.Response(status=400, text="Error: Missing 'ip' parameter.") - - if ip_version not in ['4', '6']: - return web.Response(status=400, text="Error: Invalid 'ip' parameter. Must be '4' or '6'.") - try: + username = sanitize_input(request.match_info.get('username', ''), r'^[a-zA-Z0-9_-]+$') + ip_version = sanitize_input(request.match_info.get('ip_version', ''), r'^[46]$') + fragment = request.query.get('fragment', '') + + if not username: + return web.Response(status=400, text="Error: Missing 'username' parameter.") + + if not ip_version: + return web.Response(status=400, text="Error: Missing 'ip' parameter.") + + if ip_version not in ['4', '6']: + return web.Response(status=400, text="Error: Invalid 'ip' parameter. Must be '4' or '6'.") + config = generate_singbox_config(username, ip_version, fragment) config_json = json.dumps(config, indent=4, sort_keys=True) return web.Response(text=config_json, content_type='application/json') + except ValueError as e: + return web.Response(status=400, text=f"Error: {str(e)}") except Exception as e: - return web.Response(status=500, text=f"Error: {str(e)}") + print(f"Internal Server Error: {str(e)}") + return web.Response(status=500, text="Error: Internal server error.") def generate_singbox_config(username, ip_version, fragment): try: - uri = subprocess.check_output([ - 'python3', '/etc/hysteria/core/cli.py', 'show-user-uri', '-u', username, '-ip', ip_version - ]).decode().strip() - except subprocess.CalledProcessError as e: - raise RuntimeError(f"Failed to get URI: {e}") + command = ['python3', '/etc/hysteria/core/cli.py', 'show-user-uri', '-u', username, '-ip', ip_version] + uri = subprocess.check_output(command).decode().strip() + except subprocess.CalledProcessError: + raise RuntimeError("Failed to get URI.") if ip_version == '4': components = extract_uri_components(uri, 'IPv4:') @@ -77,7 +111,7 @@ def extract_uri_components(uri, prefix): match = pattern.match(decoded_uri) if not match: - raise ValueError(f"Could not parse URI: {decoded_uri}") + raise ValueError("Could not parse URI.") username = match.group(1) password = match.group(2) @@ -104,12 +138,19 @@ def load_singbox_template(): try: with open('/etc/hysteria/core/scripts/singbox/singbox.json', 'r') as f: return json.load(f) - except IOError as e: - raise RuntimeError(f"Failed to load template: {e}") + except IOError: + raise RuntimeError("Failed to load template.") + +async def handle_404(request): + print(f"404 Not Found: {request.path}") + return web.Response(status=404, text="Not Found") + if __name__ == '__main__': - app = web.Application() - app.add_routes([web.get('/sub/singbox/', handle)]) + app = web.Application(middlewares=[rate_limit_middleware]) + + app.add_routes([web.get('/sub/singbox/{username}/{ip_version}', handle)]) + app.router.add_route('*', '/sub/singbox/{tail:.*}', handle_404) ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)