diff --git a/sijapi/classes.py b/sijapi/classes.py
index 2ef41b1..40bdaf3 100644
--- a/sijapi/classes.py
+++ b/sijapi/classes.py
@@ -387,86 +387,132 @@ class APIConfig(BaseModel):
         dest_id = os.environ.get('TS_ID')
         dest_ip = self.local_db['ts_ip']
 
-        info(f"Starting sync from source {source_id} ({source_ip}) to destination {dest_id} ({dest_ip})")
+        info(f"Starting comprehensive sync from source {source_id} ({source_ip}) to destination {dest_id} ({dest_ip})")
 
-        async with self.get_connection(source_pool_entry) as source_conn:
-            async with self.get_connection() as dest_conn:
-                tables = await source_conn.fetch("""
-                    SELECT tablename FROM pg_tables
-                    WHERE schemaname = 'public'
-                """)
-
-                for table in tables:
-                    table_name = table['tablename']
-                    inserts = 0
-                    updates = 0
-                    error_count = 0
-                    last_synced_version = await self.get_last_synced_version(table_name, source_id)
+        try:
+            async with self.get_connection(source_pool_entry) as source_conn:
+                async with self.get_connection() as dest_conn:
+                    # Compare tables
+                    source_tables = await self.get_tables(source_conn)
+                    dest_tables = await self.get_tables(dest_conn)
 
-                    # Get primary key columns for the table
-                    primary_keys = await self.get_primary_keys(dest_conn, table_name)
-
-                    changes = await source_conn.fetch(f"""
-                        SELECT * FROM "{table_name}"
-                        WHERE version > $1 AND server_id = $2
-                        ORDER BY version ASC
-                    """, last_synced_version, source_id)
-
-                    for change in changes:
-                        columns = list(change.keys())
-                        values = [change[col] for col in columns]
-
-                        debug(f"Syncing data for table: {table_name} from {source_id} to {dest_id}")
-
-                        if not primary_keys:
-                            # If no primary key, just insert
-                            insert_query = f"""
-                                INSERT INTO "{table_name}" ({', '.join(columns)})
-                                VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))})
-                            """
-                        else:
-                            # If primary key exists, use it for conflict resolution
-                            insert_query = f"""
-                                INSERT INTO "{table_name}" ({', '.join(columns)})
-                                VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))})
-                                ON CONFLICT ({', '.join(primary_keys)}) DO UPDATE SET
-                                {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in primary_keys)}
-                            """
-
-                        try:
-                            result = await dest_conn.execute(insert_query, *values)
-                            if 'UPDATE' in result:
-                                updates += 1
-                            else:
-                                inserts += 1
-                        except Exception as e:
-                            if error_count < 10: # Limit error logging
-                                err(f"Error syncing data for table {table_name} from {source_id} to {dest_id}: {str(e)}")
-                                error_count += 1
-                            elif error_count == 10:
-                                err(f"Suppressing further errors for table {table_name}")
-                                error_count += 1
-
-                    if changes:
-                        await self.update_sync_status(table_name, source_id, changes[-1]['version'])
-
-                    total_inserts += inserts
-                    total_updates += updates
-                    table_changes[table_name] = {'inserts': inserts, 'updates': updates}
-
-                    info(f"Synced {table_name} from {source_id} to {dest_id}: {inserts} inserts, {updates} updates")
-                    if error_count > 10:
-                        info(f"Total of {error_count} errors occurred for table {table_name}")
+                    tables_only_in_source = set(source_tables) - set(dest_tables)
+                    tables_only_in_dest = set(dest_tables) - set(source_tables)
+                    common_tables = set(source_tables) & set(dest_tables)
 
-        info(f"Sync complete from {source_id} ({source_ip}) to {dest_id} ({dest_ip})")
-        info(f"Total changes: {total_inserts} inserts, {total_updates} updates")
-        info("Changes by table:")
-        for table, changes in table_changes.items():
-            info(f"  {table}: {changes['inserts']} inserts, {changes['updates']} updates")
+                    info(f"Tables only in source: {tables_only_in_source}")
+                    info(f"Tables only in destination: {tables_only_in_dest}")
+                    info(f"Common tables: {common_tables}")
+
+                    for table in common_tables:
+                        await self.compare_table_structure(source_conn, dest_conn, table)
+                        inserts, updates = await self.compare_and_sync_data(source_conn, dest_conn, table, source_id)
+
+                        total_inserts += inserts
+                        total_updates += updates
+                        table_changes[table] = {'inserts': inserts, 'updates': updates}
+
+            info(f"Comprehensive sync complete from {source_id} ({source_ip}) to {dest_id} ({dest_ip})")
+            info(f"Total changes: {total_inserts} inserts, {total_updates} updates")
+            info("Changes by table:")
+            for table, changes in table_changes.items():
+                info(f"  {table}: {changes['inserts']} inserts, {changes['updates']} updates")
+
+        except Exception as e:
+            err(f"Error during sync process: {str(e)}")
 
         return total_inserts + total_updates
 
