diff --git a/sijapi/classes.py b/sijapi/classes.py index 0df567d..41965d2 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -1,5 +1,4 @@ # classes.py -import asyncio import json import yaml import math @@ -8,7 +7,9 @@ import re import uuid import aiofiles import aiohttp +import asyncio import asyncpg +import socket import traceback import reverse_geocoder as rg from pathlib import Path @@ -330,6 +331,7 @@ class APIConfig(BaseModel): err(f"Unexpected error when acquiring connection from pool for {pool_key}: {str(e)}") raise + async def close_db_pools(self): info("Closing database connection pools...") for pool_key, pool in self._db_pools.items(): @@ -341,6 +343,7 @@ class APIConfig(BaseModel): self._db_pools.clear() info("All database connection pools closed.") + async def initialize_sync(self): local_ts_id = os.environ.get('TS_ID') for pool_entry in self.POOL: @@ -348,10 +351,13 @@ class APIConfig(BaseModel): continue # Skip local database try: async with self.get_connection(pool_entry) as conn: + info(f"Starting sync initialization for {pool_entry['ts_ip']}...") await self.ensure_sync_structure(conn) - info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") + info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables should now have version and server_id columns with appropriate triggers.") except Exception as e: err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") + async def ensure_sync_structure(self, conn): tables = await conn.fetch(""" @@ -364,37 +370,45 @@ class APIConfig(BaseModel): await self.ensure_sync_columns(conn, table_name) await self.ensure_sync_trigger(conn, table_name) + async def ensure_sync_columns(self, conn, table_name): - await conn.execute(f""" - DO $$ - BEGIN - BEGIN - ALTER TABLE "{table_name}" - ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1; - EXCEPTION - WHEN duplicate_column THEN - -- Do nothing, column already exists - END; - - BEGIN - ALTER TABLE "{table_name}" - ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; - EXCEPTION - WHEN duplicate_column THEN - -- Do nothing, column already exists - END; - END $$; - """) - - # Verify that the columns were added - result = await conn.fetchrow(f""" - SELECT - EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = 'version') as has_version, - EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = 'server_id') as has_server_id - """) - - if not (result['has_version'] and result['has_server_id']): - raise Exception(f"Failed to add version and/or server_id columns to table {table_name}") + try: + await conn.execute(f""" + DO $$ + BEGIN + BEGIN + ALTER TABLE "{table_name}" + ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1; + EXCEPTION + WHEN duplicate_column THEN + NULL; + END; + + BEGIN + ALTER TABLE "{table_name}" + ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; + EXCEPTION + WHEN duplicate_column THEN + NULL; + END; + END $$; + """) + + # Verify that the columns were added + result = await conn.fetchrow(f""" + SELECT + EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = 'version') as has_version, + EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = 'server_id') as has_server_id + """) + + if not (result['has_version'] and result['has_server_id']): + raise Exception(f"Failed to add version and/or server_id columns to table {table_name}") + else: + info(f"Successfully added/verified version and server_id columns for table {table_name}") + except Exception as e: + err(f"Error ensuring sync columns for table {table_name}: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") + async def ensure_sync_trigger(self, conn, table_name): await conn.execute(f""" @@ -423,11 +437,18 @@ class APIConfig(BaseModel): if pool_entry['ts_id'] == local_ts_id: continue # Skip local database + if not await self.is_server_accessible(pool_entry['ts_ip'], pool_entry['db_port']): + warn(f"Server {pool_entry['ts_id']} ({pool_entry['ts_ip']}:{pool_entry['db_port']}) is not accessible. Skipping.") + continue + try: async with self.get_connection(pool_entry) as conn: if not await self.check_version_column_exists(conn): - warn(f"Version column does not exist in {pool_entry['ts_id']}. Skipping.") - continue + warn(f"Version column does not exist in some tables for {pool_entry['ts_id']}. Attempting to add...") + await self.ensure_sync_structure(conn) + if not await self.check_version_column_exists(conn): + warn(f"Failed to add version column to all tables in {pool_entry['ts_id']}. Skipping.") + continue version = await conn.fetchval(""" SELECT COALESCE(MAX(version), -1) @@ -437,27 +458,61 @@ class APIConfig(BaseModel): WHERE schemaname = 'public' ) as subquery """) + info(f"Max version for {pool_entry['ts_id']}: {version}") if version > max_version: max_version = version most_recent_source = pool_entry - except asyncpg.exceptions.ConnectionFailureError: - warn(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}") + except asyncpg.exceptions.ConnectionFailureError as e: + err(f"Failed to establish database connection with {pool_entry['ts_id']} ({pool_entry['ts_ip']}:{pool_entry['db_port']}): {str(e)}") + except asyncpg.exceptions.PostgresError as e: + err(f"PostgreSQL error occurred while querying {pool_entry['ts_id']}: {str(e)}") + if "column \"version\" does not exist" in str(e): + err(f"The 'version' column is missing in one or more tables on {pool_entry['ts_id']}. This might indicate a synchronization issue.") except Exception as e: - warn(f"Error checking version for {pool_entry['ts_id']}: {str(e)}") + err(f"Unexpected error occurred while checking version for {pool_entry['ts_id']}: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") return most_recent_source + + async def is_server_accessible(self, host, port, timeout=2): + try: + future = asyncio.open_connection(host, port) + await asyncio.wait_for(future, timeout=timeout) + return True + except (asyncio.TimeoutError, ConnectionRefusedError, socket.gaierror): + return False + async def check_version_column_exists(self, conn): - result = await conn.fetchval(""" - SELECT EXISTS ( - SELECT 1 - FROM information_schema.columns - WHERE table_schema = 'public' - AND column_name = 'version' - AND table_name IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') - ) - """) - return result + try: + result = await conn.fetchval(""" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_schema = 'public' + AND column_name = 'version' + AND table_name IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') + ) + """) + if not result: + tables_without_version = await conn.fetch(""" + SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename NOT IN ( + SELECT table_name + FROM information_schema.columns + WHERE table_schema = 'public' AND column_name = 'version' + ) + """) + table_names = ", ".join([t['tablename'] for t in tables_without_version]) + warn(f"Tables without 'version' column: {table_names}") + return result + except Exception as e: + err(f"Error checking for 'version' column existence: {str(e)}") + return False + + async def pull_changes(self, source_pool_entry, batch_size=10000):