Files
Blitz-Proxy/core/scripts/warp/configure.py
Whispering Wind 3a13108181 feat: Add WARP management tab to settings page
Integrates WARP (install, uninstall, configure, status) functionality
into the web panel's settings page. Users can now manage WARP
directly from the UI.
2025-06-02 13:29:48 +03:30

195 lines
8.5 KiB
Python

#!/usr/bin/env python3
import json
import sys
import subprocess
from pathlib import Path
import argparse
core_scripts_dir = Path(__file__).resolve().parents[1]
if str(core_scripts_dir) not in sys.path:
sys.path.append(str(core_scripts_dir))
from paths import *
def warp_configure_handler(
set_all_traffic_state: str | None = None,
set_popular_sites_state: str | None = None,
set_domestic_sites_state: str | None = None,
set_block_adult_sites_state: str | None = None
):
try:
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
except FileNotFoundError:
print(f"Error: Configuration file {CONFIG_FILE} not found.")
sys.exit(1)
except json.JSONDecodeError:
print(f"Error: Could not decode JSON from {CONFIG_FILE}.")
sys.exit(1)
modified = False
if 'acl' not in config:
config['acl'] = {}
if 'inline' not in config['acl']:
config['acl']['inline'] = []
if set_all_traffic_state is not None:
warp_all_rule = "warps(all)"
warp_all_active = warp_all_rule in config['acl']['inline']
if set_all_traffic_state == "on":
if not warp_all_active:
config['acl']['inline'].append(warp_all_rule)
print("All traffic rule: Enabled.")
modified = True
else:
print("All traffic rule: Already enabled.")
elif set_all_traffic_state == "off":
if warp_all_active:
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != warp_all_rule]
print("All traffic rule: Disabled.")
modified = True
else:
print("All traffic rule: Already disabled.")
if set_popular_sites_state is not None:
popular_rules = [
"warps(geoip:google)", "warps(geosite:google)", "warps(geosite:netflix)",
"warps(geosite:spotify)", "warps(geosite:openai)", "warps(geoip:openai)"
]
if set_popular_sites_state == "on":
added_any = False
for rule in popular_rules:
if rule not in config['acl']['inline']:
config['acl']['inline'].append(rule)
added_any = True
if added_any:
print("Popular sites rule: Enabled/Updated.")
modified = True
else:
all_present = all(rule in config['acl']['inline'] for rule in popular_rules)
if all_present:
print("Popular sites rule: Already enabled.")
else:
print("Popular sites rule: Enabled/Updated.")
modified = True
elif set_popular_sites_state == "off":
removed_any = False
initial_len = len(config['acl']['inline'])
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule not in popular_rules]
if len(config['acl']['inline']) < initial_len:
removed_any = True
if removed_any:
print("Popular sites rule: Disabled.")
modified = True
else:
print("Popular sites rule: Already disabled.")
if set_domestic_sites_state is not None:
ir_site_warp_rule = "warps(geosite:ir)"
ir_ip_warp_rule = "warps(geoip:ir)"
ir_site_reject_rule = "reject(geosite:ir)"
ir_ip_reject_rule = "reject(geoip:ir)"
if set_domestic_sites_state == "on":
changed_to_warp = False
if ir_site_reject_rule in config['acl']['inline'] or ir_ip_reject_rule in config['acl']['inline']:
config['acl']['inline'] = [r for r in config['acl']['inline'] if r not in [ir_site_reject_rule, ir_ip_reject_rule]]
changed_to_warp = True
if ir_site_warp_rule not in config['acl']['inline']:
config['acl']['inline'].append(ir_site_warp_rule)
changed_to_warp = True
if ir_ip_warp_rule not in config['acl']['inline']:
config['acl']['inline'].append(ir_ip_warp_rule)
changed_to_warp = True
if changed_to_warp:
print("Domestic sites: Configured to use WARP.")
modified = True
else:
print("Domestic sites: Already configured to use WARP.")
elif set_domestic_sites_state == "off":
changed_to_reject = False
if ir_site_warp_rule in config['acl']['inline'] or ir_ip_warp_rule in config['acl']['inline']:
config['acl']['inline'] = [r for r in config['acl']['inline'] if r not in [ir_site_warp_rule, ir_ip_warp_rule]]
changed_to_reject = True
if ir_site_reject_rule not in config['acl']['inline']:
config['acl']['inline'].append(ir_site_reject_rule)
changed_to_reject = True
if ir_ip_reject_rule not in config['acl']['inline']:
config['acl']['inline'].append(ir_ip_reject_rule)
changed_to_reject = True
if changed_to_reject:
print("Domestic sites: Configured to REJECT.")
modified = True
else:
print("Domestic sites: Already configured to REJECT.")
if set_block_adult_sites_state is not None:
nsfw_rule = "reject(geosite:nsfw)"
is_blocking_nsfw = nsfw_rule in config['acl']['inline']
if 'resolver' not in config: config['resolver'] = {}
if 'tls' not in config['resolver']: config['resolver']['tls'] = {}
desired_resolver = ""
if set_block_adult_sites_state == "on":
desired_resolver = "1.1.1.3:853"
if not is_blocking_nsfw:
config['acl']['inline'].append(nsfw_rule)
print("Adult content blocking: Enabled.")
modified = True
else:
print("Adult content blocking: Already enabled.")
elif set_block_adult_sites_state == "off":
desired_resolver = "1.1.1.1:853"
if is_blocking_nsfw:
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != nsfw_rule]
print("Adult content blocking: Disabled.")
modified = True
else:
print("Adult content blocking: Already disabled.")
if config['resolver']['tls'].get('addr') != desired_resolver:
config['resolver']['tls']['addr'] = desired_resolver
print(f"Resolver: Updated to {desired_resolver}.")
modified = True
if 'acl' in config and 'inline' in config['acl']:
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule]
if modified:
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
print("Configuration updated. Attempting to restart hysteria2 service...")
try:
subprocess.run(["python3", CLI_PATH, "restart-hysteria2"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, timeout=10)
print("Hysteria2 service restarted successfully.")
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to restart hysteria2. STDERR: {e.stderr.decode().strip()}")
except subprocess.TimeoutExpired:
print("Warning: Timeout expired while trying to restart hysteria2 service.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Configure WARP settings. At least one option must be provided.")
parser.add_argument("--set-all", choices=['on', 'off'], help="Set WARP for all traffic (on/off)")
parser.add_argument("--set-popular-sites", choices=['on', 'off'], help="Set WARP for popular sites (on/off)")
parser.add_argument("--set-domestic-sites", choices=['on', 'off'], help="Set behavior for domestic sites (on=WARP, off=REJECT)")
parser.add_argument("--set-block-adult", choices=['on', 'off'], help="Set blocking of adult content (on/off)")
args = parser.parse_args()
if not any(vars(args).values()):
parser.print_help()
sys.exit(1)
warp_configure_handler(
set_all_traffic_state=args.set_all,
set_popular_sites_state=args.set_popular_sites,
set_domestic_sites_state=args.set_domestic_sites,
set_block_adult_sites_state=args.set_block_adult
)