From 7fd598840e3a21a84155138fd91af64ebedac8c0 Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Wed, 28 Aug 2024 10:20:34 -0700 Subject: [PATCH] Cleanup --- sijapi/helpers/article.py | 23 - sijapi/helpers/caplss/CaPLSS.py | 237 ++++ .../caplss/CaPLSS_downloader_and_importer.py | 133 ++ sijapi/helpers/caplss/Townships_progress.json | 1 + sijapi/helpers/caplss/plss.py | 366 ++++++ sijapi/helpers/db/db.py | 71 ++ sijapi/helpers/db/db_get_schema.py | 89 ++ sijapi/helpers/db/db_repl.py | 83 ++ sijapi/helpers/db/db_replicator.py | 76 ++ sijapi/helpers/db/db_uuid_migrate.py | 46 + sijapi/helpers/db/migrate_db_to_uuid.py | 191 +++ .../db/migrate_query_tracking_to_uuid.sh | 35 + sijapi/helpers/db/repair_weather_db.py | 101 ++ sijapi/helpers/db/repl.py | 132 ++ sijapi/helpers/db/repl.sh | 118 ++ sijapi/helpers/db/replicator.py | 127 ++ sijapi/helpers/db/schema_info.yaml | 1103 +++++++++++++++++ sijapi/helpers/email/log_prior_emails.py | 67 + sijapi/helpers/fromvm/db/db.py | 46 + .../helpers/fromvm/db/db_connection_test.py | 66 + sijapi/helpers/fromvm/db/db_get_schema.py | 89 ++ sijapi/helpers/fromvm/db/db_uuid_migrate.py | 46 + sijapi/helpers/thp/CalFire_THP_scraper.py | 73 ++ sijapi/helpers/thp/thp.py | 0 sijapi/routers/news.py | 66 +- 25 files changed, 3345 insertions(+), 40 deletions(-) delete mode 100755 sijapi/helpers/article.py create mode 100644 sijapi/helpers/caplss/CaPLSS.py create mode 100644 sijapi/helpers/caplss/CaPLSS_downloader_and_importer.py create mode 100644 sijapi/helpers/caplss/Townships_progress.json create mode 100644 sijapi/helpers/caplss/plss.py create mode 100644 sijapi/helpers/db/db.py create mode 100644 sijapi/helpers/db/db_get_schema.py create mode 100644 sijapi/helpers/db/db_repl.py create mode 100755 sijapi/helpers/db/db_replicator.py create mode 100644 sijapi/helpers/db/db_uuid_migrate.py create mode 100644 sijapi/helpers/db/migrate_db_to_uuid.py create mode 100755 sijapi/helpers/db/migrate_query_tracking_to_uuid.sh create mode 100644 sijapi/helpers/db/repair_weather_db.py create mode 100644 sijapi/helpers/db/repl.py create mode 100755 sijapi/helpers/db/repl.sh create mode 100644 sijapi/helpers/db/replicator.py create mode 100644 sijapi/helpers/db/schema_info.yaml create mode 100644 sijapi/helpers/email/log_prior_emails.py create mode 100644 sijapi/helpers/fromvm/db/db.py create mode 100644 sijapi/helpers/fromvm/db/db_connection_test.py create mode 100644 sijapi/helpers/fromvm/db/db_get_schema.py create mode 100644 sijapi/helpers/fromvm/db/db_uuid_migrate.py create mode 100644 sijapi/helpers/thp/CalFire_THP_scraper.py create mode 100644 sijapi/helpers/thp/thp.py diff --git a/sijapi/helpers/article.py b/sijapi/helpers/article.py deleted file mode 100755 index c76a6e1..0000000 --- a/sijapi/helpers/article.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/Users/sij/miniforge3/envs/sijapi/bin/python -import sys -import asyncio -from fastapi import BackgroundTasks -from sijapi.routers.news import process_and_save_article - -async def main(): - if len(sys.argv) != 2: - print("Usage: python script.py <article_url>") - sys.exit(1) - - url = sys.argv[1] - bg_tasks = BackgroundTasks() - - try: - result = await process_and_save_article(bg_tasks, url) - print(result) - except Exception as e: - print(f"Error processing article: {str(e)}") - sys.exit(1) - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/sijapi/helpers/caplss/CaPLSS.py b/sijapi/helpers/caplss/CaPLSS.py new file mode 100644 index 0000000..20f5a09 --- /dev/null +++ 
b/sijapi/helpers/caplss/CaPLSS.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 + +import requests +import json +import time +import os +import subprocess +import sys +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +from datetime import datetime + +# Environment variables for database connection +DB_NAME = os.getenv('DB_NAME', 'sij') +DB_USER = os.getenv('DB_USER', 'sij') +DB_PASSWORD = os.getenv('DB_PASSWORD', 'Synchr0!') +DB_HOST = os.getenv('DB_HOST', 'localhost') +DB_PORT = os.getenv('DB_PORT', '5432') + +def get_feature_count(url): + params = { + 'where': '1=1', + 'returnCountOnly': 'true', + 'f': 'json' + } + retries = Retry(total=10, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504]) + with requests.Session() as session: + session.mount("https://", HTTPAdapter(max_retries=retries)) + response = session.get(url, params=params, timeout=30) + response.raise_for_status() + data = response.json() + return data.get('count', 0) + +def fetch_features(url, offset, num, max_retries=5): + params = { + 'where': '1=1', + 'outFields': '*', + 'geometryPrecision': 6, + 'outSR': 4326, + 'f': 'json', + 'resultOffset': offset, + 'resultRecordCount': num + } + for attempt in range(max_retries): + try: + retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504]) + with requests.Session() as session: + session.mount("https://", HTTPAdapter(max_retries=retries)) + response = session.get(url, params=params, timeout=30) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"Error fetching features (attempt {attempt + 1}/{max_retries}): {e}") + if attempt == max_retries - 1: + raise + time.sleep(5 * (attempt + 1)) # Exponential backoff + + +def download_layer(layer_num, layer_name): + base_dir = os.path.expanduser('~/data') + os.makedirs(base_dir, exist_ok=True) + + file_path = os.path.join(base_dir, f'PLSS_{layer_name}.geojson') + temp_file_path = os.path.join(base_dir, f'PLSS_{layer_name}_temp.json') + + url = f"https://gis.blm.gov/arcgis/rest/services/Cadastral/BLM_Natl_PLSS_CadNSDI/MapServer/{layer_num}/query" + + total_count = get_feature_count(url) + print(f"Total {layer_name} features: {total_count}") + + batch_size = 1000 + chunk_size = 10000 # Write to file every 10,000 features + offset = 0 + all_features = [] + + # Check if temporary file exists and load its content + if os.path.exists(temp_file_path): + try: + with open(temp_file_path, 'r') as f: + all_features = json.load(f) + offset = len(all_features) + print(f"Resuming download from offset {offset}") + except json.JSONDecodeError: + print("Error reading temporary file. 
Starting download from the beginning.") + offset = 0 + all_features = [] + + try: + while offset < total_count: + print(f"Fetching {layer_name} features {offset} to {offset + batch_size}...") + data = fetch_features(url, offset, batch_size) + + new_features = data.get('features', []) + if not new_features: + break + + all_features.extend(new_features) + offset += len(new_features) + + # Progress indicator + progress = offset / total_count + bar_length = 30 + filled_length = int(bar_length * progress) + bar = '=' * filled_length + '-' * (bar_length - filled_length) + print(f'\rProgress: [{bar}] {progress:.1%} ({offset}/{total_count} features)', end='', flush=True) + + # Save progress to temporary file every chunk_size features + if len(all_features) % chunk_size == 0: + with open(temp_file_path, 'w') as f: + json.dump(all_features, f) + + time.sleep(1) + + print(f"\nTotal {layer_name} features fetched: {len(all_features)}") + + # Write final GeoJSON file + with open(file_path, 'w') as f: + f.write('{"type": "FeatureCollection", "features": [\n') + for i, feature in enumerate(all_features): + geojson_feature = { + "type": "Feature", + "properties": feature['attributes'], + "geometry": feature['geometry'] + } + json.dump(geojson_feature, f) + if i < len(all_features) - 1: + f.write(',\n') + f.write('\n]}') + + print(f"GeoJSON file saved as '{file_path}'") + + # Remove temporary file + if os.path.exists(temp_file_path): + os.remove(temp_file_path) + + return file_path + except Exception as e: + print(f"\nError during download: {e}") + print(f"Partial data saved in {temp_file_path}") + return None + + +def check_postgres_connection(): + try: + subprocess.run(['psql', '-h', DB_HOST, '-p', DB_PORT, '-U', DB_USER, '-d', DB_NAME, '-c', 'SELECT 1;'], + check=True, capture_output=True, text=True) + return True + except subprocess.CalledProcessError: + return False + +def check_postgis_extension(): + try: + result = subprocess.run(['psql', '-h', DB_HOST, '-p', DB_PORT, '-U', DB_USER, '-d', DB_NAME, + '-c', "SELECT 1 FROM pg_extension WHERE extname = 'postgis';"], + check=True, capture_output=True, text=True) + return '1' in result.stdout + except subprocess.CalledProcessError: + return False + +def create_postgis_extension(): + try: + subprocess.run(['psql', '-h', DB_HOST, '-p', DB_PORT, '-U', DB_USER, '-d', DB_NAME, + '-c', "CREATE EXTENSION IF NOT EXISTS postgis;"], + check=True, capture_output=True, text=True) + print("PostGIS extension created successfully.") + except subprocess.CalledProcessError as e: + print(f"Error creating PostGIS extension: {e}") + sys.exit(1) + +def import_to_postgis(file_path, table_name): + if not check_postgres_connection(): + print("Error: Unable to connect to PostgreSQL. Please check your connection settings.") + sys.exit(1) + + if not check_postgis_extension(): + print("PostGIS extension not found. 
Attempting to create it...") + create_postgis_extension() + + ogr2ogr_command = [ + 'ogr2ogr', + '-f', 'PostgreSQL', + f'PG:dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD} host={DB_HOST} port={DB_PORT}', + file_path, + '-nln', table_name, + '-overwrite' + ] + + try: + subprocess.run(ogr2ogr_command, check=True, capture_output=True, text=True) + print(f"Data successfully imported into PostGIS table: {table_name}") + except subprocess.CalledProcessError as e: + print(f"Error importing data into PostGIS: {e}") + print(f"Command that failed: {e.cmd}") + print(f"Error output: {e.stderr}") + +def check_ogr2ogr(): + try: + subprocess.run(['ogr2ogr', '--version'], check=True, capture_output=True, text=True) + return True + except subprocess.CalledProcessError: + return False + except FileNotFoundError: + return False + + +def main(): + if not check_ogr2ogr(): + print("Error: ogr2ogr not found. Please install GDAL/OGR tools.") + print("On Debian: sudo apt-get install gdal-bin") + print("On macOS with Homebrew: brew install gdal") + sys.exit(1) + + try: + township_file = os.path.expanduser('~/data/PLSS_Townships.geojson') + if not os.path.exists(township_file): + township_file = download_layer(1, "Townships") + if township_file: + import_to_postgis(township_file, "public.plss_townships") + else: + print("Failed to download Townships data. Skipping import.") + + section_file = os.path.expanduser('~/data/PLSS_Sections.geojson') + if not os.path.exists(section_file): + section_file = download_layer(2, "Sections") + if section_file: + import_to_postgis(section_file, "public.plss_sections") + else: + print("Failed to download Sections data. Skipping import.") + + except requests.exceptions.RequestException as e: + print(f"Error fetching data: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + main() diff --git a/sijapi/helpers/caplss/CaPLSS_downloader_and_importer.py b/sijapi/helpers/caplss/CaPLSS_downloader_and_importer.py new file mode 100644 index 0000000..5b3ea00 --- /dev/null +++ b/sijapi/helpers/caplss/CaPLSS_downloader_and_importer.py @@ -0,0 +1,133 @@ +# CaPLSS_downloader_and_importer.py +import requests +import json +import time +import os +import subprocess +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +def get_feature_count(url): + params = { + 'where': '1=1', + 'returnCountOnly': 'true', + 'f': 'json' + } + retries = Retry(total=10, backoff_factor=0.5) + adapter = HTTPAdapter(max_retries=retries) + session = requests.Session() + session.mount("https://", adapter) + + response = session.get(url, params=params, timeout=15) # Add timeout parameter + response.raise_for_status() + data = response.json() + return data.get('count', 0) + + +def fetch_features(url, offset, num): + params = { + 'where': '1=1', + 'outFields': '*', + 'geometryPrecision': 6, + 'outSR': 4326, + 'f': 'json', + 'resultOffset': offset, + 'resultRecordCount': num + } + response = requests.get(url, params=params) + response.raise_for_status() + return response.json() + +def download_layer(layer_num, layer_name): + url = f"https://gis.blm.gov/arcgis/rest/services/Cadastral/BLM_Natl_PLSS_CadNSDI/MapServer/{layer_num}/query" + + total_count = get_feature_count(url) + print(f"Total {layer_name} features: {total_count}") + + batch_size = 1000 + offset = 0 + all_features = [] + + while offset < total_count: + print(f"Fetching {layer_name} features {offset} to {offset + batch_size}...") + data = fetch_features(url, 
offset, batch_size) + + new_features = data.get('features', []) + if not new_features: + break + + all_features.extend(new_features) + offset += len(new_features) + + print(f"Progress: {len(all_features)}/{total_count} features") + + time.sleep(1) # Be nice to the server + + print(f"Total {layer_name} features fetched: {len(all_features)}") + + # Convert to GeoJSON + geojson_features = [ + { + "type": "Feature", + "properties": feature['attributes'], + "geometry": feature['geometry'] + } for feature in all_features + ] + + full_geojson = { + "type": "FeatureCollection", + "features": geojson_features + } + + # Define a base directory that exists on both macOS and Debian + base_dir = os.path.expanduser('~/data') + os.makedirs(base_dir, exist_ok=True) # Create the directory if it doesn't exist + + # Use os.path.join to construct the file path + file_path = os.path.join(base_dir, f'PLSS_{layer_name}.geojson') + + # Save to file + with open(file_path, 'w') as f: + json.dump(full_geojson, f) + + print(f"GeoJSON file saved as '{file_path}'") + + return file_path + +def import_to_postgis(file_path, table_name): + db_name = 'sij' + db_user = 'sij' + db_password = 'Synchr0!' + + ogr2ogr_command = [ + 'ogr2ogr', + '-f', 'PostgreSQL', + f'PG:dbname={db_name} user={db_user} password={db_password}', + file_path, + '-nln', table_name, + '-overwrite' + ] + + subprocess.run(ogr2ogr_command, check=True) + print(f"Data successfully imported into PostGIS table: {table_name}") + +def main(): + try: + # Download and import Townships (Layer 1) + township_file = download_layer(1, "Townships") + import_to_postgis(township_file, "public.plss_townships") + + # Download and import Sections (Layer 2) + section_file = download_layer(2, "Sections") + import_to_postgis(section_file, "public.plss_sections") + + except requests.exceptions.RequestException as e: + print(f"Error fetching data: {e}") + except subprocess.CalledProcessError as e: + print(f"Error importing data into PostGIS: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + +if __name__ == "__main__": + main() diff --git a/sijapi/helpers/caplss/Townships_progress.json b/sijapi/helpers/caplss/Townships_progress.json new file mode 100644 index 0000000..992f62d --- /dev/null +++ b/sijapi/helpers/caplss/Townships_progress.json @@ -0,0 +1 @@ +{"offset": 50000} \ No newline at end of file diff --git a/sijapi/helpers/caplss/plss.py b/sijapi/helpers/caplss/plss.py new file mode 100644 index 0000000..c127d10 --- /dev/null +++ b/sijapi/helpers/caplss/plss.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 + +import requests +import json +import time +import os +import subprocess +import sys +import yaml +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +import argparse +import psycopg2 +from psycopg2.extras import execute_values + +def load_config(): + script_dir = os.path.dirname(os.path.abspath(__file__)) + sys_config_path = os.path.join(script_dir, '..', 'config', 'sys.yaml') + gis_config_path = os.path.join(script_dir, '..', 'config', 'gis.yaml') + + with open(sys_config_path, 'r') as f: + sys_config = yaml.safe_load(f) + + with open(gis_config_path, 'r') as f: + gis_config = yaml.safe_load(f) + + return sys_config, gis_config + +def get_db_config(sys_config): + pool = sys_config.get('POOL', []) + if pool: + db_config = pool[0] + return { + 'DB_NAME': db_config.get('db_name'), + 'DB_USER': db_config.get('db_user'), + 'DB_PASSWORD': db_config.get('db_pass'), + 'DB_HOST': db_config.get('ts_ip'), + 'DB_PORT': 
str(db_config.get('db_port')) + } + return {} + +def get_feature_count(url): + params = { + 'where': '1=1', + 'returnCountOnly': 'true', + 'f': 'json' + } + retries = Retry(total=10, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504]) + with requests.Session() as session: + session.mount("https://", HTTPAdapter(max_retries=retries)) + response = session.get(url, params=params, timeout=30) + response.raise_for_status() + data = response.json() + return data.get('count', 0) + +def fetch_features(url, offset, num, max_retries=5): + params = { + 'where': '1=1', + 'outFields': '*', + 'geometryPrecision': 6, + 'outSR': 4326, + 'f': 'json', + 'resultOffset': offset, + 'resultRecordCount': num, + 'orderByFields': 'OBJECTID' + } + for attempt in range(max_retries): + try: + retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504]) + with requests.Session() as session: + session.mount("https://", HTTPAdapter(max_retries=retries)) + response = session.get(url, params=params, timeout=30) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"Error fetching features (attempt {attempt + 1}/{max_retries}): {e}") + if attempt == max_retries - 1: + raise + time.sleep(5 * (attempt + 1)) # Exponential backoff + + +def create_table(db_config, table_name, gis_config): + conn = psycopg2.connect( + dbname=db_config['DB_NAME'], + user=db_config['DB_USER'], + password=db_config['DB_PASSWORD'], + host=db_config['DB_HOST'], + port=db_config['DB_PORT'] + ) + try: + with conn.cursor() as cur: + # Check if the table already exists + cur.execute(f"SELECT to_regclass('{table_name}')") + if cur.fetchone()[0] is None: + # If the table doesn't exist, create it based on the first feature + url = next(layer['url'] for layer in gis_config['layers'] if layer['table_name'] == table_name) + first_feature = fetch_features(url, 0, 1)['features'][0] + columns = [] + for attr, value in first_feature['attributes'].items(): + column_name = attr.lower().replace('.', '_').replace('()', '') + if isinstance(value, int): + columns.append(f'"{column_name}" INTEGER') + elif isinstance(value, float): + columns.append(f'"{column_name}" DOUBLE PRECISION') + else: + columns.append(f'"{column_name}" TEXT') + + create_sql = f""" + CREATE TABLE {table_name} ( + id SERIAL PRIMARY KEY, + geom GEOMETRY(Polygon, 4326), + {', '.join(columns)} + ) + """ + cur.execute(create_sql) + + # Create index on plssid + cur.execute(f'CREATE INDEX idx_{table_name.split(".")[-1]}_plssid ON {table_name}("plssid")') + + print(f"Created table: {table_name}") + else: + print(f"Table {table_name} already exists") + conn.commit() + except psycopg2.Error as e: + print(f"Error creating table {table_name}: {e}") + finally: + conn.close() + + +def insert_features_to_db(features, table_name, db_config): + conn = psycopg2.connect( + dbname=db_config['DB_NAME'], + user=db_config['DB_USER'], + password=db_config['DB_PASSWORD'], + host=db_config['DB_HOST'], + port=db_config['DB_PORT'] + ) + try: + with conn.cursor() as cur: + # Get the column names from the table + cur.execute(f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name.split('.')[-1]}'") + db_columns = [row[0] for row in cur.fetchall() if row[0] != 'id'] + + # Prepare the SQL statement + sql = f""" + INSERT INTO {table_name} ({', '.join([f'"{col}"' for col in db_columns])}) + VALUES %s + """ + + # Prepare the template for execute_values + template = f"({', '.join(['%s' for _ in db_columns])})" + + 
values = [] + for feature in features: + geom = feature.get('geometry') + attrs = feature.get('attributes') + if geom and attrs: + rings = geom['rings'][0] + wkt = f"POLYGON(({','.join([f'{x} {y}' for x, y in rings])}))" + + row = [] + for col in db_columns: + if col == 'geom': + row.append(wkt) + else: + # Map database column names back to original attribute names + attr_name = col.upper() + if attr_name == 'SHAPE_STAREA': + attr_name = 'Shape.STArea()' + elif attr_name == 'SHAPE_STLENGTH': + attr_name = 'Shape.STLength()' + row.append(attrs.get(attr_name)) + + values.append(tuple(row)) + else: + print(f"Skipping invalid feature: {feature}") + + if values: + execute_values(cur, sql, values, template=template, page_size=100) + print(f"Inserted {len(values)} features") + else: + print("No valid features to insert") + conn.commit() + except Exception as e: + print(f"Error inserting features: {e}") + print(f"First feature for debugging: {features[0] if features else 'No features'}") + conn.rollback() + finally: + conn.close() + + + +def download_and_import_layer(layer_config, db_config, gis_config, force_refresh): + url = layer_config['url'] + layer_name = layer_config['layer_name'] + table_name = layer_config['table_name'] + batch_size = layer_config['batch_size'] + delay = layer_config['delay'] / 1000 # Convert to seconds + + total_count = get_feature_count(url) + print(f"Total {layer_name} features: {total_count}") + + # Check existing records in the database + existing_count = get_existing_record_count(db_config, table_name) + + if existing_count == total_count and not force_refresh: + print(f"Table {table_name} already contains all {total_count} features. Skipping.") + return + + if force_refresh: + delete_existing_table(db_config, table_name) + create_table(db_config, table_name, gis_config) + existing_count = 0 + elif existing_count == 0: + create_table(db_config, table_name, gis_config) + + offset = existing_count + + start_time = time.time() + try: + while offset < total_count: + batch_start_time = time.time() + print(f"Fetching {layer_name} features {offset} to {offset + batch_size}...") + try: + data = fetch_features(url, offset, batch_size) + new_features = data.get('features', []) + if not new_features: + break + + insert_features_to_db(new_features, table_name, db_config) + offset += len(new_features) + + batch_end_time = time.time() + batch_duration = batch_end_time - batch_start_time + print(f"Batch processed in {batch_duration:.2f} seconds") + + # Progress indicator + progress = offset / total_count + bar_length = 30 + filled_length = int(bar_length * progress) + bar = '=' * filled_length + '-' * (bar_length - filled_length) + print(f'\rProgress: [{bar}] {progress:.1%} ({offset}/{total_count} features)', end='', flush=True) + + time.sleep(delay) + except Exception as e: + print(f"\nError processing batch starting at offset {offset}: {e}") + print("Continuing with next batch...") + offset += batch_size + + end_time = time.time() + total_duration = end_time - start_time + print(f"\nTotal {layer_name} features fetched and imported: {offset}") + print(f"Total time: {total_duration:.2f} seconds") + + except Exception as e: + print(f"\nError during download and import: {e}") + print(f"Last successful offset: {offset}") + +def get_existing_record_count(db_config, table_name): + conn = psycopg2.connect( + dbname=db_config['DB_NAME'], + user=db_config['DB_USER'], + password=db_config['DB_PASSWORD'], + host=db_config['DB_HOST'], + port=db_config['DB_PORT'] + ) + try: + with 
conn.cursor() as cur: + cur.execute(f"SELECT COUNT(*) FROM {table_name}") + count = cur.fetchone()[0] + return count + except psycopg2.Error: + return 0 + finally: + conn.close() + +def delete_existing_table(db_config, table_name): + conn = psycopg2.connect( + dbname=db_config['DB_NAME'], + user=db_config['DB_USER'], + password=db_config['DB_PASSWORD'], + host=db_config['DB_HOST'], + port=db_config['DB_PORT'] + ) + try: + with conn.cursor() as cur: + # Drop the index if it exists + cur.execute(f"DROP INDEX IF EXISTS idx_{table_name.split('.')[-1]}_plssid") + + # Then drop the table + cur.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE") + conn.commit() + print(f"Deleted existing table and index: {table_name}") + except psycopg2.Error as e: + print(f"Error deleting table {table_name}: {e}") + finally: + conn.close() + + +def check_postgres_connection(db_config): + try: + subprocess.run(['psql', + '-h', db_config['DB_HOST'], + '-p', db_config['DB_PORT'], + '-U', db_config['DB_USER'], + '-d', db_config['DB_NAME'], + '-c', 'SELECT 1;'], + check=True, capture_output=True, text=True) + return True + except subprocess.CalledProcessError: + return False + +def check_postgis_extension(db_config): + try: + result = subprocess.run(['psql', + '-h', db_config['DB_HOST'], + '-p', db_config['DB_PORT'], + '-U', db_config['DB_USER'], + '-d', db_config['DB_NAME'], + '-c', "SELECT 1 FROM pg_extension WHERE extname = 'postgis';"], + check=True, capture_output=True, text=True) + return '1' in result.stdout + except subprocess.CalledProcessError: + return False + +def create_postgis_extension(db_config): + try: + subprocess.run(['psql', + '-h', db_config['DB_HOST'], + '-p', db_config['DB_PORT'], + '-U', db_config['DB_USER'], + '-d', db_config['DB_NAME'], + '-c', "CREATE EXTENSION IF NOT EXISTS postgis;"], + check=True, capture_output=True, text=True) + print("PostGIS extension created successfully.") + except subprocess.CalledProcessError as e: + print(f"Error creating PostGIS extension: {e}") + sys.exit(1) + +def main(): + parser = argparse.ArgumentParser(description="Download and import PLSS data") + parser.add_argument("--force-refresh", nargs='*', help="Force refresh of specified layers or all if none specified") + args = parser.parse_args() + + sys_config, gis_config = load_config() + db_config = get_db_config(sys_config) + + if not check_postgres_connection(db_config): + print("Error: Unable to connect to PostgreSQL. Please check your connection settings.") + sys.exit(1) + + if not check_postgis_extension(db_config): + print("PostGIS extension not found. 
Attempting to create it...") + create_postgis_extension(db_config) + + try: + for layer in gis_config['layers']: + if args.force_refresh is None or not args.force_refresh or layer['layer_name'] in args.force_refresh: + download_and_import_layer(layer, db_config, gis_config, bool(args.force_refresh)) + except requests.exceptions.RequestException as e: + print(f"Error fetching data: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + +if __name__ == "__main__": + main() diff --git a/sijapi/helpers/db/db.py b/sijapi/helpers/db/db.py new file mode 100644 index 0000000..4cf142b --- /dev/null +++ b/sijapi/helpers/db/db.py @@ -0,0 +1,71 @@ +import asyncio +import asyncpg +import yaml +from pathlib import Path + +async def load_config(): + config_path = Path(__file__).parent.parent / 'config' / 'sys.yaml' + with open(config_path, 'r') as file: + return yaml.safe_load(file) + +async def add_foreign_key_constraint(conn): + # Ensure short_code is not null in both tables + await conn.execute(""" + ALTER TABLE short_urls + ALTER COLUMN short_code SET NOT NULL; + """) + + await conn.execute(""" + ALTER TABLE click_logs + ALTER COLUMN short_code SET NOT NULL; + """) + + # Add unique constraint to short_urls.short_code if it doesn't exist + await conn.execute(""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'short_urls_short_code_key' + ) THEN + ALTER TABLE short_urls + ADD CONSTRAINT short_urls_short_code_key UNIQUE (short_code); + END IF; + END $$; + """) + + # Add foreign key constraint + await conn.execute(""" + ALTER TABLE click_logs + ADD CONSTRAINT fk_click_logs_short_urls + FOREIGN KEY (short_code) + REFERENCES short_urls(short_code) + ON DELETE CASCADE; + """) + + print("Foreign key constraint added successfully.") + +async def main(): + config = await load_config() + source_server = config['POOL'][0] # sij-mbp16 + + conn_params = { + 'database': source_server['db_name'], + 'user': source_server['db_user'], + 'password': source_server['db_pass'], + 'host': source_server['ts_ip'], + 'port': source_server['db_port'] + } + + conn = await asyncpg.connect(**conn_params) + + try: + await add_foreign_key_constraint(conn) + except Exception as e: + print(f"An error occurred: {str(e)}") + finally: + await conn.close() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sijapi/helpers/db/db_get_schema.py b/sijapi/helpers/db/db_get_schema.py new file mode 100644 index 0000000..c672c6e --- /dev/null +++ b/sijapi/helpers/db/db_get_schema.py @@ -0,0 +1,89 @@ +import psycopg2 +from psycopg2 import sql + +def connect_to_db(): + return psycopg2.connect( + dbname='sij', + user='sij', + password='Synchr0!', + host='localhost' # Adjust if your database is not on localhost + ) + +def get_table_info(conn): + with conn.cursor() as cur: + # Get all tables in the public schema + cur.execute(""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'public' + """) + tables = cur.fetchall() + + table_info = {} + for (table_name,) in tables: + table_info[table_name] = { + 'primary_keys': get_primary_keys(cur, table_name), + 'foreign_keys': get_foreign_keys(cur, table_name) + } + + return table_info + +def get_primary_keys(cur, table_name): + cur.execute(""" + SELECT a.attname + FROM pg_index i + JOIN pg_attribute a ON a.attrelid = i.indrelid + AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = %s::regclass + AND i.indisprimary + """, (table_name,)) + return [row[0] for row in cur.fetchall()] + +def get_foreign_keys(cur, 
table_name): + cur.execute(""" + SELECT + tc.constraint_name, + kcu.column_name, + ccu.table_name AS foreign_table_name, + ccu.column_name AS foreign_column_name + FROM + information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + JOIN information_schema.constraint_column_usage AS ccu + ON ccu.constraint_name = tc.constraint_name + AND ccu.table_schema = tc.table_schema + WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name=%s + """, (table_name,)) + return cur.fetchall() + +def main(): + try: + with connect_to_db() as conn: + table_info = get_table_info(conn) + + for table_name, info in table_info.items(): + print(f"\n## Table: {table_name}") + + print("\nPrimary Keys:") + if info['primary_keys']: + for pk in info['primary_keys']: + print(f"- {pk}") + else: + print("- No primary keys found") + + print("\nForeign Keys:") + if info['foreign_keys']: + for fk in info['foreign_keys']: + print(f"- {fk[1]} -> {fk[2]}.{fk[3]} (Constraint: {fk[0]})") + else: + print("- No foreign keys found") + + except psycopg2.Error as e: + print(f"Database error: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + +if __name__ == "__main__": + main() diff --git a/sijapi/helpers/db/db_repl.py b/sijapi/helpers/db/db_repl.py new file mode 100644 index 0000000..a0bfe6d --- /dev/null +++ b/sijapi/helpers/db/db_repl.py @@ -0,0 +1,83 @@ +import yaml +import subprocess +import os +import sys + +def load_config(): + with open('../config/sys.yaml', 'r') as file: + return yaml.safe_load(file) + +def run_command(command): + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + stdout, stderr = process.communicate() + return process.returncode, stdout.decode(), stderr.decode() + +def pg_dump(host, port, db_name, user, password, tables): + dump_command = f"PGPASSWORD={password} pg_dump -h {host} -p {port} -U {user} -d {db_name} -t {' -t '.join(tables)} -c --no-owner" + return run_command(dump_command) + +def pg_restore(host, port, db_name, user, password, dump_data): + restore_command = f"PGPASSWORD={password} psql -h {host} -p {port} -U {user} -d {db_name}" + process = subprocess.Popen(restore_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + stdout, stderr = process.communicate(input=dump_data.encode()) + return process.returncode, stdout.decode(), stderr.decode() + +def check_postgres_version(host, port, user, password): + version_command = f"PGPASSWORD={password} psql -h {host} -p {port} -U {user} -c 'SELECT version();'" + returncode, stdout, stderr = run_command(version_command) + if returncode == 0: + return stdout.strip() + else: + return f"Error checking version: {stderr}" + +def replicate_databases(): + config = load_config() + pool = config['POOL'] + tables_to_replicate = ['click_logs', 'dailyweather', 'hourlyweather', 'locations', 'short_urls'] + + source_db = pool[0] + target_dbs = pool[1:] + + # Check source database version + source_version = check_postgres_version(source_db['ts_ip'], source_db['db_port'], source_db['db_user'], source_db['db_pass']) + print(f"Source database version: {source_version}") + + for target_db in target_dbs: + print(f"\nReplicating to {target_db['ts_id']}...") + + # Check target database version + target_version = check_postgres_version(target_db['ts_ip'], target_db['db_port'], target_db['db_user'], target_db['db_pass']) + print(f"Target database 
version: {target_version}") + + # Perform dump + returncode, dump_data, stderr = pg_dump( + source_db['ts_ip'], + source_db['db_port'], + source_db['db_name'], + source_db['db_user'], + source_db['db_pass'], + tables_to_replicate + ) + + if returncode != 0: + print(f"Error during dump: {stderr}") + continue + + # Perform restore + returncode, stdout, stderr = pg_restore( + target_db['ts_ip'], + target_db['db_port'], + target_db['db_name'], + target_db['db_user'], + target_db['db_pass'], + dump_data + ) + + if returncode != 0: + print(f"Error during restore: {stderr}") + else: + print(f"Replication to {target_db['ts_id']} completed successfully.") + +if __name__ == "__main__": + replicate_databases() + diff --git a/sijapi/helpers/db/db_replicator.py b/sijapi/helpers/db/db_replicator.py new file mode 100755 index 0000000..c4fb6b4 --- /dev/null +++ b/sijapi/helpers/db/db_replicator.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 + +import os +import yaml +import subprocess + +def load_config(): + script_dir = os.path.dirname(os.path.abspath(__file__)) + sys_config_path = os.path.join(script_dir, '..', 'config', 'sys.yaml') + gis_config_path = os.path.join(script_dir, '..', 'config', 'gis.yaml') + + with open(sys_config_path, 'r') as f: + sys_config = yaml.safe_load(f) + + with open(gis_config_path, 'r') as f: + gis_config = yaml.safe_load(f) + + return sys_config, gis_config + +def replicate_table(source, targets, table_name): + print(f"Replicating {table_name}") + + # Dump the table from the source + dump_command = [ + 'pg_dump', + '-h', source['ts_ip'], + '-p', str(source['db_port']), + '-U', source['db_user'], + '-d', source['db_name'], + '-t', table_name, + '--no-owner', + '--no-acl' + ] + + env = os.environ.copy() + env['PGPASSWORD'] = source['db_pass'] + + with open(f"{table_name}.sql", 'w') as f: + subprocess.run(dump_command, env=env, stdout=f, check=True) + + # Restore the table to each target + for target in targets: + print(f"Replicating to {target['ts_id']}") + restore_command = [ + 'psql', + '-h', target['ts_ip'], + '-p', str(target['db_port']), + '-U', target['db_user'], + '-d', target['db_name'], + '-c', f"DROP TABLE IF EXISTS {table_name} CASCADE;", + '-f', f"{table_name}.sql" + ] + + env = os.environ.copy() + env['PGPASSWORD'] = target['db_pass'] + + subprocess.run(restore_command, env=env, check=True) + + # Clean up the dump file + os.remove(f"{table_name}.sql") + +def main(): + sys_config, gis_config = load_config() + + source_server = sys_config['POOL'][0] + target_servers = sys_config['POOL'][1:] + + tables = [layer['table_name'] for layer in gis_config['layers']] + + for table in tables: + replicate_table(source_server, target_servers, table) + + print("Replication complete!") + +if __name__ == "__main__": + main() diff --git a/sijapi/helpers/db/db_uuid_migrate.py b/sijapi/helpers/db/db_uuid_migrate.py new file mode 100644 index 0000000..b7bb5d1 --- /dev/null +++ b/sijapi/helpers/db/db_uuid_migrate.py @@ -0,0 +1,46 @@ +import asyncio +import asyncpg + +# Database connection information +DB_INFO = { + 'host': '100.64.64.20', + 'port': 5432, + 'database': 'sij', + 'user': 'sij', + 'password': 'Synchr0!' 
+} + +async def update_click_logs(): + # Connect to the database + conn = await asyncpg.connect(**DB_INFO) + + try: + # Drop existing 'id' and 'new_id' columns if they exist + await conn.execute(""" + ALTER TABLE click_logs + DROP COLUMN IF EXISTS id, + DROP COLUMN IF EXISTS new_id; + """) + print("Dropped existing id and new_id columns (if they existed)") + + # Add new UUID column as primary key + await conn.execute(""" + ALTER TABLE click_logs + ADD COLUMN id UUID PRIMARY KEY DEFAULT gen_random_uuid(); + """) + print("Added new UUID column as primary key") + + # Get the number of rows in the table + row_count = await conn.fetchval("SELECT COUNT(*) FROM click_logs") + print(f"Number of rows in click_logs: {row_count}") + + except Exception as e: + print(f"An error occurred: {str(e)}") + import traceback + traceback.print_exc() + finally: + # Close the database connection + await conn.close() + +# Run the update +asyncio.run(update_click_logs()) diff --git a/sijapi/helpers/db/migrate_db_to_uuid.py b/sijapi/helpers/db/migrate_db_to_uuid.py new file mode 100644 index 0000000..60034fb --- /dev/null +++ b/sijapi/helpers/db/migrate_db_to_uuid.py @@ -0,0 +1,191 @@ +import psycopg2 +from psycopg2 import sql +import sys + +def connect_to_db(): + return psycopg2.connect( + dbname='sij', + user='sij', + password='Synchr0!', + host='localhost' + ) + +def get_tables(cur): + cur.execute(""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'public' AND table_type = 'BASE TABLE' + AND table_name NOT LIKE '%_uuid' AND table_name NOT LIKE '%_orig' + AND table_name != 'spatial_ref_sys' + """) + return [row[0] for row in cur.fetchall()] + +def get_columns(cur, table_name): + cur.execute(""" + SELECT column_name, udt_name, + is_nullable, column_default, + character_maximum_length, numeric_precision, numeric_scale + FROM information_schema.columns + WHERE table_name = %s + ORDER BY ordinal_position + """, (table_name,)) + return cur.fetchall() + +def get_constraints(cur, table_name): + cur.execute(""" + SELECT conname, contype, pg_get_constraintdef(c.oid) + FROM pg_constraint c + JOIN pg_namespace n ON n.oid = c.connamespace + WHERE conrelid = %s::regclass + AND n.nspname = 'public' + """, (table_name,)) + return cur.fetchall() + +def drop_table_if_exists(cur, table_name): + cur.execute(sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(sql.Identifier(table_name))) + +def create_uuid_table(cur, old_table, new_table): + drop_table_if_exists(cur, new_table) + columns = get_columns(cur, old_table) + constraints = get_constraints(cur, old_table) + + column_defs = [] + has_id_column = any(col[0] == 'id' for col in columns) + + for col in columns: + col_name, udt_name, is_nullable, default, max_length, precision, scale = col + if col_name == 'id' and has_id_column: + column_defs.append(sql.SQL("{} UUID PRIMARY KEY DEFAULT gen_random_uuid()").format(sql.Identifier(col_name))) + else: + type_sql = sql.SQL("{}").format(sql.Identifier(udt_name)) + if max_length: + type_sql = sql.SQL("{}({})").format(type_sql, sql.Literal(max_length)) + elif precision and scale: + type_sql = sql.SQL("{}({},{})").format(type_sql, sql.Literal(precision), sql.Literal(scale)) + + column_def = sql.SQL("{} {}").format(sql.Identifier(col_name), type_sql) + if is_nullable == 'NO': + column_def = sql.SQL("{} NOT NULL").format(column_def) + if default and 'nextval' not in default: # Skip auto-increment defaults + column_def = sql.SQL("{} DEFAULT {}").format(column_def, sql.SQL(default)) + column_defs.append(column_def) + 
+ constraint_defs = [] + for constraint in constraints: + conname, contype, condef = constraint + if contype != 'p' or not has_id_column: # Keep primary key if there's no id column + constraint_defs.append(sql.SQL(condef)) + + if not has_id_column: + column_defs.append(sql.SQL("uuid UUID DEFAULT gen_random_uuid()")) + + query = sql.SQL("CREATE TABLE {} ({})").format( + sql.Identifier(new_table), + sql.SQL(", ").join(column_defs + constraint_defs) + ) + cur.execute(query) + +def migrate_data(cur, old_table, new_table): + columns = get_columns(cur, old_table) + column_names = [col[0] for col in columns] + has_id_column = 'id' in column_names + + if has_id_column: + column_names.remove('id') + old_cols = sql.SQL(", ").join(map(sql.Identifier, column_names)) + new_cols = sql.SQL(", ").join(map(sql.Identifier, ['id'] + column_names)) + query = sql.SQL("INSERT INTO {} ({}) SELECT gen_random_uuid(), {} FROM {}").format( + sql.Identifier(new_table), + new_cols, + old_cols, + sql.Identifier(old_table) + ) + else: + old_cols = sql.SQL(", ").join(map(sql.Identifier, column_names)) + new_cols = sql.SQL(", ").join(map(sql.Identifier, column_names + ['uuid'])) + query = sql.SQL("INSERT INTO {} ({}) SELECT {}, gen_random_uuid() FROM {}").format( + sql.Identifier(new_table), + new_cols, + old_cols, + sql.Identifier(old_table) + ) + cur.execute(query) + +def update_foreign_keys(cur, tables): + for table in tables: + constraints = get_constraints(cur, table) + for constraint in constraints: + conname, contype, condef = constraint + if contype == 'f': # Foreign key constraint + referenced_table = condef.split('REFERENCES ')[1].split('(')[0].strip() + referenced_column = condef.split('(')[2].split(')')[0].strip() + local_column = condef.split('(')[1].split(')')[0].strip() + + cur.execute(sql.SQL(""" + UPDATE {table_uuid} + SET {local_column} = subquery.new_id::text::{local_column_type} + FROM ( + SELECT old.{ref_column} AS old_id, new_table.id AS new_id + FROM {ref_table} old + JOIN public.{ref_table_uuid} new_table ON new_table.{ref_column}::text = old.{ref_column}::text + ) AS subquery + WHERE {local_column}::text = subquery.old_id::text + """).format( + table_uuid=sql.Identifier(f"{table}_uuid"), + local_column=sql.Identifier(local_column), + local_column_type=sql.SQL(get_column_type(cur, f"{table}_uuid", local_column)), + ref_column=sql.Identifier(referenced_column), + ref_table=sql.Identifier(referenced_table), + ref_table_uuid=sql.Identifier(f"{referenced_table}_uuid") + )) + +def get_column_type(cur, table_name, column_name): + cur.execute(""" + SELECT data_type + FROM information_schema.columns + WHERE table_name = %s AND column_name = %s + """, (table_name, column_name)) + return cur.fetchone()[0] + +def rename_tables(cur, tables): + for table in tables: + drop_table_if_exists(cur, f"{table}_orig") + cur.execute(sql.SQL("ALTER TABLE IF EXISTS {} RENAME TO {}").format( + sql.Identifier(table), sql.Identifier(f"{table}_orig") + )) + cur.execute(sql.SQL("ALTER TABLE IF EXISTS {} RENAME TO {}").format( + sql.Identifier(f"{table}_uuid"), sql.Identifier(table) + )) + +def main(): + try: + with connect_to_db() as conn: + with conn.cursor() as cur: + tables = get_tables(cur) + + # Create new UUID tables + for table in tables: + print(f"Creating UUID table for {table}...") + create_uuid_table(cur, table, f"{table}_uuid") + + # Migrate data + for table in tables: + print(f"Migrating data for {table}...") + migrate_data(cur, table, f"{table}_uuid") + + # Update foreign keys + print("Updating foreign key 
references...") + update_foreign_keys(cur, tables) + + # Rename tables + print("Renaming tables...") + rename_tables(cur, tables) + + conn.commit() + print("Migration completed successfully.") + except Exception as e: + print(f"An error occurred: {e}") + conn.rollback() + +if __name__ == "__main__": + main() diff --git a/sijapi/helpers/db/migrate_query_tracking_to_uuid.sh b/sijapi/helpers/db/migrate_query_tracking_to_uuid.sh new file mode 100755 index 0000000..f3e7556 --- /dev/null +++ b/sijapi/helpers/db/migrate_query_tracking_to_uuid.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +# PostgreSQL connection details +DB_NAME="sij" +DB_USER="sij" +DB_PASSWORD="Synchr0!" +DB_HOST="localhost" +DB_PORT="5432" + +# Function to execute SQL commands +execute_sql() { + PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME -c "$1" +} + +# Main script +echo "Starting migration of query_tracking table..." + +# Enable uuid-ossp extension if not already enabled +execute_sql "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";" + +# Add a new UUID column +execute_sql "ALTER TABLE query_tracking ADD COLUMN new_id UUID DEFAULT uuid_generate_v4();" + +# Generate new UUIDs for all existing rows +execute_sql "UPDATE query_tracking SET new_id = uuid_generate_v4() WHERE new_id IS NULL;" + +# Drop the old id column and rename the new one +execute_sql "ALTER TABLE query_tracking DROP COLUMN id;" +execute_sql "ALTER TABLE query_tracking RENAME COLUMN new_id TO id;" + +# Set the new id column as primary key +execute_sql "ALTER TABLE query_tracking ADD PRIMARY KEY (id);" + +echo "Migration completed successfully!" + diff --git a/sijapi/helpers/db/repair_weather_db.py b/sijapi/helpers/db/repair_weather_db.py new file mode 100644 index 0000000..f3e27f4 --- /dev/null +++ b/sijapi/helpers/db/repair_weather_db.py @@ -0,0 +1,101 @@ +import asyncio +import asyncpg +import yaml +from pathlib import Path +import subprocess + +async def load_config(): + config_path = Path(__file__).parent.parent / 'config' / 'db.yaml' + with open(config_path, 'r') as file: + return yaml.safe_load(file) + +async def get_table_size(conn, table_name): + return await conn.fetchval(f"SELECT COUNT(*) FROM {table_name}") + +async def check_postgres_version(conn): + return await conn.fetchval("SELECT version()") + +async def replicate_table(source, target, table_name): + print(f"Replicating {table_name} from {source['ts_id']} to {target['ts_id']}") + + source_conn = await asyncpg.connect(**{k: source[k] for k in ['db_name', 'db_user', 'db_pass', 'ts_ip', 'db_port']}) + target_conn = await asyncpg.connect(**{k: target[k] for k in ['db_name', 'db_user', 'db_pass', 'ts_ip', 'db_port']}) + + try: + source_version = await check_postgres_version(source_conn) + target_version = await check_postgres_version(target_conn) + print(f"Source database version: {source_version}") + print(f"Target database version: {target_version}") + + table_size = await get_table_size(source_conn, table_name) + print(f"Table size: {table_size} rows") + + # Dump the table + dump_command = [ + 'pg_dump', + '-h', source['ts_ip'], + '-p', str(source['db_port']), + '-U', source['db_user'], + '-d', source['db_name'], + '-t', table_name, + '--no-owner', + '--no-acl' + ] + env = {'PGPASSWORD': source['db_pass']} + dump_result = subprocess.run(dump_command, env=env, capture_output=True, text=True) + + if dump_result.returncode != 0: + raise Exception(f"Dump failed: {dump_result.stderr}") + + print("Dump completed successfully") + + # Drop and recreate the table on the target + await 
target_conn.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE") + print(f"Dropped table {table_name} on target") + + # Restore the table + restore_command = [ + 'psql', + '-h', target['ts_ip'], + '-p', str(target['db_port']), + '-U', target['db_user'], + '-d', target['db_name'], + ] + env = {'PGPASSWORD': target['db_pass']} + restore_result = subprocess.run(restore_command, input=dump_result.stdout, env=env, capture_output=True, text=True) + + if restore_result.returncode != 0: + raise Exception(f"Restore failed: {restore_result.stderr}") + + print(f"Table {table_name} restored successfully") + + # Verify the number of rows in the target table + target_size = await get_table_size(target_conn, table_name) + if target_size == table_size: + print(f"Replication successful. {target_size} rows copied.") + else: + print(f"Warning: Source had {table_size} rows, but target has {target_size} rows.") + + except Exception as e: + print(f"An error occurred while replicating {table_name}: {str(e)}") + finally: + await source_conn.close() + await target_conn.close() + +async def main(): + config = await load_config() + source_server = config['POOL'][0] # sij-mbp16 + target_servers = config['POOL'][1:] # sij-vm and sij-vps + + tables_to_replicate = [ + 'click_logs', 'dailyweather', 'hourlyweather', 'locations', 'short_urls' + ] + + for table_name in tables_to_replicate: + for target_server in target_servers: + await replicate_table(source_server, target_server, table_name) + + print("All replications completed!") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sijapi/helpers/db/repl.py b/sijapi/helpers/db/repl.py new file mode 100644 index 0000000..7240391 --- /dev/null +++ b/sijapi/helpers/db/repl.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 + +import os +import yaml +import subprocess +import time +from tqdm import tqdm + +def load_config(): + script_dir = os.path.dirname(os.path.abspath(__file__)) + sys_config_path = os.path.join(script_dir, '..', 'config', 'sys.yaml') + gis_config_path = os.path.join(script_dir, '..', 'config', 'gis.yaml') + + with open(sys_config_path, 'r') as f: + sys_config = yaml.safe_load(f) + + with open(gis_config_path, 'r') as f: + gis_config = yaml.safe_load(f) + + return sys_config, gis_config + +def get_table_size(server, table_name): + env = os.environ.copy() + env['PGPASSWORD'] = server['db_pass'] + + command = [ + 'psql', + '-h', server['ts_ip'], + '-p', str(server['db_port']), + '-U', server['db_user'], + '-d', server['db_name'], + '-t', + '-c', f"SELECT COUNT(*) FROM {table_name}" + ] + + result = subprocess.run(command, env=env, capture_output=True, text=True, check=True) + return int(result.stdout.strip()) + +def replicate_table(source, targets, table_name): + print(f"Replicating {table_name}") + + # Get table size for progress bar + table_size = get_table_size(source, table_name) + print(f"Table size: {table_size} rows") + + # Dump the table from the source + dump_command = [ + 'pg_dump', + '-h', source['ts_ip'], + '-p', str(source['db_port']), + '-U', source['db_user'], + '-d', source['db_name'], + '-t', table_name, + '--no-owner', + '--no-acl' + ] + + env = os.environ.copy() + env['PGPASSWORD'] = source['db_pass'] + + print("Dumping table...") + with open(f"{table_name}.sql", 'w') as f: + subprocess.run(dump_command, env=env, stdout=f, check=True) + print("Dump complete") + + # Restore the table to each target + for target in targets: + print(f"Replicating to {target['ts_id']}") + + # Drop table and its sequence + drop_commands = [ + f"DROP 
TABLE IF EXISTS {table_name} CASCADE;", + f"DROP SEQUENCE IF EXISTS {table_name}_id_seq CASCADE;" + ] + + restore_command = [ + 'psql', + '-h', target['ts_ip'], + '-p', str(target['db_port']), + '-U', target['db_user'], + '-d', target['db_name'], + ] + + env = os.environ.copy() + env['PGPASSWORD'] = target['db_pass'] + + # Execute drop commands + for cmd in drop_commands: + print(f"Executing: {cmd}") + subprocess.run(restore_command + ['-c', cmd], env=env, check=True) + + # Restore the table + print("Restoring table...") + process = subprocess.Popen(restore_command + ['-f', f"{table_name}.sql"], env=env, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + + pbar = tqdm(total=table_size, desc="Copying rows") + copied_rows = 0 + for line in process.stderr: + if line.startswith("COPY"): + copied_rows = int(line.split()[1]) + pbar.update(copied_rows - pbar.n) + print(line, end='') # Print all output for visibility + + pbar.close() + process.wait() + + if process.returncode != 0: + print(f"Error occurred during restoration to {target['ts_id']}") + print(process.stderr.read()) + else: + print(f"Restoration to {target['ts_id']} completed successfully") + + # Clean up the dump file + os.remove(f"{table_name}.sql") + print(f"Replication of {table_name} completed") + +def main(): + sys_config, gis_config = load_config() + + source_server = sys_config['POOL'][0] + target_servers = sys_config['POOL'][1:] + + tables = [layer['table_name'] for layer in gis_config['layers']] + + for table in tables: + replicate_table(source_server, target_servers, table) + + print("All replications completed!") + +if __name__ == "__main__": + main() diff --git a/sijapi/helpers/db/repl.sh b/sijapi/helpers/db/repl.sh new file mode 100755 index 0000000..72cfff9 --- /dev/null +++ b/sijapi/helpers/db/repl.sh @@ -0,0 +1,118 @@ +#!/bin/bash + +# Configuration +SOURCE_HOST="100.64.64.20" +SOURCE_PORT="5432" +SOURCE_DB="sij" +SOURCE_USER="sij" +SOURCE_PASS="Synchr0!" + +# Target servers +declare -a TARGETS=( + "sij-vm:100.64.64.11:5432:sij:sij:Synchr0!" + "sij-vps:100.64.64.15:5432:sij:sij:Synchr0!" 
+) + +# Tables to replicate +TABLES=("dailyweather" "hourlyweather" "short_urls" "click_logs" "locations" "query_tracking") + +# PostgreSQL binaries +PSQL="/Applications/Postgres.app/Contents/Versions/latest/bin/psql" +PG_DUMP="/Applications/Postgres.app/Contents/Versions/latest/bin/pg_dump" + +# Function to run SQL and display results +run_sql() { + local host=$1 + local port=$2 + local db=$3 + local user=$4 + local pass=$5 + local sql=$6 + + PGPASSWORD=$pass $PSQL -h $host -p $port -U $user -d $db -c "$sql" +} + +# Replicate to a target +replicate_to_target() { + local target_info=$1 + IFS=':' read -r target_name target_host target_port target_db target_user target_pass <<< "$target_info" + + echo "Replicating to $target_name ($target_host)" + + # Check source tables + echo "Checking source tables:" + for table in "${TABLES[@]}"; do + run_sql $SOURCE_HOST $SOURCE_PORT $SOURCE_DB $SOURCE_USER $SOURCE_PASS "SELECT COUNT(*) FROM $table;" + done + + # Ensure uuid-ossp extension is created + run_sql $target_host $target_port $target_db $target_user $target_pass "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";" + + # Dump and restore each table + for table in "${TABLES[@]}"; do + echo "Replicating $table" + + if [ "$table" == "query_tracking" ]; then + # Dump structure + PGPASSWORD=$SOURCE_PASS $PG_DUMP -h $SOURCE_HOST -p $SOURCE_PORT -U $SOURCE_USER -d $SOURCE_DB -t $table --schema-only --no-owner --no-acl > ${table}_structure.sql + + # Dump data + PGPASSWORD=$SOURCE_PASS $PG_DUMP -h $SOURCE_HOST -p $SOURCE_PORT -U $SOURCE_USER -d $SOURCE_DB -t $table --data-only --no-owner --no-acl > ${table}_data.sql + + # Drop and recreate table on target + run_sql $target_host $target_port $target_db $target_user $target_pass "DROP TABLE IF EXISTS $table CASCADE; " + + # Restore structure + PGPASSWORD=$target_pass $PSQL -h $target_host -p $target_port -U $target_user -d $target_db -f ${table}_structure.sql + + # Restore data + PGPASSWORD=$target_pass $PSQL -h $target_host -p $target_port -U $target_user -d $target_db -f ${table}_data.sql + + # Clean up dump files + rm ${table}_structure.sql ${table}_data.sql + else + # Dump table + PGPASSWORD=$SOURCE_PASS $PG_DUMP -h $SOURCE_HOST -p $SOURCE_PORT -U $SOURCE_USER -d $SOURCE_DB -t $table --no-owner --no-acl > ${table}_dump.sql + + if [ $? -ne 0 ]; then + echo "Error dumping $table" + continue + fi + + # Clean up the dump file + # Remove empty lines, lines with only whitespace, and lines starting with "sij" + sed -i.bak '/^\s*$/d; /^sij/d' ${table}_dump.sql && rm ${table}_dump.sql.bak + + # Drop and recreate table on target + run_sql $target_host $target_port $target_db $target_user $target_pass "DROP TABLE IF EXISTS $table CASCADE; " + + # Restore table + PGPASSWORD=$target_pass $PSQL -h $target_host -p $target_port -U $target_user -d $target_db -f ${table}_dump.sql + + if [ $? 
-ne 0 ]; then + echo "Error restoring $table" + else + echo "$table replicated successfully" + fi + + # Clean up dump file + rm ${table}_dump.sql + fi + done + + # Verify replication + echo "Verifying replication:" + for table in "${TABLES[@]}"; do + echo "Checking $table on target:" + run_sql $target_host $target_port $target_db $target_user $target_pass "SELECT COUNT(*) FROM $table;" + done +} + + +# Main replication process +for target in "${TARGETS[@]}"; do + replicate_to_target "$target" +done + +echo "Replication completed" + diff --git a/sijapi/helpers/db/replicator.py b/sijapi/helpers/db/replicator.py new file mode 100644 index 0000000..526bd47 --- /dev/null +++ b/sijapi/helpers/db/replicator.py @@ -0,0 +1,127 @@ +# helpers/replicator.py + +import asyncio +import asyncpg +import yaml +from pathlib import Path +import subprocess +import sys +import os + +async def load_config(): + config_path = Path(__file__).parent.parent / 'config' / 'db.yaml' + with open(config_path, 'r') as file: + return yaml.safe_load(file) + +async def check_table_existence(conn, tables): + for table in tables: + exists = await conn.fetchval(f""" + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = $1 + ) + """, table) + print(f"Table {table} {'exists' if exists else 'does not exist'} in the database.") + +async def check_user_permissions(conn, tables): + for table in tables: + has_permission = await conn.fetchval(f""" + SELECT has_table_privilege(current_user, $1, 'SELECT') + """, table) + print(f"User {'has' if has_permission else 'does not have'} SELECT permission on table {table}.") + +async def replicate_tables(source, target, tables): + print(f"Replicating tables from {source['ts_id']} to {target['ts_id']}") + + conn_params = { + 'database': 'db_name', + 'user': 'db_user', + 'password': 'db_pass', + 'host': 'ts_ip', + 'port': 'db_port' + } + + source_conn = await asyncpg.connect(**{k: source[v] for k, v in conn_params.items()}) + target_conn = await asyncpg.connect(**{k: target[v] for k, v in conn_params.items()}) + + try: + source_version = await source_conn.fetchval("SELECT version()") + target_version = await target_conn.fetchval("SELECT version()") + print(f"Source database version: {source_version}") + print(f"Target database version: {target_version}") + + print("Checking table existence in source database:") + await check_table_existence(source_conn, tables) + + print("\nChecking user permissions in source database:") + await check_user_permissions(source_conn, tables) + + # Dump all tables to a file + dump_file = 'dump.sql' + dump_command = [ + '/Applications/Postgres.app/Contents/Versions/latest/bin/pg_dump', + '-h', source['ts_ip'], + '-p', str(source['db_port']), + '-U', source['db_user'], + '-d', source['db_name'], + '-t', ' -t '.join(tables), + '--no-owner', + '--no-acl', + '-f', dump_file + ] + env = {'PGPASSWORD': source['db_pass']} + print(f"\nExecuting dump command: {' '.join(dump_command)}") + dump_result = subprocess.run(dump_command, env=env, capture_output=True, text=True) + + if dump_result.returncode != 0: + print(f"Dump stderr: {dump_result.stderr}") + raise Exception(f"Dump failed: {dump_result.stderr}") + + print("Dump completed successfully.") + + # Restore from the dump file + restore_command = [ + '/Applications/Postgres.app/Contents/Versions/latest/bin/psql', + '-h', target['ts_ip'], + '-p', str(target['db_port']), + '-U', target['db_user'], + '-d', target['db_name'], + '-f', dump_file + ] + env = {'PGPASSWORD': 
target['db_pass']} + print(f"\nExecuting restore command: {' '.join(restore_command)}") + restore_result = subprocess.run(restore_command, env=env, capture_output=True, text=True) + + if restore_result.returncode != 0: + print(f"Restore stderr: {restore_result.stderr}") + raise Exception(f"Restore failed: {restore_result.stderr}") + + print("Restore completed successfully.") + + # Clean up the dump file + os.remove(dump_file) + + except Exception as e: + print(f"An error occurred during replication: {str(e)}") + print("Exception details:", sys.exc_info()) + finally: + await source_conn.close() + await target_conn.close() + +async def main(): + config = await load_config() + source_server = config['POOL'][0] # sij-mbp16 + target_servers = config['POOL'][1:] # sij-vm and sij-vps + + tables_to_replicate = [ + 'dailyweather', 'hourlyweather', 'short_urls', 'click_logs', 'locations' + ] + + for target_server in target_servers: + await replicate_tables(source_server, target_server, tables_to_replicate) + + print("All replications completed!") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sijapi/helpers/db/schema_info.yaml b/sijapi/helpers/db/schema_info.yaml new file mode 100644 index 0000000..4899924 --- /dev/null +++ b/sijapi/helpers/db/schema_info.yaml @@ -0,0 +1,1103 @@ +click_logs: + clicked_at: + data_type: timestamp without time zone + is_nullable: 'YES' + max_length: null + id: + data_type: uuid + is_nullable: 'NO' + max_length: null + ip_address: + data_type: text + is_nullable: 'YES' + max_length: null + short_code: + data_type: character varying + is_nullable: 'YES' + max_length: 3 + user_agent: + data_type: text + is_nullable: 'YES' + max_length: null +dailyweather: + cloudcover: + data_type: double precision + is_nullable: 'YES' + max_length: null + conditions: + data_type: character varying + is_nullable: 'YES' + max_length: null + datetime: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + datetimeepoch: + data_type: bigint + is_nullable: 'YES' + max_length: null + description: + data_type: character varying + is_nullable: 'YES' + max_length: null + dew: + data_type: double precision + is_nullable: 'YES' + max_length: null + feelslike: + data_type: double precision + is_nullable: 'YES' + max_length: null + feelslikemax: + data_type: double precision + is_nullable: 'YES' + max_length: null + feelslikemin: + data_type: double precision + is_nullable: 'YES' + max_length: null + humidity: + data_type: double precision + is_nullable: 'YES' + max_length: null + icon: + data_type: character varying + is_nullable: 'YES' + max_length: null + id: + data_type: integer + is_nullable: 'NO' + max_length: null + last_updated: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + location: + data_type: USER-DEFINED + is_nullable: 'YES' + max_length: null + moonphase: + data_type: double precision + is_nullable: 'YES' + max_length: null + precip: + data_type: double precision + is_nullable: 'YES' + max_length: null + precipcover: + data_type: double precision + is_nullable: 'YES' + max_length: null + precipprob: + data_type: double precision + is_nullable: 'YES' + max_length: null + preciptype: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + pressure: + data_type: double precision + is_nullable: 'YES' + max_length: null + severerisk: + data_type: double precision + is_nullable: 'YES' + max_length: null + snow: + data_type: double precision + is_nullable: 'YES' + max_length: null + snowdepth: + data_type: double 
precision + is_nullable: 'YES' + max_length: null + solarenergy: + data_type: double precision + is_nullable: 'YES' + max_length: null + solarradiation: + data_type: double precision + is_nullable: 'YES' + max_length: null + source: + data_type: character varying + is_nullable: 'YES' + max_length: null + stations: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + sunrise: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + sunriseepoch: + data_type: bigint + is_nullable: 'YES' + max_length: null + sunset: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + sunsetepoch: + data_type: bigint + is_nullable: 'YES' + max_length: null + temp: + data_type: double precision + is_nullable: 'YES' + max_length: null + tempmax: + data_type: double precision + is_nullable: 'YES' + max_length: null + tempmin: + data_type: double precision + is_nullable: 'YES' + max_length: null + uvindex: + data_type: integer + is_nullable: 'YES' + max_length: null + visibility: + data_type: double precision + is_nullable: 'YES' + max_length: null + winddir: + data_type: integer + is_nullable: 'YES' + max_length: null + windgust: + data_type: double precision + is_nullable: 'YES' + max_length: null + windspeed: + data_type: double precision + is_nullable: 'YES' + max_length: null +dailyweather_uuid: + cloudcover: + data_type: double precision + is_nullable: 'YES' + max_length: null + conditions: + data_type: character varying + is_nullable: 'YES' + max_length: null + datetime: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + datetimeepoch: + data_type: bigint + is_nullable: 'YES' + max_length: null + description: + data_type: character varying + is_nullable: 'YES' + max_length: null + dew: + data_type: double precision + is_nullable: 'YES' + max_length: null + feelslike: + data_type: double precision + is_nullable: 'YES' + max_length: null + feelslikemax: + data_type: double precision + is_nullable: 'YES' + max_length: null + feelslikemin: + data_type: double precision + is_nullable: 'YES' + max_length: null + humidity: + data_type: double precision + is_nullable: 'YES' + max_length: null + icon: + data_type: character varying + is_nullable: 'YES' + max_length: null + id: + data_type: uuid + is_nullable: 'NO' + max_length: null + last_updated: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + location: + data_type: USER-DEFINED + is_nullable: 'YES' + max_length: null + moonphase: + data_type: double precision + is_nullable: 'YES' + max_length: null + precip: + data_type: double precision + is_nullable: 'YES' + max_length: null + precipcover: + data_type: double precision + is_nullable: 'YES' + max_length: null + precipprob: + data_type: double precision + is_nullable: 'YES' + max_length: null + preciptype: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + pressure: + data_type: double precision + is_nullable: 'YES' + max_length: null + severerisk: + data_type: double precision + is_nullable: 'YES' + max_length: null + snow: + data_type: double precision + is_nullable: 'YES' + max_length: null + snowdepth: + data_type: double precision + is_nullable: 'YES' + max_length: null + solarenergy: + data_type: double precision + is_nullable: 'YES' + max_length: null + solarradiation: + data_type: double precision + is_nullable: 'YES' + max_length: null + source: + data_type: character varying + is_nullable: 'YES' + max_length: null + stations: + data_type: ARRAY + is_nullable: 'YES' + 
max_length: null + sunrise: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + sunriseepoch: + data_type: bigint + is_nullable: 'YES' + max_length: null + sunset: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + sunsetepoch: + data_type: bigint + is_nullable: 'YES' + max_length: null + temp: + data_type: double precision + is_nullable: 'YES' + max_length: null + tempmax: + data_type: double precision + is_nullable: 'YES' + max_length: null + tempmin: + data_type: double precision + is_nullable: 'YES' + max_length: null + uvindex: + data_type: integer + is_nullable: 'YES' + max_length: null + visibility: + data_type: double precision + is_nullable: 'YES' + max_length: null + winddir: + data_type: integer + is_nullable: 'YES' + max_length: null + windgust: + data_type: double precision + is_nullable: 'YES' + max_length: null + windspeed: + data_type: double precision + is_nullable: 'YES' + max_length: null +geography_columns: + coord_dimension: + data_type: integer + is_nullable: 'YES' + max_length: null + f_geography_column: + data_type: name + is_nullable: 'YES' + max_length: null + f_table_catalog: + data_type: name + is_nullable: 'YES' + max_length: null + f_table_name: + data_type: name + is_nullable: 'YES' + max_length: null + f_table_schema: + data_type: name + is_nullable: 'YES' + max_length: null + srid: + data_type: integer + is_nullable: 'YES' + max_length: null + type: + data_type: text + is_nullable: 'YES' + max_length: null +geometry_columns: + coord_dimension: + data_type: integer + is_nullable: 'YES' + max_length: null + f_geometry_column: + data_type: name + is_nullable: 'YES' + max_length: null + f_table_catalog: + data_type: character varying + is_nullable: 'YES' + max_length: 256 + f_table_name: + data_type: name + is_nullable: 'YES' + max_length: null + f_table_schema: + data_type: name + is_nullable: 'YES' + max_length: null + srid: + data_type: integer + is_nullable: 'YES' + max_length: null + type: + data_type: character varying + is_nullable: 'YES' + max_length: 30 +hourlyweather: + cloudcover: + data_type: double precision + is_nullable: 'YES' + max_length: null + conditions: + data_type: character varying + is_nullable: 'YES' + max_length: null + daily_weather_id: + data_type: integer + is_nullable: 'YES' + max_length: null + datetime: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + datetimeepoch: + data_type: bigint + is_nullable: 'YES' + max_length: null + dew: + data_type: double precision + is_nullable: 'YES' + max_length: null + feelslike: + data_type: double precision + is_nullable: 'YES' + max_length: null + humidity: + data_type: double precision + is_nullable: 'YES' + max_length: null + icon: + data_type: character varying + is_nullable: 'YES' + max_length: null + id: + data_type: integer + is_nullable: 'NO' + max_length: null + precip: + data_type: double precision + is_nullable: 'YES' + max_length: null + precipprob: + data_type: double precision + is_nullable: 'YES' + max_length: null + preciptype: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + pressure: + data_type: double precision + is_nullable: 'YES' + max_length: null + severerisk: + data_type: double precision + is_nullable: 'YES' + max_length: null + snow: + data_type: double precision + is_nullable: 'YES' + max_length: null + snowdepth: + data_type: double precision + is_nullable: 'YES' + max_length: null + solarenergy: + data_type: double precision + is_nullable: 'YES' + max_length: null + 
solarradiation: + data_type: double precision + is_nullable: 'YES' + max_length: null + source: + data_type: character varying + is_nullable: 'YES' + max_length: null + stations: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + temp: + data_type: double precision + is_nullable: 'YES' + max_length: null + uvindex: + data_type: integer + is_nullable: 'YES' + max_length: null + visibility: + data_type: double precision + is_nullable: 'YES' + max_length: null + winddir: + data_type: integer + is_nullable: 'YES' + max_length: null + windgust: + data_type: double precision + is_nullable: 'YES' + max_length: null + windspeed: + data_type: double precision + is_nullable: 'YES' + max_length: null +hourlyweather_uuid: + cloudcover: + data_type: double precision + is_nullable: 'YES' + max_length: null + conditions: + data_type: character varying + is_nullable: 'YES' + max_length: null + daily_weather_id: + data_type: integer + is_nullable: 'YES' + max_length: null + datetime: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + datetimeepoch: + data_type: bigint + is_nullable: 'YES' + max_length: null + dew: + data_type: double precision + is_nullable: 'YES' + max_length: null + feelslike: + data_type: double precision + is_nullable: 'YES' + max_length: null + humidity: + data_type: double precision + is_nullable: 'YES' + max_length: null + icon: + data_type: character varying + is_nullable: 'YES' + max_length: null + id: + data_type: uuid + is_nullable: 'NO' + max_length: null + precip: + data_type: double precision + is_nullable: 'YES' + max_length: null + precipprob: + data_type: double precision + is_nullable: 'YES' + max_length: null + preciptype: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + pressure: + data_type: double precision + is_nullable: 'YES' + max_length: null + severerisk: + data_type: double precision + is_nullable: 'YES' + max_length: null + snow: + data_type: double precision + is_nullable: 'YES' + max_length: null + snowdepth: + data_type: double precision + is_nullable: 'YES' + max_length: null + solarenergy: + data_type: double precision + is_nullable: 'YES' + max_length: null + solarradiation: + data_type: double precision + is_nullable: 'YES' + max_length: null + source: + data_type: character varying + is_nullable: 'YES' + max_length: null + stations: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + temp: + data_type: double precision + is_nullable: 'YES' + max_length: null + uvindex: + data_type: integer + is_nullable: 'YES' + max_length: null + visibility: + data_type: double precision + is_nullable: 'YES' + max_length: null + winddir: + data_type: integer + is_nullable: 'YES' + max_length: null + windgust: + data_type: double precision + is_nullable: 'YES' + max_length: null + windspeed: + data_type: double precision + is_nullable: 'YES' + max_length: null +locations: + action: + data_type: character varying + is_nullable: 'YES' + max_length: null + amenity: + data_type: character varying + is_nullable: 'YES' + max_length: null + boundingbox: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + city: + data_type: character varying + is_nullable: 'YES' + max_length: 255 + class: + data_type: character varying + is_nullable: 'YES' + max_length: null + class_: + data_type: text + is_nullable: 'YES' + max_length: null + context: + data_type: jsonb + is_nullable: 'YES' + max_length: null + country: + data_type: character varying + is_nullable: 'YES' + max_length: null + country_code: + data_type: 
character varying + is_nullable: 'YES' + max_length: null + county: + data_type: character varying + is_nullable: 'YES' + max_length: null + daily_weather_id: + data_type: integer + is_nullable: 'YES' + max_length: null + datetime: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + device_model: + data_type: character varying + is_nullable: 'YES' + max_length: null + device_name: + data_type: character varying + is_nullable: 'YES' + max_length: null + device_os: + data_type: character varying + is_nullable: 'YES' + max_length: null + device_type: + data_type: character varying + is_nullable: 'YES' + max_length: null + display_name: + data_type: character varying + is_nullable: 'YES' + max_length: null + house_number: + data_type: character varying + is_nullable: 'YES' + max_length: null + id: + data_type: integer + is_nullable: 'NO' + max_length: null + location: + data_type: USER-DEFINED + is_nullable: 'YES' + max_length: null + name: + data_type: character varying + is_nullable: 'YES' + max_length: null + neighbourhood: + data_type: character varying + is_nullable: 'YES' + max_length: null + quarter: + data_type: character varying + is_nullable: 'YES' + max_length: null + road: + data_type: character varying + is_nullable: 'YES' + max_length: null + state: + data_type: character varying + is_nullable: 'YES' + max_length: 255 + street: + data_type: character varying + is_nullable: 'YES' + max_length: 255 + suburb: + data_type: character varying + is_nullable: 'YES' + max_length: null + type: + data_type: character varying + is_nullable: 'YES' + max_length: null + zip: + data_type: character varying + is_nullable: 'YES' + max_length: 255 +locations_uuid: + action: + data_type: character varying + is_nullable: 'YES' + max_length: null + amenity: + data_type: character varying + is_nullable: 'YES' + max_length: null + boundingbox: + data_type: ARRAY + is_nullable: 'YES' + max_length: null + city: + data_type: character varying + is_nullable: 'YES' + max_length: 255 + class: + data_type: character varying + is_nullable: 'YES' + max_length: null + class_: + data_type: text + is_nullable: 'YES' + max_length: null + context: + data_type: jsonb + is_nullable: 'YES' + max_length: null + country: + data_type: character varying + is_nullable: 'YES' + max_length: null + country_code: + data_type: character varying + is_nullable: 'YES' + max_length: null + county: + data_type: character varying + is_nullable: 'YES' + max_length: null + daily_weather_id: + data_type: integer + is_nullable: 'YES' + max_length: null + datetime: + data_type: timestamp with time zone + is_nullable: 'YES' + max_length: null + device_model: + data_type: character varying + is_nullable: 'YES' + max_length: null + device_name: + data_type: character varying + is_nullable: 'YES' + max_length: null + device_os: + data_type: character varying + is_nullable: 'YES' + max_length: null + device_type: + data_type: character varying + is_nullable: 'YES' + max_length: null + display_name: + data_type: character varying + is_nullable: 'YES' + max_length: null + house_number: + data_type: character varying + is_nullable: 'YES' + max_length: null + id: + data_type: uuid + is_nullable: 'NO' + max_length: null + location: + data_type: USER-DEFINED + is_nullable: 'YES' + max_length: null + name: + data_type: character varying + is_nullable: 'YES' + max_length: null + neighbourhood: + data_type: character varying + is_nullable: 'YES' + max_length: null + quarter: + data_type: character varying + is_nullable: 'YES' 
+ max_length: null + road: + data_type: character varying + is_nullable: 'YES' + max_length: null + state: + data_type: character varying + is_nullable: 'YES' + max_length: 255 + street: + data_type: character varying + is_nullable: 'YES' + max_length: 255 + suburb: + data_type: character varying + is_nullable: 'YES' + max_length: null + type: + data_type: character varying + is_nullable: 'YES' + max_length: null + zip: + data_type: character varying + is_nullable: 'YES' + max_length: 255 +plss_sections: + frstdivdup: + data_type: character varying + is_nullable: 'YES' + max_length: null + frstdivid: + data_type: character varying + is_nullable: 'YES' + max_length: null + frstdivlab: + data_type: character varying + is_nullable: 'YES' + max_length: null + frstdivno: + data_type: character varying + is_nullable: 'YES' + max_length: null + frstdivtxt: + data_type: character varying + is_nullable: 'YES' + max_length: null + frstdivtyp: + data_type: character varying + is_nullable: 'YES' + max_length: null + objectid: + data_type: integer + is_nullable: 'YES' + max_length: null + ogc_fid: + data_type: integer + is_nullable: 'NO' + max_length: null + plssid: + data_type: character varying + is_nullable: 'YES' + max_length: null + shape.starea(): + data_type: double precision + is_nullable: 'YES' + max_length: null + shape.stlength(): + data_type: double precision + is_nullable: 'YES' + max_length: null + sourcedate: + data_type: bigint + is_nullable: 'YES' + max_length: null + sourceref: + data_type: character varying + is_nullable: 'YES' + max_length: null + survtyp: + data_type: character varying + is_nullable: 'YES' + max_length: null + survtyptxt: + data_type: character varying + is_nullable: 'YES' + max_length: null + wkb_geometry: + data_type: USER-DEFINED + is_nullable: 'YES' + max_length: null +plss_townships: + created_date: + data_type: bigint + is_nullable: 'YES' + max_length: null + created_user: + data_type: character varying + is_nullable: 'YES' + max_length: null + globalid: + data_type: character varying + is_nullable: 'YES' + max_length: null + last_edited_date: + data_type: bigint + is_nullable: 'YES' + max_length: null + last_edited_user: + data_type: character varying + is_nullable: 'YES' + max_length: null + objectid: + data_type: integer + is_nullable: 'YES' + max_length: null + ogc_fid: + data_type: integer + is_nullable: 'NO' + max_length: null + origid: + data_type: character varying + is_nullable: 'YES' + max_length: null + partcount: + data_type: character varying + is_nullable: 'YES' + max_length: null + plssid: + data_type: character varying + is_nullable: 'YES' + max_length: null + prinmer: + data_type: character varying + is_nullable: 'YES' + max_length: null + prinmercd: + data_type: character varying + is_nullable: 'YES' + max_length: null + rangedir: + data_type: character varying + is_nullable: 'YES' + max_length: null + rangefrac: + data_type: character varying + is_nullable: 'YES' + max_length: null + rangeno: + data_type: character varying + is_nullable: 'YES' + max_length: null + secsrvname: + data_type: character varying + is_nullable: 'YES' + max_length: null + shape.starea(): + data_type: double precision + is_nullable: 'YES' + max_length: null + shape.stlength(): + data_type: double precision + is_nullable: 'YES' + max_length: null + sourcedate: + data_type: bigint + is_nullable: 'YES' + max_length: null + sourceref: + data_type: character varying + is_nullable: 'YES' + max_length: null + srvname: + data_type: character varying + is_nullable: 'YES' + 
max_length: null + stateabbr: + data_type: character varying + is_nullable: 'YES' + max_length: null + steward: + data_type: character varying + is_nullable: 'YES' + max_length: null + survtyp: + data_type: character varying + is_nullable: 'YES' + max_length: null + survtyptxt: + data_type: character varying + is_nullable: 'YES' + max_length: null + twnshpdir: + data_type: character varying + is_nullable: 'YES' + max_length: null + twnshpdpcd: + data_type: character varying + is_nullable: 'YES' + max_length: null + twnshpfrac: + data_type: character varying + is_nullable: 'YES' + max_length: null + twnshplab: + data_type: character varying + is_nullable: 'YES' + max_length: null + twnshpno: + data_type: character varying + is_nullable: 'YES' + max_length: null + wkb_geometry: + data_type: USER-DEFINED + is_nullable: 'YES' + max_length: null +relations: + id: + data_type: bigint + is_nullable: 'NO' + max_length: null + members: + data_type: jsonb + is_nullable: 'YES' + max_length: null + tags: + data_type: jsonb + is_nullable: 'YES' + max_length: null +short_urls: + created_at: + data_type: timestamp without time zone + is_nullable: 'YES' + max_length: null + id: + data_type: integer + is_nullable: 'NO' + max_length: null + long_url: + data_type: text + is_nullable: 'NO' + max_length: null + short_code: + data_type: character varying + is_nullable: 'NO' + max_length: 3 +short_urls_uuid: + created_at: + data_type: timestamp without time zone + is_nullable: 'YES' + max_length: null + id: + data_type: uuid + is_nullable: 'NO' + max_length: null + long_url: + data_type: text + is_nullable: 'NO' + max_length: null + short_code: + data_type: character varying + is_nullable: 'NO' + max_length: 3 +spatial_ref_sys: + auth_name: + data_type: character varying + is_nullable: 'YES' + max_length: 256 + auth_srid: + data_type: integer + is_nullable: 'YES' + max_length: null + proj4text: + data_type: character varying + is_nullable: 'YES' + max_length: 2048 + srid: + data_type: integer + is_nullable: 'NO' + max_length: null + srtext: + data_type: character varying + is_nullable: 'YES' + max_length: 2048 diff --git a/sijapi/helpers/email/log_prior_emails.py b/sijapi/helpers/email/log_prior_emails.py new file mode 100644 index 0000000..be7c512 --- /dev/null +++ b/sijapi/helpers/email/log_prior_emails.py @@ -0,0 +1,67 @@ +import asyncio +from pathlib import Path +from sijapi import EMAIL_CONFIG, EMAIL_LOGS +from sijapi.utilities import EmailAccount +from sijapi.routers import email +from sijapi.logs import get_logger + +l = get_logger(__name__) + + +async def initialize_log_files(): + summarized_log = EMAIL_LOGS / "summarized.txt" + autoresponded_log = EMAIL_LOGS / "autoresponded.txt" + diagnostic_log = EMAIL_LOGS / "diagnostic.txt" + for log_file in [summarized_log, autoresponded_log, diagnostic_log]: + log_file.parent.mkdir(parents=True, exist_ok=True) + log_file.write_text("") + l.debug(f"Log files initialized: {summarized_log}, {autoresponded_log}, {diagnostic_log}") + return summarized_log, autoresponded_log, diagnostic_log + +async def process_all_emails(account: EmailAccount, summarized_log: Path, autoresponded_log: Path, diagnostic_log: Path): + try: + with email.get_imap_connection(account) as inbox: + l.debug(f"Connected to {account.name}, processing all emails...") + all_messages = inbox.messages() + unread_messages = set(uid for uid, _ in inbox.messages(unread=True)) + + processed_count = 0 + for identifier, message in all_messages: + # Log diagnostic information + with open(diagnostic_log, 'a') 
as f: + f.write(f"Account: {account.name}, Raw Identifier: {identifier}, Type: {type(identifier)}\n") + + # Attempt to get a string representation of the identifier + if isinstance(identifier, bytes): + id_str = identifier.decode() + elif isinstance(identifier, (int, str)): + id_str = str(identifier) + else: + id_str = repr(identifier) + + if identifier not in unread_messages: + processed_count += 1 + for log_file in [summarized_log, autoresponded_log]: + with open(log_file, 'a') as f: + f.write(f"{id_str}\n") + + l.info(f"Processed {processed_count} non-unread emails for account {account.name}") + except Exception as e: + l.logger.error(f"An error occurred while processing emails for account {account.name}: {e}") + +async def main(): + email_accounts = email.load_email_accounts(EMAIL_CONFIG) + summarized_log, autoresponded_log, diagnostic_log = await initialize_log_files() + + l.debug(f"Processing {len(email_accounts)} email accounts") + + tasks = [process_all_emails(account, summarized_log, autoresponded_log, diagnostic_log) for account in email_accounts] + await asyncio.gather(*tasks) + + # Final verification + with open(summarized_log, 'r') as f: + final_count = len(f.readlines()) + l.info(f"Final non-unread email count: {final_count}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/sijapi/helpers/fromvm/db/db.py b/sijapi/helpers/fromvm/db/db.py new file mode 100644 index 0000000..b7bb5d1 --- /dev/null +++ b/sijapi/helpers/fromvm/db/db.py @@ -0,0 +1,46 @@ +import asyncio +import asyncpg + +# Database connection information +DB_INFO = { + 'host': '100.64.64.20', + 'port': 5432, + 'database': 'sij', + 'user': 'sij', + 'password': 'Synchr0!' +} + +async def update_click_logs(): + # Connect to the database + conn = await asyncpg.connect(**DB_INFO) + + try: + # Drop existing 'id' and 'new_id' columns if they exist + await conn.execute(""" + ALTER TABLE click_logs + DROP COLUMN IF EXISTS id, + DROP COLUMN IF EXISTS new_id; + """) + print("Dropped existing id and new_id columns (if they existed)") + + # Add new UUID column as primary key + await conn.execute(""" + ALTER TABLE click_logs + ADD COLUMN id UUID PRIMARY KEY DEFAULT gen_random_uuid(); + """) + print("Added new UUID column as primary key") + + # Get the number of rows in the table + row_count = await conn.fetchval("SELECT COUNT(*) FROM click_logs") + print(f"Number of rows in click_logs: {row_count}") + + except Exception as e: + print(f"An error occurred: {str(e)}") + import traceback + traceback.print_exc() + finally: + # Close the database connection + await conn.close() + +# Run the update +asyncio.run(update_click_logs()) diff --git a/sijapi/helpers/fromvm/db/db_connection_test.py b/sijapi/helpers/fromvm/db/db_connection_test.py new file mode 100644 index 0000000..0d94404 --- /dev/null +++ b/sijapi/helpers/fromvm/db/db_connection_test.py @@ -0,0 +1,66 @@ +import asyncio +import asyncpg +import psycopg2 +import sys + +async def try_async_connect(host, port, user, password, database): + try: + conn = await asyncpg.connect( + host=host, + port=port, + user=user, + password=password, + database=database + ) + version = await conn.fetchval('SELECT version()') + print(f"Async connection successful to {host}:{port}") + print(f"PostgreSQL version: {version}") + await conn.close() + return True + except Exception as e: + print(f"Async connection failed to {host}:{port}") + print(f"Error: {str(e)}") + return False + +def try_sync_connect(host, port, user, password, database): + try: + conn = 
psycopg2.connect( + host=host, + port=port, + user=user, + password=password, + database=database + ) + cur = conn.cursor() + cur.execute('SELECT version()') + version = cur.fetchone()[0] + print(f"Sync connection successful to {host}:{port}") + print(f"PostgreSQL version: {version}") + conn.close() + return True + except Exception as e: + print(f"Sync connection failed to {host}:{port}") + print(f"Error: {str(e)}") + return False + +async def main(): + # Database connection parameters + port = 5432 + user = 'sij' + password = 'Synchr0!' + database = 'sij' + + hosts = ['100.64.64.20', '127.0.0.1', 'localhost'] + + print("Attempting asynchronous connections:") + for host in hosts: + await try_async_connect(host, port, user, password, database) + print() + + print("Attempting synchronous connections:") + for host in hosts: + try_sync_connect(host, port, user, password, database) + print() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sijapi/helpers/fromvm/db/db_get_schema.py b/sijapi/helpers/fromvm/db/db_get_schema.py new file mode 100644 index 0000000..c672c6e --- /dev/null +++ b/sijapi/helpers/fromvm/db/db_get_schema.py @@ -0,0 +1,89 @@ +import psycopg2 +from psycopg2 import sql + +def connect_to_db(): + return psycopg2.connect( + dbname='sij', + user='sij', + password='Synchr0!', + host='localhost' # Adjust if your database is not on localhost + ) + +def get_table_info(conn): + with conn.cursor() as cur: + # Get all tables in the public schema + cur.execute(""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'public' + """) + tables = cur.fetchall() + + table_info = {} + for (table_name,) in tables: + table_info[table_name] = { + 'primary_keys': get_primary_keys(cur, table_name), + 'foreign_keys': get_foreign_keys(cur, table_name) + } + + return table_info + +def get_primary_keys(cur, table_name): + cur.execute(""" + SELECT a.attname + FROM pg_index i + JOIN pg_attribute a ON a.attrelid = i.indrelid + AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = %s::regclass + AND i.indisprimary + """, (table_name,)) + return [row[0] for row in cur.fetchall()] + +def get_foreign_keys(cur, table_name): + cur.execute(""" + SELECT + tc.constraint_name, + kcu.column_name, + ccu.table_name AS foreign_table_name, + ccu.column_name AS foreign_column_name + FROM + information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + JOIN information_schema.constraint_column_usage AS ccu + ON ccu.constraint_name = tc.constraint_name + AND ccu.table_schema = tc.table_schema + WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name=%s + """, (table_name,)) + return cur.fetchall() + +def main(): + try: + with connect_to_db() as conn: + table_info = get_table_info(conn) + + for table_name, info in table_info.items(): + print(f"\n## Table: {table_name}") + + print("\nPrimary Keys:") + if info['primary_keys']: + for pk in info['primary_keys']: + print(f"- {pk}") + else: + print("- No primary keys found") + + print("\nForeign Keys:") + if info['foreign_keys']: + for fk in info['foreign_keys']: + print(f"- {fk[1]} -> {fk[2]}.{fk[3]} (Constraint: {fk[0]})") + else: + print("- No foreign keys found") + + except psycopg2.Error as e: + print(f"Database error: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + +if __name__ == "__main__": + main() diff --git a/sijapi/helpers/fromvm/db/db_uuid_migrate.py 
b/sijapi/helpers/fromvm/db/db_uuid_migrate.py new file mode 100644 index 0000000..b7bb5d1 --- /dev/null +++ b/sijapi/helpers/fromvm/db/db_uuid_migrate.py @@ -0,0 +1,46 @@ +import asyncio +import asyncpg + +# Database connection information +DB_INFO = { + 'host': '100.64.64.20', + 'port': 5432, + 'database': 'sij', + 'user': 'sij', + 'password': 'Synchr0!' +} + +async def update_click_logs(): + # Connect to the database + conn = await asyncpg.connect(**DB_INFO) + + try: + # Drop existing 'id' and 'new_id' columns if they exist + await conn.execute(""" + ALTER TABLE click_logs + DROP COLUMN IF EXISTS id, + DROP COLUMN IF EXISTS new_id; + """) + print("Dropped existing id and new_id columns (if they existed)") + + # Add new UUID column as primary key + await conn.execute(""" + ALTER TABLE click_logs + ADD COLUMN id UUID PRIMARY KEY DEFAULT gen_random_uuid(); + """) + print("Added new UUID column as primary key") + + # Get the number of rows in the table + row_count = await conn.fetchval("SELECT COUNT(*) FROM click_logs") + print(f"Number of rows in click_logs: {row_count}") + + except Exception as e: + print(f"An error occurred: {str(e)}") + import traceback + traceback.print_exc() + finally: + # Close the database connection + await conn.close() + +# Run the update +asyncio.run(update_click_logs()) diff --git a/sijapi/helpers/thp/CalFire_THP_scraper.py b/sijapi/helpers/thp/CalFire_THP_scraper.py new file mode 100644 index 0000000..92909a6 --- /dev/null +++ b/sijapi/helpers/thp/CalFire_THP_scraper.py @@ -0,0 +1,73 @@ +import requests +import PyPDF2 +import io +import re + +def scrape_data_from_pdf(url): + response = requests.get(url) + pdf_file = io.BytesIO(response.content) + + pdf_reader = PyPDF2.PdfReader(pdf_file) + + all_text = "" + for page in pdf_reader.pages: + all_text += page.extract_text() + "\n" + + return all_text + +def parse_data(raw_data): + lines = raw_data.split('\n') + data = [] + current_entry = None + + for line in lines: + line = line.strip() + if re.match(r'\d+-\d+-\d+-\w+', line): + if current_entry: + data.append(current_entry) + current_entry = {'Harvest Document': line, 'Raw Data': []} + elif current_entry: + current_entry['Raw Data'].append(line) + + if current_entry: + data.append(current_entry) + + return data + +def filter_data(data): + return [entry for entry in data if any(owner.lower() in ' '.join(entry['Raw Data']).lower() for owner in ["Sierra Pacific", "SPI", "Land & Timber"])] + +def extract_location(raw_data): + location = [] + for line in raw_data: + if 'MDBM:' in line or 'HBM:' in line: + location.append(line) + return ' '.join(location) + +def extract_plss_coordinates(text): + pattern = r'(\w+): T(\d+)([NSEW]) R(\d+)([NSEW]) S(\d+)' + return re.findall(pattern, text) + +# Main execution +url = "https://caltreesplans.resources.ca.gov/Caltrees/Report/ShowReport.aspx?module=TH_Document&reportID=492&reportType=LINK_REPORT_LIST" +raw_data = scrape_data_from_pdf(url) + +parsed_data = parse_data(raw_data) +print(f"Total timber plans parsed: {len(parsed_data)}") + +filtered_data = filter_data(parsed_data) +print(f"Found {len(filtered_data)} matching entries.") + +for plan in filtered_data: + print("\nHarvest Document:", plan['Harvest Document']) + + location = extract_location(plan['Raw Data']) + print("Location:", location) + + plss_coordinates = extract_plss_coordinates(location) + print("PLSS Coordinates:") + for coord in plss_coordinates: + meridian, township, township_dir, range_, range_dir, section = coord + print(f" {meridian}: 
T{township}{township_dir} R{range_}{range_dir} S{section}")
+
+    print("-" * 50)
diff --git a/sijapi/helpers/thp/thp.py b/sijapi/helpers/thp/thp.py
new file mode 100644
index 0000000..e69de29
diff --git a/sijapi/routers/news.py b/sijapi/routers/news.py
index decbb07..6663d65 100644
--- a/sijapi/routers/news.py
+++ b/sijapi/routers/news.py
@@ -195,24 +195,56 @@ async def process_and_save_article(
         raise HTTPException(status_code=500, detail=str(e))
+
+
+from newspaper import Article as NewspaperArticle
+
 async def fetch_and_parse_article(url: str) -> Article:
+    # Try trafilatura first
     source = trafilatura.fetch_url(url)
-    traf = trafilatura.extract_metadata(filecontent=source, default_url=url)
-
-    article = Article(url)
-    article.set_html(source)
-    article.parse()
-
-    # Update article properties with trafilatura data
-    article.title = article.title or traf.title or url
-    article.authors = article.authors or (traf.author if isinstance(traf.author, list) else [traf.author])
-    article.publish_date = await gis.dt(article.publish_date or traf.date or dt_datetime.now(), "UTC")
-    article.text = trafilatura.extract(source, output_format="markdown", include_comments=False) or article.text
-    article.top_image = article.top_image or traf.image
-    article.source_url = traf.sitename or urlparse(url).netloc.replace('www.', '').title()
-    article.meta_keywords = list(set(article.meta_keywords or traf.categories or traf.tags or []))
-
-    return article
+
+    if source:
+        try:
+            traf = trafilatura.extract_metadata(filecontent=source, default_url=url)
+
+            article = Article(url)
+            article.set_html(source)
+            article.parse()
+
+            # Update article properties with trafilatura data
+            article.title = article.title or traf.title or url
+            article.authors = article.authors or (traf.author if isinstance(traf.author, list) else [traf.author])
+            article.publish_date = await gis.dt(article.publish_date or traf.date or dt_datetime.now(), "UTC")
+            article.text = trafilatura.extract(source, output_format="markdown", include_comments=False) or article.text
+            article.top_image = article.top_image or traf.image
+            article.source_url = traf.sitename or urlparse(url).netloc.replace('www.', '').title()
+            article.meta_keywords = list(set(article.meta_keywords or traf.categories or traf.tags or []))
+
+            return article
+        except Exception as e:
+            l.warning(f"Trafilatura failed to parse {url}: {str(e)}. Falling back to newspaper3k.")
+    else:
+        l.warning(f"Trafilatura failed to fetch {url}. Falling back to newspaper3k.")
+
+    # If trafilatura fails, use newspaper3k
+    try:
+        newspaper_article = NewspaperArticle(url)
+        newspaper_article.download()
+        newspaper_article.parse()
+
+        article = Article(url)
+        article.title = newspaper_article.title
+        article.authors = newspaper_article.authors
+        article.publish_date = await gis.dt(newspaper_article.publish_date or dt_datetime.now(), "UTC")
+        article.text = newspaper_article.text
+        article.top_image = newspaper_article.top_image
+        article.source_url = urlparse(url).netloc.replace('www.', '').title()
+        article.meta_keywords = newspaper_article.keywords
+
+        return article
+    except Exception as e:
+        l.error(f"Both trafilatura and newspaper3k failed to fetch and parse {url}: {str(e)}")
+        raise HTTPException(status_code=500, detail="Failed to fetch and parse article content")
 
 
 def generate_markdown_content(article, title: str, summary: str, audio_link: Optional[str], site_name: Optional[str] = None) -> str:
@@ -258,4 +290,4 @@ tags:
     body += f"> [!summary]+\n> {summary}\n\n"
     body += article.text
 
-    return frontmatter + body
\ No newline at end of file
+    return frontmatter + body
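
For reference, the fallback flow added to fetch_and_parse_article above can be exercised on its own. The sketch below is a minimal, self-contained illustration of the same trafilatura-first, newspaper3k-fallback strategy, assuming trafilatura and newspaper3k are installed; the helper name fetch_article_text, its plain-dict return value, and the example URL are illustrative assumptions, not part of sijapi or this patch.

import trafilatura
from newspaper import Article as NewspaperArticle


def fetch_article_text(url: str) -> dict:
    """Return title, text, and which parser succeeded (illustrative helper only)."""
    source = trafilatura.fetch_url(url)
    if source:
        # Prefer trafilatura's markdown extraction, mirroring the patched router code
        text = trafilatura.extract(source, output_format="markdown", include_comments=False)
        if text:
            meta = trafilatura.extract_metadata(filecontent=source, default_url=url)
            title = meta.title if meta and meta.title else url
            return {"title": title, "text": text, "parser": "trafilatura"}
    # Fall back to newspaper3k when the page cannot be fetched or extracted
    article = NewspaperArticle(url)
    article.download()
    article.parse()
    return {"title": article.title or url, "text": article.text, "parser": "newspaper3k"}


if __name__ == "__main__":
    result = fetch_article_text("https://example.com/some-article")
    print(result["parser"], "-", result["title"])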