feat(config): enhance ACL rule management with geo rule detection and updates
This commit is contained in:
@ -5,6 +5,7 @@ import sys
|
|||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import argparse
|
import argparse
|
||||||
|
import re
|
||||||
|
|
||||||
core_scripts_dir = Path(__file__).resolve().parents[1]
|
core_scripts_dir = Path(__file__).resolve().parents[1]
|
||||||
if str(core_scripts_dir) not in sys.path:
|
if str(core_scripts_dir) not in sys.path:
|
||||||
@ -12,6 +13,50 @@ if str(core_scripts_dir) not in sys.path:
|
|||||||
|
|
||||||
from paths import *
|
from paths import *
|
||||||
|
|
||||||
|
def _get_current_geo_rule_stubs(inline_rules):
|
||||||
|
"""
|
||||||
|
Detects the current country's geosite and geoip rule stubs from the ACL list.
|
||||||
|
Returns ('geosite:ir', 'geoip:ir') as a default if no specific rules are found.
|
||||||
|
"""
|
||||||
|
geosite_stub, geoip_stub = 'geosite:ir', 'geoip:ir' # Default
|
||||||
|
for rule in inline_rules:
|
||||||
|
if 'geosite:' in rule and any(country in rule for country in ['ir', 'cn', 'ru']):
|
||||||
|
match = re.search(r'geosite:[^)]+', rule)
|
||||||
|
if match:
|
||||||
|
geosite_stub = match.group(0)
|
||||||
|
if 'geoip:' in rule and any(country in rule for country in ['ir', 'cn', 'ru']):
|
||||||
|
match = re.search(r'geoip:[^)]+', rule)
|
||||||
|
if match:
|
||||||
|
geoip_stub = match.group(0)
|
||||||
|
print(f"INFO: Detected domestic geo rules: {geosite_stub}, {geoip_stub}")
|
||||||
|
return geosite_stub, geoip_stub
|
||||||
|
|
||||||
|
def _update_acl_rules(acl_list, stubs_to_manage, target_prefix=None):
|
||||||
|
"""
|
||||||
|
Atomically updates ACL rules. It removes all managed stubs (both reject and warps)
|
||||||
|
and then adds them back with the correct target_prefix.
|
||||||
|
- target_prefix: 'warps', 'reject', or None (to just remove).
|
||||||
|
"""
|
||||||
|
initial_len = len(acl_list)
|
||||||
|
|
||||||
|
rules_to_remove = set()
|
||||||
|
for stub in stubs_to_manage:
|
||||||
|
rules_to_remove.add(f"reject({stub})")
|
||||||
|
rules_to_remove.add(f"warps({stub})")
|
||||||
|
|
||||||
|
acl_list = [rule for rule in acl_list if rule not in rules_to_remove]
|
||||||
|
|
||||||
|
rules_were_added = False
|
||||||
|
if target_prefix:
|
||||||
|
for stub in stubs_to_manage:
|
||||||
|
new_rule = f"{target_prefix}({stub})"
|
||||||
|
if new_rule not in acl_list:
|
||||||
|
acl_list.append(new_rule)
|
||||||
|
rules_were_added = True
|
||||||
|
|
||||||
|
modified = (len(acl_list) != initial_len) or rules_were_added
|
||||||
|
return acl_list, modified
|
||||||
|
|
||||||
def warp_configure_handler(
|
def warp_configure_handler(
|
||||||
set_all_traffic_state: str | None = None,
|
set_all_traffic_state: str | None = None,
|
||||||
set_popular_sites_state: str | None = None,
|
set_popular_sites_state: str | None = None,
|
||||||
@ -30,135 +75,61 @@ def warp_configure_handler(
|
|||||||
|
|
||||||
modified = False
|
modified = False
|
||||||
|
|
||||||
if 'acl' not in config:
|
if 'acl' not in config: config['acl'] = {}
|
||||||
config['acl'] = {}
|
if 'inline' not in config['acl']: config['acl']['inline'] = []
|
||||||
if 'inline' not in config['acl']:
|
|
||||||
config['acl']['inline'] = []
|
acl_inline = config['acl']['inline']
|
||||||
|
|
||||||
if set_all_traffic_state is not None:
|
if set_all_traffic_state is not None:
|
||||||
warp_all_rule = "warps(all)"
|
acl_inline, changed = _update_acl_rules(acl_inline, ['all'], 'warps' if set_all_traffic_state == 'on' else None)
|
||||||
warp_all_active = warp_all_rule in config['acl']['inline']
|
if changed:
|
||||||
if set_all_traffic_state == "on":
|
print(f"All traffic rule: {'Enabled' if set_all_traffic_state == 'on' else 'Disabled'}.")
|
||||||
if not warp_all_active:
|
modified = True
|
||||||
config['acl']['inline'].append(warp_all_rule)
|
else:
|
||||||
print("All traffic rule: Enabled.")
|
print(f"All traffic rule: Already {'enabled' if set_all_traffic_state == 'on' else 'disabled'}.")
|
||||||
modified = True
|
|
||||||
else:
|
|
||||||
print("All traffic rule: Already enabled.")
|
|
||||||
elif set_all_traffic_state == "off":
|
|
||||||
if warp_all_active:
|
|
||||||
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != warp_all_rule]
|
|
||||||
print("All traffic rule: Disabled.")
|
|
||||||
modified = True
|
|
||||||
else:
|
|
||||||
print("All traffic rule: Already disabled.")
|
|
||||||
|
|
||||||
if set_popular_sites_state is not None:
|
if set_popular_sites_state is not None:
|
||||||
popular_rules = [
|
popular_stubs = ["geoip:google", "geosite:google", "geosite:netflix", "geosite:spotify"]
|
||||||
"warps(geoip:google)", "warps(geosite:google)", "warps(geosite:netflix)",
|
target_prefix = 'warps' if set_popular_sites_state == 'on' else None
|
||||||
"warps(geosite:spotify)", "warps(geosite:openai)", "warps(geoip:openai)"
|
acl_inline, changed = _update_acl_rules(acl_inline, popular_stubs, target_prefix)
|
||||||
]
|
if changed:
|
||||||
if set_popular_sites_state == "on":
|
print(f"Popular sites rule: {'Enabled' if set_popular_sites_state == 'on' else 'Disabled'}.")
|
||||||
added_any = False
|
modified = True
|
||||||
for rule in popular_rules:
|
else:
|
||||||
if rule not in config['acl']['inline']:
|
print(f"Popular sites rule: Already {'enabled' if set_popular_sites_state == 'on' else 'disabled'}.")
|
||||||
config['acl']['inline'].append(rule)
|
|
||||||
added_any = True
|
|
||||||
if added_any:
|
|
||||||
print("Popular sites rule: Enabled/Updated.")
|
|
||||||
modified = True
|
|
||||||
else:
|
|
||||||
|
|
||||||
all_present = all(rule in config['acl']['inline'] for rule in popular_rules)
|
|
||||||
if all_present:
|
|
||||||
print("Popular sites rule: Already enabled.")
|
|
||||||
else:
|
|
||||||
print("Popular sites rule: Enabled/Updated.")
|
|
||||||
modified = True
|
|
||||||
elif set_popular_sites_state == "off":
|
|
||||||
removed_any = False
|
|
||||||
initial_len = len(config['acl']['inline'])
|
|
||||||
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule not in popular_rules]
|
|
||||||
if len(config['acl']['inline']) < initial_len:
|
|
||||||
removed_any = True
|
|
||||||
if removed_any:
|
|
||||||
print("Popular sites rule: Disabled.")
|
|
||||||
modified = True
|
|
||||||
else:
|
|
||||||
print("Popular sites rule: Already disabled.")
|
|
||||||
|
|
||||||
if set_domestic_sites_state is not None:
|
if set_domestic_sites_state is not None:
|
||||||
ir_site_warp_rule = "warps(geosite:ir)"
|
geosite_stub, geoip_stub = _get_current_geo_rule_stubs(acl_inline)
|
||||||
ir_ip_warp_rule = "warps(geoip:ir)"
|
domestic_stubs = [geosite_stub, geoip_stub]
|
||||||
ir_site_reject_rule = "reject(geosite:ir)"
|
target_prefix = 'warps' if set_domestic_sites_state == 'on' else 'reject'
|
||||||
ir_ip_reject_rule = "reject(geoip:ir)"
|
acl_inline, changed = _update_acl_rules(acl_inline, domestic_stubs, target_prefix)
|
||||||
|
if changed:
|
||||||
if set_domestic_sites_state == "on":
|
print(f"Domestic sites: Configured to use {'WARP' if set_domestic_sites_state == 'on' else 'REJECT'}.")
|
||||||
changed_to_warp = False
|
modified = True
|
||||||
if ir_site_reject_rule in config['acl']['inline'] or ir_ip_reject_rule in config['acl']['inline']:
|
else:
|
||||||
config['acl']['inline'] = [r for r in config['acl']['inline'] if r not in [ir_site_reject_rule, ir_ip_reject_rule]]
|
print(f"Domestic sites: Already configured to use {'WARP' if set_domestic_sites_state == 'on' else 'REJECT'}.")
|
||||||
changed_to_warp = True
|
|
||||||
if ir_site_warp_rule not in config['acl']['inline']:
|
|
||||||
config['acl']['inline'].append(ir_site_warp_rule)
|
|
||||||
changed_to_warp = True
|
|
||||||
if ir_ip_warp_rule not in config['acl']['inline']:
|
|
||||||
config['acl']['inline'].append(ir_ip_warp_rule)
|
|
||||||
changed_to_warp = True
|
|
||||||
if changed_to_warp:
|
|
||||||
print("Domestic sites: Configured to use WARP.")
|
|
||||||
modified = True
|
|
||||||
else:
|
|
||||||
print("Domestic sites: Already configured to use WARP.")
|
|
||||||
elif set_domestic_sites_state == "off":
|
|
||||||
changed_to_reject = False
|
|
||||||
if ir_site_warp_rule in config['acl']['inline'] or ir_ip_warp_rule in config['acl']['inline']:
|
|
||||||
config['acl']['inline'] = [r for r in config['acl']['inline'] if r not in [ir_site_warp_rule, ir_ip_warp_rule]]
|
|
||||||
changed_to_reject = True
|
|
||||||
if ir_site_reject_rule not in config['acl']['inline']:
|
|
||||||
config['acl']['inline'].append(ir_site_reject_rule)
|
|
||||||
changed_to_reject = True
|
|
||||||
if ir_ip_reject_rule not in config['acl']['inline']:
|
|
||||||
config['acl']['inline'].append(ir_ip_reject_rule)
|
|
||||||
changed_to_reject = True
|
|
||||||
if changed_to_reject:
|
|
||||||
print("Domestic sites: Configured to REJECT.")
|
|
||||||
modified = True
|
|
||||||
else:
|
|
||||||
print("Domestic sites: Already configured to REJECT.")
|
|
||||||
|
|
||||||
if set_block_adult_sites_state is not None:
|
if set_block_adult_sites_state is not None:
|
||||||
nsfw_rule = "reject(geosite:nsfw)"
|
nsfw_stub = ["geosite:nsfw"]
|
||||||
is_blocking_nsfw = nsfw_rule in config['acl']['inline']
|
target_prefix = 'reject' if set_block_adult_sites_state == 'on' else None
|
||||||
|
acl_inline, changed = _update_acl_rules(acl_inline, nsfw_stub, target_prefix)
|
||||||
|
|
||||||
if 'resolver' not in config: config['resolver'] = {}
|
if 'resolver' not in config: config['resolver'] = {}
|
||||||
if 'tls' not in config['resolver']: config['resolver']['tls'] = {}
|
if 'tls' not in config['resolver']: config['resolver']['tls'] = {}
|
||||||
|
|
||||||
desired_resolver = ""
|
|
||||||
if set_block_adult_sites_state == "on":
|
|
||||||
desired_resolver = "1.1.1.3:853"
|
|
||||||
if not is_blocking_nsfw:
|
|
||||||
config['acl']['inline'].append(nsfw_rule)
|
|
||||||
print("Adult content blocking: Enabled.")
|
|
||||||
modified = True
|
|
||||||
else:
|
|
||||||
print("Adult content blocking: Already enabled.")
|
|
||||||
elif set_block_adult_sites_state == "off":
|
|
||||||
desired_resolver = "1.1.1.1:853"
|
|
||||||
if is_blocking_nsfw:
|
|
||||||
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != nsfw_rule]
|
|
||||||
print("Adult content blocking: Disabled.")
|
|
||||||
modified = True
|
|
||||||
else:
|
|
||||||
print("Adult content blocking: Already disabled.")
|
|
||||||
|
|
||||||
|
desired_resolver = "1.1.1.3:853" if set_block_adult_sites_state == 'on' else "1.1.1.1:853"
|
||||||
if config['resolver']['tls'].get('addr') != desired_resolver:
|
if config['resolver']['tls'].get('addr') != desired_resolver:
|
||||||
config['resolver']['tls']['addr'] = desired_resolver
|
config['resolver']['tls']['addr'] = desired_resolver
|
||||||
print(f"Resolver: Updated to {desired_resolver}.")
|
print(f"Resolver: Updated to {desired_resolver}.")
|
||||||
modified = True
|
modified = True
|
||||||
|
|
||||||
|
if changed:
|
||||||
|
print(f"Adult content blocking: {'Enabled' if set_block_adult_sites_state == 'on' else 'Disabled'}.")
|
||||||
|
modified = True
|
||||||
|
elif not modified:
|
||||||
|
print(f"Adult content blocking: Already {'enabled' if set_block_adult_sites_state == 'on' else 'disabled'}.")
|
||||||
|
|
||||||
if 'acl' in config and 'inline' in config['acl']:
|
config['acl']['inline'] = [rule for rule in acl_inline if rule]
|
||||||
config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule]
|
|
||||||
|
|
||||||
|
|
||||||
if modified:
|
if modified:
|
||||||
with open(CONFIG_FILE, 'w') as f:
|
with open(CONFIG_FILE, 'w') as f:
|
||||||
|
|||||||
Reference in New Issue
Block a user