feat: migrate user management from json to mongodb
This commit is contained in:
@ -3,16 +3,16 @@
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
import fcntl
|
||||
import shutil
|
||||
import datetime
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from init_paths import *
|
||||
from paths import *
|
||||
from hysteria2_api import Hysteria2Client
|
||||
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
||||
from db.database import db
|
||||
from hysteria2_api import Hysteria2Client
|
||||
from paths import CONFIG_FILE
|
||||
|
||||
logging.basicConfig(
|
||||
stream=sys.stdout,
|
||||
level=logging.INFO,
|
||||
@ -22,8 +22,8 @@ logging.basicConfig(
|
||||
logger = logging.getLogger()
|
||||
|
||||
LOCKFILE = "/tmp/kick.lock"
|
||||
BACKUP_FILE = f"{USERS_FILE}.bak"
|
||||
MAX_WORKERS = 8
|
||||
API_BASE_URL = 'http://127.0.0.1:25413'
|
||||
|
||||
def acquire_lock():
|
||||
try:
|
||||
@ -34,144 +34,104 @@ def acquire_lock():
|
||||
logger.warning("Another instance is already running. Exiting.")
|
||||
sys.exit(1)
|
||||
|
||||
def kick_users(usernames, secret):
|
||||
def get_secret():
|
||||
try:
|
||||
client = Hysteria2Client(
|
||||
base_url="http://127.0.0.1:25413",
|
||||
secret=secret
|
||||
)
|
||||
|
||||
client.kick_clients(usernames)
|
||||
logger.info(f"Successfully kicked {len(usernames)} users: {', '.join(usernames)}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error kicking users: {str(e)}")
|
||||
return False
|
||||
with open(CONFIG_FILE, 'r') as f:
|
||||
config = json.load(f)
|
||||
return config.get('trafficStats', {}).get('secret')
|
||||
except (json.JSONDecodeError, FileNotFoundError):
|
||||
return None
|
||||
|
||||
def process_user(username, user_data, config_secret, users_data):
|
||||
blocked = user_data.get('blocked', False)
|
||||
|
||||
if blocked:
|
||||
logger.info(f"Skipping {username} as they are already blocked.")
|
||||
return None
|
||||
|
||||
max_download_bytes = user_data.get('max_download_bytes', 0)
|
||||
expiration_days = user_data.get('expiration_days', 0)
|
||||
account_creation_date = user_data.get('account_creation_date')
|
||||
current_download_bytes = user_data.get('download_bytes', 0)
|
||||
current_upload_bytes = user_data.get('upload_bytes', 0)
|
||||
|
||||
total_bytes = current_download_bytes + current_upload_bytes
|
||||
|
||||
if not account_creation_date:
|
||||
logger.info(f"Skipping {username} due to missing account creation date.")
|
||||
return None
|
||||
|
||||
def kick_users_api(usernames, secret):
|
||||
try:
|
||||
current_date = datetime.datetime.now().timestamp()
|
||||
creation_date = datetime.datetime.fromisoformat(account_creation_date.replace('Z', '+00:00'))
|
||||
expiration_date = (creation_date + datetime.timedelta(days=expiration_days)).timestamp()
|
||||
|
||||
should_block = False
|
||||
|
||||
if max_download_bytes > 0 and total_bytes >= 0 and expiration_days > 0:
|
||||
if total_bytes >= max_download_bytes or current_date >= expiration_date:
|
||||
should_block = True
|
||||
|
||||
if should_block:
|
||||
logger.info(f"Setting blocked=True for user {username}")
|
||||
users_data[username]['blocked'] = True
|
||||
return username
|
||||
else:
|
||||
logger.info(f"Skipping {username} due to invalid or missing data.")
|
||||
return None
|
||||
|
||||
client = Hysteria2Client(base_url=API_BASE_URL, secret=secret)
|
||||
client.kick_clients(usernames)
|
||||
logger.info(f"Successfully sent kick command for users: {', '.join(usernames)}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing user {username}: {str(e)}")
|
||||
logger.error(f"Error kicking users via API: {e}")
|
||||
|
||||
def process_user(user_doc):
|
||||
username = user_doc.get('_id')
|
||||
|
||||
if not username or user_doc.get('blocked', False):
|
||||
return None
|
||||
|
||||
account_creation_date = user_doc.get('account_creation_date')
|
||||
if not account_creation_date:
|
||||
return None
|
||||
|
||||
should_block = False
|
||||
|
||||
try:
|
||||
expiration_days = user_doc.get('expiration_days', 0)
|
||||
if expiration_days > 0:
|
||||
creation_date = datetime.datetime.strptime(account_creation_date, "%Y-%m-%d")
|
||||
expiration_date = creation_date + datetime.timedelta(days=expiration_days)
|
||||
if datetime.datetime.now() >= expiration_date:
|
||||
should_block = True
|
||||
logger.info(f"User {username} is expired.")
|
||||
|
||||
if not should_block:
|
||||
max_download_bytes = user_doc.get('max_download_bytes', 0)
|
||||
if max_download_bytes > 0:
|
||||
total_bytes = user_doc.get('download_bytes', 0) + user_doc.get('upload_bytes', 0)
|
||||
if total_bytes >= max_download_bytes:
|
||||
should_block = True
|
||||
logger.info(f"User {username} has exceeded their traffic limit.")
|
||||
|
||||
if should_block:
|
||||
return username
|
||||
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.error(f"Error processing user {username} due to invalid data: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def main():
|
||||
lock_file = acquire_lock()
|
||||
|
||||
try:
|
||||
shutil.copy2(USERS_FILE, BACKUP_FILE)
|
||||
logger.info(f"Created backup of users file at {BACKUP_FILE}")
|
||||
|
||||
try:
|
||||
with open(CONFIG_FILE, 'r') as f:
|
||||
config = json.load(f)
|
||||
secret = config.get('trafficStats', {}).get('secret', '')
|
||||
if not secret:
|
||||
logger.error("No secret found in config file")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load config file: {str(e)}")
|
||||
shutil.copy2(BACKUP_FILE, USERS_FILE)
|
||||
if db is None:
|
||||
logger.error("Database connection failed. Exiting.")
|
||||
sys.exit(1)
|
||||
|
||||
secret = get_secret()
|
||||
if not secret:
|
||||
logger.error(f"Could not find secret in {CONFIG_FILE}. Exiting.")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
with open(USERS_FILE, 'r') as f:
|
||||
users_data = json.load(f)
|
||||
logger.info(f"Loaded data for {len(users_data)} users")
|
||||
except json.JSONDecodeError:
|
||||
logger.error("Invalid users.json. Restoring backup.")
|
||||
shutil.copy2(BACKUP_FILE, USERS_FILE)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load users file: {str(e)}")
|
||||
shutil.copy2(BACKUP_FILE, USERS_FILE)
|
||||
sys.exit(1)
|
||||
all_users = db.get_all_users()
|
||||
logger.info(f"Loaded {len(all_users)} users from the database for processing.")
|
||||
|
||||
users_to_kick = []
|
||||
logger.info(f"Processing {len(users_data)} users in parallel with {MAX_WORKERS} workers")
|
||||
users_to_block = []
|
||||
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
||||
future_to_user = {
|
||||
executor.submit(process_user, username, user_data, secret, users_data): username
|
||||
for username, user_data in users_data.items()
|
||||
}
|
||||
|
||||
future_to_user = {executor.submit(process_user, user_doc): user_doc for user_doc in all_users}
|
||||
for future in future_to_user:
|
||||
username = future.result()
|
||||
if username:
|
||||
users_to_kick.append(username)
|
||||
logger.info(f"User {username} added to kick list")
|
||||
result = future.result()
|
||||
if result:
|
||||
users_to_block.append(result)
|
||||
|
||||
if users_to_kick:
|
||||
logger.info(f"Saving changes to users file for {len(users_to_kick)} blocked users")
|
||||
for retry in range(3):
|
||||
try:
|
||||
with open(USERS_FILE, 'w') as f:
|
||||
json.dump(users_data, f, indent=2)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save users file (attempt {retry+1}): {str(e)}")
|
||||
time.sleep(1)
|
||||
if retry == 2:
|
||||
raise
|
||||
if not users_to_block:
|
||||
logger.info("No users to block or kick.")
|
||||
return
|
||||
|
||||
logger.info(f"Found {len(users_to_block)} users to block: {', '.join(users_to_block)}")
|
||||
|
||||
if users_to_kick:
|
||||
logger.info(f"Kicking {len(users_to_kick)} users")
|
||||
batch_size = 50
|
||||
for i in range(0, len(users_to_kick), batch_size):
|
||||
batch = users_to_kick[i:i+batch_size]
|
||||
logger.info(f"Processing batch of {len(batch)} users")
|
||||
kick_users(batch, secret)
|
||||
for username in batch:
|
||||
logger.info(f"Blocked and kicked user {username}")
|
||||
else:
|
||||
logger.info("No users to kick")
|
||||
for username in users_to_block:
|
||||
db.update_user(username, {'blocked': True})
|
||||
logger.info("Successfully updated user statuses to 'blocked' in the database.")
|
||||
|
||||
batch_size = 50
|
||||
for i in range(0, len(users_to_block), batch_size):
|
||||
batch = users_to_block[i:i + batch_size]
|
||||
kick_users_api(batch, secret)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred: {str(e)}")
|
||||
logger.info("Restoring users file from backup")
|
||||
shutil.copy2(BACKUP_FILE, USERS_FILE)
|
||||
logger.error(f"An unexpected error occurred in main execution: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
finally:
|
||||
fcntl.flock(lock_file, fcntl.LOCK_UN)
|
||||
lock_file.close()
|
||||
logger.info("Script completed")
|
||||
logger.info("Script finished.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user