Auto-update: Mon Aug 12 17:30:08 PDT 2024
This commit is contained in:
parent
42863bd22d
commit
e4db7a0f88
1 changed files with 40 additions and 27 deletions
|
@ -135,33 +135,27 @@ class Database:
|
||||||
|
|
||||||
async with self.sessions[self.local_ts_id]() as session:
|
async with self.sessions[self.local_ts_id]() as session:
|
||||||
try:
|
try:
|
||||||
# Serialize the kwargs
|
# a. Execute the write query locally
|
||||||
serialized_kwargs = {key: serialize(value) for key, value in kwargs.items()}
|
serialized_kwargs = {key: serialize(value) for key, value in kwargs.items()}
|
||||||
|
|
||||||
# Execute the write query
|
|
||||||
result = await session.execute(text(query), serialized_kwargs)
|
result = await session.execute(text(query), serialized_kwargs)
|
||||||
|
|
||||||
# Log the query
|
# b. Log the query in query_tracking table
|
||||||
new_query = QueryTracking(
|
new_query = QueryTracking(
|
||||||
ts_id=self.local_ts_id,
|
ts_id=self.local_ts_id,
|
||||||
query=query,
|
query=query,
|
||||||
args=json_dumps(kwargs) # Use json_dumps for logging
|
args=json_dumps(kwargs),
|
||||||
|
completed_by={self.local_ts_id: True}
|
||||||
)
|
)
|
||||||
session.add(new_query)
|
session.add(new_query)
|
||||||
await session.flush()
|
await session.flush()
|
||||||
query_id = new_query.id
|
query_id = new_query.id
|
||||||
|
|
||||||
await session.commit()
|
await session.commit()
|
||||||
l.info(f"Successfully executed write query: {query[:50]}...")
|
|
||||||
|
|
||||||
checksum = await self._local_compute_checksum(query, serialized_kwargs)
|
# Initiate async operations
|
||||||
|
asyncio.create_task(self._async_sync_operations(query_id, query, serialized_kwargs))
|
||||||
# Update query_tracking with checksum
|
|
||||||
await self.update_query_checksum(query_id, checksum)
|
|
||||||
|
|
||||||
# Perform sync operations asynchronously
|
|
||||||
asyncio.create_task(self._async_sync_operations(query_id, query, serialized_kwargs, checksum))
|
|
||||||
|
|
||||||
|
# c. Return the result
|
||||||
return result
|
return result
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -172,25 +166,44 @@ class Database:
|
||||||
l.error(f"Traceback: {traceback.format_exc()}")
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def _async_sync_operations(self, query_id: int, query: str, params: dict, checksum: str):
|
async def _async_sync_operations(self, query_id: int, query: str, params: dict):
|
||||||
try:
|
try:
|
||||||
await self.sync_query_tracking()
|
# a. Calculate and add checksum
|
||||||
except Exception as e:
|
checksum = await self._local_compute_checksum(query, params)
|
||||||
l.error(f"Failed to sync query_tracking: {str(e)}")
|
await self.update_query_checksum(query_id, checksum)
|
||||||
|
|
||||||
try:
|
# b. Synchronize query_tracking table
|
||||||
|
await self.sync_query_tracking()
|
||||||
|
|
||||||
|
# c. Call /db/sync on all servers
|
||||||
await self.call_db_sync_on_servers()
|
await self.call_db_sync_on_servers()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
l.error(f"Failed to call db_sync on other servers: {str(e)}")
|
l.error(f"Error in async sync operations: {str(e)}")
|
||||||
|
l.error(f"Traceback: {traceback.format_exc()}")
|
||||||
|
|
||||||
# Replicate write to other servers
|
async def call_db_sync_on_servers(self):
|
||||||
|
"""Call /db/sync on all online servers."""
|
||||||
online_servers = await self.get_online_servers()
|
online_servers = await self.get_online_servers()
|
||||||
for ts_id in online_servers:
|
for server in self.config['POOL']:
|
||||||
if ts_id != self.local_ts_id:
|
if server['ts_id'] in online_servers and server['ts_id'] != self.local_ts_id:
|
||||||
try:
|
asyncio.create_task(self.call_db_sync(server))
|
||||||
await self._replicate_write(ts_id, query_id, query, params, checksum)
|
|
||||||
except Exception as e:
|
async def call_db_sync(self, server):
|
||||||
l.error(f"Failed to replicate write to {ts_id}: {str(e)}")
|
url = f"http://{server['ts_ip']}:{server['app_port']}/db/sync"
|
||||||
|
headers = {
|
||||||
|
"Authorization": f"Bearer {server['api_key']}"
|
||||||
|
}
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
try:
|
||||||
|
async with session.post(url, headers=headers, timeout=30) as response:
|
||||||
|
if response.status == 200:
|
||||||
|
l.info(f"Successfully called /db/sync on {url}")
|
||||||
|
else:
|
||||||
|
l.warning(f"Failed to call /db/sync on {url}. Status: {response.status}")
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
l.debug(f"Timeout while calling /db/sync on {url}")
|
||||||
|
except Exception as e:
|
||||||
|
l.error(f"Error calling /db/sync on {url}: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
async def get_primary_server(self) -> str:
|
async def get_primary_server(self) -> str:
|
||||||
|
|
Loading…
Reference in a new issue