From a5d5dcf67fcad5d9d9b3ab2cd67de1817e8e4be1 Mon Sep 17 00:00:00 2001
From: sanj <67624670+iodrift@users.noreply.github.com>
Date: Tue, 23 Jul 2024 17:18:19 -0700
Subject: [PATCH] Auto-update: Tue Jul 23 17:18:19 PDT 2024

---
 setup.py          |   1 -
 sijapi/classes.py | 160 +++++++++++++++++++++++++++++++---------------
 2 files changed, 108 insertions(+), 53 deletions(-)

diff --git a/setup.py b/setup.py
index 64853f5..139e15c 100644
--- a/setup.py
+++ b/setup.py
@@ -27,7 +27,6 @@ setup(
         'requests',
         'aiohttp',
         'paramiko',
-        'tailscale',
         'pandas',
         'pydub',
         'torch',
diff --git a/sijapi/classes.py b/sijapi/classes.py
index 1aaad0a..cc8877c 100644
--- a/sijapi/classes.py
+++ b/sijapi/classes.py
@@ -46,39 +46,30 @@ class Configuration(BaseModel):
                 config_data = yaml.safe_load(file)
 
             print(f"Loaded configuration data from {yaml_path}")
-
             if secrets_path:
                 with secrets_path.open('r') as file:
                     secrets_data = yaml.safe_load(file)
                 print(f"Loaded secrets data from {secrets_path}")
-
-                # If config_data is a list, apply secrets to each item
                 if isinstance(config_data, list):
                     for item in config_data:
                         if isinstance(item, dict):
                             item.update(secrets_data)
                 else:
                     config_data.update(secrets_data)
-
-            # If config_data is a list, create a dict with a single key
             if isinstance(config_data, list):
                 config_data = {"configurations": config_data}
-
-            # Ensure HOME is set
             if config_data.get('HOME') is None:
                 config_data['HOME'] = str(Path.home())
                 print(f"HOME was None in config, set to default: {config_data['HOME']}")
 
             load_dotenv()
-
             instance = cls.create_dynamic_model(**config_data)
             instance._dir_config = dir_config or instance
-
             resolved_data = instance.resolve_placeholders(config_data)
             instance = cls.create_dynamic_model(**resolved_data)
             instance._dir_config = dir_config or instance
-
             return instance
+        
         except Exception as e:
             print(f"Error loading configuration: {str(e)}")
             raise
@@ -107,16 +98,12 @@ class Configuration(BaseModel):
     def resolve_placeholders(self, data: Any) -> Any:
         if isinstance(data, dict):
             resolved_data = {k: self.resolve_placeholders(v) for k, v in data.items()}
-
-            # Special handling for directory placeholders
             home = Path(resolved_data.get('HOME', self.HOME)).expanduser()
             sijapi = home / "workshop" / "sijapi"
             data_dir = sijapi / "data"
-
             resolved_data['HOME'] = str(home)
             resolved_data['SIJAPI'] = str(sijapi)
             resolved_data['DATA'] = str(data_dir)
-
             return resolved_data
         elif isinstance(data, list):
             return [self.resolve_placeholders(v) for v in data]
@@ -167,17 +154,6 @@ class Configuration(BaseModel):
         extra = "allow"
         arbitrary_types_allowed = True
 
-
-class PoolConfig(BaseModel):
-    ts_ip: str
-    ts_id: str
-    wan_ip: str
-    app_port: int
-    db_port: int
-    db_name: str
-    db_user: str
-    db_pass: str
-
 class APIConfig(BaseModel):
     HOST: str
     PORT: int
@@ -186,7 +162,7 @@ class APIConfig(BaseModel):
     PUBLIC: List[str]
     TRUSTED_SUBNETS: List[str]
     MODULES: Any  # This will be replaced with a dynamic model
-    POOL: List[Dict[str, Any]]  # This replaces the separate PoolConfig
+    POOL: List[Dict[str, Any]]
     EXTENSIONS: Any  # This will be replaced with a dynamic model
     TZ: str
     KEYS: List[str]
@@ -201,13 +177,12 @@ class APIConfig(BaseModel):
         with open(config_path, 'r') as file:
             config_data = yaml.safe_load(file)
 
-        print(f"Loaded main config: {config_data}")  # Debug print
+        print(f"Loaded main config: {config_data}") 
 
-        # Load secrets
         try:
             with open(secrets_path, 'r') as file:
                 secrets_data = yaml.safe_load(file)
-            print(f"Loaded secrets: {secrets_data}")  # Debug print
+            print(f"Loaded secrets: {secrets_data}")
         except FileNotFoundError:
             print(f"Secrets file not found: {secrets_path}")
             secrets_data = {}
@@ -215,33 +190,25 @@ class APIConfig(BaseModel):
             print(f"Error parsing secrets YAML: {e}")
             secrets_data = {}
 
-        # Resolve internal placeholders
         config_data = cls.resolve_placeholders(config_data)
-
-        print(f"Resolved config: {config_data}")  # Debug print
-
-        # Handle KEYS placeholder
+        print(f"Resolved config: {config_data}") 
         if isinstance(config_data.get('KEYS'), list) and len(config_data['KEYS']) == 1:
             placeholder = config_data['KEYS'][0]
             if placeholder.startswith('{{') and placeholder.endswith('}}'):
-                key = placeholder[2:-2].strip()  # Remove {{ }} and whitespace
+                key = placeholder[2:-2].strip()
                 parts = key.split('.')
                 if len(parts) == 2 and parts[0] == 'SECRET':
                     secret_key = parts[1]
                     if secret_key in secrets_data:
                         config_data['KEYS'] = secrets_data[secret_key]
-                        print(f"Replaced KEYS with secret: {config_data['KEYS']}")  # Debug print
+                        print(f"Replaced KEYS with secret: {config_data['KEYS']}")
                     else:
                         print(f"Secret key '{secret_key}' not found in secrets file")
                 else:
                     print(f"Invalid secret placeholder format: {placeholder}")
 
-        # Create dynamic ModulesConfig
         config_data['MODULES'] = cls._create_dynamic_config(config_data.get('MODULES', {}), 'DynamicModulesConfig')
-
-        # Create dynamic ExtensionsConfig
         config_data['EXTENSIONS'] = cls._create_dynamic_config(config_data.get('EXTENSIONS', {}), 'DynamicExtensionsConfig')
-
         return cls(**config_data)
 
     @classmethod
@@ -299,10 +266,6 @@ class APIConfig(BaseModel):
         if name in ['MODULES', 'EXTENSIONS']:
             return self.__dict__[name]
         return super().__getattr__(name)
-    
-    @property
-    def local_db(self):
-        return self.POOL[0]
 
     @property
     def active_modules(self) -> List[str]:
@@ -311,7 +274,11 @@ class APIConfig(BaseModel):
     @property
     def active_extensions(self) -> List[str]:
         return [extension for extension, is_active in self.EXTENSIONS.__dict__.items() if is_active]
-    
+
+    @property
+    def local_db(self):
+        return self.POOL[0]
+
     @asynccontextmanager
     async def get_connection(self, pool_entry: Dict[str, Any] = None):
         if pool_entry is None:
@@ -330,6 +297,7 @@ class APIConfig(BaseModel):
             await conn.close()
 
     async def push_changes(self, query: str, *args):
+        logger = Logger("DatabaseReplication")
         connections = []
         try:
             for pool_entry in self.POOL[1:]:  # Skip the first (local) database
@@ -343,9 +311,9 @@ class APIConfig(BaseModel):
 
             for pool_entry, result in zip(self.POOL[1:], results):
                 if isinstance(result, Exception):
-                    err(f"Failed to push to {pool_entry['ts_ip']}: {str(result)}")
+                    logger.error(f"Failed to push to {pool_entry['ts_ip']}: {str(result)}")
                 else:
-                    err(f"Successfully pushed to {pool_entry['ts_ip']}")
+                    logger.info(f"Successfully pushed to {pool_entry['ts_ip']}")
 
         finally:
             for conn in connections:
@@ -372,18 +340,105 @@ class APIConfig(BaseModel):
                         await dest_conn.copy_records_to_table(
                             table_name, records=rows, columns=columns
                         )
-                info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}")
+                logger.info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}")
 
     async def sync_schema(self):
