Update vpn
vpn: added `to` mode for random exit node in specified country, connection logging, and a `status` mode
This commit is contained in:
parent
a9a7f61612
commit
0425b4f1d6
1 changed files with 399 additions and 73 deletions
472
vpn
472
vpn
|
@ -5,113 +5,410 @@ import requests
|
|||
import argparse
|
||||
import json
|
||||
import random
|
||||
import datetime
|
||||
import os
|
||||
|
||||
PRIVACY_FRIENDLY_COUNTRIES = ['Sweden', 'Switzerland', 'Germany', 'Finland', 'Netherlands', 'Norway']
|
||||
LOG_FILE = 'log.txt'
|
||||
|
||||
PRIVACY_FRIENDLY_COUNTRIES = [
|
||||
'Sweden', 'Switzerland', 'Germany',
|
||||
'Finland', 'Netherlands', 'Norway'
|
||||
]
|
||||
|
||||
TAILSCALE_ARGS = [
|
||||
'--exit-node-allow-lan-access',
|
||||
# '--stateful-filtering=false',
|
||||
'--accept-dns',
|
||||
# '--accept-routes',
|
||||
'--auto-update'
|
||||
]
|
||||
|
||||
def get_mullvad_info():
|
||||
"""Fetch JSON info from Mullvad's 'am.i.mullvad.net/json' endpoint."""
|
||||
response = requests.get('https://am.i.mullvad.net/json')
|
||||
if response.status_code != 200:
|
||||
raise Exception("Could not fetch Mullvad info.")
|
||||
return response.json()
|
||||
|
||||
def get_current_exit_node():
|
||||
result = subprocess.run(['tailscale', 'status', '--json'], capture_output=True, text=True)
|
||||
"""
|
||||
Return the DNSName (e.g. 'de-ber-wg-001.mullvad.ts.net.') of whichever
|
||||
peer is currently acting as the exit node. Otherwise returns None.
|
||||
"""
|
||||
result = subprocess.run(['tailscale', 'status', '--json'],
|
||||
capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise Exception("Failed to get Tailscale status")
|
||||
|
||||
status = json.loads(result.stdout)
|
||||
current_exit_node = status.get('Peer', {}).get('Tailnet', {}).get('ExitNode', {}).get('Name')
|
||||
return current_exit_node
|
||||
|
||||
def set_exit_node():
|
||||
# Get the suggested exit node
|
||||
result = subprocess.run(['tailscale', 'exit-node', 'suggest'], capture_output=True, text=True)
|
||||
exit_node = ''
|
||||
for line in result.stdout.splitlines():
|
||||
if 'Suggested exit node' in line:
|
||||
exit_node = line.split(': ')[1].strip()
|
||||
break
|
||||
# 'Peer' is a dict with keys like "nodekey:fe8efdbab7c2..."
|
||||
peers = status.get('Peer', {})
|
||||
for peer_key, peer_data in peers.items():
|
||||
# If the node is currently the exit node, it should have "ExitNode": true
|
||||
if peer_data.get('ExitNode') is True:
|
||||
# Tailscale might return 'de-ber-wg-001.mullvad.ts.net.' with a trailing dot
|
||||
dns_name = peer_data.get('DNSName', '')
|
||||
dns_name = dns_name.rstrip('.') # remove trailing dot
|
||||
return dns_name
|
||||
|
||||
# If we don't find any peer with ExitNode = true, there's no exit node
|
||||
return None
|
||||
|
||||
print(f"Suggested exit node: {exit_node}")
|
||||
|
||||
# Set the exit node with additional arguments
|
||||
cmd = ['tailscale', 'set', f'--exit-node={exit_node}'] + TAILSCALE_ARGS
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
# Verify the exit node
|
||||
response = requests.get('https://am.i.mullvad.net/json')
|
||||
exit_node_info = response.json()
|
||||
exit_node_hostname = exit_node_info.get('mullvad_exit_ip_hostname')
|
||||
|
||||
print(f"Current exit node hostname: {exit_node_hostname}")
|
||||
|
||||
# Get the part before the first '.' in the exit_node
|
||||
exit_node_short = exit_node.split('.')[0]
|
||||
|
||||
# Verify that the exit_node_short and exit_node_hostname are equal
|
||||
if exit_node_short == exit_node_hostname:
|
||||
print("Exit node set successfully!")
|
||||
else:
|
||||
print("Failed to set exit node!")
|
||||
|
||||
def unset_exit_node():
|
||||
# Unset the exit node
|
||||
cmd = ['tailscale', 'set', '--exit-node='] + TAILSCALE_ARGS
|
||||
subprocess.run(cmd, check=True)
|
||||
print("Exit node unset successfully!")
|
||||
|
||||
def start_exit_node():
|
||||
current_exit_node = get_current_exit_node()
|
||||
if current_exit_node:
|
||||
print(f"Already connected to exit node: {current_exit_node}")
|
||||
else:
|
||||
set_exit_node()
|
||||
|
||||
def get_random_privacy_friendly_exit_node():
|
||||
def list_exit_nodes():
|
||||
"""
|
||||
Return a dict {node_name: country} of all available Tailscale exit nodes
|
||||
based on 'tailscale exit-node list'.
|
||||
The output lines typically look like:
|
||||
<Star> <Name> <Country> <OS> ...
|
||||
Example line:
|
||||
* de-dus-wg-001.mullvad.ts.net Germany linux ...
|
||||
"""
|
||||
result = subprocess.run(['tailscale', 'exit-node', 'list'], capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise Exception("Failed to list Tailscale exit nodes")
|
||||
|
||||
exit_nodes = []
|
||||
exit_nodes = {}
|
||||
for line in result.stdout.splitlines():
|
||||
parts = line.split()
|
||||
if len(parts) > 3 and parts[2] in PRIVACY_FRIENDLY_COUNTRIES:
|
||||
exit_nodes.append(parts[1])
|
||||
# Basic sanity check for lines that actually contain node info
|
||||
if len(parts) > 3:
|
||||
# parts[0] might be "*" if it's the current node
|
||||
# parts[1] is typically the FQDN (like "de-dus-wg-001.mullvad.ts.net")
|
||||
# parts[2] is the Country
|
||||
node_name = parts[1].strip()
|
||||
node_country = parts[2].strip()
|
||||
exit_nodes[node_name] = node_country
|
||||
|
||||
if not exit_nodes:
|
||||
raise Exception("No privacy-friendly exit nodes available")
|
||||
return exit_nodes
|
||||
|
||||
return random.choice(exit_nodes)
|
||||
def write_log(
|
||||
old_node=None, new_node=None,
|
||||
old_ip=None, new_ip=None,
|
||||
old_country=None, new_country=None
|
||||
):
|
||||
"""
|
||||
Appends a line to the log file reflecting a connection change.
|
||||
Example:
|
||||
2025.01.17 01:11:33 UTC · disconnected from de-dus-wg-001.mullvad.ts.net (Germany)
|
||||
· connected to at-vie-wg-001.mullvad.ts.net (Austria)
|
||||
· changed IP from 65.21.99.202 to 185.213.155.74
|
||||
If no old_node is specified, it indicates a fresh start (no disconnection).
|
||||
If no new_node is specified, it indicates a stop (only disconnection).
|
||||
"""
|
||||
|
||||
def set_random_privacy_friendly_exit_node():
|
||||
exit_node = get_random_privacy_friendly_exit_node()
|
||||
print(f"Selected random privacy-friendly exit node: {exit_node}")
|
||||
utc_time = datetime.datetime.utcnow().strftime('%Y.%m.%d %H:%M:%S UTC')
|
||||
log_parts = [utc_time]
|
||||
|
||||
# If old_node was present, mention disconnect
|
||||
if old_node and old_country:
|
||||
log_parts.append(f"disconnected from {old_node} ({old_country})")
|
||||
|
||||
# If new_node is present, mention connect
|
||||
if new_node and new_country:
|
||||
log_parts.append(f"connected to {new_node} ({new_country})")
|
||||
|
||||
# If IPs changed
|
||||
if old_ip and new_ip and old_ip != new_ip:
|
||||
log_parts.append(f"changed IP from {old_ip} to {new_ip}")
|
||||
|
||||
line = " · ".join(log_parts)
|
||||
|
||||
# Append to file
|
||||
with open(LOG_FILE, 'a') as f:
|
||||
f.write(line + "\n")
|
||||
|
||||
def get_connection_history():
|
||||
"""
|
||||
Returns an in-memory list of parsed log lines.
|
||||
Each item looks like:
|
||||
{
|
||||
'timestamp': datetime_object,
|
||||
'disconnected_node': '...',
|
||||
'disconnected_country': '...',
|
||||
'connected_node': '...',
|
||||
'connected_country': '...',
|
||||
'old_ip': '...',
|
||||
'new_ip': '...'
|
||||
}
|
||||
"""
|
||||
entries = []
|
||||
if not os.path.isfile(LOG_FILE):
|
||||
return entries
|
||||
|
||||
with open(LOG_FILE, 'r') as f:
|
||||
lines = f.readlines()
|
||||
|
||||
for line in lines:
|
||||
# Example line:
|
||||
# 2025.01.17 01:11:33 UTC · disconnected from de-dus-wg-001.mullvad.ts.net (Germany) · connected to ...
|
||||
# We'll parse step by step, mindful that each line can have different combos.
|
||||
parts = line.strip().split(" · ")
|
||||
if not parts:
|
||||
continue
|
||||
|
||||
# parts[0] => '2025.01.17 01:11:33 UTC'
|
||||
timestamp_str = parts[0]
|
||||
connected_node = None
|
||||
connected_country = None
|
||||
disconnected_node = None
|
||||
disconnected_country = None
|
||||
old_ip = None
|
||||
new_ip = None
|
||||
|
||||
# We parse the timestamp. We have '%Y.%m.%d %H:%M:%S UTC'
|
||||
try:
|
||||
dt = datetime.datetime.strptime(timestamp_str, '%Y.%m.%d %H:%M:%S UTC')
|
||||
except ValueError:
|
||||
continue # If it doesn't parse, skip.
|
||||
|
||||
for p in parts[1:]:
|
||||
p = p.strip()
|
||||
if p.startswith("disconnected from"):
|
||||
# e.g. "disconnected from de-dus-wg-001.mullvad.ts.net (Germany)"
|
||||
# We can split on "("
|
||||
disc_info = p.replace("disconnected from ", "")
|
||||
if "(" in disc_info and disc_info.endswith(")"):
|
||||
node = disc_info.split(" (")[0]
|
||||
country = disc_info.split(" (")[1].replace(")", "")
|
||||
disconnected_node = node
|
||||
disconnected_country = country
|
||||
elif p.startswith("connected to"):
|
||||
# e.g. "connected to at-vie-wg-001.mullvad.ts.net (Austria)"
|
||||
conn_info = p.replace("connected to ", "")
|
||||
if "(" in conn_info and conn_info.endswith(")"):
|
||||
node = conn_info.split(" (")[0]
|
||||
country = conn_info.split(" (")[1].replace(")", "")
|
||||
connected_node = node
|
||||
connected_country = country
|
||||
elif p.startswith("changed IP from"):
|
||||
# e.g. "changed IP from 65.21.99.202 to 185.213.155.74"
|
||||
# We'll split on spaces
|
||||
# changed IP from 65.21.99.202 to 185.213.155.74
|
||||
# index: 0 1 2 3 4
|
||||
ip_parts = p.split()
|
||||
if len(ip_parts) >= 5:
|
||||
old_ip = ip_parts[3]
|
||||
new_ip = ip_parts[5]
|
||||
|
||||
entries.append({
|
||||
'timestamp': dt,
|
||||
'disconnected_node': disconnected_node,
|
||||
'disconnected_country': disconnected_country,
|
||||
'connected_node': connected_node,
|
||||
'connected_country': connected_country,
|
||||
'old_ip': old_ip,
|
||||
'new_ip': new_ip
|
||||
})
|
||||
|
||||
return entries
|
||||
|
||||
def get_last_connection_entry():
|
||||
"""
|
||||
Parse the log and return the last entry that actually
|
||||
has a 'connected_node', which indicates a stable connection.
|
||||
"""
|
||||
history = get_connection_history()
|
||||
# Go in reverse chronological order
|
||||
for entry in reversed(history):
|
||||
if entry['connected_node']:
|
||||
return entry
|
||||
return None
|
||||
|
||||
def set_exit_node(exit_node):
|
||||
"""
|
||||
Generic helper to set Tailscale exit node to 'exit_node'.
|
||||
Returns (old_ip, new_ip, old_node, new_node, old_country, new_country)
|
||||
"""
|
||||
# Get old info for logging
|
||||
old_info = get_mullvad_info()
|
||||
old_ip = old_info.get('ip')
|
||||
old_country = old_info.get('country')
|
||||
old_node = get_current_exit_node() # might be None
|
||||
|
||||
# Set the exit node with additional arguments
|
||||
cmd = ['tailscale', 'set', f'--exit-node={exit_node}'] + TAILSCALE_ARGS
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
# Verify the exit node
|
||||
response = requests.get('https://am.i.mullvad.net/json')
|
||||
exit_node_info = response.json()
|
||||
exit_node_hostname = exit_node_info.get('mullvad_exit_ip_hostname')
|
||||
# Verify the new node
|
||||
new_info = get_mullvad_info()
|
||||
new_ip = new_info.get('ip')
|
||||
new_country = new_info.get('country')
|
||||
new_node = exit_node
|
||||
|
||||
print(f"Current exit node hostname: {exit_node_hostname}")
|
||||
return old_ip, new_ip, old_node, new_node, old_country, new_country
|
||||
|
||||
# Get the part before the first '.' in the exit_node
|
||||
exit_node_short = exit_node.split('.')[0]
|
||||
def unset_exit_node():
|
||||
"""
|
||||
Unset Tailscale exit node.
|
||||
"""
|
||||
# For logging, we still want old IP + new IP. The 'new' IP after unsetting might revert to local.
|
||||
old_info = get_mullvad_info()
|
||||
old_ip = old_info.get('ip')
|
||||
old_country = old_info.get('country')
|
||||
old_node = get_current_exit_node()
|
||||
|
||||
# Verify that the exit_node_short and exit_node_hostname are equal
|
||||
if exit_node_short == exit_node_hostname:
|
||||
print("Exit node set successfully!")
|
||||
cmd = ['tailscale', 'set', '--exit-node='] + TAILSCALE_ARGS
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
# Now see if the IP changed
|
||||
new_info = get_mullvad_info()
|
||||
new_ip = new_info.get('ip')
|
||||
new_country = new_info.get('country')
|
||||
new_node = None
|
||||
|
||||
write_log(old_node, new_node, old_ip, new_ip, old_country, new_country)
|
||||
print("Exit node unset successfully!")
|
||||
|
||||
def start_exit_node():
|
||||
"""
|
||||
Start the exit node if none is currently set.
|
||||
Otherwise, report what is already set.
|
||||
"""
|
||||
current_exit_node = get_current_exit_node()
|
||||
if current_exit_node:
|
||||
print(f"Already connected to exit node: {current_exit_node}")
|
||||
else:
|
||||
print("Failed to set exit node!")
|
||||
# Use the default "tailscale exit-node suggest" approach
|
||||
result = subprocess.run(['tailscale', 'exit-node', 'suggest'], capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise Exception("Failed to run 'tailscale exit-node suggest'")
|
||||
|
||||
suggested = ''
|
||||
for line in result.stdout.splitlines():
|
||||
if 'Suggested exit node' in line:
|
||||
suggested = line.split(': ')[1].strip()
|
||||
break
|
||||
|
||||
if not suggested:
|
||||
raise Exception("No suggested exit node found.")
|
||||
|
||||
(old_ip, new_ip,
|
||||
old_node, new_node,
|
||||
old_country, new_country) = set_exit_node(suggested)
|
||||
|
||||
# Log it
|
||||
write_log(old_node, new_node, old_ip, new_ip, old_country, new_country)
|
||||
print(f"Exit node set successfully to {new_node}")
|
||||
|
||||
def set_random_privacy_friendly_exit_node():
|
||||
"""
|
||||
Pick a random node from PRIVACY_FRIENDLY_COUNTRIES and set it.
|
||||
"""
|
||||
# Filter exit nodes by known privacy-friendly countries
|
||||
nodes = list_exit_nodes()
|
||||
# nodes is dict {node_name: country}
|
||||
pf_nodes = [n for n, c in nodes.items() if c in PRIVACY_FRIENDLY_COUNTRIES]
|
||||
|
||||
if not pf_nodes:
|
||||
raise Exception("No privacy-friendly exit nodes available")
|
||||
|
||||
exit_node = random.choice(pf_nodes)
|
||||
(old_ip, new_ip,
|
||||
old_node, new_node,
|
||||
old_country, new_country) = set_exit_node(exit_node)
|
||||
|
||||
# Log
|
||||
write_log(old_node, new_node, old_ip, new_ip, old_country, new_country)
|
||||
print(f"Selected random privacy-friendly exit node: {exit_node}")
|
||||
print("Exit node set successfully!")
|
||||
|
||||
def set_random_exit_node_in_country(country_input):
|
||||
"""
|
||||
Pick a random node in the given (case-insensitive) country_input.
|
||||
Then set the exit node to that node.
|
||||
"""
|
||||
country_input_normalized = country_input.strip().lower()
|
||||
|
||||
all_nodes = list_exit_nodes()
|
||||
# Filter nodes in the user-requested country
|
||||
country_nodes = [
|
||||
node_name for node_name, node_country in all_nodes.items()
|
||||
if node_country.lower() == country_input_normalized
|
||||
]
|
||||
|
||||
if not country_nodes:
|
||||
raise Exception(f"No exit nodes found in {country_input}.")
|
||||
|
||||
exit_node = random.choice(country_nodes)
|
||||
|
||||
(old_ip, new_ip,
|
||||
old_node, new_node,
|
||||
old_country, new_country) = set_exit_node(exit_node)
|
||||
|
||||
# Log
|
||||
write_log(old_node, new_node, old_ip, new_ip, old_country, new_country)
|
||||
print(f"Selected random exit node in {country_input.title()}: {exit_node}")
|
||||
print("Exit node set successfully!")
|
||||
|
||||
def get_status():
|
||||
"""
|
||||
Print current connection status:
|
||||
- Whether connected or not
|
||||
- Current exit node and IP
|
||||
- Country of that exit node
|
||||
- How long it has been connected to that exit node (based on the last log entry)
|
||||
"""
|
||||
current_node = get_current_exit_node()
|
||||
if not current_node:
|
||||
print("No exit node is currently set.")
|
||||
return
|
||||
|
||||
# Current IP & country
|
||||
info = get_mullvad_info()
|
||||
current_ip = info.get('ip')
|
||||
current_country = info.get('country')
|
||||
|
||||
# Find the last time we connected to this node in the log
|
||||
history = get_connection_history()
|
||||
# We look from the end backwards for an entry that connected to the current_node
|
||||
connected_since = None
|
||||
for entry in reversed(history):
|
||||
if entry['connected_node'] == current_node:
|
||||
connected_since = entry['timestamp']
|
||||
break
|
||||
|
||||
# We'll compute a "connected for X minutes/hours/days" style message
|
||||
if connected_since:
|
||||
now_utc = datetime.datetime.utcnow()
|
||||
delta = now_utc - connected_since
|
||||
# For user-friendliness, just show something like 1h 12m, or 2d 3h
|
||||
# We'll do a simple approach:
|
||||
total_seconds = int(delta.total_seconds())
|
||||
days = total_seconds // 86400
|
||||
hours = (total_seconds % 86400) // 3600
|
||||
minutes = (total_seconds % 3600) // 60
|
||||
|
||||
duration_parts = []
|
||||
if days > 0:
|
||||
duration_parts.append(f"{days}d")
|
||||
if hours > 0:
|
||||
duration_parts.append(f"{hours}h")
|
||||
if minutes > 0:
|
||||
duration_parts.append(f"{minutes}m")
|
||||
if not duration_parts:
|
||||
duration_parts.append("0m") # means less than 1 minute
|
||||
|
||||
duration_str = " ".join(duration_parts)
|
||||
print(f"Currently connected to: {current_node} ({current_country})")
|
||||
print(f"IP: {current_ip}")
|
||||
print(f"Connected for: {duration_str}")
|
||||
else:
|
||||
# If we never found it in the log, it's presumably a brand new connection
|
||||
print(f"Currently connected to: {current_node} ({current_country})")
|
||||
print(f"IP: {current_ip}")
|
||||
print("Connected for: <unknown>, no log entry found.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='Manage VPN exit nodes.')
|
||||
parser.add_argument('action', choices=['start', 'stop', 'new', 'shh'], help='Action to perform: start, stop, new, or shh')
|
||||
parser.add_argument(
|
||||
'action',
|
||||
choices=['start', 'stop', 'new', 'shh', 'to', 'status'],
|
||||
help='Action to perform: start, stop, new, shh, to <country>, or status'
|
||||
)
|
||||
parser.add_argument(
|
||||
'country',
|
||||
nargs='?',
|
||||
default=None,
|
||||
help='Country name (used only with "to" mode).'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
@ -120,7 +417,36 @@ if __name__ == "__main__":
|
|||
elif args.action == 'stop':
|
||||
unset_exit_node()
|
||||
elif args.action == 'new':
|
||||
set_exit_node()
|
||||
# This calls set_exit_node() using the Tailscale "suggest" approach
|
||||
# from the original script
|
||||
result = subprocess.run(['tailscale', 'exit-node', 'suggest'], capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise Exception("Failed to run 'tailscale exit-node suggest'")
|
||||
|
||||
exit_node = ''
|
||||
for line in result.stdout.splitlines():
|
||||
if 'Suggested exit node' in line:
|
||||
exit_node = line.split(': ')[1].strip()
|
||||
break
|
||||
|
||||
if not exit_node:
|
||||
raise Exception("No suggested exit node found.")
|
||||
|
||||
(old_ip, new_ip,
|
||||
old_node, new_node,
|
||||
old_country, new_country) = set_exit_node(exit_node)
|
||||
write_log(old_node, new_node, old_ip, new_ip, old_country, new_country)
|
||||
print(f"Exit node set to suggested node: {new_node}")
|
||||
|
||||
elif args.action == 'shh':
|
||||
# Random privacy-friendly
|
||||
set_random_privacy_friendly_exit_node()
|
||||
|
||||
elif args.action == 'to':
|
||||
# "vpn to sweden" => pick a random node in Sweden
|
||||
if not args.country:
|
||||
raise Exception("You must specify a country. e.g. vpn to sweden")
|
||||
set_random_exit_node_in_country(args.country)
|
||||
|
||||
elif args.action == 'status':
|
||||
get_status()
|
Loading…
Add table
Reference in a new issue