From 297c3454df45c25cd161a581ce01e0ad0690835d Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Tue, 30 Jul 2024 14:29:20 -0700 Subject: [PATCH] Auto-update: Tue Jul 30 14:29:20 PDT 2024 --- sijapi/classes.py | 89 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 77 insertions(+), 12 deletions(-) diff --git a/sijapi/classes.py b/sijapi/classes.py index 820f5e4..6c8025e 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -352,10 +352,18 @@ class APIConfig(BaseModel): try: async with self.get_connection(pool_entry) as conn: info(f"Starting sync initialization for {pool_entry['ts_ip']}...") + + # Check PostGIS installation + postgis_installed = await self.check_postgis(conn) + if not postgis_installed: + warn(f"PostGIS is not installed on {pool_entry['ts_id']} ({pool_entry['ts_ip']}). Some spatial operations may fail.") + + # Continue with sync initialization tables = await conn.fetch(""" SELECT tablename FROM pg_tables WHERE schemaname = 'public' """) + all_tables_synced = True for table in tables: table_name = table['tablename'] @@ -366,11 +374,27 @@ class APIConfig(BaseModel): info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") else: warn(f"Sync initialization partially complete for {pool_entry['ts_ip']}. Some tables may be missing version or server_id columns.") + + # Initialize sync_status table if it doesn't exist + await self.initialize_sync_status_table(conn) + except Exception as e: err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") + async def initialize_sync_status_table(self, conn): + await conn.execute(""" + CREATE TABLE IF NOT EXISTS sync_status ( + table_name TEXT, + server_id TEXT, + last_synced_version INTEGER, + last_sync_time TIMESTAMP WITH TIME ZONE, + PRIMARY KEY (table_name, server_id) + ) + """) + + async def ensure_sync_structure(self, conn): tables = await conn.fetch(""" @@ -468,16 +492,24 @@ class APIConfig(BaseModel): for table in tables: table_name = table['tablename'] try: - version = await conn.fetchval(f""" - SELECT COALESCE(MAX(version), -1) + result = await conn.fetchrow(f""" + SELECT MAX(version) as max_version, server_id FROM "{table_name}" + WHERE version = (SELECT MAX(version) FROM "{table_name}") + GROUP BY server_id + ORDER BY MAX(version) DESC + LIMIT 1 """) - info(f"Max version for {pool_entry['ts_id']}, table {table_name}: {version}") - if version > max_version: - max_version = version - most_recent_source = pool_entry + if result: + version, server_id = result['max_version'], result['server_id'] + info(f"Max version for {pool_entry['ts_id']}, table {table_name}: {version} (from server {server_id})") + if version > max_version: + max_version = version + most_recent_source = pool_entry + else: + info(f"No data in table {table_name} for {pool_entry['ts_id']}") except asyncpg.exceptions.UndefinedColumnError: - warn(f"Version column does not exist in table {table_name} for {pool_entry['ts_id']}. Attempting to add...") + warn(f"Version or server_id column does not exist in table {table_name} for {pool_entry['ts_id']}. Attempting to add...") await self.ensure_sync_columns(conn, table_name) except Exception as e: err(f"Error checking version for {pool_entry['ts_id']}, table {table_name}: {str(e)}") @@ -491,6 +523,7 @@ class APIConfig(BaseModel): return most_recent_source + async def is_server_accessible(self, host, port, timeout=2): try: @@ -530,8 +563,6 @@ class APIConfig(BaseModel): return False - - async def pull_changes(self, source_pool_entry, batch_size=10000): if source_pool_entry['ts_id'] == os.environ.get('TS_ID'): info("Skipping self-sync") @@ -572,7 +603,7 @@ class APIConfig(BaseModel): total_changes += changes_count last_synced_version = changes[-1]['version'] - await self.update_last_synced_version(dest_conn, table_name, source_id, last_synced_version) + await self.update_sync_status(dest_conn, table_name, source_id, last_synced_version) info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") @@ -584,6 +615,7 @@ class APIConfig(BaseModel): return total_changes + async def apply_batch_changes(self, conn, table_name, changes): if not changes: return 0 @@ -599,7 +631,10 @@ class APIConfig(BaseModel): # Bulk insert changes into temporary table columns = changes[0].keys() - await conn.copy_records_to_table(temp_table_name, records=[tuple(change[col] for col in columns) for change in changes]) + records = [tuple(change[col] for col in columns) for change in changes] + + # Use copy_records instead of copy_records_to_table + await conn.copy_records(temp_table_name, records=records, columns=columns) # Perform upsert result = await conn.execute(f""" @@ -613,9 +648,17 @@ class APIConfig(BaseModel): affected_rows = int(result.split()[-1]) return affected_rows + except Exception as e: + err(f"Error applying batch changes to {table_name}: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") + return 0 finally: # Ensure temporary table is dropped - await conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}") + try: + await conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}") + except Exception as e: + warn(f"Failed to drop temporary table {temp_table_name}: {str(e)}") + async def push_changes_to_all(self): for pool_entry in self.POOL: @@ -666,6 +709,15 @@ class APIConfig(BaseModel): err(f"Error pushing changes to {pool_entry['ts_id']}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") + async def update_sync_status(self, conn, table_name, server_id, version): + await conn.execute(""" + INSERT INTO sync_status (table_name, server_id, last_synced_version, last_sync_time) + VALUES ($1, $2, $3, NOW()) + ON CONFLICT (table_name, server_id) DO UPDATE + SET last_synced_version = EXCLUDED.last_synced_version, + last_sync_time = EXCLUDED.last_sync_time + """, table_name, server_id, version) + async def get_last_synced_version(self, conn, table_name, server_id): return await conn.fetchval(f""" SELECT COALESCE(MAX(version), 0) @@ -701,6 +753,19 @@ class APIConfig(BaseModel): END $$; """) + async def check_postgis(self, conn): + try: + result = await conn.fetchval("SELECT PostGIS_version();") + if result: + info(f"PostGIS version: {result}") + return True + else: + warn("PostGIS is not installed or not working properly") + return False + except Exception as e: + err(f"Error checking PostGIS: {str(e)}") + return False +