Auto-update: Fri Aug 2 00:42:52 PDT 2024
This commit is contained in:
parent
913d133120
commit
dd0fe107e0
2 changed files with 40 additions and 21 deletions
sijapi
|
@ -61,12 +61,20 @@ async def lifespan(app: FastAPI):
|
||||||
crit(f"Error during startup: {str(e)}")
|
crit(f"Error during startup: {str(e)}")
|
||||||
crit(f"Traceback: {traceback.format_exc()}")
|
crit(f"Traceback: {traceback.format_exc()}")
|
||||||
|
|
||||||
yield # This is where the app runs
|
try:
|
||||||
|
yield # This is where the app runs
|
||||||
# Shutdown
|
|
||||||
crit("Shutting down...")
|
finally:
|
||||||
await API.close_db_pools()
|
# Shutdown
|
||||||
crit("Database pools closed.")
|
crit("Shutting down...")
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(API.close_db_pools(), timeout=20)
|
||||||
|
crit("Database pools closed.")
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
crit("Timeout while closing database pools.")
|
||||||
|
except Exception as e:
|
||||||
|
crit(f"Error during shutdown: {str(e)}")
|
||||||
|
crit(f"Traceback: {traceback.format_exc()}")
|
||||||
|
|
||||||
app = FastAPI(lifespan=lifespan)
|
app = FastAPI(lifespan=lifespan)
|
||||||
|
|
||||||
|
|
|
@ -551,21 +551,24 @@ class APIConfig(BaseModel):
|
||||||
primary_key = await self.ensure_sync_columns(dest_conn, table_name)
|
primary_key = await self.ensure_sync_columns(dest_conn, table_name)
|
||||||
last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id)
|
last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id)
|
||||||
|
|
||||||
changes = await source_conn.fetch(f"""
|
while True:
|
||||||
SELECT * FROM "{table_name}"
|
changes = await source_conn.fetch(f"""
|
||||||
WHERE version > $1 AND server_id = $2
|
SELECT * FROM "{table_name}"
|
||||||
ORDER BY version ASC
|
WHERE version > $1 AND server_id = $2
|
||||||
LIMIT $3
|
ORDER BY version ASC
|
||||||
""", last_synced_version, source_id, batch_size)
|
LIMIT $3
|
||||||
|
""", last_synced_version, source_id, batch_size)
|
||||||
if changes:
|
|
||||||
|
if not changes:
|
||||||
|
break # No more changes for this table
|
||||||
|
|
||||||
changes_count = await self.apply_batch_changes(dest_conn, table_name, changes, primary_key)
|
changes_count = await self.apply_batch_changes(dest_conn, table_name, changes, primary_key)
|
||||||
total_changes += changes_count
|
total_changes += changes_count
|
||||||
|
|
||||||
if changes_count > 0:
|
if changes_count > 0:
|
||||||
info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}")
|
info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}")
|
||||||
else:
|
|
||||||
debug(f"No changes to sync for {table_name}")
|
last_synced_version = changes[-1]['version'] # Update last synced version
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
err(f"Error syncing table {table_name}: {str(e)}")
|
err(f"Error syncing table {table_name}: {str(e)}")
|
||||||
|
@ -1134,15 +1137,23 @@ class APIConfig(BaseModel):
|
||||||
|
|
||||||
async def close_db_pools(self):
|
async def close_db_pools(self):
|
||||||
info("Closing database connection pools...")
|
info("Closing database connection pools...")
|
||||||
|
close_tasks = []
|
||||||
for pool_key, pool in self.db_pools.items():
|
for pool_key, pool in self.db_pools.items():
|
||||||
try:
|
close_tasks.append(self.close_pool_with_timeout(pool, pool_key))
|
||||||
await pool.close()
|
|
||||||
debug(f"Closed pool for {pool_key}")
|
await asyncio.gather(*close_tasks)
|
||||||
except Exception as e:
|
|
||||||
err(f"Error closing pool for {pool_key}: {str(e)}")
|
|
||||||
self.db_pools.clear()
|
self.db_pools.clear()
|
||||||
info("All database connection pools closed.")
|
info("All database connection pools closed.")
|
||||||
|
|
||||||
|
async def close_pool_with_timeout(self, pool, pool_key, timeout=10):
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(pool.close(), timeout=timeout)
|
||||||
|
debug(f"Closed pool for {pool_key}")
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
err(f"Timeout closing pool for {pool_key}")
|
||||||
|
except Exception as e:
|
||||||
|
err(f"Error closing pool for {pool_key}: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
class Location(BaseModel):
|
class Location(BaseModel):
|
||||||
latitude: float
|
latitude: float
|
||||||
|
|
Loading…
Add table
Reference in a new issue