+        logger = Logger("SchemaSync")
         source_entry = self.POOL[0]  # Use the local database as the source
-        schema = await self.get_schema(source_entry)
+        source_schema = await self.get_schema(source_entry)
+        
         for pool_entry in self.POOL[1:]:
-            await self.apply_schema(pool_entry, schema)
-            info(f"Synced schema to {pool_entry['ts_ip']}")
+            target_schema = await self.get_schema(pool_entry)
+            await self.apply_schema_changes(pool_entry, source_schema, target_schema)
+            logger.info(f"Synced schema to {pool_entry['ts_ip']}")
 
     async def get_schema(self, pool_entry: Dict[str, Any]):
         async with self.get_connection(pool_entry) as conn:
-            return await conn.fetch("SELECT * FROM information_schema.columns")
+            tables = await conn.fetch("""
+                SELECT table_name, column_name, data_type, character_maximum_length,
+                    is_nullable, column_default, ordinal_position
+                FROM information_schema.columns
+                WHERE table_schema = 'public'
+                ORDER BY table_name, ordinal_position
+            """)
+            
+            indexes = await conn.fetch("""
+                SELECT indexname, indexdef
+                FROM pg_indexes
+                WHERE schemaname = 'public'
+            """)
+            
+            constraints = await conn.fetch("""
+                SELECT conname, contype, conrelid::regclass::text as table_name,
+                    pg_get_constraintdef(oid) as definition
+                FROM pg_constraint
+                WHERE connamespace = 'public'::regnamespace
+            """)
+            
+            return {
+                'tables': tables,
+                'indexes': indexes,
+                'constraints': constraints
+            }
+
+    async def apply_schema_changes(self, pool_entry: Dict[str, Any], source_schema, target_schema):
+        async with self.get_connection(pool_entry) as conn:
+            # Compare and update tables and columns
+            source_tables = {(t['table_name'], t['column_name']): t for t in source_schema['tables']}
+            target_tables = {(t['table_name'], t['column_name']): t for t in target_schema['tables']}
+            
+            for (table_name, column_name), source_column in source_tables.items():
+                if (table_name, column_name) not in target_tables:
+                    await conn.execute(f"""
+                        ALTER TABLE {table_name}
+                        ADD COLUMN {column_name} {source_column['data_type']}
+                        {'' if source_column['is_nullable'] == 'YES' else 'NOT NULL'}
+                        {f"DEFAULT {source_column['column_default']}" if source_column['column_default'] else ''}
+                    """)
+                else:
+                    target_column = target_tables[(table_name, column_name)]
+                    if source_column != target_column:
+                        await conn.execute(f"""
+                            ALTER TABLE {table_name}
+                            ALTER COLUMN {column_name} TYPE {source_column['data_type']},
+                            ALTER COLUMN {column_name} {'' if source_column['is_nullable'] == 'YES' else 'SET NOT NULL'},
+                            ALTER COLUMN {column_name} {f"SET DEFAULT {source_column['column_default']}" if source_column['column_default'] else 'DROP DEFAULT'}
+                        """)
+            
+            for (table_name, column_name) in target_tables.keys():
+                if (table_name, column_name) not in source_tables:
+                    await conn.execute(f"ALTER TABLE {table_name} DROP COLUMN {column_name}")
+            
+            # Compare and update indexes
+            source_indexes = {idx['indexname']: idx['indexdef'] for idx in source_schema['indexes']}
+            target_indexes = {idx['indexname']: idx['indexdef'] for idx in target_schema['indexes']}
+            
+            for index_name, index_def in source_indexes.items():
+                if index_name not in target_indexes:
+                    await conn.execute(index_def)
+                elif index_def != target_indexes[index_name]:
+                    await conn.execute(f"DROP INDEX {index_name}")
+                    await conn.execute(index_def)
+            
+            for index_name in target_indexes.keys():
+                if index_name not in source_indexes:
+                    await conn.execute(f"DROP INDEX {index_name}")
+            
+            # Compare and update constraints
+            source_constraints = {con['conname']: con for con in source_schema['constraints']}
+            target_constraints = {con['conname']: con for con in target_schema['constraints']}
+            
+            for con_name, source_con in source_constraints.items():
+                if con_name not in target_constraints:
+                    await conn.execute(f"ALTER TABLE {source_con['table_name']} ADD CONSTRAINT {con_name} {source_con['definition']}")
+                elif source_con != target_constraints[con_name]:
+                    await conn.execute(f"ALTER TABLE {source_con['table_name']} DROP CONSTRAINT {con_name}")
+                    await conn.execute(f"ALTER TABLE {source_con['table_name']} ADD CONSTRAINT {con_name} {source_con['definition']}")
+            
+            for con_name, target_con in target_constraints.items():
+                if con_name not in source_constraints:
+                    await conn.execute(f"ALTER TABLE {target_con['table_name']} DROP CONSTRAINT {con_name}")
+
 
     async def apply_schema(self, pool_entry: Dict[str, Any], schema):
         async with self.get_connection(pool_entry) as conn:
@@ -397,6 +452,7 @@ class APIConfig(BaseModel):
                 """)
 
 
+
 class Location(BaseModel):
     latitude: float
     longitude: float