refactor(warp): Replace custom logging with Python's logging module and clean up code
This commit is contained in:
@ -4,14 +4,9 @@ import os
|
||||
import sys
|
||||
import subprocess
|
||||
import platform
|
||||
import re
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
RED = "\033[31m"
|
||||
GREEN = "\033[32m"
|
||||
YELLOW = "\033[33m"
|
||||
RESET = "\033[0m"
|
||||
|
||||
WGCF_PROFILE = "wgcf-profile.conf"
|
||||
WGCF_PROFILE_DIR = Path("/etc/warp")
|
||||
WGCF_PROFILE_PATH = WGCF_PROFILE_DIR / WGCF_PROFILE
|
||||
@ -29,11 +24,6 @@ TEST_IPV6 = ["2606:4700:4700::1001", "2620:fe::fe"]
|
||||
CF_TRACE_URL = "https://www.cloudflare.com/cdn-cgi/trace"
|
||||
|
||||
|
||||
def log(level, msg):
|
||||
colors = {"INFO": GREEN, "WARN": YELLOW, "ERROR": RED}
|
||||
print(f"[{colors.get(level, '')}{level}{RESET}] {msg}")
|
||||
|
||||
|
||||
def run(cmd, capture=False, check=False, shell=True):
|
||||
try:
|
||||
r = subprocess.run(cmd, shell=shell, capture_output=capture, text=True)
|
||||
@ -51,7 +41,7 @@ def systemctl(action, service):
|
||||
|
||||
|
||||
def get_system_info():
|
||||
info = {"os": "", "os_full": "", "os_like": "", "os_ver": "", "arch": platform.machine(), "virt": "", "kernel": platform.release()}
|
||||
info = {"os": "", "os_full": "", "arch": platform.machine(), "virt": "", "kernel": platform.release()}
|
||||
try:
|
||||
with open("/etc/os-release") as f:
|
||||
for line in f:
|
||||
@ -59,10 +49,6 @@ def get_system_info():
|
||||
info["os"] = line.split("=")[1].strip().strip('"').lower()
|
||||
elif line.startswith("PRETTY_NAME="):
|
||||
info["os_full"] = line.split("=", 1)[1].strip().strip('"')
|
||||
elif line.startswith("ID_LIKE="):
|
||||
info["os_like"] = line.split("=")[1].strip().strip('"').lower()
|
||||
elif line.startswith("VERSION_ID="):
|
||||
info["os_ver"] = line.split("=")[1].strip().strip('"').split(".")[0]
|
||||
except:
|
||||
pass
|
||||
info["virt"] = run("systemd-detect-virt", capture=True) or "none"
|
||||
@ -99,25 +85,12 @@ def check_wireguard():
|
||||
|
||||
|
||||
def install_wireguard_tools(info):
|
||||
log("INFO", "Installing wireguard-tools...")
|
||||
logging.info("Installing wireguard-tools...")
|
||||
os_name = info["os"]
|
||||
os_like = info["os_like"]
|
||||
if "debian" in os_name or "ubuntu" in os_name:
|
||||
run("apt update && apt install -y iproute2 resolvconf wireguard-tools --no-install-recommends")
|
||||
elif "centos" in os_name or "rhel" in os_name:
|
||||
run(f"yum install -y epel-release || yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-{info['os_ver']}.noarch.rpm")
|
||||
run("yum install -y iproute iptables wireguard-tools")
|
||||
systemctl("enable", "systemd-resolved --now")
|
||||
elif "fedora" in os_name:
|
||||
run("dnf install -y iproute iptables wireguard-tools")
|
||||
systemctl("enable", "systemd-resolved --now")
|
||||
elif "arch" in os_name:
|
||||
run("pacman -Sy --noconfirm iproute2 openresolv wireguard-tools")
|
||||
elif "rhel" in os_like or "fedora" in os_like:
|
||||
run("yum install -y epel-release && yum install -y iproute iptables wireguard-tools")
|
||||
systemctl("enable", "systemd-resolved --now")
|
||||
else:
|
||||
log("ERROR", "OS not supported.")
|
||||
logging.error("OS not supported. Only Debian and Ubuntu are supported.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@ -134,7 +107,7 @@ def install_wireguard(info):
|
||||
install_wireguard_tools(info)
|
||||
install_wireguard_go(info)
|
||||
else:
|
||||
log("INFO", "WireGuard already installed and running.")
|
||||
logging.info("WireGuard already installed and running.")
|
||||
|
||||
|
||||
def install_wgcf():
|
||||
@ -144,7 +117,7 @@ def install_wgcf():
|
||||
def register_warp_account():
|
||||
while not Path("wgcf-account.toml").exists():
|
||||
install_wgcf()
|
||||
log("INFO", "Registering WARP account...")
|
||||
logging.info("Registering WARP account...")
|
||||
run("yes | wgcf register")
|
||||
run("sleep 5")
|
||||
|
||||
@ -152,7 +125,7 @@ def register_warp_account():
|
||||
def generate_wgcf_profile():
|
||||
while not Path(WGCF_PROFILE).exists():
|
||||
register_warp_account()
|
||||
log("INFO", "Generating WGCF profile...")
|
||||
logging.info("Generating WGCF profile...")
|
||||
run("wgcf generate")
|
||||
|
||||
|
||||
@ -197,7 +170,7 @@ def load_wgcf_profile():
|
||||
|
||||
|
||||
def get_mtu(ipv4_ok, ipv6_ok):
|
||||
log("INFO", "Calculating optimal MTU...")
|
||||
logging.info("Calculating optimal MTU...")
|
||||
mtu = 1500
|
||||
increment = 10
|
||||
cmd = "ping6" if not ipv4_ok and ipv6_ok else "ping"
|
||||
@ -214,7 +187,7 @@ def get_mtu(ipv4_ok, ipv6_ok):
|
||||
mtu = 1360
|
||||
break
|
||||
mtu -= 80
|
||||
log("INFO", f"MTU: {mtu}")
|
||||
logging.info(f"MTU: {mtu}")
|
||||
return mtu
|
||||
|
||||
|
||||
@ -269,7 +242,7 @@ def enable_ipv6():
|
||||
|
||||
def start_wireguard():
|
||||
warp_active = check_warp_client()
|
||||
log("INFO", "Starting WireGuard...")
|
||||
logging.info("Starting WireGuard...")
|
||||
if warp_active:
|
||||
systemctl("stop", "warp-svc")
|
||||
systemctl("enable", f"wg-quick@{WG_INTERFACE} --now")
|
||||
@ -277,9 +250,9 @@ def start_wireguard():
|
||||
systemctl("start", "warp-svc")
|
||||
active, _ = check_wireguard()
|
||||
if active:
|
||||
log("INFO", "WireGuard running.")
|
||||
logging.info("WireGuard running.")
|
||||
else:
|
||||
log("ERROR", "WireGuard failed!")
|
||||
logging.error("WireGuard failed!")
|
||||
run(f"journalctl -u wg-quick@{WG_INTERFACE} --no-pager")
|
||||
sys.exit(1)
|
||||
|
||||
@ -288,7 +261,7 @@ def disable_wireguard():
|
||||
warp_active = check_warp_client()
|
||||
active, enabled = check_wireguard()
|
||||
if active or enabled:
|
||||
log("INFO", "Disabling WireGuard...")
|
||||
logging.info("Disabling WireGuard...")
|
||||
if warp_active:
|
||||
systemctl("stop", "warp-svc")
|
||||
systemctl("disable", f"wg-quick@{WG_INTERFACE} --now")
|
||||
@ -296,11 +269,11 @@ def disable_wireguard():
|
||||
systemctl("start", "warp-svc")
|
||||
active, enabled = check_wireguard()
|
||||
if not active and not enabled:
|
||||
log("INFO", "WireGuard disabled.")
|
||||
logging.info("WireGuard disabled.")
|
||||
else:
|
||||
log("ERROR", "Disable failed!")
|
||||
logging.error("Disable failed!")
|
||||
else:
|
||||
log("INFO", "WireGuard already disabled.")
|
||||
logging.info("WireGuard already disabled.")
|
||||
|
||||
|
||||
def get_warp_status():
|
||||
@ -310,19 +283,19 @@ def get_warp_status():
|
||||
v6_status = run(f"curl -s6 {CF_TRACE_URL} --connect-timeout 2 | grep warp | cut -d= -f2", capture=True) if ipv6_ok else ""
|
||||
def fmt(status, connected):
|
||||
if status == "on":
|
||||
return f"{GREEN}WARP{RESET}"
|
||||
return "WARP"
|
||||
if status == "plus":
|
||||
return f"{GREEN}WARP+{RESET}"
|
||||
return "WARP+"
|
||||
if status == "off":
|
||||
return "Normal"
|
||||
return "Normal" if connected else f"{RED}Unconnected{RESET}"
|
||||
return "Normal" if connected else "Unconnected"
|
||||
return fmt(v4_status, ipv4_ok), fmt(v6_status, ipv6_ok)
|
||||
|
||||
|
||||
def print_status():
|
||||
log("INFO", "Checking status...")
|
||||
logging.info("Checking status...")
|
||||
active, _ = check_wireguard()
|
||||
wg_status = f"{GREEN}Running{RESET}" if active else f"{RED}Stopped{RESET}"
|
||||
wg_status = "Running" if active else "Stopped"
|
||||
v4, v6 = get_warp_status()
|
||||
print(f"\n WireGuard: {wg_status}\n IPv4: {v4}\n IPv6: {v6}\n")
|
||||
|
||||
@ -347,24 +320,15 @@ def install_wgx(info):
|
||||
|
||||
|
||||
def uninstall(info):
|
||||
log("INFO", "Uninstalling...")
|
||||
logging.info("Uninstalling...")
|
||||
disable_wireguard()
|
||||
WG_CONF_PATH.unlink(missing_ok=True)
|
||||
run(f"rm -rf {WGCF_PROFILE_DIR}")
|
||||
run("rm -f /usr/local/bin/wgcf")
|
||||
os_name = info["os"]
|
||||
os_like = info["os_like"]
|
||||
if "debian" in os_name or "ubuntu" in os_name:
|
||||
run("apt purge -y wireguard-tools 2>/dev/null")
|
||||
elif "centos" in os_name or "rhel" in os_name:
|
||||
run("yum remove -y wireguard-tools 2>/dev/null")
|
||||
elif "fedora" in os_name:
|
||||
run("dnf remove -y wireguard-tools 2>/dev/null")
|
||||
elif "arch" in os_name:
|
||||
run("pacman -Rns --noconfirm wireguard-tools 2>/dev/null")
|
||||
elif "rhel" in os_like or "fedora" in os_like:
|
||||
run("yum remove -y wireguard-tools 2>/dev/null")
|
||||
log("INFO", "Uninstall complete.")
|
||||
logging.info("Uninstall complete.")
|
||||
|
||||
|
||||
def print_usage():
|
||||
@ -382,14 +346,15 @@ COMMANDS:
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
|
||||
if platform.system() != "Linux":
|
||||
log("ERROR", "Linux required.")
|
||||
logging.error("Linux required.")
|
||||
sys.exit(1)
|
||||
if os.geteuid() != 0:
|
||||
log("ERROR", "Root required.")
|
||||
logging.error("Root required.")
|
||||
sys.exit(1)
|
||||
if not cmd_exists("curl"):
|
||||
log("ERROR", "cURL required.")
|
||||
logging.error("cURL required.")
|
||||
sys.exit(1)
|
||||
info = get_system_info()
|
||||
cmd = sys.argv[1] if len(sys.argv) > 1 else "help"
|
||||
|
||||
Reference in New Issue
Block a user