Auto-update: Mon Aug 12 22:48:33 PDT 2024
This commit is contained in:
parent
6896592356
commit
36b2578265
1 changed files with 24 additions and 6 deletions
|
@ -21,7 +21,7 @@ import sys
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from sqlalchemy import text, select, func, and_
|
from sqlalchemy import text, select, func, and_
|
||||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
|
||||||
from sqlalchemy.orm import sessionmaker, declarative_base
|
from sqlalchemy.orm import sessionmaker, declarative_base, make_transient
|
||||||
from sqlalchemy.exc import OperationalError
|
from sqlalchemy.exc import OperationalError
|
||||||
from sqlalchemy import Column, Integer, String, DateTime, JSON, Text
|
from sqlalchemy import Column, Integer, String, DateTime, JSON, Text
|
||||||
import uuid
|
import uuid
|
||||||
|
@ -46,7 +46,6 @@ TS_ID = os.environ.get('TS_ID')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class QueryTracking(Base):
|
class QueryTracking(Base):
|
||||||
__tablename__ = 'query_tracking'
|
__tablename__ = 'query_tracking'
|
||||||
|
|
||||||
|
@ -61,6 +60,8 @@ class QueryTracking(Base):
|
||||||
|
|
||||||
|
|
||||||
class Database:
|
class Database:
|
||||||
|
SYNC_COOLDOWN = 30 # seconds
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def init(cls, config_name: str):
|
def init(cls, config_name: str):
|
||||||
return cls(config_name)
|
return cls(config_name)
|
||||||
|
@ -191,19 +192,22 @@ class Database:
|
||||||
|
|
||||||
async def sync_db(self):
|
async def sync_db(self):
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
if current_time - self.last_sync_time < 30:
|
if current_time - self.last_sync_time < self.SYNC_COOLDOWN:
|
||||||
l.info("Skipping sync, last sync was less than 30 seconds ago")
|
l.info(f"Skipping sync, last sync was less than {self.SYNC_COOLDOWN} seconds ago")
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
l.info("Starting database synchronization")
|
l.info("Starting database synchronization")
|
||||||
|
self.last_sync_time = current_time # Update the last sync time before starting
|
||||||
await self.pull_query_tracking_from_all_servers()
|
await self.pull_query_tracking_from_all_servers()
|
||||||
await self.execute_unexecuted_queries()
|
await self.execute_unexecuted_queries()
|
||||||
self.last_sync_time = current_time
|
|
||||||
l.info("Database synchronization completed successfully")
|
l.info("Database synchronization completed successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
l.error(f"Error during database sync: {str(e)}")
|
l.error(f"Error during database sync: {str(e)}")
|
||||||
l.error(f"Traceback: {traceback.format_exc()}")
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
|
finally:
|
||||||
|
# Ensure the cooldown is respected even if an error occurs
|
||||||
|
self.last_sync_time = max(self.last_sync_time, current_time)
|
||||||
|
|
||||||
|
|
||||||
async def pull_query_tracking_from_all_servers(self):
|
async def pull_query_tracking_from_all_servers(self):
|
||||||
|
@ -223,16 +227,30 @@ class Database:
|
||||||
|
|
||||||
async with self.sessions[self.local_ts_id]() as local_session:
|
async with self.sessions[self.local_ts_id]() as local_session:
|
||||||
for query in queries:
|
for query in queries:
|
||||||
|
# Detach the object from its original session
|
||||||
|
make_transient(query)
|
||||||
|
|
||||||
existing = await local_session.execute(
|
existing = await local_session.execute(
|
||||||
select(QueryTracking).where(QueryTracking.id == query.id)
|
select(QueryTracking).where(QueryTracking.id == query.id)
|
||||||
)
|
)
|
||||||
existing = existing.scalar_one_or_none()
|
existing = existing.scalar_one_or_none()
|
||||||
|
|
||||||
if existing:
|
if existing:
|
||||||
|
# Update existing query
|
||||||
existing.completed_by = list(set(existing.completed_by + query.completed_by))
|
existing.completed_by = list(set(existing.completed_by + query.completed_by))
|
||||||
l.debug(f"Updated existing query: {query.id}")
|
l.debug(f"Updated existing query: {query.id}")
|
||||||
else:
|
else:
|
||||||
local_session.add(query)
|
# Create a new instance for the local session
|
||||||
|
new_query = QueryTracking(
|
||||||
|
id=query.id,
|
||||||
|
origin_ts_id=query.origin_ts_id,
|
||||||
|
query=query.query,
|
||||||
|
args=query.args,
|
||||||
|
executed_at=query.executed_at,
|
||||||
|
completed_by=query.completed_by,
|
||||||
|
result_checksum=query.result_checksum
|
||||||
|
)
|
||||||
|
local_session.add(new_query)
|
||||||
l.debug(f"Added new query: {query.id}")
|
l.debug(f"Added new query: {query.id}")
|
||||||
await local_session.commit()
|
await local_session.commit()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
Loading…
Reference in a new issue