This commit is contained in:
Sangye Ince-Johannsen 2025-04-06 21:05:05 +00:00
parent b5a8dcd220
commit d8b0889d7d
12 changed files with 513 additions and 102 deletions

View file

@ -1,4 +1,16 @@
python canary.py #!/bin/bash
git add - A .
git commit -m "New canary" # Run the canary script
git push origin main python3 sw1tch/canary.py
# Check if the canary.txt was generated successfully
if [ $? -eq 0 ] && [ -f sw1tch/data/canary.txt ]; then
# Stage all changes, commit, and push
git add sw1tch/data/canary.txt
git commit -m "Update warrant canary - $(date +%Y-%m-%d)"
git push origin main
echo "Warrant canary updated and pushed to repository."
else
echo "Failed to generate or find canary.txt. Git operations aborted."
exit 1
fi

View file

@ -2,16 +2,14 @@ import os
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from sw1tch import BASE_DIR, CustomLoggingMiddleware from sw1tch import BASE_DIR, CustomLoggingMiddleware
from sw1tch.routes.public import router as public_router from sw1tch.routes import admin, canary, public
from sw1tch.routes.admin import router as admin_router
from sw1tch.routes.canary import router as canary_router
app = FastAPI() app = FastAPI()
app.add_middleware(CustomLoggingMiddleware) app.add_middleware(CustomLoggingMiddleware)
app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static") app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static")
app.include_router(public_router)
app.include_router(admin_router) app.include_router(admin.router)
app.include_router(canary_router) app.include_router(public.router)
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn

92
sw1tch/canary.py Executable file → Normal file
View file

