From d7020c2a0a23ed3a9d078061e0c5bf70035ed7a7 Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Tue, 30 Jul 2024 15:12:58 -0700 Subject: [PATCH] Auto-update: Tue Jul 30 15:12:58 PDT 2024 --- sijapi/classes.py | 86 ++++++++++++++++++++++------------------------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/sijapi/classes.py b/sijapi/classes.py index f09b8f0..a682aa0 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -603,26 +603,33 @@ class APIConfig(BaseModel): for table in tables: table_name = table['tablename'] - last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) - - while True: - changes = await source_conn.fetch(f""" - SELECT * FROM "{table_name}" - WHERE version > $1 AND server_id = $2 - ORDER BY version ASC - LIMIT $3 - """, last_synced_version, source_id, batch_size) + try: + last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) - if not changes: - break + while True: + changes = await source_conn.fetch(f""" + SELECT * FROM "{table_name}" + WHERE version > $1 AND server_id = $2 + ORDER BY version ASC + LIMIT $3 + """, last_synced_version, source_id, batch_size) + + if not changes: + break - changes_count = await self.apply_batch_changes(dest_conn, table_name, changes) - total_changes += changes_count - - last_synced_version = changes[-1]['version'] - await self.update_sync_status(dest_conn, table_name, source_id, last_synced_version) - - info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") + changes_count = await self.apply_batch_changes(dest_conn, table_name, changes) + total_changes += changes_count + + if changes_count > 0: + last_synced_version = changes[-1]['version'] + await self.update_sync_status(dest_conn, table_name, source_id, last_synced_version) + + info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") + + except Exception as e: + err(f"Error syncing table {table_name}: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") + # Continue with the next table info(f"Sync complete from {source_id} ({source_ip}) to {dest_id} ({dest_ip}). Total changes: {total_changes}") @@ -637,45 +644,30 @@ class APIConfig(BaseModel): if not changes: return 0 - temp_table_name = f"temp_{table_name}_{uuid.uuid4().hex[:8]}" - try: - # Create temporary table - await conn.execute(f""" - CREATE TEMPORARY TABLE {temp_table_name} (LIKE "{table_name}" INCLUDING ALL) - ON COMMIT DROP - """) - - # Bulk insert changes into temporary table + # Prepare the insert statement columns = changes[0].keys() - records = [tuple(change[col] for col in columns) for change in changes] - - # Use copy_records_to_table instead of copy_records - await conn.copy_records_to_table(temp_table_name, records=records) - - # Perform upsert - result = await conn.execute(f""" - INSERT INTO "{table_name}" - SELECT * FROM {temp_table_name} + placeholders = [f'${i+1}' for i in range(len(columns))] + insert_query = f""" + INSERT INTO "{table_name}" ({', '.join(columns)}) + VALUES ({', '.join(placeholders)}) ON CONFLICT (id) DO UPDATE SET {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != 'id')} - """) + """ + + # Execute the insert for each change + affected_rows = 0 + for change in changes: + values = [change[col] for col in columns] + result = await conn.execute(insert_query, *values) + affected_rows += int(result.split()[-1]) - # Parse the result to get the number of affected rows - affected_rows = int(result.split()[-1]) return affected_rows except Exception as e: err(f"Error applying batch changes to {table_name}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") return 0 - finally: - # Ensure temporary table is dropped - try: - await conn.execute(f"DROP TABLE IF EXISTS {temp_table_name}") - except Exception as e: - warn(f"Failed to drop temporary table {temp_table_name}: {str(e)}") - async def push_changes_to_all(self): @@ -727,6 +719,7 @@ class APIConfig(BaseModel): err(f"Error pushing changes to {pool_entry['ts_id']}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") + async def update_sync_status(self, conn, table_name, server_id, version): await conn.execute(""" INSERT INTO sync_status (table_name, server_id, last_synced_version, last_sync_time) @@ -736,6 +729,7 @@ class APIConfig(BaseModel): last_sync_time = EXCLUDED.last_sync_time """, table_name, server_id, version) + async def get_last_synced_version(self, conn, table_name, server_id): return await conn.fetchval(f""" SELECT COALESCE(MAX(version), 0)