Auto-update: Mon Sep 23 13:17:52 PDT 2024

This commit is contained in:
sanj 2024-09-23 13:17:52 -07:00
parent 7abaa9d75c
commit 7799f0af94

View file

@ -6,129 +6,130 @@ import subprocess
import time import time
from tqdm import tqdm from tqdm import tqdm
# Configuration variables
CONFIG_FILE = 'sys.yaml'
POOL_KEY = 'POOL'
TABLES_KEY = 'TABLES'
SOURCE_INDEX = 0
def load_config(): def load_config():
script_dir = os.path.dirname(os.path.abspath(__file__)) script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(script_dir, '..', '..')) project_root = os.path.abspath(os.path.join(script_dir, '..', '..'))
sys_config_path = os.path.join(project_root, 'config', 'sys.yaml') config_path = os.path.join(project_root, 'config', CONFIG_FILE)
gis_config_path = os.path.join(project_root, 'config', 'gis.yaml')
with open(sys_config_path, 'r') as f: with open(config_path, 'r') as f:
sys_config = yaml.safe_load(f) config = yaml.safe_load(f)
with open(gis_config_path, 'r') as f:
gis_config = yaml.safe_load(f)
return sys_config, gis_config
return config
def get_table_size(server, table_name): def get_table_size(server, table_name):
env = os.environ.copy() env = os.environ.copy()
env['PGPASSWORD'] = server['db_pass'] env['PGPASSWORD'] = server['db_pass']
command = [ command = [
'psql', 'psql',
'-h', server['ts_ip'], '-h', server['ts_ip'],
'-p', str(server['db_port']), '-p', str(server['db_port']),
'-U', server['db_user'], '-U', server['db_user'],
'-d', server['db_name'], '-d', server['db_name'],
'-t', '-t',
'-c', f"SELECT COUNT(*) FROM {table_name}" '-c', f"SELECT COUNT(*) FROM {table_name}"
] ]
result = subprocess.run(command, env=env, capture_output=True, text=True, check=True) result = subprocess.run(command, env=env, capture_output=True, text=True, check=True)
return int(result.stdout.strip()) return int(result.stdout.strip())
def replicate_table(source, targets, table_name): def replicate_table(source, targets, table_name):
print(f"Replicating {table_name}") print(f"Replicating {table_name}")
# Get table size for progress bar # Get table size for progress bar
table_size = get_table_size(source, table_name) table_size = get_table_size(source, table_name)
print(f"Table size: {table_size} rows") print(f"Table size: {table_size} rows")
# Dump the table from the source # Dump the table from the source
dump_command = [ dump_command = [
'pg_dump', 'pg_dump',
'-h', source['ts_ip'], '-h', source['ts_ip'],
'-p', str(source['db_port']), '-p', str(source['db_port']),
'-U', source['db_user'], '-U', source['db_user'],
'-d', source['db_name'], '-d', source['db_name'],
'-t', table_name, '-t', table_name,
'--no-owner', '--no-owner',
'--no-acl' '--no-acl'
] ]
env = os.environ.copy() env = os.environ.copy()
env['PGPASSWORD'] = source['db_pass'] env['PGPASSWORD'] = source['db_pass']
print("Dumping table...") print("Dumping table...")
with open(f"{table_name}.sql", 'w') as f: with open(f"{table_name}.sql", 'w') as f:
subprocess.run(dump_command, env=env, stdout=f, check=True) subprocess.run(dump_command, env=env, stdout=f, check=True)
print("Dump complete") print("Dump complete")
# Restore the table to each target # Restore the table to each target
for target in targets: for target in targets:
print(f"Replicating to {target['ts_id']}") print(f"Replicating to {target['ts_id']}")
# Drop table and its sequence # Drop table and its sequence
drop_commands = [ drop_commands = [
f"DROP TABLE IF EXISTS {table_name} CASCADE;", f"DROP TABLE IF EXISTS {table_name} CASCADE;",
f"DROP SEQUENCE IF EXISTS {table_name}_id_seq CASCADE;" f"DROP SEQUENCE IF EXISTS {table_name}_id_seq CASCADE;"
] ]
restore_command = [ restore_command = [
'psql', 'psql',
'-h', target['ts_ip'], '-h', target['ts_ip'],
'-p', str(target['db_port']), '-p', str(target['db_port']),
'-U', target['db_user'], '-U', target['db_user'],
'-d', target['db_name'], '-d', target['db_name'],
] ]
env = os.environ.copy() env = os.environ.copy()
env['PGPASSWORD'] = target['db_pass'] env['PGPASSWORD'] = target['db_pass']
# Execute drop commands # Execute drop commands
for cmd in drop_commands: for cmd in drop_commands:
print(f"Executing: {cmd}") print(f"Executing: {cmd}")
subprocess.run(restore_command + ['-c', cmd], env=env, check=True) subprocess.run(restore_command + ['-c', cmd], env=env, check=True)
# Restore the table # Restore the table
print("Restoring table...") print("Restoring table...")
process = subprocess.Popen(restore_command + ['-f', f"{table_name}.sql"], env=env, process = subprocess.Popen(restore_command + ['-f', f"{table_name}.sql"], env=env,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
pbar = tqdm(total=table_size, desc="Copying rows") pbar = tqdm(total=table_size, desc="Copying rows")
copied_rows = 0 copied_rows = 0
for line in process.stderr: for line in process.stderr:
if line.startswith("COPY"): if line.startswith("COPY"):
copied_rows = int(line.split()[1]) copied_rows = int(line.split()[1])
pbar.update(copied_rows - pbar.n) pbar.update(copied_rows - pbar.n)
print(line, end='') # Print all output for visibility print(line, end='') # Print all output for visibility
pbar.close() pbar.close()
process.wait() process.wait()
if process.returncode != 0: if process.returncode != 0:
print(f"Error occurred during restoration to {target['ts_id']}") print(f"Error occurred during restoration to {target['ts_id']}")
print(process.stderr.read()) print(process.stderr.read())
else: else:
print(f"Restoration to {target['ts_id']} completed successfully") print(f"Restoration to {target['ts_id']} completed successfully")
# Clean up the dump file # Clean up the dump file
os.remove(f"{table_name}.sql") os.remove(f"{table_name}.sql")
print(f"Replication of {table_name} completed") print(f"Replication of {table_name} completed")
def main(): def main():
sys_config, gis_config = load_config() config = load_config()
source_server = sys_config['POOL'][0] source_server = config[POOL_KEY][SOURCE_INDEX]
target_servers = sys_config['POOL'][1:] target_servers = config[POOL_KEY][SOURCE_INDEX + 1:]
tables = [layer['table_name'] for layer in gis_config['layers']] tables = list(config[TABLES_KEY].keys())
for table in tables: for table in tables:
replicate_table(source_server, target_servers, table) replicate_table(source_server, target_servers, table)
print("All replications completed!") print("All replications completed!")
if __name__ == "__main__": if __name__ == "__main__":
main() main()