SwiftBar/sijapi.15s.py
2024-11-14 01:01:24 +01:00

245 lines
8.8 KiB
Python
Executable file

#!/usr/bin/env python3
# <swiftbar.title>sijapi</swiftbar.title>
# <swiftbar.author>sij.law</swiftbar.author>
# <swiftbar.author.github>sij.ai</swiftbar.author.github>
# <swiftbar.desc>Monitors Tailscale, VPN, and sijapi server and Postgres status across multiple servers.</swiftbar.desc>
# <swiftbar.hideRunInTerminal>true</swiftbar.hideRunInTerminal>
# <swiftbar.hideDisablePlugin>false</swiftbar.hideDisablePlugin>
# <swiftbar.hideSwiftBar>false</swiftbar.hideSwiftBar>
# <swiftbar.refreshEveryNSeconds>15</swiftbar.refreshEveryNSeconds>
# STEP 1: Update this path to point to your sijapi directory:
sijapi_dir = '/Users/sij/workshop/sijapi'
# STEP 2: Ensure ./sijapi/config/sys.yaml exists in your sijapi directory and contains your correct configuration. You can work off the template provided at sys.yaml-example.
config_yaml = f'{sijapi_dir}/sijapi/config/sys.yaml'
# STEP 3: Install 'vitals' (https://sij.ai/sij/pathScripts/src/branch/main/vitals) in your server(s) PATH(s).
import subprocess
import json
import yaml
import psycopg2
import paramiko
import shlex
import tempfile
import os
import glob
from urllib3.exceptions import InsecureRequestWarning
import requests
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# Clean up old temporary scripts
temp_dir = tempfile.gettempdir()
for old_script in glob.glob(os.path.join(temp_dir, 'swiftbar_vpn_*.sh')):
try:
os.remove(old_script)
except OSError:
pass # Ignore errors in cleanup
with open(config_yaml, 'r') as stream:
config = yaml.safe_load(stream)
servers = config['POOL']
api_url = config['URL']
api_key = config['KEYS'][0]
def run_ssh_command(server, command):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# Determine authentication method
if 'ssh_key' in server:
# Use SSH key authentication
ssh.connect(
server['ts_ip'],
port=server['ssh_port'],
username=server['ssh_user'],
key_filename=server['ssh_key']
)
elif 'ssh_pass' in server:
# Use password authentication
ssh.connect(
server['ts_ip'],
port=server['ssh_port'],
username=server['ssh_user'],
password=server['ssh_pass']
)
else:
raise ValueError("Neither ssh_key nor ssh_pass provided in server configuration")
stdin, stdout, stderr = ssh.exec_command(command)
result = stdout.read().decode().strip()
error = stderr.read().decode().strip()
if error and "discover_other_daemon: 1" not in error:
print(f"SSH Error on {server['ts_id']} running {command}: {error}")
return result
except Exception as e:
print(f"SSH Connection Error for {server['ts_id']}: {str(e)}")
return None
finally:
ssh.close()
def check_health(ip, port):
try:
response = requests.get(f"http://{ip}:{port}/health", timeout=5, verify=False)
return response.status_code == 200
except:
return False
def check_postgres(ip, port, dbname, user, password):
try:
conn = psycopg2.connect(
host=ip, port=port, dbname=dbname,
user=user, password=password,
connect_timeout=5
)
conn.close()
return True
except:
return False
def get_sij_id():
try:
response = requests.get(f"{api_url}/id?api_key={api_key}", timeout=4)
response.raise_for_status()
return response.text.strip().strip('"')
except requests.exceptions.RequestException as e:
return "API Error"
except Exception as e:
return "Unexpected Error"
def flag_emoji(country_code):
if country_code:
offset = 127397
flag = ''.join([chr(ord(char) + offset) for char in country_code.upper()])
return flag
return flag_emoji('us')
def gather_remote_info(server):
vitals_output = run_ssh_command(server, f"bash -l -c '{server['vitals']}'")
if vitals_output:
try:
json_start = vitals_output.find('{')
if json_start != -1:
json_output = vitals_output[json_start:]
host_info = json.loads(json_output)
else:
print(f"No JSON found in output from {server['ts_id']}. Output: {vitals_output}")
return None
except json.JSONDecodeError:
print(f"Error decoding JSON from {server['ts_id']}. Output: {vitals_output}")
return None
vpn_status = host_info.get('mullvad_exitnode', False)
dns_status = host_info.get('nextdns_connected', False) or host_info.get('adguard_connected', False)
country_code = host_info.get('mullvad_hostname', '').split('-')[0] if vpn_status else 'us'
flag = flag_emoji(country_code)
mullvad_hostname = host_info.get('mullvad_hostname', '').split('.')[0] if vpn_status else ''
info = f"{server['ts_id']}\n"
wan_ip = host_info.get('wan_ip', 'N/A')
# Create a shell script that will be executed
shell_script = f"""#!/bin/bash
ssh {server['ssh_user']}@{server['ts_ip']} "bash -l -c '{server['vpn']} shh'"
"""
# Write the shell script to a temporary file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.sh', prefix='swiftbar_vpn_') as temp_file:
temp_file.write(shell_script)
temp_script_path = temp_file.name
# Make the script executable
os.chmod(temp_script_path, 0o755)
info += f"{flag} {wan_ip} | bash={temp_script_path} terminal=false refresh=true"
if mullvad_hostname:
info += f" ({mullvad_hostname})\n"
else:
info += "\n"
info += f"{host_info.get('nextdns_protocol', 'NextDNS') if host_info.get('nextdns_connected') else 'AdGuard Home' if host_info.get('adguard_connected') else 'Standard DNS'}\n"
info += f"{host_info.get('uptime', 'N/A')}\n"
return {
'info': info,
'vpn': vpn_status,
'dns': dns_status,
'ts_ip': server['ts_ip'],
'ssh_user': server['ssh_user'],
'vpn_path': server['vpn'],
'health_ok': check_health(server['ts_ip'], server['app_port']),
'postgres_ok': check_postgres(server['ts_ip'], server['db_port'],
server['db_name'], server['db_user'], server['db_pass']),
'server_id': server['ts_id'],
'temp_script': temp_script_path
}
else:
return None
def index_to_braille(v1a, v1b, v2a, v2b, v3a, v3b):
return (v1a * 1 + v1b * 8 + v2a * 2 + v2b * 16 + v3a * 4 + v3b * 32)
status = 0
vpn_dns_status = 0
server_infos = []
# Check each server
for i, server in enumerate(servers[:3]): # Limit to first 3 servers
health_ok = check_health(server['ts_ip'], server['app_port'])
postgres_ok = check_postgres(server['ts_ip'], server['db_port'],
server['db_name'], server['db_user'], server['db_pass'])
if i == 0: # First server
if health_ok: status |= 1
if postgres_ok: status |= 8
elif i == 1: # Second server
if health_ok: status |= 2
if postgres_ok: status |= 16
elif i == 2: # Third server
if health_ok: status |= 4
if postgres_ok: status |= 32
server_info = gather_remote_info(server)
if server_info:
server_infos.append(server_info)
if i == 0: # First server
if server_info['vpn']: vpn_dns_status |= 1
if server_info['dns']: vpn_dns_status |= 8
elif i == 1: # Second server
if server_info['vpn']: vpn_dns_status |= 2
if server_info['dns']: vpn_dns_status |= 16
elif i == 2: # Third server
if server_info['vpn']: vpn_dns_status |= 4
if server_info['dns']: vpn_dns_status |= 32
# Convert the statuses to Braille Unicode symbols
sijapi_symbol = chr(0x2800 + status)
vpn_dns_symbol = chr(0x2800 + vpn_dns_status)
sij_id = get_sij_id()
# Display menu bar indicators:
print(sijapi_symbol, vpn_dns_symbol)
# Divide menu bar from drop-down menu bar:
print('---')
# Display drop-down menu indicators:
for server_info in server_infos:
if server_info:
check_mark = '' if server_info['server_id'] == sij_id else ''
print(f"{check_mark}{server_info['info'].rstrip()}")
health_symbol = '' if server_info['health_ok'] else ''
postgres_symbol = '' if server_info['postgres_ok'] else ''
print(f"{health_symbol} Health {postgres_symbol} Postgres")
vpn_path = server_info['vpn_path']
print()
print('---')
print()
print()
print('---')
print()