@ -12,32 +12,52 @@ from pathlib import Path
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
# File paths # File paths relative to the sw1tch/ directory
CONFIG_FILE = "config.yaml" BASE_DIR = Path(__file__).parent
OUTPUT_FILE = "canary.txt" CONFIG_FILE = BASE_DIR / "config" / "config.yaml"
TEMP_MESSAGE_FILE = "temp_canary_message.txt" ATTESTATIONS_FILE = BASE_DIR / "config" / "attestations.txt"
OUTPUT_FILE = BASE_DIR / "data" / "canary.txt"
TEMP_MESSAGE_FILE = BASE_DIR / "data" / "temp_canary_message.txt"
def load_config(): def load_config():
"""Load configuration from YAML file.""" """Load configuration from YAML file."""
try: try:
if not os.path.exists(CONFIG_FILE): if not CONFIG_FILE.exists():
print(f"Error: Configuration file '{CONFIG_FILE}' not found.") print(f"Error: Configuration file '{CONFIG_FILE}' not found.")
sys.exit(1) sys.exit(1)
with open(CONFIG_FILE, 'r') as file: with open(CONFIG_FILE, 'r') as file:
config = yaml.safe_load(file) config = yaml.safe_load(file)
required = [('gpg', 'key_id'), ('canary', 'organization'), ('canary', 'attestations')] # Adjust to match config.yaml structure
for section, field in required: required = [
if section not in config or field not in config[section]: ('canary', 'organization'),
print(f"Error: Missing required field '{section}.{field}' in config.") ('canary', 'gpg_key_id'),
sys.exit(1) ('canary', 'credentials', 'username'),
('canary', 'credentials', 'password'),
('canary', 'room')
]
for path in required:
current = config
for key in path:
if key not in current:
print(f"Error: Missing required field '{'.'.join(path)}' in config.")
sys.exit(1)
current = current[key]
return config return config
except Exception as e: except Exception as e:
print(f"Error loading configuration: {e}") print(f"Error loading configuration: {e}")
sys.exit(1) sys.exit(1)
def get_current_date(): def load_attestations():
"""Return the current date in YYYY-MM-DD format.""" """Load attestations from attestations.txt."""
return datetime.datetime.now().strftime("%Y-%m-%d") try:
if not ATTESTATIONS_FILE.exists():
print(f"Error: Attestations file '{ATTESTATIONS_FILE}' not found.")
sys.exit(1)
with open(ATTESTATIONS_FILE, 'r') as f:
return [line.strip() for line in f if line.strip()]
except Exception as e:
print(f"Error loading attestations: {e}")
sys.exit(1)
def get_nist_time(): def get_nist_time():
"""Get the current time from NIST or fallback servers.""" """Get the current time from NIST or fallback servers."""
@ -66,7 +86,7 @@ def get_nist_time():
def get_rss_headline(config): def get_rss_headline(config):
"""Get the latest headline and link from the configured RSS feed.""" """Get the latest headline and link from the configured RSS feed."""
try: try:
rss_config = config.get('rss', {}) rss_config = config['canary'].get('rss', {})
rss_url = rss_config.get('url', 'https://www.democracynow.org/democracynow.rss') rss_url = rss_config.get('url', 'https://www.democracynow.org/democracynow.rss')
feed = feedparser.parse(rss_url) feed = feedparser.parse(rss_url)
if feed.entries and len(feed.entries) > 0: if feed.entries and len(feed.entries) > 0:
@ -100,11 +120,12 @@ def get_bitcoin_latest_block():
return None return None
def collect_attestations(config): def collect_attestations(config):
"""Prompt user for each attestation from config.""" """Prompt user for each attestation from attestations.txt."""
attestations = load_attestations()
selected_attestations = [] selected_attestations = []
org = config['canary']['organization'] org = config['canary']['organization']
print("\nPlease confirm each attestation separately:") print("\nPlease confirm each attestation separately:")
for i, attestation in enumerate(config['canary']['attestations'], 1): for i, attestation in enumerate(attestations, 1):
while True: while True:
response = input(f"Confirm: '{org} {attestation}' (y/n): ").lower() response = input(f"Confirm: '{org} {attestation}' (y/n): ").lower()
if response in ['y', 'n']: if response in ['y', 'n']:
@ -121,7 +142,6 @@ def get_optional_note():
def create_warrant_canary_message(config): def create_warrant_canary_message(config):
"""Create the warrant canary message with updated formatting.""" """Create the warrant canary message with updated formatting."""
current_date = get_current_date()
nist_time = get_nist_time() nist_time = get_nist_time()
rss_data = get_rss_headline(config) rss_data = get_rss_headline(config)
bitcoin_block = get_bitcoin_latest_block() bitcoin_block = get_bitcoin_latest_block()
@ -129,7 +149,7 @@ def create_warrant_canary_message(config):
if not all([nist_time, rss_data, bitcoin_block]): if not all([nist_time, rss_data, bitcoin_block]):
missing = [] missing = []
if not nist_time: missing.append("NIST time") if not nist_time: missing.append("NIST time")
if not rss_data: missing.append(f"{config['rss'].get('name', 'RSS')} headline") if not rss_data: missing.append(f"{config['canary']['rss'].get('name', 'RSS')} headline")
if not bitcoin_block: missing.append("Bitcoin block data") if not bitcoin_block: missing.append("Bitcoin block data")
print(f"Error: Could not fetch: {', '.join(missing)}") print(f"Error: Could not fetch: {', '.join(missing)}")
return None return None
@ -145,9 +165,7 @@ def create_warrant_canary_message(config):
org = config['canary']['organization'] org = config['canary']['organization']
admin_name = config['canary'].get('admin_name', 'Admin') admin_name = config['canary'].get('admin_name', 'Admin')
admin_title = config['canary'].get('admin_title', 'administrator') admin_title = config['canary'].get('admin_title', 'administrator')
rss_name = config['rss'].get('name', 'RSS Feed')
# No leading \n; GPG adds one blank line after Hash: SHA512
message = f"{org} Warrant Canary · {nist_time}\n" message = f"{org} Warrant Canary · {nist_time}\n"
message += f"I, {admin_name}, the {admin_title} of {org}, state this {datetime.datetime.now().strftime('%dth day of %B, %Y')}:\n" message += f"I, {admin_name}, the {admin_title} of {org}, state this {datetime.datetime.now().strftime('%dth day of %B, %Y')}:\n"
for i, attestation in enumerate(attestations, 1): for i, attestation in enumerate(attestations, 1):
@ -162,12 +180,12 @@ def create_warrant_canary_message(config):
message += f" BTC block: #{bitcoin_block['height']}, {bitcoin_block['time']}\n" message += f" BTC block: #{bitcoin_block['height']}, {bitcoin_block['time']}\n"
message += f" Block hash: {bitcoin_block['hash']}\n" message += f" Block hash: {bitcoin_block['hash']}\n"
return message.rstrip() + "\n" # Single newline before signature return message.rstrip() + "\n"
def sign_with_gpg(message, gpg_key_id): def sign_with_gpg(message, gpg_key_id):
"""Sign the warrant canary message with GPG, ensuring no extra newline after signature header.""" """Sign the warrant canary message with GPG."""
try: try:
with open(TEMP_MESSAGE_FILE, "w", newline='\n') as f: # Unix line endings with open(TEMP_MESSAGE_FILE, "w", newline='\n') as f:
f.write(message) f.write(message)
cmd = ["gpg", "--clearsign", "--default-key", gpg_key_id, TEMP_MESSAGE_FILE] cmd = ["gpg", "--clearsign", "--default-key", gpg_key_id, TEMP_MESSAGE_FILE]
subprocess.run(cmd, check=True) subprocess.run(cmd, check=True)
@ -175,13 +193,11 @@ def sign_with_gpg(message, gpg_key_id):
signed_message = f.read() signed_message = f.read()
os.remove(TEMP_MESSAGE_FILE) os.remove(TEMP_MESSAGE_FILE)
os.remove(f"{TEMP_MESSAGE_FILE}.asc") os.remove(f"{TEMP_MESSAGE_FILE}.asc")
# Fix GPG's extra newline after -----BEGIN PGP SIGNATURE-----
lines = signed_message.splitlines() lines = signed_message.splitlines()
signature_idx = next(i for i, line in enumerate(lines) if line == "-----BEGIN PGP SIGNATURE-----") signature_idx = next(i for i, line in enumerate(lines) if line == "-----BEGIN PGP SIGNATURE-----")
if lines[signature_idx + 1] == "": if lines[signature_idx + 1] == "":
lines.pop(signature_idx + 1) # Remove blank line lines.pop(signature_idx + 1)
signed_message = "\n".join(lines) return "\n".join(lines)
return signed_message
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f"GPG signing error: {e}") print(f"GPG signing error: {e}")
return None return None
@ -202,13 +218,10 @@ def save_warrant_canary(signed_message):
async def post_to_matrix(config, signed_message): async def post_to_matrix(config, signed_message):
"""Post the signed warrant canary to Matrix room.""" """Post the signed warrant canary to Matrix room."""
if not config.get('matrix', {}).get('enabled', False):
print("Matrix posting is disabled in config")
return False
try: try:
from nio import AsyncClient, LoginResponse from nio import AsyncClient
matrix = config['matrix'] matrix = config['canary']['credentials']
client = AsyncClient(matrix['homeserver'], matrix['username']) client = AsyncClient(config['base_url'], matrix['username'])
await client.login(matrix['password']) await client.login(matrix['password'])
full_message = ( full_message = (
@ -227,7 +240,7 @@ async def post_to_matrix(config, signed_message):
f"<pre>{signed_message}</pre>" f"<pre>{signed_message}</pre>"
) )
} }
await client.room_send(matrix['room_id'], "m.room.message", content) await client.room_send(config['canary']['room'], "m.room.message", content)
await client.logout() await client.logout()
await client.close() await client.close()
print("Posted to Matrix successfully") print("Posted to Matrix successfully")
@ -253,14 +266,17 @@ def main():
print("Operation cancelled") print("Operation cancelled")
sys.exit(0) sys.exit(0)
signed_message = sign_with_gpg(message, config['gpg']['key_id']) signed_message = sign_with_gpg(message, config['canary']['gpg_key_id'])
if not signed_message: if not signed_message:
print("Failed to sign message") print("Failed to sign message")
sys.exit(1) sys.exit(1)
if save_warrant_canary(signed_message) and config.get('matrix', {}).get('enabled', False): if not save_warrant_canary(signed_message):
if input("Post to Matrix? (y/n): ").lower() == 'y': print("Failed to save canary")
asyncio.run(post_to_matrix(config, signed_message)) sys.exit(1)
if input("Post to Matrix? (y/n): ").lower() == 'y':
asyncio.run(post_to_matrix(config, signed_message))
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View file

