Initial commit

This commit is contained in:
sanj 2025-02-02 15:01:16 -08:00
parent 890baa7674
commit 62e6d2e395
14 changed files with 893 additions and 2 deletions

56
.gitignore vendored Normal file
View file

@ -0,0 +1,56 @@
# Python cache and build artifacts
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
# Virtual environment directories
venv/
.env/
env/
.venv/
ENV/
# Log files
*.log
logs/
# IDE or OS-specific files
.DS_Store
.idea/
.vscode/
*.swp
*.swo
*~
Thumbs.db
.project
.settings/
.classpath
# Project-specific sensitive files
.registration_token
config.yaml
registrations.json
banned_emails.txt
banned_ips.txt
banned_usernames.txt
# Backup directories
backup/
*_backup/
*.bak
# Test cache
.pytest_cache/
.coverage
htmlcov/
.tox/
# Dependency management
pip-log.txt
pip-delete-this-directory.txt
# Environment variables
.env*
!.env.example

View file

@ -1,3 +1,95 @@
# hand_of_morpheus
# Matrix Registration System
A FastAPI-based web application that manages Matrix account registration requests for homeservers that do not offer SMTP authentication (like conduwuit). It provides a registration token to users via email, with automatic token rotation and various safety features.
A FastAPI-based web application that manages Matrix account registration requests for homeservers that do not offer SMTP authentication (like conduwuit). It provides a registration token to users via email, with automatic token rotation and various safety features.
## Features
- Daily rotating registration tokens
- Rate limiting per email address
- Multiple account restrictions
- IP and email address banning
- Username pattern banning with regex support
- Automatic downtime before token rotation
- Gruvbox-themed UI with responsive design
## Setup
1. Install dependencies:
```bash
pip install fastapi uvicorn jinja2 httpx pyyaml python-multipart
```
2. Configure your settings:
```bash
cp config.yaml.example config.yaml
# Edit config.yaml with your settings
```
3. Create required files:
```bash
touch banned_ips.txt banned_emails.txt banned_usernames.txt
mkdir static
# Add your logo.png to static/
# Add favicon.ico to static/
```
4. Generate initial registration token:
```bash
openssl rand -base64 32 | tr -d '/+=' | head -c 32 > .registration_token
```
## Configuration
The `config.yaml` file supports these options:
```yaml
port: 6626
homeserver: "your.server"
token_reset_time_utc: 0 # 24-hour format (e.g., 0 = 00:00 UTC)
downtime_before_token_reset: 30 # minutes
email_cooldown: 3600 # seconds between requests per email
multiple_users_per_email: false # allow multiple accounts per email?
smtp:
host: "smtp.example.com"
port: 587
username: "your@email.com"
password: "yourpassword"
use_tls: true
```
## Token Rotation
Add this to your crontab to rotate the registration token daily at 00:00 UTC:
```bash
# Edit crontab with: crontab -e
0 0 * * * openssl rand -base64 32 | tr -d '/+=' | head -c 32 > /path/to/your/.registration_token
```
## Running the Server
Development:
```bash
python registration.py
```
Production:
```bash
uvicorn registration:app --host 0.0.0.0 --port 6626
```
## Security Features
- **IP Banning**: Add IPs to `banned_ips.txt`, one per line
- **Email Banning**: Add emails to `banned_emails.txt`, one per line
- **Username Patterns**: Add regex patterns to `banned_usernames.txt`, one per line
- **Registration Tracking**: All requests are logged to `registrations.json`
## Security Notes
- Place behind a reverse proxy with HTTPS
- Consider placing the registration token file outside web root
- Regularly backup `registrations.json`
- Monitor logs for abuse patterns

View file

@ -0,0 +1 @@
Cc8eeOtozcf2lHl6WhKQ3SGfHK9rda1j

View file

0
example-banned_ips.txt Normal file
View file

View file

@ -0,0 +1,5 @@
.*admin.*
.*loli.*
.*shota.*
.*pedo.*
.*pthc.*

35
example-config.yaml Normal file
View file

