diff --git a/sijapi/classes.py b/sijapi/classes.py index be3875f..83a02cb 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -330,24 +330,32 @@ class APIConfig(BaseModel): await self.ensure_sync_columns(conn, table_name) await self.create_sync_trigger(conn, table_name) - info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") + info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") except Exception as e: err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") + async def ensure_sync_columns(self, conn, table_name): - await conn.execute(f""" - DO $$ - BEGIN - BEGIN - ALTER TABLE "{table_name}" - ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1, - ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; - EXCEPTION - WHEN duplicate_column THEN - -- Do nothing, column already exists - END; - END $$; - """) + try: + await conn.execute(f""" + DO $$ + BEGIN + BEGIN + ALTER TABLE "{table_name}" + ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1, + ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; + EXCEPTION + WHEN duplicate_column THEN + RAISE NOTICE 'column version or server_id already exists in {table_name}.'; + END; + END $$; + """) + info(f"Ensured sync columns for table {table_name}") + except Exception as e: + err(f"Error ensuring sync columns for table {table_name}: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") + async def create_sync_trigger(self, conn, table_name): await conn.execute(f""" @@ -380,20 +388,45 @@ class APIConfig(BaseModel): try: async with self.get_connection(pool_entry) as conn: + # Check if the version column exists in any table + version_exists = await conn.fetchval(""" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_schema = 'public' + AND column_name = 'version' + ) + """) + + if not version_exists: + info(f"Version column does not exist in any table for {pool_entry['ts_id']}") + continue + version = await conn.fetchval(""" - SELECT COALESCE(MAX(version), -1) FROM ( - SELECT MAX(version) as version FROM information_schema.columns - WHERE table_schema = 'public' AND column_name = 'version' + SELECT COALESCE(MAX(version), -1) + FROM ( + SELECT MAX(version) as version + FROM information_schema.columns + WHERE table_schema = 'public' + AND column_name = 'version' + AND is_updatable = 'YES' ) as subquery """) + info(f"Max version for {pool_entry['ts_id']}: {version}") if version > max_version: max_version = version most_recent_source = pool_entry except Exception as e: err(f"Error checking version for {pool_entry['ts_id']}: {str(e)}") + if most_recent_source: + info(f"Most recent source: {most_recent_source['ts_id']} with version {max_version}") + else: + info("No valid source found with version information") + return most_recent_source + async def pull_changes(self, source_pool_entry, batch_size=10000): if source_pool_entry['ts_id'] == os.environ.get('TS_ID'): info("Skipping self-sync")