From b14a6837e1f58d86e9053568d4bdfac0583d90bf Mon Sep 17 00:00:00 2001 From: sanj <67624670+iodrift@users.noreply.github.com> Date: Tue, 6 Aug 2024 22:44:42 -0700 Subject: [PATCH] Auto-update: Tue Aug 6 22:44:42 PDT 2024 --- sijapi/classes.py | 30 ++++++++++++++++++++---------- sijapi/routers/weather.py | 13 ++++++------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/sijapi/classes.py b/sijapi/classes.py index 662e447..6418df5 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -839,12 +839,12 @@ class APIConfig(BaseModel): for pool_entry in online_hosts: if pool_entry['ts_id'] != local_ts_id: continue # Only write to the local database - + conn = await self.get_connection(pool_entry) if conn is None: err(f"Unable to connect to local database {pool_entry['ts_id']}. Write operation failed.") return [] - + try: if table_name in self.SPECIAL_TABLES: result = await self._execute_special_table_write(conn, query, *args, table_name=table_name) @@ -859,19 +859,28 @@ class APIConfig(BaseModel): update_cols = ', '.join([f"{col} = EXCLUDED.{col}" for col in columns[1:] if col not in ['id', 'version', 'server_id']]) query = f""" - INSERT INTO {table_name} ({insert_cols}) - VALUES ({insert_vals}) + WITH new_version AS ( + SELECT COALESCE(MAX(version), 0) + 1 as next_version + FROM {table_name} + WHERE id = (SELECT id FROM {table_name} WHERE {columns[1]} = ${1} FOR UPDATE) + ) + INSERT INTO {table_name} ({insert_cols}, version, server_id) + VALUES ({insert_vals}, (SELECT next_version FROM new_version), ${{len(args)+1}}) ON CONFLICT (id) DO UPDATE SET {update_cols}, - version = {table_name}.version + 1, - server_id = EXCLUDED.server_id - WHERE {table_name}.version < EXCLUDED.version - OR ({table_name}.version = EXCLUDED.version AND {table_name}.server_id < EXCLUDED.server_id) + version = (SELECT next_version FROM new_version), + server_id = ${{len(args)+1}} + WHERE {table_name}.version < (SELECT next_version FROM new_version) + OR ({table_name}.version = (SELECT next_version FROM new_version) AND {table_name}.server_id < ${{len(args)+1}}) RETURNING id, version """ + # Add server_id to args + args = list(args) + args.append(local_ts_id) + result = await conn.fetch(query, *args) - + return result except Exception as e: err(f"Error executing write query on {pool_entry['ts_id']}: {str(e)}") @@ -880,8 +889,9 @@ class APIConfig(BaseModel): err(f"Traceback: {traceback.format_exc()}") finally: await conn.close() - + return [] + async def get_table_columns(self, conn, table_name: str) -> List[str]: query = """ diff --git a/sijapi/routers/weather.py b/sijapi/routers/weather.py index 3c2e316..35f0c0d 100644 --- a/sijapi/routers/weather.py +++ b/sijapi/routers/weather.py @@ -171,13 +171,13 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): daily_weather_params = [ location_point, - day_data.get('sunrise'), day_data.get('sunriseEpoch'), - day_data.get('sunset'), day_data.get('sunsetEpoch'), + day_data['sunrise'], day_data.get('sunriseEpoch'), + day_data['sunset'], day_data.get('sunsetEpoch'), day_data.get('description'), day_data.get('tempmax'), day_data.get('tempmin'), day_data.get('uvindex'), day_data.get('winddir'), day_data.get('windspeed'), day_data.get('icon'), dt_datetime.now(tz), - day_data.get('datetime'), day_data.get('datetimeEpoch'), + day_data['datetime'], day_data.get('datetimeEpoch'), day_data.get('temp'), day_data.get('feelslikemax'), day_data.get('feelslikemin'), day_data.get('feelslike'), day_data.get('dew'), day_data.get('humidity'), @@ -188,8 +188,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): day_data.get('cloudcover'), day_data.get('visibility'), day_data.get('solarradiation'), day_data.get('solarenergy'), day_data.get('severerisk', 0), day_data.get('moonphase'), - day_data.get('conditions'), stations_array, day_data.get('source'), - os.environ.get('TS_ID') # server_id + day_data.get('conditions'), stations_array, day_data.get('source') ] debug(f"Prepared daily_weather_params: {daily_weather_params}") @@ -236,8 +235,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): hour_data['conditions'], hour_data['icon'], hour_stations_array, - hour_data.get('source', ''), - os.environ.get('TS_ID') # server_id + hour_data.get('source', '') ] hourly_weather_query = 'INSERT INTO hourlyweather DEFAULT VALUES' @@ -256,6 +254,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): err(f"Traceback: {traceback.format_exc()}") return "FAILURE" + async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float):