@ -0,0 +1,35 @@
port: 6626
homeserver: "we2.ee"
# Token reset configuration
token_reset_time_utc: 0 # 00:00 UTC
downtime_before_token_reset: 30 # 30 minutes before that time, registration is closed
# Email rate limiting and multiple account settings
email_cooldown: 3600 # 1 hour cooldown between requests for the same email
multiple_users_per_email: false # If false, each email can only be used once
# SMTP configuration
smtp:
host: "smtp.protonmail.ch"
port: 587
username: "email@pm.me"
password: "YourPassword"
use_tls: true
# Email templates
email_subject: "Your Registration Token for {homeserver}"
email_body: |
Hello,
Thank you for your interest in {homeserver}, {requested_username}.
The registration token today is: {registration_token}
This registration token is valid for {time_until_reset}. If you do not register in that period, you will need to request the new registration token.
Please ensure you use the username {requested_username} when you register. Using a different username may result in your account being deleted at a later time without forewarning.
Regards,
{homeserver} registration team

397
registration.py Normal file
View file

@ -0,0 +1,397 @@
import os
import re
import yaml
import json
import smtplib
import httpx
import logging
from datetime import datetime, timedelta
from email.message import EmailMessage
from typing import List, Dict, Optional, Tuple, Set, Pattern
from fastapi import FastAPI, Request, Form, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from starlette.middleware.base import BaseHTTPMiddleware
# ---------------------------------------------------------
# 1. Load configuration and setup paths
# ---------------------------------------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(BASE_DIR, "config.yaml")
with open(CONFIG_PATH, "r") as f:
config = yaml.safe_load(f)
# Initialize or load registrations.json
REGISTRATIONS_PATH = os.path.join(BASE_DIR, "registrations.json")
def load_registrations() -> List[Dict]:
try:
with open(REGISTRATIONS_PATH, "r") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return []
def save_registration(data: Dict):
registrations = load_registrations()
registrations.append(data)
with open(REGISTRATIONS_PATH, "w") as f:
json.dump(registrations, f, indent=2)
# Load banned IPs and emails
def load_banned_list(filename: str) -> Set[str]:
try:
with open(os.path.join(BASE_DIR, filename), "r") as f:
return {line.strip() for line in f if line.strip()}
except FileNotFoundError:
return set()
def load_banned_usernames() -> List[Pattern]:
"""Load banned usernames file and compile regex patterns."""
patterns = []
try:
with open(os.path.join(BASE_DIR, "banned_usernames.txt"), "r") as f:
for line in f:
line = line.strip()
if line:
try:
patterns.append(re.compile(line, re.IGNORECASE))
except re.error:
logging.error(f"Invalid regex pattern in banned_usernames.txt: {line}")
except FileNotFoundError:
pass
return patterns
banned_ips = load_banned_list("banned_ips.txt")
banned_emails = load_banned_list("banned_emails.txt")
banned_username_patterns = load_banned_usernames()
# Read the registration token
def read_registration_token():
token_path = os.path.join(BASE_DIR, ".registration_token")
try:
with open(token_path, "r") as f:
return f.read().strip()
except FileNotFoundError:
return None
# ---------------------------------------------------------
# 2. Logging Configuration
# ---------------------------------------------------------
# Set up logging format
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Configure loggers
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
logging.getLogger("uvicorn.error").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
class CustomLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
# Don't process /api/time or favicon requests at all
if request.url.path == "/api/time" or request.url.path.endswith('favicon.ico'):
return await call_next(request)
# For all other requests, log them
response = await call_next(request)
logger.info(f"Request: {request.method} {request.url.path} - Status: {response.status_code}")
return response
# ---------------------------------------------------------
# 3. Time Calculation Functions
# ---------------------------------------------------------
def get_current_utc() -> datetime:
return datetime.utcnow()
def get_next_reset_time(now: datetime) -> datetime:
"""Return the next reset time (possibly today or tomorrow) from config."""
reset_h = config["token_reset_time_utc"] // 100
reset_m = config["token_reset_time_utc"] % 100
candidate = now.replace(hour=reset_h, minute=reset_m, second=0, microsecond=0)
if candidate <= now:
# If we've passed today's reset time, it must be tomorrow.
candidate += timedelta(days=1)
return candidate
def get_downtime_start(next_reset: datetime) -> datetime:
"""Return the downtime start time (minutes before next_reset)."""
return next_reset - timedelta(minutes=config["downtime_before_token_reset"])
def format_timedelta(td: timedelta) -> str:
"""Format a timedelta as 'X hours and Y minutes' (or similar)."""
total_minutes = int(td.total_seconds() // 60)
hours = total_minutes // 60
minutes = total_minutes % 60
parts = []
if hours == 1:
parts.append("1 hour")
elif hours > 1:
parts.append(f"{hours} hours")
if minutes == 1:
parts.append("1 minute")
elif minutes > 1:
parts.append(f"{minutes} minutes")
if not parts: # If total is less than a minute
return "0 minutes"
return " and ".join(parts)
def get_time_until_reset_str(now: datetime) -> str:
"""Return a string like '3 hours and 41 minutes' until next reset."""
nr = get_next_reset_time(now)
delta = nr - now
return format_timedelta(delta)
def is_registration_closed(now: datetime) -> Tuple[bool, str]:
"""
Determine if registration is closed based on config.
Return (closed_bool, message).
"""
nr = get_next_reset_time(now)
ds = get_downtime_start(nr)
if ds <= now < nr:
# We are within downtime
time_until_open = nr - now
msg = (
f"Registration is closed. "
f"It reopens in {format_timedelta(time_until_open)} at {nr.strftime('%H:%M UTC')}."
)
return True, msg
else:
# Registration is open
if now > ds:
# We've passed ds, so next downtime is tomorrow
nr += timedelta(days=1)
ds = get_downtime_start(nr)
time_until_close = ds - now
msg = (
f"Registration is open. "
f"It will close in {format_timedelta(time_until_close)} at {ds.strftime('%H:%M UTC')}."
)
return False, msg
# ---------------------------------------------------------
# 4. Registration Validation
# ---------------------------------------------------------
def is_username_banned(username: str) -> bool:
"""Check if username matches any banned patterns."""
return any(pattern.search(username) for pattern in banned_username_patterns)
def check_email_cooldown(email: str) -> Optional[str]:
"""Check if email is allowed to register based on cooldown and multiple account rules."""
registrations = load_registrations()
email_entries = [r for r in registrations if r["email"] == email]
if not email_entries:
return None
if not config.get("multiple_users_per_email", True):
return "This email address has already been used to register an account."
if email_cooldown := config.get("email_cooldown"):
latest_registration = max(
datetime.fromisoformat(e["datetime"])
for e in email_entries
)
time_since = datetime.utcnow() - latest_registration
if time_since.total_seconds() < email_cooldown:
wait_time = email_cooldown - time_since.total_seconds()
return f"Please wait {int(wait_time)} seconds before requesting another account."
return None
async def check_username_availability(username: str) -> bool:
"""Check if username is available on Matrix and in our registration records."""
# Check banned usernames first
if is_username_banned(username):
logger.info(f"[USERNAME CHECK] {username}: Banned by pattern")
return False
# Check local registrations
registrations = load_registrations()
if any(r["requested_name"] == username for r in registrations):
logger.info(f"[USERNAME CHECK] {username}: Already requested")
return False
# Check Matrix homeserver
url = f"https://{config['homeserver']}/_matrix/client/v3/register/available?username={username}"
async with httpx.AsyncClient() as client:
try:
response = await client.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
is_available = data.get("available", False)
logger.info(f"[USERNAME CHECK] {username}: {'Available' if is_available else 'Taken'}")
return is_available
elif response.status_code == 400:
logger.info(f"[USERNAME CHECK] {username}: Taken (400)")
return False
except httpx.RequestError as ex:
logger.warning(f"[USERNAME CHECK] Could not reach homeserver: {ex}")
return False
return False
# ---------------------------------------------------------
# 5. FastAPI Setup and Routes
# ---------------------------------------------------------
app = FastAPI()
app.add_middleware(CustomLoggingMiddleware)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
now = get_current_utc()
closed, message = is_registration_closed(now)
return templates.TemplateResponse(
"index.html",
{
"request": request,
"registration_closed": closed,
"homeserver": config["homeserver"],
"message": message
}
)
@app.get("/api/time")
async def get_server_time():
now = get_current_utc()
return JSONResponse({"utc_time": now.strftime("%H:%M:%S")})
@app.post("/register", response_class=HTMLResponse)
async def register(
request: Request,
requested_username: str = Form(...),
email: str = Form(...)
):
now = get_current_utc()
client_ip = request.client.host
logger.info(f"Registration attempt - Username: {requested_username}, Email: {email}, IP: {client_ip}")
# Check if registration is closed
closed, message = is_registration_closed(now)
if closed:
logger.info("Registration rejected: Registration is closed")
return templates.TemplateResponse(
"error.html",
{
"request": request,
"message": message
}
)
# Check bans
if client_ip in banned_ips:
logger.info(f"Registration rejected: Banned IP {client_ip}")
return templates.TemplateResponse(
"error.html",
{
"request": request,
"message": "Registration not allowed from your IP address."
}
)
if email in banned_emails:
logger.info(f"Registration rejected: Banned email {email}")
return templates.TemplateResponse(
"error.html",
{
"request": request,
"message": "Registration not allowed for this email address."
}
)
# Check email cooldown
if error_message := check_email_cooldown(email):
logger.info(f"Registration rejected: Email cooldown - {email}")
return templates.TemplateResponse(
"error.html",
{
"request": request,
"message": error_message
}
)
# Check username availability
available = await check_username_availability(requested_username)
if not available:
logger.info(f"Registration rejected: Username unavailable - {requested_username}")
return templates.TemplateResponse(
"error.html",
{
"request": request,
"message": f"The username '{requested_username}' is not available."
}
)
# Read token and prepare email
token = read_registration_token()
if token is None:
logger.error("Registration token file not found")
raise HTTPException(status_code=500, detail="Registration token file not found.")
time_until_reset = get_time_until_reset_str(now)
email_body = config["email_body"].format(
homeserver=config["homeserver"],
registration_token=token,
requested_username=requested_username,
utc_time=now.strftime("%H:%M:%S"),
time_until_reset=time_until_reset
)
msg = EmailMessage()
msg.set_content(email_body)
msg["Subject"] = config["email_subject"].format(homeserver=config["homeserver"])
msg["From"] = config["smtp"]["username"]
msg["To"] = email
# Send email
try:
smtp_conf = config["smtp"]
with smtplib.SMTP(smtp_conf["host"], smtp_conf["port"]) as server:
if smtp_conf.get("use_tls", True):
server.starttls()
server.login(smtp_conf["username"], smtp_conf["password"])
server.send_message(msg)
logger.info(f"Registration email sent successfully to {email}")
except Exception as ex:
logger.error(f"Failed to send email: {ex}")
raise HTTPException(status_code=500, detail=f"Error sending email: {ex}")
# Log registration
registration_data = {
"requested_name": requested_username,
"email": email,
"datetime": datetime.utcnow().isoformat(),
"ip_address": client_ip
}
save_registration(registration_data)
logger.info(f"Registration successful - Username: {requested_username}, Email: {email}")
return templates.TemplateResponse(
"success.html",
{
"request": request,
"homeserver": config["homeserver"]
}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"registration:app",
host="0.0.0.0",
port=config["port"],
reload=True,
access_log=False
)

BIN
static/favicon.ico Normal file

Binary file not shown.

After

Width: 256px  |  Height: 256px  |  Size: 175 KiB

BIN
static/logo.png Normal file

Binary file not shown.

After

(image error) Size: 43 KiB

185
static/styles.css Normal file
View file

@ -0,0 +1,185 @@
/* Base styles */
body {
background-color: #282828;
color: #ebdbb2;
font-family: sans-serif;
margin: 0;
padding: 0;
display: flex;
flex-direction: column;
align-items: center;
justify-content: flex-start;
min-height: 100vh;
}
/* Main container */
.main-container {
background-color: #32302f;
border-radius: 12px;
padding: 2rem;
margin: 2rem auto;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.2);
width: 90%;
max-width: 800px;
display: flex;
flex-direction: column;
align-items: center;
}
/* Logo styling */
.logo-container {
width: 320px;
height: auto;
margin: 1rem 0 2rem 0;
display: flex;
justify-content: center;
background: transparent; /* Explicitly set transparent background */
}
.logo {
max-width: 100%;
height: auto;
background: transparent; /* Explicitly set transparent background */
}
/* Header styling */
header {
text-align: center;
margin-bottom: 2rem;
width: 100%;
}
h1 {
margin: 0;
font-size: 1.8rem;
}
/* Clock styling */
.clock {
font-size: 96px;
font-weight: bold;
color: #8ec07c;
text-align: center;
margin: 1.5rem 0;
font-family: monospace;
}
/* Message containers and content */
.message-container {
width: 100%;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
margin: 2rem 0;
}
.status-message,
.error-message,
.success-message {
width: 100%;
max-width: 600px;
font-size: 1.1rem;
text-align: center;
margin: 1.5rem 0;
}
.status-message,
.success-message {
color: #b8bb26;
}
.error-message {
color: #fb4934;
}
.success-message p {
margin: 0.5rem 0;
}
/* Form styling */
form {
display: flex;
flex-direction: column;
align-items: center;
width: 100%;
margin: 1rem 0;
}
input[type="text"],
input[type="email"] {
width: 80%;
max-width: 400px;
padding: 12px;
margin-bottom: 1rem;
font-size: 16px;
background-color: #3c3836;
border: 1px solid #665c54;
color: #ebdbb2;
border-radius: 6px;
}
input[type="submit"] {
padding: 12px 24px;
font-size: 16px;
background-color: #689d6a;
border: none;
border-radius: 6px;
color: #ebdbb2;
cursor: pointer;
transition: background-color 0.2s;
}
input[type="submit"]:hover {
background-color: #8ec07c;
}
/* Button styling */
.button-container {
width: 100%;
display: flex;
justify-content: center;
align-items: center;
margin: 2rem 0;
}
.back-button {
display: inline-block;
padding: 12px 24px;
color: #83a598;
text-decoration: none;
font-size: 16px;
border: 1px solid #83a598;
border-radius: 6px;
transition: all 0.2s;
text-align: center;
}
.back-button:hover {
background-color: #83a598;
color: #282828;
}
/* Footer styling */
.footer-spacer {
flex-grow: 1;
}
footer {
text-align: center;
color: #a89984;
font-size: 0.9rem;
margin-top: 2rem;
width: 100%;
}
footer p {
margin: 0;
}
/* Focus states for accessibility */
input:focus,
.back-button:focus {
outline: 2px solid #83a598;
outline-offset: 2px;
}

