From 36b2578265ab0c506bf26faa53ff6509a8c39105 Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Mon, 12 Aug 2024 22:48:33 -0700 Subject: [PATCH] Auto-update: Mon Aug 12 22:48:33 PDT 2024 --- sijapi/database.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/sijapi/database.py b/sijapi/database.py index f9f52d3..0b1b67d 100644 --- a/sijapi/database.py +++ b/sijapi/database.py @@ -21,7 +21,7 @@ import sys from loguru import logger from sqlalchemy import text, select, func, and_ from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession -from sqlalchemy.orm import sessionmaker, declarative_base +from sqlalchemy.orm import sessionmaker, declarative_base, make_transient from sqlalchemy.exc import OperationalError from sqlalchemy import Column, Integer, String, DateTime, JSON, Text import uuid @@ -46,7 +46,6 @@ TS_ID = os.environ.get('TS_ID') - class QueryTracking(Base): __tablename__ = 'query_tracking' @@ -61,6 +60,8 @@ class QueryTracking(Base): class Database: + SYNC_COOLDOWN = 30 # seconds + @classmethod def init(cls, config_name: str): return cls(config_name) @@ -191,19 +192,22 @@ class Database: async def sync_db(self): current_time = time.time() - if current_time - self.last_sync_time < 30: - l.info("Skipping sync, last sync was less than 30 seconds ago") + if current_time - self.last_sync_time < self.SYNC_COOLDOWN: + l.info(f"Skipping sync, last sync was less than {self.SYNC_COOLDOWN} seconds ago") return try: l.info("Starting database synchronization") + self.last_sync_time = current_time # Update the last sync time before starting await self.pull_query_tracking_from_all_servers() await self.execute_unexecuted_queries() - self.last_sync_time = current_time l.info("Database synchronization completed successfully") except Exception as e: l.error(f"Error during database sync: {str(e)}") l.error(f"Traceback: {traceback.format_exc()}") + finally: + # Ensure the cooldown is respected even if an error occurs + self.last_sync_time = max(self.last_sync_time, current_time) async def pull_query_tracking_from_all_servers(self): @@ -223,16 +227,30 @@ class Database: async with self.sessions[self.local_ts_id]() as local_session: for query in queries: + # Detach the object from its original session + make_transient(query) + existing = await local_session.execute( select(QueryTracking).where(QueryTracking.id == query.id) ) existing = existing.scalar_one_or_none() if existing: + # Update existing query existing.completed_by = list(set(existing.completed_by + query.completed_by)) l.debug(f"Updated existing query: {query.id}") else: - local_session.add(query) + # Create a new instance for the local session + new_query = QueryTracking( + id=query.id, + origin_ts_id=query.origin_ts_id, + query=query.query, + args=query.args, + executed_at=query.executed_at, + completed_by=query.completed_by, + result_checksum=query.result_checksum + ) + local_session.add(new_query) l.debug(f"Added new query: {query.id}") await local_session.commit() except Exception as e: