diff --git a/launch_conduwuit.sh b/launch.sh similarity index 72% rename from launch_conduwuit.sh rename to launch.sh index 1a933c5..756e937 100755 --- a/launch_conduwuit.sh +++ b/launch.sh @@ -1,8 +1,8 @@ #!/bin/bash # File paths -BASE_PATH="/home/sij/hand_of_morpheus" -TOKEN_FILE="$BASE_PATH/.registration_token" +BASE_PATH="/home/sij/hand_of_morpheus/sw1tch" +TOKEN_FILE="$BASE_PATH/data/.registration_token" LOG_FILE="$BASE_PATH/logs/token_refresh.log" BACKUP_PATH="/home/sij/conduwuit_backup" ENV_FILE="$BASE_PATH/config/conduwuit.env" @@ -22,8 +22,8 @@ FORCE_RESTART=false # Function to log with a timestamp to both file and terminal log() { local message="$(date --iso-8601=seconds) $1" - echo "$message" >> "$LOG_FILE" # Write to log file - echo "$message" # Print to terminal + echo "$message" >> "$LOG_FILE" + echo "$message" } # Function to refresh the registration token @@ -129,18 +129,15 @@ restart_container() { # Function to ensure the registration service is running ensure_registration_service() { - local python_script="$BASE_PATH/registration.py" local pid_file="$BASE_PATH/data/registration.pid" local log_file="$BASE_PATH/logs/registration.log" - if [ ! -f "$python_script" ]; then - log "ERROR: Python script $python_script not found" - exit 1 - fi + touch "$log_file" || { log "ERROR: Cannot write to $log_file"; exit 1; } + chmod 666 "$log_file" REG_PORT=$(python3 -c "import yaml, sys; print(yaml.safe_load(open('$CONFIG_FILE')).get('port', 8000))") log "Registration service port from config: $REG_PORT" - + if [ "$FORCE_RESTART" = true ]; then log "Force restart requested. Clearing any process listening on port $REG_PORT..." PIDS=$(lsof -ti tcp:"$REG_PORT") @@ -151,58 +148,55 @@ ensure_registration_service() { fi rm -f "$pid_file" log "Force starting registration service..." - python3 "$python_script" >> "$log_file" 2>&1 & + cd "$(dirname "$BASE_PATH")" || { log "ERROR: Cannot cd to $(dirname "$BASE_PATH")"; exit 1; } + log "Running: nohup python3 -m sw1tch >> $log_file 2>&1 &" + nohup python3 -m sw1tch >> "$log_file" 2>&1 & NEW_PID=$! - echo "$NEW_PID" > "$pid_file" - log "Started registration service with PID $NEW_PID" + sleep 2 + if ps -p "$NEW_PID" > /dev/null; then + echo "$NEW_PID" > "$pid_file" + log "Started registration service with PID $NEW_PID" + sudo lsof -i :"$REG_PORT" || log "WARNING: No process on port $REG_PORT after start" + else + log "ERROR: Process $NEW_PID did not start or exited immediately" + cat "$log_file" >> "$LOG_FILE" + fi else EXISTING_PIDS=$(lsof -ti tcp:"$REG_PORT") if [ -n "$EXISTING_PIDS" ]; then log "Registration service already running on port $REG_PORT with PID(s): $EXISTING_PIDS" else log "Registration service not running on port $REG_PORT, starting..." - python3 "$python_script" >> "$log_file" 2>&1 & + cd "$(dirname "$BASE_PATH")" || { log "ERROR: Cannot cd to $(dirname "$BASE_PATH")"; exit 1; } + log "Running: nohup python3 -m sw1tch >> $log_file 2>&1 &" + nohup python3 -m sw1tch >> "$log_file" 2>&1 & NEW_PID=$! - echo "$NEW_PID" > "$pid_file" - log "Started registration service with PID $NEW_PID" + sleep 2 + if ps -p "$NEW_PID" > /dev/null; then + echo "$NEW_PID" > "$pid_file" + log "Started registration service with PID $NEW_PID" + sudo lsof -i :"$REG_PORT" || log "WARNING: No process on port $REG_PORT after start" + else + log "ERROR: Process $NEW_PID did not start or exited immediately" + cat "$log_file" >> "$LOG_FILE" + fi fi fi } -# Parse command-line flags +# Parse command-line flags and execute (unchanged) while [[ $# -gt 0 ]]; do case "$1" in - --refresh-token) - REFRESH_TOKEN=true - shift - ;; - --super-admin) - SUPER_ADMIN=true - shift - ;; - --update) - UPDATE=true - shift - ;; - --force-restart) - FORCE_RESTART=true - shift - ;; - *) - log "ERROR: Unknown option: $1" - echo "Usage: $0 [--refresh-token] [--super-admin] [--update] [--force-restart]" - exit 1 - ;; + --refresh-token) REFRESH_TOKEN=true; shift;; + --super-admin) SUPER_ADMIN=true; shift;; + --update) UPDATE=true; shift;; + --force-restart) FORCE_RESTART=true; shift;; + *) log "ERROR: Unknown option: $1"; echo "Usage: $0 [--refresh-token] [--super-admin] [--update] [--force-restart]"; exit 1;; esac done -# Execute based on flags -if [ "$UPDATE" = true ]; then - update_docker_image -fi -if [ "$REFRESH_TOKEN" = true ]; then - refresh_token -fi +if [ "$UPDATE" = true ]; then update_docker_image; fi +if [ "$REFRESH_TOKEN" = true ]; then refresh_token; fi restart_container ensure_registration_service diff --git a/registration.py b/registration.py deleted file mode 100755 index ca5afbf..0000000 --- a/registration.py +++ /dev/null @@ -1,706 +0,0 @@ -#!/usr/bin/env python3 -import os -import re -import yaml -import json -import smtplib -import httpx -import logging -import ipaddress -import hashlib -import asyncio -import time -from datetime import datetime, timedelta -from email.message import EmailMessage -from typing import List, Dict, Optional, Tuple, Set, Pattern, Union -from fastapi import FastAPI, Request, Form, HTTPException, Depends, Query -from fastapi.responses import HTMLResponse, JSONResponse -from fastapi.templating import Jinja2Templates -from fastapi.staticfiles import StaticFiles -from starlette.middleware.base import BaseHTTPMiddleware -from ipaddress import IPv4Network, IPv4Address -from nio import AsyncClient, RoomMessageText, RoomMessageNotice - -# --------------------------------------------------------- -# 1. Load configuration and setup paths -# --------------------------------------------------------- -BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -CONFIG_DIR = os.path.join(BASE_DIR, "config") -DATA_DIR = os.path.join(BASE_DIR, "data") -LOGS_DIR = os.path.join(BASE_DIR, "logs") - -CONFIG_PATH = os.path.join(CONFIG_DIR, "config.yaml") -with open(CONFIG_PATH, "r") as f: - config = yaml.safe_load(f) - -# Initialize or load registrations.json -REGISTRATIONS_PATH = os.path.join(DATA_DIR, "registrations.json") -def load_registrations() -> List[Dict]: - try: - with open(REGISTRATIONS_PATH, "r") as f: - return json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - return [] - -def save_registrations(registrations: List[Dict]): - with open(REGISTRATIONS_PATH, "w") as f: - json.dump(registrations, f, indent=2) - -def save_registration(data: Dict): - registrations = load_registrations() - registrations.append(data) - with open(REGISTRATIONS_PATH, "w") as f: - json.dump(registrations, f, indent=2) - -# Functions to check banned entries -def load_banned_usernames() -> List[Pattern]: - """Load banned usernames file and compile regex patterns.""" - patterns = [] - try: - with open(os.path.join(CONFIG_DIR, "banned_usernames.txt"), "r") as f: - for line in f: - line = line.strip() - if line: - try: - patterns.append(re.compile(line, re.IGNORECASE)) - except re.error: - logging.error(f"Invalid regex pattern in banned_usernames.txt: {line}") - except FileNotFoundError: - pass - return patterns - -def is_ip_banned(ip: str) -> bool: - """Check if an IP is banned, supporting both individual IPs and CIDR ranges.""" - try: - check_ip = IPv4Address(ip) - try: - with open(os.path.join(CONFIG_DIR, "banned_ips.txt"), "r") as f: - for line in f: - line = line.strip() - if not line: - continue - try: - if '/' in line: # CIDR notation - if check_ip in IPv4Network(line): - return True - else: # Individual IP - if check_ip == IPv4Address(line): - return True - except ValueError: - logging.error(f"Invalid IP/CIDR in banned_ips.txt: {line}") - except FileNotFoundError: - return False - except ValueError: - logging.error(f"Invalid IP address to check: {ip}") - return False - -def is_email_banned(email: str) -> bool: - """Check if an email matches any banned patterns.""" - try: - with open(os.path.join(CONFIG_DIR, "banned_emails.txt"), "r") as f: - for line in f: - pattern = line.strip() - if not pattern: - continue - regex_pattern = pattern.replace(".", "\\.").replace("*", ".*") - try: - if re.match(regex_pattern, email, re.IGNORECASE): - return True - except re.error: - logging.error(f"Invalid email pattern in banned_emails.txt: {pattern}") - except FileNotFoundError: - pass - return False - -def is_username_banned(username: str) -> bool: - """Check if username matches any banned patterns.""" - patterns = load_banned_usernames() - return any(pattern.search(username) for pattern in patterns) - -# Read the registration token (still at base level as per shell script) -def read_registration_token(): - token_path = os.path.join(BASE_DIR, ".registration_token") - try: - with open(token_path, "r") as f: - return f.read().strip() - except FileNotFoundError: - return None - -# --------------------------------------------------------- -# 2. Logging Configuration -# --------------------------------------------------------- -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - filename=os.path.join(LOGS_DIR, "registration.log"), - filemode='a' -) - -logging.getLogger("uvicorn.access").setLevel(logging.WARNING) -logging.getLogger("uvicorn.error").setLevel(logging.WARNING) -logger = logging.getLogger(__name__) - -class CustomLoggingMiddleware(BaseHTTPMiddleware): - async def dispatch(self, request, call_next): - if request.url.path == "/api/time" or request.url.path.endswith('favicon.ico'): - return await call_next(request) - response = await call_next(request) - logger.info(f"Request: {request.method} {request.url.path} - Status: {response.status_code}") - return response - -# --------------------------------------------------------- -# 3. Time Calculation Functions -# --------------------------------------------------------- -def get_current_utc() -> datetime: - return datetime.utcnow() - -def get_next_reset_time(now: datetime) -> datetime: - """Return the next reset time (possibly today or tomorrow) from config.""" - reset_h = config["registration"]["token_reset_time_utc"] // 100 - reset_m = config["registration"]["token_reset_time_utc"] % 100 - candidate = now.replace(hour=reset_h, minute=reset_m, second=0, microsecond=0) - if candidate <= now: - candidate += timedelta(days=1) - return candidate - -def get_downtime_start(next_reset: datetime) -> datetime: - """Return the downtime start time (minutes before next_reset).""" - return next_reset - timedelta(minutes=config["registration"]["downtime_before_token_reset"]) - -def format_timedelta(td: timedelta) -> str: - """Format a timedelta as 'X hours and Y minutes' (or similar).""" - total_minutes = int(td.total_seconds() // 60) - hours = total_minutes // 60 - minutes = total_minutes % 60 - - parts = [] - if hours == 1: - parts.append("1 hour") - elif hours > 1: - parts.append(f"{hours} hours") - - if minutes == 1: - parts.append("1 minute") - elif minutes > 1: - parts.append(f"{minutes} minutes") - - if not parts: - return "0 minutes" - - return " and ".join(parts) - -def get_time_until_reset_str(now: datetime) -> str: - """Return a string like '3 hours and 41 minutes' until next reset.""" - nr = get_next_reset_time(now) - delta = nr - now - return format_timedelta(delta) - -def is_registration_closed(now: datetime) -> Tuple[bool, str]: - """ - Determine if registration is closed based on config. - Return (closed_bool, message). - """ - nr = get_next_reset_time(now) - ds = get_downtime_start(nr) - - if ds <= now < nr: - time_until_open = nr - now - msg = ( - f"Registration is closed. " - f"It reopens in {format_timedelta(time_until_open)} at {nr.strftime('%H:%M UTC')}." - ) - return True, msg - else: - if now > ds: - nr += timedelta(days=1) - ds = get_downtime_start(nr) - - time_until_close = ds - now - msg = ( - f"Registration is open. " - f"It will close in {format_timedelta(time_until_close)} at {ds.strftime('%H:%M UTC')}." - ) - return False, msg - -# --------------------------------------------------------- -# 4. Registration Validation -# --------------------------------------------------------- -def check_email_cooldown(email: str) -> Optional[str]: - """Check if email is allowed to register based on cooldown and multiple account rules.""" - registrations = load_registrations() - email_entries = [r for r in registrations if r["email"] == email] - - if not email_entries: - return None - - if not config["registration"].get("multiple_users_per_email", True): - return "This email address has already been used to register an account." - - email_cooldown = config["registration"].get("email_cooldown") - if email_cooldown: - latest_registration = max( - datetime.fromisoformat(e["datetime"]) - for e in email_entries - ) - time_since = datetime.utcnow() - latest_registration - if time_since.total_seconds() < email_cooldown: - wait_time = email_cooldown - time_since.total_seconds() - return f"Please wait {int(wait_time)} seconds before requesting another account." - - return None - -async def check_username_availability(username: str) -> bool: - """Check if username is available on Matrix and in our registration records.""" - if is_username_banned(username): - logger.info(f"[USERNAME CHECK] {username}: Banned by pattern") - return False - - registrations = load_registrations() - if any(r["requested_name"] == username for r in registrations): - logger.info(f"[USERNAME CHECK] {username}: Already requested") - return False - - url = f"{config['base_url']}/_matrix/client/v3/register/available?username={username}" - async with httpx.AsyncClient() as client: - try: - response = await client.get(url, timeout=5) - if response.status_code == 200: - data = response.json() - is_available = data.get("available", False) - logger.info(f"[USERNAME CHECK] {username}: {'Available' if is_available else 'Taken'}") - return is_available - elif response.status_code == 400: - logger.info(f"[USERNAME CHECK] {username}: Taken (400)") - return False - except httpx.RequestError as ex: - logger.warning(f"[USERNAME CHECK] Could not reach homeserver: {ex}") - return False - return False - -# --------------------------------------------------------- -# 5. Email Helper Functions -# --------------------------------------------------------- -def load_template(template_path: str) -> str: - """Load an email template from a file.""" - try: - with open(os.path.join(BASE_DIR, template_path), "r") as f: - return f.read() - except FileNotFoundError: - raise HTTPException(status_code=500, detail=f"Email template not found: {template_path}") - -def build_email_message(token: str, requested_username: str, now: datetime, recipient_email: str) -> EmailMessage: - """ - Build and return an EmailMessage for registration using file-based templates. - """ - time_until_reset = get_time_until_reset_str(now) - - plain_template = load_template(config["email"]["templates"]["registration_token"]["body"]) - html_template = load_template(config["email"]["templates"]["registration_token"]["body_html"]) - - plain_body = plain_template.format( - homeserver=config["homeserver"], - registration_token=token, - requested_username=requested_username, - utc_time=now.strftime("%H:%M:%S"), - time_until_reset=time_until_reset - ) - html_body = html_template.format( - homeserver=config["homeserver"], - registration_token=token, - requested_username=requested_username, - utc_time=now.strftime("%H:%M:%S"), - time_until_reset=time_until_reset - ) - - msg = EmailMessage() - msg.set_content(plain_body) - msg.add_alternative(html_body, subtype="html") - - msg["Subject"] = config["email"]["templates"]["registration_token"]["subject"].format(homeserver=config["homeserver"]) - msg["From"] = config["email"]["smtp"]["from"] - msg["To"] = recipient_email - return msg - -def send_email_message(msg: EmailMessage) -> None: - """ - Send an email message using SMTP configuration. - """ - smtp_conf = config["email"]["smtp"] - try: - with smtplib.SMTP(smtp_conf["host"], smtp_conf["port"]) as server: - if smtp_conf.get("use_tls", True): - server.starttls() - server.login(smtp_conf["username"], smtp_conf["password"]) - server.send_message(msg) - logger.info(f"Registration email sent successfully to {msg['To']}") - except Exception as ex: - logger.error(f"Failed to send email: {ex}") - raise HTTPException(status_code=500, detail=f"Error sending email: {ex}") - -# --------------------------------------------------------- -# 6. FastAPI Setup and Routes -# --------------------------------------------------------- -app = FastAPI() -app.add_middleware(CustomLoggingMiddleware) -app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static") -templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) - -# Dependency for admin authentication -def verify_admin_auth(auth_token: str = Form(...)) -> None: - """Verify the SHA256 hash of matrix_admin.password from config.yaml.""" - expected_password = config["matrix_admin"].get("password", "") - expected_hash = hashlib.sha256(expected_password.encode()).hexdigest() - if auth_token != expected_hash: - raise HTTPException(status_code=403, detail="Invalid authentication token") - -# Helper function to parse Matrix responses -def parse_response(response_text: str, query: str) -> Dict[str, Union[str, List[str]]]: - """Parse a response that may contain a markdown codeblock.""" - query_parts = query.strip().split() - array_key = query_parts[0] if query_parts else "data" - codeblock_pattern = r"(.*?):\s*\n```\s*\n([\s\S]*?)\n```" - match = re.search(codeblock_pattern, response_text) - if match: - message = match.group(1).strip() - codeblock_content = match.group(2) - items = [line for line in codeblock_content.split('\n') if line.strip()] - return {"message": message, array_key: items} - return {"response": response_text} - -# Helper function to get the list of users from the Matrix admin room -async def get_matrix_users() -> List[str]: - """Fetch the list of users from the Matrix admin room.""" - matrix_config = config["matrix_admin"] - homeserver = config["base_url"] - username = matrix_config.get("username") - password = matrix_config.get("password") - admin_room = matrix_config.get("room") - admin_response_user = matrix_config.get("super_admin") - - if not all([homeserver, username, password, admin_room, admin_response_user]): - raise HTTPException(status_code=500, detail="Incomplete Matrix admin configuration") - - client = AsyncClient(homeserver, username) - try: - login_response = await client.login(password) - if getattr(login_response, "error", None): - raise Exception(f"Login error: {login_response.error}") - logger.debug("Successfully logged in to Matrix") - - await client.join(admin_room) - initial_sync = await client.sync(timeout=5000) - next_batch = initial_sync.next_batch - - await client.room_send( - room_id=admin_room, - message_type="m.room.message", - content={"msgtype": "m.text", "body": "!admin users list-users"}, - ) - query_time = time.time() - - timeout_seconds = 10 - start_time = time.time() - response_message = None - - while (time.time() - start_time) < timeout_seconds: - sync_response = await client.sync(timeout=2000, since=next_batch) - next_batch = sync_response.next_batch - room = sync_response.rooms.join.get(admin_room) - if room and room.timeline and room.timeline.events: - message_events = [ - event for event in room.timeline.events - if isinstance(event, (RoomMessageText, RoomMessageNotice)) - ] - for event in message_events: - event_time = event.server_timestamp / 1000.0 - if event.sender == admin_response_user and event_time >= query_time: - response_message = event.body - logger.debug(f"Found response: {response_message[:100]}...") - break - if response_message: - break - - await client.logout() - await client.close() - - if not response_message: - raise HTTPException(status_code=504, detail="No response from admin user within timeout") - - parsed = parse_response(response_message, "users list-users") - return parsed.get("users", []) - except Exception as e: - await client.close() - logger.error(f"Error fetching Matrix users: {e}") - raise HTTPException(status_code=500, detail=f"Error fetching users: {e}") - -# Helper function to deactivate a user via Matrix admin room -async def deactivate_user(user: str) -> bool: - """Send a deactivation command for a user to the Matrix admin room.""" - matrix_config = config["matrix_admin"] - homeserver = config["base_url"] - username = matrix_config.get("username") - password = matrix_config.get("password") - admin_room = matrix_config.get("room") - admin_response_user = matrix_config.get("super_admin") - - client = AsyncClient(homeserver, username) - try: - login_response = await client.login(password) - if getattr(login_response, "error", None): - raise Exception(f"Login error: {login_response.error}") - logger.debug(f"Logged in to deactivate {user}") - - await client.join(admin_room) - await client.sync(timeout=5000) - - command = f"!admin users deactivate {user}" - await client.room_send( - room_id=admin_room, - message_type="m.room.message", - content={"msgtype": "m.text", "body": command}, - ) - logger.info(f"Sent deactivation command for {user}") - - await asyncio.sleep(1) - await client.logout() - await client.close() - return True - except Exception as e: - await client.close() - logger.error(f"Failed to deactivate {user}: {e}") - return False - -@app.get("/", response_class=HTMLResponse) -async def index(request: Request): - now = get_current_utc() - closed, message = is_registration_closed(now) - - return templates.TemplateResponse( - "index.html", - { - "request": request, - "registration_closed": closed, - "homeserver": config["homeserver"], - "message": message, - "reset_hour": config["registration"]["token_reset_time_utc"] // 100, - "reset_minute": config["registration"]["token_reset_time_utc"] % 100, - "downtime_minutes": config["registration"]["downtime_before_token_reset"] - } - ) - -@app.get("/api/time") -async def get_server_time(): - now = get_current_utc() - return JSONResponse({"utc_time": now.strftime("%H:%M:%S")}) - -@app.post("/register", response_class=HTMLResponse) -async def register( - request: Request, - requested_username: str = Form(...), - email: str = Form(...) -): - now = get_current_utc() - client_ip = request.client.host - - logger.info(f"Registration attempt - Username: {requested_username}, Email: {email}, IP: {client_ip}") - - closed, message = is_registration_closed(now) - if closed: - logger.info("Registration rejected: Registration is closed") - return templates.TemplateResponse("error.html", {"request": request, "message": message}) - - if is_ip_banned(client_ip): - logger.info(f"Registration rejected: Banned IP {client_ip}") - return templates.TemplateResponse("error.html", {"request": request, "message": "Registration not allowed from your IP address."}) - - if is_email_banned(email): - logger.info(f"Registration rejected: Banned email {email}") - return templates.TemplateResponse("error.html", {"request": request, "message": "Registration not allowed for this email address."}) - - if error_message := check_email_cooldown(email): - logger.info(f"Registration rejected: Email cooldown - {email}") - return templates.TemplateResponse("error.html", {"request": request, "message": error_message}) - - available = await check_username_availability(requested_username) - if not available: - logger.info(f"Registration rejected: Username unavailable - {requested_username}") - return templates.TemplateResponse("error.html", {"request": request, "message": f"The username '{requested_username}' is not available."}) - - token = read_registration_token() - if token is None: - logger.error("Registration token file not found") - raise HTTPException(status_code=500, detail="Registration token file not found.") - - email_message = build_email_message(token, requested_username, now, email) - send_email_message(email_message) - - registration_data = { - "requested_name": requested_username, - "email": email, - "datetime": datetime.utcnow().isoformat(), - "ip_address": client_ip - } - save_registration(registration_data) - logger.info(f"Registration successful - Username: {requested_username}, Email: {email}") - - return templates.TemplateResponse("success.html", {"request": request, "homeserver": config["homeserver"]}) - -@app.post("/_admin/purge_unfulfilled_registrations", response_class=JSONResponse) -async def purge_unfulfilled_registrations( - min_age_hours: int = Form(default=24), - auth_token: str = Depends(verify_admin_auth) -): - """ - Purge unfulfilled registration entries older than min_age_hours where the username - does not exist on the homeserver. - """ - registrations = load_registrations() - if not registrations: - return JSONResponse({"message": "No registrations found to clean up"}) - - logger.info(f"Starting cleanup of {len(registrations)} registrations") - logger.info(f"Will remove non-existent users registered more than {min_age_hours} hours ago") - - entries_to_keep = [] - removed_count = 0 - too_new_count = 0 - exists_count = 0 - current_time = datetime.utcnow() - - async with httpx.AsyncClient() as client: - for entry in registrations: - username = entry["requested_name"] - reg_date = datetime.fromisoformat(entry["datetime"]) - age = current_time - reg_date - - url = f"{config['base_url']}/_matrix/client/v3/register/available?username={username}" - try: - response = await client.get(url, timeout=5) - if response.status_code == 200 and response.json().get("available", False): - exists = False - elif response.status_code == 400 or (response.status_code == 200 and not response.json().get("available", False)): - exists = True - else: - logger.warning(f"Unexpected response for {username}: {response.status_code}") - exists = False - except httpx.RequestError as ex: - logger.error(f"Error checking username {username}: {ex}") - exists = False - - if exists: - entries_to_keep.append(entry) - exists_count += 1 - logger.info(f"Keeping registration for existing user: {username}") - continue - - if age < timedelta(hours=min_age_hours): - entries_to_keep.append(entry) - too_new_count += 1 - logger.info(f"Keeping recent registration: {username} (age: {age.total_seconds()/3600:.1f} hours)") - else: - logger.info(f"Removing old registration: {username} (age: {age.total_seconds()/3600:.1f} hours)") - removed_count += 1 - - save_registrations(entries_to_keep) - - result = { - "message": "Cleanup complete", - "kept_existing": exists_count, - "kept_recent": too_new_count, - "removed": removed_count, - "total_remaining": len(entries_to_keep) - } - logger.info(f"Cleanup complete: {result}") - return JSONResponse(result) - -@app.post("/_admin/deactivate_undocumented_users", response_class=JSONResponse) -async def deactivate_undocumented_users(auth_token: str = Depends(verify_admin_auth)): - """Deactivate users on the homeserver without matching entries in registrations.json.""" - registrations = load_registrations() - matrix_users = await get_matrix_users() - - registered_usernames = {entry["requested_name"].lower() for entry in registrations} - homeserver = config["homeserver"].lower() - - undocumented_users = [] - for user in matrix_users: - if not user.lower().startswith("@"): - continue - username, user_homeserver = user[1:].lower().split(":", 1) - if user_homeserver != homeserver: - continue - if username not in registered_usernames: - undocumented_users.append(user) - - if not undocumented_users: - logger.info("No undocumented users found to deactivate") - return JSONResponse({"message": "No undocumented users found to deactivate", "deactivated_count": 0}) - - deactivated_count = 0 - failed_deactivations = [] - - for user in undocumented_users: - success = await deactivate_user(user) - if success: - deactivated_count += 1 - else: - failed_deactivations.append(user) - - logger.info(f"Deactivated {deactivated_count} undocumented users") - if failed_deactivations: - logger.warning(f"Failed to deactivate {len(failed_deactivations)} users: {failed_deactivations}") - - result = { - "message": f"Deactivated {deactivated_count} undocumented user(s)", - "deactivated_count": deactivated_count - } - if failed_deactivations: - result["failed_deactivations"] = failed_deactivations - return JSONResponse(result) - -@app.post("/_admin/retroactively_document_users", response_class=JSONResponse) -async def retroactively_document_users(auth_token: str = Depends(verify_admin_auth)): - """Add entries to registrations.json for undocumented users.""" - registrations = load_registrations() - matrix_users = await get_matrix_users() - - registered_usernames = {entry["requested_name"].lower() for entry in registrations} - homeserver = config["homeserver"].lower() - added_count = 0 - - for user in matrix_users: - if not user.lower().startswith("@"): - continue - username, user_homeserver = user[1:].lower().split(":", 1) - if user_homeserver != homeserver: - continue - if username not in registered_usernames: - new_entry = { - "requested_name": username, - "email": "null@nope.no", - "datetime": datetime.utcnow().isoformat(), - "ip_address": "127.0.0.1" - } - registrations.append(new_entry) - registered_usernames.add(username) - added_count += 1 - logger.info(f"Added retroactive entry for {user}") - - if added_count > 0: - save_registrations(registrations) - logger.info(f"Retroactively documented {added_count} users") - - return JSONResponse({ - "message": f"Retroactively documented {added_count} user(s)", - "added_count": added_count - }) - -if __name__ == "__main__": - import uvicorn - uvicorn.run( - "registration:app", - host="0.0.0.0", - port=config["port"], - reload=True, - access_log=False - )