diff --git a/sijapi/classes.py b/sijapi/classes.py index 76883cf..d8e7fcf 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -5,6 +5,7 @@ import yaml import math import os import re +import traceback import aiofiles import aiohttp import asyncpg @@ -339,53 +340,67 @@ class APIConfig(BaseModel): async def pull_changes(self, source_pool_entry: Dict[str, Any] = None): - if source_pool_entry is None: - source_pool_entry = await self.get_default_source() - - if source_pool_entry is None: - err("No available source for pulling changes") - return - - async with self.get_connection(source_pool_entry) as source_conn: - async with self.get_connection() as dest_conn: - tables = await source_conn.fetch( - "SELECT tablename FROM pg_tables WHERE schemaname = 'public'" - ) - for table in tables: - table_name = table['tablename'] - info(f"Processing table: {table_name}") - - # Get primary key column(s) - pk_columns = await source_conn.fetch(""" - SELECT a.attname - FROM pg_index i - JOIN pg_attribute a ON a.attrelid = i.indrelid - AND a.attnum = ANY(i.indkey) - WHERE i.indrelid = $1::regclass - AND i.indisprimary; - """, table_name) - - pk_cols = [col['attname'] for col in pk_columns] - if not pk_cols: - warn(f"No primary key found for table {table_name}. Skipping.") - continue + try: + if source_pool_entry is None: + source_pool_entry = await self.get_default_source() + + if source_pool_entry is None: + err("No available source for pulling changes") + return + + async with self.get_connection(source_pool_entry) as source_conn: + async with self.get_connection() as dest_conn: + tables = await source_conn.fetch( + "SELECT tablename FROM pg_tables WHERE schemaname = 'public'" + ) + for table in tables: + table_name = table['tablename'] + info(f"Processing table: {table_name}") + + # Get primary key column(s) + pk_columns = await source_conn.fetch(""" + SELECT a.attname + FROM pg_index i + JOIN pg_attribute a ON a.attrelid = i.indrelid + AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = $1::regclass + AND i.indisprimary; + """, table_name) + + pk_cols = [col['attname'] for col in pk_columns] + info(f"Primary key columns for {table_name}: {pk_cols}") + if not pk_cols: + warn(f"No primary key found for table {table_name}. Skipping.") + continue - # Fetch all rows from the source table - rows = await source_conn.fetch(f"SELECT * FROM {table_name}") - if rows: - columns = rows[0].keys() - # Upsert records to the destination table - for row in rows: - await dest_conn.execute(f""" - INSERT INTO {table_name} ({', '.join(columns)}) - VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) - ON CONFLICT ({', '.join(pk_cols)}) DO UPDATE SET - {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in pk_cols)} - """, *[row[col] for col in columns]) - - info(f"Completed processing table: {table_name}") + # Fetch all rows from the source table + rows = await source_conn.fetch(f"SELECT * FROM {table_name}") + info(f"Fetched {len(rows)} rows from {table_name}") + if rows: + columns = list(rows[0].keys()) + info(f"Columns for {table_name}: {columns}") + # Upsert records to the destination table + for row in rows: + try: + query = f""" + INSERT INTO {table_name} ({', '.join(columns)}) + VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) + ON CONFLICT ({', '.join(pk_cols)}) DO UPDATE SET + {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in pk_cols)} + """ + info(f"Executing query: {query}") + info(f"With values: {[row[col] for col in columns]}") + await dest_conn.execute(query, *[row[col] for col in columns]) + except Exception as e: + err(f"Error processing row in {table_name}: {str(e)}") + err(f"Problematic row: {row}") + + info(f"Completed processing table: {table_name}") - info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}") + info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}") + except Exception as e: + err(f"Unexpected error in pull_changes: {str(e)}") + err(f"Traceback: {traceback.format_exc()}")