From ae3e0571c7258fcc83d5153fc6dbae3f822c0331 Mon Sep 17 00:00:00 2001
From: sanj <67624670+iodrift@users.noreply.github.com>
Date: Wed, 24 Jul 2024 23:55:42 -0700
Subject: [PATCH] Auto-update: Wed Jul 24 23:55:42 PDT 2024

---
 sijapi/classes.py | 90 +++++++++++++++++++++++------------------------
 1 file changed, 45 insertions(+), 45 deletions(-)

diff --git a/sijapi/classes.py b/sijapi/classes.py
index a6aaa40..c2c6289 100644
--- a/sijapi/classes.py
+++ b/sijapi/classes.py
@@ -338,53 +338,53 @@ class APIConfig(BaseModel):
         return None
 
 
-async def pull_changes(self, source_pool_entry: Dict[str, Any] = None):
-    if source_pool_entry is None:
-        source_pool_entry = await self.get_default_source()
-    
-    if source_pool_entry is None:
-        err("No available source for pulling changes")
-        return
-    
-    async with self.get_connection(source_pool_entry) as source_conn:
-        async with self.get_connection() as dest_conn:
-            tables = await source_conn.fetch(
-                "SELECT tablename FROM pg_tables WHERE schemaname = 'public'"
-            )
-            for table in tables:
-                table_name = table['tablename']
-                info(f"Processing table: {table_name}")
-                
-                # Get primary key column(s)
-                pk_columns = await source_conn.fetch("""
-                    SELECT a.attname
-                    FROM   pg_index i
-                    JOIN   pg_attribute a ON a.attrelid = i.indrelid
-                                        AND a.attnum = ANY(i.indkey)
-                    WHERE  i.indrelid = $1::regclass
-                    AND    i.indisprimary;
-                """, table_name)
-                
-                pk_cols = [col['attname'] for col in pk_columns]
-                if not pk_cols:
-                    warn(f"No primary key found for table {table_name}. Skipping.")
-                    continue
+    async def pull_changes(self, source_pool_entry: Dict[str, Any] = None):
+        if source_pool_entry is None:
+            source_pool_entry = await self.get_default_source()
+        
+        if source_pool_entry is None:
+            err("No available source for pulling changes")
+            return
+        
+        async with self.get_connection(source_pool_entry) as source_conn:
+            async with self.get_connection() as dest_conn:
+                tables = await source_conn.fetch(
+                    "SELECT tablename FROM pg_tables WHERE schemaname = 'public'"
+                )
+                for table in tables:
+                    table_name = table['tablename']
+                    info(f"Processing table: {table_name}")
+                    
+                    # Get primary key column(s)
+                    pk_columns = await source_conn.fetch("""
+                        SELECT a.attname
+                        FROM   pg_index i
+                        JOIN   pg_attribute a ON a.attrelid = i.indrelid
+                                            AND a.attnum = ANY(i.indkey)
+                        WHERE  i.indrelid = $1::regclass
+                        AND    i.indisprimary;
+                    """, table_name)
+                    
+                    pk_cols = [col['attname'] for col in pk_columns]
+                    if not pk_cols:
+                        warn(f"No primary key found for table {table_name}. Skipping.")
+                        continue
 
-                # Fetch all rows from the source table
-                rows = await source_conn.fetch(f"SELECT * FROM {table_name}")
-                if rows:
-                    columns = rows[0].keys()
-                    # Upsert records to the destination table
-                    await dest_conn.executemany(f"""
-                        INSERT INTO {table_name} ({', '.join(columns)})
-                        VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))})
-                        ON CONFLICT ({', '.join(pk_cols)}) DO UPDATE SET
-                        {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in pk_cols)}
-                    """, [tuple(row[col] for col in columns) for row in rows])
-                
-                info(f"Completed processing table: {table_name}")
+                    # Fetch all rows from the source table
+                    rows = await source_conn.fetch(f"SELECT * FROM {table_name}")
+                    if rows:
+                        columns = rows[0].keys()
+                        # Upsert records to the destination table
+                        await dest_conn.executemany(f"""
+                            INSERT INTO {table_name} ({', '.join(columns)})
+                            VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))})
+                            ON CONFLICT ({', '.join(pk_cols)}) DO UPDATE SET
+                            {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in pk_cols)}
+                        """, [tuple(row[col] for col in columns) for row in rows])
+                    
+                    info(f"Completed processing table: {table_name}")
 
-            info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}")
+                info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}")