diff --git a/sijapi/classes.py b/sijapi/classes.py index 41965d2..820f5e4 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -352,13 +352,26 @@ class APIConfig(BaseModel): try: async with self.get_connection(pool_entry) as conn: info(f"Starting sync initialization for {pool_entry['ts_ip']}...") - await self.ensure_sync_structure(conn) - info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables should now have version and server_id columns with appropriate triggers.") + tables = await conn.fetch(""" + SELECT tablename FROM pg_tables + WHERE schemaname = 'public' + """) + all_tables_synced = True + for table in tables: + table_name = table['tablename'] + if not await self.ensure_sync_columns(conn, table_name): + all_tables_synced = False + + if all_tables_synced: + info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") + else: + warn(f"Sync initialization partially complete for {pool_entry['ts_ip']}. Some tables may be missing version or server_id columns.") except Exception as e: err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") + async def ensure_sync_structure(self, conn): tables = await conn.fetch(""" SELECT tablename FROM pg_tables @@ -401,13 +414,16 @@ class APIConfig(BaseModel): EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = 'server_id') as has_server_id """) - if not (result['has_version'] and result['has_server_id']): - raise Exception(f"Failed to add version and/or server_id columns to table {table_name}") - else: + if result['has_version'] and result['has_server_id']: info(f"Successfully added/verified version and server_id columns for table {table_name}") + return True + else: + err(f"Failed to add version and/or server_id columns to table {table_name}") + return False except Exception as e: err(f"Error ensuring sync columns for table {table_name}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") + return False async def ensure_sync_trigger(self, conn, table_name): @@ -428,6 +444,7 @@ class APIConfig(BaseModel): FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id(); """) + async def get_most_recent_source(self): most_recent_source = None max_version = -1 @@ -443,37 +460,37 @@ class APIConfig(BaseModel): try: async with self.get_connection(pool_entry) as conn: - if not await self.check_version_column_exists(conn): - warn(f"Version column does not exist in some tables for {pool_entry['ts_id']}. Attempting to add...") - await self.ensure_sync_structure(conn) - if not await self.check_version_column_exists(conn): - warn(f"Failed to add version column to all tables in {pool_entry['ts_id']}. Skipping.") - continue - - version = await conn.fetchval(""" - SELECT COALESCE(MAX(version), -1) - FROM ( - SELECT MAX(version) as version - FROM pg_tables - WHERE schemaname = 'public' - ) as subquery + tables = await conn.fetch(""" + SELECT tablename FROM pg_tables + WHERE schemaname = 'public' """) - info(f"Max version for {pool_entry['ts_id']}: {version}") - if version > max_version: - max_version = version - most_recent_source = pool_entry + + for table in tables: + table_name = table['tablename'] + try: + version = await conn.fetchval(f""" + SELECT COALESCE(MAX(version), -1) + FROM "{table_name}" + """) + info(f"Max version for {pool_entry['ts_id']}, table {table_name}: {version}") + if version > max_version: + max_version = version + most_recent_source = pool_entry + except asyncpg.exceptions.UndefinedColumnError: + warn(f"Version column does not exist in table {table_name} for {pool_entry['ts_id']}. Attempting to add...") + await self.ensure_sync_columns(conn, table_name) + except Exception as e: + err(f"Error checking version for {pool_entry['ts_id']}, table {table_name}: {str(e)}") + except asyncpg.exceptions.ConnectionFailureError as e: err(f"Failed to establish database connection with {pool_entry['ts_id']} ({pool_entry['ts_ip']}:{pool_entry['db_port']}): {str(e)}") - except asyncpg.exceptions.PostgresError as e: - err(f"PostgreSQL error occurred while querying {pool_entry['ts_id']}: {str(e)}") - if "column \"version\" does not exist" in str(e): - err(f"The 'version' column is missing in one or more tables on {pool_entry['ts_id']}. This might indicate a synchronization issue.") except Exception as e: err(f"Unexpected error occurred while checking version for {pool_entry['ts_id']}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") return most_recent_source + async def is_server_accessible(self, host, port, timeout=2): try: