diff --git a/sijapi/classes.py b/sijapi/classes.py index 6c8025e..f09b8f0 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -358,6 +358,9 @@ class APIConfig(BaseModel): if not postgis_installed: warn(f"PostGIS is not installed on {pool_entry['ts_id']} ({pool_entry['ts_ip']}). Some spatial operations may fail.") + # Initialize sync_status table + await self.initialize_sync_status_table(conn) + # Continue with sync initialization tables = await conn.fetch(""" SELECT tablename FROM pg_tables @@ -375,14 +378,12 @@ class APIConfig(BaseModel): else: warn(f"Sync initialization partially complete for {pool_entry['ts_ip']}. Some tables may be missing version or server_id columns.") - # Initialize sync_status table if it doesn't exist - await self.initialize_sync_status_table(conn) - except Exception as e: err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") + async def initialize_sync_status_table(self, conn): await conn.execute(""" CREATE TABLE IF NOT EXISTS sync_status ( @@ -394,6 +395,22 @@ class APIConfig(BaseModel): ) """) + # Check if the last_sync_time column exists, and add it if it doesn't + column_exists = await conn.fetchval(""" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_name = 'sync_status' AND column_name = 'last_sync_time' + ) + """) + + if not column_exists: + await conn.execute(""" + ALTER TABLE sync_status + ADD COLUMN last_sync_time TIMESTAMP WITH TIME ZONE + """) + + async def ensure_sync_structure(self, conn): @@ -633,8 +650,8 @@ class APIConfig(BaseModel): columns = changes[0].keys() records = [tuple(change[col] for col in columns) for change in changes] - # Use copy_records instead of copy_records_to_table - await conn.copy_records(temp_table_name, records=records, columns=columns) + # Use copy_records_to_table instead of copy_records + await conn.copy_records_to_table(temp_table_name, records=records) # Perform upsert result = await conn.execute(f""" @@ -660,6 +677,7 @@ class APIConfig(BaseModel): warn(f"Failed to drop temporary table {temp_table_name}: {str(e)}") + async def push_changes_to_all(self): for pool_entry in self.POOL: if pool_entry['ts_id'] != os.environ.get('TS_ID'):