This commit is contained in:
Sangye Ince-Johannsen 2025-04-04 17:14:01 +00:00
parent d536648058
commit b073f2a734
2 changed files with 39 additions and 751 deletions

View file

@ -1,8 +1,8 @@
#!/bin/bash
# File paths
BASE_PATH="/home/sij/hand_of_morpheus"
TOKEN_FILE="$BASE_PATH/.registration_token"
BASE_PATH="/home/sij/hand_of_morpheus/sw1tch"
TOKEN_FILE="$BASE_PATH/data/.registration_token"
LOG_FILE="$BASE_PATH/logs/token_refresh.log"
BACKUP_PATH="/home/sij/conduwuit_backup"
ENV_FILE="$BASE_PATH/config/conduwuit.env"
@ -22,8 +22,8 @@ FORCE_RESTART=false
# Function to log with a timestamp to both file and terminal
log() {
local message="$(date --iso-8601=seconds) $1"
echo "$message" >> "$LOG_FILE" # Write to log file
echo "$message" # Print to terminal
echo "$message" >> "$LOG_FILE"
echo "$message"
}
# Function to refresh the registration token
@ -129,18 +129,15 @@ restart_container() {
# Function to ensure the registration service is running
ensure_registration_service() {
local python_script="$BASE_PATH/registration.py"
local pid_file="$BASE_PATH/data/registration.pid"
local log_file="$BASE_PATH/logs/registration.log"
if [ ! -f "$python_script" ]; then
log "ERROR: Python script $python_script not found"
exit 1
fi
touch "$log_file" || { log "ERROR: Cannot write to $log_file"; exit 1; }
chmod 666 "$log_file"
REG_PORT=$(python3 -c "import yaml, sys; print(yaml.safe_load(open('$CONFIG_FILE')).get('port', 8000))")
log "Registration service port from config: $REG_PORT"
if [ "$FORCE_RESTART" = true ]; then
log "Force restart requested. Clearing any process listening on port $REG_PORT..."
PIDS=$(lsof -ti tcp:"$REG_PORT")
@ -151,58 +148,55 @@ ensure_registration_service() {
fi
rm -f "$pid_file"
log "Force starting registration service..."
python3 "$python_script" >> "$log_file" 2>&1 &
cd "$(dirname "$BASE_PATH")" || { log "ERROR: Cannot cd to $(dirname "$BASE_PATH")"; exit 1; }
log "Running: nohup python3 -m sw1tch >> $log_file 2>&1 &"
nohup python3 -m sw1tch >> "$log_file" 2>&1 &
NEW_PID=$!
echo "$NEW_PID" > "$pid_file"
log "Started registration service with PID $NEW_PID"
sleep 2
if ps -p "$NEW_PID" > /dev/null; then
echo "$NEW_PID" > "$pid_file"
log "Started registration service with PID $NEW_PID"
sudo lsof -i :"$REG_PORT" || log "WARNING: No process on port $REG_PORT after start"
else
log "ERROR: Process $NEW_PID did not start or exited immediately"
cat "$log_file" >> "$LOG_FILE"
fi
else
EXISTING_PIDS=$(lsof -ti tcp:"$REG_PORT")
if [ -n "$EXISTING_PIDS" ]; then
log "Registration service already running on port $REG_PORT with PID(s): $EXISTING_PIDS"
else
log "Registration service not running on port $REG_PORT, starting..."
python3 "$python_script" >> "$log_file" 2>&1 &
cd "$(dirname "$BASE_PATH")" || { log "ERROR: Cannot cd to $(dirname "$BASE_PATH")"; exit 1; }
log "Running: nohup python3 -m sw1tch >> $log_file 2>&1 &"
nohup python3 -m sw1tch >> "$log_file" 2>&1 &
NEW_PID=$!
echo "$NEW_PID" > "$pid_file"
log "Started registration service with PID $NEW_PID"
sleep 2
if ps -p "$NEW_PID" > /dev/null; then
echo "$NEW_PID" > "$pid_file"
log "Started registration service with PID $NEW_PID"
sudo lsof -i :"$REG_PORT" || log "WARNING: No process on port $REG_PORT after start"
else
log "ERROR: Process $NEW_PID did not start or exited immediately"
cat "$log_file" >> "$LOG_FILE"
fi
fi
fi
}
# Parse command-line flags
# Parse command-line flags and execute (unchanged)
while [[ $# -gt 0 ]]; do
case "$1" in
--refresh-token)
REFRESH_TOKEN=true
shift
;;
--super-admin)
SUPER_ADMIN=true
shift
;;
--update)
UPDATE=true
shift
;;
--force-restart)
FORCE_RESTART=true
shift
;;
*)
log "ERROR: Unknown option: $1"
echo "Usage: $0 [--refresh-token] [--super-admin] [--update] [--force-restart]"
exit 1
;;
--refresh-token) REFRESH_TOKEN=true; shift;;
--super-admin) SUPER_ADMIN=true; shift;;
--update) UPDATE=true; shift;;
--force-restart) FORCE_RESTART=true; shift;;
*) log "ERROR: Unknown option: $1"; echo "Usage: $0 [--refresh-token] [--super-admin] [--update] [--force-restart]"; exit 1;;
esac
done
# Execute based on flags
if [ "$UPDATE" = true ]; then
update_docker_image
fi
if [ "$REFRESH_TOKEN" = true ]; then
refresh_token
fi
if [ "$UPDATE" = true ]; then update_docker_image; fi
if [ "$REFRESH_TOKEN" = true ]; then refresh_token; fi
restart_container
ensure_registration_service