+    async def get_tables(self, conn):
+        tables = await conn.fetch("""
+            SELECT tablename FROM pg_tables
+            WHERE schemaname = 'public'
+        """)
+        return [table['tablename'] for table in tables]
+
+    async def compare_table_structure(self, source_conn, dest_conn, table_name):
+        source_columns = await self.get_table_structure(source_conn, table_name)
+        dest_columns = await self.get_table_structure(dest_conn, table_name)
+
+        columns_only_in_source = set(source_columns.keys()) - set(dest_columns.keys())
+        columns_only_in_dest = set(dest_columns.keys()) - set(source_columns.keys())
+        common_columns = set(source_columns.keys()) & set(dest_columns.keys())
+
+        info(f"Table {table_name}:")
+        info(f"  Columns only in source: {columns_only_in_source}")
+        info(f"  Columns only in destination: {columns_only_in_dest}")
+        info(f"  Common columns: {common_columns}")
+
+        for col in common_columns:
+            if source_columns[col] != dest_columns[col]:
+                warn(f"  Column {col} has different types: source={source_columns[col]}, dest={dest_columns[col]}")
+
+    async def get_table_structure(self, conn, table_name):
+        columns = await conn.fetch("""
+            SELECT column_name, data_type
+            FROM information_schema.columns
+            WHERE table_name = $1
+        """, table_name)
+        return {col['column_name']: col['data_type'] for col in columns}
+
+    async def compare_and_sync_data(self, source_conn, dest_conn, table_name, source_id):
+        inserts = 0
+        updates = 0
+        error_count = 0
+
+        try:
+            primary_keys = await self.get_primary_keys(dest_conn, table_name)
+            if not primary_keys:
+                warn(f"Table {table_name} has no primary keys. Skipping data sync.")
+                return inserts, updates
+
+            last_synced_version = await self.get_last_synced_version(table_name, source_id)
+
+            changes = await source_conn.fetch(f"""
+                SELECT * FROM "{table_name}"
+                WHERE version > $1 AND server_id = $2
+                ORDER BY version ASC
+            """, last_synced_version, source_id)
+
+            for change in changes:
+                columns = list(change.keys())
+                values = [change[col] for col in columns]
+
+                insert_query = f"""
+                    INSERT INTO "{table_name}" ({', '.join(columns)})
+                    VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))})
+                    ON CONFLICT ({', '.join(primary_keys)}) DO UPDATE SET
+                    {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in primary_keys)}
+                """
+
+                try:
+                    result = await dest_conn.execute(insert_query, *values)
+                    if 'UPDATE' in result:
+                        updates += 1
+                    else:
+                        inserts += 1
+                except Exception as e:
+                    if error_count < 10: # Limit error logging
+                        err(f"Error syncing data for table {table_name}: {str(e)}")
+                        error_count += 1
+                    elif error_count == 10:
+                        err(f"Suppressing further errors for table {table_name}")
+                        error_count += 1
+
+            if changes:
+                await self.update_sync_status(table_name, source_id, changes[-1]['version'])
+
+            info(f"Synced {table_name}: {inserts} inserts, {updates} updates")
+            if error_count > 10:
+                info(f"Total of {error_count} errors occurred for table {table_name}")
+
+        except Exception as e:
+            err(f"Error processing table {table_name}: {str(e)}")
+
+        return inserts, updates
+
+
+
     async def get_primary_keys(self, conn, table_name):
         primary_keys = await conn.fetch("""
             SELECT a.attname
@@ -526,6 +572,7 @@ class APIConfig(BaseModel):
             WHERE table_name = $1 AND server_id = $2
         """, table_name, server_id) or 0
 
+
     async def update_sync_status(self, table_name, server_id, version):
        async with self.get_connection() as conn:
             await conn.execute("""
@@ -535,6 +582,7 @@ class APIConfig(BaseModel):
            SET last_synced_version = EXCLUDED.last_synced_version
         """, table_name, server_id, version)
 
+
     async def sync_schema(self):
         source_entry = self.local_db
         source_schema = await self.get_schema(source_entry)