From 42863bd22d4e2c5d277746da5ff997bb19a1af5b Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Mon, 12 Aug 2024 17:13:53 -0700 Subject: [PATCH] Auto-update: Mon Aug 12 17:13:53 PDT 2024 --- sijapi/database.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/sijapi/database.py b/sijapi/database.py index 227af6a..28ea5a0 100644 --- a/sijapi/database.py +++ b/sijapi/database.py @@ -265,8 +265,9 @@ class Database: await session.commit() l.info(f"Successfully replicated write to {ts_id}") except Exception as e: - l.error(f"Failed to replicate write on {ts_id}") - l.debug(f"Failed to replicate write on {ts_id}: {str(e)}") + l.error(f"Failed to replicate write on {ts_id}: {str(e)}") + l.error(f"Traceback: {traceback.format_exc()}") + async def mark_query_completed(self, query_id: int, ts_id: str): async with self.sessions[self.local_ts_id]() as session: @@ -324,6 +325,7 @@ class Database: deleted_count = result.rowcount l.info(f"Purged {deleted_count} completed queries.") + async def sync_query_tracking(self): """Combinatorial sync method for the query_tracking table.""" try: @@ -342,18 +344,21 @@ class Database: remote_new_queries = await remote_session.execute( select(QueryTracking).where(QueryTracking.id > local_max_id) ) + remote_new_queries = remote_new_queries.fetchall() for query in remote_new_queries: - await self.add_or_update_query(query) + await self.add_or_update_query(query[0]) # Sync from local to remote async with self.sessions[self.local_ts_id]() as local_session: local_new_queries = await local_session.execute( select(QueryTracking).where(QueryTracking.id > remote_max_id) ) + local_new_queries = local_new_queries.fetchall() for query in local_new_queries: - await self.add_or_update_query_remote(ts_id, query) + await self.add_or_update_query_remote(ts_id, query[0]) except Exception as e: l.error(f"Error syncing with {ts_id}: {str(e)}") + l.error(f"Traceback: {traceback.format_exc()}") except Exception as e: l.error(f"Error in sync_query_tracking: {str(e)}") l.error(f"Traceback: {traceback.format_exc()}") @@ -389,7 +394,11 @@ class Database: result_checksum=query.result_checksum ) session.add(new_query) - await session.commit() + try: + await session.commit() + except Exception as e: + l.error(f"Failed to add or update query on {ts_id}: {str(e)}") + await session.rollback() async def ensure_query_tracking_table(self): for ts_id, engine in self.engines.items(): @@ -404,12 +413,12 @@ class Database: async def call_db_sync_on_servers(self): """Call /db/sync on all online servers.""" online_servers = await self.get_online_servers() - tasks = [] for server in self.config['POOL']: if server['ts_id'] in online_servers and server['ts_id'] != self.local_ts_id: - url = f"http://{server['ts_ip']}:{server['app_port']}/db/sync" - tasks.append(self.call_db_sync(server)) - await asyncio.gather(*tasks) + try: + await self.call_db_sync(server) + except Exception as e: + l.error(f"Failed to call /db/sync on {server['ts_id']}: {str(e)}") async def call_db_sync(self, server):