From 5747a893a7d19f49e32f891dbfd66e8df57b959a Mon Sep 17 00:00:00 2001 From: ReturnFI <151555003+ReturnFI@users.noreply.github.com> Date: Fri, 19 Dec 2025 08:35:40 +0000 Subject: [PATCH] feat(config): enhance ACL rule management with geo rule detection and updates --- core/scripts/warp/configure.py | 197 ++++++++++++++------------------- 1 file changed, 84 insertions(+), 113 deletions(-) diff --git a/core/scripts/warp/configure.py b/core/scripts/warp/configure.py index 0b557fd..78e1c40 100644 --- a/core/scripts/warp/configure.py +++ b/core/scripts/warp/configure.py @@ -5,6 +5,7 @@ import sys import subprocess from pathlib import Path import argparse +import re core_scripts_dir = Path(__file__).resolve().parents[1] if str(core_scripts_dir) not in sys.path: @@ -12,6 +13,50 @@ if str(core_scripts_dir) not in sys.path: from paths import * +def _get_current_geo_rule_stubs(inline_rules): + """ + Detects the current country's geosite and geoip rule stubs from the ACL list. + Returns ('geosite:ir', 'geoip:ir') as a default if no specific rules are found. + """ + geosite_stub, geoip_stub = 'geosite:ir', 'geoip:ir' # Default + for rule in inline_rules: + if 'geosite:' in rule and any(country in rule for country in ['ir', 'cn', 'ru']): + match = re.search(r'geosite:[^)]+', rule) + if match: + geosite_stub = match.group(0) + if 'geoip:' in rule and any(country in rule for country in ['ir', 'cn', 'ru']): + match = re.search(r'geoip:[^)]+', rule) + if match: + geoip_stub = match.group(0) + print(f"INFO: Detected domestic geo rules: {geosite_stub}, {geoip_stub}") + return geosite_stub, geoip_stub + +def _update_acl_rules(acl_list, stubs_to_manage, target_prefix=None): + """ + Atomically updates ACL rules. It removes all managed stubs (both reject and warps) + and then adds them back with the correct target_prefix. + - target_prefix: 'warps', 'reject', or None (to just remove). + """ + initial_len = len(acl_list) + + rules_to_remove = set() + for stub in stubs_to_manage: + rules_to_remove.add(f"reject({stub})") + rules_to_remove.add(f"warps({stub})") + + acl_list = [rule for rule in acl_list if rule not in rules_to_remove] + + rules_were_added = False + if target_prefix: + for stub in stubs_to_manage: + new_rule = f"{target_prefix}({stub})" + if new_rule not in acl_list: + acl_list.append(new_rule) + rules_were_added = True + + modified = (len(acl_list) != initial_len) or rules_were_added + return acl_list, modified + def warp_configure_handler( set_all_traffic_state: str | None = None, set_popular_sites_state: str | None = None, @@ -30,135 +75,61 @@ def warp_configure_handler( modified = False - if 'acl' not in config: - config['acl'] = {} - if 'inline' not in config['acl']: - config['acl']['inline'] = [] + if 'acl' not in config: config['acl'] = {} + if 'inline' not in config['acl']: config['acl']['inline'] = [] + + acl_inline = config['acl']['inline'] if set_all_traffic_state is not None: - warp_all_rule = "warps(all)" - warp_all_active = warp_all_rule in config['acl']['inline'] - if set_all_traffic_state == "on": - if not warp_all_active: - config['acl']['inline'].append(warp_all_rule) - print("All traffic rule: Enabled.") - modified = True - else: - print("All traffic rule: Already enabled.") - elif set_all_traffic_state == "off": - if warp_all_active: - config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != warp_all_rule] - print("All traffic rule: Disabled.") - modified = True - else: - print("All traffic rule: Already disabled.") + acl_inline, changed = _update_acl_rules(acl_inline, ['all'], 'warps' if set_all_traffic_state == 'on' else None) + if changed: + print(f"All traffic rule: {'Enabled' if set_all_traffic_state == 'on' else 'Disabled'}.") + modified = True + else: + print(f"All traffic rule: Already {'enabled' if set_all_traffic_state == 'on' else 'disabled'}.") if set_popular_sites_state is not None: - popular_rules = [ - "warps(geoip:google)", "warps(geosite:google)", "warps(geosite:netflix)", - "warps(geosite:spotify)", "warps(geosite:openai)", "warps(geoip:openai)" - ] - if set_popular_sites_state == "on": - added_any = False - for rule in popular_rules: - if rule not in config['acl']['inline']: - config['acl']['inline'].append(rule) - added_any = True - if added_any: - print("Popular sites rule: Enabled/Updated.") - modified = True - else: - - all_present = all(rule in config['acl']['inline'] for rule in popular_rules) - if all_present: - print("Popular sites rule: Already enabled.") - else: - print("Popular sites rule: Enabled/Updated.") - modified = True - elif set_popular_sites_state == "off": - removed_any = False - initial_len = len(config['acl']['inline']) - config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule not in popular_rules] - if len(config['acl']['inline']) < initial_len: - removed_any = True - if removed_any: - print("Popular sites rule: Disabled.") - modified = True - else: - print("Popular sites rule: Already disabled.") + popular_stubs = ["geoip:google", "geosite:google", "geosite:netflix", "geosite:spotify"] + target_prefix = 'warps' if set_popular_sites_state == 'on' else None + acl_inline, changed = _update_acl_rules(acl_inline, popular_stubs, target_prefix) + if changed: + print(f"Popular sites rule: {'Enabled' if set_popular_sites_state == 'on' else 'Disabled'}.") + modified = True + else: + print(f"Popular sites rule: Already {'enabled' if set_popular_sites_state == 'on' else 'disabled'}.") if set_domestic_sites_state is not None: - ir_site_warp_rule = "warps(geosite:ir)" - ir_ip_warp_rule = "warps(geoip:ir)" - ir_site_reject_rule = "reject(geosite:ir)" - ir_ip_reject_rule = "reject(geoip:ir)" - - if set_domestic_sites_state == "on": - changed_to_warp = False - if ir_site_reject_rule in config['acl']['inline'] or ir_ip_reject_rule in config['acl']['inline']: - config['acl']['inline'] = [r for r in config['acl']['inline'] if r not in [ir_site_reject_rule, ir_ip_reject_rule]] - changed_to_warp = True - if ir_site_warp_rule not in config['acl']['inline']: - config['acl']['inline'].append(ir_site_warp_rule) - changed_to_warp = True - if ir_ip_warp_rule not in config['acl']['inline']: - config['acl']['inline'].append(ir_ip_warp_rule) - changed_to_warp = True - if changed_to_warp: - print("Domestic sites: Configured to use WARP.") - modified = True - else: - print("Domestic sites: Already configured to use WARP.") - elif set_domestic_sites_state == "off": - changed_to_reject = False - if ir_site_warp_rule in config['acl']['inline'] or ir_ip_warp_rule in config['acl']['inline']: - config['acl']['inline'] = [r for r in config['acl']['inline'] if r not in [ir_site_warp_rule, ir_ip_warp_rule]] - changed_to_reject = True - if ir_site_reject_rule not in config['acl']['inline']: - config['acl']['inline'].append(ir_site_reject_rule) - changed_to_reject = True - if ir_ip_reject_rule not in config['acl']['inline']: - config['acl']['inline'].append(ir_ip_reject_rule) - changed_to_reject = True - if changed_to_reject: - print("Domestic sites: Configured to REJECT.") - modified = True - else: - print("Domestic sites: Already configured to REJECT.") + geosite_stub, geoip_stub = _get_current_geo_rule_stubs(acl_inline) + domestic_stubs = [geosite_stub, geoip_stub] + target_prefix = 'warps' if set_domestic_sites_state == 'on' else 'reject' + acl_inline, changed = _update_acl_rules(acl_inline, domestic_stubs, target_prefix) + if changed: + print(f"Domestic sites: Configured to use {'WARP' if set_domestic_sites_state == 'on' else 'REJECT'}.") + modified = True + else: + print(f"Domestic sites: Already configured to use {'WARP' if set_domestic_sites_state == 'on' else 'REJECT'}.") if set_block_adult_sites_state is not None: - nsfw_rule = "reject(geosite:nsfw)" - is_blocking_nsfw = nsfw_rule in config['acl']['inline'] - + nsfw_stub = ["geosite:nsfw"] + target_prefix = 'reject' if set_block_adult_sites_state == 'on' else None + acl_inline, changed = _update_acl_rules(acl_inline, nsfw_stub, target_prefix) + if 'resolver' not in config: config['resolver'] = {} if 'tls' not in config['resolver']: config['resolver']['tls'] = {} - - desired_resolver = "" - if set_block_adult_sites_state == "on": - desired_resolver = "1.1.1.3:853" - if not is_blocking_nsfw: - config['acl']['inline'].append(nsfw_rule) - print("Adult content blocking: Enabled.") - modified = True - else: - print("Adult content blocking: Already enabled.") - elif set_block_adult_sites_state == "off": - desired_resolver = "1.1.1.1:853" - if is_blocking_nsfw: - config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule != nsfw_rule] - print("Adult content blocking: Disabled.") - modified = True - else: - print("Adult content blocking: Already disabled.") + desired_resolver = "1.1.1.3:853" if set_block_adult_sites_state == 'on' else "1.1.1.1:853" if config['resolver']['tls'].get('addr') != desired_resolver: config['resolver']['tls']['addr'] = desired_resolver print(f"Resolver: Updated to {desired_resolver}.") modified = True + + if changed: + print(f"Adult content blocking: {'Enabled' if set_block_adult_sites_state == 'on' else 'Disabled'}.") + modified = True + elif not modified: + print(f"Adult content blocking: Already {'enabled' if set_block_adult_sites_state == 'on' else 'disabled'}.") - if 'acl' in config and 'inline' in config['acl']: - config['acl']['inline'] = [rule for rule in config['acl']['inline'] if rule] - + config['acl']['inline'] = [rule for rule in acl_inline if rule] if modified: with open(CONFIG_FILE, 'w') as f: