diff --git a/sijapi/classes.py b/sijapi/classes.py index f8d364a..525789a 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -300,38 +300,37 @@ class APIConfig(BaseModel): if pool_entry is None: pool_entry = self.local_db - pool_key = f"{pool_entry['ts_ip']}:{pool_entry['db_port']}" - - if pool_key not in self._db_pools: - try: - self._db_pools[pool_key] = await asyncpg.create_pool( - host=pool_entry['ts_ip'], - port=pool_entry['db_port'], - user=pool_entry['db_user'], - password=pool_entry['db_pass'], - database=pool_entry['db_name'], - min_size=1, - max_size=10, # adjust as needed - timeout=5 # connection timeout in seconds - ) - except Exception as e: - err(f"Failed to create connection pool for {pool_key}: {str(e)}") - raise - + info(f"Attempting to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}") try: - async with self._db_pools[pool_key].acquire() as conn: + conn = await asyncpg.connect( + host=pool_entry['ts_ip'], + port=pool_entry['db_port'], + user=pool_entry['db_user'], + password=pool_entry['db_pass'], + database=pool_entry['db_name'], + timeout=5 # Add a timeout to prevent hanging + ) + info(f"Successfully connected to {pool_entry['ts_ip']}:{pool_entry['db_port']}") + try: yield conn + finally: + await conn.close() + info(f"Closed connection to {pool_entry['ts_ip']}:{pool_entry['db_port']}") except asyncpg.exceptions.ConnectionDoesNotExistError: - err(f"Failed to acquire connection from pool for {pool_key}: Connection does not exist") + err(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']} - Connection does not exist") raise except asyncpg.exceptions.ConnectionFailureError: - err(f"Failed to acquire connection from pool for {pool_key}: Connection failure") + err(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']} - Connection failure") + raise + except asyncpg.exceptions.PostgresError as e: + err(f"PostgreSQL error when connecting to {pool_entry['ts_ip']}:{pool_entry['db_port']}: {str(e)}") raise except Exception as e: - err(f"Unexpected error when acquiring connection from pool for {pool_key}: {str(e)}") + err(f"Unexpected error when connecting to {pool_entry['ts_ip']}:{pool_entry['db_port']}: {str(e)}") raise + async def close_db_pools(self): info("Closing database connection pools...") for pool_key, pool in self._db_pools.items(): @@ -606,17 +605,14 @@ class APIConfig(BaseModel): try: last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) - while True: - changes = await source_conn.fetch(f""" - SELECT * FROM "{table_name}" - WHERE version > $1 AND server_id = $2 - ORDER BY version ASC - LIMIT $3 - """, last_synced_version, source_id, batch_size) - - if not changes: - break - + changes = await source_conn.fetch(f""" + SELECT * FROM "{table_name}" + WHERE version > $1 AND server_id = $2 + ORDER BY version ASC + LIMIT $3 + """, last_synced_version, source_id, batch_size) + + if changes: changes_count = await self.apply_batch_changes(dest_conn, table_name, changes) total_changes += changes_count @@ -625,6 +621,8 @@ class APIConfig(BaseModel): await self.update_sync_status(dest_conn, table_name, source_id, last_synced_version) info(f"Synced batch for {table_name}: {changes_count} changes. Total so far: {total_changes}") + else: + info(f"No changes to sync for {table_name}") except Exception as e: err(f"Error syncing table {table_name}: {str(e)}") @@ -640,13 +638,14 @@ class APIConfig(BaseModel): return total_changes + async def apply_batch_changes(self, conn, table_name, changes): if not changes: return 0 try: - # Prepare the insert statement - columns = changes[0].keys() + # Convert the keys to a list + columns = list(changes[0].keys()) placeholders = [f'${i+1}' for i in range(len(columns))] insert_query = f""" INSERT INTO "{table_name}" ({', '.join(columns)}) @@ -655,13 +654,12 @@ class APIConfig(BaseModel): {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != 'id')} """ - # Execute the insert for all changes in a single transaction - async with conn.transaction(): - affected_rows = 0 - for change in changes: - values = [change[col] for col in columns] - result = await conn.execute(insert_query, *values) - affected_rows += int(result.split()[-1]) + # Execute the insert for each change + affected_rows = 0 + for change in changes: + values = [change[col] for col in columns] + result = await conn.execute(insert_query, *values) + affected_rows += int(result.split()[-1]) return affected_rows