diff --git a/sijapi/classes.py b/sijapi/classes.py index a6aaa40..c2c6289 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -338,53 +338,53 @@ class APIConfig(BaseModel): return None -async def pull_changes(self, source_pool_entry: Dict[str, Any] = None): - if source_pool_entry is None: - source_pool_entry = await self.get_default_source() - - if source_pool_entry is None: - err("No available source for pulling changes") - return - - async with self.get_connection(source_pool_entry) as source_conn: - async with self.get_connection() as dest_conn: - tables = await source_conn.fetch( - "SELECT tablename FROM pg_tables WHERE schemaname = 'public'" - ) - for table in tables: - table_name = table['tablename'] - info(f"Processing table: {table_name}") - - # Get primary key column(s) - pk_columns = await source_conn.fetch(""" - SELECT a.attname - FROM pg_index i - JOIN pg_attribute a ON a.attrelid = i.indrelid - AND a.attnum = ANY(i.indkey) - WHERE i.indrelid = $1::regclass - AND i.indisprimary; - """, table_name) - - pk_cols = [col['attname'] for col in pk_columns] - if not pk_cols: - warn(f"No primary key found for table {table_name}. Skipping.") - continue + async def pull_changes(self, source_pool_entry: Dict[str, Any] = None): + if source_pool_entry is None: + source_pool_entry = await self.get_default_source() + + if source_pool_entry is None: + err("No available source for pulling changes") + return + + async with self.get_connection(source_pool_entry) as source_conn: + async with self.get_connection() as dest_conn: + tables = await source_conn.fetch( + "SELECT tablename FROM pg_tables WHERE schemaname = 'public'" + ) + for table in tables: + table_name = table['tablename'] + info(f"Processing table: {table_name}") + + # Get primary key column(s) + pk_columns = await source_conn.fetch(""" + SELECT a.attname + FROM pg_index i + JOIN pg_attribute a ON a.attrelid = i.indrelid + AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = $1::regclass + AND i.indisprimary; + """, table_name) + + pk_cols = [col['attname'] for col in pk_columns] + if not pk_cols: + warn(f"No primary key found for table {table_name}. Skipping.") + continue - # Fetch all rows from the source table - rows = await source_conn.fetch(f"SELECT * FROM {table_name}") - if rows: - columns = rows[0].keys() - # Upsert records to the destination table - await dest_conn.executemany(f""" - INSERT INTO {table_name} ({', '.join(columns)}) - VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) - ON CONFLICT ({', '.join(pk_cols)}) DO UPDATE SET - {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in pk_cols)} - """, [tuple(row[col] for col in columns) for row in rows]) - - info(f"Completed processing table: {table_name}") + # Fetch all rows from the source table + rows = await source_conn.fetch(f"SELECT * FROM {table_name}") + if rows: + columns = rows[0].keys() + # Upsert records to the destination table + await dest_conn.executemany(f""" + INSERT INTO {table_name} ({', '.join(columns)}) + VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))}) + ON CONFLICT ({', '.join(pk_cols)}) DO UPDATE SET + {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col not in pk_cols)} + """, [tuple(row[col] for col in columns) for row in rows]) + + info(f"Completed processing table: {table_name}") - info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}") + info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}")