diff --git a/sijapi/classes.py b/sijapi/classes.py
index 76883cf..d8e7fcf 100644
--- a/sijapi/classes.py
+++ b/sijapi/classes.py
@@ -5,6 +5,7 @@ import yaml
 import math
 import os
 import re
+import traceback
 import aiofiles
 import aiohttp
 import asyncpg
@@ -339,53 +340,67 @@ class APIConfig(BaseModel):
 
 
     async def pull_changes(self, source_pool_entry: Dict[str, Any] = None):
-        if source_pool_entry is None:
-            source_pool_entry = await self.get_default_source()
-        
-        if source_pool_entry is None:
-            err("No available source for pulling changes")
-            return
-        
-        async with self.get_connection(source_pool_entry) as source_conn:
-            async with self.get_connection() as dest_conn:
-                tables = await source_conn.fetch(
-                    "SELECT tablename FROM pg_tables WHERE schemaname = 'public'"
-                )
-                for table in tables:
-                    table_name = table['tablename']
-                    info(f"Processing table: {table_name}")
-                    
-                    # Get primary key column(s)
-                    pk_columns = await source_conn.fetch("""
-                        SELECT a.attname
-                        FROM   pg_index i
-                        JOIN   pg_attribute a ON a.attrelid = i.indrelid
-                                            AND a.attnum = ANY(i.indkey)
-                        WHERE  i.indrelid = $1::regclass
-                        AND    i.indisprimary;
-                    """, table_name)
-                    
-                    pk_cols = [col['attname'] for col in pk_columns]
-                    if not pk_cols:
-                        warn(f"No primary key found for table {table_name}. Skipping.")
-                        continue
+        try:
+            if source_pool_entry is None:
+                source_pool_entry = await self.get_default_source()
+            
+            if source_pool_entry is None:
+                err("No available source for pulling changes")
+                return
+            
+            async with self.get_connection(source_pool_entry) as source_conn:
+                async with self.get_connection() as dest_conn:
+                    tables = await source_conn.fetch(
+                        "SELECT tablename FROM pg_tables WHERE schemaname = 'public'"
+                    )
+                    for table in tables:
+                        table_name = table['tablename']
+                        info(f"Processing table: {table_name}")
+                        
+                        # Get primary key column(s)
+                        pk_columns = await source_conn.fetch("""
+                            SELECT a.attname
+                            FROM   pg_index i
+                            JOIN   pg_attribute a ON a.attrelid = i.indrelid
+                                                AND a.attnum = ANY(i.indkey)
+                            WHERE  i.indrelid = $1::regclass
+                            AND    i.indisprimary;
+                        """, table_name)
+                        
+                        pk_cols = [col['attname'] for col in pk_columns]
+                        info(f"Primary key columns for {table_name}: {pk_cols}")
+                        if not pk_cols:
+                            warn(f"No primary key found for table {table_name}. Skipping.")
+                            continue
 
-                    # Fetch all rows from the source table
-                    rows = await source_conn.fetch(f"SELECT * FROM {table_name}")
-                    if rows:
-                        columns = rows[0].keys()
-                        # Upsert records to the destination table
-                        for row in rows:
-                            await dest_conn.execute(f"""
-                                INSERT INTO {table_name} ({', '.join(columns)})
-                                VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))})
-                                ON CONFLICT ({', '.join(pk_cols)}) DO UPDATE SET
-                                {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in pk_cols)}
-                            """, *[row[col] for col in columns])
-                    
-                    info(f"Completed processing table: {table_name}")
+                        # Fetch all rows from the source table
+                        rows = await source_conn.fetch(f"SELECT * FROM {table_name}")
+                        info(f"Fetched {len(rows)} rows from {table_name}")
+                        if rows:
+                            columns = list(rows[0].keys())
+                            info(f"Columns for {table_name}: {columns}")
+                            # Upsert records to the destination table
+                            for row in rows:
+                                try:
+                                    query = f"""
+                                        INSERT INTO {table_name} ({', '.join(columns)})
+                                        VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))})
+                                        ON CONFLICT ({', '.join(pk_cols)}) DO UPDATE SET
+                                        {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in pk_cols)}
+                                    """
+                                    info(f"Executing query: {query}")
+                                    info(f"With values: {[row[col] for col in columns]}")
+                                    await dest_conn.execute(query, *[row[col] for col in columns])
+                                except Exception as e:
+                                    err(f"Error processing row in {table_name}: {str(e)}")
+                                    err(f"Problematic row: {row}")
+                        
+                        info(f"Completed processing table: {table_name}")
 
-                info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}")
+                    info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}")
+        except Exception as e:
+            err(f"Unexpected error in pull_changes: {str(e)}")
+            err(f"Traceback: {traceback.format_exc()}")