Combined singbox with normal sub

This commit is contained in:
Whispering Wind
2025-02-26 23:41:24 +03:30
committed by GitHub
parent f3f16cf63e
commit 2839ed8684

View File

@ -1,61 +1,159 @@
import logging
import os
import ssl
import subprocess
import time
import re
import shlex
import json
import subprocess
import re
import time
import shlex
import base64
import hashlib
import qrcode
from typing import Dict, List, Optional, Tuple, Any, Union
from dataclasses import dataclass
from io import BytesIO
from aiohttp import web
from aiohttp.web_middlewares import middleware
from urllib.parse import unquote, parse_qs, urlparse
from dotenv import load_dotenv
import qrcode
from jinja2 import Environment, FileSystemLoader
load_dotenv()
CERTFILE = os.getenv('HYSTERIA_CERTFILE')
KEYFILE = os.getenv('HYSTERIA_KEYFILE')
PORT = int(os.getenv('HYSTERIA_PORT', '3325'))
DOMAIN = os.getenv('HYSTERIA_DOMAIN', 'localhost')
RATE_LIMIT = 100
RATE_LIMIT_WINDOW = 60
rate_limit_store = {}
TEMPLATE_DIR = os.path.join(os.path.dirname(__file__))
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
template = env.get_template('template.html')
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
@middleware
async def rate_limit_middleware(request, handler):
client_ip = request.headers.get('X-Forwarded-For', request.remote)
current_time = time.time()
@dataclass
class AppConfig:
"""Application configuration settings"""
domain: str
cert_file: str
key_file: str
port: int
sni_file: str
singbox_template_path: str
hysteria_cli_path: str
rate_limit: int
rate_limit_window: int
sni: str
template_dir: str
if client_ip in rate_limit_store:
requests, last_request_time = rate_limit_store[client_ip]
if current_time - last_request_time < RATE_LIMIT_WINDOW:
if requests >= RATE_LIMIT:
return web.Response(status=429, text="Rate limit exceeded.")
if current_time - last_request_time >= RATE_LIMIT_WINDOW:
rate_limit_store[client_ip] = (1, current_time)
class RateLimiter:
"""Handles rate limiting for requests"""
def __init__(self, limit: int, window: int):
self.limit = limit
self.window = window
self.store: Dict[str, Tuple[int, float]] = {}
def check_limit(self, client_ip: str) -> bool:
"""Checks if a client has exceeded their rate limit
Returns:
bool: True if rate limit not exceeded, False otherwise
"""
current_time = time.monotonic()
requests, last_request_time = self.store.get(client_ip, (0, 0))
if current_time - last_request_time < self.window:
if requests >= self.limit:
return False
else:
rate_limit_store[client_ip] = (requests + 1, last_request_time)
else:
rate_limit_store[client_ip] = (1, current_time)
requests = 0
return await handler(request)
self.store[client_ip] = (requests + 1, current_time)
return True
def sanitize_input(value, pattern):
@dataclass
class UriComponents:
"""Components extracted from a Hysteria2 URI"""
username: Optional[str]
password: Optional[str]
ip: Optional[str]
port: Optional[int]
obfs_password: str
@dataclass
class UserInfo:
"""User information and statistics"""
username: str
upload_bytes: int
download_bytes: int
max_download_bytes: int
account_creation_date: str
expiration_days: int
@property
def total_usage(self) -> int:
"""Total bandwidth usage"""
return self.upload_bytes + self.download_bytes
@property
def expiration_timestamp(self) -> int:
"""Unix timestamp when account expires"""
if not self.account_creation_date or self.expiration_days <= 0:
return 0
creation_timestamp = int(time.mktime(time.strptime(self.account_creation_date, "%Y-%m-%d")))
return creation_timestamp + (self.expiration_days * 24 * 3600)
@property
def expiration_date(self) -> str:
"""Formatted expiration date string"""
if not self.account_creation_date or self.expiration_days <= 0:
return "N/A"
creation_timestamp = int(time.mktime(time.strptime(self.account_creation_date, "%Y-%m-%d")))
expiration_timestamp = creation_timestamp + (self.expiration_days * 24 * 3600)
return time.strftime("%Y-%m-%d", time.localtime(expiration_timestamp))
@property
def usage_human_readable(self) -> str:
"""Human readable string of usage"""
total = Utils.human_readable_bytes(self.max_download_bytes)
used = Utils.human_readable_bytes(self.total_usage)
return f"{used} / {total}"
@property
def usage_detailed(self) -> str:
"""Detailed usage breakdown"""
total = Utils.human_readable_bytes(self.max_download_bytes)
upload = Utils.human_readable_bytes(self.upload_bytes)
download = Utils.human_readable_bytes(self.download_bytes)
return f"Upload: {upload}, Download: {download}, Total: {total}"
@dataclass
class TemplateContext:
"""Context for HTML template rendering"""
username: str
usage: str
usage_raw: str
expiration_date: str
sublink_qrcode: str
ipv4_qrcode: Optional[str]
ipv6_qrcode: Optional[str]
sub_link: str
ipv4_uri: Optional[str]
ipv6_uri: Optional[str]
class Utils:
"""Utility functions"""
@staticmethod
def sanitize_input(value: str, pattern: str) -> str:
"""Sanitizes input using a regex pattern and quotes it for shell commands"""
if not re.match(pattern, value):
raise ValueError(f"Invalid value: {value}")
return value
return shlex.quote(value)
@staticmethod
def generate_qrcode_base64(data: str) -> str:
"""Generates a base64-encoded PNG QR code image"""
if not data:
return None
def generate_qrcode_base64(data):
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
@ -64,18 +162,14 @@ def generate_qrcode_base64(data):
)
qr.add_data(data)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffered = BytesIO()
img.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
return f"data:image/png;base64,{img_base64}"
return "data:image/png;base64," + base64.b64encode(buffered.getvalue()).decode()
def human_readable_bytes(bytes_value):
"""
Converts bytes to a human-readable format (KB, MB, GB, TB).
"""
@staticmethod
def human_readable_bytes(bytes_value: int) -> str:
"""Converts bytes to a human-readable string (KB, MB, GB, etc.)"""
units = ["Bytes", "KB", "MB", "GB", "TB"]
size = float(bytes_value)
for unit in units:
@ -85,134 +179,202 @@ def human_readable_bytes(bytes_value):
return f"{size:.2f} PB"
async def handle(request):
class HysteriaCLI:
"""Interface for Hysteria CLI commands"""
def __init__(self, cli_path: str):
self.cli_path = cli_path
def _run_command(self, args: List[str]) -> str:
"""Runs the hysteria CLI with the given arguments and returns the output"""
try:
username = sanitize_input(request.match_info.get('username', ''), r'^[a-zA-Z0-9_-]+$')
if not username:
return web.Response(status=400, text="Error: Missing 'username' parameter.")
user_agent = request.headers.get('User-Agent', '').lower()
if 'text/html' in request.headers.get('Accept', ''):
context = await get_template_context(username, user_agent)
html_output = template.render(context)
return web.Response(text=html_output, content_type='text/html')
else:
uri_output = get_user_uri(username, user_agent)
return web.Response(text=uri_output, content_type='text/plain')
except ValueError as e:
return web.Response(status=400, text=f"Error: {str(e)}")
except Exception as e:
print(f"Internal Server Error: {str(e)}")
return web.Response(status=500, text="Error: Internal server error.")
async def get_template_context(username, user_agent):
"""
Gathers all the data needed to render the HTML template.
"""
try:
user_info = get_user_info(username)
upload = user_info.get('upload_bytes', 0)
download = user_info.get('download_bytes', 0)
total_bytes = user_info.get('max_download_bytes', 0)
creation_date_str = user_info.get('account_creation_date', '')
expiration_days = user_info.get('expiration_days', 0)
if creation_date_str and expiration_days > 0:
try:
creation_date = time.strptime(creation_date_str, "%Y-%m-%d")
expiration_timestamp = int(time.mktime(creation_date)) + (expiration_days * 24 * 60 * 60)
expiration_date = time.strftime("%Y-%m-%d", time.localtime(expiration_timestamp))
except ValueError:
expiration_date = "Invalid Date"
else:
expiration_date = "N/A"
ipv4_uri, ipv6_uri = get_uris(username)
sub_link = f"https://{DOMAIN}:{PORT}/sub/normal/{username}"
ipv4_qrcode = generate_qrcode_base64(ipv4_uri) if ipv4_uri else None
ipv6_qrcode = generate_qrcode_base64(ipv6_uri) if ipv6_uri else None
sublink_qrcode = generate_qrcode_base64(sub_link)
total_human_readable = human_readable_bytes(total_bytes)
usage = f"{human_readable_bytes(upload + download)} / {total_human_readable}"
usage_raw = f"Upload: {human_readable_bytes(upload)}, Download: {human_readable_bytes(download)}, Total: {total_human_readable}" # For the tooltip
context = {
'username': username,
'usage': usage,
'usage_raw': usage_raw,
'expiration_date': expiration_date,
'sublink_qrcode': sublink_qrcode,
'ipv4_qrcode': ipv4_qrcode,
'ipv6_qrcode': ipv6_qrcode,
'sub_link': sub_link,
'ipv4_uri': ipv4_uri,
'ipv6_uri': ipv6_uri,
}
return context
except Exception as e:
print(f"Error in get_template_context: {e}")
raise
def get_user_info(username):
"""
Retrieves user information from the cli.py script.
"""
try:
user_info_command = [
'python3',
'/etc/hysteria/core/cli.py',
'get-user',
'-u', username
]
safe_user_info_command = [shlex.quote(arg) for arg in user_info_command]
user_info_output = subprocess.check_output(safe_user_info_command).decode()
user_info = json.loads(user_info_output)
return user_info
command = ['python3', self.cli_path] + args
return subprocess.check_output(command, stderr=subprocess.DEVNULL, text=True).strip()
except subprocess.CalledProcessError as e:
print(f"Error executing get-user command: {e}")
raise
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
print(f"Hysteria CLI error: {e}") # Log the error
raise
def get_user_uri(username, user_agent):
"""
Returns the URI for the user, adapting the output based on the User-Agent. Returns BOTH IPv4 and IPv6.
"""
try:
user_info = get_user_info(username)
upload = user_info.get('upload_bytes', 0)
download = user_info.get('download_bytes', 0)
total = user_info.get('max_download_bytes', 0)
creation_date_str = user_info.get('account_creation_date', '')
expiration_days = user_info.get('expiration_days', 0)
if creation_date_str and expiration_days > 0:
try:
creation_date = time.strptime(creation_date_str, "%Y-%m-%d")
expiration_timestamp = int(time.mktime(creation_date)) + (expiration_days * 24 * 60 * 60)
except ValueError:
expiration_timestamp = 0
def get_user_info(self, username: str) -> UserInfo:
"""Retrieves user information"""
raw_info = json.loads(self._run_command(['get-user', '-u', username]))
return UserInfo(
username=username,
upload_bytes=raw_info.get('upload_bytes', 0),
download_bytes=raw_info.get('download_bytes', 0),
max_download_bytes=raw_info.get('max_download_bytes', 0),
account_creation_date=raw_info.get('account_creation_date', ''),
expiration_days=raw_info.get('expiration_days', 0)
)
def get_user_uri(self, username: str, ip_version: Optional[str] = None) -> str:
"""Gets the URI for a user, optionally specifying IP version"""
if ip_version:
return self._run_command(['show-user-uri', '-u', username, '-ip', ip_version])
else:
expiration_timestamp = 0
output = self._run_command(['show-user-uri', '-u', username, '-a'])
return output
ipv4_uri, ipv6_uri = get_uris(username)
def get_uris(self, username: str) -> Tuple[Optional[str], Optional[str]]:
"""Retrieves IPv4 and IPv6 URIs for a user"""
output = self._run_command(['show-user-uri', '-u', username, '-a'])
ipv4_uri = re.search(r'IPv4:\s*(.*)', output)
ipv6_uri = re.search(r'IPv6:\s*(.*)', output)
return (ipv4_uri.group(1).strip() if ipv4_uri else None,
ipv6_uri.group(1).strip() if ipv6_uri else None)
output_lines = []
if ipv4_uri:
output_lines.append(ipv4_uri)
if ipv6_uri:
output_lines.append(ipv6_uri)
class UriParser:
"""Parser for Hysteria2 URIs"""
@staticmethod
def extract_uri_components(uri: Optional[str], prefix: str) -> Optional[UriComponents]:
"""Extracts components from a Hysteria2 URI"""
if not uri or not uri.startswith(prefix):
return None
uri = uri[len(prefix):].strip()
try:
decoded_uri = unquote(uri)
parsed_url = urlparse(decoded_uri)
query_params = parse_qs(parsed_url.query)
hostname = parsed_url.hostname
if hostname and hostname.startswith('[') and hostname.endswith(']'):
hostname = hostname[1:-1]
port = None
if parsed_url.port is not None:
try:
port = int(parsed_url.port)
except ValueError:
print(f"Warning: Invalid port in URI: {parsed_url.port}")
return UriComponents(
username=parsed_url.username,
password=parsed_url.password,
ip=hostname,
port=port,
obfs_password=query_params.get('obfs-password', [''])[0]
)
except Exception as e:
print(f"Error during URI parsing: {e}, URI: {uri}")
return None
class SingboxConfigGenerator:
"""Generator for Sing-box configurations"""
def __init__(self, hysteria_cli: HysteriaCLI, default_sni: str):
self.hysteria_cli = hysteria_cli
self.default_sni = default_sni
self._template_cache = None
self.template_path = None
def set_template_path(self, path: str):
"""Sets the path to the template file"""
self.template_path = path
self._template_cache = None
def get_template(self) -> Dict[str, Any]:
"""Loads and caches the singbox template"""
if self._template_cache is None:
try:
with open(self.template_path, 'r') as f:
self._template_cache = json.load(f)
except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
raise RuntimeError(f"Error loading Singbox template: {e}") from e
return self._template_cache.copy()
def generate_config(self, username: str, ip_version: str, fragment: str) -> Optional[Dict[str, Any]]:
"""Generates a Sing-box outbound configuration for a given user and IP version"""
try:
uri = self.hysteria_cli.get_user_uri(username, ip_version)
except Exception:
print(f"Warning: Failed to get URI for {username} with IP version {ip_version}. Skipping.")
return None
if not uri:
print(f"Warning: No URI found for {username} with IP version {ip_version}. Skipping.")
return None
components = UriParser.extract_uri_components(uri, f'IPv{ip_version}:')
if components is None or components.port is None:
print(f"Warning: Invalid URI components for {username} with IP version {ip_version}. Skipping.")
return None
return {
"outbounds": [{
"type": "hysteria2",
"tag": f"{username}-Hysteria2",
"server": components.ip,
"server_port": components.port,
"obfs": {
"type": "salamander",
"password": components.obfs_password
},
"password": f"{username}:{components.password}",
"tls": {
"enabled": True,
"server_name": fragment if fragment else self.default_sni,
"insecure": True
}
}]
}
def combine_configs(self, username: str, config_v4: Optional[Dict[str, Any]],
config_v6: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Combines IPv4 and IPv6 configurations into a single config"""
combined_config = self.get_template()
combined_config['outbounds'] = [
outbound for outbound in combined_config['outbounds']
if outbound.get('type') != 'hysteria2'
]
modified_v4_outbounds = []
if config_v4:
v4_outbound = config_v4['outbounds'][0]
v4_outbound['tag'] = f"{username}-IPv4"
modified_v4_outbounds = [v4_outbound]
modified_v6_outbounds = []
if config_v6:
v6_outbound = config_v6['outbounds'][0]
v6_outbound['tag'] = f"{username}-IPv6"
modified_v6_outbounds = [v6_outbound]
select_outbounds = ["auto"]
if config_v4:
select_outbounds.append(f"{username}-IPv4")
if config_v6:
select_outbounds.append(f"{username}-IPv6")
auto_outbounds = []
if config_v4:
auto_outbounds.append(f"{username}-IPv4")
if config_v6:
auto_outbounds.append(f"{username}-IPv6")
for outbound in combined_config['outbounds']:
if outbound.get('tag') == 'select':
outbound['outbounds'] = select_outbounds
elif outbound.get('tag') == 'auto':
outbound['outbounds'] = auto_outbounds
combined_config['outbounds'].extend(modified_v4_outbounds + modified_v6_outbounds)
return combined_config
class SubscriptionManager:
"""Handles user subscription generation"""
def __init__(self, hysteria_cli: HysteriaCLI, config: AppConfig):
self.hysteria_cli = hysteria_cli
self.config = config
def get_normal_subscription(self, username: str, user_agent: str) -> str:
"""Generates the user URI for normal subscriptions"""
user_info = self.hysteria_cli.get_user_info(username)
ipv4_uri, ipv6_uri = self.hysteria_cli.get_uris(username)
output_lines = [uri for uri in [ipv4_uri, ipv6_uri] if uri]
if not output_lines:
return "No URI available"
@ -221,75 +383,186 @@ def get_user_uri(username, user_agent):
if "v2ray" in user_agent and "ng" in user_agent:
match = re.search(r'pinSHA256=sha256/([^&]+)', uri)
if match:
base64_pin = match.group(1)
try:
decoded_pin = base64.b64decode(base64_pin)
hex_pin = ':'.join(['{:02X}'.format(byte) for byte in decoded_pin])
uri = uri.replace(f'pinSHA256=sha256/{base64_pin}', f'pinSHA256={hex_pin}')
except Exception as e:
print(f"Error processing pinSHA256: {e}")
decoded = base64.b64decode(match.group(1))
formatted = ":".join("{:02X}".format(byte) for byte in decoded)
uri = uri.replace(f'pinSHA256=sha256/{match.group(1)}', f'pinSHA256={formatted}')
processed_uris.append(uri)
subscription_info = (
f"//subscription-userinfo: upload={upload}; download={download}; total={total}; expire={expiration_timestamp}\n"
f"//subscription-userinfo: upload={user_info.upload_bytes}; "
f"download={user_info.download_bytes}; "
f"total={user_info.max_download_bytes}; "
f"expire={user_info.expiration_timestamp}\n"
)
profile_lines = f"//profile-title: {username}-Hysteria2 🚀\n//profile-update-interval: 1\n"
return profile_lines + subscription_info + "\n".join(processed_uris)
class TemplateRenderer:
"""Handles HTML template rendering"""
def __init__(self, template_dir: str, config: AppConfig):
self.env = Environment(loader=FileSystemLoader(template_dir), autoescape=True)
self.html_template = self.env.get_template('template.html')
self.config = config
def render(self, context: TemplateContext) -> str:
"""Renders the HTML template with the given context"""
return self.html_template.render(vars(context))
class HysteriaServer:
"""Main application server class"""
def __init__(self):
self.config = self._load_config()
self.rate_limiter = RateLimiter(self.config.rate_limit, self.config.rate_limit_window)
self.hysteria_cli = HysteriaCLI(self.config.hysteria_cli_path)
self.singbox_generator = SingboxConfigGenerator(self.hysteria_cli, self.config.sni)
self.singbox_generator.set_template_path(self.config.singbox_template_path)
self.subscription_manager = SubscriptionManager(self.hysteria_cli, self.config)
self.template_renderer = TemplateRenderer(self.config.template_dir, self.config)
self.app = web.Application(middlewares=[self._rate_limit_middleware])
self.app.add_routes([web.get('/sub/normal/{username}', self.handle)])
self.app.router.add_route('*', '/sub/normal/{tail:.*}', self.handle_404)
def _load_config(self) -> AppConfig:
"""Loads application configuration from environment variables"""
domain = os.getenv('HYSTERIA_DOMAIN', 'localhost')
cert_file = os.getenv('HYSTERIA_CERTFILE')
key_file = os.getenv('HYSTERIA_KEYFILE')
port = int(os.getenv('HYSTERIA_PORT', '3326'))
sni_file = '/etc/hysteria/.configs.env'
singbox_template_path = '/etc/hysteria/core/scripts/normalsub/singbox.json'
hysteria_cli_path = '/etc/hysteria/core/cli.py'
rate_limit = 100
rate_limit_window = 60
template_dir = os.path.dirname(__file__)
sni = self._load_sni_from_env(sni_file)
return AppConfig(
domain=domain,
cert_file=cert_file,
key_file=key_file,
port=port,
sni_file=sni_file,
singbox_template_path=singbox_template_path,
hysteria_cli_path=hysteria_cli_path,
rate_limit=rate_limit,
rate_limit_window=rate_limit_window,
sni=sni,
template_dir=template_dir
)
profile_lines = f"//profile-title: {username}-Hysteria2 🚀\n//profile-update-interval: 1\n"
output = profile_lines + subscription_info + "\n".join(processed_uris)
return output
except subprocess.CalledProcessError:
raise RuntimeError("Failed to get URI or user info.")
except json.JSONDecodeError:
raise RuntimeError("Failed to parse user info JSON.")
except ValueError:
raise RuntimeError("expiration_timestamp OR account_creation_date in config file is invalid")
def get_uris(username):
"""
Gets the IPv4 and IPv6 URIs for a user, handling cases where one or both might be missing.
"""
def _load_sni_from_env(self, sni_file: str) -> str:
"""Loads SNI configuration from the environment file"""
try:
command = [
'python3',
'/etc/hysteria/core/cli.py',
'show-user-uri',
'-u', username,
'-a'
]
safe_command = [shlex.quote(arg) for arg in command]
output = subprocess.check_output(safe_command).decode().strip()
with open(sni_file, 'r') as f:
for line in f:
if line.startswith('SNI='):
return line.strip().split('=')[1]
except FileNotFoundError:
print("Warning: SNI file not found. Using default SNI.")
return "bts.com"
ipv4_match = re.search(r'IPv4:\s*(.*)', output)
ipv6_match = re.search(r'IPv6:\s*(.*)', output)
@middleware
async def _rate_limit_middleware(self, request: web.Request, handler):
"""Middleware for rate limiting requests"""
client_ip = request.headers.get('X-Forwarded-For',
request.headers.get('X-Real-IP', request.remote))
ipv4_uri = ipv4_match.group(1).strip() if ipv4_match else None
ipv6_uri = ipv6_match.group(1).strip() if ipv6_match else None
if not self.rate_limiter.check_limit(client_ip):
return web.Response(status=429, text="Rate limit exceeded.")
return ipv4_uri, ipv6_uri
return await handler(request)
except subprocess.CalledProcessError as e:
print(f"Error executing show-user-uri command: {e}")
raise
except AttributeError as e:
print(f"Error parsing URI output: {e}")
raise
async def handle(self, request: web.Request) -> web.Response:
"""Main request handler"""
try:
username = Utils.sanitize_input(request.match_info.get('username', ''), r'^[a-zA-Z0-9_-]+$')
if not username:
return web.Response(status=400, text="Error: Missing 'username' parameter.")
user_agent = request.headers.get('User-Agent', '').lower()
async def handle_404(request):
if any(browser in user_agent for browser in ['chrome', 'firefox', 'safari', 'edge', 'opera']):
return await self._handle_html(request, username)
else:
fragment = request.query.get('fragment', '')
if not user_agent.startswith('hiddifynext') and ('singbox' in user_agent or 'sing' in user_agent):
return await self._handle_singbox(username, fragment)
return await self._handle_normalsub(request, username)
except ValueError as e:
return web.Response(status=400, text=f"Error: {e}")
except Exception as e:
print(f"Internal Server Error: {e}")
return web.Response(status=500, text="Error: Internal server error")
async def _handle_html(self, request: web.Request, username: str) -> web.Response:
"""Handles requests for HTML output"""
context = await self._get_template_context(username)
rendered_html = self.template_renderer.render(context)
return web.Response(text=rendered_html, content_type='text/html')
async def _handle_singbox(self, username: str, fragment: str) -> web.Response:
"""Handles requests for Sing-box configuration"""
config_v4 = self.singbox_generator.generate_config(username, '4', fragment)
config_v6 = self.singbox_generator.generate_config(username, '6', fragment)
if config_v4 is None and config_v6 is None:
return web.Response(status=404, text=f"Error: No valid URIs found for user {username}.")
combined_config = self.singbox_generator.combine_configs(username, config_v4, config_v6)
return web.Response(text=json.dumps(combined_config, indent=4, sort_keys=True),
content_type='application/json')
async def _handle_normalsub(self, request: web.Request, username: str) -> web.Response:
"""Handles requests for normal subscription links"""
user_agent = request.headers.get('User-Agent', '').lower()
subscription = self.subscription_manager.get_normal_subscription(username, user_agent)
return web.Response(text=subscription, content_type='text/plain')
async def _get_template_context(self, username: str) -> TemplateContext:
"""Generates the context for HTML template rendering"""
user_info = self.hysteria_cli.get_user_info(username)
ipv4_uri, ipv6_uri = self.hysteria_cli.get_uris(username)
sub_link = f"https://{self.config.domain}:{self.config.port}/sub/normal/{username}"
ipv4_qrcode = Utils.generate_qrcode_base64(ipv4_uri)
ipv6_qrcode = Utils.generate_qrcode_base64(ipv6_uri)
sublink_qrcode = Utils.generate_qrcode_base64(sub_link)
return TemplateContext(
username=username,
usage=user_info.usage_human_readable,
usage_raw=user_info.usage_detailed,
expiration_date=user_info.expiration_date,
sublink_qrcode=sublink_qrcode,
ipv4_qrcode=ipv4_qrcode,
ipv6_qrcode=ipv6_qrcode,
sub_link=sub_link,
ipv4_uri=ipv4_uri,
ipv6_uri=ipv6_uri
)
async def handle_404(self, request: web.Request) -> web.Response:
"""Handles 404 Not Found errors"""
print(f"404 Not Found: {request.path}")
return web.Response(status=404, text="Not Found")
if __name__ == '__main__':
app = web.Application(middlewares=[rate_limit_middleware])
app.add_routes([web.get('/sub/normal/{username}', handle)])
app.router.add_route('*', '/sub/normal/{tail:.*}', handle_404)
def run(self):
"""Runs the web server"""
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
ssl_context.set_ciphers('AES256+EECDH:AES256+EDH')
ssl_context.load_cert_chain(certfile=self.config.cert_file, keyfile=self.config.key_file)
ssl_context.set_ciphers('ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384')
web.run_app(app, port=PORT, ssl_context=ssl_context)
web.run_app(self.app, port=self.config.port, ssl_context=ssl_context)
if __name__ == '__main__':
server = HysteriaServer()
server.run()