Auto-update: Tue Jul 30 16:51:19 PDT 2024

This commit is contained in:
sanj 2024-07-30 16:51:19 -07:00
parent c4ebb75953
commit 3861fc0114

View file

@ -637,8 +637,11 @@ class APIConfig(BaseModel):
for table in tables: for table in tables:
table_name = table['tablename'] table_name = table['tablename']
try: try:
debug(f"Processing table: {table_name}")
has_primary_key = await self.ensure_sync_columns(dest_conn, table_name) has_primary_key = await self.ensure_sync_columns(dest_conn, table_name)
debug(f"Table {table_name} has primary key: {has_primary_key}")
last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id) last_synced_version = await self.get_last_synced_version(dest_conn, table_name, source_id)
debug(f"Last synced version for {table_name}: {last_synced_version}")
changes = await source_conn.fetch(f""" changes = await source_conn.fetch(f"""
SELECT * FROM "{table_name}" SELECT * FROM "{table_name}"
@ -647,6 +650,8 @@ class APIConfig(BaseModel):
LIMIT $3 LIMIT $3
""", last_synced_version, source_id, batch_size) """, last_synced_version, source_id, batch_size)
debug(f"Number of changes for {table_name}: {len(changes)}")
if changes: if changes:
changes_count = await self.apply_batch_changes(dest_conn, table_name, changes, has_primary_key) changes_count = await self.apply_batch_changes(dest_conn, table_name, changes, has_primary_key)
total_changes += changes_count total_changes += changes_count
@ -712,12 +717,15 @@ class APIConfig(BaseModel):
debug(f"Generated insert query for {table_name}: {insert_query}") debug(f"Generated insert query for {table_name}: {insert_query}")
# Prepare the statement
stmt = await conn.prepare(insert_query)
affected_rows = 0 affected_rows = 0
for change in tqdm(changes, desc=f"Syncing {table_name}", unit="row"): for change in tqdm(changes, desc=f"Syncing {table_name}", unit="row"):
values = [change[col] for col in columns] values = [change[col] for col in columns]
debug(f"Executing query for {table_name} with values: {values}") debug(f"Executing query for {table_name} with values: {values}")
result = await conn.execute(insert_query, *values) result = await stmt.execute(*values)
affected_rows += int(result.split()[-1]) affected_rows += 1 # Since we're executing one at a time, each successful execution affects one row
return affected_rows return affected_rows
@ -728,6 +736,7 @@ class APIConfig(BaseModel):
async def sync_spatial_ref_sys(self, source_conn, dest_conn): async def sync_spatial_ref_sys(self, source_conn, dest_conn):
try: try:
# Get all entries from the source # Get all entries from the source