Feat: Add blocked user check to subscription endpoint

This commit is contained in:
Whispering Wind
2025-08-17 16:16:30 +03:30
committed by GitHub
parent 8b2f22849b
commit b9f979d1ae

View File

@ -74,6 +74,7 @@ class UserInfo:
max_download_bytes: int max_download_bytes: int
account_creation_date: str account_creation_date: str
expiration_days: int expiration_days: int
blocked: bool = False
@property @property
def total_usage(self) -> int: def total_usage(self) -> int:
@ -190,22 +191,16 @@ class HysteriaCLI:
print(f"Hysteria CLI error: {e}") print(f"Hysteria CLI error: {e}")
raise raise
def get_user_password(self, username: str) -> Optional[str]: def get_user_details_from_json(self, username: str) -> Optional[Dict[str, Any]]:
try: try:
with open(self.users_json_path, 'r') as f: with open(self.users_json_path, 'r') as f:
users_data = json.load(f) users_data = json.load(f)
user_details = users_data.get(username) return users_data.get(username)
if user_details and 'password' in user_details: except (FileNotFoundError, json.JSONDecodeError) as e:
return user_details['password'] print(f"Error reading user details from {self.users_json_path}: {e}")
return None
except FileNotFoundError:
print(f"Error: Users file not found at {self.users_json_path}")
return None
except json.JSONDecodeError:
print(f"Error: Could not decode JSON from {self.users_json_path}")
return None return None
except Exception as e: except Exception as e:
print(f"An unexpected error occurred while reading users file for password: {e}") print(f"An unexpected error occurred while reading users file: {e}")
return None return None
def get_username_by_password(self, password_token: str) -> Optional[str]: def get_username_by_password(self, password_token: str) -> Optional[str]:
@ -231,8 +226,8 @@ class HysteriaCLI:
if raw_info_str is None: if raw_info_str is None:
return None return None
user_password = self.get_user_password(username) user_details = self.get_user_details_from_json(username)
if user_password is None: if not user_details or 'password' not in user_details:
print(f"Warning: Password for user '{username}' could not be fetched from {self.users_json_path}. Cannot create UserInfo.") print(f"Warning: Password for user '{username}' could not be fetched from {self.users_json_path}. Cannot create UserInfo.")
return None return None
@ -240,26 +235,25 @@ class HysteriaCLI:
raw_info = json.loads(raw_info_str) raw_info = json.loads(raw_info_str)
return UserInfo( return UserInfo(
username=username, username=username,
password=user_password, password=user_details['password'],
upload_bytes=raw_info.get('upload_bytes', 0), upload_bytes=raw_info.get('upload_bytes', 0),
download_bytes=raw_info.get('download_bytes', 0), download_bytes=raw_info.get('download_bytes', 0),
max_download_bytes=raw_info.get('max_download_bytes', 0), max_download_bytes=raw_info.get('max_download_bytes', 0),
account_creation_date=raw_info.get('account_creation_date', ''), account_creation_date=raw_info.get('account_creation_date', ''),
expiration_days=raw_info.get('expiration_days', 0) expiration_days=raw_info.get('expiration_days', 0),
blocked=user_details.get('blocked', False)
) )
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
print(f"JSONDecodeError: {e}, Raw output: {raw_info_str}") print(f"JSONDecodeError: {e}, Raw output: {raw_info_str}")
return None return None
def get_all_uris(self, username: str) -> List[str]: def get_all_uris(self, username: str) -> List[str]:
"""Fetches all available URIs (local and nodes) for a user."""
output = self._run_command(['show-user-uri', '-u', username, '-a']) output = self._run_command(['show-user-uri', '-u', username, '-a'])
if not output: if not output:
return [] return []
return re.findall(r'hy2://.*', output) return re.findall(r'hy2://.*', output)
def get_all_labeled_uris(self, username: str) -> List[Dict[str, str]]: def get_all_labeled_uris(self, username: str) -> List[Dict[str, str]]:
"""Fetches all URIs and their labels."""
output = self._run_command(['show-user-uri', '-u', username, '-a']) output = self._run_command(['show-user-uri', '-u', username, '-a'])
if not output: if not output:
return [] return []
@ -315,7 +309,6 @@ class SingboxConfigGenerator:
return self._template_cache.copy() return self._template_cache.copy()
def generate_config_from_uri(self, uri: str, username: str, fragment: str) -> Optional[Dict[str, Any]]: def generate_config_from_uri(self, uri: str, username: str, fragment: str) -> Optional[Dict[str, Any]]:
"""Generates a Singbox outbound config from a single Hysteria URI."""
if not uri: if not uri:
return None return None
@ -357,7 +350,6 @@ class SingboxConfigGenerator:
} }
def combine_configs(self, all_uris: List[str], username: str, fragment: str) -> Optional[Dict[str, Any]]: def combine_configs(self, all_uris: List[str], username: str, fragment: str) -> Optional[Dict[str, Any]]:
"""Generates a combined Singbox config from a list of URIs."""
if not all_uris: if not all_uris:
return None return None
@ -391,7 +383,6 @@ class SubscriptionManager:
self.config = config self.config = config
def _get_extra_configs(self) -> List[str]: def _get_extra_configs(self) -> List[str]:
"""Reads extra proxy URIs from the JSON config file."""
if not os.path.exists(self.config.extra_config_path): if not os.path.exists(self.config.extra_config_path):
return [] return []
try: try:
@ -562,11 +553,14 @@ class HysteriaServer:
if username is None: if username is None:
return web.Response(status=404, text="User not found for the provided token.") return web.Response(status=404, text="User not found for the provided token.")
user_agent = request.headers.get('User-Agent', '').lower()
user_info = self.hysteria_cli.get_user_info(username) user_info = self.hysteria_cli.get_user_info(username)
if user_info is None: if user_info is None:
return web.Response(status=404, text=f"User '{username}' details not found.") return web.Response(status=404, text=f"User '{username}' details not found.")
if user_info.blocked:
return await self._handle_blocked_user(request)
user_agent = request.headers.get('User-Agent', '').lower()
if any(browser in user_agent for browser in ['chrome', 'firefox', 'safari', 'edge', 'opera']): if any(browser in user_agent for browser in ['chrome', 'firefox', 'safari', 'edge', 'opera']):
return await self._handle_html(request, username, user_info) return await self._handle_html(request, username, user_info)
fragment = request.query.get('fragment', '') fragment = request.query.get('fragment', '')
@ -579,6 +573,39 @@ class HysteriaServer:
print(f"Internal Server Error: {e}") print(f"Internal Server Error: {e}")
return web.Response(status=500, text="Error: Internal server error") return web.Response(status=500, text="Error: Internal server error")
async def _handle_blocked_user(self, request: web.Request) -> web.Response:
fake_uri = "hysteria2://x@end.com:443?sni=support.me#⛔Account-Expired⚠"
user_agent = request.headers.get('User-Agent', '').lower()
if any(browser in user_agent for browser in ['chrome', 'firefox', 'safari', 'edge', 'opera']):
context = self._get_blocked_template_context(fake_uri)
return web.Response(text=self.template_renderer.render(context), content_type='text/html')
fragment = request.query.get('fragment', '')
if not user_agent.startswith('hiddifynext') and ('singbox' in user_agent or 'sing' in user_agent):
combined_config = self.singbox_generator.combine_configs([fake_uri], "blocked", fragment)
return web.Response(text=json.dumps(combined_config, indent=4, sort_keys=True), content_type='application/json')
return web.Response(text=fake_uri, content_type='text/plain')
def _get_blocked_template_context(self, fake_uri: str) -> TemplateContext:
return TemplateContext(
username="blocked",
usage="N/A",
usage_raw="This account has been suspended.",
expiration_date="N/A",
sublink_qrcode=Utils.generate_qrcode_base64("blocked"),
sub_link="#blocked",
local_uris=[
NodeURI(
label="Blocked",
uri=fake_uri,
qrcode=Utils.generate_qrcode_base64(fake_uri)
)
],
node_uris=[]
)
async def _handle_html(self, request: web.Request, username: str, user_info: UserInfo) -> web.Response: async def _handle_html(self, request: web.Request, username: str, user_info: UserInfo) -> web.Response:
context = await self._get_template_context(username, user_info) context = await self._get_template_context(username, user_info)
return web.Response(text=self.template_renderer.render(context), content_type='text/html') return web.Response(text=self.template_renderer.render(context), content_type='text/html')
@ -651,4 +678,4 @@ class HysteriaServer:
if __name__ == '__main__': if __name__ == '__main__':
server = HysteriaServer() server = HysteriaServer()
server.run() server.run()