diff --git a/sijapi/classes.py b/sijapi/classes.py index 71f2f7d..7ed7df6 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -332,45 +332,56 @@ class APIConfig(BaseModel): if pool_entry is None: pool_entry = self.local_db - info(f"Attempting to connect to database: {pool_entry}") - try: - conn = await self.db_pool.get_connection(pool_entry) + for attempt in range(3): # Retry up to 3 times try: - yield conn - finally: - await self.db_pool.release_connection(pool_entry, conn) - except asyncpg.exceptions.ConnectionDoesNotExistError: - err(f"Connection to database {pool_entry['ts_ip']}:{pool_entry['db_port']} does not exist or has been closed") - raise - except asyncpg.exceptions.ConnectionFailureError as e: - err(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}") - err(f"Connection error: {str(e)}") - raise - except Exception as e: - err(f"Unexpected error connecting to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}") - err(f"Error: {str(e)}") - raise + conn = await asyncpg.connect( + host=pool_entry['ts_ip'], + port=pool_entry['db_port'], + user=pool_entry['db_user'], + password=pool_entry['db_pass'], + database=pool_entry['db_name'] + ) + try: + yield conn + finally: + await conn.close() + return + except asyncpg.exceptions.CannotConnectNowError: + if attempt < 2: # Don't sleep on the last attempt + await asyncio.sleep(1) # Wait before retrying + except Exception as e: + err(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}") + err(f"Error: {str(e)}") + if attempt == 2: # Raise the exception on the last attempt + raise + + raise Exception(f"Failed to connect to database after 3 attempts: {pool_entry['ts_ip']}:{pool_entry['db_port']}") async def initialize_sync(self): for pool_entry in self.POOL: - try: - async with self.get_connection(pool_entry) as conn: - tables = await conn.fetch(""" - SELECT tablename FROM pg_tables - WHERE schemaname = 'public' - """) - - for table in tables: - table_name = table['tablename'] - await self.ensure_sync_columns(conn, table_name) - await self.create_sync_trigger(conn, table_name) + for attempt in range(3): # Retry up to 3 times + try: + async with self.get_connection(pool_entry) as conn: + tables = await conn.fetch(""" + SELECT tablename FROM pg_tables + WHERE schemaname = 'public' + """) + + for table in tables: + table_name = table['tablename'] + await self.ensure_sync_columns(conn, table_name) + await self.create_sync_trigger(conn, table_name) - info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") - except asyncpg.exceptions.ConnectionFailureError: - err(f"Failed to connect to database during initialization: {pool_entry['ts_ip']}:{pool_entry['db_port']}") - except Exception as e: - err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") - err(f"Traceback: {traceback.format_exc()}") + info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") + break # If successful, break the retry loop + except asyncpg.exceptions.ConnectionFailureError: + err(f"Failed to connect to database during initialization: {pool_entry['ts_ip']}:{pool_entry['db_port']}") + if attempt < 2: # Don't sleep on the last attempt + await asyncio.sleep(1) # Wait before retrying + except Exception as e: + err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") + err(f"Traceback: {traceback.format_exc()}") + break # Don't retry for unexpected errors async def ensure_sync_columns(self, conn, table_name): try: @@ -383,7 +394,7 @@ class APIConfig(BaseModel): ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; EXCEPTION WHEN duplicate_column THEN - RAISE NOTICE 'column version or server_id already exists in {table_name}.'; + NULL; -- Silently handle duplicate column END; END $$; """) @@ -392,6 +403,7 @@ class APIConfig(BaseModel): err(f"Error ensuring sync columns for table {table_name}: {str(e)}") err(f"Traceback: {traceback.format_exc()}") + async def create_sync_trigger(self, conn, table_name): await conn.execute(f""" CREATE OR REPLACE FUNCTION update_version_and_server_id() @@ -421,40 +433,44 @@ class APIConfig(BaseModel): if pool_entry['ts_id'] == os.environ.get('TS_ID'): continue - try: - async with self.get_connection(pool_entry) as conn: - # Check if the version column exists in any table - version_exists = await conn.fetchval(""" - SELECT EXISTS ( - SELECT 1 - FROM information_schema.columns - WHERE table_schema = 'public' - AND column_name = 'version' - ) - """) - - if not version_exists: - info(f"Version column does not exist in any table for {pool_entry['ts_id']}") - continue - - version = await conn.fetchval(""" - SELECT COALESCE(MAX(version), -1) - FROM ( - SELECT MAX(version) as version - FROM information_schema.columns - WHERE table_schema = 'public' - AND column_name = 'version' - AND is_updatable = 'YES' - ) as subquery - """) - info(f"Max version for {pool_entry['ts_id']}: {version}") - if version > max_version: - max_version = version - most_recent_source = pool_entry - except asyncpg.exceptions.ConnectionFailureError: - err(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}") - except Exception as e: - err(f"Error checking version for {pool_entry['ts_id']}: {str(e)}") + for _ in range(3): # Retry up to 3 times + try: + async with self.get_connection(pool_entry) as conn: + # Check if the version column exists in any table + version_exists = await conn.fetchval(""" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_schema = 'public' + AND column_name = 'version' + ) + """) + + if not version_exists: + info(f"Version column does not exist in any table for {pool_entry['ts_id']}") + break # Move to the next pool entry + + version = await conn.fetchval(""" + SELECT COALESCE(MAX(version), -1) + FROM ( + SELECT MAX(version) as version + FROM information_schema.columns + WHERE table_schema = 'public' + AND column_name = 'version' + AND is_updatable = 'YES' + ) as subquery + """) + info(f"Max version for {pool_entry['ts_id']}: {version}") + if version > max_version: + max_version = version + most_recent_source = pool_entry + break # If successful, break the retry loop + except asyncpg.exceptions.PostgresError as e: + err(f"Error checking version for {pool_entry['ts_id']}: {str(e)}") + await asyncio.sleep(1) # Wait before retrying + except Exception as e: + err(f"Unexpected error for {pool_entry['ts_id']}: {str(e)}") + break # Don't retry for unexpected errors if most_recent_source: info(f"Most recent source: {most_recent_source['ts_id']} with version {max_version}")