Corrected canary

This commit is contained in:
Sangye Ince-Johannsen 2025-04-06 21:21:48 +00:00
parent 57b0eefe3b
commit d50fb137a6

View file

@ -12,6 +12,7 @@ from pathlib import Path
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# File paths relative to the sw1tch/ directory
BASE_DIR = Path(__file__).parent
CONFIG_FILE = BASE_DIR / "config" / "config.yaml"
ATTESTATIONS_FILE = BASE_DIR / "config" / "attestations.txt"
@ -103,14 +104,19 @@ def get_bitcoin_latest_block():
response = requests.get("https://blockchain.info/latestblock", timeout=10)
if response.status_code == 200:
data = response.json()
block_response = requests.get(f"https://blockchain.info/rawblock/{data['hash']}", timeout=10)
block_response = requests.get(
f"https://blockchain.info/rawblock/{data['hash']}",
timeout=10
)
if block_response.status_code == 200:
block_data = block_response.json()
hash_str = data["hash"].lstrip("0") or "0"
return {
"height": data["height"],
"hash": hash_str,
"time": datetime.datetime.fromtimestamp(block_data["time"]).strftime("%Y-%m-%d %H:%M:%S UTC")
"time": datetime.datetime.fromtimestamp(
block_data["time"]
).strftime("%Y-%m-%d %H:%M:%S UTC")
}
print(f"Error fetching Bitcoin block: HTTP {response.status_code}")
return None
@ -147,9 +153,12 @@ def create_warrant_canary_message(config):
if not all([nist_time, rss_data, bitcoin_block]):
missing = []
if not nist_time: missing.append("NIST time")
if not rss_data: missing.append(f"{config['canary']['rss'].get('name', 'RSS')} headline")
if not bitcoin_block: missing.append("Bitcoin block data")
if not nist_time:
missing.append("NIST time")
if not rss_data:
missing.append(f"{config['canary']['rss'].get('name', 'RSS')} headline")
if not bitcoin_block:
missing.append("Bitcoin block data")
print(f"Error: Could not fetch: {', '.join(missing)}")
return None
@ -166,7 +175,10 @@ def create_warrant_canary_message(config):
admin_title = config['canary'].get('admin_title', 'administrator')
message = f"{org} Warrant Canary · {nist_time}\n"
message += f"I, {admin_name}, the {admin_title} of {org}, state this {datetime.datetime.now().strftime('%dth day of %B, %Y')}:\n"
message += (
f"I, {admin_name}, the {admin_title} of {org}, "
f"state this {datetime.datetime.now().strftime('%dth day of %B, %Y')}:\n"
)
for i, attestation in enumerate(attestations, 1):
message += f" {i}. {org} {attestation}\n"
@ -192,11 +204,8 @@ def sign_with_gpg(message, gpg_key_id):
signed_message = f.read()
os.remove(TEMP_MESSAGE_FILE)
os.remove(f"{TEMP_MESSAGE_FILE}.asc")
lines = signed_message.splitlines()
signature_idx = next(i for i, line in enumerate(lines) if line == "-----BEGIN PGP SIGNATURE-----")
if lines[signature_idx + 1] == "":
lines.pop(signature_idx + 1)
return "\n".join(lines)
# Return the GPG output exactly as is (no line removals)
return signed_message
except subprocess.CalledProcessError as e:
print(f"GPG signing error: {e}")
return None
@ -234,8 +243,9 @@ async def post_to_matrix(config, signed_message):
"body": full_message,
"format": "org.matrix.custom.html",
"formatted_body": (
f"This is the {config['canary']['organization']} Warrant Canary, signed with GPG for authenticity. "
"Copy the code block below to verify with <code>gpg --verify</code>:<br><br>"
f"This is the {config['canary']['organization']} Warrant Canary, "
"signed with GPG for authenticity. Copy the code block below to verify "
"with <code>gpg --verify</code>:<br><br>"
f"<pre>{signed_message}</pre>"
)
}