Auto-update: Tue Jul 23 21:54:54 PDT 2024
This commit is contained in:
parent
d7a2154abc
commit
99e2729fde
1 changed files with 18 additions and 13 deletions
|
@ -277,7 +277,8 @@ class APIConfig(BaseModel):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def local_db(self):
|
def local_db(self):
|
||||||
return self.POOL[0]
|
ts_id = os.environ.get('TS_ID')
|
||||||
|
return next((db for db in self.POOL if db['ts_id'] == ts_id), None)
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def get_connection(self, pool_entry: Dict[str, Any] = None):
|
async def get_connection(self, pool_entry: Dict[str, Any] = None):
|
||||||
|
@ -319,9 +320,24 @@ class APIConfig(BaseModel):
|
||||||
for conn in connections:
|
for conn in connections:
|
||||||
await conn.__aexit__(None, None, None)
|
await conn.__aexit__(None, None, None)
|
||||||
|
|
||||||
|
async def get_default_source(self):
|
||||||
|
local = self.local_db
|
||||||
|
for db in self.POOL:
|
||||||
|
if db != local:
|
||||||
|
try:
|
||||||
|
async with self.get_connection(db):
|
||||||
|
return db
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
async def pull_changes(self, source_pool_entry: Dict[str, Any] = None):
|
async def pull_changes(self, source_pool_entry: Dict[str, Any] = None):
|
||||||
if source_pool_entry is None:
|
if source_pool_entry is None:
|
||||||
source_pool_entry = self.POOL[1] # Default to the second database in the pool
|
source_pool_entry = await self.get_default_source()
|
||||||
|
|
||||||
|
if source_pool_entry is None:
|
||||||
|
logger.error("No available source for pulling changes")
|
||||||
|
return
|
||||||
|
|
||||||
logger = Logger("DatabaseReplication")
|
logger = Logger("DatabaseReplication")
|
||||||
async with self.get_connection(source_pool_entry) as source_conn:
|
async with self.get_connection(source_pool_entry) as source_conn:
|
||||||
|
@ -440,17 +456,6 @@ class APIConfig(BaseModel):
|
||||||
await conn.execute(f"ALTER TABLE {target_con['table_name']} DROP CONSTRAINT {con_name}")
|
await conn.execute(f"ALTER TABLE {target_con['table_name']} DROP CONSTRAINT {con_name}")
|
||||||
|
|
||||||
|
|
||||||
async def apply_schema(self, pool_entry: Dict[str, Any], schema):
|
|
||||||
async with self.get_connection(pool_entry) as conn:
|
|
||||||
# This is a simplified version. You'd need to handle creating/altering tables,
|
|
||||||
# adding/removing columns, changing data types, etc.
|
|
||||||
for table in schema:
|
|
||||||
await conn.execute(f"""
|
|
||||||
CREATE TABLE IF NOT EXISTS {table['table_name']} (
|
|
||||||
{table['column_name']} {table['data_type']}
|
|
||||||
)
|
|
||||||
""")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Location(BaseModel):
|
class Location(BaseModel):
|
||||||
|
|
Loading…
Reference in a new issue