From b059d93f37d53af7e275557c0679f1133faa88e4 Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Sun, 28 Jul 2024 15:02:36 -0700 Subject: [PATCH] Auto-update: Sun Jul 28 15:02:36 PDT 2024 --- sijapi/__main__.py | 7 ++--- sijapi/classes.py | 67 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/sijapi/__main__.py b/sijapi/__main__.py index 43bf81b..caea1c1 100755 --- a/sijapi/__main__.py +++ b/sijapi/__main__.py @@ -69,12 +69,13 @@ async def lifespan(app: FastAPI): # Check if other instances have more recent data source = await API.get_most_recent_source() if source: - crit(f"Pulling changes from {source['ts_id']}...") - await API.pull_changes(source) - crit("Data pull complete.") + crit(f"Pulling changes from {source['ts_id']} ({source['ts_ip']})...") + total_changes = await API.pull_changes(source) + crit(f"Data pull complete. Total changes: {total_changes}") else: crit("No instances with more recent data found.") + except Exception as e: crit(f"Error during startup: {str(e)}") crit(f"Traceback: {traceback.format_exc()}") diff --git a/sijapi/classes.py b/sijapi/classes.py index 91ad0d6..9c8b62f 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -378,6 +378,17 @@ class APIConfig(BaseModel): return most_recent_source async def pull_changes(self, source_pool_entry): + total_inserts = 0 + total_updates = 0 + table_changes = {} + + source_id = source_pool_entry['ts_id'] + source_ip = source_pool_entry['ts_ip'] + dest_id = os.environ.get('TS_ID') + dest_ip = self.local_db['ts_ip'] + + info(f"Starting sync from source {source_id} ({source_ip}) to destination {dest_id} ({dest_ip})") + async with self.get_connection(source_pool_entry) as source_conn: async with self.get_connection() as dest_conn: tables = await source_conn.fetch(""" @@ -387,36 +398,62 @@ class APIConfig(BaseModel): for table in tables: table_name = table['tablename'] - last_synced_version = await self.get_last_synced_version(table_name, source_pool_entry['ts_id']) + inserts = 0 + updates = 0 + last_synced_version = await self.get_last_synced_version(table_name, source_id) changes = await source_conn.fetch(f""" SELECT * FROM "{table_name}" WHERE version > $1 AND server_id = $2 ORDER BY version ASC - """, last_synced_version, source_pool_entry['ts_id']) + """, last_synced_version, source_id) for change in changes: columns = list(change.keys()) values = [change[col] for col in columns] - # Log the target database and table name - debug(f"Attempting to insert data into table: {table_name} in database: {dest_conn._params['database']} (host: {dest_conn._params['host']})") + debug(f"Syncing data for table: {table_name} from {source_id} to {dest_id}") - insert_query = f""" - INSERT INTO "{table_name}" ({', '.join(columns)}) - VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) - ON CONFLICT (id) DO UPDATE SET - {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != 'id')} - """ + if table_name == 'sync_status': + insert_query = f""" + INSERT INTO "{table_name}" ({', '.join(columns)}) + VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) + ON CONFLICT (table_name, server_id) DO UPDATE SET + {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in ['table_name', 'server_id'])} + """ + else: + insert_query = f""" + INSERT INTO "{table_name}" ({', '.join(columns)}) + VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) + ON CONFLICT (id) DO UPDATE SET + {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != 'id')} + """ try: - await dest_conn.execute(insert_query, *values) - except asyncpg.exceptions.UndefinedColumnError as e: - err(f"UndefinedColumnError in table: {table_name} in database: {dest_conn._params['database']} (host: {dest_conn._params['host']})") - raise e + result = await dest_conn.execute(insert_query, *values) + if 'UPDATE' in result: + updates += 1 + else: + inserts += 1 + except Exception as e: + err(f"Error syncing data for table {table_name} from {source_id} to {dest_id}: {str(e)}") if changes: - await self.update_sync_status(table_name, source_pool_entry['ts_id'], changes[-1]['version']) + await self.update_sync_status(table_name, source_id, changes[-1]['version']) + + total_inserts += inserts + total_updates += updates + table_changes[table_name] = {'inserts': inserts, 'updates': updates} + + info(f"Synced {table_name} from {source_id} to {dest_id}: {inserts} inserts, {updates} updates") + + info(f"Sync complete from {source_id} ({source_ip}) to {dest_id} ({dest_ip})") + info(f"Total changes: {total_inserts} inserts, {total_updates} updates") + info("Changes by table:") + for table, changes in table_changes.items(): + info(f" {table}: {changes['inserts']} inserts, {changes['updates']} updates") + + return total_inserts + total_updates async def push_changes_to_all(self):