diff --git a/sijapi/classes.py b/sijapi/classes.py index 458e26c..171e51c 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -11,6 +11,7 @@ import asyncio import asyncpg import socket import traceback +from tqdm.asyncio import tqdm import reverse_geocoder as rg from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union, TypeVar @@ -600,7 +601,7 @@ class APIConfig(BaseModel): WHERE schemaname = 'public' """) - for table in tables: + async for table in tqdm(tables, desc="Syncing tables", unit="table"): table_name = table['tablename'] try: if table_name == 'spatial_ref_sys': @@ -608,7 +609,6 @@ class APIConfig(BaseModel): total_changes += changes_count info(f"Synced spatial_ref_sys: {changes_count} changes. Total so far: {total_changes}") else: - # Existing code for other tables last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) changes = await source_conn.fetch(f""" @@ -641,10 +641,17 @@ class APIConfig(BaseModel): err(f"Error during sync process: {str(e)}") err(f"Traceback: {traceback.format_exc()}") + info(f"Sync summary:") + info(f" Total changes: {total_changes}") + info(f" Tables synced: {len(tables)}") + info(f" Source: {source_id} ({source_ip})") + info(f" Destination: {dest_id} ({dest_ip})") + return total_changes + async def apply_batch_changes(self, conn, table_name, changes): if not changes: return 0 @@ -723,10 +730,19 @@ class APIConfig(BaseModel): # Update existing entry update_query = f""" UPDATE spatial_ref_sys - SET {', '.join(f"{col} = ${i+1}" for i, col in enumerate(source_entry.keys()) if col != 'srid')} - WHERE srid = ${len(source_entry)} + SET auth_name = $1::text, + auth_srid = $2::integer, + srtext = $3::text, + proj4text = $4::text + WHERE srid = $5::integer """ - await dest_conn.execute(update_query, *[v for k, v in source_entry.items() if k != 'srid'], srid) + await dest_conn.execute(update_query, + source_entry['auth_name'], + source_entry['auth_srid'], + source_entry['srtext'], + source_entry['proj4text'], + srid + ) updates += 1 info(f"spatial_ref_sys sync complete. Inserts: {inserts}, Updates: {updates}") @@ -738,6 +754,7 @@ class APIConfig(BaseModel): return 0 + async def push_changes_to_all(self): for pool_entry in self.POOL: if pool_entry['ts_id'] != os.environ.get('TS_ID'):