View file

@ -1,706 +0,0 @@
#!/usr/bin/env python3
import os
import re
import yaml
import json
import smtplib
import httpx
import logging
import ipaddress
import hashlib
import asyncio
import time
from datetime import datetime, timedelta
from email.message import EmailMessage
from typing import List, Dict, Optional, Tuple, Set, Pattern, Union
from fastapi import FastAPI, Request, Form, HTTPException, Depends, Query
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from starlette.middleware.base import BaseHTTPMiddleware
from ipaddress import IPv4Network, IPv4Address
from nio import AsyncClient, RoomMessageText, RoomMessageNotice
# ---------------------------------------------------------
# 1. Load configuration and setup paths
# ---------------------------------------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_DIR = os.path.join(BASE_DIR, "config")
DATA_DIR = os.path.join(BASE_DIR, "data")
LOGS_DIR = os.path.join(BASE_DIR, "logs")
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.yaml")
with open(CONFIG_PATH, "r") as f:
config = yaml.safe_load(f)
# Initialize or load registrations.json
REGISTRATIONS_PATH = os.path.join(DATA_DIR, "registrations.json")
def load_registrations() -> List[Dict]:
try:
with open(REGISTRATIONS_PATH, "r") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return []
def save_registrations(registrations: List[Dict]):
with open(REGISTRATIONS_PATH, "w") as f:
json.dump(registrations, f, indent=2)
def save_registration(data: Dict):
registrations = load_registrations()
registrations.append(data)
with open(REGISTRATIONS_PATH, "w") as f:
json.dump(registrations, f, indent=2)
# Functions to check banned entries
def load_banned_usernames() -> List[Pattern]:
"""Load banned usernames file and compile regex patterns."""
patterns = []
try:
with open(os.path.join(CONFIG_DIR, "banned_usernames.txt"), "r") as f:
for line in f:
line = line.strip()
if line:
try:
patterns.append(re.compile(line, re.IGNORECASE))
except re.error:
logging.error(f"Invalid regex pattern in banned_usernames.txt: {line}")
except FileNotFoundError:
pass
return patterns
def is_ip_banned(ip: str) -> bool:
"""Check if an IP is banned, supporting both individual IPs and CIDR ranges."""
try:
check_ip = IPv4Address(ip)
try:
with open(os.path.join(CONFIG_DIR, "banned_ips.txt"), "r") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
if '/' in line: # CIDR notation
if check_ip in IPv4Network(line):
return True
else: # Individual IP
if check_ip == IPv4Address(line):
return True
except ValueError:
logging.error(f"Invalid IP/CIDR in banned_ips.txt: {line}")
except FileNotFoundError:
return False
except ValueError:
logging.error(f"Invalid IP address to check: {ip}")
return False
def is_email_banned(email: str) -> bool:
"""Check if an email matches any banned patterns."""
try:
with open(os.path.join(CONFIG_DIR, "banned_emails.txt"), "r") as f:
for line in f:
pattern = line.strip()
if not pattern:
continue
regex_pattern = pattern.replace(".", "\\.").replace("*", ".*")
try:
if re.match(regex_pattern, email, re.IGNORECASE):
return True
except re.error:
logging.error(f"Invalid email pattern in banned_emails.txt: {pattern}")
except FileNotFoundError:
pass
return False
def is_username_banned(username: str) -> bool:
"""Check if username matches any banned patterns."""
patterns = load_banned_usernames()
return any(pattern.search(username) for pattern in patterns)
# Read the registration token (still at base level as per shell script)
def read_registration_token():
token_path = os.path.join(BASE_DIR, ".registration_token")
try:
with open(token_path, "r") as f:
return f.read().strip()
except FileNotFoundError:
return None
# ---------------------------------------------------------
# 2. Logging Configuration
# ---------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename=os.path.join(LOGS_DIR, "registration.log"),
filemode='a'
)
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
logging.getLogger("uvicorn.error").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
class CustomLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
if request.url.path == "/api/time" or request.url.path.endswith('favicon.ico'):
return await call_next(request)
response = await call_next(request)
logger.info(f"Request: {request.method} {request.url.path} - Status: {response.status_code}")
return response
# ---------------------------------------------------------
# 3. Time Calculation Functions
# ---------------------------------------------------------
def get_current_utc() -> datetime:
return datetime.utcnow()
def get_next_reset_time(now: datetime) -> datetime:
"""Return the next reset time (possibly today or tomorrow) from config."""
reset_h = config["registration"]["token_reset_time_utc"] // 100
reset_m = config["registration"]["token_reset_time_utc"] % 100
candidate = now.replace(hour=reset_h, minute=reset_m, second=0, microsecond=0)
if candidate <= now:
candidate += timedelta(days=1)
return candidate
def get_downtime_start(next_reset: datetime) -> datetime:
"""Return the downtime start time (minutes before next_reset)."""
return next_reset - timedelta(minutes=config["registration"]["downtime_before_token_reset"])
def format_timedelta(td: timedelta) -> str:
"""Format a timedelta as 'X hours and Y minutes' (or similar)."""
total_minutes = int(td.total_seconds() // 60)
hours = total_minutes // 60
minutes = total_minutes % 60
parts = []
if hours == 1:
parts.append("1 hour")
elif hours > 1:
parts.append(f"{hours} hours")
if minutes == 1:
parts.append("1 minute")
elif minutes > 1:
parts.append(f"{minutes} minutes")
if not parts:
return "0 minutes"
return " and ".join(parts)
def get_time_until_reset_str(now: datetime) -> str:
"""Return a string like '3 hours and 41 minutes' until next reset."""
nr = get_next_reset_time(now)
delta = nr - now
return format_timedelta(delta)
def is_registration_closed(now: datetime) -> Tuple[bool, str]:
"""
Determine if registration is closed based on config.
Return (closed_bool, message).
"""
nr = get_next_reset_time(now)
ds = get_downtime_start(nr)
if ds <= now < nr:
time_until_open = nr - now
msg = (
f"Registration is closed. "
f"It reopens in {format_timedelta(time_until_open)} at {nr.strftime('%H:%M UTC')}."
)
return True, msg
else:
if now > ds:
nr += timedelta(days=1)
ds = get_downtime_start(nr)
time_until_close = ds - now
msg = (
f"Registration is open. "
f"It will close in {format_timedelta(time_until_close)} at {ds.strftime('%H:%M UTC')}."
)
return False, msg
# ---------------------------------------------------------
# 4. Registration Validation
# ---------------------------------------------------------
def check_email_cooldown(email: str) -> Optional[str]:
"""Check if email is allowed to register based on cooldown and multiple account rules."""
registrations = load_registrations()
email_entries = [r for r in registrations if r["email"] == email]
if not email_entries:
return None
if not config["registration"].get("multiple_users_per_email", True):
return "This email address has already been used to register an account."
email_cooldown = config["registration"].get("email_cooldown")
if email_cooldown:
latest_registration = max(
datetime.fromisoformat(e["datetime"])
for e in email_entries
)
time_since = datetime.utcnow() - latest_registration
if time_since.total_seconds() < email_cooldown:
wait_time = email_cooldown - time_since.total_seconds()
return f"Please wait {int(wait_time)} seconds before requesting another account."
return None
async def check_username_availability(username: str) -> bool:
"""Check if username is available on Matrix and in our registration records."""
if is_username_banned(username):
logger.info(f"[USERNAME CHECK] {username}: Banned by pattern")
return False
registrations = load_registrations()
if any(r["requested_name"] == username for r in registrations):
logger.info(f"[USERNAME CHECK] {username}: Already requested")
return False
url = f"{config['base_url']}/_matrix/client/v3/register/available?username={username}"
async with httpx.AsyncClient() as client:
try:
response = await client.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
is_available = data.get("available", False)
logger.info(f"[USERNAME CHECK] {username}: {'Available' if is_available else 'Taken'}")
return is_available
elif response.status_code == 400:
logger.info(f"[USERNAME CHECK] {username}: Taken (400)")
return False
except httpx.RequestError as ex:
logger.warning(f"[USERNAME CHECK] Could not reach homeserver: {ex}")
return False
return False
# ---------------------------------------------------------
# 5. Email Helper Functions
# ---------------------------------------------------------
def load_template(template_path: str) -> str:
"""Load an email template from a file."""
try:
with open(os.path.join(BASE_DIR, template_path), "r") as f:
return f.read()
except FileNotFoundError:
raise HTTPException(status_code=500, detail=f"Email template not found: {template_path}")
def build_email_message(token: str, requested_username: str, now: datetime, recipient_email: str) -> EmailMessage:
"""
Build and return an EmailMessage for registration using file-based templates.
"""
time_until_reset = get_time_until_reset_str(now)
plain_template = load_template(config["email"]["templates"]["registration_token"]["body"])
html_template = load_template(config["email"]["templates"]["registration_token"]["body_html"])
plain_body = plain_template.format(
homeserver=config["homeserver"],
registration_token=token,
requested_username=requested_username,
utc_time=now.strftime("%H:%M:%S"),
time_until_reset=time_until_reset
)
html_body = html_template.format(
homeserver=config["homeserver"],
registration_token=token,
requested_username=requested_username,
utc_time=now.strftime("%H:%M:%S"),
time_until_reset=time_until_reset
)
msg = EmailMessage()
msg.set_content(plain_body)
msg.add_alternative(html_body, subtype="html")
msg["Subject"] = config["email"]["templates"]["registration_token"]["subject"].format(homeserver=config["homeserver"])
msg["From"] = config["email"]["smtp"]["from"]
msg["To"] = recipient_email
return msg
def send_email_message(msg: EmailMessage) -> None:
"""
Send an email message using SMTP configuration.
"""
smtp_conf = config["email"]["smtp"]
try:
with smtplib.SMTP(smtp_conf["host"], smtp_conf["port"]) as server:
if smtp_conf.get("use_tls", True):
server.starttls()
server.login(smtp_conf["username"], smtp_conf["password"])
server.send_message(msg)
logger.info(f"Registration email sent successfully to {msg['To']}")
except Exception as ex:
logger.error(f"Failed to send email: {ex}")
raise HTTPException(status_code=500, detail=f"Error sending email: {ex}")
# ---------------------------------------------------------
# 6. FastAPI Setup and Routes
# ---------------------------------------------------------
app = FastAPI()
app.add_middleware(CustomLoggingMiddleware)
app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static")
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
# Dependency for admin authentication
def verify_admin_auth(auth_token: str = Form(...)) -> None:
"""Verify the SHA256 hash of matrix_admin.password from config.yaml."""
expected_password = config["matrix_admin"].get("password", "")
expected_hash = hashlib.sha256(expected_password.encode()).hexdigest()
if auth_token != expected_hash:
raise HTTPException(status_code=403, detail="Invalid authentication token")
# Helper function to parse Matrix responses
def parse_response(response_text: str, query: str) -> Dict[str, Union[str, List[str]]]:
"""Parse a response that may contain a markdown codeblock."""
query_parts = query.strip().split()
array_key = query_parts[0] if query_parts else "data"
codeblock_pattern = r"(.*?):\s*\n```\s*\n([\s\S]*?)\n```"
match = re.search(codeblock_pattern, response_text)
if match:
message = match.group(1).strip()
codeblock_content = match.group(2)
items = [line for line in codeblock_content.split('\n') if line.strip()]
return {"message": message, array_key: items}
return {"response": response_text}
# Helper function to get the list of users from the Matrix admin room
async def get_matrix_users() -> List[str]:
"""Fetch the list of users from the Matrix admin room."""
matrix_config = config["matrix_admin"]
homeserver = config["base_url"]
username = matrix_config.get("username")
password = matrix_config.get("password")
admin_room = matrix_config.get("room")
admin_response_user = matrix_config.get("super_admin")
if not all([homeserver, username, password, admin_room, admin_response_user]):
raise HTTPException(status_code=500, detail="Incomplete Matrix admin configuration")
client = AsyncClient(homeserver, username)
try:
login_response = await client.login(password)
if getattr(login_response, "error", None):
raise Exception(f"Login error: {login_response.error}")
logger.debug("Successfully logged in to Matrix")
await client.join(admin_room)
initial_sync = await client.sync(timeout=5000)
next_batch = initial_sync.next_batch
await client.room_send(
room_id=admin_room,
message_type="m.room.message",
content={"msgtype": "m.text", "body": "!admin users list-users"},
)
query_time = time.time()
timeout_seconds = 10
start_time = time.time()
response_message = None
while (time.time() - start_time) < timeout_seconds:
sync_response = await client.sync(timeout=2000, since=next_batch)
next_batch = sync_response.next_batch
room = sync_response.rooms.join.get(admin_room)
if room and room.timeline and room.timeline.events:
message_events = [
event for event in room.timeline.events
if isinstance(event, (RoomMessageText, RoomMessageNotice))
]
for event in message_events:
event_time = event.server_timestamp / 1000.0
if event.sender == admin_response_user and event_time >= query_time:
response_message = event.body
logger.debug(f"Found response: {response_message[:100]}...")
break
if response_message:
break
await client.logout()
await client.close()
if not response_message:
raise HTTPException(status_code=504, detail="No response from admin user within timeout")
parsed = parse_response(response_message, "users list-users")
return parsed.get("users", [])
except Exception as e:
await client.close()
logger.error(f"Error fetching Matrix users: {e}")
raise HTTPException(status_code=500, detail=f"Error fetching users: {e}")
# Helper function to deactivate a user via Matrix admin room
async def deactivate_user(user: str) -> bool:
"""Send a deactivation command for a user to the Matrix admin room."""
matrix_config = config["matrix_admin"]
homeserver = config["base_url"]
username = matrix_config.get("username")
password = matrix_config.get("password")
admin_room = matrix_config.get("room")
admin_response_user = matrix_config.get("super_admin")
client = AsyncClient(homeserver, username)
try:
login_response = await client.login(password)
if getattr(login_response, "error", None):
raise Exception(f"Login error: {login_response.error}")
logger.debug(f"Logged in to deactivate {user}")
await client.join(admin_room)
await client.sync(timeout=5000)
command = f"!admin users deactivate {user}"
await client.room_send(
room_id=admin_room,
message_type="m.room.message",
content={"msgtype": "m.text", "body": command},
)
logger.info(f"Sent deactivation command for {user}")
await asyncio.sleep(1)
await client.logout()
await client.close()
return True
except Exception as e:
await client.close()
logger.error(f"Failed to deactivate {user}: {e}")
return False
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
now = get_current_utc()
closed, message = is_registration_closed(now)
return templates.TemplateResponse(
"index.html",
{
"request": request,
"registration_closed": closed,
"homeserver": config["homeserver"],
"message": message,
"reset_hour": config["registration"]["token_reset_time_utc"] // 100,
"reset_minute": config["registration"]["token_reset_time_utc"] % 100,
"downtime_minutes": config["registration"]["downtime_before_token_reset"]
}
)
@app.get("/api/time")
async def get_server_time():
now = get_current_utc()
return JSONResponse({"utc_time": now.strftime("%H:%M:%S")})
@app.post("/register", response_class=HTMLResponse)
async def register(
request: Request,
requested_username: str = Form(...),
email: str = Form(...)
):
now = get_current_utc()
client_ip = request.client.host
logger.info(f"Registration attempt - Username: {requested_username}, Email: {email}, IP: {client_ip}")
closed, message = is_registration_closed(now)
if closed:
logger.info("Registration rejected: Registration is closed")
return templates.TemplateResponse("error.html", {"request": request, "message": message})
if is_ip_banned(client_ip):
logger.info(f"Registration rejected: Banned IP {client_ip}")
return templates.TemplateResponse("error.html", {"request": request, "message": "Registration not allowed from your IP address."})
if is_email_banned(email):
logger.info(f"Registration rejected: Banned email {email}")
return templates.TemplateResponse("error.html", {"request": request, "message": "Registration not allowed for this email address."})
if error_message := check_email_cooldown(email):
logger.info(f"Registration rejected: Email cooldown - {email}")
return templates.TemplateResponse("error.html", {"request": request, "message": error_message})
available = await check_username_availability(requested_username)
if not available:
logger.info(f"Registration rejected: Username unavailable - {requested_username}")
return templates.TemplateResponse("error.html", {"request": request, "message": f"The username '{requested_username}' is not available."})
token = read_registration_token()
if token is None:
logger.error("Registration token file not found")
raise HTTPException(status_code=500, detail="Registration token file not found.")
email_message = build_email_message(token, requested_username, now, email)
send_email_message(email_message)
registration_data = {
"requested_name": requested_username,
"email": email,
"datetime": datetime.utcnow().isoformat(),
"ip_address": client_ip
}
save_registration(registration_data)
logger.info(f"Registration successful - Username: {requested_username}, Email: {email}")
return templates.TemplateResponse("success.html", {"request": request, "homeserver": config["homeserver"]})
@app.post("/_admin/purge_unfulfilled_registrations", response_class=JSONResponse)
async def purge_unfulfilled_registrations(
min_age_hours: int = Form(default=24),
auth_token: str = Depends(verify_admin_auth)
):
"""
Purge unfulfilled registration entries older than min_age_hours where the username
does not exist on the homeserver.
"""
registrations = load_registrations()
if not registrations:
return JSONResponse({"message": "No registrations found to clean up"})
logger.info(f"Starting cleanup of {len(registrations)} registrations")
logger.info(f"Will remove non-existent users registered more than {min_age_hours} hours ago")
entries_to_keep = []
removed_count = 0
too_new_count = 0
exists_count = 0
current_time = datetime.utcnow()
async with httpx.AsyncClient() as client:
for entry in registrations:
username = entry["requested_name"]
reg_date = datetime.fromisoformat(entry["datetime"])
age = current_time - reg_date
url = f"{config['base_url']}/_matrix/client/v3/register/available?username={username}"
try:
response = await client.get(url, timeout=5)
if response.status_code == 200 and response.json().get("available", False):
exists = False
elif response.status_code == 400 or (response.status_code == 200 and not response.json().get("available", False)):
exists = True
else:
logger.warning(f"Unexpected response for {username}: {response.status_code}")
exists = False
except httpx.RequestError as ex:
logger.error(f"Error checking username {username}: {ex}")
exists = False
if exists:
entries_to_keep.append(entry)
exists_count += 1
logger.info(f"Keeping registration for existing user: {username}")
continue
if age < timedelta(hours=min_age_hours):
entries_to_keep.append(entry)
too_new_count += 1
logger.info(f"Keeping recent registration: {username} (age: {age.total_seconds()/3600:.1f} hours)")
else:
logger.info(f"Removing old registration: {username} (age: {age.total_seconds()/3600:.1f} hours)")
removed_count += 1
save_registrations(entries_to_keep)
result = {
"message": "Cleanup complete",
"kept_existing": exists_count,
"kept_recent": too_new_count,
"removed": removed_count,
"total_remaining": len(entries_to_keep)
}
logger.info(f"Cleanup complete: {result}")
return JSONResponse(result)
@app.post("/_admin/deactivate_undocumented_users", response_class=JSONResponse)
async def deactivate_undocumented_users(auth_token: str = Depends(verify_admin_auth)):
"""Deactivate users on the homeserver without matching entries in registrations.json."""
registrations = load_registrations()
matrix_users = await get_matrix_users()
registered_usernames = {entry["requested_name"].lower() for entry in registrations}
homeserver = config["homeserver"].lower()
undocumented_users = []
for user in matrix_users:
if not user.lower().startswith("@"):
continue
username, user_homeserver = user[1:].lower().split(":", 1)
if user_homeserver != homeserver:
continue
if username not in registered_usernames:
undocumented_users.append(user)
if not undocumented_users:
logger.info("No undocumented users found to deactivate")
return JSONResponse({"message": "No undocumented users found to deactivate", "deactivated_count": 0})
deactivated_count = 0
failed_deactivations = []
for user in undocumented_users:
success = await deactivate_user(user)
if success:
deactivated_count += 1
else:
failed_deactivations.append(user)
logger.info(f"Deactivated {deactivated_count} undocumented users")
if failed_deactivations:
logger.warning(f"Failed to deactivate {len(failed_deactivations)} users: {failed_deactivations}")
result = {
"message": f"Deactivated {deactivated_count} undocumented user(s)",
"deactivated_count": deactivated_count
}
if failed_deactivations:
result["failed_deactivations"] = failed_deactivations
return JSONResponse(result)
@app.post("/_admin/retroactively_document_users", response_class=JSONResponse)
async def retroactively_document_users(auth_token: str = Depends(verify_admin_auth)):
"""Add entries to registrations.json for undocumented users."""
registrations = load_registrations()
matrix_users = await get_matrix_users()
registered_usernames = {entry["requested_name"].lower() for entry in registrations}
homeserver = config["homeserver"].lower()
added_count = 0
for user in matrix_users:
if not user.lower().startswith("@"):
continue
username, user_homeserver = user[1:].lower().split(":", 1)
if user_homeserver != homeserver:
continue
if username not in registered_usernames:
new_entry = {
"requested_name": username,
"email": "null@nope.no",
"datetime": datetime.utcnow().isoformat(),
"ip_address": "127.0.0.1"
}
registrations.append(new_entry)
registered_usernames.add(username)
added_count += 1
logger.info(f"Added retroactive entry for {user}")
if added_count > 0:
save_registrations(registrations)
logger.info(f"Retroactively documented {added_count} users")
return JSONResponse({
"message": f"Retroactively documented {added_count} user(s)",
"added_count": added_count
})
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"registration:app",
host="0.0.0.0",
port=config["port"],
reload=True,
access_log=False
)