From 20d03c8867a39d56b9a7e35de0de37ae809aa8d4 Mon Sep 17 00:00:00 2001
From: sanj <67624670+iodrift@users.noreply.github.com>
Date: Tue, 30 Jul 2024 15:12:58 -0700
Subject: [PATCH] Auto-update: Tue Jul 30 15:12:58 PDT 2024

---
 sijapi/classes.py | 86 ++++++++++++++++++++++-------------------------
 1 file changed, 40 insertions(+), 46 deletions(-)

diff --git a/sijapi/classes.py b/sijapi/classes.py
index f09b8f0..a682aa0 100644
--- a/sijapi/classes.py
+++ b/sijapi/classes.py
@@ -603,26 +603,33 @@ class APIConfig(BaseModel):
                     
                     for table in tables:
                         table_name = table['tablename']
-                        last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id)
-                        
-                        while True:
-                            changes = await source_conn.fetch(f"""
-                                SELECT * FROM "{table_name}"
-                                WHERE version > $1 AND server_id = $2
-                                ORDER BY version ASC
-                                LIMIT $3
-                            """, last_synced_version, source_id, batch_size)
+                        try:
+                            last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id)
                             
-                            if not changes:
-                                break
+                            while True:
+                                changes = await source_conn.fetch(f"""
+                                    SELECT * FROM "{table_name}"
+                                    WHERE version > $1 AND server_id = $2
+                                    ORDER BY version ASC
+                                    LIMIT $3
+                                """, last_synced_version, source_id, batch_size)
+                                
+                                if not changes:
+                                    break
 
-                            changes_count = await self.apply_batch_changes(dest_conn, table_name, changes)
-                            total_changes += changes_count
-                            
-                            last_synced_version = changes[-1]['version']
-                            await self.update_sync_status(dest_conn, table_name, source_id, last_synced_version)
-                            
-                            info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}")
+                                changes_count = await self.apply_batch_changes(dest_conn, table_name, changes)
+                                total_changes += changes_count
+                                
+                                if changes_count > 0:
+                                    last_synced_version = changes[-1]['version']
+                                    await self.update_sync_status(dest_conn, table_name, source_id, last_synced_version)
+                                
+                                info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}")
+
+                        except Exception as e:
+                            err(f"Error syncing table {table_name}: {str(e)}")
+                            err(f"Traceback: {traceback.format_exc()}")
+                            # Continue with the next table
 
             info(f"Sync complete from {source_id} ({source_ip}) to {dest_id} ({dest_ip}). Total changes: {total_changes}")
 
@@ -637,45 +644,30 @@ class APIConfig(BaseModel):
         if not changes:
             return 0
 
-        temp_table_name = f"temp_{table_name}_{uuid.uuid4().hex[:8]}"
-        
         try:
-            # Create temporary table
-            await conn.execute(f"""
-                CREATE TEMPORARY TABLE {temp_table_name} (LIKE "{table_name}" INCLUDING ALL)
-                ON COMMIT DROP
-            """)
-
-            # Bulk insert changes into temporary table
+            # Prepare the insert statement
             columns = changes[0].keys()
-            records = [tuple(change[col] for col in columns) for change in changes]
-            
-            # Use copy_records_to_table instead of copy_records
-            await conn.copy_records_to_table(temp_table_name, records=records)
-
-            # Perform upsert
-            result = await conn.execute(f"""
-                INSERT INTO "{table_name}" 
-                SELECT * FROM {temp_table_name}
+            placeholders = [f'${i+1}' for i in range(len(columns))]
+            insert_query = f"""
+                INSERT INTO "{table_name}" ({', '.join(columns)})
+                VALUES ({', '.join(placeholders)})
                 ON CONFLICT (id) DO UPDATE SET
                 {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != 'id')}
-            """)
+            """
+
+            # Execute the insert for each change
+            affected_rows = 0
+            for change in changes:
+                values = [change[col] for col in columns]
+                result = await conn.execute(insert_query, *values)
+                affected_rows += int(result.split()[-1])
 
-            # Parse the result to get the number of affected rows
-            affected_rows = int(result.split()[-1])
             return affected_rows
 
         except Exception as e:
             err(f"Error applying batch changes to {table_name}: {str(e)}")
             err(f"Traceback: {traceback.format_exc()}")
             return 0
-        finally:
-            # Ensure temporary table is dropped
-            try:
-                await conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}")
-            except Exception as e:
-                warn(f"Failed to drop temporary table {temp_table_name}: {str(e)}")
-
 
 
     async def push_changes_to_all(self):
@@ -727,6 +719,7 @@ class APIConfig(BaseModel):
             err(f"Error pushing changes to {pool_entry['ts_id']}: {str(e)}")
             err(f"Traceback: {traceback.format_exc()}")
 
+
     async def update_sync_status(self, conn, table_name, server_id, version):
         await conn.execute("""
             INSERT INTO sync_status (table_name, server_id, last_synced_version, last_sync_time)
@@ -736,6 +729,7 @@ class APIConfig(BaseModel):
                 last_sync_time = EXCLUDED.last_sync_time
         """, table_name, server_id, version)
 
+
     async def get_last_synced_version(self, conn, table_name, server_id):
         return await conn.fetchval(f"""
             SELECT COALESCE(MAX(version), 0)