diff --git a/sijapi/classes.py b/sijapi/classes.py index 4840a9d..a789e1b 100644 --- a/sijapi/classes.py +++ b/sijapi/classes.py @@ -834,57 +834,39 @@ class APIConfig(BaseModel): async def execute_write_query(self, query: str, *args, table_name: str): local_ts_id = os.environ.get('TS_ID') - online_hosts = await self.get_online_hosts() + conn = await self.get_connection(self.local_db) + if conn is None: + err(f"Unable to connect to local database. Write operation failed.") + return [] - for pool_entry in online_hosts: - if pool_entry['ts_id'] != local_ts_id: - continue # Only write to the local database + try: + # Remove any formatting from the query + query = ' '.join(query.split()) - conn = await self.get_connection(pool_entry) - if conn is None: - err(f"Unable to connect to local database {pool_entry['ts_id']}. Write operation failed.") - return [] - - try: - if table_name in self.SPECIAL_TABLES: - result = await self._execute_special_table_write(conn, query, *args, table_name=table_name) - else: - # Remove newlines and extra spaces from the query - query = query.replace('\n', ' ').replace(' ', ' ').strip() - - # Prepare the INSERT ... ON CONFLICT ... query - insert_cols = ', '.join(col.strip() for col in query.split('(')[1].split(')')[0].split(',')) - update_cols = ', '.join([f'{col.strip()} = EXCLUDED.{col.strip()}' for col in query.split('(')[1].split(')')[0].split(',') if col.strip() not in ['id', 'version', 'server_id']]) - - modified_query = f""" - WITH new_version AS ( - SELECT COALESCE(MAX(version), 0) + 1 as next_version - FROM {table_name} - WHERE id = (SELECT id FROM {table_name} WHERE {insert_cols.split(',')[0].strip()} = $1 FOR UPDATE) - ) - INSERT INTO {table_name} ({insert_cols}, version, server_id) - VALUES ({', '.join(f'${i+1}' for i in range(len(args)))}, (SELECT next_version FROM new_version), '{local_ts_id}') - ON CONFLICT (id) DO UPDATE SET - {update_cols}, - version = (SELECT next_version FROM new_version), - server_id = '{local_ts_id}' - WHERE {table_name}.version < (SELECT next_version FROM new_version) - OR ({table_name}.version = (SELECT next_version FROM new_version) AND {table_name}.server_id < '{local_ts_id}') - RETURNING id, version - """ - - result = await conn.fetch(modified_query, *args) + # Add version and server_id to the query + columns = query.split('(')[1].split(')')[0] + values = query.split('VALUES')[1].split('RETURNING')[0] - return result - except Exception as e: - err(f"Error executing write query on {pool_entry['ts_id']}: {str(e)}") - err(f"Query: {query}") - err(f"Args: {args}") - err(f"Traceback: {traceback.format_exc()}") - finally: - await conn.close() + modified_query = f""" + INSERT INTO {table_name} ({columns}, version, server_id) + VALUES {values[:-1]}, (SELECT COALESCE(MAX(version), 0) + 1 FROM {table_name}), '{local_ts_id}') + ON CONFLICT (id) DO UPDATE SET + version = {table_name}.version + 1, + server_id = EXCLUDED.server_id + RETURNING id, version + """ - return [] + result = await conn.fetch(modified_query, *args) + return result + except Exception as e: + err(f"Error executing write query: {str(e)}") + err(f"Query: {modified_query}") + err(f"Args: {args}") + err(f"Traceback: {traceback.format_exc()}") + return [] + finally: + await conn.close() + diff --git a/sijapi/routers/weather.py b/sijapi/routers/weather.py index 81c27d4..1dbbc18 100644 --- a/sijapi/routers/weather.py +++ b/sijapi/routers/weather.py @@ -198,15 +198,28 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): ] daily_weather_query = ''' - INSERT INTO dailyweather (location, sunrise, sunriseepoch, sunset, sunsetepoch, description, tempmax, tempmin, uvindex, winddir, windspeed, icon, last_updated, datetime, datetimeepoch, temp, feelslikemax, feelslikemin, feelslike, dew, humidity, precip, precipprob, precipcover, preciptype, snow, snowdepth, windgust, pressure, cloudcover, visibility, solarradiation, solarenergy, severerisk, moonphase, conditions, stations, source) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38) RETURNING id - '''.replace('\n', ' ').replace(' ', ' ').strip() - + INSERT INTO dailyweather ( + location, sunrise, sunriseepoch, sunset, sunsetepoch, description, + tempmax, tempmin, uvindex, winddir, windspeed, icon, last_updated, + datetime, datetimeepoch, temp, feelslikemax, feelslikemin, feelslike, + dew, humidity, precip, precipprob, precipcover, preciptype, + snow, snowdepth, windgust, pressure, cloudcover, visibility, + solarradiation, solarenergy, severerisk, moonphase, conditions, + stations, source + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, + $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, + $29, $30, $31, $32, $33, $34, $35, $36, $37, $38 + ) RETURNING id + ''' + daily_weather_result = await API.execute_write_query(daily_weather_query, *daily_weather_params, table_name="dailyweather") - + if not daily_weather_result: raise ValueError("Failed to insert daily weather data: no result returned") daily_weather_id = daily_weather_result[0]['id'] + debug(f"Inserted daily weather data with id: {daily_weather_id}") if 'hours' in day_data: @@ -272,7 +285,6 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict): return "FAILURE" - async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float): debug(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather_from_db.") query_date = date_time.date()