Auto-update: Fri Aug 2 00:42:52 PDT 2024

This commit is contained in:
sanj 2024-08-02 00:42:52 -07:00
parent 49dd8746bf
commit 0ddc70164e
2 changed files with 40 additions and 21 deletions

View file

@ -61,12 +61,20 @@ async def lifespan(app: FastAPI):
crit(f"Error during startup: {str(e)}") crit(f"Error during startup: {str(e)}")
crit(f"Traceback: {traceback.format_exc()}") crit(f"Traceback: {traceback.format_exc()}")
yield # This is where the app runs try:
yield # This is where the app runs
# Shutdown
crit("Shutting down...") finally:
await API.close_db_pools() # Shutdown
crit("Database pools closed.") crit("Shutting down...")
try:
await asyncio.wait_for(API.close_db_pools(), timeout=20)
crit("Database pools closed.")
except asyncio.TimeoutError:
crit("Timeout while closing database pools.")
except Exception as e:
crit(f"Error during shutdown: {str(e)}")
crit(f"Traceback: {traceback.format_exc()}")
app = FastAPI(lifespan=lifespan) app = FastAPI(lifespan=lifespan)

View file

@ -551,21 +551,24 @@ class APIConfig(BaseModel):
primary_key = await self.ensure_sync_columns(dest_conn, table_name) primary_key = await self.ensure_sync_columns(dest_conn, table_name)
last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id)
changes = await source_conn.fetch(f""" while True:
SELECT * FROM "{table_name}" changes = await source_conn.fetch(f"""
WHERE version > $1 AND server_id = $2 SELECT * FROM "{table_name}"
ORDER BY version ASC WHERE version > $1 AND server_id = $2
LIMIT $3 ORDER BY version ASC
""", last_synced_version, source_id, batch_size) LIMIT $3
""", last_synced_version, source_id, batch_size)
if changes:
if not changes:
break # No more changes for this table
changes_count = await self.apply_batch_changes(dest_conn, table_name, changes, primary_key) changes_count = await self.apply_batch_changes(dest_conn, table_name, changes, primary_key)
total_changes += changes_count total_changes += changes_count
if changes_count > 0: if changes_count > 0:
info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}")
else:
debug(f"No changes to sync for {table_name}") last_synced_version = changes[-1]['version'] # Update last synced version
except Exception as e: except Exception as e:
err(f"Error syncing table {table_name}: {str(e)}") err(f"Error syncing table {table_name}: {str(e)}")
@ -1134,15 +1137,23 @@ class APIConfig(BaseModel):
async def close_db_pools(self): async def close_db_pools(self):
info("Closing database connection pools...") info("Closing database connection pools...")
close_tasks = []
for pool_key, pool in self.db_pools.items(): for pool_key, pool in self.db_pools.items():
try: close_tasks.append(self.close_pool_with_timeout(pool, pool_key))
await pool.close()
debug(f"Closed pool for {pool_key}") await asyncio.gather(*close_tasks)
except Exception as e:
err(f"Error closing pool for {pool_key}: {str(e)}")
self.db_pools.clear() self.db_pools.clear()
info("All database connection pools closed.") info("All database connection pools closed.")
async def close_pool_with_timeout(self, pool, pool_key, timeout=10):
try:
await asyncio.wait_for(pool.close(), timeout=timeout)
debug(f"Closed pool for {pool_key}")
except asyncio.TimeoutError:
err(f"Timeout closing pool for {pool_key}")
except Exception as e:
err(f"Error closing pool for {pool_key}: {str(e)}")
class Location(BaseModel): class Location(BaseModel):
latitude: float latitude: float