diff --git a/sijapi/classes.py b/sijapi/classes.py index 85db29b..dd06260 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -316,33 +316,55 @@ class APIConfig(BaseModel): raise async def initialize_sync(self): - async with self.get_connection() as conn: - tables = await conn.fetch(""" - SELECT tablename FROM pg_tables - WHERE schemaname = 'public' - """) - - for table in tables: - table_name = table['tablename'] - await conn.execute(f""" - ALTER TABLE "{table_name}" - ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1, - ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; - - CREATE OR REPLACE FUNCTION update_version_and_server_id() - RETURNS TRIGGER AS $$ - BEGIN - NEW.version = COALESCE(OLD.version, 0) + 1; - NEW.server_id = '{os.environ.get('TS_ID')}'; - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - - DROP TRIGGER IF EXISTS update_version_and_server_id_trigger ON "{table_name}"; - CREATE TRIGGER update_version_and_server_id_trigger - BEFORE INSERT OR UPDATE ON "{table_name}" - FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id(); + for pool_entry in self.POOL: + async with self.get_connection(pool_entry) as conn: + tables = await conn.fetch(""" + SELECT tablename FROM pg_tables + WHERE schemaname = 'public' """) + + for table in tables: + table_name = table['tablename'] + # Add version and server_id columns if they don't exist + await conn.execute(f""" + DO $$ + BEGIN + BEGIN + ALTER TABLE "{table_name}" + ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1, + ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; + EXCEPTION + WHEN duplicate_column THEN + -- Do nothing, column already exists + END; + END $$; + """) + + # Create or replace the trigger function + await conn.execute(f""" + CREATE OR REPLACE FUNCTION update_version_and_server_id() + RETURNS TRIGGER AS $$ + BEGIN + NEW.version = COALESCE(OLD.version, 0) + 1; + NEW.server_id = '{os.environ.get('TS_ID')}'; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """) + + # Create the trigger if it doesn't exist + await conn.execute(f""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'update_version_and_server_id_trigger' AND tgrelid = '{table_name}'::regclass) THEN + CREATE TRIGGER update_version_and_server_id_trigger + BEFORE INSERT OR UPDATE ON "{table_name}" + FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id(); + END IF; + END $$; + """) + + info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.") async def get_most_recent_source(self):