Auto-update: Tue Aug 6 23:16:30 PDT 2024

This commit is contained in:
sanj 2024-08-06 23:16:30 -07:00
parent a24dd829de
commit 210059e186
2 changed files with 46 additions and 52 deletions

View file

@ -834,57 +834,39 @@ class APIConfig(BaseModel):
async def execute_write_query(self, query: str, *args, table_name: str): async def execute_write_query(self, query: str, *args, table_name: str):
local_ts_id = os.environ.get('TS_ID') local_ts_id = os.environ.get('TS_ID')
online_hosts = await self.get_online_hosts() conn = await self.get_connection(self.local_db)
for pool_entry in online_hosts:
if pool_entry['ts_id'] != local_ts_id:
continue # Only write to the local database
conn = await self.get_connection(pool_entry)
if conn is None: if conn is None:
err(f"Unable to connect to local database {pool_entry['ts_id']}. Write operation failed.") err(f"Unable to connect to local database. Write operation failed.")
return [] return []
try: try:
if table_name in self.SPECIAL_TABLES: # Remove any formatting from the query
result = await self._execute_special_table_write(conn, query, *args, table_name=table_name) query = ' '.join(query.split())
else:
# Remove newlines and extra spaces from the query
query = query.replace('\n', ' ').replace(' ', ' ').strip()
# Prepare the INSERT ... ON CONFLICT ... query # Add version and server_id to the query
insert_cols = ', '.join(col.strip() for col in query.split('(')[1].split(')')[0].split(',')) columns = query.split('(')[1].split(')')[0]
update_cols = ', '.join([f'{col.strip()} = EXCLUDED.{col.strip()}' for col in query.split('(')[1].split(')')[0].split(',') if col.strip() not in ['id', 'version', 'server_id']]) values = query.split('VALUES')[1].split('RETURNING')[0]
modified_query = f""" modified_query = f"""
WITH new_version AS ( INSERT INTO {table_name} ({columns}, version, server_id)
SELECT COALESCE(MAX(version), 0) + 1 as next_version VALUES {values[:-1]}, (SELECT COALESCE(MAX(version), 0) + 1 FROM {table_name}), '{local_ts_id}')
FROM {table_name}
WHERE id = (SELECT id FROM {table_name} WHERE {insert_cols.split(',')[0].strip()} = $1 FOR UPDATE)
)
INSERT INTO {table_name} ({insert_cols}, version, server_id)
VALUES ({', '.join(f'${i+1}' for i in range(len(args)))}, (SELECT next_version FROM new_version), '{local_ts_id}')
ON CONFLICT (id) DO UPDATE SET ON CONFLICT (id) DO UPDATE SET
{update_cols}, version = {table_name}.version + 1,
version = (SELECT next_version FROM new_version), server_id = EXCLUDED.server_id
server_id = '{local_ts_id}'
WHERE {table_name}.version < (SELECT next_version FROM new_version)
OR ({table_name}.version = (SELECT next_version FROM new_version) AND {table_name}.server_id < '{local_ts_id}')
RETURNING id, version RETURNING id, version
""" """
result = await conn.fetch(modified_query, *args) result = await conn.fetch(modified_query, *args)
return result return result
except Exception as e: except Exception as e:
err(f"Error executing write query on {pool_entry['ts_id']}: {str(e)}") err(f"Error executing write query: {str(e)}")
err(f"Query: {query}") err(f"Query: {modified_query}")
err(f"Args: {args}") err(f"Args: {args}")
err(f"Traceback: {traceback.format_exc()}") err(f"Traceback: {traceback.format_exc()}")
return []
finally: finally:
await conn.close() await conn.close()
return []

View file

@ -198,8 +198,20 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
] ]
daily_weather_query = ''' daily_weather_query = '''
INSERT INTO dailyweather (location, sunrise, sunriseepoch, sunset, sunsetepoch, description, tempmax, tempmin, uvindex, winddir, windspeed, icon, last_updated, datetime, datetimeepoch, temp, feelslikemax, feelslikemin, feelslike, dew, humidity, precip, precipprob, precipcover, preciptype, snow, snowdepth, windgust, pressure, cloudcover, visibility, solarradiation, solarenergy, severerisk, moonphase, conditions, stations, source) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38) RETURNING id INSERT INTO dailyweather (
'''.replace('\n', ' ').replace(' ', ' ').strip() location, sunrise, sunriseepoch, sunset, sunsetepoch, description,
tempmax, tempmin, uvindex, winddir, windspeed, icon, last_updated,
datetime, datetimeepoch, temp, feelslikemax, feelslikemin, feelslike,
dew, humidity, precip, precipprob, precipcover, preciptype,
snow, snowdepth, windgust, pressure, cloudcover, visibility,
solarradiation, solarenergy, severerisk, moonphase, conditions,
stations, source
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
$16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
$29, $30, $31, $32, $33, $34, $35, $36, $37, $38
) RETURNING id
'''
daily_weather_result = await API.execute_write_query(daily_weather_query, *daily_weather_params, table_name="dailyweather") daily_weather_result = await API.execute_write_query(daily_weather_query, *daily_weather_params, table_name="dailyweather")
@ -207,6 +219,7 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
raise ValueError("Failed to insert daily weather data: no result returned") raise ValueError("Failed to insert daily weather data: no result returned")
daily_weather_id = daily_weather_result[0]['id'] daily_weather_id = daily_weather_result[0]['id']
debug(f"Inserted daily weather data with id: {daily_weather_id}") debug(f"Inserted daily weather data with id: {daily_weather_id}")
if 'hours' in day_data: if 'hours' in day_data:
@ -272,7 +285,6 @@ async def store_weather_to_db(date_time: dt_datetime, weather_data: dict):
return "FAILURE" return "FAILURE"
async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float): async def get_weather_from_db(date_time: dt_datetime, latitude: float, longitude: float):
debug(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather_from_db.") debug(f"Using {date_time.strftime('%Y-%m-%d %H:%M:%S')} as our datetime in get_weather_from_db.")
query_date = date_time.date() query_date = date_time.date()