Auto-update: Tue Jul 30 14:32:26 PDT 2024
This commit is contained in:
parent
297c3454df
commit
bb851b3315
1 changed files with 23 additions and 5 deletions
|
@ -358,6 +358,9 @@ class APIConfig(BaseModel):
|
||||||
if not postgis_installed:
|
if not postgis_installed:
|
||||||
warn(f"PostGIS is not installed on {pool_entry['ts_id']} ({pool_entry['ts_ip']}). Some spatial operations may fail.")
|
warn(f"PostGIS is not installed on {pool_entry['ts_id']} ({pool_entry['ts_ip']}). Some spatial operations may fail.")
|
||||||
|
|
||||||
|
# Initialize sync_status table
|
||||||
|
await self.initialize_sync_status_table(conn)
|
||||||
|
|
||||||
# Continue with sync initialization
|
# Continue with sync initialization
|
||||||
tables = await conn.fetch("""
|
tables = await conn.fetch("""
|
||||||
SELECT tablename FROM pg_tables
|
SELECT tablename FROM pg_tables
|
||||||
|
@ -375,14 +378,12 @@ class APIConfig(BaseModel):
|
||||||
else:
|
else:
|
||||||
warn(f"Sync initialization partially complete for {pool_entry['ts_ip']}. Some tables may be missing version or server_id columns.")
|
warn(f"Sync initialization partially complete for {pool_entry['ts_ip']}. Some tables may be missing version or server_id columns.")
|
||||||
|
|
||||||
# Initialize sync_status table if it doesn't exist
|
|
||||||
await self.initialize_sync_status_table(conn)
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}")
|
err(f"Error initializing sync for {pool_entry['ts_ip']}: {str(e)}")
|
||||||
err(f"Traceback: {traceback.format_exc()}")
|
err(f"Traceback: {traceback.format_exc()}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def initialize_sync_status_table(self, conn):
|
async def initialize_sync_status_table(self, conn):
|
||||||
await conn.execute("""
|
await conn.execute("""
|
||||||
CREATE TABLE IF NOT EXISTS sync_status (
|
CREATE TABLE IF NOT EXISTS sync_status (
|
||||||
|
@ -394,6 +395,22 @@ class APIConfig(BaseModel):
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
# Check if the last_sync_time column exists, and add it if it doesn't
|
||||||
|
column_exists = await conn.fetchval("""
|
||||||
|
SELECT EXISTS (
|
||||||
|
SELECT 1
|
||||||
|
FROM information_schema.columns
|
||||||
|
WHERE table_name = 'sync_status' AND column_name = 'last_sync_time'
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
|
if not column_exists:
|
||||||
|
await conn.execute("""
|
||||||
|
ALTER TABLE sync_status
|
||||||
|
ADD COLUMN last_sync_time TIMESTAMP WITH TIME ZONE
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def ensure_sync_structure(self, conn):
|
async def ensure_sync_structure(self, conn):
|
||||||
|
@ -633,8 +650,8 @@ class APIConfig(BaseModel):
|
||||||
columns = changes[0].keys()
|
columns = changes[0].keys()
|
||||||
records = [tuple(change[col] for col in columns) for change in changes]
|
records = [tuple(change[col] for col in columns) for change in changes]
|
||||||
|
|
||||||
# Use copy_records instead of copy_records_to_table
|
# Use copy_records_to_table instead of copy_records
|
||||||
await conn.copy_records(temp_table_name, records=records, columns=columns)
|
await conn.copy_records_to_table(temp_table_name, records=records)
|
||||||
|
|
||||||
# Perform upsert
|
# Perform upsert
|
||||||
result = await conn.execute(f"""
|
result = await conn.execute(f"""
|
||||||
|
@ -660,6 +677,7 @@ class APIConfig(BaseModel):
|
||||||
warn(f"Failed to drop temporary table {temp_table_name}: {str(e)}")
|
warn(f"Failed to drop temporary table {temp_table_name}: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def push_changes_to_all(self):
|
async def push_changes_to_all(self):
|
||||||
for pool_entry in self.POOL:
|
for pool_entry in self.POOL:
|
||||||
if pool_entry['ts_id'] != os.environ.get('TS_ID'):
|
if pool_entry['ts_id'] != os.environ.get('TS_ID'):
|
||||||
|
|
Loading…
Add table
Reference in a new issue