@ -1,16 +1,73 @@
from fastapi import APIRouter, Form, Depends from fastapi import APIRouter, Form, Depends, Request, HTTPException
from fastapi.responses import JSONResponse from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from datetime import datetime, timedelta from datetime import datetime, timedelta
import httpx import httpx
import re import re
import os
import hashlib
from sw1tch import config, logger, load_registrations, save_registrations, verify_admin_auth from sw1tch import BASE_DIR, config, logger, load_registrations, save_registrations, verify_admin_auth
from sw1tch.utilities.matrix import get_matrix_users, deactivate_user from sw1tch.utilities.matrix import get_matrix_users, deactivate_user
router = APIRouter(prefix="/_admin", dependencies=[Depends(verify_admin_auth)]) router = APIRouter(prefix="/_admin")
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
@router.get("/", response_class=HTMLResponse)
async def admin_panel(request: Request, auth_token: str = Depends(verify_admin_auth)):
return templates.TemplateResponse("admin.html", {"request": request, "authenticated": True})
@router.get("/login", response_class=HTMLResponse)
async def admin_login_page(request: Request):
return templates.TemplateResponse("admin.html", {"request": request, "authenticated": False})
@router.post("/login", response_class=HTMLResponse)
async def admin_login(request: Request, password: str = Form(...)):
expected_password = config["matrix_admin"].get("password", "")
hashed_password = hashlib.sha256(password.encode()).hexdigest()
if hashed_password == hashlib.sha256(expected_password.encode()).hexdigest():
return HTMLResponse(
content=f"""
<html>
<head>
<meta http-equiv="refresh" content="0;url=/_admin/?auth_token={hashed_password}">
</head>
<body>
<p>Redirecting to admin panel...</p>
</body>
</html>
"""
)
else:
return templates.TemplateResponse("admin.html", {"request": request, "authenticated": False, "error": "Invalid password"})
@router.get("/view_unfulfilled", response_class=HTMLResponse)
async def view_unfulfilled_registrations(request: Request, auth_token: str = Depends(verify_admin_auth)):
registrations = load_registrations()
unfulfilled = []
if registrations:
current_time = datetime.utcnow()
async with httpx.AsyncClient() as client:
for entry in registrations:
username = entry["requested_name"]
reg_date = datetime.fromisoformat(entry["datetime"])
age = current_time - reg_date
url = f"{config['base_url']}/_matrix/client/v3/register/available?username={username}"
try:
response = await client.get(url, timeout=5)
if response.status_code == 200 and response.json().get("available", False):
unfulfilled.append({
"username": username,
"email": entry["email"],
"registration_date": entry["datetime"],
"age_hours": age.total_seconds() / 3600
})
except httpx.RequestError as ex:
logger.error(f"Error checking username {username}: {ex}")
return templates.TemplateResponse("unfulfilled_registrations.html", {"request": request, "registrations": unfulfilled})
@router.post("/purge_unfulfilled_registrations", response_class=JSONResponse) @router.post("/purge_unfulfilled_registrations", response_class=JSONResponse)
async def purge_unfulfilled_registrations(min_age_hours: int = Form(default=24)): async def purge_unfulfilled_registrations(min_age_hours: int = Form(default=24), auth_token: str = Depends(verify_admin_auth)):
registrations = load_registrations() registrations = load_registrations()
if not registrations: if not registrations:
return JSONResponse({"message": "No registrations found to clean up"}) return JSONResponse({"message": "No registrations found to clean up"})
@ -62,8 +119,21 @@ async def purge_unfulfilled_registrations(min_age_hours: int = Form(default=24))
logger.info(f"Cleanup complete: {result}") logger.info(f"Cleanup complete: {result}")
return JSONResponse(result) return JSONResponse(result)
@router.get("/view_undocumented", response_class=HTMLResponse)
async def view_undocumented_users(request: Request, auth_token: str = Depends(verify_admin_auth)):
registrations = load_registrations()
matrix_users = await get_matrix_users()
registered_usernames = {entry["requested_name"].lower() for entry in registrations}
homeserver = config["homeserver"].lower()
undocumented_users = [
user for user in matrix_users
if user.lower().startswith("@") and user[1:].lower().split(":", 1)[1] == homeserver
and user[1:].lower().split(":", 1)[0] not in registered_usernames
]
return templates.TemplateResponse("undocumented_users.html", {"request": request, "users": undocumented_users})
@router.post("/deactivate_undocumented_users", response_class=JSONResponse) @router.post("/deactivate_undocumented_users", response_class=JSONResponse)
async def deactivate_undocumented_users(): async def deactivate_undocumented_users(auth_token: str = Depends(verify_admin_auth)):
registrations = load_registrations() registrations = load_registrations()
matrix_users = await get_matrix_users() matrix_users = await get_matrix_users()
registered_usernames = {entry["requested_name"].lower() for entry in registrations} registered_usernames = {entry["requested_name"].lower() for entry in registrations}
@ -93,7 +163,7 @@ async def deactivate_undocumented_users():
return JSONResponse(result) return JSONResponse(result)
@router.post("/retroactively_document_users", response_class=JSONResponse) @router.post("/retroactively_document_users", response_class=JSONResponse)
async def retroactively_document_users(): async def retroactively_document_users(auth_token: str = Depends(verify_admin_auth)):
registrations = load_registrations() registrations = load_registrations()
matrix_users = await get_matrix_users() matrix_users = await get_matrix_users()
registered_usernames = {entry["requested_name"].lower() for entry in registrations} registered_usernames = {entry["requested_name"].lower() for entry in registrations}

