diff --git a/sijapi/classes.py b/sijapi/classes.py index 525789a..458e26c 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -603,26 +603,32 @@ class APIConfig(BaseModel): for table in tables: table_name = table['tablename'] try: - last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) - - changes = await source_conn.fetch(f""" - SELECT * FROM "{table_name}" - WHERE version > $1 AND server_id = $2 - ORDER BY version ASC - LIMIT $3 - """, last_synced_version, source_id, batch_size) - - if changes: - changes_count = await self.apply_batch_changes(dest_conn, table_name, changes) + if table_name == 'spatial_ref_sys': + changes_count = await self.sync_spatial_ref_sys(source_conn, dest_conn) total_changes += changes_count - - if changes_count > 0: - last_synced_version = changes[-1]['version'] - await self.update_sync_status(dest_conn, table_name, source_id, last_synced_version) - - info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") + info(f"Synced spatial_ref_sys: {changes_count} changes. Total so far: {total_changes}") else: - info(f"No changes to sync for {table_name}") + # Existing code for other tables + last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) + + changes = await source_conn.fetch(f""" + SELECT * FROM "{table_name}" + WHERE version > $1 AND server_id = $2 + ORDER BY version ASC + LIMIT $3 + """, last_synced_version, source_id, batch_size) + + if changes: + changes_count = await self.apply_batch_changes(dest_conn, table_name, changes) + total_changes += changes_count + + if changes_count > 0: + last_synced_version = changes[-1]['version'] + await self.update_sync_status(dest_conn, table_name, source_id, last_synced_version) + + info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") + else: + info(f"No changes to sync for {table_name}") except Exception as e: err(f"Error syncing table {table_name}: {str(e)}") @@ -647,12 +653,24 @@ class APIConfig(BaseModel): # Convert the keys to a list columns = list(changes[0].keys()) placeholders = [f'${i+1}' for i in range(len(columns))] - insert_query = f""" - INSERT INTO "{table_name}" ({', '.join(columns)}) - VALUES ({', '.join(placeholders)}) - ON CONFLICT (id) DO UPDATE SET - {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != 'id')} - """ + + # Check if 'id' column exists + id_exists = 'id' in columns + + if id_exists: + insert_query = f""" + INSERT INTO "{table_name}" ({', '.join(columns)}) + VALUES ({', '.join(placeholders)}) + ON CONFLICT (id) DO UPDATE SET + {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != 'id')} + """ + else: + # For tables without 'id', use all columns as conflict target + insert_query = f""" + INSERT INTO "{table_name}" ({', '.join(columns)}) + VALUES ({', '.join(placeholders)}) + ON CONFLICT DO NOTHING + """ # Execute the insert for each change affected_rows = 0 @@ -669,6 +687,56 @@ class APIConfig(BaseModel): return 0 + async def sync_spatial_ref_sys(self, source_conn, dest_conn): + try: + # Get all entries from the source + source_entries = await source_conn.fetch(""" + SELECT * FROM spatial_ref_sys + ORDER BY srid + """) + + # Get all entries from the destination + dest_entries = await dest_conn.fetch(""" + SELECT * FROM spatial_ref_sys + ORDER BY srid + """) + + # Convert to dictionaries for easier comparison + source_dict = {entry['srid']: entry for entry in source_entries} + dest_dict = {entry['srid']: entry for entry in dest_entries} + + updates = 0 + inserts = 0 + + for srid, source_entry in source_dict.items(): + if srid not in dest_dict: + # Insert new entry + columns = source_entry.keys() + placeholders = [f'${i+1}' for i in range(len(columns))] + insert_query = f""" + INSERT INTO spatial_ref_sys ({', '.join(columns)}) + VALUES ({', '.join(placeholders)}) + """ + await dest_conn.execute(insert_query, *source_entry.values()) + inserts += 1 + elif source_entry != dest_dict[srid]: + # Update existing entry + update_query = f""" + UPDATE spatial_ref_sys + SET {', '.join(f"{col} = ${i+1}" for i, col in enumerate(source_entry.keys()) if col != 'srid')} + WHERE srid = ${len(source_entry)} + """ + await dest_conn.execute(update_query, *[v for k, v in source_entry.items() if k != 'srid'], srid) + updates += 1 + + info(f"spatial_ref_sys sync complete. Inserts: {inserts}, Updates: {updates}") + return inserts + updates + + except Exception as e: + err(f"Error syncing spatial_ref_sys table: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") + return 0 + async def push_changes_to_all(self): for pool_entry in self.POOL: