Auto-update: Mon Aug 12 22:55:12 PDT 2024

This commit is contained in:
sanj 2024-08-12 22:55:12 -07:00
parent 36b2578265
commit 426844f928

View file

@ -270,16 +270,26 @@ class Database:
for query in unexecuted_queries:
try:
params = json.loads(query.args)
await session.execute(text(query.query), params)
query.completed_by = list(set(query.completed_by + [self.local_ts_id]))
await session.commit()
# Convert string datetime to datetime objects
for key, value in params.items():
if isinstance(value, str) and value.endswith(('Z', '+00:00')):
try:
params[key] = datetime.fromisoformat(value.rstrip('Z'))
except ValueError:
# If conversion fails, leave the original value
pass
async with session.begin():
await session.execute(text(query.query), params)
query.completed_by = list(set(query.completed_by + [self.local_ts_id]))
await session.commit()
l.info(f"Successfully executed query ID {query.id}")
except Exception as e:
l.error(f"Failed to execute query ID {query.id}: {str(e)}")
await session.rollback()
l.info("Finished executing unexecuted queries")
async def call_db_sync_on_servers(self):
"""Call /db/sync on all online servers."""
online_servers = await self.get_online_servers()