View file

@ -13,7 +13,7 @@ from urllib3.util.retry import Retry
from sw1tch import BASE_DIR, config, logger, verify_admin_auth from sw1tch import BASE_DIR, config, logger, verify_admin_auth
from sw1tch.utilities.matrix import AsyncClient from sw1tch.utilities.matrix import AsyncClient
router = APIRouter(prefix="/_admin/warrant_canary", dependencies=[Depends(verify_admin_auth)]) router = APIRouter(prefix="/_admin/canary")
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
ATTESTATIONS_FILE = os.path.join(BASE_DIR, "config", "attestations.txt") ATTESTATIONS_FILE = os.path.join(BASE_DIR, "config", "attestations.txt")
@ -45,8 +45,8 @@ def get_nist_time():
return data["dateTime"] + " UTC" return data["dateTime"] + " UTC"
elif "utc_datetime" in data: elif "utc_datetime" in data:
return data["utc_datetime"] + " UTC" return data["utc_datetime"] + " UTC"
except requests.RequestException: except requests.RequestException as e:
pass logger.error(f"Failed to fetch NIST time from {url}: {e}")
raise HTTPException(status_code=500, detail="Failed to fetch NIST time") raise HTTPException(status_code=500, detail="Failed to fetch NIST time")
def get_rss_headline(): def get_rss_headline():
@ -60,18 +60,19 @@ def get_rss_headline():
def get_bitcoin_latest_block(): def get_bitcoin_latest_block():
try: try:
response = requests.get("https://blockchain.info/latestblock", timeout=10) response = requests.get("https://blockchain.info/latestblock", timeout=10)
if response.status_code == 200: response.raise_for_status()
data = response.json() data = response.json()
block_response = requests.get(f"https://blockchain.info/rawblock/{data['hash']}", timeout=10) block_response = requests.get(f"https://blockchain.info/rawblock/{data['hash']}", timeout=10)
if block_response.status_code == 200: block_response.raise_for_status()
block_data = block_response.json() block_data = block_response.json()
hash_str = data["hash"].lstrip("0") or "0" hash_str = data["hash"].lstrip("0") or "0"
return { return {
"height": data["height"], "height": data["height"],
"hash": hash_str, "hash": hash_str,
"time": datetime.datetime.fromtimestamp(block_data["time"]).strftime("%Y-%m-%d %H:%M:%S UTC") "time": datetime.datetime.fromtimestamp(block_data["time"]).strftime("%Y-%m-%d %H:%M:%S UTC")
} }
except Exception: except requests.RequestException as e:
logger.error(f"Failed to fetch Bitcoin block data: {e}")
raise HTTPException(status_code=500, detail="Failed to fetch Bitcoin block data") raise HTTPException(status_code=500, detail="Failed to fetch Bitcoin block data")
def create_warrant_canary_message(attestations: List[str], note: str): def create_warrant_canary_message(attestations: List[str], note: str):
@ -99,7 +100,7 @@ def sign_with_gpg(message: str, gpg_key_id: str, passphrase: str):
with open(TEMP_CANARY_FILE, "w", newline='\n') as f: with open(TEMP_CANARY_FILE, "w", newline='\n') as f:
f.write(message) f.write(message)
cmd = ["gpg", "--batch", "--yes", "--passphrase", passphrase, "--clearsign", "--default-key", gpg_key_id, TEMP_CANARY_FILE] cmd = ["gpg", "--batch", "--yes", "--passphrase", passphrase, "--clearsign", "--default-key", gpg_key_id, TEMP_CANARY_FILE]
subprocess.run(cmd, check=True) subprocess.run(cmd, check=True, capture_output=True, text=True)
with open(f"{TEMP_CANARY_FILE}.asc", "r") as f: with open(f"{TEMP_CANARY_FILE}.asc", "r") as f:
signed_message = f.read() signed_message = f.read()
os.remove(TEMP_CANARY_FILE) os.remove(TEMP_CANARY_FILE)
@ -110,7 +111,11 @@ def sign_with_gpg(message: str, gpg_key_id: str, passphrase: str):
lines.pop(signature_idx + 1) lines.pop(signature_idx + 1)
return "\n".join(lines) return "\n".join(lines)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise HTTPException(status_code=500, detail=f"GPG signing failed: {e}") logger.error(f"GPG signing failed: {e.stderr}")
raise HTTPException(status_code=500, detail=f"GPG signing failed: {e.stderr}")
except Exception as e:
logger.error(f"Error during GPG signing: {e}")
raise HTTPException(status_code=500, detail=f"Error during GPG signing: {e}")
async def post_to_matrix(signed_message: str): async def post_to_matrix(signed_message: str):
try: try:
@ -132,36 +137,62 @@ async def post_to_matrix(signed_message: str):
f"<pre>{signed_message}</pre>" f"<pre>{signed_message}</pre>"
) )
} }
await client.room_send(config['matrix_admin']['room'], "m.room.message", content) await client.room_send(config['canary']['room'], "m.room.message", content)
await client.logout() await client.logout()
await client.close() await client.close()
logger.info("Warrant canary posted to Matrix successfully")
return True return True
except Exception as e: except Exception as e:
logger.error(f"Error posting to Matrix: {e}") logger.error(f"Error posting to Matrix: {e}")
return False return False
@router.get("/", response_class=HTMLResponse) @router.get("/", response_class=HTMLResponse)
async def warrant_canary_form(request: Request): async def warrant_canary_form(request: Request, auth_token: str = Depends(verify_admin_auth)):
attestations = load_attestations() attestations = load_attestations()
return templates.TemplateResponse("canary_form.html", { return templates.TemplateResponse("canary.html", {
"request": request, "request": request,
"attestations": attestations, "attestations": attestations,
"organization": config["canary"]["organization"] "organization": config["canary"]["organization"]
}) })
@router.post("/preview", response_class=HTMLResponse) @router.post("/preview", response_class=HTMLResponse)
async def warrant_canary_preview(request: Request, attestations: List[str] = Form(...), note: str = Form(default="")): async def warrant_canary_preview(
message = create_warrant_canary_message(attestations, note) request: Request,
return templates.TemplateResponse("canary_preview.html", {"request": request, "message": message}) selected_attestations: List[str] = Form(...),
note: str = Form(default=""),
auth_token: str = Depends(verify_admin_auth)
):
message = create_warrant_canary_message(selected_attestations, note)
return templates.TemplateResponse("canary_preview.html", {
"request": request,
"message": message,
"selected_attestations": selected_attestations,
"note": note
})
@router.post("/sign", response_class=HTMLResponse) @router.post("/sign", response_class=HTMLResponse)
async def warrant_canary_sign(request: Request, message: str = Form(...), passphrase: str = Form(...)): async def warrant_canary_sign(
request: Request,
message: str = Form(...),
passphrase: str = Form(...),
auth_token: str = Depends(verify_admin_auth)
):
signed_message = sign_with_gpg(message, config["canary"]["gpg_key_id"], passphrase) signed_message = sign_with_gpg(message, config["canary"]["gpg_key_id"], passphrase)
with open(CANARY_OUTPUT_FILE, "w") as f: with open(CANARY_OUTPUT_FILE, "w") as f:
f.write(signed_message) f.write(signed_message)
return templates.TemplateResponse("canary_success.html", {"request": request, "signed_message": signed_message}) logger.info(f"Warrant canary saved to {CANARY_OUTPUT_FILE}")
return templates.TemplateResponse("canary_success.html", {
"request": request,
"signed_message": signed_message
})
@router.post("/post", response_class=JSONResponse) @router.post("/post", response_class=JSONResponse)
async def warrant_canary_post(signed_message: str = Form(...)): async def warrant_canary_post(
signed_message: str = Form(...),
auth_token: str = Depends(verify_admin_auth)
):
success = await post_to_matrix(signed_message) success = await post_to_matrix(signed_message)
return JSONResponse({"message": "Posted to Matrix" if success else "Failed to post to Matrix"}) return JSONResponse({
"message": "Posted to Matrix successfully" if success else "Failed to post to Matrix",
"success": success
})

