feat: Adapt WARP API to handle JSON status output
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
import re
|
||||
import json
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from ..schema.response import DetailResponse
|
||||
from ..schema.config.warp import ConfigureInputBody, StatusResponse
|
||||
@ -69,74 +69,21 @@ async def configure(body: ConfigureInputBody):
|
||||
|
||||
@router.get('/status', response_model=StatusResponse, summary='Get WARP Status')
|
||||
async def status():
|
||||
"""
|
||||
Retrieves the current status of WARP.
|
||||
|
||||
Returns:
|
||||
StatusResponse: A response model containing the current WARP status details.
|
||||
|
||||
Raises:
|
||||
HTTPException: If the WARP status is not available (404) or if there is an error processing the request (400).
|
||||
"""
|
||||
try:
|
||||
if res := cli_api.warp_status():
|
||||
return __parse_status(res)
|
||||
raise HTTPException(status_code=404, detail='WARP status not available.')
|
||||
status_json_str = cli_api.warp_status()
|
||||
if not status_json_str:
|
||||
raise HTTPException(status_code=404, detail='WARP status not available.')
|
||||
|
||||
status_data = json.loads(status_json_str)
|
||||
|
||||
if "error" in status_data:
|
||||
raise HTTPException(status_code=500, detail=f'Error getting WARP status: {status_data["error"]}')
|
||||
|
||||
return StatusResponse(**status_data)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
raise HTTPException(status_code=500, detail='Error decoding WARP status JSON.')
|
||||
except HTTPException as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=f'Error: {str(e)}')
|
||||
|
||||
|
||||
def __parse_status(status: str) -> StatusResponse:
|
||||
"""
|
||||
Parses the output of the WARP status command to extract the current configuration settings.
|
||||
|
||||
Args:
|
||||
status: The output of the WARP status command as a string.
|
||||
|
||||
Returns:
|
||||
StatusResponse: A response model containing the current WARP status details.
|
||||
|
||||
Raises:
|
||||
ValueError: If the WARP status is invalid or incomplete.
|
||||
"""
|
||||
|
||||
# Example output(status) from cli_api.warp_status():
|
||||
# --------------------------------
|
||||
# Current WARP Configuration:
|
||||
# All traffic: Inactive
|
||||
# Popular sites (Google, Netflix, etc.): Inactive
|
||||
# Domestic sites (geosite:ir, geoip:ir): Inactive
|
||||
# Block adult content: Inactive
|
||||
# --------------------------------
|
||||
data = {}
|
||||
|
||||
# Remove ANSI escape sequences(colors) (e.g., \x1b[1;35m)
|
||||
clean_status = re.sub(r'\x1b\[[0-9;]*m', '', status)
|
||||
|
||||
for line in clean_status.split('\n'):
|
||||
if ':' not in line:
|
||||
continue
|
||||
if 'Current WARP Configuration:' in line:
|
||||
continue
|
||||
key, _, value = line.partition(':')
|
||||
key = key.strip().lower()
|
||||
value = value.strip()
|
||||
|
||||
if not key or not value:
|
||||
continue
|
||||
|
||||
if 'all traffic' in key:
|
||||
data['all_traffic'] = value == 'active'
|
||||
elif 'popular sites' in key:
|
||||
data['popular_sites'] = value == 'active'
|
||||
elif 'domestic sites' in key:
|
||||
data['domestic_sites'] = value == 'active'
|
||||
elif 'block adult content' in key:
|
||||
data['block_adult_sites'] = value == 'active'
|
||||
|
||||
if not data:
|
||||
raise ValueError('Invalid WARP status')
|
||||
try:
|
||||
return StatusResponse(**data)
|
||||
except Exception as e:
|
||||
raise ValueError(f'Invalid or incomplete WARP status: {e}')
|
||||
|
||||
@ -10,7 +10,7 @@ class ConfigureInputBody(BaseModel):
|
||||
|
||||
|
||||
class StatusResponse(BaseModel):
|
||||
all_traffic: bool
|
||||
popular_sites: bool
|
||||
domestic_sites: bool
|
||||
block_adult_sites: bool
|
||||
all_traffic_via_warp: bool
|
||||
popular_sites_via_warp: bool
|
||||
domestic_sites_via_warp: bool
|
||||
block_adult_content: bool
|
||||
Reference in New Issue
Block a user