Implement services status API

This commit is contained in:
Iam54r1n4
2025-02-07 21:59:52 +00:00
parent c16e11b324
commit a8be39e2e7
2 changed files with 56 additions and 1 deletions

View File

@ -14,3 +14,12 @@ class ServerStatusResponse(BaseModel):
uploaded_traffic: str
downloaded_traffic: str
total_traffic: str
class ServerServicesStatusResponse(BaseModel):
hysteria_server: bool
hysteria_webpanel: bool
hysteria_singbox: bool
hysteria_normal_sub: bool
hysteria_telegram_bot: bool
hysteria_warp: bool

View File

@ -1,6 +1,6 @@
from fastapi import APIRouter, HTTPException
import cli_api
from .schema.server import ServerStatusResponse
from .schema.server import ServerStatusResponse, ServerServicesStatusResponse
router = APIRouter()
@ -98,3 +98,49 @@ def __parse_server_status(server_info: str) -> ServerStatusResponse:
return ServerStatusResponse(**data) # type: ignore
except Exception as e:
raise ValueError(f'Invalid or incomplete server info: {e}')
@router.get('/services/status', response_model=ServerServicesStatusResponse)
async def server_services_status_api():
"""
Retrieve the status of various services.
This endpoint provides information about the status of different services,
including Hysteria2, TelegramBot, Singbox, and Normal-SUB.
Returns:
ServerServicesStatusResponse: A response model containing service status details.
Raises:
HTTPException: If the services status is not available (404) or
if there is an error processing the request (400).
"""
try:
if res := cli_api.get_services_status():
return __parse_services_status(res)
raise HTTPException(status_code=404, detail='Services status not available.')
except Exception as e:
raise HTTPException(status_code=400, detail=f'Error: {str(e)}')
def __parse_services_status(services_status: dict[str, bool]) -> ServerServicesStatusResponse:
'''
Parse the services status provided by cli_api.get_services_status()
and return a ServerServicesStatusResponse instance.
'''
parsed_services_status: dict[str, bool] = {}
for service, status in services_status.items():
if 'hysteria-server' in service:
parsed_services_status['hysteria_server'] = status
elif 'hysteria-webpanel' in service:
parsed_services_status['hysteria_webpanel'] = status
elif 'telegram-bot' in service:
parsed_services_status['hysteria_telegram_bot'] = status
elif 'hysteria-normal-sub' in service:
parsed_services_status['hysteria_normal_sub'] = status
elif 'hysteria-singbox' in service:
parsed_services_status['hysteria_singbox'] = status
elif 'wg-quick' in service:
parsed_services_status['hysteria_warp'] = status
return ServerServicesStatusResponse(**parsed_services_status)