diff --git a/sijapi/classes.py b/sijapi/classes.py index 9c8b62f..2ef41b1 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -400,8 +400,12 @@ class APIConfig(BaseModel): table_name = table['tablename'] inserts = 0 updates = 0 + error_count = 0 last_synced_version = await self.get_last_synced_version(table_name, source_id) + # Get primary key columns for the table + primary_keys = await self.get_primary_keys(dest_conn, table_name) + changes = await source_conn.fetch(f""" SELECT * FROM "{table_name}" WHERE version > $1 AND server_id = $2 @@ -414,19 +418,19 @@ class APIConfig(BaseModel): debug(f"Syncing data for table: {table_name} from {source_id} to {dest_id}") - if table_name == 'sync_status': + if not primary_keys: + # If no primary key, just insert insert_query = f""" INSERT INTO "{table_name}" ({', '.join(columns)}) VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) - ON CONFLICT (table_name, server_id) DO UPDATE SET - {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in ['table_name', 'server_id'])} """ else: + # If primary key exists, use it for conflict resolution insert_query = f""" INSERT INTO "{table_name}" ({', '.join(columns)}) VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) - ON CONFLICT (id) DO UPDATE SET - {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != 'id')} + ON CONFLICT ({', '.join(primary_keys)}) DO UPDATE SET + {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in primary_keys)} """ try: @@ -436,7 +440,12 @@ class APIConfig(BaseModel): else: inserts += 1 except Exception as e: - err(f"Error syncing data for table {table_name} from {source_id} to {dest_id}: {str(e)}") + if error_count < 10: # Limit error logging + err(f"Error syncing data for table {table_name} from {source_id} to {dest_id}: {str(e)}") + error_count += 1 + elif error_count == 10: + err(f"Suppressing further errors for table {table_name}") + error_count += 1 if changes: await self.update_sync_status(table_name, source_id, changes[-1]['version']) @@ -446,6 +455,8 @@ class APIConfig(BaseModel): table_changes[table_name] = {'inserts': inserts, 'updates': updates} info(f"Synced {table_name} from {source_id} to {dest_id}: {inserts} inserts, {updates} updates") + if error_count > 10: + info(f"Total of {error_count} errors occurred for table {table_name}") info(f"Sync complete from {source_id} ({source_ip}) to {dest_id} ({dest_ip})") info(f"Total changes: {total_inserts} inserts, {total_updates} updates") @@ -456,6 +467,18 @@ class APIConfig(BaseModel): return total_inserts + total_updates + async def get_primary_keys(self, conn, table_name): + primary_keys = await conn.fetch(""" + SELECT a.attname + FROM pg_index i + JOIN pg_attribute a ON a.attrelid = i.indrelid + AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = $1::regclass + AND i.indisprimary + """, table_name) + return [pk['attname'] for pk in primary_keys] + + async def push_changes_to_all(self): async with self.get_connection() as local_conn: tables = await local_conn.fetch("""