View file

@ -33,7 +33,7 @@ body {
border-radius: 0.75rem; border-radius: 0.75rem;
padding: 2rem; padding: 2rem;
width: 100%; width: 100%;
max-width: 24rem; max-width: 80%; /* Changed from 24rem to match results-container */
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
} }
@ -135,4 +135,82 @@ button:hover, .button:hover {
.message p { .message p {
margin: 0.5rem 0; margin: 0.5rem 0;
} }
.list-container {
margin-top: 0; /* Remove top margin since its now in results-container */
}
.data-table {
width: 100%;
border-collapse: collapse;
margin-top: 1rem;
}
.data-table th,
.data-table td {
padding: 0.75rem;
border: 1px solid var(--border-color);
text-align: left;
overflow-wrap: break-word; /* Ensure long text wraps */
}
.data-table th {
background-color: var(--input-bg);
font-weight: 500;
}
.data-table tr:nth-child(even) {
background-color: var(--card-bg);
}
.data-table tr:hover {
background-color: var(--input-bg);
}
/* Add to the end of sw1tch/static/styles.css */
fieldset {
border: 1px solid var(--border-color);
padding: 1rem;
margin-bottom: 1rem;
border-radius: 0.375rem;
}
legend {
padding: 0 0.5rem;
color: var(--text-secondary);
}
.checkbox-label {
display: block;
margin: 0.5rem 0;
color: var(--text-primary);
}
textarea {
width: 100%;
padding: 0.5rem 1rem;
background-color: var(--input-bg);
border: 1px solid var(--border-color);
border-radius: 0.375rem;
color: var(--text-primary);
font-size: 1rem;
transition: border-color 0.15s ease;
resize: vertical;
}
textarea:focus {
outline: none;
border-color: var(--accent);
}
.canary-preview {
background-color: var(--input-bg);
padding: 1rem;
border-radius: 0.375rem;
white-space: pre-wrap;
word-wrap: break-word;
margin: 1rem 0;
font-family: monospace;
}

View file

@ -0,0 +1,99 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Admin Panel</title>
<link rel="stylesheet" href="/static/styles.css">
<style>
.grid-container {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1rem;
margin-bottom: 2rem;
}
.admin-button {
padding: 1rem;
font-size: 1rem;
}
.login-container {
display: flex;
flex-direction: column;
gap: 1rem;
}
.results-container {
width: 80%;
margin: 0 auto 2rem auto;
max-height: none; /* Allow height to adjust to content */
}
</style>
</head>
<body>
<div class="card">
<div class="logo-container">
<img src="/static/logo.png" alt="Logo" class="logo">
</div>
{% if not authenticated %}
<div class="login-container">
<form id="login-form" method="post" action="/_admin/login">
<input type="password" name="password" placeholder="Admin Password" required>
<button type="submit">Login</button>
</form>
{% if error %}
<p class="message" style="color: var(--error-color);">{{ error }}</p>
{% endif %}
</div>
{% else %}
<div class="grid-container">
<button onclick="viewUnfulfilled('{{ request.query_params.auth_token }}')" class="admin-button">View Unfulfilled Registrations</button>
<button onclick="viewUndocumented('{{ request.query_params.auth_token }}')" class="admin-button">View Undocumented Users</button>
</div>
<div id="results" class="results-container"></div>
<div class="grid-container">
<button onclick="purgeUnfulfilled('{{ request.query_params.auth_token }}')" class="admin-button">Purge Unfulfilled Registrations</button>
<button onclick="deactivateUndocumented('{{ request.query_params.auth_token }}')" class="admin-button">Deactivate Undocumented Users</button>
</div>
<p class="info-text">Logged in as admin</p>
{% endif %}
</div>
<script>
async function viewUnfulfilled(auth_token) {
const response = await fetch(`/_admin/view_unfulfilled?auth_token=${auth_token}`);
const html = await response.text();
document.getElementById('results').innerHTML = html;
}
async function purgeUnfulfilled(auth_token) {
if (confirm("Are you sure you want to purge unfulfilled registrations?")) {
const response = await fetch(`/_admin/purge_unfulfilled_registrations?auth_token=${auth_token}`, {
method: "POST",
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: "min_age_hours=24"
});
const result = await response.json();
alert(`Cleanup complete:\nKept existing: ${result.kept_existing}\nKept recent: ${result.kept_recent}\nRemoved: ${result.removed}`);
viewUnfulfilled(auth_token); // Refresh the list
}
}
async function viewUndocumented(auth_token) {
const response = await fetch(`/_admin/view_undocumented?auth_token=${auth_token}`);
const html = await response.text();
document.getElementById('results').innerHTML = html;
}
async function deactivateUndocumented(auth_token) {
if (confirm("Are you sure you want to deactivate undocumented users?")) {
const response = await fetch(`/_admin/deactivate_undocumented_users?auth_token=${auth_token}`, {
method: "POST"
});
const result = await response.json();
alert(`${result.message}\nFailed deactivations: ${result.failed_deactivations || 'None'}`);
viewUndocumented(auth_token); // Refresh the list
}
}
</script>
</body>
</html>

View file

@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Warrant Canary</title>
<link rel="stylesheet" href="/static/styles.css">
</head>
<body>
<div class="card">
<div class="logo-container">
<img src="/static/logo.png" alt="Logo" class="logo">
</div>
<h2>Create Warrant Canary</h2>
<form method="post" action="/_admin/canary/preview?auth_token={{ request.query_params.auth_token }}">
<fieldset>
<legend>Select Attestations for {{ organization }}</legend>
{% for attestation in attestations %}
<label class="checkbox-label">
<input type="checkbox" name="selected_attestations" value="{{ attestation }}">
{{ organization }} {{ attestation }}
</label>
{% endfor %}
</fieldset>
<label for="note">Optional Note:</label>
<textarea id="note" name="note" rows="3" placeholder="Add an optional note"></textarea>
<button type="submit">Preview Canary</button>
</form>
<p class="info-text"><a href="/_admin/?auth_token={{ request.query_params.auth_token }}">Back to Admin Panel</a></p>
</div>
</body>
</html>

View file

@ -1,16 +1,25 @@
<!DOCTYPE html> <!DOCTYPE html>
<html> <html lang="en">
<head> <head>
<title>Warrant Canary Preview</title> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Preview Warrant Canary</title>
<link rel="stylesheet" href="/static/styles.css">
</head> </head>
<body> <body>
<h1>Warrant Canary Preview</h1> <div class="card">
<pre>{{ message }}</pre> <div class="logo-container">
<form method="post" action="/_admin/warrant_canary/sign"> <img src="/static/logo.png" alt="Logo" class="logo">
<input type="hidden" name="message" value="{{ message|escape }}"> </div>
<input type="password" name="passphrase" placeholder="GPG Passphrase" required> <h2>Preview Warrant Canary</h2>
<input type="hidden" name="auth_token" value="{{ request.query_params.auth_token }}"> <pre class="canary-preview">{{ message }}</pre>
<button type="submit">Sign</button> <form method="post" action="/_admin/canary/sign?auth_token={{ request.query_params.auth_token }}">
</form> <input type="hidden" name="message" value="{{ message | escape }}">
<label for="passphrase">GPG Passphrase:</label>
<input type="password" id="passphrase" name="passphrase" required>
<button type="submit">Sign and Save</button>
</form>
<p class="info-text"><a href="/_admin/canary/?auth_token={{ request.query_params.auth_token }}">Back to Form</a></p>
</div>
</body> </body>
</html> </html>

View file

@ -1,15 +1,33 @@
<!DOCTYPE html> <!DOCTYPE html>
<html> <html lang="en">
<head> <head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Warrant Canary Signed</title> <title>Warrant Canary Signed</title>
<link rel="stylesheet" href="/static/styles.css">
</head> </head>
<body> <body>
<h1>Warrant Canary Signed</h1> <div class="card">
<pre>{{ signed_message }}</pre> <div class="logo-container">
<form method="post" action="/_admin/warrant_canary/post"> <img src="/static/logo.png" alt="Logo" class="logo">
<input type="hidden" name="signed_message" value="{{ signed_message|escape }}"> </div>
<input type="hidden" name="auth_token" value="{{ request.query_params.auth_token }}"> <h2>Warrant Canary Signed</h2>
<button type="submit">Post to Matrix</button> <pre class="canary-preview">{{ signed_message }}</pre>
</form> <button onclick="postToMatrix('{{ signed_message | escape }}', '{{ request.query_params.auth_token }}')">Post to Matrix</button>
<p id="post-result" class="message"></p>
<p class="info-text"><a href="/_admin/?auth_token={{ request.query_params.auth_token }}">Back to Admin Panel</a></p>
</div>
<script>
async function postToMatrix(signed_message, auth_token) {
const response = await fetch(`/_admin/canary/post?auth_token=${auth_token}`, {
method: "POST",
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: `signed_message=${encodeURIComponent(signed_message)}`
});
const result = await response.json();
document.getElementById("post-result").textContent = result.message;
document.getElementById("post-result").style.color = result.success ? "var(--success-color)" : "var(--error-color)";
}
</script>
</body> </body>
</html> </html>

View file

@ -0,0 +1,21 @@
<div class="list-container">
<h2>Undocumented Users</h2>
{% if users %}
<table class="data-table">
<thead>
<tr>
<th>User ID</th>
</tr>
</thead>
<tbody>
{% for user in users %}
<tr>
<td>{{ user }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="message">No undocumented users found.</p>
{% endif %}
</div>

View file

@ -0,0 +1,27 @@
<div class="list-container">
<h2>Unfulfilled Registrations</h2>
{% if registrations %}
<table class="data-table">
<thead>
<tr>
<th>Username</th>
<th>Email</th>
<th>Registration Date</th>
<th>Age (Hours)</th>
</tr>
</thead>
<tbody>
{% for reg in registrations %}
<tr>
<td>{{ reg.username }}</td>
<td>{{ reg.email }}</td>
<td>{{ reg.registration_date }}</td>
<td>{{ reg.age_hours | round(1) }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="message">No unfulfilled registrations found.</p>
{% endif %}
</div>