Auto-update: Mon Aug 12 17:13:53 PDT 2024
This commit is contained in:
parent
0bc936dc47
commit
42863bd22d
1 changed files with 18 additions and 9 deletions
|
@ -265,8 +265,9 @@ class Database:
|
||||||
await session.commit()
|
await session.commit()
|
||||||
l.info(f"Successfully replicated write to {ts_id}")
|
l.info(f"Successfully replicated write to {ts_id}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
l.error(f"Failed to replicate write on {ts_id}")
|
l.error(f"Failed to replicate write on {ts_id}: {str(e)}")
|
||||||
l.debug(f"Failed to replicate write on {ts_id}: {str(e)}")
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
|
|
||||||
|
|
||||||
async def mark_query_completed(self, query_id: int, ts_id: str):
|
async def mark_query_completed(self, query_id: int, ts_id: str):
|
||||||
async with self.sessions[self.local_ts_id]() as session:
|
async with self.sessions[self.local_ts_id]() as session:
|
||||||
|
@ -324,6 +325,7 @@ class Database:
|
||||||
deleted_count = result.rowcount
|
deleted_count = result.rowcount
|
||||||
l.info(f"Purged {deleted_count} completed queries.")
|
l.info(f"Purged {deleted_count} completed queries.")
|
||||||
|
|
||||||
|
|
||||||
async def sync_query_tracking(self):
|
async def sync_query_tracking(self):
|
||||||
"""Combinatorial sync method for the query_tracking table."""
|
"""Combinatorial sync method for the query_tracking table."""
|
||||||
try:
|
try:
|
||||||
|
@ -342,18 +344,21 @@ class Database:
|
||||||
remote_new_queries = await remote_session.execute(
|
remote_new_queries = await remote_session.execute(
|
||||||
select(QueryTracking).where(QueryTracking.id > local_max_id)
|
select(QueryTracking).where(QueryTracking.id > local_max_id)
|
||||||
)
|
)
|
||||||
|
remote_new_queries = remote_new_queries.fetchall()
|
||||||
for query in remote_new_queries:
|
for query in remote_new_queries:
|
||||||
await self.add_or_update_query(query)
|
await self.add_or_update_query(query[0])
|
||||||
|
|
||||||
# Sync from local to remote
|
# Sync from local to remote
|
||||||
async with self.sessions[self.local_ts_id]() as local_session:
|
async with self.sessions[self.local_ts_id]() as local_session:
|
||||||
local_new_queries = await local_session.execute(
|
local_new_queries = await local_session.execute(
|
||||||
select(QueryTracking).where(QueryTracking.id > remote_max_id)
|
select(QueryTracking).where(QueryTracking.id > remote_max_id)
|
||||||
)
|
)
|
||||||
|
local_new_queries = local_new_queries.fetchall()
|
||||||
for query in local_new_queries:
|
for query in local_new_queries:
|
||||||
await self.add_or_update_query_remote(ts_id, query)
|
await self.add_or_update_query_remote(ts_id, query[0])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
l.error(f"Error syncing with {ts_id}: {str(e)}")
|
l.error(f"Error syncing with {ts_id}: {str(e)}")
|
||||||
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
l.error(f"Error in sync_query_tracking: {str(e)}")
|
l.error(f"Error in sync_query_tracking: {str(e)}")
|
||||||
l.error(f"Traceback: {traceback.format_exc()}")
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
|
@ -389,7 +394,11 @@ class Database:
|
||||||
result_checksum=query.result_checksum
|
result_checksum=query.result_checksum
|
||||||
)
|
)
|
||||||
session.add(new_query)
|
session.add(new_query)
|
||||||
|
try:
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
except Exception as e:
|
||||||
|
l.error(f"Failed to add or update query on {ts_id}: {str(e)}")
|
||||||
|
await session.rollback()
|
||||||
|
|
||||||
async def ensure_query_tracking_table(self):
|
async def ensure_query_tracking_table(self):
|
||||||
for ts_id, engine in self.engines.items():
|
for ts_id, engine in self.engines.items():
|
||||||
|
@ -404,12 +413,12 @@ class Database:
|
||||||
async def call_db_sync_on_servers(self):
|
async def call_db_sync_on_servers(self):
|
||||||
"""Call /db/sync on all online servers."""
|
"""Call /db/sync on all online servers."""
|
||||||
online_servers = await self.get_online_servers()
|
online_servers = await self.get_online_servers()
|
||||||
tasks = []
|
|
||||||
for server in self.config['POOL']:
|
for server in self.config['POOL']:
|
||||||
if server['ts_id'] in online_servers and server['ts_id'] != self.local_ts_id:
|
if server['ts_id'] in online_servers and server['ts_id'] != self.local_ts_id:
|
||||||
url = f"http://{server['ts_ip']}:{server['app_port']}/db/sync"
|
try:
|
||||||
tasks.append(self.call_db_sync(server))
|
await self.call_db_sync(server)
|
||||||
await asyncio.gather(*tasks)
|
except Exception as e:
|
||||||
|
l.error(f"Failed to call /db/sync on {server['ts_id']}: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
async def call_db_sync(self, server):
|
async def call_db_sync(self, server):
|
||||||
|
|
Loading…
Reference in a new issue