Auto-update: Mon Jul 29 19:33:06 PDT 2024

This commit is contained in:
sanj 2024-07-29 19:33:06 -07:00
parent cda6481a97
commit 5461c9668d

View file

@ -316,7 +316,8 @@ class APIConfig(BaseModel):
raise raise
async def initialize_sync(self): async def initialize_sync(self):
async with self.get_connection() as conn: for pool_entry in self.POOL:
async with self.get_connection(pool_entry) as conn:
tables = await conn.fetch(""" tables = await conn.fetch("""
SELECT tablename FROM pg_tables SELECT tablename FROM pg_tables
WHERE schemaname = 'public' WHERE schemaname = 'public'
@ -324,11 +325,23 @@ class APIConfig(BaseModel):
for table in tables: for table in tables:
table_name = table['tablename'] table_name = table['tablename']
# Add version and server_id columns if they don't exist
await conn.execute(f""" await conn.execute(f"""
DO $$
BEGIN
BEGIN
ALTER TABLE "{table_name}" ALTER TABLE "{table_name}"
ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1, ADD COLUMN IF NOT EXISTS version INTEGER DEFAULT 1,
ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}'; ADD COLUMN IF NOT EXISTS server_id TEXT DEFAULT '{os.environ.get('TS_ID')}';
EXCEPTION
WHEN duplicate_column THEN
-- Do nothing, column already exists
END;
END $$;
""")
# Create or replace the trigger function
await conn.execute(f"""
CREATE OR REPLACE FUNCTION update_version_and_server_id() CREATE OR REPLACE FUNCTION update_version_and_server_id()
RETURNS TRIGGER AS $$ RETURNS TRIGGER AS $$
BEGIN BEGIN
@ -337,13 +350,22 @@ class APIConfig(BaseModel):
RETURN NEW; RETURN NEW;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
""")
DROP TRIGGER IF EXISTS update_version_and_server_id_trigger ON "{table_name}"; # Create the trigger if it doesn't exist
await conn.execute(f"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'update_version_and_server_id_trigger' AND tgrelid = '{table_name}'::regclass) THEN
CREATE TRIGGER update_version_and_server_id_trigger CREATE TRIGGER update_version_and_server_id_trigger
BEFORE INSERT OR UPDATE ON "{table_name}" BEFORE INSERT OR UPDATE ON "{table_name}"
FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id(); FOR EACH ROW EXECUTE FUNCTION update_version_and_server_id();
END IF;
END $$;
""") """)
info(f"Sync initialization complete for {pool_entry['ts_ip']}. All tables now have version and server_id columns with appropriate triggers.")
async def get_most_recent_source(self): async def get_most_recent_source(self):
most_recent_source = None most_recent_source = None