feat: Add WARP management tab to settings page

Integrates WARP (install, uninstall, configure, status) functionality
into the web panel's settings page. Users can now manage WARP
directly from the UI.
This commit is contained in:
Whispering Wind
2025-06-02 13:29:48 +03:30
committed by GitHub
parent 5e61afe15c
commit 3a13108181
6 changed files with 406 additions and 176 deletions

View File

@ -4,6 +4,7 @@ import json
import sys
import subprocess
from pathlib import Path
import argparse
core_scripts_dir = Path(__file__).resolve().parents[1]
if str(core_scripts_dir) not in sys.path:
@ -11,149 +12,184 @@ if str(core_scripts_dir) not in sys.path:
from paths import *
def warp_configure_handler(all_traffic=False, popular_sites=False, domestic_sites=False, block_adult_sites=False):
"""
Configure WARP routing rules based on provided parameters
Args:
all_traffic (bool): Toggle WARP for all traffic
popular_sites (bool): Toggle WARP for popular sites (Google, Netflix, etc.)
domestic_sites (bool): Toggle between WARP and Reject for domestic sites
block_adult_sites (bool): Toggle blocking of adult content
"""
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
def warp_configure_handler(
set_all_traffic_state: str | None = None,
set_popular_sites_state: str | None = None,
set_domestic_sites_state: str | None = None,
set_block_adult_sites_state: str | None = None
):
try:
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
except FileNotFoundError:
print(f"Error: Configuration file {CONFIG_FILE} not found.")
sys.exit(1)
except json.JSONDecodeError:
print(f"Error: Could not decode JSON from {CONFIG_FILE}.")
sys.exit(1)
modified = False
if all_traffic:
warp_all_active = any(rule == "warps(all)" for rule in config.get('acl', {}).get('inline', []))
if warp_all_active:
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != "warps(all)"]
print("Traffic configuration changed to Direct.")
modified = True
else:
if 'acl' not in config:
config['acl'] = {}
if 'inline' not in config['acl']:
config['acl']['inline'] = []
config['acl']['inline'].append("warps(all)")
print("Traffic configuration changed to WARP.")
modified = True
if popular_sites:
if 'acl' not in config:
config['acl'] = {}
if 'inline' not in config['acl']:
config['acl']['inline'] = []
if set_all_traffic_state is not None:
warp_all_rule = "warps(all)"
warp_all_active = warp_all_rule in config['acl']['inline']
if set_all_traffic_state == "on":
if not warp_all_active:
config['acl']['inline'].append(warp_all_rule)
print("All traffic rule: Enabled.")
modified = True
else:
print("All traffic rule: Already enabled.")
elif set_all_traffic_state == "off":
if warp_all_active:
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != warp_all_rule]
print("All traffic rule: Disabled.")
modified = True
else:
print("All traffic rule: Already disabled.")
if set_popular_sites_state is not None:
popular_rules = [
"warps(geoip:google)",
"warps(geosite:google)",
"warps(geosite:netflix)",
"warps(geosite:spotify)",
"warps(geosite:openai)",
"warps(geoip:openai)"
"warps(geoip:google)", "warps(geosite:google)", "warps(geosite:netflix)",
"warps(geosite:spotify)", "warps(geosite:openai)", "warps(geoip:openai)"
]
rule_exists = any(rule in config.get('acl', {}).get('inline', []) for rule in popular_rules)
if rule_exists:
config['acl']['inline'] = [rule for rule in config['acl']['inline']
if rule not in popular_rules]
print("WARP configuration for Google, OpenAI, etc. removed.")
modified = True
else:
if 'acl' not in config:
config['acl'] = {}
if 'inline' not in config['acl']:
config['acl']['inline'] = []
config['acl']['inline'].extend(popular_rules)
print("WARP configured for Google, OpenAI, etc.")
modified = True
if domestic_sites:
ir_site_rule = "warps(geosite:ir)"
ir_ip_rule = "warps(geoip:ir)"
reject_site_rule = "reject(geosite:ir)"
reject_ip_rule = "reject(geoip:ir)"
using_warp = (ir_site_rule in config.get('acl', {}).get('inline', []) and
ir_ip_rule in config.get('acl', {}).get('inline', []))
using_reject = (reject_site_rule in config.get('acl', {}).get('inline', []) and
reject_ip_rule in config.get('acl', {}).get('inline', []))
if 'acl' not in config:
config['acl'] = {}
if 'inline' not in config['acl']:
config['acl']['inline'] = []
if using_warp:
config['acl']['inline'] = [reject_site_rule if rule == ir_site_rule else
reject_ip_rule if rule == ir_ip_rule else
rule for rule in config['acl']['inline']]
print("Configuration changed to Reject for geosite:ir and geoip:ir.")
modified = True
elif using_reject:
config['acl']['inline'] = [ir_site_rule if rule == reject_site_rule else
ir_ip_rule if rule == reject_ip_rule else
rule for rule in config['acl']['inline']]
print("Configuration changed to Use WARP for geosite:ir and geoip:ir.")
modified = True
else:
config['acl']['inline'].extend([reject_site_rule, reject_ip_rule])
print("Added Reject configuration for geosite:ir and geoip:ir.")
modified = True
if block_adult_sites:
if set_popular_sites_state == "on":
added_any = False
for rule in popular_rules:
if rule not in config['acl']['inline']:
config['acl']['inline'].append(rule)
added_any = True
if added_any:
print("Popular sites rule: Enabled/Updated.")
modified = True
else:
all_present = all(rule in config['acl']['inline'] for rule in popular_rules)
if all_present:
print("Popular sites rule: Already enabled.")
else:
print("Popular sites rule: Enabled/Updated.")
modified = True
elif set_popular_sites_state == "off":
removed_any = False
initial_len = len(config['acl']['inline'])
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule not in popular_rules]
if len(config['acl']['inline']) < initial_len:
removed_any = True
if removed_any:
print("Popular sites rule: Disabled.")
modified = True
else:
print("Popular sites rule: Already disabled.")
if set_domestic_sites_state is not None:
ir_site_warp_rule = "warps(geosite:ir)"
ir_ip_warp_rule = "warps(geoip:ir)"
ir_site_reject_rule = "reject(geosite:ir)"
ir_ip_reject_rule = "reject(geoip:ir)"
if set_domestic_sites_state == "on":
changed_to_warp = False
if ir_site_reject_rule in config['acl']['inline'] or ir_ip_reject_rule in config['acl']['inline']:
config['acl']['inline'] = [r for r in config['acl']['inline'] if r not in [ir_site_reject_rule, ir_ip_reject_rule]]
changed_to_warp = True
if ir_site_warp_rule not in config['acl']['inline']:
config['acl']['inline'].append(ir_site_warp_rule)
changed_to_warp = True
if ir_ip_warp_rule not in config['acl']['inline']:
config['acl']['inline'].append(ir_ip_warp_rule)
changed_to_warp = True
if changed_to_warp:
print("Domestic sites: Configured to use WARP.")
modified = True
else:
print("Domestic sites: Already configured to use WARP.")
elif set_domestic_sites_state == "off":
changed_to_reject = False
if ir_site_warp_rule in config['acl']['inline'] or ir_ip_warp_rule in config['acl']['inline']:
config['acl']['inline'] = [r for r in config['acl']['inline'] if r not in [ir_site_warp_rule, ir_ip_warp_rule]]
changed_to_reject = True
if ir_site_reject_rule not in config['acl']['inline']:
config['acl']['inline'].append(ir_site_reject_rule)
changed_to_reject = True
if ir_ip_reject_rule not in config['acl']['inline']:
config['acl']['inline'].append(ir_ip_reject_rule)
changed_to_reject = True
if changed_to_reject:
print("Domestic sites: Configured to REJECT.")
modified = True
else:
print("Domestic sites: Already configured to REJECT.")
if set_block_adult_sites_state is not None:
nsfw_rule = "reject(geosite:nsfw)"
is_blocking_nsfw = nsfw_rule in config['acl']['inline']
blocked = nsfw_rule in config.get('acl', {}).get('inline', [])
if 'resolver' not in config: config['resolver'] = {}
if 'tls' not in config['resolver']: config['resolver']['tls'] = {}
desired_resolver = ""
if set_block_adult_sites_state == "on":
desired_resolver = "1.1.1.3:853"
if not is_blocking_nsfw:
config['acl']['inline'].append(nsfw_rule)
print("Adult content blocking: Enabled.")
modified = True
else:
print("Adult content blocking: Already enabled.")
elif set_block_adult_sites_state == "off":
desired_resolver = "1.1.1.1:853"
if is_blocking_nsfw:
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != nsfw_rule]
print("Adult content blocking: Disabled.")
modified = True
else:
print("Adult content blocking: Already disabled.")
if blocked:
config['acl']['inline'] = [rule for rule in config['acl']['inline']
if rule != nsfw_rule]
if 'resolver' not in config:
config['resolver'] = {}
if 'tls' not in config['resolver']:
config['resolver']['tls'] = {}
config['resolver']['tls']['addr'] = "1.1.1.1:853"
print("Adult content blocking removed and resolver updated.")
if config['resolver']['tls'].get('addr') != desired_resolver:
config['resolver']['tls']['addr'] = desired_resolver
print(f"Resolver: Updated to {desired_resolver}.")
modified = True
else:
if 'acl' not in config:
config['acl'] = {}
if 'inline' not in config['acl']:
config['acl']['inline'] = []
config['acl']['inline'].append(nsfw_rule)
if 'resolver' not in config:
config['resolver'] = {}
if 'tls' not in config['resolver']:
config['resolver']['tls'] = {}
config['resolver']['tls']['addr'] = "1.1.1.3:853"
print("Adult content blocked and resolver updated.")
modified = True
if 'acl' in config and 'inline' in config['acl']:
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule]
if modified:
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
print("Configuration updated. Attempting to restart hysteria2 service...")
try:
subprocess.run(["python3", CLI_PATH, "restart-hysteria2"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
except subprocess.CalledProcessError:
print("Warning: Failed to restart hysteria2")
stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, timeout=10)
print("Hysteria2 service restarted successfully.")
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to restart hysteria2. STDERR: {e.stderr.decode().strip()}")
except subprocess.TimeoutExpired:
print("Warning: Timeout expired while trying to restart hysteria2 service.")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Configure WARP settings")
parser.add_argument("--all", action="store_true", help="Toggle WARP for all traffic")
parser.add_argument("--popular-sites", action="store_true", help="Toggle WARP for popular sites")
parser.add_argument("--domestic-sites", action="store_true", help="Toggle between WARP and Reject for domestic sites")
parser.add_argument("--block-adult", action="store_true", help="Toggle blocking of adult content")
parser = argparse.ArgumentParser(description="Configure WARP settings. At least one option must be provided.")
parser.add_argument("--set-all", choices=['on', 'off'], help="Set WARP for all traffic (on/off)")
parser.add_argument("--set-popular-sites", choices=['on', 'off'], help="Set WARP for popular sites (on/off)")
parser.add_argument("--set-domestic-sites", choices=['on', 'off'], help="Set behavior for domestic sites (on=WARP, off=REJECT)")
parser.add_argument("--set-block-adult", choices=['on', 'off'], help="Set blocking of adult content (on/off)")
args = parser.parse_args()
if not any(vars(args).values()):
parser.print_help()
sys.exit(1)
warp_configure_handler(
all_traffic=args.all,
popular_sites=args.popular_sites,
domestic_sites=args.domestic_sites,
block_adult_sites=args.block_adult
set_all_traffic_state=args.set_all,
set_popular_sites_state=args.set_popular_sites,
set_domestic_sites_state=args.set_domestic_sites,
set_block_adult_sites_state=args.set_block_adult
)