diff --git a/setup.py b/setup.py index 64853f5..139e15c 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,6 @@ setup( 'requests', 'aiohttp', 'paramiko', - 'tailscale', 'pandas', 'pydub', 'torch', diff --git a/sijapi/classes.py b/sijapi/classes.py index 1aaad0a..cc8877c 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -46,39 +46,30 @@ class Configuration(BaseModel): config_data = yaml.safe_load(file) print(f"Loaded configuration data from {yaml_path}") - if secrets_path: with secrets_path.open('r') as file: secrets_data = yaml.safe_load(file) print(f"Loaded secrets data from {secrets_path}") - - # If config_data is a list, apply secrets to each item if isinstance(config_data, list): for item in config_data: if isinstance(item, dict): item.update(secrets_data) else: config_data.update(secrets_data) - - # If config_data is a list, create a dict with a single key if isinstance(config_data, list): config_data = {"configurations": config_data} - - # Ensure HOME is set if config_data.get('HOME') is None: config_data['HOME'] = str(Path.home()) print(f"HOME was None in config, set to default: {config_data['HOME']}") load_dotenv() - instance = cls.create_dynamic_model(**config_data) instance._dir_config = dir_config or instance - resolved_data = instance.resolve_placeholders(config_data) instance = cls.create_dynamic_model(**resolved_data) instance._dir_config = dir_config or instance - return instance + except Exception as e: print(f"Error loading configuration: {str(e)}") raise @@ -107,16 +98,12 @@ class Configuration(BaseModel): def resolve_placeholders(self, data: Any) -> Any: if isinstance(data, dict): resolved_data = {k: self.resolve_placeholders(v) for k, v in data.items()} - - # Special handling for directory placeholders home = Path(resolved_data.get('HOME', self.HOME)).expanduser() sijapi = home / "workshop" / "sijapi" data_dir = sijapi / "data" - resolved_data['HOME'] = str(home) resolved_data['SIJAPI'] = str(sijapi) resolved_data['DATA'] = str(data_dir) - return resolved_data elif isinstance(data, list): return [self.resolve_placeholders(v) for v in data] @@ -167,17 +154,6 @@ class Configuration(BaseModel): extra = "allow" arbitrary_types_allowed = True - -class PoolConfig(BaseModel): - ts_ip: str - ts_id: str - wan_ip: str - app_port: int - db_port: int - db_name: str - db_user: str - db_pass: str - class APIConfig(BaseModel): HOST: str PORT: int @@ -186,7 +162,7 @@ class APIConfig(BaseModel): PUBLIC: List[str] TRUSTED_SUBNETS: List[str] MODULES: Any # This will be replaced with a dynamic model - POOL: List[Dict[str, Any]] # This replaces the separate PoolConfig + POOL: List[Dict[str, Any]] EXTENSIONS: Any # This will be replaced with a dynamic model TZ: str KEYS: List[str] @@ -201,13 +177,12 @@ class APIConfig(BaseModel): with open(config_path, 'r') as file: config_data = yaml.safe_load(file) - print(f"Loaded main config: {config_data}") # Debug print + print(f"Loaded main config: {config_data}") - # Load secrets try: with open(secrets_path, 'r') as file: secrets_data = yaml.safe_load(file) - print(f"Loaded secrets: {secrets_data}") # Debug print + print(f"Loaded secrets: {secrets_data}") except FileNotFoundError: print(f"Secrets file not found: {secrets_path}") secrets_data = {} @@ -215,33 +190,25 @@ class APIConfig(BaseModel): print(f"Error parsing secrets YAML: {e}") secrets_data = {} - # Resolve internal placeholders config_data = cls.resolve_placeholders(config_data) - - print(f"Resolved config: {config_data}") # Debug print - - # Handle KEYS placeholder + print(f"Resolved config: {config_data}") if isinstance(config_data.get('KEYS'), list) and len(config_data['KEYS']) == 1: placeholder = config_data['KEYS'][0] if placeholder.startswith('{{') and placeholder.endswith('}}'): - key = placeholder[2:-2].strip() # Remove {{ }} and whitespace + key = placeholder[2:-2].strip() parts = key.split('.') if len(parts) == 2 and parts[0] == 'SECRET': secret_key = parts[1] if secret_key in secrets_data: config_data['KEYS'] = secrets_data[secret_key] - print(f"Replaced KEYS with secret: {config_data['KEYS']}") # Debug print + print(f"Replaced KEYS with secret: {config_data['KEYS']}") else: print(f"Secret key '{secret_key}' not found in secrets file") else: print(f"Invalid secret placeholder format: {placeholder}") - # Create dynamic ModulesConfig config_data['MODULES'] = cls._create_dynamic_config(config_data.get('MODULES', {}), 'DynamicModulesConfig') - - # Create dynamic ExtensionsConfig config_data['EXTENSIONS'] = cls._create_dynamic_config(config_data.get('EXTENSIONS', {}), 'DynamicExtensionsConfig') - return cls(**config_data) @classmethod @@ -299,10 +266,6 @@ class APIConfig(BaseModel): if name in ['MODULES', 'EXTENSIONS']: return self.__dict__[name] return super().__getattr__(name) - - @property - def local_db(self): - return self.POOL[0] @property def active_modules(self) -> List[str]: @@ -311,7 +274,11 @@ class APIConfig(BaseModel): @property def active_extensions(self) -> List[str]: return [extension for extension, is_active in self.EXTENSIONS.__dict__.items() if is_active] - + + @property + def local_db(self): + return self.POOL[0] + @asynccontextmanager async def get_connection(self, pool_entry: Dict[str, Any] = None): if pool_entry is None: @@ -330,6 +297,7 @@ class APIConfig(BaseModel): await conn.close() async def push_changes(self, query: str, *args): + logger = Logger("DatabaseReplication") connections = [] try: for pool_entry in self.POOL[1:]: # Skip the first (local) database @@ -343,9 +311,9 @@ class APIConfig(BaseModel): for pool_entry, result in zip(self.POOL[1:], results): if isinstance(result, Exception): - err(f"Failed to push to {pool_entry['ts_ip']}: {str(result)}") + logger.error(f"Failed to push to {pool_entry['ts_ip']}: {str(result)}") else: - err(f"Successfully pushed to {pool_entry['ts_ip']}") + logger.info(f"Successfully pushed to {pool_entry['ts_ip']}") finally: for conn in connections: @@ -372,18 +340,105 @@ class APIConfig(BaseModel): await dest_conn.copy_records_to_table( table_name, records=rows, columns=columns ) - info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}") + logger.info(f"Successfully pulled changes from {source_pool_entry['ts_ip']}") async def sync_schema(self): + logger = Logger("SchemaSync") source_entry = self.POOL[0] # Use the local database as the source - schema = await self.get_schema(source_entry) + source_schema = await self.get_schema(source_entry) + for pool_entry in self.POOL[1:]: - await self.apply_schema(pool_entry, schema) - info(f"Synced schema to {pool_entry['ts_ip']}") + target_schema = await self.get_schema(pool_entry) + await self.apply_schema_changes(pool_entry, source_schema, target_schema) + logger.info(f"Synced schema to {pool_entry['ts_ip']}") async def get_schema(self, pool_entry: Dict[str, Any]): async with self.get_connection(pool_entry) as conn: - return await conn.fetch("SELECT * FROM information_schema.columns") + tables = await conn.fetch(""" + SELECT table_name, column_name, data_type, character_maximum_length, + is_nullable, column_default, ordinal_position + FROM information_schema.columns + WHERE table_schema = 'public' + ORDER BY table_name, ordinal_position + """) + + indexes = await conn.fetch(""" + SELECT indexname, indexdef + FROM pg_indexes + WHERE schemaname = 'public' + """) + + constraints = await conn.fetch(""" + SELECT conname, contype, conrelid::regclass::text as table_name, + pg_get_constraintdef(oid) as definition + FROM pg_constraint + WHERE connamespace = 'public'::regnamespace + """) + + return { + 'tables': tables, + 'indexes': indexes, + 'constraints': constraints + } + + async def apply_schema_changes(self, pool_entry: Dict[str, Any], source_schema, target_schema): + async with self.get_connection(pool_entry) as conn: + # Compare and update tables and columns + source_tables = {(t['table_name'], t['column_name']): t for t in source_schema['tables']} + target_tables = {(t['table_name'], t['column_name']): t for t in target_schema['tables']} + + for (table_name, column_name), source_column in source_tables.items(): + if (table_name, column_name) not in target_tables: + await conn.execute(f""" + ALTER TABLE {table_name} + ADD COLUMN {column_name} {source_column['data_type']} + {'' if source_column['is_nullable'] == 'YES' else 'NOT NULL'} + {f"DEFAULT {source_column['column_default']}" if source_column['column_default'] else ''} + """) + else: + target_column = target_tables[(table_name, column_name)] + if source_column != target_column: + await conn.execute(f""" + ALTER TABLE {table_name} + ALTER COLUMN {column_name} TYPE {source_column['data_type']}, + ALTER COLUMN {column_name} {'' if source_column['is_nullable'] == 'YES' else 'SET NOT NULL'}, + ALTER COLUMN {column_name} {f"SET DEFAULT {source_column['column_default']}" if source_column['column_default'] else 'DROP DEFAULT'} + """) + + for (table_name, column_name) in target_tables.keys(): + if (table_name, column_name) not in source_tables: + await conn.execute(f"ALTER TABLE {table_name} DROP COLUMN {column_name}") + + # Compare and update indexes + source_indexes = {idx['indexname']: idx['indexdef'] for idx in source_schema['indexes']} + target_indexes = {idx['indexname']: idx['indexdef'] for idx in target_schema['indexes']} + + for index_name, index_def in source_indexes.items(): + if index_name not in target_indexes: + await conn.execute(index_def) + elif index_def != target_indexes[index_name]: + await conn.execute(f"DROP INDEX {index_name}") + await conn.execute(index_def) + + for index_name in target_indexes.keys(): + if index_name not in source_indexes: + await conn.execute(f"DROP INDEX {index_name}") + + # Compare and update constraints + source_constraints = {con['conname']: con for con in source_schema['constraints']} + target_constraints = {con['conname']: con for con in target_schema['constraints']} + + for con_name, source_con in source_constraints.items(): + if con_name not in target_constraints: + await conn.execute(f"ALTER TABLE {source_con['table_name']} ADD CONSTRAINT {con_name} {source_con['definition']}") + elif source_con != target_constraints[con_name]: + await conn.execute(f"ALTER TABLE {source_con['table_name']} DROP CONSTRAINT {con_name}") + await conn.execute(f"ALTER TABLE {source_con['table_name']} ADD CONSTRAINT {con_name} {source_con['definition']}") + + for con_name, target_con in target_constraints.items(): + if con_name not in source_constraints: + await conn.execute(f"ALTER TABLE {target_con['table_name']} DROP CONSTRAINT {con_name}") + async def apply_schema(self, pool_entry: Dict[str, Any], schema): async with self.get_connection(pool_entry) as conn: @@ -397,6 +452,7 @@ class APIConfig(BaseModel): """) + class Location(BaseModel): latitude: float longitude: float