diff --git a/sijapi/classes.py b/sijapi/classes.py index 8162e77..662e447 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -849,22 +849,26 @@ class APIConfig(BaseModel): if table_name in self.SPECIAL_TABLES: result = await self._execute_special_table_write(conn, query, *args, table_name=table_name) else: - # Modify the query to be an UPSERT operation - if query.strip().upper().startswith("INSERT"): - table_columns = await self.get_table_columns(conn, table_name) - primary_key = await self.get_primary_key(conn, table_name) - - if primary_key and len(table_columns) == len(args): - set_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in table_columns if col != primary_key]) - query = f""" - INSERT INTO {table_name} ({', '.join(table_columns)}) - VALUES ({', '.join(f'${i+1}' for i in range(len(args)))}) - ON CONFLICT ({primary_key}) DO UPDATE SET - {set_clause} - """ - else: - err(f"Column count mismatch for table {table_name}. Columns: {len(table_columns)}, Values: {len(args)}") - return [] + # Get table columns + columns = await self.get_table_columns(conn, table_name) + + # Prepare the INSERT ... ON CONFLICT ... query + placeholders = [f'${i+1}' for i in range(len(args))] + insert_cols = ', '.join(columns[1:]) # Exclude 'id' column + insert_vals = ', '.join(placeholders) + update_cols = ', '.join([f"{col} = EXCLUDED.{col}" for col in columns[1:] if col not in ['id', 'version', 'server_id']]) + + query = f""" + INSERT INTO {table_name} ({insert_cols}) + VALUES ({insert_vals}) + ON CONFLICT (id) DO UPDATE SET + {update_cols}, + version = {table_name}.version + 1, + server_id = EXCLUDED.server_id + WHERE {table_name}.version < EXCLUDED.version + OR ({table_name}.version = EXCLUDED.version AND {table_name}.server_id < EXCLUDED.server_id) + RETURNING id, version + """ result = await conn.fetch(query, *args) @@ -888,6 +892,7 @@ class APIConfig(BaseModel): """ columns = await conn.fetch(query, table_name) return [col['column_name'] for col in columns] + async def get_primary_key(self, conn, table_name: str) -> str: query = """ diff --git a/sijapi/routers/weather.py b/sijapi/routers/weather.py index 9fb16d2..1b90d39 100644 --- a/sijapi/routers/weather.py +++ b/sijapi/routers/weather.py @@ -168,8 +168,8 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): day_data['sunset'] = await gis.dt(day_data.get('sunsetEpoch')) debug(f"Corrected datetimes: datetime={day_data['datetime']}, sunrise={day_data['sunrise']}, sunset={day_data['sunset']}") - daily_weather_params = [ + location_point, day_data.get('sunrise'), day_data.get('sunriseEpoch'), day_data.get('sunset'), day_data.get('sunsetEpoch'), day_data.get('description'), day_data.get('tempmax'), @@ -188,10 +188,12 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): day_data.get('solarradiation'), day_data.get('solarenergy'), day_data.get('severerisk', 0), day_data.get('moonphase'), day_data.get('conditions'), stations_array, day_data.get('source'), - location_point + os.environ.get('TS_ID') # server_id ] - - daily_weather_query = 'INSERT INTO dailyweather DEFAULT VALUES' + + debug(f"Prepared daily_weather_params: {daily_weather_params}") + + daily_weather_query = 'INSERT INTO dailyweather DEFAULT VALUES' daily_weather_result = await API.execute_write_query(daily_weather_query, *daily_weather_params, table_name="dailyweather") if not daily_weather_result: @@ -234,18 +236,10 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): hour_data['icon'], hour_stations_array, hour_data.get('source', ''), + os.environ.get('TS_ID') # server_id ] - hourly_weather_query = ''' - INSERT INTO hourlyweather ( - daily_weather_id, datetime, datetimeepoch, temp, feelslike, - humidity, dew, precip, precipprob, preciptype, snow, snowdepth, - windgust, windspeed, winddir, pressure, cloudcover, visibility, - solarradiation, solarenergy, uvindex, severerisk, conditions, - icon, stations, source - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, - $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26) - ''' + hourly_weather_query = 'INSERT INTO hourlyweather DEFAULT VALUES' hourly_result = await API.execute_write_query(hourly_weather_query, *hourly_weather_params, table_name="hourlyweather") debug(f"Inserted hourly weather data for {hour_datetime}") except Exception as e: