Auto-update: Wed Jul 31 13:41:15 PDT 2024

This commit is contained in:
sanj 2024-07-31 13:41:15 -07:00
parent 10eb581ad4
commit 011673893d
6 changed files with 276 additions and 128 deletions

View file

@ -28,6 +28,7 @@ MAX_CPU_CORES = min(int(os.getenv("MAX_CPU_CORES", int(multiprocessing.cpu_count
IMG = Configuration.load('img', 'secrets') IMG = Configuration.load('img', 'secrets')
News = Configuration.load('news', 'secrets') News = Configuration.load('news', 'secrets')
Scrape = Configuration.load('scrape', 'secrets', Dir) Scrape = Configuration.load('scrape', 'secrets', Dir)
Serve = Configuration.load('serve')
# Directories & general paths # Directories & general paths
ROUTER_DIR = BASE_DIR / "routers" ROUTER_DIR = BASE_DIR / "routers"

View file

@ -56,11 +56,11 @@ class Configuration(BaseModel):
with yaml_path.open('r') as file: with yaml_path.open('r') as file:
config_data = yaml.safe_load(file) config_data = yaml.safe_load(file)
info(f"Loaded configuration data from {yaml_path}") debug(f"Loaded configuration data from {yaml_path}")
if secrets_path: if secrets_path:
with secrets_path.open('r') as file: with secrets_path.open('r') as file:
secrets_data = yaml.safe_load(file) secrets_data = yaml.safe_load(file)
info(f"Loaded secrets data from {secrets_path}") debug(f"Loaded secrets data from {secrets_path}")
if isinstance(config_data, list): if isinstance(config_data, list):
for item in config_data: for item in config_data:
if isinstance(item, dict): if isinstance(item, dict):
@ -184,7 +184,14 @@ class APIConfig(BaseModel):
SPECIAL_TABLES: ClassVar[List[str]] = ['spatial_ref_sys'] SPECIAL_TABLES: ClassVar[List[str]] = ['spatial_ref_sys']
_db_pools: Dict[str, Any] = PrivateAttr(default_factory=dict) db_pools: Dict[str, Any] = Field(default_factory=dict)
def __init__(self, **data):
super().__init__(**data)
self._db_pools = {}
class Config:
arbitrary_types_allowed = True
@classmethod @classmethod
def load(cls, config_path: Union[str, Path], secrets_path: Union[str, Path]): def load(cls, config_path: Union[str, Path], secrets_path: Union[str, Path]):
@ -307,17 +314,17 @@ class APIConfig(BaseModel):
pool_key = f"{pool_entry['ts_ip']}:{pool_entry['db_port']}" pool_key = f"{pool_entry['ts_ip']}:{pool_entry['db_port']}"
if pool_key not in self._db_pools: if pool_key not in self.db_pools:
try: try:
self._db_pools[pool_key] = await asyncpg.create_pool( self.db_pools[pool_key] = await asyncpg.create_pool(
host=pool_entry['ts_ip'], host=pool_entry['ts_ip'],
port=pool_entry['db_port'], port=pool_entry['db_port'],
user=pool_entry['db_user'], user=pool_entry['db_user'],
password=pool_entry['db_pass'], password=pool_entry['db_pass'],
database=pool_entry['db_name'], database=pool_entry['db_name'],
min_size=1, min_size=1,
max_size=10, # adjust as needed max_size=10,
timeout=5 # connection timeout in seconds timeout=5
) )
except Exception as e: except Exception as e:
err(f"Failed to create connection pool for {pool_key}: {str(e)}") err(f"Failed to create connection pool for {pool_key}: {str(e)}")
@ -325,27 +332,70 @@ class APIConfig(BaseModel):
return return
try: try:
async with self._db_pools[pool_key].acquire() as conn: async with self.db_pools[pool_key].acquire() as conn:
yield conn yield conn
except asyncpg.exceptions.ConnectionDoesNotExistError:
err(f"Failed to acquire connection from pool for {pool_key}: Connection does not exist")
yield None
except asyncpg.exceptions.ConnectionFailureError:
err(f"Failed to acquire connection from pool for {pool_key}: Connection failure")
yield None
except Exception as e: except Exception as e:
err(f"Unexpected error when acquiring connection from pool for {pool_key}: {str(e)}") err(f"Failed to acquire connection from pool for {pool_key}: {str(e)}")
yield None yield None
async def push_changes_to_one(self, pool_entry):
try:
async with self.get_connection() as local_conn:
if local_conn is None:
err(f"Failed to connect to local database. Skipping push to {pool_entry['ts_id']}")
return
async with self.get_connection(pool_entry) as remote_conn:
if remote_conn is None:
err(f"Failed to connect to remote database {pool_entry['ts_id']}. Skipping push.")
return
tables = await local_conn.fetch("""
SELECT tablename FROM pg_tables
WHERE schemaname = 'public'
""")
for table in tables:
table_name = table['tablename']
try:
if table_name in self.SPECIAL_TABLES:
await self.sync_special_table(local_conn, remote_conn, table_name)
else:
primary_key = await self.ensure_sync_columns(remote_conn, table_name)
last_synced_version = await self.get_last_synced_version(remote_conn, table_name, os.environ.get('TS_ID'))
changes = await local_conn.fetch(f"""
SELECT * FROM "{table_name}"
WHERE version > $1 AND server_id = $2
ORDER BY version ASC
""", last_synced_version, os.environ.get('TS_ID'))
if changes:
changes_count = await self.apply_batch_changes(remote_conn, table_name, changes, primary_key)
if changes_count > 0:
debug(f"Pushed {changes_count} changes for table {table_name} to {pool_entry['ts_id']}")
except Exception as e:
err(f"Error pushing changes for table {table_name} to {pool_entry['ts_id']}: {str(e)}")
err(f"Traceback: {traceback.format_exc()}")
info(f"Successfully pushed changes to {pool_entry['ts_id']}")
except Exception as e:
err(f"Error pushing changes to {pool_entry['ts_id']}: {str(e)}")
err(f"Traceback: {traceback.format_exc()}")
async def close_db_pools(self): async def close_db_pools(self):
info("Closing database connection pools...") info("Closing database connection pools...")
for pool_key, pool in self._db_pools.items(): for pool_key, pool in self.db_pools.items():
try: try:
await pool.close() await pool.close()
info(f"Closed pool for {pool_key}") debug(f"Closed pool for {pool_key}")
except Exception as e: except Exception as e:
err(f"Error closing pool for {pool_key}: {str(e)}") err(f"Error closing pool for {pool_key}: {str(e)}")
self._db_pools.clear() self.db_pools.clear()
info("All database connection pools closed.") info("All database connection pools closed.")
async def initialize_sync(self): async def initialize_sync(self):
@ -360,7 +410,7 @@ class APIConfig(BaseModel):
if conn is None: if conn is None:
continue # Skip this database if connection failed continue # Skip this database if connection failed
info(f"Starting sync initialization for {pool_entry['ts_ip']}...") debug(f"Starting sync initialization for {pool_entry['ts_ip']}...")
# Check PostGIS installation # Check PostGIS installation
postgis_installed = await self.check_postgis(conn) postgis_installed = await self.check_postgis(conn)
@ -376,15 +426,19 @@ class APIConfig(BaseModel):
table_name = table['tablename'] table_name = table['tablename']
await self.ensure_sync_columns(conn, table_name) await self.ensure_sync_columns(conn, table_name)
info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have necessary sync columns and triggers.") debug(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have necessary sync columns and triggers.")
except Exception as e: except Exception as e:
err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}")
err(f"Traceback: {traceback.format_exc()}") err(f"Traceback: {traceback.format_exc()}")
async def ensure_sync_columns(self, conn, table_name): async def ensure_sync_columns(self, conn, table_name):
if conn is None:
debug(f"Skipping offline server...")
return None
if table_name in self.SPECIAL_TABLES: if table_name in self.SPECIAL_TABLES:
info(f"Skipping sync columns for special table: {table_name}") debug(f"Skipping sync columns for special table: {table_name}")
return None return None
try: try:
@ -439,7 +493,7 @@ class APIConfig(BaseModel):
FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id(); FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id();
""") """)
info(f"Successfully ensured sync columns and trigger for table {table_name}") debug(f"Successfully ensured sync columns and trigger for table {table_name}")
return primary_key return primary_key
except Exception as e: except Exception as e:
@ -447,7 +501,8 @@ class APIConfig(BaseModel):
err(f"Traceback: {traceback.format_exc()}") err(f"Traceback: {traceback.format_exc()}")
async def apply_batch_changes(self, conn, table_name, changes, primary_key): async def apply_batch_changes(self, conn, table_name, changes, primary_key):
if not changes: if conn is None or not changes:
debug(f"Skipping apply_batch_changes because conn is none or there are no changes.")
return 0 return 0
try: try:
@ -473,12 +528,12 @@ class APIConfig(BaseModel):
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
""" """
debug(f"Generated insert query for {table_name}: {insert_query}") # debug(f"Generated insert query for {table_name}: {insert_query}")
affected_rows = 0 affected_rows = 0
async for change in tqdm(changes, desc=f"Syncing {table_name}", unit="row"): async for change in tqdm(changes, desc=f"Syncing {table_name}", unit="row"):
values = [change[col] for col in columns] values = [change[col] for col in columns]
debug(f"Executing query for {table_name} with values: {values}") # debug(f"Executing query for {table_name} with values: {values}")
result = await conn.execute(insert_query, *values) result = await conn.execute(insert_query, *values)
affected_rows += int(result.split()[-1]) affected_rows += int(result.split()[-1])
@ -491,7 +546,7 @@ class APIConfig(BaseModel):
async def pull_changes(self, source_pool_entry, batch_size=10000): async def pull_changes(self, source_pool_entry, batch_size=10000):
if source_pool_entry['ts_id'] == os.environ.get('TS_ID'): if source_pool_entry['ts_id'] == os.environ.get('TS_ID'):
info("Skipping self-sync") debug("Skipping self-sync")
return 0 return 0
total_changes = 0 total_changes = 0
@ -533,7 +588,7 @@ class APIConfig(BaseModel):
if changes_count > 0: if changes_count > 0:
info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}")
else: else:
info(f"No changes to sync for {table_name}") debug(f"No changes to sync for {table_name}")
except Exception as e: except Exception as e:
err(f"Error syncing table {table_name}: {str(e)}") err(f"Error syncing table {table_name}: {str(e)}")
@ -572,47 +627,15 @@ class APIConfig(BaseModel):
except Exception as e: except Exception as e:
err(f"Error pushing changes to {pool_entry['ts_id']}: {str(e)}") err(f"Error pushing changes to {pool_entry['ts_id']}: {str(e)}")
async def push_changes_to_one(self, pool_entry):
try:
async with self.get_connection() as local_conn:
async with self.get_connection(pool_entry) as remote_conn:
tables = await local_conn.fetch("""
SELECT tablename FROM pg_tables
WHERE schemaname = 'public'
""")
for table in tables:
table_name = table['tablename']
try:
if table_name in self.SPECIAL_TABLES:
await self.sync_special_table(local_conn, remote_conn, table_name)
else:
primary_key = await self.ensure_sync_columns(remote_conn, table_name)
last_synced_version = await self.get_last_synced_version(remote_conn, table_name, os.environ.get('TS_ID'))
changes = await local_conn.fetch(f"""
SELECT * FROM "{table_name}"
WHERE version > $1 AND server_id = $2
ORDER BY version ASC
""", last_synced_version, os.environ.get('TS_ID'))
if changes:
changes_count = await self.apply_batch_changes(remote_conn, table_name, changes, primary_key)
if changes_count > 0:
info(f"Pushed {changes_count} changes for table {table_name} to {pool_entry['ts_id']}")
except Exception as e:
err(f"Error pushing changes for table {table_name} to {pool_entry['ts_id']}: {str(e)}")
err(f"Traceback: {traceback.format_exc()}")
info(f"Successfully pushed changes to {pool_entry['ts_id']}")
except Exception as e:
err(f"Error pushing changes to {pool_entry['ts_id']}: {str(e)}")
err(f"Traceback: {traceback.format_exc()}")
async def get_last_synced_version(self, conn, table_name, server_id): async def get_last_synced_version(self, conn, table_name, server_id):
if conn is None:
debug(f"Skipping offline server...")
return 0
if table_name in self.SPECIAL_TABLES: if table_name in self.SPECIAL_TABLES:
debug(f"Skipping get_last_synced_version becaue {table_name} is special.")
return 0 # Special handling for tables without version column return 0 # Special handling for tables without version column
return await conn.fetchval(f""" return await conn.fetchval(f"""
@ -622,10 +645,14 @@ class APIConfig(BaseModel):
""", server_id) """, server_id)
async def check_postgis(self, conn): async def check_postgis(self, conn):
if conn is None:
debug(f"Skipping offline server...")
return None
try: try:
result = await conn.fetchval("SELECT PostGIS_version();") result = await conn.fetchval("SELECT PostGIS_version();")
if result: if result:
info(f"PostGIS version: {result}") debug(f"PostGIS version: {result}")
return True return True
else: else:
warn("PostGIS is not installed or not working properly") warn("PostGIS is not installed or not working properly")
@ -669,7 +696,7 @@ class APIConfig(BaseModel):
INSERT INTO spatial_ref_sys ({', '.join(f'"{col}"' for col in columns)}) INSERT INTO spatial_ref_sys ({', '.join(f'"{col}"' for col in columns)})
VALUES ({', '.join(placeholders)}) VALUES ({', '.join(placeholders)})
""" """
debug(f"Inserting new entry for srid {srid}: {insert_query}") # debug(f"Inserting new entry for srid {srid}: {insert_query}")
await dest_conn.execute(insert_query, *source_entry.values()) await dest_conn.execute(insert_query, *source_entry.values())
inserts += 1 inserts += 1
elif source_entry != dest_dict[srid]: elif source_entry != dest_dict[srid]:
@ -682,7 +709,7 @@ class APIConfig(BaseModel):
proj4text = $4::text proj4text = $4::text
WHERE srid = $5::integer WHERE srid = $5::integer
""" """
debug(f"Updating entry for srid {srid}: {update_query}") # debug(f"Updating entry for srid {srid}: {update_query}")
await dest_conn.execute(update_query, await dest_conn.execute(update_query,
source_entry['auth_name'], source_entry['auth_name'],
source_entry['auth_srid'], source_entry['auth_srid'],
@ -705,55 +732,58 @@ class APIConfig(BaseModel):
max_version = -1 max_version = -1
local_ts_id = os.environ.get('TS_ID') local_ts_id = os.environ.get('TS_ID')
online_hosts = await self.get_online_hosts() online_hosts = await self.get_online_hosts()
num_online_hosts = len(online_hosts)
for pool_entry in online_hosts: if num_online_hosts > 0:
if pool_entry['ts_id'] == local_ts_id: online_ts_ids = [host['ts_id'] for host in online_hosts if host['ts_id'] != local_ts_id]
continue # Skip local database crit(f"Online hosts: {', '.join(online_ts_ids)}")
try: for pool_entry in online_hosts:
async with self.get_connection(pool_entry) as conn: if pool_entry['ts_id'] == local_ts_id:
tables = await conn.fetch(""" continue # Skip local database
SELECT tablename FROM pg_tables
WHERE schemaname = 'public' try:
""") async with self.get_connection(pool_entry) as conn:
tables = await conn.fetch("""
for table in tables: SELECT tablename FROM pg_tables
table_name = table['tablename'] WHERE schemaname = 'public'
if table_name in self.SPECIAL_TABLES: """)
continue # Skip special tables for version comparison
try: for table in tables:
result = await conn.fetchrow(f""" table_name = table['tablename']
SELECT MAX(version) as max_version, server_id if table_name in self.SPECIAL_TABLES:
FROM "{table_name}" continue # Skip special tables for version comparison
WHERE version = (SELECT MAX(version) FROM "{table_name}") try:
GROUP BY server_id result = await conn.fetchrow(f"""
ORDER BY MAX(version) DESC SELECT MAX(version) as max_version, server_id
LIMIT 1 FROM "{table_name}"
""") WHERE version = (SELECT MAX(version) FROM "{table_name}")
if result: GROUP BY server_id
version, server_id = result['max_version'], result['server_id'] ORDER BY MAX(version) DESC
info(f"Max version for {pool_entry['ts_id']}, table {table_name}: {version} (from server {server_id})") LIMIT 1
if version > max_version: """)
max_version = version if result:
most_recent_source = pool_entry version, server_id = result['max_version'], result['server_id']
else: info(f"Max version for {pool_entry['ts_id']}, table {table_name}: {version} (from server {server_id})")
info(f"No data in table {table_name} for {pool_entry['ts_id']}") if version > max_version:
except asyncpg.exceptions.UndefinedColumnError: max_version = version
warn(f"Version or server_id column does not exist in table {table_name} for {pool_entry['ts_id']}. Skipping.") most_recent_source = pool_entry
except Exception as e: else:
err(f"Error checking version for {pool_entry['ts_id']}, table {table_name}: {str(e)}") debug(f"No data in table {table_name} for {pool_entry['ts_id']}")
except asyncpg.exceptions.UndefinedColumnError:
except asyncpg.exceptions.ConnectionFailureError: warn(f"Version or server_id column does not exist in table {table_name} for {pool_entry['ts_id']}. Skipping.")
warn(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}") except Exception as e:
except Exception as e: err(f"Error checking version for {pool_entry['ts_id']}, table {table_name}: {str(e)}")
err(f"Unexpected error occurred while checking version for {pool_entry['ts_id']}: {str(e)}")
err(f"Traceback: {traceback.format_exc()}")
return most_recent_source
except asyncpg.exceptions.ConnectionFailureError:
warn(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}")
except Exception as e:
err(f"Unexpected error occurred while checking version for {pool_entry['ts_id']}: {str(e)}")
err(f"Traceback: {traceback.format_exc()}")
return most_recent_source
else:
warn(f"No other online hosts for sync")
return None
class Location(BaseModel): class Location(BaseModel):

View file

@ -0,0 +1,24 @@
base_url: 'https://cloudflare.com'
token: '{{ SECRET.CF_TOKEN }}'
cf_ip: '65.1.1.1' # replace me
ai: # tld, e.g. .ai
sij: # domain, e.g. sij.ai
_zone: c00a9e0ff540308232eb5762621d5b1 # zone id
_www: 8a26b17923ac3a8f21b6127cdb3d7459 # dns id for domain, e.g. www.sij.ai
api: 8a336ee8a5b13e112d6d4ae77c149bd6 # dns id for subdomain, e.g. api.sij.ai
txt: b4b0bd48ac4272b1c48eb1624072adb2 # dns id for subdomaikn, e.g. txt.sij.ai
git: cd524c00b6daf824c933a294cb52eae2 # dns id for subdomain, e.g. git.sij.ai
law: # tld, e.g. .law
sij: # domain, e.g. sij.law
_zone: 5b68d9cd99b896e26c232f03cda89d66 # zone id
_www: ba9afd99deeb0407ea1b74ba88eb5564 # dns id for domain, e.g. www.sij.law
map: 4de8fe05bb0e722ee2c78b2ddf553c82 # dns id for subdomain, e.g. map.sij.law
imap: 384acd03c139ffaed37f4e70c627e7d1 # dns id for subdomain, e.g. imap.sij.law
smtp: 0677e42ea9b589d67d1da21aa00455e0 # dns id for subdomain, e.g. smtp.sij.law
esq: # tld, e.g. .esq
env: # domain, e.g. env.esq
_zone: faf889fd7c227c2e61875b2e70b5c6fe # zone id
_www: b9b636ce9bd4812a6564f572f0f373ee # dns id for domain, e.g. www.env.esq
dt: afbc205e829cfb8d3f79dab187c06f99 # dns id for subdomain, e.g. dt.env.esq
rss: f043d5cf485f4e53f9cbcb85fed2c861 # dns id for subdomain, e.g. rss.env.esq
s3: a5fa431a4be8f50af2c118aed353b0ec # dns id for subdomain, e.g. s3.env.esq

View file

@ -0,0 +1,37 @@
- name: "CalFire_THP"
url: "https://caltreesplans.resources.ca.gov/Caltrees/Report/ShowReport.aspx?module=TH_Document&reportID=492&reportType=LINK_REPORT_LIST"
output_file: "{{ Dir.DATA }}/calfire_thp_data.json"
content:
type: "pdf"
selector: null
js_render: false
processing:
- name: "split_entries"
type: "regex_split"
pattern: '(\d+-\d+-\d+-\w+)'
- name: "filter_entries"
type: "keyword_filter"
keywords: ["Sierra Pacific", "SPI", "Land & Timber"]
- name: "extract_data"
type: "regex_extract"
extractions:
- name: "Harvest Document"
pattern: '(\d+-\d+-\d+-\w+)'
- name: "Land Owner"
pattern: '((?:SIERRA PACIFIC|SPI|.*?LAND & TIMBER).*?)(?=\d+-\d+-\d+-\w+|\Z)'
flags: ["DOTALL", "IGNORECASE"]
- name: "Location"
pattern: '((?:MDBM|HBM):.*?)(?=(?:SIERRA PACIFIC|SPI|.*?LAND & TIMBER)|\Z)'
flags: ["DOTALL"]
- name: "Total Acres"
pattern: '(\d+\.\d+)\s+acres'
- name: "Watershed"
pattern: 'Watershed:\s+(.+)'
post_processing:
- name: "extract_plss_coordinates"
type: "regex_extract"
field: "Location"
pattern: '(\w+): T(\d+)([NSEW]) R(\d+)([NSEW]) S(\d+)'
output_field: "PLSS Coordinates"
all_matches: true
format: "{0}: T{1}{2} R{3}{4} S{5}"

View file

@ -0,0 +1,5 @@
forwarding_rules:
- source: "test.domain.com:80"
destination: "100.64.64.14:8080"
- source: "100.64.64.20:1024"
destination: "127.0.0.1:1025"

View file

@ -31,7 +31,7 @@ from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
from sijapi import ( from sijapi import (
L, API, LOGS_DIR, TS_ID, CASETABLE_PATH, COURTLISTENER_DOCKETS_URL, COURTLISTENER_API_KEY, L, API, Serve, LOGS_DIR, TS_ID, CASETABLE_PATH, COURTLISTENER_DOCKETS_URL, COURTLISTENER_API_KEY,
COURTLISTENER_BASE_URL, COURTLISTENER_DOCKETS_DIR, COURTLISTENER_SEARCH_DIR, ALERTS_DIR, COURTLISTENER_BASE_URL, COURTLISTENER_DOCKETS_DIR, COURTLISTENER_SEARCH_DIR, ALERTS_DIR,
MAC_UN, MAC_PW, MAC_ID, TS_TAILNET, IMG_DIR, PUBLIC_KEY, OBSIDIAN_VAULT_DIR MAC_UN, MAC_PW, MAC_ID, TS_TAILNET, IMG_DIR, PUBLIC_KEY, OBSIDIAN_VAULT_DIR
) )
@ -51,7 +51,7 @@ templates = Jinja2Templates(directory=Path(__file__).parent.parent / "sites")
@serve.get("/pgp") @serve.get("/pgp")
async def get_pgp(): async def get_pgp():
return Response(PUBLIC_KEY, media_type="text/plain") return Response(Serve.PGP, media_type="text/plain")
@serve.get("/img/{image_name}") @serve.get("/img/{image_name}")
def serve_image(image_name: str): def serve_image(image_name: str):
@ -119,17 +119,6 @@ async def hook_alert(request: Request):
return await notify(alert) return await notify(alert)
@serve.post("/alert/cd")
async def hook_changedetection(webhook_data: dict):
body = webhook_data.get("body", {})
message = body.get("message", "")
if message and any(word in message.split() for word in ["SPI", "sierra", "pacific"]):
filename = ALERTS_DIR / f"alert_{int(time.time())}.json"
filename.write_text(json.dumps(webhook_data, indent=4))
notify(message)
return {"status": "received"}
async def notify(alert: str): async def notify(alert: str):
fail = True fail = True
@ -528,3 +517,65 @@ async def get_analytics(short_code: str):
"total_clicks": click_count, "total_clicks": click_count,
"recent_clicks": [dict(click) for click in clicks] "recent_clicks": [dict(click) for click in clicks]
} }
async def forward_traffic(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, destination: str):
dest_host, dest_port = destination.split(':')
dest_port = int(dest_port)
try:
dest_reader, dest_writer = await asyncio.open_connection(dest_host, dest_port)
except Exception as e:
writer.close()
await writer.wait_closed()
return
async def forward(src, dst):
try:
while True:
data = await src.read(8192)
if not data:
break
dst.write(data)
await dst.drain()
except Exception as e:
pass
finally:
dst.close()
await dst.wait_closed()
await asyncio.gather(
forward(reader, dest_writer),
forward(dest_reader, writer)
)
async def start_server(source: str, destination: str):
host, port = source.split(':')
port = int(port)
server = await asyncio.start_server(
lambda r, w: forward_traffic(r, w, destination),
host,
port
)
async with server:
await server.serve_forever()
async def start_port_forwarding():
if hasattr(Serve, 'forwarding_rules'):
for rule in Serve.forwarding_rules:
asyncio.create_task(start_server(rule.source, rule.destination))
else:
warn("No forwarding rules found in the configuration.")
@serve.get("/forward_status")
async def get_forward_status():
if hasattr(Serve, 'forwarding_rules'):
return {"status": "active", "rules": Serve.forwarding_rules}
else:
return {"status": "inactive", "message": "No forwarding rules configured"}
# Add this to the end of your serve.py file
asyncio.create_task(start_port_forwarding())