diff --git a/sijapi/__main__.py b/sijapi/__main__.py index ca102bb..b692955 100755 --- a/sijapi/__main__.py +++ b/sijapi/__main__.py @@ -39,6 +39,7 @@ def warn(text: str): logger.warning(text) def err(text: str): logger.error(text) def crit(text: str): logger.critical(text) + @asynccontextmanager async def lifespan(app: FastAPI): # Startup @@ -65,7 +66,7 @@ async def lifespan(app: FastAPI): total_changes = await API.pull_changes(source) crit(f"Data pull complete. Total changes: {total_changes}") else: - crit("No instances with more recent data found.") + crit("No instances with more recent data found or all instances are offline.") except Exception as e: crit(f"Error during startup: {str(e)}") @@ -79,7 +80,6 @@ async def lifespan(app: FastAPI): crit("Database pools closed.") - app = FastAPI(lifespan=lifespan) app.add_middleware( diff --git a/sijapi/classes.py b/sijapi/classes.py index ce560b6..ead927b 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -305,16 +305,26 @@ class APIConfig(BaseModel): port=pool_entry['db_port'], user=pool_entry['db_user'], password=pool_entry['db_pass'], - database=pool_entry['db_name'] + database=pool_entry['db_name'], + timeout=5 # Add a timeout to prevent hanging ) try: yield conn finally: await conn.close() - except Exception as e: - warn(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']}") - err(f"Error: {str(e)}") + except asyncpg.exceptions.ConnectionDoesNotExistError: + err(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']} - Connection does not exist") raise + except asyncpg.exceptions.ConnectionFailureError: + err(f"Failed to connect to database: {pool_entry['ts_ip']}:{pool_entry['db_port']} - Connection failure") + raise + except asyncpg.exceptions.PostgresError as e: + err(f"PostgreSQL error when connecting to {pool_entry['ts_ip']}:{pool_entry['db_port']}: {str(e)}") + raise + except Exception as e: + err(f"Unexpected error when connecting to {pool_entry['ts_ip']}:{pool_entry['db_port']}: {str(e)}") + raise + async def initialize_sync(self): @@ -328,49 +338,47 @@ class APIConfig(BaseModel): for table in tables: table_name = table['tablename'] - # Add version and server_id columns if they don't exist - await conn.execute(f""" - DO $$ - BEGIN - BEGIN - ALTER TABLE "{table_name}" - ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1, - ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; - EXCEPTION - WHEN duplicate_column THEN - -- Do nothing, column already exists - END; - END $$; - """) - - # Create or replace the trigger function - await conn.execute(f""" - CREATE OR REPLACE FUNCTION update_version_and_server_id() - RETURNS TRIGGER AS $$ - BEGIN - NEW.version = COALESCE(OLD.version, 0) + 1; - NEW.server_id = '{os.environ.get('TS_ID')}'; - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - """) - - # Create the trigger if it doesn't exist - await conn.execute(f""" - DO $$ - BEGIN - IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'update_version_and_server_id_trigger' AND tgrelid = '{table_name}'::regclass) THEN - CREATE TRIGGER update_version_and_server_id_trigger - BEFORE INSERT OR UPDATE ON "{table_name}" - FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id(); - END IF; - END $$; - """) + await self.ensure_sync_columns(conn, table_name) + await self.ensure_sync_trigger(conn, table_name) info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") except Exception as e: err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}") + async def ensure_sync_columns(self, conn, table_name): + await conn.execute(f""" + DO $$ + BEGIN + BEGIN + ALTER TABLE "{table_name}" + ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1, + ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; + EXCEPTION + WHEN duplicate_column THEN + -- Do nothing, column already exists + END; + END $$; + """) + + async def ensure_sync_trigger(self, conn, table_name): + await conn.execute(f""" + CREATE OR REPLACE FUNCTION update_version_and_server_id() + RETURNS TRIGGER AS $$ + BEGIN + NEW.version = COALESCE(OLD.version, 0) + 1; + NEW.server_id = '{os.environ.get('TS_ID')}'; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + DROP TRIGGER IF EXISTS update_version_and_server_id_trigger ON "{table_name}"; + + CREATE TRIGGER update_version_and_server_id_trigger + BEFORE INSERT OR UPDATE ON "{table_name}" + FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id(); + """) + + async def get_most_recent_source(self): most_recent_source = None max_version = -1 @@ -391,13 +399,14 @@ class APIConfig(BaseModel): max_version = version most_recent_source = pool_entry except Exception as e: - err(f"Error checking version for {pool_entry['ts_id']}: {str(e)}") + warn(f"Failed to connect to or query database for {pool_entry['ts_id']}: {str(e)}") return most_recent_source + async def pull_changes(self, source_pool_entry, batch_size=10000): if source_pool_entry['ts_id'] == os.environ.get('TS_ID'): info("Skipping self-sync")