Auto-update: Tue Jul 30 15:40:56 PDT 2024

This commit is contained in:
sanj 2024-07-30 15:40:56 -07:00
parent 612f2b31c0
commit 4d94f62c1b

View file

@ -11,6 +11,7 @@ import asyncio
import asyncpg import asyncpg
import socket import socket
import traceback import traceback
from tqdm.asyncio import tqdm
import reverse_geocoder as rg import reverse_geocoder as rg
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, TypeVar from typing import Any, Dict, List, Optional, Tuple, Union, TypeVar
@ -600,7 +601,7 @@ class APIConfig(BaseModel):
WHERE schemaname = 'public' WHERE schemaname = 'public'
""") """)
for table in tables: async for table in tqdm(tables, desc="Syncing tables", unit="table"):
table_name = table['tablename'] table_name = table['tablename']
try: try:
if table_name == 'spatial_ref_sys': if table_name == 'spatial_ref_sys':
@ -608,7 +609,6 @@ class APIConfig(BaseModel):
total_changes += changes_count total_changes += changes_count
info(f"Synced spatial_ref_sys: {changes_count} changes. Total so far: {total_changes}") info(f"Synced spatial_ref_sys: {changes_count} changes. Total so far: {total_changes}")
else: else:
# Existing code for other tables
last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id)
changes = await source_conn.fetch(f""" changes = await source_conn.fetch(f"""
@ -641,10 +641,17 @@ class APIConfig(BaseModel):
err(f"Error during sync process: {str(e)}") err(f"Error during sync process: {str(e)}")
err(f"Traceback: {traceback.format_exc()}") err(f"Traceback: {traceback.format_exc()}")
info(f"Sync summary:")
info(f" Total changes: {total_changes}")
info(f" Tables synced: {len(tables)}")
info(f" Source: {source_id} ({source_ip})")
info(f" Destination: {dest_id} ({dest_ip})")
return total_changes return total_changes
async def apply_batch_changes(self, conn, table_name, changes): async def apply_batch_changes(self, conn, table_name, changes):
if not changes: if not changes:
return 0 return 0
@ -723,10 +730,19 @@ class APIConfig(BaseModel):
# Update existing entry # Update existing entry
update_query = f""" update_query = f"""
UPDATE spatial_ref_sys UPDATE spatial_ref_sys
SET {', '.join(f"{col} = ${i+1}" for i, col in enumerate(source_entry.keys()) if col != 'srid')} SET auth_name = $1::text,
WHERE srid = ${len(source_entry)} auth_srid = $2::integer,
srtext = $3::text,
proj4text = $4::text
WHERE srid = $5::integer
""" """
await dest_conn.execute(update_query, *[v for k, v in source_entry.items() if k != 'srid'], srid) await dest_conn.execute(update_query,
source_entry['auth_name'],
source_entry['auth_srid'],
source_entry['srtext'],
source_entry['proj4text'],
srid
)
updates += 1 updates += 1
info(f"spatial_ref_sys sync complete. Inserts: {inserts}, Updates: {updates}") info(f"spatial_ref_sys sync complete. Inserts: {inserts}, Updates: {updates}")
@ -738,6 +754,7 @@ class APIConfig(BaseModel):
return 0 return 0
async def push_changes_to_all(self): async def push_changes_to_all(self):
for pool_entry in self.POOL: for pool_entry in self.POOL:
if pool_entry['ts_id'] != os.environ.get('TS_ID'): if pool_entry['ts_id'] != os.environ.get('TS_ID'):