31
templates/error.html Normal file
View file

@ -0,0 +1,31 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Error - {{ homeserver }}</title>
<link rel="icon" href="/static/favicon.ico">
<link rel="stylesheet" href="/static/styles.css">
</head>
<body>
<div class="main-container">
<div class="logo-container">
<img src="/static/logo.png" alt="{{ homeserver }} logo" class="logo">
</div>
<div class="message-container">
<div class="error-message">
{{ message }}
</div>
</div>
<div class="button-container">
<a href="/" class="back-button">Back to Home</a>
</div>
<div class="footer-spacer"></div>
<footer>
<p>Matrix Homeserver: {{ homeserver }}</p>
</footer>
</div>
</body>
</html>

57
templates/index.html Normal file
View file

@ -0,0 +1,57 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Request an account on {{ homeserver }}</title>
<link rel="icon" href="/static/favicon.ico">
<link rel="stylesheet" href="/static/styles.css">
<script>
function updateClock() {
const now = new Date();
const utcHours = String(now.getUTCHours()).padStart(2, '0');
const utcMinutes = String(now.getUTCMinutes()).padStart(2, '0');
const utcSeconds = String(now.getUTCSeconds()).padStart(2, '0');
document.getElementById("clock").innerText =
`${utcHours}:${utcMinutes}:${utcSeconds}`;
}
// Wait for DOM to be ready before starting the clock
document.addEventListener('DOMContentLoaded', function() {
updateClock();
setInterval(updateClock, 1000);
});
</script>
</head>
<body>
<div class="main-container">
<div class="logo-container">
<img src="/static/logo.png" alt="{{ homeserver }} logo" class="logo">
</div>
<header>
<h1>Request an account on {{ homeserver }}</h1>
</header>
<div id="clock" class="clock">--:--:--</div>
<div class="status-message">
{{ message }}
</div>
{% if registration_closed %}
<!-- Registration is closed -->
{% else %}
<form action="/register" method="post">
<input type="text" name="requested_username" placeholder="Requested Username" required>
<input type="email" name="email" placeholder="Valid Email Address" required>
<input type="submit" value="Submit">
</form>
{% endif %}
<div class="footer-spacer"></div>
<footer>
<p>Matrix Homeserver: {{ homeserver }}</p>
</footer>
</div>
</body>
</html>

32
templates/success.html Normal file
View file

@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Registration Success - {{ homeserver }}</title>
<link rel="icon" href="/static/favicon.ico">
<link rel="stylesheet" href="/static/styles.css">
</head>
<body>
<div class="main-container">
<div class="logo-container">
<img src="/static/logo.png" alt="{{ homeserver }} logo" class="logo">
</div>
<div class="message-container">
<div class="success-message">
<p>Registration token sent to your email address.</p>
<p>Please check your inbox.</p>
</div>
</div>
<div class="button-container">
<a href="/" class="back-button">Back to Home</a>
</div>
<div class="footer-spacer"></div>
<footer>
<p>Matrix Homeserver: {{ homeserver }}</p>
</footer>
</div>
</body>
</html>