diff --git a/sw1tch/canary.py b/sw1tch/canary.py index bb88786..44b226b 100644 --- a/sw1tch/canary.py +++ b/sw1tch/canary.py @@ -12,6 +12,7 @@ from pathlib import Path from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry +# File paths relative to the sw1tch/ directory BASE_DIR = Path(__file__).parent CONFIG_FILE = BASE_DIR / "config" / "config.yaml" ATTESTATIONS_FILE = BASE_DIR / "config" / "attestations.txt" @@ -103,14 +104,19 @@ def get_bitcoin_latest_block(): response = requests.get("https://blockchain.info/latestblock", timeout=10) if response.status_code == 200: data = response.json() - block_response = requests.get(f"https://blockchain.info/rawblock/{data['hash']}", timeout=10) + block_response = requests.get( + f"https://blockchain.info/rawblock/{data['hash']}", + timeout=10 + ) if block_response.status_code == 200: block_data = block_response.json() hash_str = data["hash"].lstrip("0") or "0" return { "height": data["height"], "hash": hash_str, - "time": datetime.datetime.fromtimestamp(block_data["time"]).strftime("%Y-%m-%d %H:%M:%S UTC") + "time": datetime.datetime.fromtimestamp( + block_data["time"] + ).strftime("%Y-%m-%d %H:%M:%S UTC") } print(f"Error fetching Bitcoin block: HTTP {response.status_code}") return None @@ -147,9 +153,12 @@ def create_warrant_canary_message(config): if not all([nist_time, rss_data, bitcoin_block]): missing = [] - if not nist_time: missing.append("NIST time") - if not rss_data: missing.append(f"{config['canary']['rss'].get('name', 'RSS')} headline") - if not bitcoin_block: missing.append("Bitcoin block data") + if not nist_time: + missing.append("NIST time") + if not rss_data: + missing.append(f"{config['canary']['rss'].get('name', 'RSS')} headline") + if not bitcoin_block: + missing.append("Bitcoin block data") print(f"Error: Could not fetch: {', '.join(missing)}") return None @@ -166,7 +175,10 @@ def create_warrant_canary_message(config): admin_title = config['canary'].get('admin_title', 'administrator') message = f"{org} Warrant Canary ยท {nist_time}\n" - message += f"I, {admin_name}, the {admin_title} of {org}, state this {datetime.datetime.now().strftime('%dth day of %B, %Y')}:\n" + message += ( + f"I, {admin_name}, the {admin_title} of {org}, " + f"state this {datetime.datetime.now().strftime('%dth day of %B, %Y')}:\n" + ) for i, attestation in enumerate(attestations, 1): message += f" {i}. {org} {attestation}\n" @@ -192,11 +204,8 @@ def sign_with_gpg(message, gpg_key_id): signed_message = f.read() os.remove(TEMP_MESSAGE_FILE) os.remove(f"{TEMP_MESSAGE_FILE}.asc") - lines = signed_message.splitlines() - signature_idx = next(i for i, line in enumerate(lines) if line == "-----BEGIN PGP SIGNATURE-----") - if lines[signature_idx + 1] == "": - lines.pop(signature_idx + 1) - return "\n".join(lines) + # Return the GPG output exactly as is (no line removals) + return signed_message except subprocess.CalledProcessError as e: print(f"GPG signing error: {e}") return None @@ -234,8 +243,9 @@ async def post_to_matrix(config, signed_message): "body": full_message, "format": "org.matrix.custom.html", "formatted_body": ( - f"This is the {config['canary']['organization']} Warrant Canary, signed with GPG for authenticity. " - "Copy the code block below to verify with <code>gpg --verify</code>:<br><br>" + f"This is the {config['canary']['organization']} Warrant Canary, " + "signed with GPG for authenticity. Copy the code block below to verify " + "with <code>gpg --verify</code>:<br><br>" f"<pre>{signed_message}</pre>" ) }