602 lines
26 KiB
Python
602 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import yaml
|
|
import requests
|
|
import feedparser
|
|
import datetime
|
|
import subprocess
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
from pathlib import Path
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
from datetime import timezone # For timezone-aware datetime objects
|
|
|
|
# --- Configuration ---
|
|
# File paths relative to the script's parent directory (sw1tch/)
|
|
BASE_DIR = Path(__file__).parent
|
|
CONFIG_FILE = BASE_DIR / "config" / "config.yaml"
|
|
ATTESTATIONS_FILE = BASE_DIR / "config" / "attestations.txt"
|
|
OUTPUT_FILE = BASE_DIR / "data" / "canary.txt"
|
|
TEMP_MESSAGE_FILE = BASE_DIR / "data" / "temp_canary_message.txt" # For GPG signing
|
|
|
|
# --- Core Functions ---
|
|
|
|
def load_config():
|
|
"""Loads configuration settings from the YAML file."""
|
|
try:
|
|
if not CONFIG_FILE.exists():
|
|
print(f"Error: Configuration file '{CONFIG_FILE}' not found.")
|
|
sys.exit(1)
|
|
with open(CONFIG_FILE, 'r') as file:
|
|
config = yaml.safe_load(file)
|
|
|
|
# Validate essential non-Matrix config fields
|
|
required = [
|
|
('canary', 'organization'),
|
|
('canary', 'gpg_key_id'),
|
|
]
|
|
for path in required:
|
|
current = config
|
|
path_str = '.'.join(path)
|
|
try:
|
|
for key in path:
|
|
if key not in current: raise KeyError
|
|
current = current[key]
|
|
except KeyError:
|
|
print(f"Error: Missing required field '{path_str}' in config.")
|
|
sys.exit(1)
|
|
except TypeError:
|
|
print(f"Error: Invalid structure for '{path_str}' in config.")
|
|
sys.exit(1)
|
|
|
|
# Basic validation of Matrix structure if present (full check done before posting)
|
|
if 'canary' in config and 'credentials' in config['canary']:
|
|
matrix_required_structure = [
|
|
('canary', 'credentials', 'username'),
|
|
('canary', 'credentials', 'password'),
|
|
('canary', 'room')
|
|
]
|
|
for path in matrix_required_structure:
|
|
current = config
|
|
path_str = '.'.join(path)
|
|
try:
|
|
for key in path:
|
|
if key not in current: break # Okay if missing, checked later
|
|
current = current[key]
|
|
except TypeError:
|
|
print(f"Warning: Invalid structure for potential Matrix field '{path_str}'.")
|
|
|
|
return config
|
|
except Exception as e:
|
|
print(f"Error loading configuration: {e}")
|
|
sys.exit(1)
|
|
|
|
def load_attestations():
|
|
"""Loads attestation statements from the attestations text file."""
|
|
try:
|
|
if not ATTESTATIONS_FILE.exists():
|
|
print(f"Error: Attestations file '{ATTESTATIONS_FILE}' not found.")
|
|
sys.exit(1)
|
|
with open(ATTESTATIONS_FILE, 'r') as f:
|
|
# Return non-empty, stripped lines
|
|
return [line.strip() for line in f if line.strip()]
|
|
except Exception as e:
|
|
print(f"Error loading attestations: {e}")
|
|
sys.exit(1)
|
|
|
|
def get_nist_time():
|
|
session = requests.Session()
|
|
retry_strategy = Retry(total=5, backoff_factor=2, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET"], raise_on_redirect=True, connect=3, read=3)
|
|
adapter = HTTPAdapter(max_retries=retry_strategy)
|
|
session.mount("https://", adapter)
|
|
session.mount("http://", adapter)
|
|
endpoints = [
|
|
"http://worldclockapi.com/api/json/utc/now", # Use HTTP due to HTTPS cert issue
|
|
"https://timeapi.io/api/Time/current/zone?timeZone=UTC",
|
|
"https://worldtimeapi.org/api/timezone/UTC"
|
|
]
|
|
for url in endpoints:
|
|
try:
|
|
print(f"Fetching time from {url}...")
|
|
response = session.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
if "currentDateTime" in data:
|
|
dt_str = data["currentDateTime"]
|
|
elif "dateTime" in data:
|
|
dt_str = data["dateTime"]
|
|
elif "utc_datetime" in data:
|
|
dt_str = data["utc_datetime"]
|
|
else:
|
|
print(f"Warning: Unexpected response format from {url}")
|
|
continue
|
|
|
|
time_part = dt_str.split('.')[0].replace('T', ' ').replace('Z', '')
|
|
if '+' not in time_part and '-' not in time_part.split(' ')[-1]:
|
|
return f"{time_part} UTC"
|
|
parts = time_part.split(' ')
|
|
if '+' in parts[-1] or '-' in parts[-1]:
|
|
return f"{' '.join(parts[:-1])} UTC"
|
|
return f"{time_part} UTC"
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Error fetching NIST time from {url}: {e}")
|
|
except Exception as e:
|
|
print(f"Error processing time from {url}: {e}")
|
|
|
|
print("Error: Could not fetch time from any source. Falling back to system time.")
|
|
return datetime.datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
|
|
|
|
def old_get_nist_time():
|
|
"""Fetches the current UTC time from public time APIs with retries."""
|
|
session = requests.Session()
|
|
# Retry strategy for network issues
|
|
retry_strategy = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
|
|
adapter = HTTPAdapter(max_retries=retry_strategy)
|
|
session.mount("https://", adapter)
|
|
endpoints = [
|
|
"https://timeapi.io/api/Time/current/zone?timeZone=UTC",
|
|
"https://worldtimeapi.org/api/timezone/UTC",
|
|
]
|
|
for url in endpoints:
|
|
try:
|
|
print(f"Fetching time from {url}...")
|
|
response = session.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
if "dateTime" in data: dt_str = data["dateTime"]
|
|
elif "utc_datetime" in data: dt_str = data["utc_datetime"]
|
|
else:
|
|
print(f"Warning: Unexpected response format from {url}")
|
|
continue
|
|
|
|
# Standardize format (YYYY-MM-DD HH:MM:SS UTC), remove microseconds/timezone info
|
|
time_part = dt_str.split('.')[0].replace('T', ' ').replace('Z', '')
|
|
# Add UTC if not already present from offset/Z
|
|
if '+' not in time_part and '-' not in time_part.split(' ')[-1]:
|
|
return f"{time_part} UTC"
|
|
else: # Handle potential offsets, though UTC is requested
|
|
# Basic cleaning assuming offset is present
|
|
parts = time_part.split(' ')
|
|
if '+' in parts[-1] or '-' in parts[-1]:
|
|
return f"{' '.join(parts[:-1])} UTC"
|
|
else:
|
|
return f"{time_part} UTC"
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Error fetching NIST time from {url}: {e}")
|
|
except Exception as e:
|
|
print(f"Error processing time from {url}: {e}")
|
|
|
|
print("Error: Could not fetch time from any source.")
|
|
return None
|
|
|
|
def get_rss_headline(config):
|
|
"""Fetches the latest headline and link from the configured RSS feed."""
|
|
try:
|
|
# Safely get RSS config, providing defaults
|
|
rss_config = config.get('canary', {}).get('rss', {})
|
|
rss_url = rss_config.get('url', 'https://www.democracynow.org/democracynow.rss') # Default feed
|
|
rss_name = rss_config.get('name', 'Democracy Now!') # Use specific default name
|
|
print(f"Fetching {rss_name} headline from {rss_url}...")
|
|
feed = feedparser.parse(rss_url)
|
|
if feed.entries:
|
|
entry = feed.entries[0]
|
|
return {"title": entry.title, "link": entry.link}
|
|
else:
|
|
print(f"No entries found in RSS feed: {rss_url}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Error fetching RSS headline: {e}")
|
|
return None
|
|
|
|
def get_monero_latest_block():
|
|
"""Fetches the latest Monero block height, hash, and timestamp using public APIs."""
|
|
# Use reliable source for height/timestamp
|
|
stats_url = "https://localmonero.co/blocks/api/get_stats"
|
|
# Use reliable source for block header (incl. hash) by height
|
|
block_header_url_template = "https://moneroblocks.info/api/get_block_header/{}"
|
|
|
|
try:
|
|
# Step 1: Get latest height and timestamp
|
|
print(f"Fetching Monero stats from {stats_url}...")
|
|
stats_response = requests.get(stats_url, timeout=15)
|
|
stats_response.raise_for_status()
|
|
stats_data = stats_response.json()
|
|
|
|
if not stats_data or 'height' not in stats_data or 'last_timestamp' not in stats_data:
|
|
print(f"Error: Unexpected data format from Monero stats API ({stats_url})")
|
|
return None
|
|
|
|
height = stats_data['height']
|
|
timestamp = stats_data['last_timestamp']
|
|
# Use timezone-aware datetime object for UTC conversion
|
|
timestamp_utc = datetime.datetime.fromtimestamp(timestamp, timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
|
|
|
|
# Step 2: Get block header (including hash) using the height
|
|
block_header_url = block_header_url_template.format(height)
|
|
print(f"Fetching Monero block header from {block_header_url}...")
|
|
header_response = requests.get(block_header_url, timeout=15)
|
|
|
|
# Handle cases where the latest block isn't indexed yet (common timing issue)
|
|
if header_response.status_code in [404, 500]:
|
|
print(f"Warning: Block height {height} lookup failed on {block_header_url} (Status: {header_response.status_code}). Trying previous block ({height-1}).")
|
|
height -= 1 # Fallback to previous block height
|
|
block_header_url = block_header_url_template.format(height)
|
|
print(f"Fetching Monero block header from {block_header_url}...")
|
|
header_response = requests.get(block_header_url, timeout=15)
|
|
header_response.raise_for_status() # Raise error if fallback also fails
|
|
elif not header_response.ok: # Raise other non-2xx errors
|
|
header_response.raise_for_status()
|
|
|
|
header_data = header_response.json()
|
|
|
|
# Validate expected structure from moneroblocks.info
|
|
if not header_data or 'block_header' not in header_data or 'hash' not in header_data['block_header']:
|
|
print(f"Error: Unexpected data format from Monero block header API ({block_header_url})")
|
|
return None
|
|
|
|
block_hash = header_data['block_header']['hash']
|
|
|
|
print(f"Successfully fetched Monero block: Height={height}, Hash={block_hash[:10]}...")
|
|
# Return potentially decremented height, the hash found, and the original latest timestamp
|
|
return {
|
|
"height": height,
|
|
"hash": block_hash,
|
|
"time": timestamp_utc
|
|
}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Error fetching Monero block data: {e}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Error processing Monero block data: {e}")
|
|
return None
|
|
|
|
def collect_attestations(config, is_interactive):
|
|
"""Loads attestations and confirms them with the user if interactive."""
|
|
attestations = load_attestations()
|
|
selected_attestations = []
|
|
|
|
if is_interactive:
|
|
org = config['canary']['organization']
|
|
print("\nPlease confirm each attestation separately:")
|
|
for i, attestation in enumerate(attestations, 1):
|
|
while True:
|
|
response = input(f"Confirm: '{org} {attestation}' (y/n): ").lower()
|
|
if response in ['y', 'n']: break
|
|
print("Please answer 'y' or 'n'.")
|
|
if response == 'y':
|
|
selected_attestations.append(attestation)
|
|
if not selected_attestations:
|
|
proceed = input("No attestations confirmed. Proceed anyway? (y/n): ").lower()
|
|
if proceed != 'y':
|
|
print("Operation cancelled")
|
|
return None # Return None to signal cancellation
|
|
else:
|
|
return [] # Return empty list if proceeding without confirmations
|
|
else:
|
|
return selected_attestations
|
|
else:
|
|
# Non-interactive: Assume all loaded attestations are confirmed
|
|
selected_attestations = attestations
|
|
if not selected_attestations:
|
|
print("Warning: No attestations found in file. Proceeding without attestations.")
|
|
else:
|
|
print("Non-interactive mode: Including all attestations from file.")
|
|
return selected_attestations
|
|
|
|
# --- Added back the missing function ---
|
|
def get_optional_note():
|
|
"""Prompts user (if interactive) for an optional note."""
|
|
note = input("\nAdd an optional note (press Enter to skip): ").strip()
|
|
return note if note else None
|
|
# --- End of added function ---
|
|
|
|
def create_warrant_canary_message(config, is_interactive):
|
|
"""Constructs the main body of the warrant canary message."""
|
|
nist_time = get_nist_time()
|
|
rss_data = get_rss_headline(config)
|
|
monero_block = get_monero_latest_block()
|
|
|
|
# Ensure all required data points were fetched
|
|
if not all([nist_time, rss_data, monero_block]):
|
|
missing = [item for item, data in [("NIST time", nist_time),
|
|
("RSS headline", rss_data),
|
|
("Monero block data", monero_block)] if not data]
|
|
print(f"Error: Could not fetch necessary data: {', '.join(missing)}")
|
|
return None
|
|
|
|
# Handle attestations based on interactivity
|
|
selected_attestations = collect_attestations(config, is_interactive)
|
|
if selected_attestations is None: # Check if collect_attestations signaled cancellation
|
|
return None
|
|
|
|
# Get optional note only if interactive
|
|
note = get_optional_note() if is_interactive else None
|
|
|
|
# Get config details safely with defaults
|
|
org = config.get('canary', {}).get('organization', 'Unknown Organization')
|
|
admin_name = config.get('canary', {}).get('admin_name', 'Admin')
|
|
admin_title = config.get('canary', {}).get('admin_title', 'administrator')
|
|
|
|
# Format date with correct suffix (st, nd, rd, th)
|
|
day = datetime.datetime.now().day
|
|
if 11 <= day <= 13: suffix = 'th'
|
|
else: suffixes = {1: 'st', 2: 'nd', 3: 'rd'}; suffix = suffixes.get(day % 10, 'th')
|
|
current_date_str = datetime.datetime.now().strftime(f'%d{suffix} day of %B, %Y')
|
|
|
|
# Build the message string
|
|
message = f"{org} Warrant Canary · {nist_time}\n\n"
|
|
message += f"I, {admin_name}, the {admin_title} of {org}, state this {current_date_str}:\n"
|
|
for i, attestation in enumerate(selected_attestations, 1):
|
|
message += f" {i}. {org} {attestation}\n"
|
|
|
|
if note:
|
|
message += f"\nNOTE: {note}\n"
|
|
|
|
message += "\nDatestamp Proof:\n"
|
|
message += f" Daily News: \"{rss_data['title']}\"\n"
|
|
message += f" Source URL: {rss_data['link']}\n"
|
|
message += f" XMR block: #{monero_block['height']}, {monero_block['time']}\n"
|
|
message += f" Block hash: {monero_block['hash']}\n" # Ensure this line ends with newline for GPG
|
|
|
|
# Ensure single trailing newline before signing
|
|
return message.rstrip() + "\n"
|
|
|
|
def sign_with_gpg(message, gpg_key_id):
|
|
"""Signs the message using GPG clearsign with the specified key ID."""
|
|
if not gpg_key_id:
|
|
print("Error: GPG Key ID is missing in config.")
|
|
return None
|
|
try:
|
|
TEMP_MESSAGE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
print(f"Signing message with GPG key ID: {gpg_key_id}...")
|
|
# Ensure input message ends with exactly one newline for GPG
|
|
with open(TEMP_MESSAGE_FILE, "w", newline='\n', encoding='utf-8') as f:
|
|
f.write(message.rstrip() + '\n')
|
|
|
|
# Use --batch and --yes for non-interactive signing
|
|
cmd = ["gpg", "--batch", "--yes", "--clearsign", "--default-key", gpg_key_id, str(TEMP_MESSAGE_FILE)]
|
|
result = subprocess.run(cmd, check=True, capture_output=True, text=True, encoding='utf-8')
|
|
|
|
# GPG might write to stdout or file depending on version/environment
|
|
signed_message_path = Path(f"{TEMP_MESSAGE_FILE}.asc")
|
|
if signed_message_path.exists():
|
|
with open(signed_message_path, "r", encoding='utf-8') as f:
|
|
signed_message = f.read()
|
|
print(f"GPG signing successful (read from {signed_message_path}).")
|
|
elif result.stdout:
|
|
signed_message = result.stdout
|
|
print("GPG signing successful (read from stdout).")
|
|
else:
|
|
print("Error: GPG signed message file not found and no stdout output.")
|
|
if TEMP_MESSAGE_FILE.exists(): os.remove(TEMP_MESSAGE_FILE)
|
|
return None
|
|
|
|
# Clean up temporary files
|
|
if TEMP_MESSAGE_FILE.exists(): os.remove(TEMP_MESSAGE_FILE)
|
|
if signed_message_path.exists(): os.remove(signed_message_path)
|
|
|
|
# Return the raw signed message
|
|
return signed_message
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"GPG signing error (Exit code: {e.returncode}): {e.stderr or e.stdout or 'No output'}")
|
|
if TEMP_MESSAGE_FILE.exists(): os.remove(TEMP_MESSAGE_FILE)
|
|
signed_message_path = Path(f"{TEMP_MESSAGE_FILE}.asc")
|
|
if signed_message_path.exists(): os.remove(signed_message_path)
|
|
return None
|
|
except FileNotFoundError:
|
|
print("Error: 'gpg' command not found. Is GnuPG installed and in your PATH?")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Error during GPG signing: {e}")
|
|
if TEMP_MESSAGE_FILE.exists(): os.remove(TEMP_MESSAGE_FILE)
|
|
signed_message_path = Path(f"{TEMP_MESSAGE_FILE}.asc")
|
|
if signed_message_path.exists(): os.remove(signed_message_path)
|
|
return None
|
|
|
|
def save_warrant_canary(signed_message):
|
|
"""Saves the signed warrant canary message to the output file."""
|
|
try:
|
|
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
# Write exactly what GPG (or our adjusted version) gave us
|
|
with open(OUTPUT_FILE, "w", newline='\n', encoding='utf-8') as f:
|
|
f.write(signed_message)
|
|
print(f"Warrant canary saved to {OUTPUT_FILE}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error saving warrant canary: {e}")
|
|
return False
|
|
|
|
async def post_to_matrix(config, signed_message):
|
|
"""Posts the signed warrant canary message to the configured Matrix room."""
|
|
# Validate Matrix configuration just before attempting to post
|
|
if 'base_url' not in config:
|
|
print("Error: 'base_url' missing in config. Cannot post to Matrix.")
|
|
return False
|
|
try:
|
|
matrix_creds = config['canary']['credentials']
|
|
room_id = config['canary']['room']
|
|
org_name = config['canary']['organization']
|
|
if not all([matrix_creds.get('username'), matrix_creds.get('password'), room_id, org_name]):
|
|
print("Error: Missing Matrix credentials, room ID, or organization name in config.")
|
|
return False
|
|
except KeyError as e:
|
|
print(f"Error: Missing structure for Matrix config (key: {e}). Cannot post.")
|
|
return False
|
|
|
|
# Ensure matrix-nio library is available
|
|
try:
|
|
from nio import AsyncClient, LoginError, RoomSendError
|
|
except ImportError:
|
|
print("Error: matrix-nio library not installed (pip install matrix-nio).")
|
|
return False
|
|
|
|
client = None
|
|
try:
|
|
client = AsyncClient(config['base_url'], matrix_creds['username'])
|
|
print("Logging in to Matrix...")
|
|
login_response = await client.login(matrix_creds['password'])
|
|
if isinstance(login_response, LoginError):
|
|
print(f"Matrix login failed: {login_response.message}")
|
|
return False # Don't proceed if login fails
|
|
print("Matrix login successful.")
|
|
|
|
# Format message for Matrix (ensure code block formatting is correct)
|
|
# The signed message already includes newlines, including the (now forced) one before signature
|
|
full_message_body = (
|
|
f"This is the {org_name} Warrant Canary, signed with GPG for authenticity. "
|
|
"Copy the code block below to verify with `gpg --verify`:\n\n"
|
|
f"```\n{signed_message.strip()}\n```" # Strip leading/trailing whitespace just in case
|
|
)
|
|
full_message_html = (
|
|
f"<p>This is the {org_name} Warrant Canary, signed with GPG for authenticity. "
|
|
"Copy the code block below to verify with <code>gpg --verify</code>:</p>"
|
|
# Use html.escape or similar if needed, but pre should handle GPG block okay
|
|
f"<pre><code>{signed_message.strip()}</code></pre>"
|
|
)
|
|
content = {
|
|
"msgtype": "m.text",
|
|
"body": full_message_body,
|
|
"format": "org.matrix.custom.html",
|
|
"formatted_body": full_message_html
|
|
}
|
|
|
|
print(f"Sending message to Matrix room: {room_id}")
|
|
send_response = await client.room_send(room_id=room_id, message_type="m.room.message", content=content)
|
|
|
|
if isinstance(send_response, RoomSendError):
|
|
print(f"Error posting to Matrix room {room_id}: {send_response.message}")
|
|
return False
|
|
else:
|
|
print("Posted to Matrix successfully.")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"An unexpected error occurred during Matrix posting: {e}")
|
|
return False
|
|
finally:
|
|
if client: # Ensure logout/close happens if client was created
|
|
print("Logging out from Matrix...")
|
|
await client.logout()
|
|
await client.close()
|
|
print("Matrix client closed.")
|
|
|
|
# --- Main Execution Logic ---
|
|
|
|
def main():
|
|
"""Main function to generate, sign, save, and optionally post the warrant canary."""
|
|
print("Generating warrant canary...")
|
|
config = load_config()
|
|
|
|
# Detect if running interactively (e.g., in a terminal vs. cron)
|
|
is_interactive = sys.stdout.isatty()
|
|
if not is_interactive:
|
|
print("Running in non-interactive mode.")
|
|
|
|
# Create the message body
|
|
message = create_warrant_canary_message(config, is_interactive)
|
|
if not message:
|
|
print("Failed to create message payload.")
|
|
sys.exit(1)
|
|
|
|
print("\n--- Warrant Canary Preview ---")
|
|
print(message)
|
|
print("----------------------------")
|
|
|
|
# Get GPG key ID (checked in load_config, but check again for safety)
|
|
gpg_key_id = config.get('canary', {}).get('gpg_key_id')
|
|
if not gpg_key_id:
|
|
print("Error: Missing 'gpg_key_id' in config under 'canary'. Cannot sign.")
|
|
sys.exit(1)
|
|
|
|
# Confirm GPG signing
|
|
sign_confirm = 'n'
|
|
if is_interactive:
|
|
sign_confirm = input("\nSign with GPG? (y/n): ").lower()
|
|
else:
|
|
print("Non-interactive mode: Auto-confirming GPG signing.")
|
|
sign_confirm = 'y' # Auto-sign in non-interactive mode
|
|
|
|
if sign_confirm != 'y':
|
|
print("Operation cancelled by user (GPG signing).")
|
|
sys.exit(0)
|
|
|
|
# Sign the message
|
|
signed_message = sign_with_gpg(message, gpg_key_id)
|
|
if not signed_message:
|
|
print("Failed to sign message with GPG.")
|
|
sys.exit(1) # Exit if signing failed
|
|
|
|
# --- Explicitly add blank line before signature if missing ---
|
|
sig_marker = "\n-----BEGIN PGP SIGNATURE-----"
|
|
if sig_marker in signed_message:
|
|
# Check if it's NOT already double-spaced (i.e., \n\n-----BEGIN...)
|
|
if f"\n\n{sig_marker[1:]}" not in signed_message:
|
|
print("Forcing extra newline before PGP signature block...")
|
|
signed_message = signed_message.replace(sig_marker, f"\n{sig_marker}")
|
|
# --- End newline fix ---
|
|
|
|
# Save the signed message (potentially modified)
|
|
if not save_warrant_canary(signed_message):
|
|
print("Failed to save warrant canary file.")
|
|
sys.exit(1) # Exit if saving failed
|
|
|
|
# --- Optional Matrix Posting ---
|
|
# Check if Matrix posting is feasible based on config
|
|
can_post_matrix = all([
|
|
'base_url' in config,
|
|
config.get('canary', {}).get('credentials', {}).get('username'),
|
|
config.get('canary', {}).get('credentials', {}).get('password'),
|
|
config.get('canary', {}).get('room'),
|
|
config.get('canary', {}).get('organization')
|
|
])
|
|
# Check config for explicit auto-post flag for non-interactive runs
|
|
auto_post = config.get('canary', {}).get('auto_post_matrix', False)
|
|
|
|
post_confirm = 'n' # Default to no
|
|
if is_interactive:
|
|
if can_post_matrix:
|
|
post_confirm = input("\nPost to Matrix? (y/n): ").lower()
|
|
# Prompt even if config is bad, to inform user why it won't work
|
|
elif input("\nPost to Matrix? (y/n): ").lower() == 'y':
|
|
print("Cannot post to Matrix: Check 'base_url' and canary credentials/room/organization in config.")
|
|
else: # Non-interactive
|
|
if can_post_matrix and auto_post:
|
|
print("Non-interactive mode: Auto-posting to Matrix is enabled.")
|
|
post_confirm = 'y'
|
|
elif can_post_matrix: # Auto-post is false or missing
|
|
print("Non-interactive mode: Auto-posting to Matrix is disabled in config. Skipping.")
|
|
else: # Config is incomplete
|
|
print("Non-interactive mode: Cannot post to Matrix (incomplete config).")
|
|
|
|
# Attempt posting if confirmed and possible
|
|
if post_confirm == 'y' and can_post_matrix:
|
|
print("Attempting to post to Matrix...")
|
|
post_successful = asyncio.run(post_to_matrix(config, signed_message))
|
|
if not post_successful:
|
|
print("Matrix post failed. Check logs above.")
|
|
# Allow script to finish successfully even if Matrix fails
|
|
elif post_confirm == 'y' and not can_post_matrix:
|
|
# Warning message already printed during prompt phase
|
|
pass
|
|
else:
|
|
# Print skipping message unless already handled above
|
|
already_handled = (post_confirm == 'y' and not can_post_matrix) or \
|
|
(not is_interactive and can_post_matrix and not auto_post) or \
|
|
(not is_interactive and not can_post_matrix)
|
|
if not already_handled:
|
|
print("Skipping Matrix post.")
|
|
|
|
print("\nWarrant canary generation process complete.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|