diff --git a/sijapi/__main__.py b/sijapi/__main__.py index e609014..8040595 100755 --- a/sijapi/__main__.py +++ b/sijapi/__main__.py @@ -61,12 +61,20 @@ async def lifespan(app: FastAPI): crit(f"Error during startup: {str(e)}") crit(f"Traceback: {traceback.format_exc()}") - yield # This is where the app runs - - # Shutdown - crit("Shutting down...") - await API.close_db_pools() - crit("Database pools closed.") + try: + yield # This is where the app runs + + finally: + # Shutdown + crit("Shutting down...") + try: + await asyncio.wait_for(API.close_db_pools(), timeout=20) + crit("Database pools closed.") + except asyncio.TimeoutError: + crit("Timeout while closing database pools.") + except Exception as e: + crit(f"Error during shutdown: {str(e)}") + crit(f"Traceback: {traceback.format_exc()}") app = FastAPI(lifespan=lifespan) diff --git a/sijapi/classes.py b/sijapi/classes.py index 6098ce6..313728c 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -551,21 +551,24 @@ class APIConfig(BaseModel): primary_key = await self.ensure_sync_columns(dest_conn, table_name) last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) - changes = await source_conn.fetch(f""" - SELECT * FROM "{table_name}" - WHERE version > $1 AND server_id = $2 - ORDER BY version ASC - LIMIT $3 - """, last_synced_version, source_id, batch_size) - - if changes: + while True: + changes = await source_conn.fetch(f""" + SELECT * FROM "{table_name}" + WHERE version > $1 AND server_id = $2 + ORDER BY version ASC + LIMIT $3 + """, last_synced_version, source_id, batch_size) + + if not changes: + break # No more changes for this table + changes_count = await self.apply_batch_changes(dest_conn, table_name, changes, primary_key) total_changes += changes_count if changes_count > 0: info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") - else: - debug(f"No changes to sync for {table_name}") + + last_synced_version = changes[-1]['version'] # Update last synced version except Exception as e: err(f"Error syncing table {table_name}: {str(e)}") @@ -1134,15 +1137,23 @@ class APIConfig(BaseModel): async def close_db_pools(self): info("Closing database connection pools...") + close_tasks = [] for pool_key, pool in self.db_pools.items(): - try: - await pool.close() - debug(f"Closed pool for {pool_key}") - except Exception as e: - err(f"Error closing pool for {pool_key}: {str(e)}") + close_tasks.append(self.close_pool_with_timeout(pool, pool_key)) + + await asyncio.gather(*close_tasks) self.db_pools.clear() info("All database connection pools closed.") + async def close_pool_with_timeout(self, pool, pool_key, timeout=10): + try: + await asyncio.wait_for(pool.close(), timeout=timeout) + debug(f"Closed pool for {pool_key}") + except asyncio.TimeoutError: + err(f"Timeout closing pool for {pool_key}") + except Exception as e: + err(f"Error closing pool for {pool_key}: {str(e)}") + class Location(BaseModel): latitude: float